瀏覽代碼

chore: simplify session eviction for node rebalance

Ilya Averyanov 2 年之前
父節點
當前提交
11c39c4b6a

+ 1 - 22
apps/emqx/src/emqx_cm.erl

@@ -77,7 +77,6 @@
 %% Client management
 %% Client management
 -export([
 -export([
     all_channels_table/1,
     all_channels_table/1,
-    channel_with_session_table/1,
     live_connection_table/1
     live_connection_table/1
 ]).
 ]).
 
 
@@ -564,27 +563,7 @@ all_channels() ->
     Pat = [{{'_', '$1'}, [], ['$1']}],
     Pat = [{{'_', '$1'}, [], ['$1']}],
     ets:select(?CHAN_TAB, Pat).
     ets:select(?CHAN_TAB, Pat).
 
 
-%% @doc Get clientinfo for all clients with sessions
-channel_with_session_table(ConnModuleList) ->
-    Ms = ets:fun2ms(
-        fun({{ClientId, _ChanPid}, Info, _Stats}) ->
-            {ClientId, Info}
-        end
-    ),
-    Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
-    ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
-    qlc:q([
-        {ClientId, ConnState, ConnInfo, ClientInfo}
-     || {ClientId, #{
-            conn_state := ConnState,
-            clientinfo := ClientInfo,
-            conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo
-        }} <-
-            Table,
-        sets:is_element(ConnModule, ConnModules)
-    ]).
-
-%% @doc Get clientinfo for all clients, regardless if they use clean start or not.
+%% @doc Get clientinfo for all clients
 all_channels_table(ConnModuleList) ->
 all_channels_table(ConnModuleList) ->
     Ms = ets:fun2ms(
     Ms = ets:fun2ms(
         fun({{ClientId, _ChanPid}, Info, _Stats}) ->
         fun({{ClientId, _ChanPid}, Info, _Stats}) ->

+ 19 - 9
apps/emqx_eviction_agent/src/emqx_eviction_agent.erl

@@ -230,17 +230,17 @@ connection_table() ->
 connection_count() ->
 connection_count() ->
     table_count(connection_table()).
     table_count(connection_table()).
 
 
-channel_with_session_table(any) ->
+channel_table(any) ->
     qlc:q([
     qlc:q([
         {ClientId, ConnInfo, ClientInfo}
         {ClientId, ConnInfo, ClientInfo}
      || {ClientId, _, ConnInfo, ClientInfo} <-
      || {ClientId, _, ConnInfo, ClientInfo} <-
-            emqx_cm:channel_with_session_table(?CONN_MODULES)
+            emqx_cm:all_channels_table(?CONN_MODULES)
     ]);
     ]);
-channel_with_session_table(RequiredConnState) ->
+channel_table(RequiredConnState) ->
     qlc:q([
     qlc:q([
         {ClientId, ConnInfo, ClientInfo}
         {ClientId, ConnInfo, ClientInfo}
      || {ClientId, ConnState, ConnInfo, ClientInfo} <-
      || {ClientId, ConnState, ConnInfo, ClientInfo} <-
-            emqx_cm:channel_with_session_table(?CONN_MODULES),
+            emqx_cm:all_channels_table(?CONN_MODULES),
         RequiredConnState =:= ConnState
         RequiredConnState =:= ConnState
     ]).
     ]).
 
 
@@ -269,13 +269,13 @@ all_channels_count() ->
 
 
 -spec all_local_channels_count() -> non_neg_integer().
 -spec all_local_channels_count() -> non_neg_integer().
 all_local_channels_count() ->
 all_local_channels_count() ->
-    table_count(emqx_cm:all_channels_table(?CONN_MODULES)).
+    table_count(channel_table(any)).
 
 
 session_count() ->
 session_count() ->
     session_count(any).
     session_count(any).
 
 
 session_count(ConnState) ->
 session_count(ConnState) ->
-    table_count(channel_with_session_table(ConnState)).
+    table_count(channel_table(ConnState)).
 
 
 table_count(QH) ->
 table_count(QH) ->
     qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH).
     qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH).
@@ -298,8 +298,8 @@ take_channels(N) ->
     ok = qlc:delete_cursor(ChanPidCursor),
     ok = qlc:delete_cursor(ChanPidCursor),
     Channels.
     Channels.
 
 
-take_channel_with_sessions(N, ConnState) ->
-    ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)),
+take_channels(N, ConnState) ->
+    ChanPidCursor = qlc:cursor(channel_table(ConnState)),
     Channels = qlc:next_answers(ChanPidCursor, N),
     Channels = qlc:next_answers(ChanPidCursor, N),
     ok = qlc:delete_cursor(ChanPidCursor),
     ok = qlc:delete_cursor(ChanPidCursor),
     Channels.
     Channels.
@@ -314,7 +314,7 @@ do_evict_connections(N, ServerReference) when N > 0 ->
     ).
     ).
 
 
 do_evict_sessions(N, Nodes, ConnState) when N > 0 ->
 do_evict_sessions(N, Nodes, ConnState) when N > 0 ->
-    Channels = take_channel_with_sessions(N, ConnState),
+    Channels = take_channels(N, ConnState),
     ok = lists:foreach(
     ok = lists:foreach(
         fun({ClientId, ConnInfo, ClientInfo}) ->
         fun({ClientId, ConnInfo, ClientInfo}) ->
             evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo)
             evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo)
@@ -346,6 +346,16 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
                 }
                 }
             ),
             ),
             {error, Reason};
             {error, Reason};
+        {error, {no_session, _}} = Error ->
+            ?SLOG(
+                warning,
+                #{
+                    msg => "evict_session_channel_no_session",
+                    client_id => ClientId,
+                    node => Node
+                }
+            ),
+            Error;
         {error, Reason} = Error ->
         {error, Reason} = Error ->
             ?SLOG(
             ?SLOG(
                 error,
                 error,

+ 3 - 3
apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl

@@ -15,7 +15,7 @@
 
 
 -import(
 -import(
     emqx_eviction_agent_test_helpers,
     emqx_eviction_agent_test_helpers,
-    [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2]
+    [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1]
 ).
 ).
 
 
 -define(assertPrinted(Printed, Code),
 -define(assertPrinted(Printed, Code),
@@ -202,7 +202,7 @@ t_explicit_session_takeover(Config) ->
 
 
     ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
     ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
 
 
-    {ok, C1} = emqtt_connect([{port, Port1}]),
+    {ok, C1} = emqtt_connect_for_publish(Port1),
     emqtt:publish(C1, <<"t1">>, <<"MessageToEvictedSession1">>),
     emqtt:publish(C1, <<"t1">>, <<"MessageToEvictedSession1">>),
     ok = emqtt:disconnect(C1),
     ok = emqtt:disconnect(C1),
 
 
@@ -229,7 +229,7 @@ t_explicit_session_takeover(Config) ->
     ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
     ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
 
 
     %% Session is on Node2, but we connect to Node1
     %% Session is on Node2, but we connect to Node1
-    {ok, C2} = emqtt_connect([{port, Port1}]),
+    {ok, C2} = emqtt_connect_for_publish(Port1),
     emqtt:publish(C2, <<"t1">>, <<"MessageToEvictedSession2">>),
     emqtt:publish(C2, <<"t1">>, <<"MessageToEvictedSession2">>),
     ok = emqtt:disconnect(C2),
     ok = emqtt:disconnect(C2),
 
 

+ 0 - 1
apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl

@@ -9,7 +9,6 @@
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_channel.hrl").
 -include_lib("emqx/include/emqx_channel.hrl").
 
 
 -define(CLIENT_ID, <<"client_with_session">>).
 -define(CLIENT_ID, <<"client_with_session">>).

+ 9 - 0
apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl

@@ -8,6 +8,7 @@
     emqtt_connect/0,
     emqtt_connect/0,
     emqtt_connect/1,
     emqtt_connect/1,
     emqtt_connect/2,
     emqtt_connect/2,
+    emqtt_connect_for_publish/1,
     emqtt_connect_many/2,
     emqtt_connect_many/2,
     emqtt_connect_many/3,
     emqtt_connect_many/3,
     stop_many/1,
     stop_many/1,
@@ -42,6 +43,14 @@ emqtt_connect(Opts) ->
         {error, _} = Error -> Error
         {error, _} = Error -> Error
     end.
     end.
 
 
+emqtt_connect_for_publish(Port) ->
+    ClientId = <<"pubclient-", (integer_to_binary(erlang:unique_integer([positive])))/binary>>,
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {port, Port}]),
+    case emqtt:connect(C) of
+        {ok, _} -> {ok, C};
+        {error, _} = Error -> Error
+    end.
+
 emqtt_connect_many(Port, Count) ->
 emqtt_connect_many(Port, Count) ->
     emqtt_connect_many(Port, Count, _StartN = 1).
     emqtt_connect_many(Port, Count, _StartN = 1).
 
 

+ 1 - 0
changes/ee/feat-11612.en.md

@@ -0,0 +1 @@
+During node evacuation, evacuate all disconnected sessions, not only those started with `clean_start` set to `false`.