Просмотр исходного кода

Merge pull request #11562 from thalesmg/cluster-purge-r52-20230904

feat: cluster purge (r5.2)
Zaiming (Stone) Shi 2 лет назад
Родитель
Сommit
f6d347304b
31 измененных файлов с 1530 добавлено и 72 удалено
  1. 6 0
      apps/emqx/priv/bpapi.versions
  2. 21 0
      apps/emqx/src/emqx_cm.erl
  3. 2 0
      apps/emqx/test/emqx_cth_cluster.erl
  4. 1 1
      apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
  5. 61 1
      apps/emqx_eviction_agent/src/emqx_eviction_agent.erl
  6. 35 0
      apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl
  7. 5 1
      apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl
  8. 4 1
      apps/emqx_management/src/emqx_mgmt_api_plugins.erl
  9. 0 1
      apps/emqx_management/src/emqx_mgmt_data_backup.erl
  10. 3 2
      apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl
  11. 1 1
      apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
  12. 22 6
      apps/emqx_modules/src/emqx_delayed.erl
  13. 48 0
      apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl
  14. 3 3
      apps/emqx_modules/test/emqx_delayed_SUITE.erl
  15. 1 1
      apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src
  16. 2 2
      apps/emqx_node_rebalance/src/emqx_node_rebalance.erl
  17. 18 6
      apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl
  18. 126 6
      apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl
  19. 75 5
      apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl
  20. 233 0
      apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl
  21. 51 12
      apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl
  22. 1 0
      apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl
  23. 59 0
      apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_api_proto_v2.erl
  24. 84 0
      apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v2.erl
  25. 29 0
      apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_purge_proto_v1.erl
  26. 46 0
      apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_status_proto_v2.erl
  27. 131 22
      apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl
  28. 83 0
      apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl
  29. 360 0
      apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl
  30. 1 1
      apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl
  31. 18 0
      rel/i18n/emqx_node_rebalance_api.hocon

+ 6 - 0
apps/emqx/priv/bpapi.versions

@@ -15,7 +15,9 @@
 {emqx_conf,3}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
+{emqx_delayed,2}.
 {emqx_eviction_agent,1}.
+{emqx_eviction_agent,2}.
 {emqx_exhook,1}.
 {emqx_ft_storage_exporter_fs,1}.
 {emqx_ft_storage_fs,1}.
@@ -37,9 +39,13 @@
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,2}.
 {emqx_node_rebalance,1}.
+{emqx_node_rebalance,2}.
 {emqx_node_rebalance_api,1}.
+{emqx_node_rebalance_api,2}.
 {emqx_node_rebalance_evacuation,1}.
+{emqx_node_rebalance_purge,1}.
 {emqx_node_rebalance_status,1}.
+{emqx_node_rebalance_status,2}.
 {emqx_persistent_session,1}.
 {emqx_plugins,1}.
 {emqx_prometheus,1}.

+ 21 - 0
apps/emqx/src/emqx_cm.erl

@@ -75,6 +75,7 @@
 
 %% Client management
 -export([
+    all_channels_table/1,
     channel_with_session_table/1,
     live_connection_table/1
 ]).
@@ -582,6 +583,26 @@ channel_with_session_table(ConnModuleList) ->
         sets:is_element(ConnModule, ConnModules)
     ]).
 
+%% @doc Get clientinfo for all clients, regardless if they use clean start or not.
+all_channels_table(ConnModuleList) ->
+    Ms = ets:fun2ms(
+        fun({{ClientId, _ChanPid}, Info, _Stats}) ->
+            {ClientId, Info}
+        end
+    ),
+    Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
+    ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
+    qlc:q([
+        {ClientId, ConnState, ConnInfo, ClientInfo}
+     || {ClientId, #{
+            conn_state := ConnState,
+            clientinfo := ClientInfo,
+            conninfo := #{conn_mod := ConnModule} = ConnInfo
+        }} <-
+            Table,
+        sets:is_element(ConnModule, ConnModules)
+    ]).
+
 %% @doc Get all local connection query handle
 live_connection_table(ConnModules) ->
     Ms = lists:map(fun live_connection_ms/1, ConnModules),

+ 2 - 0
apps/emqx/test/emqx_cth_cluster.erl

@@ -238,6 +238,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) ->
             listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)
         }
     };
+default_appspec(emqx, Spec, _NodeSpecs) ->
+    #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}};
 default_appspec(_App, _, _) ->
     #{}.
 

+ 1 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_eviction_agent, [
     {description, "EMQX Eviction Agent"},
-    {vsn, "5.1.0"},
+    {vsn, "5.1.1"},
     {registered, [
         emqx_eviction_agent_sup,
         emqx_eviction_agent,

+ 61 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.erl

@@ -18,14 +18,19 @@
     disable/1,
     status/0,
     connection_count/0,
+    all_channels_count/0,
     session_count/0,
     session_count/1,
     evict_connections/1,
     evict_sessions/2,
     evict_sessions/3,
+    purge_sessions/1,
     evict_session_channel/3
 ]).
 
+%% RPC targets
+-export([all_local_channels_count/0]).
+
 -behaviour(gen_server).
 
 -export([
@@ -113,6 +118,14 @@ evict_sessions(N, Nodes, ConnState) when
             {error, disabled}
     end.
 
+purge_sessions(N) ->
+    case enable_status() of
+        {enabled, _Kind, _ServerReference} ->
+            ok = do_purge_sessions(N);
+        disabled ->
+            {error, disabled}
+    end.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -231,6 +244,33 @@ channel_with_session_table(RequiredConnState) ->
         RequiredConnState =:= ConnState
     ]).
 
+-spec all_channels_count() -> non_neg_integer().
+all_channels_count() ->
+    Nodes = emqx:running_nodes(),
+    Timeout = 15_000,
+    Results = emqx_eviction_agent_proto_v2:all_channels_count(Nodes, Timeout),
+    NodeResults = lists:zip(Nodes, Results),
+    Errors = lists:filter(
+        fun
+            ({_Node, {ok, _}}) -> false;
+            ({_Node, _Err}) -> true
+        end,
+        NodeResults
+    ),
+    Errors =/= [] andalso
+        ?SLOG(
+            warning,
+            #{
+                msg => "error_collecting_all_channels_count",
+                errors => maps:from_list(Errors)
+            }
+        ),
+    lists:sum([N || {ok, N} <- Results]).
+
+-spec all_local_channels_count() -> non_neg_integer().
+all_local_channels_count() ->
+    table_count(emqx_cm:all_channels_table(?CONN_MODULES)).
+
 session_count() ->
     session_count(any).
 
@@ -247,6 +287,17 @@ take_connections(N) ->
     ok = qlc:delete_cursor(ChanPidCursor),
     ChanPids.
 
+take_channels(N) ->
+    QH = qlc:q([
+        {ClientId, ConnInfo, ClientInfo}
+     || {ClientId, _, ConnInfo, ClientInfo} <-
+            emqx_cm:all_channels_table(?CONN_MODULES)
+    ]),
+    ChanPidCursor = qlc:cursor(QH),
+    Channels = qlc:next_answers(ChanPidCursor, N),
+    ok = qlc:delete_cursor(ChanPidCursor),
+    Channels.
+
 take_channel_with_sessions(N, ConnState) ->
     ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)),
     Channels = qlc:next_answers(ChanPidCursor, N),
@@ -283,7 +334,7 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
             client_info => ClientInfo
         }
     ),
-    case emqx_eviction_agent_proto_v1:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
+    case emqx_eviction_agent_proto_v2:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
         {badrpc, Reason} ->
             ?SLOG(
                 error,
@@ -344,5 +395,14 @@ disconnect_channel(ChanPid, ServerReference) ->
             'Server-Reference' => ServerReference
         }}.
 
+do_purge_sessions(N) when N > 0 ->
+    Channels = take_channels(N),
+    ok = lists:foreach(
+        fun({ClientId, _ConnInfo, _ClientInfo}) ->
+            emqx_cm:discard_session(ClientId)
+        end,
+        Channels
+    ).
+
 select_random(List) when length(List) > 0 ->
     lists:nth(rand:uniform(length(List)), List).

+ 35 - 0
apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_eviction_agent_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    evict_session_channel/4,
+
+    %% Introduced in v2:
+    all_channels_count/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec evict_session_channel(
+    node(),
+    emqx_types:clientid(),
+    emqx_types:conninfo(),
+    emqx_types:clientinfo()
+) -> supervisor:startchild_err() | emqx_rpc:badrpc().
+evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) ->
+    rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]).
+
+%% Introduced in v2:
+-spec all_channels_count([node()], time:time()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
+all_channels_count(Nodes, Timeout) ->
+    erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).

+ 5 - 1
apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl

@@ -9,6 +9,7 @@
     emqtt_connect/1,
     emqtt_connect/2,
     emqtt_connect_many/2,
+    emqtt_connect_many/3,
     stop_many/1,
 
     emqtt_try_connect/1,
@@ -42,6 +43,9 @@ emqtt_connect(Opts) ->
     end.
 
 emqtt_connect_many(Port, Count) ->
+    emqtt_connect_many(Port, Count, _StartN = 1).
+
+emqtt_connect_many(Port, Count, StartN) ->
     lists:map(
         fun(N) ->
             NBin = integer_to_binary(N),
@@ -49,7 +53,7 @@ emqtt_connect_many(Port, Count) ->
             {ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, false}, {port, Port}]),
             C
         end,
-        lists:seq(1, Count)
+        lists:seq(StartN, StartN + Count - 1)
     ).
 
 stop_many(Clients) ->

+ 4 - 1
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -166,7 +166,10 @@ schema("/plugins/:name/move") ->
             tags => ?TAGS,
             parameters => [hoconsc:ref(name)],
             'requestBody' => move_request_body(),
-            responses => #{200 => <<"OK">>}
+            responses => #{
+                200 => <<"OK">>,
+                400 => emqx_dashboard_swagger:error_codes(['MOVE_FAILED'], <<"Move failed">>)
+            }
         }
     }.
 

+ 0 - 1
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -525,7 +525,6 @@ do_import_conf(RawConf, Opts) ->
     Errors =
         lists:foldr(
             fun(Module, ErrorsAcc) ->
-                Module:import_config(RawConf),
                 case Module:import_config(RawConf) of
                     {ok, #{changed := Changed}} ->
                         maybe_print_changed(Changed, Opts),

+ 3 - 2
apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl

@@ -297,7 +297,8 @@ update_plugin(Config, Name, Action) when is_list(Config) ->
 update_boot_order(Name, MoveBody, Config) ->
     #{host := Host, auth := Auth} = get_host_and_auth(Config),
     Path = emqx_mgmt_api_test_util:api_path(Host, ["plugins", Name, "move"]),
-    case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody) of
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody, Opts) of
         {ok, Res} ->
             Resp =
                 case emqx_utils_json:safe_decode(Res, [return_maps]) of
@@ -362,8 +363,8 @@ cluster(TestCase, Config) ->
 
 app_specs(_Config) ->
     [
-        emqx_conf,
         emqx,
+        emqx_conf,
         emqx_management,
         emqx_plugins
     ].

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl

@@ -440,8 +440,8 @@ create_test_tab(Attributes) ->
 
 apps_to_start() ->
     [
-        {emqx_conf, "dashboard.listeners.http.bind = 0"},
         {emqx, #{override_env => [{boot_modules, [broker, router]}]}},
+        {emqx_conf, "dashboard.listeners.http.bind = 0"},
         emqx_psk,
         emqx_management,
         emqx_dashboard,

+ 22 - 6
apps/emqx_modules/src/emqx_delayed.erl

@@ -45,18 +45,22 @@
     code_change/3
 ]).
 
-%% gen_server callbacks
+%% API
 -export([
     load/0,
     unload/0,
     load_or_unload/1,
     get_conf/1,
     update_config/1,
+    delayed_count/0,
     list/1,
     get_delayed_message/1,
     get_delayed_message/2,
     delete_delayed_message/1,
     delete_delayed_message/2,
+    clear_all/0,
+    %% rpc target
+    clear_all_local/0,
     cluster_list/1
 ]).
 
@@ -167,6 +171,9 @@ unload() ->
 load_or_unload(Bool) ->
     gen_server:call(?SERVER, {do_load_or_unload, Bool}).
 
+-spec delayed_count() -> non_neg_integer().
+delayed_count() -> mnesia:table_info(?TAB, size).
+
 list(Params) ->
     emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
 
@@ -243,7 +250,7 @@ get_delayed_message(Id) ->
 get_delayed_message(Node, Id) when Node =:= node() ->
     get_delayed_message(Id);
 get_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v1:get_delayed_message(Node, Id).
+    emqx_delayed_proto_v2:get_delayed_message(Node, Id).
 
 -spec delete_delayed_message(binary()) -> with_id_return().
 delete_delayed_message(Id) ->
@@ -258,7 +265,19 @@ delete_delayed_message(Id) ->
 delete_delayed_message(Node, Id) when Node =:= node() ->
     delete_delayed_message(Id);
 delete_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
+    emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
+
+-spec clear_all() -> ok.
+clear_all() ->
+    Nodes = emqx:running_nodes(),
+    _ = emqx_delayed_proto_v2:clear_all(Nodes),
+    ok.
+
+%% rpc target
+-spec clear_all_local() -> ok.
+clear_all_local() ->
+    _ = mria:clear_table(?TAB),
+    ok.
 
 update_config(Config) ->
     emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@@ -408,9 +427,6 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
     end,
     do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]).
 
--spec delayed_count() -> non_neg_integer().
-delayed_count() -> mnesia:table_info(?TAB, size).
-
 do_load_or_unload(true, State) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
     State;

+ 48 - 0
apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl

@@ -0,0 +1,48 @@
+%%--------------------------------------------------------------------
+%%Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_delayed_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_delayed_message/2,
+    delete_delayed_message/2,
+
+    %% Introduced in v2:
+    clear_all/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec get_delayed_message(node(), binary()) ->
+    emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
+get_delayed_message(Node, Id) ->
+    rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
+
+-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
+delete_delayed_message(Node, Id) ->
+    rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
+
+%% Introduced in v2:
+
+-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok).
+clear_all(Nodes) ->
+    erpc:multicall(Nodes, emqx_delayed, clear_all_local, []).

+ 3 - 3
apps/emqx_modules/test/emqx_delayed_SUITE.erl

@@ -164,15 +164,15 @@ t_cluster(_) ->
 
     ?assertMatch(
         {ok, _},
-        emqx_delayed_proto_v1:get_delayed_message(node(), Id)
+        emqx_delayed_proto_v2:get_delayed_message(node(), Id)
     ),
 
     %% The 'local' and the 'fake-remote' values should be the same,
     %% however there is a race condition, so we are just assert that they are both 'ok' tuples
     ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)),
-    ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)),
+    ?assertMatch({ok, _}, emqx_delayed_proto_v2:get_delayed_message(node(), Id)),
 
-    ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),
+    ok = emqx_delayed_proto_v2:delete_delayed_message(node(), Id),
 
     ?assertMatch(
         {error, _},

+ 1 - 1
apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src

@@ -1,6 +1,6 @@
 {application, emqx_node_rebalance, [
     {description, "EMQX Node Rebalance"},
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {registered, [
         emqx_node_rebalance_sup,
         emqx_node_rebalance,

+ 2 - 2
apps/emqx_node_rebalance/src/emqx_node_rebalance.erl

@@ -81,7 +81,7 @@ start_link() ->
 
 -spec available_nodes(list(node())) -> list(node()).
 available_nodes(Nodes) when is_list(Nodes) ->
-    {Available, _} = emqx_node_rebalance_proto_v1:available_nodes(Nodes),
+    {Available, _} = emqx_node_rebalance_proto_v2:available_nodes(Nodes),
     lists:filter(fun is_atom/1, Available).
 
 %%--------------------------------------------------------------------
@@ -370,7 +370,7 @@ avg(List) when length(List) >= 1 ->
     lists:sum(List) / length(List).
 
 multicall(Nodes, F, A) ->
-    case apply(emqx_node_rebalance_proto_v1, F, [Nodes | A]) of
+    case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
         {Results, []} ->
             case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
                 {OkResults, []} ->

+ 18 - 6
apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl

@@ -14,7 +14,9 @@
 -export([
     start_link/0,
     enable/1,
+    enable/2,
     disable/1,
+    disable/2,
     status/0
 ]).
 
@@ -40,11 +42,21 @@ start_link() ->
 
 -spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy).
 enable(CoordinatorPid) ->
-    gen_server:call(?MODULE, {enable, CoordinatorPid}).
+    enable(CoordinatorPid, ?ENABLE_KIND).
+
+-spec enable(pid(), emqx_eviction_agent:kind()) ->
+    ok_or_error(already_enabled | eviction_agent_busy).
+enable(CoordinatorPid, Kind) ->
+    gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}).
 
 -spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator).
 disable(CoordinatorPid) ->
-    gen_server:call(?MODULE, {disable, CoordinatorPid}).
+    disable(CoordinatorPid, ?ENABLE_KIND).
+
+-spec disable(pid(), emqx_eviction_agent:kind()) ->
+    ok_or_error(already_disabled | invalid_coordinator).
+disable(CoordinatorPid, Kind) ->
+    gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}).
 
 -spec status() -> status().
 status() ->
@@ -57,7 +69,7 @@ status() ->
 init([]) ->
     {ok, #{}}.
 
-handle_call({enable, CoordinatorPid}, _From, St) ->
+handle_call({enable, CoordinatorPid, Kind}, _From, St) ->
     case St of
         #{coordinator_pid := _Pid} ->
             {reply, {error, already_enabled}, St};
@@ -65,7 +77,7 @@ handle_call({enable, CoordinatorPid}, _From, St) ->
             true = link(CoordinatorPid),
             EvictionAgentPid = whereis(emqx_eviction_agent),
             true = link(EvictionAgentPid),
-            case emqx_eviction_agent:enable(?ENABLE_KIND, undefined) of
+            case emqx_eviction_agent:enable(Kind, undefined) of
                 ok ->
                     {reply, ok, #{
                         coordinator_pid => CoordinatorPid,
@@ -77,13 +89,13 @@ handle_call({enable, CoordinatorPid}, _From, St) ->
                     {reply, {error, eviction_agent_busy}, St}
             end
     end;
-handle_call({disable, CoordinatorPid}, _From, St) ->
+handle_call({disable, CoordinatorPid, Kind}, _From, St) ->
     case St of
         #{
             coordinator_pid := CoordinatorPid,
             eviction_agent_pid := EvictionAgentPid
         } ->
-            _ = emqx_eviction_agent:disable(?ENABLE_KIND),
+            _ = emqx_eviction_agent:disable(Kind),
             true = unlink(EvictionAgentPid),
             true = unlink(CoordinatorPid),
             NewSt = maps:without(

+ 126 - 6
apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl

@@ -31,7 +31,9 @@
     '/load_rebalance/:node/start'/2,
     '/load_rebalance/:node/stop'/2,
     '/load_rebalance/:node/evacuation/start'/2,
-    '/load_rebalance/:node/evacuation/stop'/2
+    '/load_rebalance/:node/evacuation/stop'/2,
+    '/load_rebalance/:node/purge/start'/2,
+    '/load_rebalance/:node/purge/stop'/2
 ]).
 
 %% Schema examples
@@ -67,6 +69,9 @@ paths() ->
         "/load_rebalance/:node/stop",
         "/load_rebalance/:node/evacuation/start",
         "/load_rebalance/:node/evacuation/stop"
+        %% TODO: uncomment after we officially release the feature.
+        %% "/load_rebalance/:node/purge/start",
+        %% "/load_rebalance/:node/purge/stop"
     ].
 
 schema("/load_rebalance/status") ->
@@ -176,6 +181,42 @@ schema("/load_rebalance/:node/evacuation/stop") ->
             }
         }
     }.
+%% TODO: uncomment after we officially release the feature.
+%% schema("/load_rebalance/:node/purge/start") ->
+%%     #{
+%%         'operationId' => '/load_rebalance/:node/purge/start',
+%%         post => #{
+%%             tags => [<<"load_rebalance">>],
+%%             summary => <<"Start purge on the whole cluster">>,
+%%             description => ?DESC("cluster_purge_start"),
+%%             parameters => [param_node()],
+%%             'requestBody' =>
+%%                 emqx_dashboard_swagger:schema_with_examples(
+%%                     ref(purge_start),
+%%                     purge_example()
+%%                 ),
+%%             responses => #{
+%%                 200 => response_schema(),
+%%                 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
+%%                 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
+%%             }
+%%         }
+%%     };
+%% schema("/load_rebalance/:node/purge/stop") ->
+%%     #{
+%%         'operationId' => '/load_rebalance/:node/purge/stop',
+%%         post => #{
+%%             tags => [<<"load_rebalance">>],
+%%             summary => <<"Stop purge on the whole cluster">>,
+%%             description => ?DESC("cluster_purge_stop"),
+%%             parameters => [param_node()],
+%%             responses => #{
+%%                 200 => response_schema(),
+%%                 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
+%%                 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
+%%             }
+%%         }
+%%     }.
 
 %%--------------------------------------------------------------------
 %% Handlers
@@ -188,16 +229,20 @@ schema("/load_rebalance/:node/evacuation/stop") ->
         {rebalance, Stats} ->
             {200, format_status(rebalance, Stats)};
         {evacuation, Stats} ->
-            {200, format_status(evacuation, Stats)}
+            {200, format_status(evacuation, Stats)};
+        {purge, Stats} ->
+            {200, format_status(purge, Stats)}
     end.
 
 '/load_rebalance/global_status'(get, #{}) ->
     #{
         evacuations := Evacuations,
+        purges := Purges,
         rebalances := Rebalances
     } = emqx_node_rebalance_status:global_status(),
     {200, #{
         evacuations => format_as_map_list(Evacuations),
+        purges => format_as_map_list(Purges),
         rebalances => format_as_map_list(Rebalances)
     }}.
 
@@ -214,7 +259,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
         Params1 = translate(rebalance_start, Params0),
         with_nodes_at_key(nodes, Params1, fun(Params2) ->
             wrap_rpc(
-                Node, emqx_node_rebalance_api_proto_v1:node_rebalance_start(Node, Params2)
+                Node, emqx_node_rebalance_api_proto_v2:node_rebalance_start(Node, Params2)
             )
         end)
     end).
@@ -222,7 +267,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
 '/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) ->
     emqx_utils_api:with_node(NodeBin, fun(Node) ->
         wrap_rpc(
-            Node, emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node)
+            Node, emqx_node_rebalance_api_proto_v2:node_rebalance_stop(Node)
         )
     end).
 
@@ -234,7 +279,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
         with_nodes_at_key(migrate_to, Params1, fun(Params2) ->
             wrap_rpc(
                 Node,
-                emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(
+                emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start(
                     Node, Params2
                 )
             )
@@ -244,7 +289,27 @@ schema("/load_rebalance/:node/evacuation/stop") ->
 '/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) ->
     emqx_utils_api:with_node(NodeBin, fun(Node) ->
         wrap_rpc(
-            Node, emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node)
+            Node, emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_stop(Node)
+        )
+    end).
+
+'/load_rebalance/:node/purge/start'(post, #{
+    bindings := #{node := NodeBin}, body := Params0
+}) ->
+    emqx_utils_api:with_node(NodeBin, fun(Node) ->
+        Params1 = translate(purge_start, Params0),
+        wrap_rpc(
+            Node,
+            emqx_node_rebalance_api_proto_v2:node_rebalance_purge_start(
+                Node, Params1
+            )
+        )
+    end).
+
+'/load_rebalance/:node/purge/stop'(post, #{bindings := #{node := NodeBin}}) ->
+    emqx_utils_api:with_node(NodeBin, fun(Node) ->
+        wrap_rpc(
+            Node, emqx_node_rebalance_api_proto_v2:node_rebalance_purge_stop(Node)
         )
     end).
 
@@ -483,6 +548,17 @@ fields(rebalance_evacuation_start) ->
                 }
             )}
     ];
+fields(purge_start) ->
+    [
+        {"purge_rate",
+            mk(
+                pos_integer(),
+                #{
+                    desc => ?DESC(purge_rate),
+                    required => false
+                }
+            )}
+    ];
 fields(local_status_disabled) ->
     [
         {"status",
@@ -687,6 +763,38 @@ fields(global_evacuation_status) ->
                     }
                 )}
         ];
+fields(global_purge_status) ->
+    without(
+        [
+            "status",
+            "process",
+            "connection_eviction_rate",
+            "session_eviction_rate",
+            "connection_goal",
+            "disconnected_session_goal",
+            "session_recipients",
+            "recipients"
+        ],
+        fields(local_status_enabled)
+    ) ++
+        [
+            {"purge_rate",
+                mk(
+                    pos_integer(),
+                    #{
+                        desc => ?DESC(local_status_purge_rate),
+                        required => false
+                    }
+                )},
+            {"node",
+                mk(
+                    binary(),
+                    #{
+                        desc => ?DESC(evacuation_status_node),
+                        required => true
+                    }
+                )}
+        ];
 fields(global_status) ->
     [
         {"evacuations",
@@ -697,6 +805,14 @@ fields(global_status) ->
                     required => false
                 }
             )},
+        {"purges",
+            mk(
+                hoconsc:array(ref(global_purge_status)),
+                #{
+                    desc => ?DESC(global_status_purges),
+                    required => false
+                }
+            )},
         {"rebalances",
             mk(
                 hoconsc:array(ref(global_coordinator_status)),
@@ -735,6 +851,10 @@ rebalance_evacuation_example() ->
         }
     }.
 
+%% TODO: uncomment after we officially release the feature.
+%% purge_example() ->
+%%     #{purge => #{purge_rate => 100}}.
+
 local_status_response_schema() ->
     hoconsc:union([ref(local_status_disabled), ref(local_status_enabled)]).
 

+ 75 - 5
apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl

@@ -29,6 +29,15 @@ cli(["start" | StartArgs]) ->
                     emqx_ctl:print("Rebalance is already enabled~n"),
                     false
             end;
+        {purge, Opts} ->
+            case emqx_node_rebalance_purge:start(Opts) of
+                ok ->
+                    emqx_ctl:print("Rebalance(purge) started~n"),
+                    true;
+                {error, Reason} ->
+                    emqx_ctl:print("Rebalance(purge) start error: ~p~n", [Reason]),
+                    false
+            end;
         {rebalance, Opts} ->
             case emqx_node_rebalance:start(Opts) of
                 ok ->
@@ -55,6 +64,7 @@ cli(["node-status"]) ->
 cli(["status"]) ->
     #{
         evacuations := Evacuations,
+        purges := Purges,
         rebalances := Rebalances
     } = emqx_node_rebalance_status:global_status(),
     lists:foreach(
@@ -69,6 +79,18 @@ cli(["status"]) ->
         end,
         Evacuations
     ),
+    lists:foreach(
+        fun({Node, Status}) ->
+            emqx_ctl:print(
+                "--------------------------------------------------------------------~n"
+            ),
+            emqx_ctl:print(
+                "Node ~p: purge~n~s",
+                [Node, emqx_node_rebalance_status:format_local_status(Status)]
+            )
+        end,
+        Purges
+    ),
     lists:foreach(
         fun({Node, Status}) ->
             emqx_ctl:print(
@@ -82,10 +104,14 @@ cli(["status"]) ->
         Rebalances
     );
 cli(["stop"]) ->
-    case emqx_node_rebalance_evacuation:status() of
-        {enabled, _} ->
-            ok = emqx_node_rebalance_evacuation:stop(),
-            emqx_ctl:print("Rebalance(evacuation) stopped~n"),
+    Checks =
+        [
+            {evacuation, fun emqx_node_rebalance_evacuation:status/0,
+                fun emqx_node_rebalance_evacuation:stop/0},
+            {purge, fun emqx_node_rebalance_purge:status/0, fun emqx_node_rebalance_purge:stop/0}
+        ],
+    case do_stop(Checks) of
+        ok ->
             true;
         disabled ->
             case emqx_node_rebalance:status() of
@@ -112,6 +138,13 @@ cli(_) ->
                 "Start current node evacuation with optional server redirect to the specified servers"
             },
 
+            %% TODO: uncomment after we officially release the feature.
+            %% {
+            %%     "rebalance start --purge \\\n"
+            %%     "    [--purge-rate CountPerSec]",
+            %%     "Start purge on all running nodes in the cluster"
+            %% },
+
             {
                 "rebalance start \\\n"
                 "    [--nodes \"node1@host1 node2@host2\"] \\\n"
@@ -140,7 +173,11 @@ cli(_) ->
 
 node_status(NodeStatus) ->
     case NodeStatus of
-        {Process, Status} when Process =:= evacuation orelse Process =:= rebalance ->
+        {Process, Status} when
+            Process =:= evacuation;
+            Process =:= purge;
+            Process =:= rebalance
+        ->
             emqx_ctl:print(
                 "Rebalance type: ~p~n~s~n",
                 [Process, emqx_node_rebalance_status:format_local_status(Status)]
@@ -160,6 +197,13 @@ start_args(Args) ->
                 {error, _} = Error ->
                     Error
             end;
+        {ok, #{"--purge" := true} = Collected} ->
+            case validate_purge(maps:to_list(Collected), #{}) of
+                {ok, Validated} ->
+                    {purge, Validated};
+                {error, _} = Error ->
+                    Error
+            end;
         {ok, #{} = Collected} ->
             case validate_rebalance(maps:to_list(Collected), #{}) of
                 {ok, Validated} ->
@@ -180,6 +224,11 @@ collect_args(["--redirect-to", ServerReference | Args], Map) ->
     collect_args(Args, Map#{"--redirect-to" => ServerReference});
 collect_args(["--migrate-to", MigrateTo | Args], Map) ->
     collect_args(Args, Map#{"--migrate-to" => MigrateTo});
+%% purge
+collect_args(["--purge" | Args], Map) ->
+    collect_args(Args, Map#{"--purge" => true});
+collect_args(["--purge-rate", PurgeRate | Args], Map) ->
+    collect_args(Args, Map#{"--purge-rate" => PurgeRate});
 %% rebalance
 collect_args(["--nodes", Nodes | Args], Map) ->
     collect_args(Args, Map#{"--nodes" => Nodes});
@@ -239,6 +288,15 @@ validate_evacuation([{"--migrate-to", MigrateTo} | Rest], Map) ->
 validate_evacuation(Rest, _Map) ->
     {error, io_lib:format("unknown evacuation arguments: ~p", [Rest])}.
 
+validate_purge([], Map) ->
+    {ok, Map};
+validate_purge([{"--purge", _} | Rest], Map) ->
+    validate_purge(Rest, Map);
+validate_purge([{"--purge-rate", _} | _] = Opts, Map) ->
+    validate_pos_int(purge_rate, Opts, Map, fun validate_purge/2);
+validate_purge(Rest, _Map) ->
+    {error, io_lib:format("unknown purge arguments: ~p", [Rest])}.
+
 validate_rebalance([], Map) ->
     {ok, Map};
 validate_rebalance([{"--wait-health-check", _} | _] = Opts, Map) ->
@@ -306,3 +364,15 @@ strings_to_atoms([Str | Rest], Atoms, Invalid) ->
         {error, _} ->
             strings_to_atoms(Rest, Atoms, [Str | Invalid])
     end.
+
+do_stop([{Type, Check, Stop} | Rest]) ->
+    case Check() of
+        {enabled, _} ->
+            ok = Stop(),
+            emqx_ctl:print("Rebalance(~s) stopped~n", [Type]),
+            ok;
+        disabled ->
+            do_stop(Rest)
+    end;
+do_stop([]) ->
+    disabled.

+ 233 - 0
apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl

@@ -0,0 +1,233 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_purge).
+
+-include("emqx_node_rebalance.hrl").
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([
+    start/1,
+    status/0,
+    stop/0
+]).
+
+-export([start_link/0]).
+
+-behaviour(gen_statem).
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4,
+    code_change/4
+]).
+
+-export_type([
+    start_opts/0,
+    start_error/0,
+    stop_error/0
+]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+-define(DEFAULT_PURGE_RATE, 500).
+-define(ENABLE_KIND, purge).
+
+%% gen_statem states
+-define(disabled, disabled).
+-define(purging, purging).
+-define(cleaning_data, cleaning_data).
+
+-type start_opts() :: #{
+    purge_rate => pos_integer()
+}.
+-type start_error() :: already_started.
+-type stop_error() :: not_started.
+-type stats() :: #{
+    initial_sessions := non_neg_integer(),
+    current_sessions := non_neg_integer(),
+    purge_rate := pos_integer()
+}.
+-type status() :: {enabled, stats()} | disabled.
+
+-spec start(start_opts()) -> ok_or_error(start_error()).
+start(StartOpts) ->
+    Opts = maps:merge(default_opts(), StartOpts),
+    gen_statem:call(?MODULE, {start, Opts}).
+
+-spec stop() -> ok_or_error(not_started).
+stop() ->
+    gen_statem:call(?MODULE, stop).
+
+-spec status() -> status().
+status() ->
+    gen_statem:call(?MODULE, status).
+
+-spec start_link() -> startlink_ret().
+start_link() ->
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+callback_mode() -> handle_event_function.
+
+%% states: disabled, purging, cleaning_data
+
+init([]) ->
+    {ok, disabled, #{}}.
+
+%% start
+handle_event({call, From}, {start, Opts}, ?disabled, #{} = Data) ->
+    ok = enable_purge(),
+    ?SLOG(warning, #{
+        msg => "cluster_purge_started",
+        opts => Opts
+    }),
+    NewData = init_data(Data, Opts),
+    {next_state, ?purging, NewData, [
+        {state_timeout, 0, purge},
+        {reply, From, ok}
+    ]};
+handle_event({call, From}, {start, _Opts}, _State, #{}) ->
+    {keep_state_and_data, [{reply, From, {error, already_started}}]};
+%% stop
+handle_event({call, From}, stop, ?disabled, #{}) ->
+    {keep_state_and_data, [{reply, From, {error, not_started}}]};
+handle_event({call, From}, stop, _State, Data) ->
+    ok = disable_purge(),
+    ?SLOG(warning, #{msg => "cluster_purge_stopped"}),
+    {next_state, disabled, deinit(Data), [{reply, From, ok}]};
+%% status
+handle_event({call, From}, status, ?disabled, #{}) ->
+    {keep_state_and_data, [{reply, From, disabled}]};
+handle_event({call, From}, status, State, Data) ->
+    Stats = maps:with(
+        [
+            initial_sessions,
+            current_sessions,
+            purge_rate
+        ],
+        Data
+    ),
+    {keep_state_and_data, [
+        {reply, From, {enabled, Stats#{state => State}}}
+    ]};
+%% session purge
+handle_event(
+    state_timeout,
+    purge,
+    ?purging,
+    #{
+        purge_rate := PurgeRate
+    } = Data
+) ->
+    case emqx_eviction_agent:all_channels_count() of
+        Sessions when Sessions > 0 ->
+            ok = purge_sessions(PurgeRate),
+            ?tp(
+                warning,
+                "cluster_purge_evict_sessions",
+                #{
+                    count => Sessions,
+                    purge_rate => PurgeRate
+                }
+            ),
+            NewData = Data#{current_sessions => Sessions},
+            {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
+        _Sessions = 0 ->
+            NewData = Data#{current_conns => 0},
+            ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
+            {next_state, ?cleaning_data, NewData, [
+                {state_timeout, 0, clean_retained_messages}
+            ]}
+    end;
+handle_event(
+    state_timeout,
+    clean_retained_messages,
+    ?cleaning_data,
+    Data
+) ->
+    ?SLOG(warning, #{msg => "cluster_purge_cleaning_data"}),
+    ok = emqx_retainer:clean(),
+    ok = emqx_delayed:clear_all(),
+    ?tp(warning, "cluster_purge_done", #{}),
+    ok = disable_purge(),
+    ?tp(warning, "cluster_purge_finished_successfully", #{}),
+    {next_state, ?disabled, deinit(Data)};
+handle_event({call, From}, Msg, State, Data) ->
+    ?SLOG(warning, #{msg => "unknown_call", call => Msg, state => State, data => Data}),
+    {keep_state_and_data, [{reply, From, ignored}]};
+handle_event(info, Msg, State, Data) ->
+    ?SLOG(warning, #{msg => "unknown_info", info => Msg, state => State, data => Data}),
+    keep_state_and_data;
+handle_event(cast, Msg, State, Data) ->
+    ?SLOG(warning, #{msg => "unknown_cast", cast => Msg, state => State, data => Data}),
+    keep_state_and_data.
+
+code_change(_Vsn, State, Data, _Extra) ->
+    {ok, State, Data}.
+
+%%--------------------------------------------------------------------
+%% internal funs
+%%--------------------------------------------------------------------
+
+default_opts() ->
+    #{
+        purge_rate => ?DEFAULT_PURGE_RATE
+    }.
+
+init_data(Data0, Opts) ->
+    Data1 = maps:merge(Data0, Opts),
+    SessCount = emqx_eviction_agent:session_count(),
+    Data1#{
+        initial_sessions => SessCount,
+        current_sessions => SessCount
+    }.
+
+deinit(Data) ->
+    Keys =
+        [initial_sessions, current_sessions | maps:keys(default_opts())],
+    maps:without(Keys, Data).
+
+multicall(Nodes, F, A) ->
+    case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
+        {Results, []} ->
+            case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
+                {_OkResults, []} ->
+                    ok;
+                {_, BadResults} ->
+                    %% we crash on errors so that the coordinator death is signalled to
+                    %% the eviction agents in the cluster.
+                    error({bad_nodes, BadResults})
+            end;
+        {_, [_BadNode | _] = BadNodes} ->
+            error({bad_nodes, BadNodes})
+    end.
+
+is_ok({_Node, {ok, _}}) -> true;
+is_ok({_Node, ok}) -> true;
+is_ok(_) -> false.
+
+enable_purge() ->
+    Nodes = emqx:running_nodes(),
+    _ = multicall(Nodes, enable_rebalance_agent, [self(), ?ENABLE_KIND]),
+    ok.
+
+disable_purge() ->
+    Nodes = emqx:running_nodes(),
+    _ = multicall(Nodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]),
+    ok.
+
+purge_sessions(PurgeRate) ->
+    Nodes = emqx:running_nodes(),
+    _ = multicall(Nodes, purge_sessions, [PurgeRate]),
+    ok.

+ 51 - 12
apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl

@@ -15,6 +15,7 @@
 %% For RPC
 -export([
     evacuation_status/0,
+    purge_status/0,
     rebalance_status/0
 ]).
 
@@ -22,11 +23,13 @@
 %% APIs
 %%--------------------------------------------------------------------
 
--spec local_status() -> disabled | {evacuation, map()} | {rebalance, map()}.
+-spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
 local_status() ->
-    case emqx_node_rebalance_evacuation:status() of
-        {enabled, Status} ->
-            {evacuation, evacuation(Status)};
+    Checks = [
+        {evacuation, fun emqx_node_rebalance_evacuation:status/0, fun evacuation/1},
+        {purge, fun emqx_node_rebalance_purge:status/0, fun purge/1}
+    ],
+    case do_local_status(Checks) of
         disabled ->
             case emqx_node_rebalance_agent:status() of
                 {enabled, CoordinatorPid} ->
@@ -38,28 +41,37 @@ local_status() ->
                     end;
                 disabled ->
                     disabled
-            end
+            end;
+        Res ->
+            Res
     end.
 
--spec local_status(node()) -> disabled | {evacuation, map()} | {rebalance, map()}.
+-spec local_status(node()) -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
 local_status(Node) ->
-    emqx_node_rebalance_status_proto_v1:local_status(Node).
+    emqx_node_rebalance_status_proto_v2:local_status(Node).
 
 -spec format_local_status(map()) -> iodata().
 format_local_status(Status) ->
     format_status(Status, local_status_field_format_order()).
 
--spec global_status() -> #{rebalances := [{node(), map()}], evacuations := [{node(), map()}]}.
+-spec global_status() ->
+    #{
+        rebalances := [{node(), map()}],
+        evacuations := [{node(), map()}],
+        purges := [{node(), map()}]
+    }.
 global_status() ->
     Nodes = emqx:running_nodes(),
-    {RebalanceResults, _} = emqx_node_rebalance_status_proto_v1:rebalance_status(Nodes),
+    {RebalanceResults, _} = emqx_node_rebalance_status_proto_v2:rebalance_status(Nodes),
     Rebalances = [
         {Node, coordinator_rebalance(Status)}
      || {Node, {enabled, Status}} <- RebalanceResults
     ],
-    {EvacuatioResults, _} = emqx_node_rebalance_status_proto_v1:evacuation_status(Nodes),
-    Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuatioResults],
-    #{rebalances => Rebalances, evacuations => Evacuations}.
+    {EvacuationResults, _} = emqx_node_rebalance_status_proto_v2:evacuation_status(Nodes),
+    Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuationResults],
+    {PurgeResults, _} = emqx_node_rebalance_status_proto_v2:purge_status(Nodes),
+    Purges = [{Node, purge(Status)} || {Node, {enabled, Status}} <- PurgeResults],
+    #{rebalances => Rebalances, evacuations => Evacuations, purges => Purges}.
 
 -spec format_coordinator_status(map()) -> iodata().
 format_coordinator_status(Status) ->
@@ -85,6 +97,17 @@ evacuation(Status) ->
         }
     }.
 
+purge(Status) ->
+    #{
+        state => maps:get(state, Status),
+        purge_rate => maps:get(purge_rate, Status),
+        session_goal => 0,
+        stats => #{
+            initial_sessions => maps:get(initial_sessions, Status),
+            current_sessions => maps:get(current_sessions, Status)
+        }
+    }.
+
 local_rebalance(#{donors := Donors} = Stats, Node) ->
     case lists:member(Node, Donors) of
         true -> {rebalance, donor_rebalance(Stats, Node)};
@@ -159,6 +182,7 @@ local_status_field_format_order() ->
         coordinator_node,
         connection_eviction_rate,
         session_eviction_rate,
+        purge_rate,
         connection_goal,
         session_goal,
         disconnected_session_goal,
@@ -201,6 +225,8 @@ format_local_status_field({connection_eviction_rate, ConnEvictRate}) ->
     io_lib:format("Connection eviction rate: ~p connections/second~n", [ConnEvictRate]);
 format_local_status_field({session_eviction_rate, SessEvictRate}) ->
     io_lib:format("Session eviction rate: ~p sessions/second~n", [SessEvictRate]);
+format_local_status_field({purge_rate, PurgeRate}) ->
+    io_lib:format("Purge rate: ~p sessions/second~n", [PurgeRate]);
 format_local_status_field({connection_goal, ConnGoal}) ->
     io_lib:format("Connection goal: ~p~n", [ConnGoal]);
 format_local_status_field({session_goal, SessGoal}) ->
@@ -231,8 +257,21 @@ format_local_stats(Stats) ->
         )
     ].
 
+do_local_status([{Type, Get, Cont} | Rest]) ->
+    case Get() of
+        disabled ->
+            do_local_status(Rest);
+        {enabled, Status} ->
+            {Type, Cont(Status)}
+    end;
+do_local_status([]) ->
+    disabled.
+
 evacuation_status() ->
     {node(), emqx_node_rebalance_evacuation:status()}.
 
+purge_status() ->
+    {node(), emqx_node_rebalance_purge:status()}.
+
 rebalance_status() ->
     {node(), emqx_node_rebalance:status()}.

+ 1 - 0
apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl

@@ -15,6 +15,7 @@ start_link() ->
 
 init([]) ->
     Childs = [
+        child_spec(emqx_node_rebalance_purge, []),
         child_spec(emqx_node_rebalance_evacuation, []),
         child_spec(emqx_node_rebalance_agent, []),
         child_spec(emqx_node_rebalance, [])

+ 59 - 0
apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_api_proto_v2.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_api_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    node_rebalance_evacuation_start/2,
+    node_rebalance_evacuation_stop/1,
+
+    node_rebalance_start/2,
+    node_rebalance_stop/1,
+
+    %% Introduced in v2:
+    node_rebalance_purge_start/2,
+    node_rebalance_purge_stop/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx/include/types.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec node_rebalance_evacuation_start(node(), emqx_node_rebalance_evacuation:start_opts()) ->
+    emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_evacuation:start_error()).
+node_rebalance_evacuation_start(Node, #{} = Opts) ->
+    rpc:call(Node, emqx_node_rebalance_evacuation, start, [Opts]).
+
+-spec node_rebalance_evacuation_stop(node()) ->
+    emqx_rpc:badrpc() | ok_or_error(not_started).
+node_rebalance_evacuation_stop(Node) ->
+    rpc:call(Node, emqx_node_rebalance_evacuation, stop, []).
+
+-spec node_rebalance_start(node(), emqx_node_rebalance:start_opts()) ->
+    emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance:start_error()).
+node_rebalance_start(Node, Opts) ->
+    rpc:call(Node, emqx_node_rebalance, start, [Opts]).
+
+-spec node_rebalance_stop(node()) ->
+    emqx_rpc:badrpc() | ok_or_error(not_started).
+node_rebalance_stop(Node) ->
+    rpc:call(Node, emqx_node_rebalance, stop, []).
+
+%% Introduced in v2:
+
+-spec node_rebalance_purge_start(node(), emqx_node_rebalance_purge:start_opts()) ->
+    emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:start_error()).
+node_rebalance_purge_start(Node, #{} = Opts) ->
+    rpc:call(Node, emqx_node_rebalance_purge, start, [Opts]).
+
+-spec node_rebalance_purge_stop(node()) ->
+    emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:stop_error()).
+node_rebalance_purge_stop(Node) ->
+    rpc:call(Node, emqx_node_rebalance_purge, stop, []).

+ 84 - 0
apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v2.erl

@@ -0,0 +1,84 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    available_nodes/1,
+    evict_connections/2,
+    evict_sessions/4,
+    connection_counts/1,
+    session_counts/1,
+    enable_rebalance_agent/2,
+    disable_rebalance_agent/2,
+    disconnected_session_counts/1,
+
+    %% Introduced in v2:
+    enable_rebalance_agent/3,
+    disable_rebalance_agent/3,
+    purge_sessions/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx/include/types.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()).
+available_nodes(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []).
+
+-spec evict_connections([node()], non_neg_integer()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+evict_connections(Nodes, Count) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]).
+
+-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+evict_sessions(Nodes, Count, RecipientNodes, ConnState) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]).
+
+-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+connection_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []).
+
+-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+session_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, session_count, []).
+
+-spec enable_rebalance_agent([node()], pid()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
+enable_rebalance_agent(Nodes, OwnerPid) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]).
+
+-spec disable_rebalance_agent([node()], pid()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
+disable_rebalance_agent(Nodes, OwnerPid) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]).
+
+-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+disconnected_session_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []).
+
+%% Introduced in v2:
+
+-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
+enable_rebalance_agent(Nodes, OwnerPid, Kind) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]).
+
+-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
+disable_rebalance_agent(Nodes, OwnerPid, Kind) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]).
+
+-spec purge_sessions([node()], non_neg_integer()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+purge_sessions(Nodes, Count) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]).

+ 29 - 0
apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_purge_proto_v1.erl

@@ -0,0 +1,29 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_purge_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    start/2,
+    stop/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec start([node()], emqx_node_rebalance_purge:start_opts()) ->
+    emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:start_error()}).
+start(Nodes, Opts) ->
+    erpc:multicall(Nodes, emqx_node_rebalance_purge, start, [Opts]).
+
+-spec stop([node()]) ->
+    emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:stop_error()}).
+stop(Nodes) ->
+    erpc:multicall(Nodes, emqx_node_rebalance_purge, stop, []).

+ 46 - 0
apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_status_proto_v2.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_status_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    local_status/1,
+    rebalance_status/1,
+    evacuation_status/1,
+
+    %% Introduced in v2:
+    purge_status/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx/include/types.hrl").
+
+introduced_in() ->
+    "5.2.1".
+
+-spec local_status(node()) ->
+    emqx_rpc:badrpc() | disabled | {evacuation, map()} | {rebalance, map()}.
+local_status(Node) ->
+    rpc:call(Node, emqx_node_rebalance_status, local_status, []).
+
+-spec rebalance_status([node()]) ->
+    emqx_rpc:multicall_result({node(), map()}).
+rebalance_status(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_status, rebalance_status, []).
+
+-spec evacuation_status([node()]) ->
+    emqx_rpc:multicall_result({node(), map()}).
+evacuation_status(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_status, evacuation_status, []).
+
+%% Introduced in v2:
+
+-spec purge_status([node()]) ->
+    emqx_rpc:multicall_result({node(), map()}).
+purge_status(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_status, purge_status, []).

+ 131 - 22
apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl

@@ -38,32 +38,35 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(Case, Config) ->
-    [{DonorNode, _} | _] =
-        ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
-            [
-                {case_specific_node_name(?MODULE, Case, '_donor'), 2883},
-                {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
-            ],
-            ?START_APPS,
-            [{emqx, data_dir, case_specific_data_dir(Case, Config)}]
+    DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
+    RecipientNode = case_specific_node_name(?MODULE, Case, '_recipient'),
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(DonorNode),
+        listeners => true,
+        apps => app_specs()
+    },
+    Cluster = [{Node, Spec} || Node <- [DonorNode, RecipientNode]],
+    ClusterNodes =
+        [Node1 | _] = emqx_cth_cluster:start(
+            Cluster,
+            #{work_dir => ?config(priv_dir, Config)}
         ),
-
-    ok = rpc:call(DonorNode, emqx_mgmt_api_test_util, init_suite, []),
-    ok = take_auth_header_from(DonorNode),
-
+    ok = rpc:call(Node1, emqx_mgmt_api_test_util, init_suite, []),
+    ok = take_auth_header_from(Node1),
     [{cluster_nodes, ClusterNodes} | Config].
 end_per_testcase(_Case, Config) ->
-    _ = emqx_eviction_agent_test_helpers:stop_cluster(
-        ?config(cluster_nodes, Config),
-        ?START_APPS
-    ).
+    Nodes = ?config(cluster_nodes, Config),
+    erpc:multicall(Nodes, meck, unload, []),
+    _ = emqx_cth_cluster:stop(Nodes),
+    ok.
 
 %%--------------------------------------------------------------------
 %% Tests
 %%--------------------------------------------------------------------
 
 t_start_evacuation_validation(Config) ->
-    [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
+    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
     BadOpts = [
         #{conn_evict_rate => <<"conn">>},
         #{sess_evict_rate => <<"sess">>},
@@ -117,10 +120,87 @@ t_start_evacuation_validation(Config) ->
         api_get(["load_rebalance", "global_status"])
     ).
 
+%% TODO: uncomment after we officially release the feature.
+skipped_t_start_purge_validation(Config) ->
+    [Node1 | _] = ?config(cluster_nodes, Config),
+    Port1 = get_mqtt_port(Node1, tcp),
+    BadOpts = [
+        #{purge_rate => <<"conn">>},
+        #{purge_rate => 0},
+        #{purge_rate => -1},
+        #{purge_rate => 1.1},
+        #{unknown => <<"Value">>}
+    ],
+    lists:foreach(
+        fun(Opts) ->
+            ?assertMatch(
+                {ok, 400, #{}},
+                api_post(
+                    ["load_rebalance", atom_to_list(Node1), "purge", "start"],
+                    Opts
+                ),
+                Opts
+            )
+        end,
+        BadOpts
+    ),
+    ?assertMatch(
+        {ok, 404, #{}},
+        api_post(
+            ["load_rebalance", "bad@node", "purge", "start"],
+            #{}
+        )
+    ),
+
+    process_flag(trap_exit, true),
+    Conns = emqtt_connect_many(Port1, 100),
+
+    ?assertMatch(
+        {ok, 200, #{}},
+        api_post(
+            ["load_rebalance", atom_to_list(Node1), "purge", "start"],
+            #{purge_rate => 10}
+        )
+    ),
+
+    Node1Bin = atom_to_binary(Node1),
+    ?assertMatch(
+        {ok, 200, #{<<"purges">> := [#{<<"node">> := Node1Bin}]}},
+        api_get(["load_rebalance", "global_status"])
+    ),
+
+    ?assertMatch(
+        {ok, 200, #{
+            <<"process">> := <<"purge">>,
+            <<"purge_rate">> := 10,
+            <<"session_goal">> := 0,
+            <<"state">> := <<"purging">>,
+            <<"stats">> :=
+                #{
+                    <<"current_sessions">> := _,
+                    <<"initial_sessions">> := 100
+                }
+        }},
+        api_get(["load_rebalance", "status"])
+    ),
+
+    ?assertMatch(
+        {ok, 200, #{}},
+        api_post(
+            ["load_rebalance", atom_to_list(Node1), "purge", "stop"],
+            #{}
+        )
+    ),
+
+    ok = stop_many(Conns),
+
+    ok.
+
 t_start_rebalance_validation(Config) ->
     process_flag(trap_exit, true),
 
-    [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
+    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
+    DonorPort = get_mqtt_port(DonorNode, tcp),
 
     BadOpts = [
         #{conn_evict_rate => <<"conn">>},
@@ -189,7 +269,7 @@ t_start_rebalance_validation(Config) ->
     ok = stop_many(Conns).
 
 t_start_stop_evacuation(Config) ->
-    [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
+    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
 
     StartOpts = maps:merge(
         maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()),
@@ -284,7 +364,8 @@ t_start_stop_evacuation(Config) ->
 t_start_stop_rebalance(Config) ->
     process_flag(trap_exit, true),
 
-    [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
+    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
+    DonorPort = get_mqtt_port(DonorNode, tcp),
 
     ?assertMatch(
         {ok, 200, #{<<"status">> := <<"disabled">>}},
@@ -390,7 +471,7 @@ t_start_stop_rebalance(Config) ->
     ok = stop_many(Conns).
 
 t_availability_check(Config) ->
-    [{DonorNode, _} | _] = ?config(cluster_nodes, Config),
+    [DonorNode | _] = ?config(cluster_nodes, Config),
     ?assertMatch(
         {ok, 200, #{}},
         api_get(["load_rebalance", "availability_check"])
@@ -425,7 +506,12 @@ api_get(Path) ->
 api_post(Path, Data) ->
     case request(post, uri(Path), Data) of
         {ok, Code, ResponseBody} ->
-            {ok, Code, jiffy:decode(ResponseBody, [return_maps])};
+            Res =
+                case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
+                    {ok, Decoded} -> Decoded;
+                    {error, _} -> ResponseBody
+                end,
+            {ok, Code, Res};
         {error, _} = Error ->
             Error
     end.
@@ -444,3 +530,26 @@ case_specific_data_dir(Case, Config) ->
         undefined -> undefined;
         PrivDir -> filename:join(PrivDir, atom_to_list(Case))
     end.
+
+app_specs() ->
+    [
+        {emqx, #{
+            before_start => fun() ->
+                emqx_app:set_config_loader(?MODULE)
+            end,
+            override_env => [{boot_modules, [broker, listeners]}]
+        }},
+        {emqx_retainer, #{
+            config =>
+                #{
+                    retainer =>
+                        #{enable => true}
+                }
+        }},
+        emqx_eviction_agent,
+        emqx_node_rebalance
+    ].
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.

+ 83 - 0
apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl

@@ -156,6 +156,80 @@ t_evacuation(_Config) ->
         emqx_node_rebalance_evacuation:status()
     ).
 
+t_purge(_Config) ->
+    %% start with invalid args
+    ?assertNot(
+        emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
+    ),
+
+    ?assertNot(
+        emqx_node_rebalance_cli:cli(["start", "--purge", "--purge-rate", "foobar"])
+    ),
+
+    %% not used by this scenario
+    ?assertNot(
+        emqx_node_rebalance_cli:cli(["start", "--purge", "--conn-evict-rate", "1"])
+    ),
+
+    ?assertNot(
+        emqx_node_rebalance_cli:cli(["start", "--purge", "--sess-evict-rate", "1"])
+    ),
+
+    ?assertNot(
+        emqx_node_rebalance_cli:cli(["start", "--purge", "--wait-takeover", "1"])
+    ),
+
+    ?assertNot(
+        emqx_node_rebalance_cli:cli([
+            "start",
+            "--purge",
+            "--migrate-to",
+            atom_to_list(node())
+        ])
+    ),
+    with_some_sessions(fun() ->
+        ?assert(
+            emqx_node_rebalance_cli:cli([
+                "start",
+                "--purge",
+                "--purge-rate",
+                "10"
+            ])
+        ),
+
+        %% status
+        ok = emqx_node_rebalance_cli:cli(["status"]),
+        ok = emqx_node_rebalance_cli:cli(["node-status"]),
+        ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
+
+        ?assertMatch(
+            {enabled, #{}},
+            emqx_node_rebalance_purge:status()
+        ),
+
+        %% already enabled
+        ?assertNot(
+            emqx_node_rebalance_cli:cli([
+                "start",
+                "--purge",
+                "--purge-rate",
+                "10"
+            ])
+        ),
+        true = emqx_node_rebalance_cli:cli(["stop"]),
+        ok
+    end),
+    %% stop
+
+    false = emqx_node_rebalance_cli:cli(["stop"]),
+
+    ?assertEqual(
+        disabled,
+        emqx_node_rebalance_purge:status()
+    ),
+
+    ok.
+
 t_rebalance(Config) ->
     process_flag(trap_exit, true),
 
@@ -289,3 +363,12 @@ emqx_node_rebalance_cli(Node, Args) ->
         Result ->
             Result
     end.
+
+%% to avoid it finishing too fast
+with_some_sessions(Fn) ->
+    emqx_common_test_helpers:with_mock(
+        emqx_eviction_agent,
+        all_channels_count,
+        fun() -> 100 end,
+        Fn
+    ).

+ 360 - 0
apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl

@@ -0,0 +1,360 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_purge_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(
+    emqx_eviction_agent_test_helpers,
+    [
+        emqtt_connect/1,
+        emqtt_try_connect/1,
+        case_specific_node_name/3
+    ]
+).
+
+all() ->
+    [{group, one_node}, {group, two_nodes}].
+
+groups() ->
+    [
+        {one_node, [], one_node_cases()},
+        {two_nodes, [], two_nodes_cases()}
+    ].
+
+two_nodes_cases() ->
+    [
+        t_already_started_two,
+        t_session_purged
+    ].
+
+one_node_cases() ->
+    emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
+
+init_per_suite(Config) ->
+    ok = emqx_common_test_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([]),
+    ok.
+
+init_per_group(one_node, Config) ->
+    [{cluster_type, one_node} | Config];
+init_per_group(two_nodes, Config) ->
+    [{cluster_type, two_nodes} | Config].
+
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    ct:timetrap({seconds, 30}),
+    Nodes =
+        [Node1 | _] =
+        case ?config(cluster_type, Config) of
+            one_node ->
+                [case_specific_node_name(?MODULE, TestCase, '_1')];
+            two_nodes ->
+                [
+                    case_specific_node_name(?MODULE, TestCase, '_1'),
+                    case_specific_node_name(?MODULE, TestCase, '_2')
+                ]
+        end,
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(Node1),
+        listeners => true,
+        apps => app_specs()
+    },
+    Cluster = [{Node, Spec} || Node <- Nodes],
+    ClusterNodes = emqx_cth_cluster:start(
+        Cluster,
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    ok = snabbkaffe:start_trace(),
+    [{cluster_nodes, ClusterNodes} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+    Nodes = ?config(cluster_nodes, Config),
+    ok = snabbkaffe:stop(),
+    erpc:multicall(Nodes, meck, unload, []),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+app_specs() ->
+    [
+        {emqx, #{
+            before_start => fun() ->
+                emqx_app:set_config_loader(?MODULE)
+            end,
+            override_env => [{boot_modules, [broker, listeners]}]
+        }},
+        {emqx_retainer, #{
+            config =>
+                #{
+                    retainer =>
+                        #{enable => true}
+                }
+        }},
+        {emqx_modules, #{
+            config =>
+                #{delayed => #{enable => true}}
+        }},
+        emqx_eviction_agent,
+        emqx_node_rebalance
+    ].
+
+opts(_Config) ->
+    #{
+        purge_rate => 10
+    }.
+
+case_specific_data_dir(Case, Config) ->
+    case ?config(priv_dir, Config) of
+        undefined -> undefined;
+        PrivDir -> filename:join(PrivDir, atom_to_list(Case))
+    end.
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.
+
+%% to avoid it finishing too fast
+with_some_sessions(Node, Fn) ->
+    erpc:call(Node, fun() ->
+        emqx_common_test_helpers:with_mock(
+            emqx_eviction_agent,
+            all_channels_count,
+            fun() -> 100 end,
+            Fn
+        )
+    end).
+
+drain_exits([ClientPid | Rest]) ->
+    receive
+        {'EXIT', ClientPid, _Reason} ->
+            drain_exits(Rest)
+    after 1_000 ->
+        ct:pal("mailbox:\n  ~p", [process_info(self(), messages)]),
+        ct:fail("pid ~p didn't die", [ClientPid])
+    end;
+drain_exits([]) ->
+    ok.
+
+emqtt_connect_many(Port, Count) ->
+    emqtt_connect_many(Port, Count, _StartN = 1).
+
+%% start many clients with mixed clean_start flags
+emqtt_connect_many(Port, Count, StartN) ->
+    lists:map(
+        fun(N) ->
+            NBin = integer_to_binary(N),
+            ClientId = <<"client-", NBin/binary>>,
+            CleanStart = N rem 2 == 0,
+            {ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, CleanStart}, {port, Port}]),
+            C
+        end,
+        lists:seq(StartN, StartN + Count - 1)
+    ).
+
+%%--------------------------------------------------------------------
+%% Test Cases : one node
+%%--------------------------------------------------------------------
+
+t_agent_busy(Config) ->
+    [Node] = ?config(cluster_nodes, Config),
+
+    ok = rpc:call(Node, emqx_eviction_agent, enable, [other_rebalance, undefined]),
+
+    erpc:call(Node, fun() ->
+        ?assertExit(
+            {{{bad_nodes, [{Node, {error, eviction_agent_busy}}]}, _}, _},
+            emqx_node_rebalance_purge:start(opts(Config))
+        )
+    end),
+
+    ok.
+
+t_already_started(Config) ->
+    [Node] = ?config(cluster_nodes, Config),
+    with_some_sessions(Node, fun() ->
+        ok = emqx_node_rebalance_purge:start(opts(Config)),
+
+        ?assertEqual(
+            {error, already_started},
+            emqx_node_rebalance_purge:start(opts(Config))
+        ),
+
+        ?assertEqual(
+            ok,
+            emqx_node_rebalance_purge:stop()
+        ),
+
+        ok
+    end),
+    ok.
+
+t_not_started(Config) ->
+    [Node] = ?config(cluster_nodes, Config),
+
+    ?assertEqual(
+        {error, not_started},
+        rpc:call(Node, emqx_node_rebalance_purge, stop, [])
+    ).
+
+t_start(Config) ->
+    [Node] = ?config(cluster_nodes, Config),
+    Port = get_mqtt_port(Node, tcp),
+
+    with_some_sessions(Node, fun() ->
+        process_flag(trap_exit, true),
+        ok = snabbkaffe:start_trace(),
+
+        ?assertEqual(
+            ok,
+            emqx_node_rebalance_purge:start(opts(Config))
+        ),
+        ?assertEqual({error, {use_another_server, #{}}}, emqtt_try_connect([{port, Port}])),
+        ok
+    end),
+    ok.
+
+t_non_persistence(Config) ->
+    [Node] = ?config(cluster_nodes, Config),
+    Port = get_mqtt_port(Node, tcp),
+
+    %% to avoid it finishing too fast
+    with_some_sessions(Node, fun() ->
+        process_flag(trap_exit, true),
+        ok = snabbkaffe:start_trace(),
+
+        ?assertEqual(
+            ok,
+            emqx_node_rebalance_purge:start(opts(Config))
+        ),
+
+        ?assertMatch(
+            {error, {use_another_server, #{}}},
+            emqtt_try_connect([{port, Port}])
+        ),
+
+        ok = supervisor:terminate_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge),
+        {ok, _} = supervisor:restart_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge),
+
+        ?assertMatch(
+            ok,
+            emqtt_try_connect([{port, Port}])
+        ),
+        ?assertMatch(disabled, emqx_node_rebalance_purge:status()),
+        ok
+    end),
+    ok.
+
+t_unknown_messages(Config) ->
+    process_flag(trap_exit, true),
+
+    [Node] = ?config(cluster_nodes, Config),
+
+    ok = rpc:call(Node, emqx_node_rebalance_purge, start, [opts(Config)]),
+    Pid = rpc:call(Node, erlang, whereis, [emqx_node_rebalance_purge]),
+    Pid ! unknown,
+    ok = gen_server:cast(Pid, unknown),
+    ?assertEqual(
+        ignored,
+        gen_server:call(Pid, unknown)
+    ),
+
+    ok.
+
+%%--------------------------------------------------------------------
+%% Test Cases : two nodes
+%%--------------------------------------------------------------------
+
+t_already_started_two(Config) ->
+    [Node1, _Node2] = ?config(cluster_nodes, Config),
+    with_some_sessions(Node1, fun() ->
+        ok = emqx_node_rebalance_purge:start(opts(Config)),
+
+        ?assertEqual(
+            {error, already_started},
+            emqx_node_rebalance_purge:start(opts(Config))
+        ),
+
+        ?assertEqual(
+            ok,
+            emqx_node_rebalance_purge:stop()
+        ),
+
+        ok
+    end),
+    ?assertEqual(
+        {error, not_started},
+        rpc:call(Node1, emqx_node_rebalance_purge, stop, [])
+    ),
+
+    ok.
+
+t_session_purged(Config) ->
+    process_flag(trap_exit, true),
+
+    [Node1, Node2] = ?config(cluster_nodes, Config),
+    Port1 = get_mqtt_port(Node1, tcp),
+    Port2 = get_mqtt_port(Node2, tcp),
+
+    %% N.B.: it's important to have an asymmetric number of clients for this test, as
+    %% otherwise the scenario might happen to finish successfully due to the wrong
+    %% reasons!
+    NumClientsNode1 = 5,
+    NumClientsNode2 = 35,
+    Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
+    Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
+    lists:foreach(
+        fun(C) ->
+            ClientId = proplists:get_value(clientid, emqtt:info(C)),
+            Topic = emqx_topic:join([<<"t">>, ClientId]),
+            Props = #{},
+            Payload = ClientId,
+            Opts = [{retain, true}],
+            ok = emqtt:publish(C, Topic, Props, Payload, Opts),
+            DelayedTopic = emqx_topic:join([<<"$delayed/120">>, Topic]),
+            ok = emqtt:publish(C, DelayedTopic, Payload),
+            {ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(C, Topic),
+            ok
+        end,
+        Node1Clients ++ Node2Clients
+    ),
+
+    ?assertEqual(40, erpc:call(Node2, emqx_retainer, retained_count, [])),
+    ?assertEqual(NumClientsNode1, erpc:call(Node1, emqx_delayed, delayed_count, [])),
+    ?assertEqual(NumClientsNode2, erpc:call(Node2, emqx_delayed, delayed_count, [])),
+
+    {ok, SRef0} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := "cluster_purge_done"}),
+        15_000
+    ),
+    ok = rpc:call(Node1, emqx_node_rebalance_purge, start, [opts(Config)]),
+    {ok, _} = snabbkaffe:receive_events(SRef0),
+
+    ?assertEqual([], erpc:call(Node1, emqx_cm, all_channels, [])),
+    ?assertEqual([], erpc:call(Node2, emqx_cm, all_channels, [])),
+    ?assertEqual(0, erpc:call(Node1, emqx_retainer, retained_count, [])),
+    ?assertEqual(0, erpc:call(Node2, emqx_retainer, retained_count, [])),
+    ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
+    ?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
+
+    ok = drain_exits(Node1Clients ++ Node2Clients),
+
+    ok.

+ 1 - 1
apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl

@@ -57,7 +57,7 @@ end_per_suite(Config) ->
 
 t_cluster_status(Config) ->
     [CoreNode, ReplicantNode] = ?config(cluster_nodes, Config),
-    ok = emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(CoreNode, #{}),
+    ok = emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start(CoreNode, #{}),
 
     ?assertMatch(
         #{evacuations := [_], rebalances := []},

+ 18 - 0
rel/i18n/emqx_node_rebalance_api.hocon

@@ -42,6 +42,18 @@ load_rebalance_evacuation_stop.desc:
 load_rebalance_evacuation_stop.label:
 """Stop evacuation"""
 
+cluster_purge_start.desc:
+"""Start purge process"""
+
+cluster_purge_start.label:
+"""Start purge"""
+
+cluster_purge_stop.desc:
+"""Stop purge process"""
+
+cluster_purge_stop.label:
+"""Stop purge"""
+
 param_node.desc:
 """Node name"""
 
@@ -150,6 +162,12 @@ local_status_session_eviction_rate.desc:
 local_status_session_eviction_rate.label:
 """Session eviction rate"""
 
+local_status_purge_rate.desc:
+"""The rate of purging sessions, in sessions per second"""
+
+local_status_purge_rate.label:
+"""Session purge rate"""
+
 local_status_connection_goal.desc:
 """The number of connections that the node should have after the rebalance/evacuation process"""