Просмотр исходного кода

perf: improve demonitor performance

demonitor(Ref, [flush]) often will not be be able to get optimised
by the compiler hence fallback to a full mailbox scan to drain
the DOWN message.

This commit tries to avoid the 'flush' when it's for sure
that there is no 'DOWN' message sent.
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
67608f623f

+ 21 - 3
apps/emqx/src/emqx_pmon.erl

@@ -16,13 +16,14 @@
 
 -module(emqx_pmon).
 
--compile({no_auto_import, [monitor/3]}).
+-compile({no_auto_import, [monitor/3, demonitor/1, demonitor/2]}).
 
 -export([new/0]).
 
 -export([
     monitor/2,
     monitor/3,
+    demonitor/1,
     demonitor/2
 ]).
 
@@ -65,13 +66,30 @@ monitor(Pid, Val, PMon = ?PMON(Map)) ->
 demonitor(Pid, PMon = ?PMON(Map)) ->
     case maps:find(Pid, Map) of
         {ok, {Ref, _Val}} ->
-            %% flush
-            _ = erlang:demonitor(Ref, [flush]),
+            ok = demonitor(Ref),
             ?PMON(maps:remove(Pid, Map));
         error ->
             PMon
     end.
 
+%% @doc Improved version of erlang:demonitor(Ref, [flush]).
+%% Only try to receive the 'DOWN' messages when it might have been sent.
+-spec demonitor(reference()) -> ok.
+demonitor(Ref) when is_reference(Ref) ->
+    case erlang:demonitor(Ref, [info]) of
+        true ->
+            %% succeeded
+            ok;
+        _ ->
+            %% '_', but not 'false' because this may change in the future according to OTP doc
+            receive
+                {'DOWN', Ref, process, _, _} ->
+                    ok
+            after 0 ->
+                ok
+            end
+    end.
+
 -spec find(pid(), pmon()) -> error | {ok, term()}.
 find(Pid, ?PMON(Map)) ->
     case maps:find(Pid, Map) of

+ 1 - 1
apps/emqx/src/emqx_shared_sub.erl

@@ -191,7 +191,7 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
             {error, timeout}
         end
     after
-        _ = erlang:demonitor(Ref, [flush])
+        ok = emqx_pmon:demonitor(Ref)
     end.
 
 with_ack_ref(Msg, SenderRef) ->

+ 2 - 2
apps/emqx/src/emqx_ws_connection.erl

@@ -185,12 +185,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) ->
     WsPid ! {call, {self(), Mref}, Req},
     receive
         {Mref, Reply} ->
-            erlang:demonitor(Mref, [flush]),
+            ok = emqx_pmon:demonitor(Mref),
             Reply;
         {'DOWN', Mref, _, _, Reason} ->
             exit(Reason)
     after Timeout ->
-        erlang:demonitor(Mref, [flush]),
+        ok = emqx_pmon:demonitor(Mref),
         exit(timeout)
     end.
 

+ 30 - 2
apps/emqx/test/emqx_pmon_SUITE.erl

@@ -60,5 +60,33 @@ t_erase(_) ->
     ?assertEqual([{self(), val}], Items),
     ?assertEqual(0, emqx_pmon:count(PMon3)).
 
-% t_erase_all(_) ->
-%     error('TODO').
+t_demonitor(_) ->
+    Pid = self(),
+    Ref1 = erlang:monitor(process, Pid),
+    Ref2 = erlang:monitor(process, spawn(fun() -> ok end)),
+    Ref3 = erlang:make_ref(),
+    ok = emqx_pmon:demonitor(Ref1),
+    ?assertNot(erlang:demonitor(Ref1, [info])),
+    ok = emqx_pmon:demonitor(Ref2),
+    % demonitor twice
+    ok = emqx_pmon:demonitor(Ref2),
+    ?assertNot(erlang:demonitor(Ref2, [info])),
+    % not a monitor ref, should return ok
+    ok = emqx_pmon:demonitor(Ref3),
+    ?assertNot(erlang:demonitor(Ref3, [info])),
+    Pid2 = spawn(fun() ->
+        receive
+            stop ->
+                exit(normal)
+        end
+    end),
+    Ref4 = erlang:monitor(process, Pid2),
+    Ref5 = erlang:monitor(process, Pid2),
+    ok = emqx_pmon:demonitor(Ref4),
+    ?assertNot(erlang:demonitor(Ref4, [info])),
+    _ = erlang:send(Pid2, stop),
+    receive
+        {'DOWN', Ref, process, Pid2, normal} ->
+            ?assertEqual(Ref, Ref5)
+    end,
+    ok.