Browse Source

fix(buffer_worker): avoid sending late reply messages to callers

Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, its call
to the underlying resource `C` can be very slow.  In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`.  However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
Thales Macedo Garitezi 2 years atrás
parent
commit
c53741a08c

+ 13 - 4
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -52,7 +52,7 @@
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 
--export([handle_async_reply/2, handle_async_batch_reply/2]).
+-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
 
 -export([clear_disk_queue_dir/2]).
 
@@ -293,10 +293,8 @@ code_change(_OldVsn, State, _Extra) ->
 
 pick_call(Id, Key, Query, Timeout) ->
     ?PICK(Id, Key, Pid, begin
-        Caller = self(),
         MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
-        From = {Caller, MRef},
-        ReplyTo = {fun gen_statem:reply/2, [From]},
+        ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
         erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
         receive
             {MRef, Response} ->
@@ -1703,6 +1701,17 @@ default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
 default_resume_interval(RequestTimeout, HealthCheckInterval) ->
     max(1, min(HealthCheckInterval, RequestTimeout div 3)).
 
+-spec reply_call(reference(), term()) -> ok.
+reply_call(Alias, Response) ->
+    %% Since we use a reference created with `{alias,
+    %% reply_demonitor}', after we `demonitor' it in case of a
+    %% timeout, we won't send any more messages that the caller is not
+    %% expecting anymore.  Using `gen_statem:reply({pid(),
+    %% reference()}, _)' would still send a late reply even after the
+    %% demonitor.
+    erlang:send(Alias, {Alias, Response}),
+    ok.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 adjust_batch_time_test_() ->

+ 5 - 1
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -144,7 +144,11 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
             Result
     after 1000 ->
         {error, timeout}
-    end.
+    end;
+on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
+    %% This simulates a slow sync call
+    timer:sleep(SleepFor),
+    {ok, slept}.
 
 on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
     Pid ! {block, ReplyFun},

+ 45 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2751,6 +2751,51 @@ t_volatile_offload_mode(_Config) ->
         end
     ).
 
+t_late_call_reply(_Config) ->
+    emqx_connector_demo:set_callback_mode(always_sync),
+    RequestTimeout = 500,
+    ?assertMatch(
+        {ok, _},
+        emqx_resource:create(
+            ?ID,
+            ?DEFAULT_RESOURCE_GROUP,
+            ?TEST_RESOURCE,
+            #{name => test_resource},
+            #{
+                buffer_mode => memory_only,
+                request_timeout => RequestTimeout,
+                query_mode => sync
+            }
+        )
+    ),
+    ?check_trace(
+        begin
+            %% Sleep for longer than the request timeout; the call reply will
+            %% have been already returned (a timeout), but the resource will
+            %% still send a message with the reply.
+            %% The demo connector will reply with `{error, timeout}' after 1 s.
+            SleepFor = RequestTimeout + 500,
+            ?assertMatch(
+                {error, {resource_error, #{reason := timeout}}},
+                emqx_resource:query(
+                    ?ID,
+                    {sync_sleep_before_reply, SleepFor},
+                    #{timeout => RequestTimeout}
+                )
+            ),
+            %% Our process shouldn't receive any late messages.
+            receive
+                LateReply ->
+                    ct:fail("received late reply: ~p", [LateReply])
+            after SleepFor ->
+                ok
+            end,
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 9 - 0
changes/ce/fix-10455.en.md

@@ -0,0 +1,9 @@
+Fixed an issue that could cause (otherwise harmless) noise in the logs.
+
+During some particularly slow synchronous calls to bridges, some late replies could be sent to connections processes that were no longer expecting a reply, and then emit an error log like:
+
+```
+2023-04-19T18:24:35.350233+00:00 [error] msg: unexpected_info, mfa: emqx_channel:handle_info/2, line: 1278, peername: 172.22.0.1:36384, clientid: caribdis_bench_sub_1137967633_4788, info: {#Ref<0.408802983.1941504010.189402>,{ok,200,[{<<"cache-control">>,<<"max-age=0, ...">>}}
+```
+
+Those logs are harmless, but they could flood and worry the users without need.