Browse Source

refactor(buffer_worker): ensure flush message is never missed

Zaiming (Stone) Shi 3 năm trước cách đây
mục cha
commit
3a6dbbdd05

+ 46 - 34
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -70,6 +70,18 @@
 -define(RETRY_IDX, 3).
 -define(WORKER_MREF_IDX, 4).
 
+-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
+    (fun() ->
+        IsFullBefore = is_inflight_full(InflightTID),
+        case (EXPR) of
+            blocked ->
+                ok;
+            ok ->
+                maybe_flush_after_async_reply(IsFullBefore)
+        end
+    end)()
+).
+
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
@@ -194,8 +206,8 @@ init({Id, Index, Opts}) ->
     ?tp(buffer_worker_init, #{id => Id, index => Index}),
     {ok, running, Data}.
 
-running(enter, _, Data) ->
-    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}),
+running(enter, _, #{tref := _Tref} = Data) ->
+    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
     %% According to `gen_statem' laws, we mustn't call `maybe_flush'
     %% directly because it may decide to return `{next_state, blocked, _}',
     %% and that's an invalid response for a state enter call.
@@ -212,9 +224,8 @@ running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
     handle_query_requests(Request0, Data);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
-running(internal, flush, St) ->
-    flush(St);
 running(info, {flush, _Ref}, _St) ->
+    ?tp(discarded_stale_flush, #{}),
     keep_state_and_data;
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
@@ -472,10 +483,15 @@ flush(Data0) ->
     Data1 = cancel_flush_timer(Data0),
     CurrentCount = queue_count(Q0),
     IsFull = is_inflight_full(InflightTID),
-    ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
+    InflightCount = inflight_num_batches(InflightTID),
+    ?tp(buffer_worker_flush, #{
+        queued => CurrentCount,
+        is_inflight_full => IsFull,
+        inflight => InflightCount
+    }),
     case {CurrentCount, IsFull} of
         {0, _} ->
-            ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}),
+            ?tp(buffer_worker_queue_drained, #{inflight => InflightCount}),
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
@@ -714,18 +730,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
         end,
         Batch
     ),
-    {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts),
-    PostFns =
+    {ShouldAck, PostFns} =
         lists:foldl(
-            fun(Reply, PostFns) ->
-                {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
-                [PostFn | PostFns]
+            fun(Reply, {_ShouldAck, PostFns}) ->
+                %% _ShouldAck should be the same as ShouldAck starting from the second reply
+                {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+                {ShouldAck, [PostFn | PostFns]}
             end,
-            [PostFn1],
-            tl(Replies)
+            {ack, []},
+            Replies
         ),
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
-    {Action, PostFn}.
+    {ShouldAck, PostFn}.
 
 reply_caller(Id, Reply, QueryOpts) ->
     {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
@@ -978,7 +994,7 @@ handle_async_reply(
         discard ->
             ok;
         continue ->
-            handle_async_reply1(ReplyContext, Result)
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
     end.
 
 handle_async_reply1(
@@ -999,10 +1015,8 @@ handle_async_reply1(
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsFullBefore = is_inflight_full(InflightTID),
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
         false ->
@@ -1034,16 +1048,15 @@ do_handle_async_reply(
         ref => Ref,
         result => Result
     }),
-    IsFullBefore = is_inflight_full(InflightTID),
     case Action of
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ?MODULE:block(Pid);
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
-            do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
-    end,
-    ok = maybe_flush_after_async_reply(IsFullBefore).
+            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+    end.
 
 handle_async_batch_reply(
     #{
@@ -1056,7 +1069,7 @@ handle_async_batch_reply(
         discard ->
             ok;
         continue ->
-            handle_async_batch_reply1(ReplyContext, Result)
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
     end.
 
 handle_async_batch_reply1(
@@ -1072,21 +1085,19 @@ handle_async_batch_reply1(
         #{batch_or_query => Batch, ref => Ref, result => Result}
     ),
     Now = now_(),
-    IsFullBefore = is_inflight_full(InflightTID),
     case sieve_expired_requests(Batch, Now) of
         {_NotExpired, []} ->
             %% this is the critical code path,
             %% we try not to do ets:lookup in this case
             %% because the batch can be quite big
-            ok = do_handle_async_batch_reply(ReplyContext, Result);
+            do_handle_async_batch_reply(ReplyContext, Result);
         {_NotExpired, _Expired} ->
             %% at least one is expired
             %% the batch from reply context is minimized, so it cannot be used
             %% to update the inflight items, hence discard Batch and lookup the RealBatch
             ?tp(handle_async_reply_expired, #{expired => _Expired}),
-            ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
-    end,
-    ok = maybe_flush_after_async_reply(IsFullBefore).
+            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
+    end.
 
 handle_async_batch_reply2([], _, _, _) ->
     %% should have caused the unknown_async_reply_discarded
@@ -1124,9 +1135,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
             %% some queries are not expired, put them back to the inflight batch
             %% so it can be either acked now or retried later
             ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
-            ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
-    end,
-    ok.
+            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
+    end.
 
 do_handle_async_batch_reply(
     #{
@@ -1151,7 +1161,8 @@ do_handle_async_batch_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid);
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
             ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
@@ -1173,9 +1184,11 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
     %% after it is handled, the inflight table must be even smaller
     %% hence we can rely on the buffer worker's flush timer to trigger
     %% the next flush
+    ?tp(skip_flushing_worker, #{}),
     ok;
 maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
     %% the inflight table was full before handling async reply
+    ?tp(do_flushing_worker, #{}),
     ok = ?MODULE:flush_worker(self()).
 
 %% check if the async reply is valid.
@@ -1189,7 +1202,6 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
 maybe_handle_unknown_async_reply(InflightTID, Ref) ->
     try ets:member(InflightTID, Ref) of
         true ->
-            %% NOTE: this does not mean the
             continue;
         false ->
             ?tp(
@@ -1446,7 +1458,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     ok.
 
 %% used to update a batch after dropping expired individual queries.
-update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
+update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
     ok = dec_inflight(InflightTID, NumExpired),
     ok.

+ 2 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -314,6 +314,8 @@ counter_loop(
                 %% drain the buffer (and inflights table)
                 ReplyCount = 1 + (RandNum rem 3),
                 Results = random_replies(ReplyCount),
+                %% add a delay to trigger inflight full
+                timer:sleep(5),
                 lists:foreach(
                     fun(Result) ->
                         apply_reply(ReplyFun, Result)

+ 19 - 16
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1510,8 +1510,9 @@ t_retry_async_inflight_full(_Config) ->
 %% this test case is to ensure the buffer worker will not go crazy even
 %% if the underlying connector is misbehaving: evaluate async callbacks multiple times
 t_async_reply_multi_eval(_Config) ->
-    ResumeInterval = 20,
-    AsyncInflightWindow = 5,
+    ResumeInterval = 5,
+    TotalTime = 5_000,
+    AsyncInflightWindow = 3,
     emqx_connector_demo:set_callback_mode(async_if_possible),
     {ok, _} = emqx_resource:create(
         ?ID,
@@ -1528,29 +1529,31 @@ t_async_reply_multi_eval(_Config) ->
         }
     ),
     ?check_trace(
-        #{timetrap => 15_000},
+        #{timetrap => 30_000},
         begin
             %% block
             ok = emqx_resource:simple_sync_query(?ID, block),
 
-            {ok, {ok, _}} =
-                ?wait_async_action(
-                    inc_counter_in_parallel(
-                        AsyncInflightWindow * 2,
-                        fun() ->
-                            Rand = rand:uniform(1000),
-                            {random_reply, Rand}
-                        end,
-                        #{}
-                    ),
-                    #{?snk_kind := buffer_worker_queue_drained, inflight := 0},
-                    ResumeInterval * 200
+            ?wait_async_action(
+                inc_counter_in_parallel(
+                    AsyncInflightWindow * 5,
+                    fun() ->
+                        Rand = rand:uniform(1000),
+                        {random_reply, Rand}
+                    end,
+                    #{}
                 ),
+                #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0},
+                TotalTime
+            ),
             ok
         end,
         [
             fun(Trace) ->
-                ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace))
+                ?assertMatch(
+                    [#{inflight := 0} | _],
+                    lists:reverse(?of_kind(buffer_worker_queue_drained, Trace))
+                )
             end
         ]
     ),