Parcourir la source

Merge pull request #9853 from zmstone/0127-refactor-buffer-worker-no-need-to-keep-request-for-reply-callback

0127 refactor buffer worker no need to keep request for reply callback
Zaiming (Stone) Shi il y a 3 ans
Parent
commit
b3e486041b

+ 113 - 78
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -52,7 +52,7 @@
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 
--export([reply_after_query/8, batch_reply_after_query/8]).
+-export([handle_async_reply/2, handle_async_batch_reply/2]).
 
 -export([clear_disk_queue_dir/2]).
 
@@ -73,9 +73,8 @@
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
--type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()).
+-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
 -type request() :: term().
--type from() :: pid() | reply_fun() | request_from().
 -type request_from() :: undefined | gen_statem:from().
 -type state() :: blocked | running.
 -type inflight_key() :: integer().
@@ -125,7 +124,7 @@ simple_sync_query(Id, Request) ->
     Index = undefined,
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
-    Ref = make_message_ref(),
+    Ref = make_request_ref(),
     Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
@@ -136,7 +135,7 @@ simple_async_query(Id, Request) ->
     Index = undefined,
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
-    Ref = make_message_ref(),
+    Ref = make_request_ref(),
     Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
@@ -512,7 +511,7 @@ flush(Data0) ->
                         buffer_worker_flush_potentially_partial,
                         #{expired => Expired, not_expired => NotExpired}
                     ),
-                    Ref = make_message_ref(),
+                    Ref = make_request_ref(),
                     do_flush(Data2, #{
                         new_queue => Q1,
                         is_batch => IsBatch,
@@ -898,13 +897,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
     ?APPLY_RESOURCE(
         call_query_async,
         begin
-            ReplyFun = fun ?MODULE:reply_after_query/8,
-            Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts],
+            ReplyFun = fun ?MODULE:handle_async_reply/2,
+            ReplyContext = #{
+                buffer_worker => self(),
+                resource_id => Id,
+                worker_index => Index,
+                inflight_tid => InflightTID,
+                request_ref => Ref,
+                query_opts => QueryOpts,
+                query => minimize(Query)
+            },
             IsRetriable = false,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
-            Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
+            Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
         end,
         Request
@@ -913,7 +920,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R
     ?tp(call_batch_query, #{
         id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
     }),
-    Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
+    Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
     ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{
@@ -923,32 +930,43 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
     ?APPLY_RESOURCE(
         call_batch_query_async,
         begin
-            ReplyFun = fun ?MODULE:batch_reply_after_query/8,
-            ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
-            Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
+            ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
+            ReplyContext = #{
+                buffer_worker => self(),
+                resource_id => Id,
+                worker_index => Index,
+                inflight_tid => InflightTID,
+                request_ref => Ref,
+                query_opts => QueryOpts,
+                batch => minimize(Batch)
+            },
+            Requests = lists:map(
+                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
+            ),
             IsRetriable = false,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
-            Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
+            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(
-    Pid,
-    Id,
-    Index,
-    InflightTID,
-    Ref,
-    ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query,
-    QueryOpts,
+handle_async_reply(
+    #{
+        request_ref := Ref,
+        inflight_tid := InflightTID,
+        resource_id := Id,
+        worker_index := Index,
+        buffer_worker := Pid,
+        query := ?QUERY(_, _, _, ExpireAt) = _Query
+    } = ReplyContext,
     Result
 ) ->
     ?tp(
-        buffer_worker_reply_after_query_enter,
-        #{batch_or_query => [Query], ref => Ref}
+        handle_async_reply_enter,
+        #{batch_or_query => [_Query], ref => Ref}
     ),
     Now = now_(),
     case is_expired(ExpireAt, Now) of
@@ -957,52 +975,60 @@ reply_after_query(
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
             IsFullBefore andalso ?MODULE:flush_worker(Pid),
-            ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
+            ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
         false ->
-            do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result)
+            do_handle_async_reply(ReplyContext, Result)
     end.
 
-do_reply_after_query(
-    Pid,
-    Id,
-    Index,
-    InflightTID,
-    Ref,
-    ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-    QueryOpts,
+do_handle_async_reply(
+    #{
+        query_opts := QueryOpts,
+        resource_id := Id,
+        request_ref := Ref,
+        worker_index := Index,
+        buffer_worker := Pid,
+        inflight_tid := InflightTID,
+        query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
+    },
     Result
 ) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     {Action, PostFn} = reply_caller_defer_metrics(
-        Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts
+        Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
     ),
+
+    ?tp(handle_async_reply, #{
+        action => Action,
+        batch_or_query => [_Query],
+        ref => Ref,
+        result => Result
+    }),
+
     case Action of
         nack ->
             %% Keep retrying.
-            ?tp(buffer_worker_reply_after_query, #{
-                action => Action,
-                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-                ref => Ref,
-                result => Result
-            }),
             mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
-            ?tp(buffer_worker_reply_after_query, #{
-                action => Action,
-                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-                ref => Ref,
-                result => Result
-            }),
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
 
-batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
+handle_async_batch_reply(
+    #{
+        buffer_worker := Pid,
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        batch := Batch
+    } = ReplyContext,
+    Result
+) ->
     ?tp(
-        buffer_worker_reply_after_query_enter,
+        handle_async_reply_enter,
         #{batch_or_query => Batch, ref => Ref}
     ),
     Now = now_(),
@@ -1012,45 +1038,41 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
             IsFullBefore andalso ?MODULE:flush_worker(Pid),
-            ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}),
+            ?tp(handle_async_reply_expired, #{expired => Batch}),
             ok;
         {NotExpired, Expired} ->
             NumExpired = length(Expired),
             emqx_resource_metrics:late_reply_inc(Id, NumExpired),
             NumExpired > 0 andalso
-                ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
-            do_batch_reply_after_query(
-                Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result
-            )
+                ?tp(handle_async_reply_expired, #{expired => Expired}),
+            do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result)
     end.
 
-do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
-    ?tp(
-        buffer_worker_reply_after_query_enter,
-        #{batch_or_query => Batch, ref => Ref}
-    ),
-    %% NOTE: 'inflight' is the count of messages that were sent async
-    %% but received no ACK, NOT the number of messages queued in the
-    %% inflight window.
+do_handle_async_batch_reply(
+    #{
+        buffer_worker := Pid,
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        batch := Batch,
+        query_opts := QueryOpts
+    },
+    Result
+) ->
     {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
+    ?tp(handle_async_reply, #{
+        action => Action,
+        batch_or_query => Batch,
+        ref => Ref,
+        result => Result
+    }),
     case Action of
         nack ->
             %% Keep retrying.
-            ?tp(buffer_worker_reply_after_query, #{
-                action => nack,
-                batch_or_query => Batch,
-                ref => Ref,
-                result => Result
-            }),
             mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
         ack ->
-            ?tp(buffer_worker_reply_after_query, #{
-                action => ack,
-                batch_or_query => Batch,
-                ref => Ref,
-                result => Result
-            }),
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
 
@@ -1097,7 +1119,8 @@ append_queue(Id, Index, Q, Queries) ->
                 emqx_resource_metrics:dropped_queue_full_inc(Id, Dropped),
                 ?SLOG(info, #{
                     msg => buffer_worker_overflow,
-                    worker_id => Id,
+                    resource_id => Id,
+                    worker_index => Index,
                     dropped => Dropped
                 }),
                 {Items2, Q1}
@@ -1132,7 +1155,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
     inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
     inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
     inflight_append(
-        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index
+        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
     ),
     TableId.
 
@@ -1371,8 +1394,8 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
     _ = erlang:cancel_timer(TRef),
     St#{tref => undefined}.
 
--spec make_message_ref() -> inflight_key().
-make_message_ref() ->
+-spec make_request_ref() -> inflight_key().
+make_request_ref() ->
     now_().
 
 collect_requests(Acc, Limit) ->
@@ -1458,3 +1481,15 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
     TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
     ExpireAt = now_() + TimeoutNS,
     Opts#{expire_at => ExpireAt}.
+
+%% The async reply handler never reads the request term, so drop it from the reply context (kept as-is in TEST builds).
+minimize(?QUERY(_, _, _, _) = Q) ->
+    do_minimize(Q);
+minimize(L) when is_list(L) ->
+    lists:map(fun do_minimize/1, L).
+
+-ifdef(TEST).
+do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
+-else.
+do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
+-endif.

+ 8 - 8
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1718,7 +1718,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
                 async ->
                     {ok, _} = ?block_until(
                         #{
-                            ?snk_kind := buffer_worker_reply_after_query,
+                            ?snk_kind := handle_async_reply,
                             action := ack,
                             batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
                         },
@@ -1849,7 +1849,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
             ?force_ordering(
                 #{?snk_kind := delay},
                 #{
-                    ?snk_kind := buffer_worker_reply_after_query_enter,
+                    ?snk_kind := handle_async_reply_enter,
                     batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
                 }
             ),
@@ -1874,7 +1874,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
                 #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
             ),
             {ok, _} = ?block_until(
-                #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS
+                #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
             ),
 
             unlink(Pid0),
@@ -1888,7 +1888,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
                         expired := [{query, _, {inc_counter, 199}, _, _}]
                     }
                 ],
-                ?of_kind(buffer_worker_reply_after_query_expired, Trace)
+                ?of_kind(handle_async_reply_expired, Trace)
             ),
             wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
             Metrics = tap_metrics(?LINE),
@@ -1936,7 +1936,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
             ?force_ordering(
                 #{?snk_kind := delay},
                 #{
-                    ?snk_kind := buffer_worker_reply_after_query_enter,
+                    ?snk_kind := handle_async_reply_enter,
                     batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
                 }
             ),
@@ -1955,7 +1955,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
                 end),
 
             {ok, _} = ?block_until(
-                #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS
+                #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
             ),
 
             unlink(Pid0),
@@ -1969,7 +1969,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
                         expired := [{query, _, {inc_counter, 199}, _, _}]
                     }
                 ],
-                ?of_kind(buffer_worker_reply_after_query_expired, Trace)
+                ?of_kind(handle_async_reply_expired, Trace)
             ),
             Metrics = tap_metrics(?LINE),
             ?assertMatch(
@@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
     ct:pal("  ~p", [Trace]),
     ?assert(
         ?strict_causality(
-            #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref},
+            #{?snk_kind := handle_async_reply, action := nack},
             #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
             Trace
         )

+ 1 - 1
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -227,7 +227,7 @@ render_timestamp(Template, Message) ->
 %% Wolff producer never gives up retrying
 %% so there can only be 'ok' results.
 on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
-    %% the ReplyFn is emqx_resource_worker:reply_after_query/8
+    %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2
     apply(ReplyFn, Args ++ [ok]);
 on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
     %% wolff should bump the dropped_queue_full counter

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -920,7 +920,7 @@ t_write_failure(Config) ->
                 async ->
                     ?wait_async_action(
                         ?assertEqual(ok, send_message(Config, SentData)),
-                        #{?snk_kind := buffer_worker_reply_after_query},
+                        #{?snk_kind := handle_async_reply},
                         1_000
                     )
             end
@@ -938,7 +938,7 @@ t_write_failure(Config) ->
                         #{got => Result}
                     );
                 async ->
-                    Trace = ?of_kind(buffer_worker_reply_after_query, Trace0),
+                    Trace = ?of_kind(handle_async_reply, Trace0),
                     ?assertMatch([#{action := nack} | _], Trace),
                     [#{result := Result} | _] = Trace,
                     ?assert(