Kaynağa Gözat

refactor(cm): force kill the proc that failed to response takeover call

port from: #7026
JianBo He 4 yıl önce
ebeveyn
işleme
9c1fe4336b
2 değiştirilmiş dosya ile 123 ekleme ve 59 silme
  1. 76 45
      apps/emqx/src/emqx_cm.erl
  2. 47 14
      apps/emqx/test/emqx_cm_SUITE.erl

+ 76 - 45
apps/emqx/src/emqx_cm.erl

@@ -262,6 +262,14 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
 open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
     Self = self(),
     ResumeStart = fun(_) ->
+                      CreateSess =
+                          fun() ->
+                              Session = create_session(ClientInfo, ConnInfo),
+                              Session1 = emqx_persistent_session:persist(
+                                           ClientInfo,ConnInfo, Session),
+                              register_channel(ClientId, Self, ConnInfo),
+                              {ok, #{session => Session1, present => false}}
+                          end,
                       case takeover_session(ClientId) of
                           {persistent, Session} ->
                               %% This is a persistent session without a managing process.
@@ -274,15 +282,20 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
                                      pendings => Pendings}};
                           {living, ConnMod, ChanPid, Session} ->
                               ok = emqx_session:resume(ClientInfo, Session),
-                              Session1 = emqx_persistent_session:persist( ClientInfo
-                                                                        , ConnInfo
-                                                                        , Session
-                                                                        ),
-                              Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
-                              register_channel(ClientId, Self, ConnInfo),
-                              {ok, #{session  => Session1,
-                                     present  => true,
-                                     pendings => Pendings}};
+                              case request_stepdown(
+                                     {takeover, 'end'},
+                                     ConnMod,
+                                     ChanPid) of
+                                  {ok, Pendings} ->
+                                      Session1 = emqx_persistent_session:persist(
+                                                   ClientInfo, ConnInfo, Session),
+                                      register_channel(ClientId, Self, ConnInfo),
+                                      {ok, #{session  => Session1,
+                                             present  => true,
+                                             pendings => Pendings}};
+                                  {error, _} ->
+                                      CreateSess()
+                              end;
                           {expired, OldSession} ->
                               _ = emqx_persistent_session:discard(ClientId, OldSession),
                               Session = create_session(ClientInfo, ConnInfo),
@@ -293,13 +306,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
                               register_channel(ClientId, Self, ConnInfo),
                               {ok, #{session => Session1, present => false}};
                           none ->
-                              Session = create_session(ClientInfo, ConnInfo),
-                              Session1 = emqx_persistent_session:persist( ClientInfo
-                                                                        , ConnInfo
-                                                                        , Session
-                                                                        ),
-                              register_channel(ClientId, Self, ConnInfo),
-                              {ok, #{session => Session1, present => false}}
+                              CreateSess()
                       end
                   end,
     emqx_cm_locker:trans(ClientId, ResumeStart).
@@ -359,9 +366,9 @@ takeover_session(ClientId) ->
 takeover_session(ClientId, Pid) ->
     try do_takeover_session(ClientId, Pid)
     catch
-        _ : noproc -> % emqx_ws_connection: call
-            emqx_persistent_session:lookup(ClientId);
-        _ : {noproc, _} -> % emqx_connection: gen_server:call
+        _ : R when R == noproc;
+                   R == timeout;
+                   R == unexpected_exception -> %% request_stepdown/3
             emqx_persistent_session:lookup(ClientId);
         _ : {'EXIT', {noproc, _}} -> % rpc_call/3
             emqx_persistent_session:lookup(ClientId)
@@ -372,9 +379,12 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
         undefined ->
             emqx_persistent_session:lookup(ClientId);
         ConnMod when is_atom(ConnMod) ->
-            %% TODO: if takeover times out, maybe kill the old?
-            Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
-            {living, ConnMod, ChanPid, Session}
+            case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
+                {ok, Session} ->
+                    {living, ConnMod, ChanPid, Session};
+                {error, Reason} ->
+                    error(Reason)
+            end
     end;
 do_takeover_session(ClientId, ChanPid) ->
     wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
@@ -391,31 +401,52 @@ discard_session(ClientId) when is_binary(ClientId) ->
 %% If failed to kick (e.g. timeout) force a kill.
 %% Keeping the stale pid around, or returning error or raise an exception
 %% benefits nobody.
--spec kick_or_kill(kick | discard, module(), pid()) -> ok.
-kick_or_kill(Action, ConnMod, Pid) ->
-    try
+-spec request_stepdown(Action, module(), pid())
+    -> ok
+     | {ok, emqx_session:session() | list(emqx_type:deliver())}
+     | {error, term()}
+  when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
+request_stepdown(Action, ConnMod, Pid) ->
+    Timeout =
+        case Action == kick orelse Action == discard of
+            true -> ?T_KICK;
+            _ -> ?T_TAKEOVER
+        end,
+    Return =
         %% this is essentially a gen_server:call implemented in emqx_connection
         %% and emqx_ws_connection.
         %% the handle_call is implemented in emqx_channel
-        ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
-    catch
-        _ : noproc -> % emqx_ws_connection: call
-            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
-        _ : {noproc, _} -> % emqx_connection: gen_server:call
-            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
-        _ : {shutdown, _} ->
-            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
-        _ : {{shutdown, _}, _} ->
-            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
-        _ : {timeout, {gen_server, call, _}} ->
-            ?tp(warning, "session_kick_timeout",
-                #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}),
-            ok = force_kill(Pid);
-        _ : Error : St ->
-            ?tp(error, "session_kick_exception",
-                #{pid => Pid, action => Action, reason => Error, stacktrace => St,
-                    stale_channel => stale_channel_info(Pid)}),
-            ok = force_kill(Pid)
+        try apply(ConnMod, call, [Pid, Action, Timeout]) of
+            ok -> ok;
+            Reply -> {ok, Reply}
+        catch
+            _ : noproc -> % emqx_ws_connection: call
+                ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
+                {error, noproc};
+            _ : {noproc, _} -> % emqx_connection: gen_server:call
+                ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
+                {error, noproc};
+            _ : {shutdown, _} ->
+                ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
+                {error, noproc};
+            _ : {{shutdown, _}, _} ->
+                ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
+                {error, noproc};
+            _ : {timeout, {gen_server, call, _}} ->
+                ?tp(warning, "session_stepdown_request_timeout",
+                    #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}),
+                ok = force_kill(Pid),
+                {error, timeout};
+            _ : Error : St ->
+                ?tp(error, "session_stepdown_request_exception",
+                    #{pid => Pid, action => Action, reason => Error, stacktrace => St,
+                        stale_channel => stale_channel_info(Pid)}),
+                ok = force_kill(Pid),
+                {error, unexpected_exception}
+        end,
+    case Action == kick orelse Action == discard of
+        true -> ok;
+        _ -> Return
     end.
 
 force_kill(Pid) ->
@@ -438,7 +469,7 @@ do_kick_session(Action, ClientId, ChanPid) ->
             %% already deregistered
             ok;
         ConnMod when is_atom(ConnMod) ->
-            ok = kick_or_kill(Action, ConnMod, ChanPid)
+            ok = request_stepdown(Action, ConnMod, ChanPid)
     end.
 
 %% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).

+ 47 - 14
apps/emqx/test/emqx_cm_SUITE.erl

@@ -190,45 +190,77 @@ t_open_session_race_condition(_) ->
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
 t_kick_session_discard_normal(_) ->
-    test_kick_session(discard, normal).
+    test_stepdown_session(discard, normal).
 
 t_kick_session_discard_shutdown(_) ->
-    test_kick_session(discard, shutdown).
+    test_stepdown_session(discard, shutdown).
 
 t_kick_session_discard_shutdown_with_reason(_) ->
-    test_kick_session(discard, {shutdown, discard}).
+    test_stepdown_session(discard, {shutdown, discard}).
 
 t_kick_session_discard_timeout(_) ->
-    test_kick_session(discard, timeout).
+    test_stepdown_session(discard, timeout).
 
 t_kick_session_discard_noproc(_) ->
-    test_kick_session(discard, noproc).
+    test_stepdown_session(discard, noproc).
 
 t_kick_session_kick_normal(_) ->
-    test_kick_session(discard, normal).
+    test_stepdown_session(kick, normal).
 
 t_kick_session_kick_shutdown(_) ->
-    test_kick_session(discard, shutdown).
+    test_stepdown_session(kick, shutdown).
 
 t_kick_session_kick_shutdown_with_reason(_) ->
-    test_kick_session(discard, {shutdown, discard}).
+    test_stepdown_session(kick, {shutdown, kicked}).
 
 t_kick_session_kick_timeout(_) ->
-    test_kick_session(discard, timeout).
+    test_stepdown_session(kick, timeout).
 
 t_kick_session_kick_noproc(_) ->
-    test_kick_session(discard, noproc).
+    test_stepdown_session(kick, noproc).
 
-test_kick_session(Action, Reason) ->
+t_stepdown_session_takeover_begin_normal(_) ->
+    test_stepdown_session({takeover, 'begin'}, normal).
+
+t_stepdown_session_takeover_begin_shutdown(_) ->
+    test_stepdown_session({takeover, 'begin'}, shutdown).
+
+t_stepdown_session_takeover_begin_shutdown_with_reason(_) ->
+    test_stepdown_session({takeover, 'begin'}, {shutdown, kicked}).
+
+t_stepdown_session_takeover_begin_timeout(_) ->
+    test_stepdown_session({takeover, 'begin'}, timeout).
+
+t_stepdown_session_takeover_begin_noproc(_) ->
+    test_stepdown_session({takeover, 'begin'}, noproc).
+
+t_stepdown_session_takeover_end_normal(_) ->
+    test_stepdown_session({takeover, 'end'}, normal).
+
+t_stepdown_session_takeover_end_shutdown(_) ->
+    test_stepdown_session({takeover, 'end'}, shutdown).
+
+t_stepdown_session_takeover_end_shutdown_with_reason(_) ->
+    test_stepdown_session({takeover, 'end'}, {shutdown, kicked}).
+
+t_stepdown_session_takeover_end_timeout(_) ->
+    test_stepdown_session({takeover, 'end'}, timeout).
+
+t_stepdown_session_takeover_end_noproc(_) ->
+    test_stepdown_session({takeover, 'end'}, noproc).
+
+test_stepdown_session(Action, Reason) ->
     ClientId = rand_client_id(),
     #{conninfo := ConnInfo} = ?ChanInfo,
     FakeSessionFun =
         fun Loop() ->
                      receive
                          {'$gen_call', From, A} when A =:= kick orelse
-                                                     A =:= discard ->
+                                                     A =:= discard orelse
+                                                     A =:= {takeover, 'begin'} orelse
+                                                     A =:= {takeover, 'end'} ->
                              case Reason of
-                                 normal ->
+                                 normal when A =:= kick orelse A =:= discard ->
                                      gen_server:reply(From, ok);
                                  timeout ->
                                      %% no response to the call
@@ -253,7 +285,8 @@ test_kick_session(Action, Reason) ->
     end,
     ok = case Action of
              kick -> emqx_cm:kick_session(ClientId);
-             discard -> emqx_cm:discard_session(ClientId)
+             discard -> emqx_cm:discard_session(ClientId);
+             {takeover, _} -> none = emqx_cm:takeover_session(ClientId), ok
          end,
     case Reason =:= timeout orelse Reason =:= noproc of
         true ->