Explorar o código

refactor(buffer_worker): no need to keep request for REPLY macro

Zaiming (Stone) Shi %!s(int64=3) %!d(string=hai) anos
pai
achega
d4fab92b72
Modificáronse 1 ficheiros con 21 adicións e 19 borrados
  1. 21 19
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 21 - 19
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -63,11 +63,7 @@
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
 -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
--define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
--define(EXPAND(RESULT, BATCH), [
-    ?REPLY(FROM, REQUEST, SENT, RESULT)
- || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH
-]).
+-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
 -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
     {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
 ).
@@ -370,8 +366,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     ReplyResult =
         case QueryOrBatch of
-            ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) ->
-                Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+            ?QUERY(From, _, HasBeenSent, _ExpireAt) ->
+                Reply = ?REPLY(From, HasBeenSent, Result),
                 reply_caller_defer_metrics(Id, Reply, QueryOpts);
             [?QUERY(_, _, _, _) | _] = Batch ->
                 batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
@@ -548,10 +544,10 @@ do_flush(
         inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
-    [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch,
+    [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
-    Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+    Reply = ?REPLY(From, HasBeenSent, Result),
     case reply_caller(Id, Reply, QueryOpts) of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
@@ -705,6 +701,14 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
     ShouldBlock.
 
 batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
+    %% the `Mod:on_batch_query/3` returns a single result for a batch,
+    %% so we need to expand
+    Replies = lists:map(
+        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
+            ?REPLY(FROM, SENT, BatchResult)
+        end,
+        Batch
+    ),
     {ShouldAck, PostFns} =
         lists:foldl(
             fun(Reply, {_ShouldAck, PostFns}) ->
@@ -712,9 +716,7 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
                 {ShouldAck, [PostFn | PostFns]}
             end,
             {ack, []},
-            %% the `Mod:on_batch_query/3` returns a single result for a batch,
-            %% so we need to expand
-            ?EXPAND(BatchResult, Batch)
+            Replies
         ),
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
     {ShouldAck, PostFn}.
@@ -726,9 +728,9 @@ reply_caller(Id, Reply, QueryOpts) ->
 
 %% Should only reply to the caller when the decision is final (not
 %% retriable).  See comment on `handle_query_result_pure'.
-reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) ->
+reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
     handle_query_result_pure(Id, Result, HasBeenSent);
-reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when
+reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when
     is_function(ReplyFun)
 ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
@@ -750,7 +752,7 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result),
             ok
     end,
     {ShouldAck, PostFn};
-reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) ->
+reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     IsUnrecoverableError = is_unrecoverable_error(Result),
     {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
@@ -989,7 +991,7 @@ do_reply_after_query(
     Index,
     InflightTID,
     Ref,
-    ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+    ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
     QueryOpts,
     Result
 ) ->
@@ -997,14 +999,14 @@ do_reply_after_query(
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     {Action, PostFn} = reply_caller_defer_metrics(
-        Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts
+        Id, ?REPLY(From, HasBeenSent, Result), QueryOpts
     ),
     case Action of
         nack ->
             %% Keep retrying.
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),
@@ -1013,7 +1015,7 @@ do_reply_after_query(
         ack ->
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),