소스 검색

fix(nolink_apply): avoid sending late replies to caller

Due to a race condition, the caller of `pmap`/`nolink_apply` may receive a late reply
after it has already timed out.

For example, when a timeout occurred while the resource manager was checking a resource's health:

```
19:18:23.084 [error] [data: ..., event_data: {#Reference<0.3247872820.3887857670.131018>, {:normal, [false, true, true, true, true, true]}}, event_type: :info, msg: :ignore_all_other_events, state: :connected]
```

Using an alias and also checking for the race condition in the `after` block (like
[`gen`](https://github.com/erlang/otp/blob/a76bf63197dbf41d9179413b26597afeeb46ff30/lib/stdlib/src/gen.erl#L270-L277)
does), we avoid polluting the caller's mailbox with late replies.
Thales Macedo Garitezi 2 년 전
부모
커밋
50e7d5d2ec
2개의 변경된 파일 52개의 추가작업 그리고 4개의 파일을 삭제
  1. 20 4
      apps/emqx_utils/src/emqx_utils.erl
  2. 32 0
      apps/emqx_utils/test/emqx_utils_SUITE.erl

+ 20 - 4
apps/emqx_utils/src/emqx_utils.erl

@@ -20,6 +20,8 @@
 %% [TODO] Cleanup so the instruction below is not necessary.
 %% [TODO] Cleanup so the instruction below is not necessary.
 -elvis([{elvis_style, god_modules, disable}]).
 -elvis([{elvis_style, god_modules, disable}]).
 
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -export([
 -export([
     merge_opts/2,
     merge_opts/2,
     maybe_apply/2,
     maybe_apply/2,
@@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity).
 -spec nolink_apply(function(), timer:timeout()) -> term().
 -spec nolink_apply(function(), timer:timeout()) -> term().
 nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
 nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
     Caller = self(),
     Caller = self(),
-    ResRef = make_ref(),
+    ResRef = alias([reply]),
     Middleman = erlang:spawn(
     Middleman = erlang:spawn(
         fun() ->
         fun() ->
             process_flag(trap_exit, true),
             process_flag(trap_exit, true),
@@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
                             C:E:S ->
                             C:E:S ->
                                 {exception, {C, E, S}}
                                 {exception, {C, E, S}}
                         end,
                         end,
-                    _ = erlang:send(Caller, {ResRef, Res}),
+                    _ = erlang:send(ResRef, {ResRef, Res}),
+                    ?tp(pmap_middleman_sent_response, #{}),
                     exit(normal)
                     exit(normal)
                 end
                 end
             ),
             ),
@@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
                     exit(normal);
                     exit(normal);
                 {'EXIT', Worker, Reason} ->
                 {'EXIT', Worker, Reason} ->
                     %% worker exited with some reason other than 'normal'
                     %% worker exited with some reason other than 'normal'
-                    _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
+                    _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
                     exit(normal)
                     exit(normal)
             end
             end
         end
         end
@@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
         {ResRef, {'EXIT', Reason}} ->
         {ResRef, {'EXIT', Reason}} ->
             exit(Reason)
             exit(Reason)
     after Timeout ->
     after Timeout ->
+        %% possible race condition: a message was received just as we enter the after
+        %% block.
+        ?tp(pmap_timeout, #{}),
+        unalias(ResRef),
         exit(Middleman, kill),
         exit(Middleman, kill),
-        exit(timeout)
+        receive
+            {ResRef, {normal, Result}} ->
+                Result;
+            {ResRef, {exception, {C, E, S}}} ->
+                erlang:raise(C, E, S);
+            {ResRef, {'EXIT', Reason}} ->
+                exit(Reason)
+        after 0 ->
+            exit(timeout)
+        end
     end.
     end.
 
 
 safe_to_existing_atom(In) ->
 safe_to_existing_atom(In) ->

+ 32 - 0
apps/emqx_utils/test/emqx_utils_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 -define(SOCKOPTS, [
 -define(SOCKOPTS, [
     binary,
     binary,
@@ -208,3 +209,34 @@ t_pmap_exception(_) ->
             [{2, 3}, {3, 4}, error]
             [{2, 3}, {3, 4}, error]
         )
         )
     ).
     ).
+
+t_pmap_late_reply(_) ->
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := pmap_middleman_sent_response},
+                #{?snk_kind := pmap_timeout}
+            ),
+            Timeout = 100,
+            Res =
+                catch emqx_utils:pmap(
+                    fun(_) ->
+                        process_flag(trap_exit, true),
+                        timer:sleep(3 * Timeout),
+                        done
+                    end,
+                    [1, 2, 3],
+                    Timeout
+                ),
+            receive
+                {Ref, LateReply} when is_reference(Ref) ->
+                    ct:fail("should not receive late reply: ~p", [LateReply])
+            after (5 * Timeout) ->
+                ok
+            end,
+            ?assertMatch([done, done, done], Res),
+            ok
+        end,
+        []
+    ),
+    ok.