Explorar el Código

fix(emqx_cm): do not log noproc as error

1. websocket call exit with noproc reason.
2. do not capture stacktrace when no need for it
Zaiming Shi hace 4 años
padre
commit
4e1798e3f3
Se han modificado 3 ficheros con 35 adiciones y 12 borrados
  1. 2 0
      src/emqx.appup.src
  2. 20 12
      src/emqx_cm.erl
  3. 13 0
      test/emqx_cm_SUITE.erl

+ 2 - 0
src/emqx.appup.src

@@ -3,6 +3,7 @@
  [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
    ]},
@@ -20,6 +21,7 @@
  [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
    ]},

+ 20 - 12
src/emqx_cm.erl

@@ -22,6 +22,7 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -logger_header("[CM]").
 
@@ -279,18 +280,25 @@ takeover_session(ClientId, ChanPid) ->
 discard_session(ClientId) when is_binary(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> ok;
-        ChanPids ->
-            lists:foreach(
-              fun(ChanPid) ->
-                      try
-                          discard_session(ClientId, ChanPid)
-                      catch
-                          _:{noproc,_}:_Stk -> ok;
-                          _:{{shutdown,_},_}:_Stk -> ok;
-                          _:Error:_Stk ->
-                              ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
-                      end
-              end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
+    end.
+
+do_discard_session(ClientId, Pid) ->
+    try
+        discard_session(ClientId, Pid)
+    catch
+        _ : noproc -> % emqx_ws_connection: call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {noproc, _} -> % emqx_connection: gen_server:call
+            ?tp(debug, "session_already_gone", #{pid => Pid}),
+            ok;
+        _ : {{shutdown, _}, _} ->
+            ?tp(debug, "session_already_shutdown", #{pid => Pid}),
+            ok;
+        _ : Error : St ->
+            ?tp(error, "failed_to_discard_session",
+                #{pid => Pid, reason => Error, stacktrace=>St})
     end.
 
 discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->

+ 13 - 0
test/emqx_cm_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CM, emqx_cm).
 -define(ChanInfo,#{conninfo =>
@@ -179,6 +180,18 @@ t_discard_session(_) ->
     ok = emqx_cm:unregister_channel(<<"clientid">>),
     ok = meck:unload(emqx_connection).
 
+t_discard_session_race(_) ->
+    ok = snabbkaffe:start_trace(),
+    #{conninfo := ConnInfo0} = ?ChanInfo,
+    ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
+    {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
+    ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
+    Pid ! stop,
+    receive {'DOWN', Ref, process, Pid, normal} -> ok end,
+    ok = emqx_cm:discard_session(<<"clientid">>),
+    {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000),
+    snabbkaffe:stop().
+
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,
     {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),