Просмотр исходного кода

fix(buffer_worker): ensure flush timer reset in blocked state

Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
7a6465e2cf

+ 14 - 6
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -236,21 +236,24 @@ running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
     keep_state_and_data.
 
-blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
+blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
     ?tp(buffer_worker_enter_blocked, #{}),
-    {keep_state_and_data, {state_timeout, ResumeT, unblock}};
+    %% discard the old timer, new timer will be started when entering running state again
+    St = cancel_flush_timer(St0),
+    {keep_state, St, {state_timeout, ResumeT, unblock}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
 blocked(cast, resume, St) ->
     resume_from_blocked(St);
-blocked(cast, flush, Data) ->
-    resume_from_blocked(Data);
+blocked(cast, flush, St) ->
+    resume_from_blocked(St);
 blocked(state_timeout, unblock, St) ->
     resume_from_blocked(St);
 blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
     Data = collect_and_enqueue_query_requests(Request0, Data0),
     {keep_state, Data};
 blocked(info, {flush, _Ref}, _Data) ->
+    %% ignore stale timer
     keep_state_and_data;
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
@@ -622,6 +625,9 @@ do_flush(
                     }),
                     flush_worker(self());
                 false ->
+                    ?tp(buffer_worker_queue_drained, #{
+                        inflight => inflight_num_batches(InflightTID)
+                    }),
                     ok
             end,
             {keep_state, Data1}
@@ -700,6 +706,9 @@ do_flush(#{queue := Q1} = Data0, #{
             Data2 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
+                        ?tp(buffer_worker_queue_drained, #{
+                            inflight => inflight_num_batches(InflightTID)
+                        }),
                         Data1;
                     {true, true} ->
                         ?tp(buffer_worker_flush_ack_reflush, #{
@@ -1003,7 +1012,6 @@ handle_async_reply1(
         inflight_tid := InflightTID,
         resource_id := Id,
         worker_index := Index,
-        buffer_worker := Pid,
         min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
@@ -1100,7 +1108,7 @@ handle_async_batch_reply1(
     end.
 
 handle_async_batch_reply2([], _, _, _) ->
-    %% should have caused the unknown_async_reply_discarded
+    %% this usually should never happen unless the async callback is being evaluated concurrently
     ok;
 handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1543,7 +1543,7 @@ t_async_reply_multi_eval(_Config) ->
                     end,
                     #{}
                 ),
-                #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0},
+                #{?snk_kind := buffer_worker_queue_drained, inflight := 0},
                 TotalTime
             ),
             ok