Procházet zdrojové kódy

refactor(buffer_worker): more generic process for all_expired

Zaiming (Stone) Shi před 3 roky
rodič
revize
713220f88b

+ 55 - 55
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -489,7 +489,7 @@ flush(Data0) ->
             %% if the request has expired, the caller is no longer
             %% waiting for a response.
             case sieve_expired_requests(Batch, Now) of
-                all_expired ->
+                {[], _AllExpired} ->
                     ok = replayq:ack(Q1, QAckRef),
                     emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
                     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@@ -1031,9 +1031,6 @@ do_handle_async_reply(
 
 handle_async_batch_reply(
     #{
-        buffer_worker := Pid,
-        resource_id := Id,
-        worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
         batch := Batch
@@ -1046,44 +1043,59 @@ handle_async_batch_reply(
     ),
     Now = now_(),
     case sieve_expired_requests(Batch, Now) of
-        all_expired ->
-            IsFullBefore = is_inflight_full(InflightTID),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
-            IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)),
-            IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid),
-            ?tp(handle_async_reply_expired, #{expired => Batch}),
-            ok;
         {_NotExpired, []} ->
+            %% this is the critical code path,
+            %% we try not to do ets:lookup in this case
+            %% because the batch can be quite big
             do_handle_async_batch_reply(ReplyContext, Result);
         {_NotExpired, _Expired} ->
-            %% partial expire
+            %% at least one is expired
             %% the batch from reply context is minimized, so it cannot be used
             %% to update the inflight items, hence discard Batch and lookup the RealBatch
             ?tp(handle_async_reply_expired, #{expired => _Expired}),
-            case ets:lookup(InflightTID, Ref) of
-                [] ->
-                    %% e.g. if the driver evaluates it more than once
-                    %% which should really be a bug, TODO: add a unknown_reply counter
-                    ok;
-                [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] ->
-                    %% All batch items share the same HasBeenSent flag
-                    %% So we just take the original flag from the ReplyContext batch
-                    %% and put it back to the batch found in inflight table
-                    %% which must have already been set to `false`
-                    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
-                    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
-                    RealNotExpired =
-                        lists:map(
-                            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
-                                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
-                            end,
-                            RealNotExpired0
-                        ),
-                    NumExpired = length(RealExpired),
-                    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
-                    ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
-                    do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
-            end
+            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
+    end.
+
+handle_async_batch_reply2([], _, _, _) ->
+    %% e.g. if the driver evaluates the callback more than once
+    %% which should really be a bug
+    ok;
+handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
+    ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
+    #{
+        buffer_worker := Pid,
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        batch := Batch
+    } = ReplyContext,
+    %% All batch items share the same HasBeenSent flag
+    %% So we just take the original flag from the ReplyContext batch
+    %% and put it back to the batch found in inflight table
+    %% which must have already been set to `false`
+    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
+    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
+    RealNotExpired =
+        lists:map(
+            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
+                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
+            end,
+            RealNotExpired0
+        ),
+    NumExpired = length(RealExpired),
+    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
+    case RealNotExpired of
+        [] ->
+            %% all expired, no need to update back the inflight batch
+            IsFullBefore = is_inflight_full(InflightTID),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid);
+        _ ->
+            %% some queries are not expired, put them back to the inflight batch
+            %% so it can be either acked now or retried later
+            ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
+            do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result)
     end.
 
 do_handle_async_batch_reply(
@@ -1226,10 +1238,8 @@ inflight_get_first_retriable(InflightTID, Now) ->
                     {single, Ref, Query}
             end;
         {[{Ref, Batch = [_ | _]}], _Continuation} ->
-            %% batch is non-empty because we check that in
-            %% `sieve_expired_requests'.
             case sieve_expired_requests(Batch, Now) of
-                all_expired ->
+                {[], _AllExpired} ->
                     {expired, Ref, Batch};
                 {NotExpired, Expired} ->
                     {batch, Ref, NotExpired, Expired}
@@ -1482,22 +1492,12 @@ is_async_return(_) ->
     false.
 
 sieve_expired_requests(Batch, Now) ->
-    {Expired, NotExpired} =
-        lists:partition(
-            fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
-                is_expired(ExpireAt, Now)
-            end,
-            Batch
-        ),
-    case {NotExpired, Expired} of
-        {[], []} ->
-            %% Should be impossible for batch_size >= 1.
-            all_expired;
-        {[], [_ | _]} ->
-            all_expired;
-        {[_ | _], _} ->
-            {NotExpired, Expired}
-    end.
+    lists:partition(
+        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
+            not is_expired(ExpireAt, Now)
+        end,
+        Batch
+    ).
 
 -spec is_expired(infinity | integer(), integer()) -> boolean().
 is_expired(infinity = _ExpireAt, _Now) ->