Explorar o código

refactor(buffer_worker): do not keep request body in reply context

the request body can be potentially very large
the reply context is sent to the async call handler and kept
in its memory until the async reply is received from bridge
target service.

this commit tries to minimize the size of the reply context
by replacing the request body with `[]`.
Zaiming (Stone) Shi %!s(int64=3) %!d(string=hai) anos
pai
achega
fc38ea9571

+ 101 - 60
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -52,7 +52,7 @@
 
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 -export([queue_item_marshaller/1, estimate_size/1]).
 
 
--export([handle_async_reply/8, handle_async_batch_reply/8]).
+-export([handle_async_reply/2, handle_async_batch_reply/2]).
 
 
 -export([clear_disk_queue_dir/2]).
 -export([clear_disk_queue_dir/2]).
 
 
@@ -124,7 +124,7 @@ simple_sync_query(Id, Request) ->
     Index = undefined,
     Index = undefined,
     QueryOpts = simple_query_opts(),
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
-    Ref = make_message_ref(),
+    Ref = make_request_ref(),
     Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
     Result.
@@ -135,7 +135,7 @@ simple_async_query(Id, Request) ->
     Index = undefined,
     Index = undefined,
     QueryOpts = simple_query_opts(),
     QueryOpts = simple_query_opts(),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
-    Ref = make_message_ref(),
+    Ref = make_request_ref(),
     Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
     Result.
@@ -511,7 +511,7 @@ flush(Data0) ->
                         buffer_worker_flush_potentially_partial,
                         buffer_worker_flush_potentially_partial,
                         #{expired => Expired, not_expired => NotExpired}
                         #{expired => Expired, not_expired => NotExpired}
                     ),
                     ),
-                    Ref = make_message_ref(),
+                    Ref = make_request_ref(),
                     do_flush(Data2, #{
                     do_flush(Data2, #{
                         new_queue => Q1,
                         new_queue => Q1,
                         is_batch => IsBatch,
                         is_batch => IsBatch,
@@ -897,13 +897,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
     ?APPLY_RESOURCE(
     ?APPLY_RESOURCE(
         call_query_async,
         call_query_async,
         begin
         begin
-            ReplyFun = fun ?MODULE:handle_async_reply/8,
-            Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts],
+            ReplyFun = fun ?MODULE:handle_async_reply/2,
+            ReplyContext = #{
+                buffer_worker => self(),
+                resource_id => Id,
+                worker_index => Index,
+                inflight_tid => InflightTID,
+                request_ref => Ref,
+                query_opts => QueryOpts,
+                query => minimize(Query)
+            },
             IsRetriable = false,
             IsRetriable = false,
             WorkerMRef = undefined,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
-            Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
+            Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
             {async_return, Result}
         end,
         end,
         Request
         Request
@@ -922,8 +930,16 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
     ?APPLY_RESOURCE(
     ?APPLY_RESOURCE(
         call_batch_query_async,
         call_batch_query_async,
         begin
         begin
-            ReplyFun = fun ?MODULE:handle_async_batch_reply/8,
-            ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
+            ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
+            ReplyContext = #{
+                buffer_worker => self(),
+                resource_id => Id,
+                worker_index => Index,
+                inflight_tid => InflightTID,
+                request_ref => Ref,
+                query_opts => QueryOpts,
+                batch => minimize(Batch)
+            },
             Requests = lists:map(
             Requests = lists:map(
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
             ),
             ),
@@ -931,20 +947,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
             WorkerMRef = undefined,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
             ok = inflight_append(InflightTID, InflightItem, Id, Index),
-            Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
+            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
             {async_return, Result}
         end,
         end,
         Batch
         Batch
     ).
     ).
 
 
 handle_async_reply(
 handle_async_reply(
-    Pid,
-    Id,
-    Index,
-    InflightTID,
-    Ref,
-    ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query,
-    QueryOpts,
+    #{
+        request_ref := Ref,
+        inflight_tid := InflightTID,
+        resource_id := Id,
+        worker_index := Index,
+        buffer_worker := Pid,
+        query := ?QUERY(_, _, _, ExpireAt) = Query
+    } = ReplyContext,
     Result
     Result
 ) ->
 ) ->
     ?tp(
     ?tp(
@@ -961,47 +978,55 @@ handle_async_reply(
             ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
             ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
             ok;
             ok;
         false ->
         false ->
-            do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result)
+            do_handle_async_reply(ReplyContext, Result)
     end.
     end.
 
 
-do_reply_after_query(
-    Pid,
-    Id,
-    Index,
-    InflightTID,
-    Ref,
-    ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-    QueryOpts,
+do_handle_async_reply(
+    #{
+        query_opts := QueryOpts,
+        resource_id := Id,
+        request_ref := Ref,
+        worker_index := Index,
+        buffer_worker := Pid,
+        inflight_tid := InflightTID,
+        query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query
+    },
     Result
     Result
 ) ->
 ) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     %% inflight window.
     {Action, PostFn} = reply_caller_defer_metrics(
     {Action, PostFn} = reply_caller_defer_metrics(
-        Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts
+        Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
     ),
     ),
+
+    ?tp(buffer_worker_reply_after_query, #{
+        action => Action,
+        batch_or_query => [Query],
+        ref => Ref,
+        result => Result
+    }),
+
     case Action of
     case Action of
         nack ->
         nack ->
             %% Keep retrying.
             %% Keep retrying.
-            ?tp(buffer_worker_reply_after_query, #{
-                action => Action,
-                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-                ref => Ref,
-                result => Result
-            }),
             mark_inflight_as_retriable(InflightTID, Ref),
             mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
             ?MODULE:block(Pid);
         ack ->
         ack ->
-            ?tp(buffer_worker_reply_after_query, #{
-                action => Action,
-                batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
-                ref => Ref,
-                result => Result
-            }),
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
     end.
 
 
-handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
+handle_async_batch_reply(
+    #{
+        buffer_worker := Pid,
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        batch := Batch
+    } = ReplyContext,
+    Result
+) ->
     ?tp(
     ?tp(
         buffer_worker_reply_after_query_enter,
         buffer_worker_reply_after_query_enter,
         #{batch_or_query => Batch, ref => Ref}
         #{batch_or_query => Batch, ref => Ref}
@@ -1020,12 +1045,21 @@ handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Res
             emqx_resource_metrics:late_reply_inc(Id, NumExpired),
             emqx_resource_metrics:late_reply_inc(Id, NumExpired),
             NumExpired > 0 andalso
             NumExpired > 0 andalso
                 ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
                 ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
-            do_batch_reply_after_query(
-                Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result
-            )
+            do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result)
     end.
     end.
 
 
-do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
+do_handle_async_batch_reply(
+    #{
+        buffer_worker := Pid,
+        resource_id := Id,
+        worker_index := Index,
+        inflight_tid := InflightTID,
+        request_ref := Ref,
+        batch := Batch,
+        query_opts := QueryOpts
+    },
+    Result
+) ->
     ?tp(
     ?tp(
         buffer_worker_reply_after_query_enter,
         buffer_worker_reply_after_query_enter,
         #{batch_or_query => Batch, ref => Ref}
         #{batch_or_query => Batch, ref => Ref}
@@ -1034,24 +1068,18 @@ do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, R
     %% but received no ACK, NOT the number of messages queued in the
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
     %% inflight window.
     {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
     {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
+    ?tp(buffer_worker_reply_after_query, #{
+        action => Action,
+        batch_or_query => Batch,
+        ref => Ref,
+        result => Result
+    }),
     case Action of
     case Action of
         nack ->
         nack ->
             %% Keep retrying.
             %% Keep retrying.
-            ?tp(buffer_worker_reply_after_query, #{
-                action => nack,
-                batch_or_query => Batch,
-                ref => Ref,
-                result => Result
-            }),
             mark_inflight_as_retriable(InflightTID, Ref),
             mark_inflight_as_retriable(InflightTID, Ref),
             ?MODULE:block(Pid);
             ?MODULE:block(Pid);
         ack ->
         ack ->
-            ?tp(buffer_worker_reply_after_query, #{
-                action => ack,
-                batch_or_query => Batch,
-                ref => Ref,
-                result => Result
-            }),
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
             do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
     end.
     end.
 
 
@@ -1098,7 +1126,8 @@ append_queue(Id, Index, Q, Queries) ->
                 emqx_resource_metrics:dropped_queue_full_inc(Id),
                 emqx_resource_metrics:dropped_queue_full_inc(Id),
                 ?SLOG(info, #{
                 ?SLOG(info, #{
                     msg => buffer_worker_overflow,
                     msg => buffer_worker_overflow,
-                    worker_id => Id,
+                    resource_id => Id,
+                    worker_index => Index,
                     dropped => Dropped
                     dropped => Dropped
                 }),
                 }),
                 {Items2, Q1}
                 {Items2, Q1}
@@ -1133,7 +1162,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
     inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
     inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
     inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
     inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
     inflight_append(
     inflight_append(
-        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index
+        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
     ),
     ),
     TableId.
     TableId.
 
 
@@ -1372,8 +1401,8 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
     _ = erlang:cancel_timer(TRef),
     _ = erlang:cancel_timer(TRef),
     St#{tref => undefined}.
     St#{tref => undefined}.
 
 
--spec make_message_ref() -> inflight_key().
-make_message_ref() ->
+-spec make_request_ref() -> inflight_key().
+make_request_ref() ->
     now_().
     now_().
 
 
 collect_requests(Acc, Limit) ->
 collect_requests(Acc, Limit) ->
@@ -1459,3 +1488,15 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
     TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
     TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
     ExpireAt = now_() + TimeoutNS,
     ExpireAt = now_() + TimeoutNS,
     Opts#{expire_at => ExpireAt}.
     Opts#{expire_at => ExpireAt}.
+
+%% no need to keep the request for async reply handler
+minimize(?QUERY(_, _, _, _) = Q) ->
+    do_minimize(Q);
+minimize(L) when is_list(L) ->
+    lists:map(fun do_minimize/1, L).
+
+-ifdef(TEST).
+do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
+-else.
+do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
+-endif.

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
     ct:pal("  ~p", [Trace]),
     ct:pal("  ~p", [Trace]),
     ?assert(
     ?assert(
         ?strict_causality(
         ?strict_causality(
-            #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref},
+            #{?snk_kind := buffer_worker_reply_after_query, action := nack},
             #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
             #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
             Trace
             Trace
         )
         )