Просмотр исходного кода

Merge pull request #7494 from zmstone/0401-5.0-perf-demonitor

perf: improve demonitor performance
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
9ed44e3bd7

+ 4 - 4
apps/emqx/include/emqx_mqtt.hrl

@@ -662,9 +662,9 @@ end).
     end
 ).
 
--define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}).
--define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}).
--define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))).
--define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))).
+-define(FRAME_PARSE_ERROR, frame_parse_error).
+-define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
+-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
+-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).
 
 -endif.

+ 4 - 4
apps/emqx/src/emqx_connection.erl

@@ -765,7 +765,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
             NState = State#state{parse_state = NParseState},
             parse_incoming(Rest, [Packet | Packets], NState)
     catch
-        throw:?FRAME_PARSE_ERROR(Reason) ->
+        throw:{?FRAME_PARSE_ERROR, Reason} ->
             ?SLOG(info, #{
                 reason => Reason,
                 at_state => emqx_frame:describe_state(ParseState),
@@ -840,19 +840,19 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
                 Data
         catch
             %% Maybe Never happen.
-            throw:?FRAME_SERIALIZE_ERROR(Reason) ->
+            throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
                 ?SLOG(info, #{
                     reason => Reason,
                     input_packet => Packet
                 }),
-                erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
+                erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
             error:Reason:Stacktrace ->
                 ?SLOG(error, #{
                     input_packet => Packet,
                     exception => Reason,
                     stacktrace => Stacktrace
                 }),
-                erlang:error(frame_serialize_error)
+                erlang:error(?FRAME_SERIALIZE_ERROR)
         end
     end.
 

+ 21 - 3
apps/emqx/src/emqx_pmon.erl

@@ -16,13 +16,14 @@
 
 -module(emqx_pmon).
 
--compile({no_auto_import, [monitor/3]}).
+-compile({no_auto_import, [monitor/3, demonitor/1, demonitor/2]}).
 
 -export([new/0]).
 
 -export([
     monitor/2,
     monitor/3,
+    demonitor/1,
     demonitor/2
 ]).
 
@@ -65,13 +66,30 @@ monitor(Pid, Val, PMon = ?PMON(Map)) ->
 demonitor(Pid, PMon = ?PMON(Map)) ->
     case maps:find(Pid, Map) of
         {ok, {Ref, _Val}} ->
-            %% flush
-            _ = erlang:demonitor(Ref, [flush]),
+            ok = demonitor(Ref),
             ?PMON(maps:remove(Pid, Map));
         error ->
             PMon
     end.
 
+%% @doc Improved version of erlang:demonitor(Ref, [flush]).
+%% Only try to receive the 'DOWN' messages when it might have been sent.
+-spec demonitor(reference()) -> ok.
+demonitor(Ref) when is_reference(Ref) ->
+    case erlang:demonitor(Ref, [info]) of
+        true ->
+            %% succeeded
+            ok;
+        _ ->
+            %% '_', but not 'false' because this may change in the future according to OTP doc
+            receive
+                {'DOWN', Ref, process, _, _} ->
+                    ok
+            after 0 ->
+                ok
+            end
+    end.
+
 -spec find(pid(), pmon()) -> error | {ok, term()}.
 find(Pid, ?PMON(Map)) ->
     case maps:find(Pid, Map) of

+ 1 - 1
apps/emqx/src/emqx_shared_sub.erl

@@ -191,7 +191,7 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
             {error, timeout}
         end
     after
-        _ = erlang:demonitor(Ref, [flush])
+        ok = emqx_pmon:demonitor(Ref)
     end.
 
 with_ack_ref(Msg, SenderRef) ->

+ 6 - 6
apps/emqx/src/emqx_ws_connection.erl

@@ -185,12 +185,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) ->
     WsPid ! {call, {self(), Mref}, Req},
     receive
         {Mref, Reply} ->
-            erlang:demonitor(Mref, [flush]),
+            ok = emqx_pmon:demonitor(Mref),
             Reply;
         {'DOWN', Mref, _, _, Reason} ->
             exit(Reason)
     after Timeout ->
-        erlang:demonitor(Mref, [flush]),
+        ok = emqx_pmon:demonitor(Mref),
         exit(timeout)
     end.
 
@@ -676,7 +676,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
             NState = State#state{parse_state = NParseState},
             parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
     catch
-        throw:?FRAME_PARSE_ERROR(Reason) ->
+        throw:{?FRAME_PARSE_ERROR, Reason} ->
             ?SLOG(info, #{
                 reason => Reason,
                 at_state => emqx_frame:describe_state(ParseState),
@@ -791,19 +791,19 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
                 Data
         catch
             %% Maybe Never happen.
-            throw:?FRAME_SERIALIZE_ERROR(Reason) ->
+            throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
                 ?SLOG(info, #{
                     reason => Reason,
                     input_packet => Packet
                 }),
-                erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
+                erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
             error:Reason:Stacktrace ->
                 ?SLOG(error, #{
                     input_packet => Packet,
                     exception => Reason,
                     stacktrace => Stacktrace
                 }),
-                erlang:error(frame_serialize_error)
+                erlang:error(?FRAME_SERIALIZE_ERROR)
         end
     end.
 

+ 1 - 1
apps/emqx/test/emqx_frame_SUITE.erl

@@ -24,7 +24,7 @@
 -include_lib("common_test/include/ct.hrl").
 
 -define(ASSERT_FRAME_THROW(Reason, Expr),
-    ?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr)
+    ?assertThrow({?FRAME_PARSE_ERROR, Reason}, Expr)
 ).
 
 all() ->

+ 30 - 2
apps/emqx/test/emqx_pmon_SUITE.erl

@@ -60,5 +60,33 @@ t_erase(_) ->
     ?assertEqual([{self(), val}], Items),
     ?assertEqual(0, emqx_pmon:count(PMon3)).
 
-% t_erase_all(_) ->
-%     error('TODO').
+t_demonitor(_) ->
+    Pid = self(),
+    Ref1 = erlang:monitor(process, Pid),
+    Ref2 = erlang:monitor(process, spawn(fun() -> ok end)),
+    Ref3 = erlang:make_ref(),
+    ok = emqx_pmon:demonitor(Ref1),
+    ?assertNot(erlang:demonitor(Ref1, [info])),
+    ok = emqx_pmon:demonitor(Ref2),
+    % demonitor twice
+    ok = emqx_pmon:demonitor(Ref2),
+    ?assertNot(erlang:demonitor(Ref2, [info])),
+    % not a monitor ref, should return ok
+    ok = emqx_pmon:demonitor(Ref3),
+    ?assertNot(erlang:demonitor(Ref3, [info])),
+    Pid2 = spawn(fun() ->
+        receive
+            stop ->
+                exit(normal)
+        end
+    end),
+    Ref4 = erlang:monitor(process, Pid2),
+    Ref5 = erlang:monitor(process, Pid2),
+    ok = emqx_pmon:demonitor(Ref4),
+    ?assertNot(erlang:demonitor(Ref4, [info])),
+    _ = erlang:send(Pid2, stop),
+    receive
+        {'DOWN', Ref, process, Pid2, normal} ->
+            ?assertEqual(Ref, Ref5)
+    end,
+    ok.