Kaynağa Gözat

fix(emqx_cm): make takeover session less likely to hit a race

Tobias Lindahl 4 yıl önce
ebeveyn
işleme
05f3bc8c90
2 değiştirilmiş dosya ile 53 ekleme ve 2 silme
  1. 16 2
      apps/emqx/src/emqx_cm.erl
  2. 37 0
      apps/emqx/test/emqx_cm_SUITE.erl

+ 16 - 2
apps/emqx/src/emqx_cm.erl

@@ -334,7 +334,21 @@ takeover_session(ClientId) ->
             takeover_session(ClientId, ChanPid)
             takeover_session(ClientId, ChanPid)
     end.
     end.
 
 
-takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
+takeover_session(ClientId, Pid) -> % wraps do_takeover_session/2; a noproc-shaped failure falls back to the persistent-session lookup
+    try do_takeover_session(ClientId, Pid)
+    catch
+        _ : noproc -> % bare noproc reason (any exception class); original note: emqx_ws_connection: call
+            ?tp(debug, "session_gone", #{pid => Pid}), % debug-level "session_gone" event carrying the pid that was called
+            emqx_persistent_session:lookup(ClientId); % same lookup the 'undefined' conn-mod branch of do_takeover_session/2 performs
+        _ : {noproc, _} -> % {noproc, Details} reason; original note: emqx_connection: gen_server:call
+            ?tp(debug, "session_gone", #{pid => Pid}),
+            emqx_persistent_session:lookup(ClientId);
+        _ : {'EXIT', {noproc, _}} -> % noproc wrapped in an 'EXIT' tuple; original note: rpc_call/3 (remote-node path)
+            ?tp(debug, "session_gone", #{pid => Pid}),
+            emqx_persistent_session:lookup(ClientId) % any other exception is not matched here and propagates to the caller
+    end.
+
+do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
     case get_chann_conn_mod(ClientId, ChanPid) of
     case get_chann_conn_mod(ClientId, ChanPid) of
         undefined ->
         undefined ->
             emqx_persistent_session:lookup(ClientId);
             emqx_persistent_session:lookup(ClientId);
@@ -343,7 +357,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
             {living, ConnMod, ChanPid, Session}
             {living, ConnMod, ChanPid, Session}
     end;
     end;
-takeover_session(ClientId, ChanPid) ->
+do_takeover_session(ClientId, ChanPid) ->
     rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
     rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
 
 
 %% @doc Discard all the sessions identified by the ClientId.
 %% @doc Discard all the sessions identified by the ClientId.

+ 37 - 0
apps/emqx/test/emqx_cm_SUITE.erl

@@ -311,6 +311,43 @@ t_takeover_session(_) ->
     {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
     {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
     emqx_cm:unregister_channel(<<"clientid">>).
     emqx_cm:unregister_channel(<<"clientid">>).
 
 
+t_takeover_session_process_gone(_) -> % takeover_session/1 must return 'none' when the channel call fails with a noproc-shaped reason
+    #{conninfo := ConnInfo} = ?ChanInfo,
+    ClientIDTcp = <<"clientidTCP">>,
+    ClientIDWs = <<"clientidWs">>,
+    ClientIDRpc = <<"clientidRPC">>,
+    none = emqx_cm:takeover_session(ClientIDTcp), % baseline: no channel registered for this client id yet
+    none = emqx_cm:takeover_session(ClientIDWs),
+    meck:new(emqx_connection, [passthrough, no_history]), % passthrough keeps every unmocked emqx_connection function real
+    meck:expect(emqx_connection, call, % case 1: reason shaped {noproc, _}
+                fun(Pid, {takeover, 'begin'}, _) ->
+                        exit({noproc, {gen_server,call,[Pid, takeover_session]}});
+                   (Pid, What, Args) ->
+                        meck:passthrough([Pid, What, Args]) % every other call goes to the real implementation
+                end),
+    ok = emqx_cm:register_channel(ClientIDTcp, self(), ConnInfo), % the test process itself stands in as the channel pid
+    none = emqx_cm:takeover_session(ClientIDTcp),
+    meck:expect(emqx_connection, call, % case 2: bare noproc; NOTE(review): still mocks emqx_connection, not emqx_ws_connection - only the reason shape differs
+                fun(_Pid, {takeover, 'begin'}, _) ->
+                        exit(noproc);
+                   (Pid, What, Args) ->
+                        meck:passthrough([Pid, What, Args])
+                end),
+    ok = emqx_cm:register_channel(ClientIDWs, self(), ConnInfo),
+    none = emqx_cm:takeover_session(ClientIDWs),
+    meck:expect(emqx_connection, call, % case 3: noproc wrapped in {'EXIT', _}; raised from the local call mock, no remote node is involved
+                fun(Pid, {takeover, 'begin'}, _) ->
+                        exit({'EXIT', {noproc, {gen_server,call,[Pid, takeover_session]}}});
+                   (Pid, What, Args) ->
+                        meck:passthrough([Pid, What, Args])
+                end),
+    ok = emqx_cm:register_channel(ClientIDRpc, self(), ConnInfo),
+    none = emqx_cm:takeover_session(ClientIDRpc),
+    emqx_cm:unregister_channel(ClientIDTcp), % cleanup of the three registrations made above
+    emqx_cm:unregister_channel(ClientIDWs),
+    emqx_cm:unregister_channel(ClientIDRpc),
+    meck:unload(emqx_connection). % NOTE(review): not reached if an earlier match fails, which would leave the mock loaded
+
 t_all_channels(_) -> % smoke test: emqx_cm:all_channels/0 must return a list
 t_all_channels(_) -> % smoke test: emqx_cm:all_channels/0 must return a list
     ?assertEqual(true, is_list(emqx_cm:all_channels())).
     ?assertEqual(true, is_list(emqx_cm:all_channels())).