Sfoglia il codice sorgente

Merge pull request #9849 from zmstone/0127-refactor-buffer-worker-simplify-caller-reply

0127 refactor buffer worker simplify caller reply
Zaiming (Stone) Shi 3 anni fa
parent
commit
96ed725a55
1 ha cambiato i file con 36 aggiunte e 59 eliminazioni
  1. 36 59
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 36 - 59
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -210,7 +210,7 @@ running(cast, flush, Data) ->
     flush(Data);
 running(cast, block, St) ->
     {next_state, blocked, St};
-running(info, ?SEND_REQ(_From, _Req) = Request0, Data) ->
+running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
     handle_query_requests(Request0, Data);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
@@ -238,7 +238,7 @@ blocked(cast, flush, Data) ->
     resume_from_blocked(Data);
 blocked(state_timeout, unblock, St) ->
     resume_from_blocked(St);
-blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) ->
+blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
     Data = collect_and_enqueue_query_requests(Request0, Data0),
     {keep_state, Data};
 blocked(info, {flush, _Ref}, _Data) ->
@@ -284,7 +284,8 @@ pick_call(Id, Key, Query, Timeout) ->
         Caller = self(),
         MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
         From = {Caller, MRef},
-        erlang:send(Pid, ?SEND_REQ(From, Query)),
+        ReplyTo = {fun gen_statem:reply/2, [From]},
+        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
         receive
             {MRef, Response} ->
                 erlang:demonitor(MRef, [flush]),
@@ -304,8 +305,8 @@ pick_call(Id, Key, Query, Timeout) ->
 
 pick_cast(Id, Key, Query) ->
     ?PICK(Id, Key, Pid, begin
-        From = undefined,
-        erlang:send(Pid, ?SEND_REQ(From, Query)),
+        ReplyTo = undefined,
+        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
         ok
     end).
 
@@ -366,8 +367,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     ReplyResult =
         case QueryOrBatch of
-            ?QUERY(From, _, HasBeenSent, _ExpireAt) ->
-                Reply = ?REPLY(From, HasBeenSent, Result),
+            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
+                Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
                 reply_caller_defer_metrics(Id, Reply, QueryOpts);
             [?QUERY(_, _, _, _) | _] = Batch ->
                 batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
@@ -421,15 +422,15 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
     Queries =
         lists:map(
             fun
-                (?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
+                (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
                     ReplyFun = maps:get(async_reply_fun, Opts, undefined),
                     HasBeenSent = false,
                     ExpireAt = maps:get(expire_at, Opts),
                     ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
-                (?SEND_REQ(From, {query, Req, Opts})) ->
+                (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
                     HasBeenSent = false,
                     ExpireAt = maps:get(expire_at, Opts),
-                    ?QUERY(From, Req, HasBeenSent, ExpireAt)
+                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
             end,
             Requests
         ),
@@ -439,17 +440,18 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
 
 reply_overflown([]) ->
     ok;
-reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) ->
-    do_reply_caller(From, {error, buffer_overflow}),
+reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
+    do_reply_caller(ReplyTo, {error, buffer_overflow}),
     reply_overflown(More).
 
 do_reply_caller(undefined, _Result) ->
     ok;
+do_reply_caller({F, Args}, {async_return, Result}) ->
+    %% this is an early return to async caller, the retry
+    %% decision has to be made by the caller
+    do_reply_caller({F, Args}, Result);
 do_reply_caller({F, Args}, Result) when is_function(F) ->
     _ = erlang:apply(F, Args ++ [Result]),
-    ok;
-do_reply_caller(From, Result) ->
-    _ = gen_statem:reply(From, Result),
     ok.
 
 maybe_flush(Data0) ->
@@ -544,10 +546,10 @@ do_flush(
         inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
-    [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch,
+    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
-    Reply = ?REPLY(From, HasBeenSent, Result),
+    Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
     case reply_caller(Id, Reply, QueryOpts) of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
@@ -730,46 +732,21 @@ reply_caller(Id, Reply, QueryOpts) ->
 %% retriable).  See comment on `handle_query_result_pure'.
 reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
     handle_query_result_pure(Id, Result, HasBeenSent);
-reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when
-    is_function(ReplyFun)
-->
-    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
-    IsUnrecoverableError = is_unrecoverable_error(Result),
-    {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
-    case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
-        {ack, {async_return, _}, true, _} ->
-            apply(ReplyFun, Args ++ [Result]),
-            ok;
-        {ack, {async_return, _}, false, _} ->
-            ok;
-        {_, _, _, true} ->
-            apply(ReplyFun, Args ++ [Result]),
-            ok;
-        {nack, _, _, _} ->
-            ok;
-        {ack, _, _, _} ->
-            apply(ReplyFun, Args ++ [Result]),
-            ok
-    end,
-    {ShouldAck, PostFn};
-reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) ->
+reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     IsUnrecoverableError = is_unrecoverable_error(Result),
     {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
     case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
         {ack, {async_return, _}, true, _} ->
-            gen_statem:reply(From, Result),
-            ok;
+            ok = do_reply_caller(ReplyTo, Result);
         {ack, {async_return, _}, false, _} ->
             ok;
         {_, _, _, true} ->
-            gen_statem:reply(From, Result),
-            ok;
+            ok = do_reply_caller(ReplyTo, Result);
         {nack, _, _, _} ->
             ok;
         {ack, _, _, _} ->
-            gen_statem:reply(From, Result),
-            ok
+            ok = do_reply_caller(ReplyTo, Result)
     end,
     {ShouldAck, PostFn}.
 
@@ -935,7 +912,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R
     ?tp(call_batch_query, #{
         id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
     }),
-    Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
+    Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
     ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{
@@ -947,7 +924,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
         begin
             ReplyFun = fun ?MODULE:batch_reply_after_query/8,
             ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
-            Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
+            Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
             IsRetriable = false,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
@@ -964,7 +941,7 @@ reply_after_query(
     Index,
     InflightTID,
     Ref,
-    ?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query,
+    ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query,
     QueryOpts,
     Result
 ) ->
@@ -991,7 +968,7 @@ do_reply_after_query(
     Index,
     InflightTID,
     Ref,
-    ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
+    ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
     QueryOpts,
     Result
 ) ->
@@ -999,14 +976,14 @@ do_reply_after_query(
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     {Action, PostFn} = reply_caller_defer_metrics(
-        Id, ?REPLY(From, HasBeenSent, Result), QueryOpts
+        Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts
     ),
     case Action of
         nack ->
             %% Keep retrying.
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),
@@ -1015,7 +992,7 @@ do_reply_after_query(
         ack ->
             ?tp(buffer_worker_reply_after_query, #{
                 action => Action,
-                batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt),
+                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
                 ref => Ref,
                 result => Result
             }),
@@ -1175,7 +1152,7 @@ inflight_get_first_retriable(InflightTID, Now) ->
     case ets:select(InflightTID, MatchSpec, _Limit = 1) of
         '$end_of_table' ->
             none;
-        {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
+        {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
             case is_expired(ExpireAt, Now) of
                 true ->
                     {expired, Ref, [Query]};
@@ -1234,7 +1211,7 @@ inflight_append(
 inflight_append(
     InflightTID,
     ?INFLIGHT_ITEM(
-        Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
+        Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
     ),
     Id,
     Index
@@ -1405,7 +1382,7 @@ do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
     lists:reverse(Acc);
 do_collect_requests(Acc, Count, Limit) ->
     receive
-        ?SEND_REQ(_From, _Req) = Request ->
+        ?SEND_REQ(_ReplyTo, _Req) = Request ->
             do_collect_requests([Request | Acc], Count + 1, Limit)
     after 0 ->
         lists:reverse(Acc)
@@ -1413,9 +1390,9 @@ do_collect_requests(Acc, Count, Limit) ->
 
 mark_as_sent(Batch) when is_list(Batch) ->
     lists:map(fun mark_as_sent/1, Batch);
-mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) ->
+mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
     HasBeenSent = true,
-    ?QUERY(From, Req, HasBeenSent, ExpireAt).
+    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
 
 is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
     true;
@@ -1439,7 +1416,7 @@ is_async_return(_) ->
 sieve_expired_requests(Batch, Now) ->
     {Expired, NotExpired} =
         lists:partition(
-            fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) ->
+            fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
                 is_expired(ExpireAt, Now)
             end,
             Batch