Prechádzať zdrojové kódy

Merge pull request #11733 from keynslug/fix/EMQX-10827/sesson-takeover

fix(cm): bring back pre-v5.3.0 compat in `takeover_session_begin/1`
Zaiming (Stone) Shi 2 rokov pred
rodič
commit
a2e86c67db

+ 11 - 4
apps/emqx/src/emqx_cm.erl

@@ -298,9 +298,9 @@ takeover_session_begin(ClientId) ->
 
 takeover_session_begin(ClientId, ChanPid) when is_pid(ChanPid) ->
     case takeover_session(ClientId, ChanPid) of
-        {living, ConnMod, Session} ->
+        {living, ConnMod, ChanPid, Session} ->
             {ok, Session, {ConnMod, ChanPid}};
-        none ->
+        _ ->
             none
     end;
 takeover_session_begin(_ClientId, undefined) ->
@@ -368,13 +368,20 @@ do_takeover_begin(ClientId, ChanPid) when node(ChanPid) == node() ->
         ConnMod when is_atom(ConnMod) ->
             case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
                 {ok, Session} ->
-                    {living, ConnMod, Session};
+                    {living, ConnMod, ChanPid, Session};
                 {error, Reason} ->
                     error(Reason)
             end
     end;
 do_takeover_begin(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)).
+    case wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)) of
+        %% NOTE: v5.3.0
+        {living, ConnMod, Session} ->
+            {living, ConnMod, ChanPid, Session};
+        %% NOTE: other versions
+        Res ->
+            Res
+    end.
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec discard_session(emqx_types:clientid()) -> ok.

+ 3 - 1
apps/emqx/src/proto/emqx_cm_proto_v2.erl

@@ -65,8 +65,10 @@ get_chann_conn_mod(ClientId, ChanPid) ->
 
 -spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
     none
-    | {expired | persistent, emqx_session:session()}
     | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+    %% NOTE: v5.3.0
+    | {living, _ConnMod :: atom(), emqx_session:session()}
+    | {expired | persistent, emqx_session:session()}
     | {badrpc, _}.
 takeover_session(ClientId, ChanPid) ->
     rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).

+ 1 - 0
changes/ee/fix-11733.en.md

@@ -0,0 +1 @@
+Resolved an incompatibility issue that led to crashes during session takeover / channel eviction when the session was residing on a remote node running EMQX v5.2.x or earlier.