Переглянути джерело

fix(buffer_worker): log unknown async replies

Zaiming (Stone) Shi 3 роки тому
батько
коміт
dbfdeec5e9

+ 82 - 18
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -475,6 +475,7 @@ flush(Data0) ->
     ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
     case {CurrentCount, IsFull} of
         {0, _} ->
+            ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}),
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
@@ -918,7 +919,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
                 inflight_tid => InflightTID,
                 request_ref => Ref,
                 query_opts => QueryOpts,
-                query => minimize(Query)
+                min_query => minimize(Query)
             },
             IsRetriable = false,
             WorkerMRef = undefined,
@@ -951,7 +952,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
                 inflight_tid => InflightTID,
                 request_ref => Ref,
                 query_opts => QueryOpts,
-                batch => minimize(Batch)
+                min_batch => minimize(Batch)
             },
             Requests = lists:map(
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
@@ -967,19 +968,33 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
     ).
 
 handle_async_reply(
+    #{
+        request_ref := Ref,
+        inflight_tid := InflightTID
+    } = ReplyContext,
+    Result
+) ->
+    case maybe_handle_unknown_async_reply(InflightTID, Ref) of
+        discard ->
+            ok;
+        continue ->
+            handle_async_reply1(ReplyContext, Result)
+    end.
+
+handle_async_reply1(
     #{
         request_ref := Ref,
         inflight_tid := InflightTID,
         resource_id := Id,
         worker_index := Index,
         buffer_worker := Pid,
-        query := ?QUERY(_, _, _, ExpireAt) = _Query
+        min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
 ) ->
     ?tp(
         handle_async_reply_enter,
-        #{batch_or_query => [_Query], ref => Ref}
+        #{batch_or_query => [_Query], ref => Ref, result => Result}
     ),
     Now = now_(),
     case is_expired(ExpireAt, Now) of
@@ -1002,7 +1017,7 @@ do_handle_async_reply(
         worker_index := Index,
         buffer_worker := Pid,
         inflight_tid := InflightTID,
-        query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
+        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
     },
     Result
 ) ->
@@ -1031,16 +1046,30 @@ do_handle_async_reply(
     ok = maybe_flush_after_async_reply(IsFullBefore).
 
 handle_async_batch_reply(
+    #{
+        inflight_tid := InflightTID,
+        request_ref := Ref
+    } = ReplyContext,
+    Result
+) ->
+    case maybe_handle_unknown_async_reply(InflightTID, Ref) of
+        discard ->
+            ok;
+        continue ->
+            handle_async_batch_reply1(ReplyContext, Result)
+    end.
+
+handle_async_batch_reply1(
     #{
         inflight_tid := InflightTID,
         request_ref := Ref,
-        batch := Batch
+        min_batch := Batch
     } = ReplyContext,
     Result
 ) ->
     ?tp(
         handle_async_reply_enter,
-        #{batch_or_query => Batch, ref => Ref}
+        #{batch_or_query => Batch, ref => Ref, result => Result}
     ),
     Now = now_(),
     IsFullBefore = is_inflight_full(InflightTID),
@@ -1060,8 +1089,7 @@ handle_async_batch_reply(
     ok = maybe_flush_after_async_reply(IsFullBefore).
 
 handle_async_batch_reply2([], _, _, _) ->
-    %% e.g. if the driver evaluates the callback more than once
-    %% which should really be a bug
+    %% should have caused the unknown_async_reply_discarded
     ok;
 handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
@@ -1070,7 +1098,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
         worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
-        batch := Batch
+        min_batch := Batch
     } = ReplyContext,
     %% All batch items share the same HasBeenSent flag
     %% So we just take the original flag from the ReplyContext batch
@@ -1096,7 +1124,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
             %% some queries are not expired, put them back to the inflight batch
             %% so it can be either acked now or retried later
             ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
-            ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
+            ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
     end,
     ok.
 
@@ -1107,7 +1135,7 @@ do_handle_async_batch_reply(
         worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
-        batch := Batch,
+        min_batch := Batch,
         query_opts := QueryOpts
     },
     Result
@@ -1123,7 +1151,7 @@ do_handle_async_batch_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ?MODULE:block(Pid);
+            ok = ?MODULE:block(Pid);
         ack ->
             ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
@@ -1150,6 +1178,32 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
     %% the inflight table was full before handling aync reply
     ok = ?MODULE:flush_worker(self()).
 
+%% check if the async reply is valid.
+%% e.g. if a connector evaluates the callback more than once:
+%% 1. If the request was previously deleted from inflight table due to
+%%    either succeeded previously or expired, this function logs a
+%%    warning message and returns 'discard' instruction.
+%% 2. If the request was previously failed and now pending on a retry,
+%%    then this function will return 'continue' as there is no way to
+%%    tell if this reply is stae or not.
+maybe_handle_unknown_async_reply(InflightTID, Ref) ->
+    try ets:member(InflightTID, Ref) of
+        true ->
+            %% NOTE: this does not mean the
+            continue;
+        false ->
+            ?tp(
+                warning,
+                unknown_async_reply_discarded,
+                #{inflight_key => Ref}
+            ),
+            discard
+    catch
+        error:badarg ->
+            %% shutdown ?
+            discard
+    end.
+
 %%==============================================================================
 %% operations for queue
 queue_item_marshaller(Bin) when is_binary(Bin) ->
@@ -1287,7 +1341,7 @@ inflight_append(
     InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     BatchSize = length(Batch),
-    IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
+    IsNew andalso inc_inflight(InflightTID, BatchSize),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
@@ -1302,7 +1356,7 @@ inflight_append(
     Query = mark_as_sent(Query0),
     InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
-    IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
+    IsNew andalso inc_inflight(InflightTID, 1),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
@@ -1318,6 +1372,8 @@ mark_inflight_as_retriable(undefined, _Ref) ->
     ok;
 mark_inflight_as_retriable(InflightTID, Ref) ->
     _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
+    %% the old worker's DOWN should not affect this inflight any more
+    _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}),
     ok.
 
 %% Track each worker pid only once.
@@ -1367,7 +1423,7 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
     IsKnownRef = (Count > 0),
     case IsKnownRef of
         true ->
-            ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+            ok = dec_inflight(InflightTID, Count),
             emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
         false ->
             ok
@@ -1390,9 +1446,17 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     ok.
 
 %% used to update a batch after dropping expired individual queries.
-update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 ->
+update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
-    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}),
+    ok = dec_inflight(InflightTID, NumExpired),
+    ok.
+
+inc_inflight(InflightTID, Count) ->
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
+    ok.
+
+dec_inflight(InflightTID, Count) when Count > 0 ->
+    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
     ok.
 
 %%==============================================================================

+ 47 - 10
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -135,11 +135,11 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
     after 1000 ->
         {error, timeout}
     end;
-on_query(_InstId, {sleep, For}, #{pid := Pid}) ->
+on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => sync, for => For}),
     ReqRef = make_ref(),
     From = {self(), ReqRef},
-    Pid ! {From, {sleep, For}},
+    Pid ! {From, {sleep_before_reply, For}},
     receive
         {ReqRef, Result} ->
             Result
@@ -159,9 +159,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
 on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
     Pid ! {big_payload, Payload, ReplyFun},
     {ok, Pid};
-on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) ->
+on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => async, for => For}),
-    Pid ! {{sleep, For}, ReplyFun},
+    Pid ! {{sleep_before_reply, For}, ReplyFun},
     {ok, Pid}.
 
 on_batch_query(InstId, BatchReq, State) ->
@@ -173,10 +173,13 @@ on_batch_query(InstId, BatchReq, State) ->
         get_counter ->
             batch_get_counter(sync, InstId, State);
         {big_payload, _Payload} ->
-            batch_big_payload(sync, InstId, BatchReq, State)
+            batch_big_payload(sync, InstId, BatchReq, State);
+        {random_reply, Num} ->
+            %% async batch retried
+            random_reply(Num)
     end.
 
-on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
+on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) ->
     %% Requests can be of multiple types, but cannot be mixed.
     case hd(BatchReq) of
         {inc_counter, _} ->
@@ -186,7 +189,11 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
         block_now ->
             on_query_async(InstId, block_now, ReplyFunAndArgs, State);
         {big_payload, _Payload} ->
-            batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State)
+            batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State);
+        {random_reply, Num} ->
+            %% only take the first Num in the batch should be random enough
+            Pid ! {{random_reply, Num}, ReplyFunAndArgs},
+            {ok, Pid}
     end.
 
 batch_inc_counter(CallMode, InstId, BatchReq, State) ->
@@ -299,16 +306,31 @@ counter_loop(
             {{FromPid, ReqRef}, get} ->
                 FromPid ! {ReqRef, Num},
                 State;
-            {{sleep, _} = SleepQ, ReplyFun} ->
+            {{random_reply, RandNum}, ReplyFun} ->
+                %% usually a behaving  connector should reply once and only once for
+                %% each (batch) request
+                %% but we try to reply random results a random number of times
+                %% with 'ok' in the result, the buffer worker should eventually
+                %% drain the buffer (and inflights table)
+                ReplyCount = 1 + (RandNum rem 3),
+                Results = random_replies(ReplyCount),
+                lists:foreach(
+                    fun(Result) ->
+                        apply_reply(ReplyFun, Result)
+                    end,
+                    Results
+                ),
+                State;
+            {{sleep_before_reply, _} = SleepQ, ReplyFun} ->
                 apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
                 State;
-            {{FromPid, ReqRef}, {sleep, _} = SleepQ} ->
+            {{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} ->
                 FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
                 State
         end,
     counter_loop(NewState).
 
-handle_query(Mode, {sleep, For} = Query, Status) ->
+handle_query(Mode, {sleep_before_reply, For} = Query, Status) ->
     ok = timer:sleep(For),
     Result =
         case Status of
@@ -329,3 +351,18 @@ maybe_register(_Name, _Pid, false) ->
 
 apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
     apply(ReplyFun, Args ++ [Result]).
+
+random_replies(0) ->
+    [];
+random_replies(N) ->
+    [random_reply(N) | random_replies(N - 1)].
+
+random_reply(N) ->
+    case rand:uniform(3) of
+        1 ->
+            {ok, N};
+        2 ->
+            {error, {recoverable_error, N}};
+        3 ->
+            {error, {unrecoverable_error, N}}
+    end.

+ 63 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1482,7 +1482,7 @@ t_retry_async_inflight_full(_Config) ->
                         AsyncInflightWindow * 2,
                         fun() ->
                             For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
-                            {sleep, For}
+                            {sleep_before_reply, For}
                         end,
                         #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
                     ),
@@ -1507,6 +1507,68 @@ t_retry_async_inflight_full(_Config) ->
     ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
     ok.
 
+%% this test case is to ensure the buffer worker will not go crazy even
+%% if the underlying connector is misbehaving: evaluate async callbacks multiple times
+t_async_reply_multi_eval(_Config) ->
+    ResumeInterval = 20,
+    AsyncInflightWindow = 5,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => ?FUNCTION_NAME},
+        #{
+            query_mode => async,
+            async_inflight_window => AsyncInflightWindow,
+            batch_size => 3,
+            batch_time => 10,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    ?check_trace(
+        #{timetrap => 15_000},
+        begin
+            %% block
+            ok = emqx_resource:simple_sync_query(?ID, block),
+
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    inc_counter_in_parallel(
+                        AsyncInflightWindow * 2,
+                        fun() ->
+                            Rand = rand:uniform(1000),
+                            {random_reply, Rand}
+                        end,
+                        #{}
+                    ),
+                    #{?snk_kind := buffer_worker_queue_drained, inflight := 0},
+                    ResumeInterval * 200
+                ),
+            ok
+        end,
+        [
+            fun(Trace) ->
+                ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace))
+            end
+        ]
+    ),
+    Metrics = tap_metrics(?LINE),
+    #{
+        counters := Counters,
+        gauges := #{queuing := 0, inflight := 0}
+    } = Metrics,
+    #{
+        matched := Matched,
+        success := Success,
+        dropped := Dropped,
+        late_reply := LateReply,
+        failed := Failed
+    } = Counters,
+    ?assertEqual(Matched, Success + Dropped + LateReply + Failed),
+    ok.
+
 t_retry_async_inflight_batch(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(async_if_possible),