Просмотр исходного кода

Merge pull request #4834 from zmstone/fix-emqx-cm-do-not-log-noproc-as-error

fix(emqx_cm): do not log noproc as error
Zaiming (Stone) Shi 4 лет назад
Родитель
Сommit
f1dcb35f53

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 4 - 4
apps/emqx_management/src/emqx_management.appup.src

@@ -1,12 +1,12 @@
 %% -*-: erlang -*-
-{"4.3.1",
- [ {"4.3.0",
+{"4.3.2",
+ [ {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
     ]}
  ],
  [
-   {"4.3.0",
+   {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
     ]}
  ]
-}.
+}.

+ 2 - 0
src/emqx.appup.src

@@ -3,6 +3,7 @@
  [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
    ]},
@@ -20,6 +21,7 @@
  [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
    ]},

+ 20 - 12
src/emqx_cm.erl

@@ -22,6 +22,7 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -logger_header("[CM]").
 
@@ -279,18 +280,25 @@ takeover_session(ClientId, ChanPid) ->
 discard_session(ClientId) when is_binary(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> ok;
-        ChanPids ->
-            lists:foreach(
-              fun(ChanPid) ->
-                      try
-                          discard_session(ClientId, ChanPid)
-                      catch
-                          _:{noproc,_}:_Stk -> ok;
-                          _:{{shutdown,_},_}:_Stk -> ok;
-                          _:Error:_Stk ->
-                              ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
-                      end
-              end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
+    end.
+
+do_discard_session(ClientId, Pid) ->
+    try
+        discard_session(ClientId, Pid)
+    catch
+        _ : noproc -> % emqx_ws_connection: call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {noproc, _} -> % emqx_connection: gen_server:call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {{shutdown, _}, _} ->
+            ?tp(debug, "session_already_shutdown", #{pid => Pid}),
+            ok;
+        _ : Error : St ->
+            ?tp(error, "failed_to_discard_session",
+                #{pid => Pid, reason => Error, stacktrace=>St})
     end.
 
 discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->

+ 13 - 0
test/emqx_cm_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CM, emqx_cm).
 -define(ChanInfo,#{conninfo =>
@@ -179,6 +180,18 @@ t_discard_session(_) ->
     ok = emqx_cm:unregister_channel(<<"clientid">>),
     ok = meck:unload(emqx_connection).
 
+t_discard_session_race(_) ->
+    ok = snabbkaffe:start_trace(),
+    #{conninfo := ConnInfo0} = ?ChanInfo,
+    ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
+    {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
+    ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
+    Pid ! stop,
+    receive {'DOWN', Ref, process, Pid, normal} -> ok end,
+    ok = emqx_cm:discard_session(<<"clientid">>),
+    {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000),
+    snabbkaffe:stop().
+
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,
     {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),