Sfoglia il codice sorgente

fix(bridge): update inflight items after partial expiry

Zaiming (Stone) Shi 3 anni fa
parent
commit
fc614e16e5

+ 63 - 35
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -335,11 +335,13 @@ resume_from_blocked(Data) ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
             retry_inflight_sync(Ref, Query, Data);
+        {batch, Ref, NotExpired, []} ->
+            retry_inflight_sync(Ref, NotExpired, Data);
         {batch, Ref, NotExpired, Expired} ->
             NumExpired = length(Expired),
-            update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
+            ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
             emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
-            NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
+            ?tp(buffer_worker_retry_expired, #{expired => Expired}),
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
             retry_inflight_sync(Ref, NotExpired, Data)
@@ -496,7 +498,7 @@ flush(Data0) ->
                 {NotExpired, Expired} ->
                     NumExpired = length(Expired),
                     emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
-                    IsBatch = BatchSize =/= 1,
+                    IsBatch = (BatchSize > 1),
                     %% We *must* use the new queue, because we currently can't
                     %% `nack' a `pop'.
                     %% Maybe we could re-open the queue?
@@ -506,7 +508,6 @@ flush(Data0) ->
                     ),
                     Ref = make_request_ref(),
                     do_flush(Data2, #{
-                        new_queue => Q1,
                         is_batch => IsBatch,
                         batch => NotExpired,
                         ref => Ref,
@@ -519,18 +520,16 @@ flush(Data0) ->
     is_batch := boolean(),
     batch := [queue_query()],
     ack_ref := replayq:ack_ref(),
-    ref := inflight_key(),
-    new_queue := replayq:q()
+    ref := inflight_key()
 }) ->
     gen_statem:event_handler_result(state(), data()).
 do_flush(
-    Data0,
+    #{queue := Q1} = Data0,
     #{
         is_batch := false,
         batch := Batch,
         ref := Ref,
-        ack_ref := QAckRef,
-        new_queue := Q1
+        ack_ref := QAckRef
     }
 ) ->
     #{
@@ -610,12 +609,11 @@ do_flush(
             end,
             {keep_state, Data1}
     end;
-do_flush(Data0, #{
+do_flush(#{queue := Q1} = Data0, #{
     is_batch := true,
     batch := Batch,
     ref := Ref,
-    ack_ref := QAckRef,
-    new_queue := Q1
+    ack_ref := QAckRef
 }) ->
     #{
         id := Id,
@@ -715,17 +713,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
         end,
         Batch
     ),
-    {ShouldAck, PostFns} =
+    {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts),
+    PostFns =
         lists:foldl(
-            fun(Reply, {_ShouldAck, PostFns}) ->
-                {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
-                {ShouldAck, [PostFn | PostFns]}
+            fun(Reply, PostFns) ->
+                {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+                [PostFn | PostFns]
             end,
-            {ack, []},
-            Replies
+            [PostFn1],
+            tl(Replies)
         ),
-    PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
-    {ShouldAck, PostFn}.
+    PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
+    {Action, PostFn}.
 
 reply_caller(Id, Reply, QueryOpts) ->
     {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
@@ -1024,7 +1023,7 @@ do_handle_async_reply(
     case Action of
         nack ->
             %% Keep retrying.
-            mark_inflight_as_retriable(InflightTID, Ref),
+            ok = mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
@@ -1051,15 +1050,40 @@ handle_async_batch_reply(
             IsFullBefore = is_inflight_full(InflightTID),
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
+            IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid),
             ?tp(handle_async_reply_expired, #{expired => Batch}),
             ok;
-        {NotExpired, Expired} ->
-            NumExpired = length(Expired),
-            emqx_resource_metrics:late_reply_inc(Id, NumExpired),
-            NumExpired > 0 andalso
-                ?tp(handle_async_reply_expired, #{expired => Expired}),
-            do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result)
+        {_NotExpired, []} ->
+            do_handle_async_batch_reply(ReplyContext, Result);
+        {_NotExpired, _Expired} ->
+            %% partial expiry
+            %% the batch from the reply context is minimized, so it cannot be used
+            %% to update the inflight items; hence discard Batch and look up the RealBatch
+            ?tp(handle_async_reply_expired, #{expired => _Expired}),
+            case ets:lookup(InflightTID, Ref) of
+                [] ->
+                    %% e.g. if the driver evaluates it more than once,
+                    %% which should really be a bug. TODO: add an unknown_reply counter
+                    ok;
+                [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] ->
+                    %% All batch items share the same HasBeenSent flag
+                    %% So we just take the original flag from the ReplyContext batch
+                    %% and put it back to the batch found in inflight table
+                    %% which must have already been set to `false`
+                    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
+                    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
+                    RealNotExpired =
+                        lists:map(
+                            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
+                                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
+                            end,
+                            RealNotExpired0
+                        ),
+                    NumExpired = length(RealExpired),
+                    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
+                    ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
+                    do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
+            end
     end.
 
 do_handle_async_batch_reply(
@@ -1084,7 +1108,7 @@ do_handle_async_batch_reply(
     case Action of
         nack ->
             %% Keep retrying.
-            mark_inflight_as_retriable(InflightTID, Ref),
+            ok = mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
@@ -1320,10 +1344,15 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
             [] ->
                 0
         end,
-    IsAcked = (Count > 0),
-    IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
-    IsAcked.
+    IsKnownRef = (Count > 0),
+    case IsKnownRef of
+        true ->
+            ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+            emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
+        false ->
+            ok
+    end,
+    IsKnownRef.
 
 mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     #{inflight_tid := InflightTID} = Data,
@@ -1341,10 +1370,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     ok.
 
 %% used to update a batch after dropping expired individual queries.
-update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
+update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}),
-    ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
     ok.
 
 %%==============================================================================

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1997,6 +1997,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
             {ok, _} = ?block_until(
                 #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
             ),
+            wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
 
             unlink(Pid0),
             exit(Pid0, kill),
@@ -2011,7 +2012,6 @@ do_t_expiration_async_after_reply(IsBatch) ->
                 ],
                 ?of_kind(handle_async_reply_expired, Trace)
             ),
-            wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
             Metrics = tap_metrics(?LINE),
             ?assertMatch(
                 #{