Преглед изворни кода

fix(buffer_worker): ensure async flush message is sent

This is a new issue introduced in the previous fix commits
after handling the partial expiry correctly, the
IsFullBefore check is no longer the state before the reply
is received but the state after a partially-expired batch
is shrinked.
The fix is simple, move the check to the entry-point of
where async reply callback enters, then send an async
'flush' notification regardless of the handling result.
Zaiming (Stone) Shi пре 3 година
родитељ
комит
356a94af30
1 измењених фајлова са 26 додато и 16 уклоњено
  1. 26 16
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 26 - 16
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1019,15 +1019,16 @@ do_handle_async_reply(
         ref => Ref,
         result => Result
     }),
-
+    IsFullBefore = is_inflight_full(InflightTID),
     case Action of
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
-            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
-    end.
+            do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+    end,
+    ok = maybe_flush_after_async_reply(IsFullBefore).
 
 handle_async_batch_reply(
     #{
@@ -1042,19 +1043,21 @@ handle_async_batch_reply(
         #{batch_or_query => Batch, ref => Ref}
     ),
     Now = now_(),
+    IsFullBefore = is_inflight_full(InflightTID),
     case sieve_expired_requests(Batch, Now) of
         {_NotExpired, []} ->
             %% this is the critical code path,
             %% we try not to do ets:lookup in this case
             %% because the batch can be quite big
-            do_handle_async_batch_reply(ReplyContext, Result);
+            ok = do_handle_async_batch_reply(ReplyContext, Result);
         {_NotExpired, _Expired} ->
             %% at least one is expired
             %% the batch from reply context is minimized, so it cannot be used
             %% to update the inflight items, hence discard Batch and lookup the RealBatch
             ?tp(handle_async_reply_expired, #{expired => _Expired}),
-            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
-    end.
+            ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
+    end,
+    ok = maybe_flush_after_async_reply(IsFullBefore).
 
 handle_async_batch_reply2([], _, _, _) ->
     %% e.g. if the driver evaluates the callback more than once
@@ -1063,7 +1066,6 @@ handle_async_batch_reply2([], _, _, _) ->
 handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
     #{
-        buffer_worker := Pid,
         resource_id := Id,
         worker_index := Index,
         inflight_tid := InflightTID,
@@ -1088,15 +1090,15 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     case RealNotExpired of
         [] ->
             %% all expired, no need to update back the inflight batch
-            IsFullBefore = is_inflight_full(InflightTID),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
-            IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid);
+            _ = ack_inflight(InflightTID, Ref, Id, Index),
+            ok;
         _ ->
             %% some queries are not expired, put them back to the inflight batch
             %% so it can be either acked now or retried later
             ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
-            do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
-    end.
+            ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
+    end,
+    ok.
 
 do_handle_async_batch_reply(
     #{
@@ -1123,11 +1125,10 @@ do_handle_async_batch_reply(
             ok = mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
-            do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
 
-do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
-    IsFullBefore = is_inflight_full(InflightTID),
+do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
     IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
     case maps:get(simple_query, QueryOpts, false) of
         true ->
@@ -1137,9 +1138,18 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
         false ->
             ok
     end,
-    IsFullBefore andalso ?MODULE:flush_worker(WorkerPid),
     ok.
 
+maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
+    %% inflight was not full before async reply is handled,
+    %% after it is handled, the inflight table must be even smaller
+    %% hance we can rely on the buffer worker's flush timer to trigger
+    %% the next flush
+    ok;
+maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
+    %% the inflight table was full before handling aync reply
+    ok = ?MODULE:flush_worker(self()).
+
 %%==============================================================================
 %% operations for queue
 queue_item_marshaller(Bin) when is_binary(Bin) ->