Sfoglia il codice sorgente

fix(sessds): Channel must destroy sessions that are kicked

ieQu1 1 anno fa
parent
commit
236685f714

+ 3 - 1
apps/emqx/src/emqx_channel.erl

@@ -1146,9 +1146,11 @@ handle_call(
     kick,
     Channel = #channel{
         conn_state = ConnState,
-        conninfo = #{proto_ver := ProtoVer}
+        conninfo = #{proto_ver := ProtoVer},
+        session = Session
     }
 ) ->
+    emqx_session:destroy(Session),
     Channel0 = maybe_publish_will_msg(kicked, Channel),
     Channel1 =
         case ConnState of

+ 10 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -36,7 +36,8 @@
 -export([
     create/3,
     open/3,
-    destroy/1
+    destroy/1,
+    kick_offline_session/1
 ]).
 
 -export([
@@ -204,6 +205,14 @@ destroy(#{clientid := ClientID}) ->
 destroy_session(ClientID) ->
     session_drop(ClientID, destroy).
 
+kick_offline_session(ClientID) ->
+    emqx_cm_locker:trans(
+        ClientID,
+        fun(_Nodes) ->
+            session_drop(ClientID, kicked)
+        end
+    ).
+
 %%--------------------------------------------------------------------
 %% Info, Stats
 %%--------------------------------------------------------------------

+ 4 - 0
apps/emqx_management/src/emqx_mgmt.erl

@@ -351,6 +351,10 @@ kickout_client(ClientId) ->
     case lookup_client({clientid, ClientId}, undefined) of
         [] ->
             {error, not_found};
+        [{ClientId, _}] ->
+            %% Offline durable session (client ID is a plain binary
+            %% without channel pid):
+            emqx_persistent_session_ds:kick_offline_session(ClientId);
         _ ->
             Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()],
             check_results(Results)

+ 1 - 0
changes/ce/fix-12740.en.md

@@ -0,0 +1 @@
+Add support for kickout of durable sessions.