Bladeren bron

Merge pull request #11971 from savonarola/1116-no-auth-rebalance-status

feat(rebalance): improve rebalance usability
Ilya Averyanov 2 jaren geleden
bovenliggende
commit
c6fd1e4c75
24 gewijzigde bestanden met toevoegingen van 662 en 335 verwijderingen
  1. 1 0
      apps/emqx/priv/bpapi.versions
  2. 1 1
      apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src
  3. 45 19
      apps/emqx_eviction_agent/src/emqx_eviction_agent.erl
  4. 63 9
      apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
  5. 15 4
      apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl
  6. 13 5
      apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
  7. 12 4
      apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl
  8. 34 48
      apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl
  9. 3 2
      apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src
  10. 12 4
      apps/emqx_node_rebalance/src/emqx_node_rebalance.erl
  11. 105 72
      apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl
  12. 5 4
      apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl
  13. 29 20
      apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl
  14. 1 1
      apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl
  15. 8 0
      apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl
  16. 96 0
      apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl
  17. 63 16
      apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl
  18. 23 15
      apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl
  19. 16 14
      apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl
  20. 65 55
      apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl
  21. 29 24
      apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl
  22. 18 18
      apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl
  23. 1 0
      apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl
  24. 4 0
      changes/ee/feat-11971.en.md

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -43,6 +43,7 @@
 {emqx_mgmt_trace,2}.
 {emqx_mgmt_trace,2}.
 {emqx_node_rebalance,1}.
 {emqx_node_rebalance,1}.
 {emqx_node_rebalance,2}.
 {emqx_node_rebalance,2}.
+{emqx_node_rebalance,3}.
 {emqx_node_rebalance_api,1}.
 {emqx_node_rebalance_api,1}.
 {emqx_node_rebalance_api,2}.
 {emqx_node_rebalance_api,2}.
 {emqx_node_rebalance_evacuation,1}.
 {emqx_node_rebalance_evacuation,1}.

+ 1 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_eviction_agent, [
 {application, emqx_eviction_agent, [
     {description, "EMQX Eviction Agent"},
     {description, "EMQX Eviction Agent"},
-    {vsn, "5.1.4"},
+    {vsn, "5.1.5"},
     {registered, [
     {registered, [
         emqx_eviction_agent_sup,
         emqx_eviction_agent_sup,
         emqx_eviction_agent,
         emqx_eviction_agent,

+ 45 - 19
apps/emqx_eviction_agent/src/emqx_eviction_agent.erl

@@ -15,8 +15,11 @@
 -export([
 -export([
     start_link/0,
     start_link/0,
     enable/2,
     enable/2,
+    enable/3,
+    default_options/0,
     disable/1,
     disable/1,
     status/0,
     status/0,
+    enable_status/0,
     connection_count/0,
     connection_count/0,
     all_channels_count/0,
     all_channels_count/0,
     session_count/0,
     session_count/0,
@@ -51,7 +54,7 @@
     unhook/0
     unhook/0
 ]).
 ]).
 
 
--export_type([server_reference/0]).
+-export_type([server_reference/0, kind/0, options/0]).
 
 
 -define(CONN_MODULES, [
 -define(CONN_MODULES, [
     emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel
     emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel
@@ -67,15 +70,31 @@
     connections := non_neg_integer(),
     connections := non_neg_integer(),
     sessions := non_neg_integer()
     sessions := non_neg_integer()
 }.
 }.
--type kind() :: atom().
+
+%% kind() is any() because it was not exported previously
+%% and bpapi checker remembered it as any()
+-type kind() :: any().
+-type options() :: #{
+    allow_connections => boolean()
+}.
 
 
 -spec start_link() -> startlink_ret().
 -spec start_link() -> startlink_ret().
 start_link() ->
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
+-spec default_options() -> options().
+default_options() ->
+    #{
+        allow_connections => false
+    }.
+
 -spec enable(kind(), server_reference()) -> ok_or_error(eviction_agent_busy).
 -spec enable(kind(), server_reference()) -> ok_or_error(eviction_agent_busy).
 enable(Kind, ServerReference) ->
 enable(Kind, ServerReference) ->
-    gen_server:call(?MODULE, {enable, Kind, ServerReference}).
+    gen_server:call(?MODULE, {enable, Kind, ServerReference, default_options()}).
+
+-spec enable(kind(), server_reference(), options()) -> ok_or_error(eviction_agent_busy).
+enable(Kind, ServerReference, #{} = Options) ->
+    gen_server:call(?MODULE, {enable, Kind, ServerReference, Options}).
 
 
 -spec disable(kind()) -> ok.
 -spec disable(kind()) -> ok.
 disable(Kind) ->
 disable(Kind) ->
@@ -84,16 +103,20 @@ disable(Kind) ->
 -spec status() -> status().
 -spec status() -> status().
 status() ->
 status() ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, _ServerReference} ->
+        {enabled, _Kind, _ServerReference, _Options} ->
             {enabled, stats()};
             {enabled, stats()};
         disabled ->
         disabled ->
             disabled
             disabled
     end.
     end.
 
 
+-spec enable_status() -> disabled | {enabled, kind(), server_reference(), options()}.
+enable_status() ->
+    persistent_term:get(?MODULE, disabled).
+
 -spec evict_connections(pos_integer()) -> ok_or_error(disabled).
 -spec evict_connections(pos_integer()) -> ok_or_error(disabled).
 evict_connections(N) ->
 evict_connections(N) ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, ServerReference} ->
+        {enabled, _Kind, ServerReference, _Options} ->
             ok = do_evict_connections(N, ServerReference);
             ok = do_evict_connections(N, ServerReference);
         disabled ->
         disabled ->
             {error, disabled}
             {error, disabled}
@@ -112,15 +135,16 @@ evict_sessions(N, Nodes, ConnState) when
     is_list(Nodes) andalso length(Nodes) > 0
     is_list(Nodes) andalso length(Nodes) > 0
 ->
 ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, _ServerReference} ->
+        {enabled, _Kind, _ServerReference, _Options} ->
             ok = do_evict_sessions(N, Nodes, ConnState);
             ok = do_evict_sessions(N, Nodes, ConnState);
         disabled ->
         disabled ->
             {error, disabled}
             {error, disabled}
     end.
     end.
 
 
+-spec purge_sessions(non_neg_integer()) -> ok_or_error(disabled).
 purge_sessions(N) ->
 purge_sessions(N) ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, _ServerReference} ->
+        {enabled, _Kind, _ServerReference, _Options} ->
             ok = do_purge_sessions(N);
             ok = do_purge_sessions(N);
         disabled ->
         disabled ->
             {error, disabled}
             {error, disabled}
@@ -135,14 +159,14 @@ init([]) ->
     {ok, #{}}.
     {ok, #{}}.
 
 
 %% enable
 %% enable
-handle_call({enable, Kind, ServerReference}, _From, St) ->
+handle_call({enable, Kind, ServerReference, Options}, _From, St) ->
     Reply =
     Reply =
         case enable_status() of
         case enable_status() of
             disabled ->
             disabled ->
-                ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference});
-            {enabled, Kind, _ServerReference} ->
-                ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference});
-            {enabled, _OtherKind, _ServerReference} ->
+                ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options});
+            {enabled, Kind, _ServerReference, _Options} ->
+                ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options});
+            {enabled, _OtherKind, _ServerReference, _Options} ->
                 {error, eviction_agent_busy}
                 {error, eviction_agent_busy}
         end,
         end,
     {reply, Reply, St};
     {reply, Reply, St};
@@ -152,10 +176,10 @@ handle_call({disable, Kind}, _From, St) ->
         case enable_status() of
         case enable_status() of
             disabled ->
             disabled ->
                 {error, disabled};
                 {error, disabled};
-            {enabled, Kind, _ServerReference} ->
+            {enabled, Kind, _ServerReference, _Options} ->
                 _ = persistent_term:erase(?MODULE),
                 _ = persistent_term:erase(?MODULE),
                 ok;
                 ok;
-            {enabled, _OtherKind, _ServerReference} ->
+            {enabled, _OtherKind, _ServerReference, _Options} ->
                 {error, eviction_agent_busy}
                 {error, eviction_agent_busy}
         end,
         end,
     {reply, Reply, St};
     {reply, Reply, St};
@@ -180,8 +204,10 @@ code_change(_Vsn, State, _Extra) ->
 
 
 on_connect(_ConnInfo, _Props) ->
 on_connect(_ConnInfo, _Props) ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, _ServerReference} ->
+        {enabled, _Kind, _ServerReference, #{allow_connections := false}} ->
             {stop, {error, ?RC_USE_ANOTHER_SERVER}};
             {stop, {error, ?RC_USE_ANOTHER_SERVER}};
+        {enabled, _Kind, _ServerReference, _Options} ->
+            ignore;
         disabled ->
         disabled ->
             ignore
             ignore
     end.
     end.
@@ -192,7 +218,7 @@ on_connack(
     Props
     Props
 ) ->
 ) ->
     case enable_status() of
     case enable_status() of
-        {enabled, _Kind, ServerReference} ->
+        {enabled, _Kind, ServerReference, _Options} ->
             {ok, Props#{'Server-Reference' => ServerReference}};
             {ok, Props#{'Server-Reference' => ServerReference}};
         disabled ->
         disabled ->
             {ok, Props}
             {ok, Props}
@@ -214,10 +240,10 @@ unhook() ->
     ok = emqx_hooks:del('client.connect', {?MODULE, on_connect}),
     ok = emqx_hooks:del('client.connect', {?MODULE, on_connect}),
     ok = emqx_hooks:del('client.connack', {?MODULE, on_connack}).
     ok = emqx_hooks:del('client.connack', {?MODULE, on_connack}).
 
 
-enable_status() ->
-    persistent_term:get(?MODULE, disabled).
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
 
 
-% connection management
 stats() ->
 stats() ->
     #{
     #{
         connections => connection_count(),
         connections => connection_count(),

+ 63 - 9
apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl

@@ -15,7 +15,11 @@
 
 
 -import(
 -import(
     emqx_eviction_agent_test_helpers,
     emqx_eviction_agent_test_helpers,
-    [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1]
+    [
+        emqtt_connect/0, emqtt_connect/1, emqtt_connect/2,
+        emqtt_connect_for_publish/1,
+        case_specific_node_name/1
+    ]
 ).
 ).
 
 
 -define(assertPrinted(Printed, Code),
 -define(assertPrinted(Printed, Code),
@@ -29,11 +33,19 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_eviction_agent]),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_eviction_agent
+        ],
+        #{
+            work_dir => emqx_cth_suite:work_dir(Config)
+        }
+    ),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_eviction_agent]).
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_testcase(Case, Config) ->
 init_per_testcase(Case, Config) ->
     _ = emqx_eviction_agent:disable(test_eviction),
     _ = emqx_eviction_agent:disable(test_eviction),
@@ -41,10 +53,17 @@ init_per_testcase(Case, Config) ->
     start_slave(Case, Config).
     start_slave(Case, Config).
 
 
 start_slave(t_explicit_session_takeover, Config) ->
 start_slave(t_explicit_session_takeover, Config) ->
+    NodeNames =
+        [
+            t_explicit_session_takeover_donor,
+            t_explicit_session_takeover_recipient
+        ],
     ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
     ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
-        [{evacuate_test1, 2883}, {evacuate_test2, 3883}],
-        [emqx_eviction_agent]
+        Config,
+        NodeNames,
+        [emqx_conf, emqx, emqx_eviction_agent]
     ),
     ),
+    ok = snabbkaffe:start_trace(),
     [{evacuate_nodes, ClusterNodes} | Config];
     [{evacuate_nodes, ClusterNodes} | Config];
 start_slave(_Case, Config) ->
 start_slave(_Case, Config) ->
     Config.
     Config.
@@ -56,8 +75,7 @@ end_per_testcase(TestCase, Config) ->
 
 
 stop_slave(t_explicit_session_takeover, Config) ->
 stop_slave(t_explicit_session_takeover, Config) ->
     emqx_eviction_agent_test_helpers:stop_cluster(
     emqx_eviction_agent_test_helpers:stop_cluster(
-        ?config(evacuate_nodes, Config),
-        [emqx_eviction_agent]
+        ?config(evacuate_nodes, Config)
     );
     );
 stop_slave(_Case, _Config) ->
 stop_slave(_Case, _Config) ->
     ok.
     ok.
@@ -77,13 +95,16 @@ t_enable_disable(_Config) ->
     {ok, C0} = emqtt_connect(),
     {ok, C0} = emqtt_connect(),
     ok = emqtt:disconnect(C0),
     ok = emqtt:disconnect(C0),
 
 
+    %% Enable
     ok = emqx_eviction_agent:enable(test_eviction, undefined),
     ok = emqx_eviction_agent:enable(test_eviction, undefined),
 
 
+    %% Can't enable with different kind
     ?assertMatch(
     ?assertMatch(
         {error, eviction_agent_busy},
         {error, eviction_agent_busy},
         emqx_eviction_agent:enable(bar, undefined)
         emqx_eviction_agent:enable(bar, undefined)
     ),
     ),
 
 
+    %% Enable with the same kind but different server ref
     ?assertMatch(
     ?assertMatch(
         ok,
         ok,
         emqx_eviction_agent:enable(test_eviction, <<"srv">>)
         emqx_eviction_agent:enable(test_eviction, <<"srv">>)
@@ -99,6 +120,39 @@ t_enable_disable(_Config) ->
         emqtt_connect()
         emqtt_connect()
     ),
     ),
 
 
+    %% Enable with the same kind and server ref and explicit options
+    ?assertMatch(
+        ok,
+        emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => false})
+    ),
+
+    ?assertMatch(
+        {enabled, #{}},
+        emqx_eviction_agent:status()
+    ),
+
+    ?assertMatch(
+        {error, {use_another_server, #{}}},
+        emqtt_connect()
+    ),
+
+    %% Enable with the same kind and server ref and permissive options
+    ?assertMatch(
+        ok,
+        emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => true})
+    ),
+
+    ?assertMatch(
+        {enabled, #{}},
+        emqx_eviction_agent:status()
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqtt_connect()
+    ),
+
+    %% Can't enable using different kind
     ?assertMatch(
     ?assertMatch(
         {error, eviction_agent_busy},
         {error, eviction_agent_busy},
         emqx_eviction_agent:disable(bar)
         emqx_eviction_agent:disable(bar)

+ 15 - 4
apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl

@@ -22,12 +22,23 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite([emqx_eviction_agent]),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_eviction_agent,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{
+            work_dir => emqx_cth_suite:work_dir(Config)
+        }
+    ),
+    _ = emqx_common_test_http:create_default_app(),
+    [{apps, Apps} | Config].
 
 
 end_per_suite(Config) ->
 end_per_suite(Config) ->
-    emqx_mgmt_api_test_util:end_suite([emqx_eviction_agent]),
-    Config.
+    emqx_common_test_http:delete_default_app(),
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Tests
 %% Tests

+ 13 - 5
apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl

@@ -22,12 +22,20 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_conf, emqx_eviction_agent]),
-    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx,
+            emqx_eviction_agent
+        ],
+        #{
+            work_dir => emqx_cth_suite:work_dir(Config)
+        }
+    ),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]).
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Tests
 %% Tests

+ 12 - 4
apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl

@@ -14,13 +14,21 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_eviction_agent]),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_eviction_agent
+        ],
+        #{
+            work_dir => emqx_cth_suite:work_dir(Config)
+        }
+    ),
+    [{apps, Apps} | Config].
 
 
 end_per_suite(Config) ->
 end_per_suite(Config) ->
     _ = emqx_eviction_agent:disable(foo),
     _ = emqx_eviction_agent:disable(foo),
-    emqx_common_test_helpers:stop_apps([emqx_eviction_agent]),
-    Config.
+
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Tests
 %% Tests

+ 34 - 48
apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl

@@ -15,13 +15,15 @@
 
 
     emqtt_try_connect/1,
     emqtt_try_connect/1,
 
 
-    start_cluster/2,
     start_cluster/3,
     start_cluster/3,
-    stop_cluster/2,
+    stop_cluster/1,
 
 
     case_specific_node_name/2,
     case_specific_node_name/2,
     case_specific_node_name/3,
     case_specific_node_name/3,
-    concat_atoms/1
+    concat_atoms/1,
+
+    get_mqtt_port/2,
+    nodes_with_mqtt_tcp_ports/1
 ]).
 ]).
 
 
 emqtt_connect() ->
 emqtt_connect() ->
@@ -83,52 +85,24 @@ emqtt_try_connect(Opts) ->
             Error
             Error
     end.
     end.
 
 
-start_cluster(NamesWithPorts, Apps) ->
-    start_cluster(NamesWithPorts, Apps, []).
-
-start_cluster(NamesWithPorts, Apps, Env) ->
-    Specs = lists:map(
-        fun({ShortName, Port}) ->
-            {core, ShortName, #{listener_ports => [{tcp, Port}]}}
-        end,
-        NamesWithPorts
-    ),
-    Opts0 = [
-        {env, Env},
-        {apps, Apps},
-        {conf,
-            [{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++
-                [{[rpc, mode], async}]}
-    ],
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        Specs,
-        Opts0
+start_cluster(Config, NodeNames = [Node1 | _], Apps) ->
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(Node1),
+        listeners => true,
+        apps => Apps
+    },
+    Cluster = [{NodeName, Spec} || NodeName <- NodeNames],
+    ClusterNodes = emqx_cth_cluster:start(
+        Cluster,
+        %% Use Node1 to scope the work dirs for all the nodes
+        #{work_dir => emqx_cth_suite:work_dir(Node1, Config)}
     ),
     ),
-    NodesWithPorts = [
-        {
-            emqx_common_test_helpers:start_slave(Name, Opts),
-            proplists:get_value(Name, NamesWithPorts)
-        }
-     || {Name, Opts} <- Cluster
-    ],
-    NodesWithPorts.
-
-stop_cluster(NodesWithPorts, Apps) ->
-    lists:foreach(
-        fun({Node, _Port}) ->
-            lists:foreach(
-                fun(App) ->
-                    rpc:call(Node, application, stop, [App])
-                end,
-                Apps
-            ),
-            %% This sleep is just to make logs cleaner
-            ct:sleep(100),
-            _ = rpc:call(Node, emqx_common_test_helpers, stop_apps, []),
-            emqx_common_test_helpers:stop_slave(Node)
-        end,
-        NodesWithPorts
-    ).
+    nodes_with_mqtt_tcp_ports(ClusterNodes).
+
+stop_cluster(NamesWithPorts) ->
+    {Nodes, _Ports} = lists:unzip(NamesWithPorts),
+    ok = emqx_cth_cluster:stop(Nodes).
 
 
 case_specific_node_name(Module, Case) ->
 case_specific_node_name(Module, Case) ->
     concat_atoms([Module, '__', Case]).
     concat_atoms([Module, '__', Case]).
@@ -145,3 +119,15 @@ concat_atoms(Atoms) ->
             )
             )
         )
         )
     ).
     ).
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.
+
+nodes_with_mqtt_tcp_ports(Nodes) ->
+    lists:map(
+        fun(Node) ->
+            {Node, get_mqtt_port(Node, tcp)}
+        end,
+        Nodes
+    ).

+ 3 - 2
apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src

@@ -1,11 +1,12 @@
 {application, emqx_node_rebalance, [
 {application, emqx_node_rebalance, [
     {description, "EMQX Node Rebalance"},
     {description, "EMQX Node Rebalance"},
-    {vsn, "5.0.6"},
+    {vsn, "5.0.7"},
     {registered, [
     {registered, [
         emqx_node_rebalance_sup,
         emqx_node_rebalance_sup,
         emqx_node_rebalance,
         emqx_node_rebalance,
         emqx_node_rebalance_agent,
         emqx_node_rebalance_agent,
-        emqx_node_rebalance_evacuation
+        emqx_node_rebalance_evacuation,
+        emqx_node_rebalance_purge
     ]},
     ]},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 12 - 4
apps/emqx_node_rebalance/src/emqx_node_rebalance.erl

@@ -41,6 +41,8 @@
     start_error/0
     start_error/0
 ]).
 ]).
 
 
+-define(ENABLE_KIND, ?MODULE).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% APIs
 %% APIs
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -143,9 +145,13 @@ handle_event(
     state_timeout,
     state_timeout,
     evict_conns,
     evict_conns,
     wait_health_check,
     wait_health_check,
-    Data
+    #{donors := DonorNodes} = Data
 ) ->
 ) ->
     ?SLOG(warning, #{msg => "node_rebalance_wait_health_check_over"}),
     ?SLOG(warning, #{msg => "node_rebalance_wait_health_check_over"}),
+    _ = multicall(DonorNodes, enable_rebalance_agent, [
+        self(), ?ENABLE_KIND, #{allow_connections => false}
+    ]),
+    ?tp(debug, node_rebalance_enable_started_prohibiting, #{}),
     {next_state, evicting_conns, Data, [{state_timeout, 0, evict_conns}]};
     {next_state, evicting_conns, Data, [{state_timeout, 0, evict_conns}]};
 handle_event(
 handle_event(
     state_timeout,
     state_timeout,
@@ -232,7 +238,9 @@ enable_rebalance(#{opts := Opts} = Data) ->
         false ->
         false ->
             {error, nothing_to_balance};
             {error, nothing_to_balance};
         true ->
         true ->
-            _ = multicall(DonorNodes, enable_rebalance_agent, [self()]),
+            _ = multicall(DonorNodes, enable_rebalance_agent, [
+                self(), ?ENABLE_KIND, #{allow_connections => true}
+            ]),
             {ok, Data#{
             {ok, Data#{
                 donors => DonorNodes,
                 donors => DonorNodes,
                 recipients => RecipientNodes,
                 recipients => RecipientNodes,
@@ -242,7 +250,7 @@ enable_rebalance(#{opts := Opts} = Data) ->
     end.
     end.
 
 
 disable_rebalance(#{donors := DonorNodes}) ->
 disable_rebalance(#{donors := DonorNodes}) ->
-    _ = multicall(DonorNodes, disable_rebalance_agent, [self()]),
+    _ = multicall(DonorNodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]),
     ok.
     ok.
 
 
 evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) ->
 evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) ->
@@ -370,7 +378,7 @@ avg(List) when length(List) >= 1 ->
     lists:sum(List) / length(List).
     lists:sum(List) / length(List).
 
 
 multicall(Nodes, F, A) ->
 multicall(Nodes, F, A) ->
-    case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
+    case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of
         {Results, []} ->
         {Results, []} ->
             case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
             case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
                 {OkResults, []} ->
                 {OkResults, []} ->

+ 105 - 72
apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl

@@ -11,10 +11,13 @@
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
+-behaviour(gen_statem).
+
 -export([
 -export([
     start_link/0,
     start_link/0,
     enable/1,
     enable/1,
     enable/2,
     enable/2,
+    enable/3,
     disable/1,
     disable/1,
     disable/2,
     disable/2,
     status/0
     status/0
@@ -22,13 +25,13 @@
 
 
 -export([
 -export([
     init/1,
     init/1,
-    handle_call/3,
-    handle_info/2,
-    handle_cast/2,
-    code_change/3
+    callback_mode/0,
+    handle_event/4,
+    code_change/4
 ]).
 ]).
 
 
 -define(ENABLE_KIND, emqx_node_rebalance).
 -define(ENABLE_KIND, emqx_node_rebalance).
+-define(SERVER_REFERENCE, undefined).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% APIs
 %% APIs
@@ -38,16 +41,21 @@
 
 
 -spec start_link() -> startlink_ret().
 -spec start_link() -> startlink_ret().
 start_link() ->
 start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
 -spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy).
 -spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy).
 enable(CoordinatorPid) ->
 enable(CoordinatorPid) ->
     enable(CoordinatorPid, ?ENABLE_KIND).
     enable(CoordinatorPid, ?ENABLE_KIND).
 
 
 -spec enable(pid(), emqx_eviction_agent:kind()) ->
 -spec enable(pid(), emqx_eviction_agent:kind()) ->
-    ok_or_error(already_enabled | eviction_agent_busy).
+    ok_or_error(invalid_coordinator | eviction_agent_busy).
 enable(CoordinatorPid, Kind) ->
 enable(CoordinatorPid, Kind) ->
-    gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}).
+    enable(CoordinatorPid, Kind, emqx_eviction_agent:default_options()).
+
+-spec enable(pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options()) ->
+    ok_or_error(invalid_coordinator | eviction_agent_busy).
+enable(CoordinatorPid, Kind, Options) ->
+    gen_statem:call(?MODULE, {enable, CoordinatorPid, Kind, Options}).
 
 
 -spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator).
 -spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator).
 disable(CoordinatorPid) ->
 disable(CoordinatorPid) ->
@@ -56,88 +64,113 @@ disable(CoordinatorPid) ->
 -spec disable(pid(), emqx_eviction_agent:kind()) ->
 -spec disable(pid(), emqx_eviction_agent:kind()) ->
     ok_or_error(already_disabled | invalid_coordinator).
     ok_or_error(already_disabled | invalid_coordinator).
 disable(CoordinatorPid, Kind) ->
 disable(CoordinatorPid, Kind) ->
-    gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}).
+    gen_statem:call(?MODULE, {disable, CoordinatorPid, Kind}).
 
 
 -spec status() -> status().
 -spec status() -> status().
 status() ->
 status() ->
-    gen_server:call(?MODULE, status).
+    gen_statem:call(?MODULE, status).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_statem callbacks
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-define(disabled, disabled).
+-define(enabled(ST), {enabled, ST}).
+
+callback_mode() ->
+    handle_event_function.
+
 init([]) ->
 init([]) ->
-    {ok, #{}}.
-
-handle_call({enable, CoordinatorPid, Kind}, _From, St) ->
-    case St of
-        #{coordinator_pid := _Pid} ->
-            {reply, {error, already_enabled}, St};
-        _ ->
-            true = link(CoordinatorPid),
-            EvictionAgentPid = whereis(emqx_eviction_agent),
-            true = link(EvictionAgentPid),
-            case emqx_eviction_agent:enable(Kind, undefined) of
-                ok ->
-                    {reply, ok, #{
-                        coordinator_pid => CoordinatorPid,
-                        eviction_agent_pid => EvictionAgentPid
-                    }};
-                {error, eviction_agent_busy} ->
-                    true = unlink(EvictionAgentPid),
-                    true = unlink(CoordinatorPid),
-                    {reply, {error, eviction_agent_busy}, St}
-            end
-    end;
-handle_call({disable, CoordinatorPid, Kind}, _From, St) ->
-    case St of
-        #{
-            coordinator_pid := CoordinatorPid,
-            eviction_agent_pid := EvictionAgentPid
-        } ->
-            _ = emqx_eviction_agent:disable(Kind),
+    {ok, ?disabled, #{}}.
+
+%% disabled status
+
+%% disabled status, enable command
+handle_event({call, From}, {enable, CoordinatorPid, Kind, Options}, ?disabled, Data) ->
+    true = link(CoordinatorPid),
+    EvictionAgentPid = whereis(emqx_eviction_agent),
+    true = link(EvictionAgentPid),
+    case emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options) of
+        ok ->
+            {next_state,
+                ?enabled(#{
+                    coordinator_pid => CoordinatorPid,
+                    eviction_agent_pid => EvictionAgentPid,
+                    kind => Kind
+                }), Data, {reply, From, ok}};
+        {error, eviction_agent_busy} ->
             true = unlink(EvictionAgentPid),
             true = unlink(EvictionAgentPid),
             true = unlink(CoordinatorPid),
             true = unlink(CoordinatorPid),
-            NewSt = maps:without(
-                [coordinator_pid, eviction_agent_pid],
-                St
-            ),
-            {reply, ok, NewSt};
-        #{coordinator_pid := _CoordinatorPid} ->
-            {reply, {error, invalid_coordinator}, St};
-        #{} ->
-            {reply, {error, already_disabled}, St}
+            {keep_state_and_data, {reply, From, {error, eviction_agent_busy}}}
     end;
     end;
-handle_call(status, _From, St) ->
-    case St of
-        #{coordinator_pid := Pid} ->
-            {reply, {enabled, Pid}, St};
-        _ ->
-            {reply, disabled, St}
-    end;
-handle_call(Msg, _From, St) ->
+%% disabled status, disable command
+handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?disabled, _Data) ->
+    {keep_state_and_data, {reply, From, {error, already_disabled}}};
+%% disabled status, status command
+handle_event({call, From}, status, ?disabled, _Data) ->
+    {keep_state_and_data, {reply, From, disabled}};
+%% enabled status
+
+%% enabled status, enable command
+handle_event(
+    {call, From},
+    {enable, CoordinatorPid, Kind, Options},
+    ?enabled(#{
+        coordinator_pid := CoordinatorPid,
+        kind := Kind
+    }),
+    _Data
+) ->
+    %% just updating options
+    ok = emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options),
+    {keep_state_and_data, {reply, From, ok}};
+handle_event({call, From}, {enable, _CoordinatorPid, _Kind, _Options}, ?enabled(_St), _Data) ->
+    {keep_state_and_data, {reply, From, {error, invalid_coordinator}}};
+%% enabled status, disable command
+handle_event(
+    {call, From},
+    {disable, CoordinatorPid, Kind},
+    ?enabled(#{
+        coordinator_pid := CoordinatorPid,
+        eviction_agent_pid := EvictionAgentPid
+    }),
+    Data
+) ->
+    _ = emqx_eviction_agent:disable(Kind),
+    true = unlink(EvictionAgentPid),
+    true = unlink(CoordinatorPid),
+    {next_state, ?disabled, Data, {reply, From, ok}};
+handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?enabled(_St), _Data) ->
+    {keep_state_and_data, {reply, From, {error, invalid_coordinator}}};
+%% enabled status, status command
+handle_event({call, From}, status, ?enabled(#{coordinator_pid := CoordinatorPid}), _Data) ->
+    {keep_state_and_data, {reply, From, {enabled, CoordinatorPid}}};
+%% fallbacks
+
+handle_event({call, From}, Msg, State, Data) ->
     ?SLOG(warning, #{
     ?SLOG(warning, #{
         msg => "unknown_call",
         msg => "unknown_call",
         call => Msg,
         call => Msg,
-        state => St
-    }),
-    {reply, ignored, St}.
-
-handle_info(Msg, St) ->
-    ?SLOG(warning, #{
-        msg => "unknown_info",
-        info => Msg,
-        state => St
+        state => State,
+        data => Data
     }),
     }),
-    {noreply, St}.
-
-handle_cast(Msg, St) ->
+    {keep_state_and_data, {reply, From, ignored}};
+handle_event(cast, Msg, State, Data) ->
     ?SLOG(warning, #{
     ?SLOG(warning, #{
         msg => "unknown_cast",
         msg => "unknown_cast",
         cast => Msg,
         cast => Msg,
-        state => St
+        state => State,
+        data => Data
+    }),
+    keep_state_and_data;
+handle_event(info, Msg, State, Data) ->
+    ?SLOG(warning, #{
+        msg => "unknown_info",
+        info => Msg,
+        state => State,
+        data => Data
     }),
     }),
-    {noreply, St}.
+    keep_state_and_data.
 
 
-code_change(_Vsn, State, _Extra) ->
-    {ok, State}.
+code_change(_Vsn, State, Data, _Extra) ->
+    {ok, State, Data}.

+ 5 - 4
apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl

@@ -109,7 +109,8 @@ schema("/load_rebalance/availability_check") ->
             responses => #{
             responses => #{
                 200 => response_schema(),
                 200 => response_schema(),
                 503 => error_codes([?NODE_EVACUATING], <<"Node Evacuating">>)
                 503 => error_codes([?NODE_EVACUATING], <<"Node Evacuating">>)
-            }
+            },
+            security => []
         }
         }
     };
     };
 schema("/load_rebalance/:node/start") ->
 schema("/load_rebalance/:node/start") ->
@@ -248,10 +249,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
     }}.
     }}.
 
 
 '/load_rebalance/availability_check'(get, #{}) ->
 '/load_rebalance/availability_check'(get, #{}) ->
-    case emqx_node_rebalance_status:local_status() of
-        disabled ->
+    case emqx_node_rebalance_status:availability_status() of
+        available ->
             {200, #{}};
             {200, #{}};
-        _ ->
+        unavailable ->
             error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
             error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
     end.
     end.
 
 

+ 29 - 20
apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl

@@ -57,7 +57,7 @@
     migrate_to => migrate_to(),
     migrate_to => migrate_to(),
     wait_health_check => number()
     wait_health_check => number()
 }.
 }.
--type start_error() :: already_started.
+-type start_error() :: already_started | eviction_agent_busy.
 -type stats() :: #{
 -type stats() :: #{
     initial_conns := non_neg_integer(),
     initial_conns := non_neg_integer(),
     initial_sessions := non_neg_integer(),
     initial_sessions := non_neg_integer(),
@@ -102,9 +102,9 @@ callback_mode() -> handle_event_function.
 
 
 init([]) ->
 init([]) ->
     case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
     case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
-        {ok, #{server_reference := ServerReference} = Opts} ->
+        {ok, Opts} ->
             ?SLOG(warning, #{msg => "restoring_evacuation_state", opts => Opts}),
             ?SLOG(warning, #{msg => "restoring_evacuation_state", opts => Opts}),
-            case emqx_eviction_agent:enable(?MODULE, ServerReference) of
+            case enable_eviction_agent(Opts, _AllowConnections = false) of
                 ok ->
                 ok ->
                     Data = init_data(#{}, Opts),
                     Data = init_data(#{}, Opts),
                     ok = warn_enabled(),
                     ok = warn_enabled(),
@@ -122,18 +122,26 @@ handle_event(
     {call, From},
     {call, From},
     {start, #{wait_health_check := WaitHealthCheck} = Opts},
     {start, #{wait_health_check := WaitHealthCheck} = Opts},
     disabled,
     disabled,
-    #{} = Data
+    Data
 ) ->
 ) ->
-    ?SLOG(warning, #{
-        msg => "node_evacuation_started",
-        opts => Opts
-    }),
-    NewData = init_data(Data, Opts),
-    ok = emqx_node_rebalance_evacuation_persist:save(Opts),
-    {next_state, waiting_health_check, NewData, [
-        {state_timeout, seconds(WaitHealthCheck), start_eviction},
-        {reply, From, ok}
-    ]};
+    case enable_eviction_agent(Opts, _AllowConnections = true) of
+        ok ->
+            ?SLOG(warning, #{
+                msg => "node_evacuation_started",
+                opts => Opts
+            }),
+            NewData = init_data(Data, Opts),
+            ok = emqx_node_rebalance_evacuation_persist:save(Opts),
+            {next_state, waiting_health_check, NewData, [
+                {state_timeout, seconds(WaitHealthCheck), start_eviction},
+                {reply, From, ok}
+            ]};
+        {error, eviction_agent_busy} ->
+            ?tp(warning, eviction_agent_busy, #{
+                data => Data
+            }),
+            {keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
+    end;
 handle_event({call, From}, {start, _Opts}, _State, #{}) ->
 handle_event({call, From}, {start, _Opts}, _State, #{}) ->
     {keep_state_and_data, [{reply, From, {error, already_started}}]};
     {keep_state_and_data, [{reply, From, {error, already_started}}]};
 %% stop
 %% stop
@@ -168,9 +176,9 @@ handle_event(
     state_timeout,
     state_timeout,
     start_eviction,
     start_eviction,
     waiting_health_check,
     waiting_health_check,
-    #{server_reference := ServerReference} = Data
+    Data
 ) ->
 ) ->
-    case emqx_eviction_agent:enable(?MODULE, ServerReference) of
+    case enable_eviction_agent(Data, _AllowConnections = false) of
         ok ->
         ok ->
             ?tp(debug, eviction_agent_started, #{
             ?tp(debug, eviction_agent_started, #{
                 data => Data
                 data => Data
@@ -178,10 +186,8 @@ handle_event(
             {next_state, evicting_conns, Data, [
             {next_state, evicting_conns, Data, [
                 {state_timeout, 0, evict_conns}
                 {state_timeout, 0, evict_conns}
             ]};
             ]};
+        %% This should never happen
         {error, eviction_agent_busy} ->
         {error, eviction_agent_busy} ->
-            ?tp(warning, eviction_agent_busy, #{
-                data => Data
-            }),
             {next_state, disabled, deinit(Data)}
             {next_state, disabled, deinit(Data)}
     end;
     end;
 %% conn eviction
 %% conn eviction
@@ -212,7 +218,7 @@ handle_event(
             NewData = Data#{current_conns => 0},
             NewData = Data#{current_conns => 0},
             ?SLOG(warning, #{msg => "node_evacuation_evict_conns_done"}),
             ?SLOG(warning, #{msg => "node_evacuation_evict_conns_done"}),
             {next_state, waiting_takeover, NewData, [
             {next_state, waiting_takeover, NewData, [
-                {state_timeout, timer:seconds(WaitTakeover), evict_sessions}
+                {state_timeout, seconds(WaitTakeover), evict_sessions}
             ]}
             ]}
     end;
     end;
 handle_event(
 handle_event(
@@ -308,6 +314,9 @@ deinit(Data) ->
             maps:keys(default_opts()),
             maps:keys(default_opts()),
     maps:without(Keys, Data).
     maps:without(Keys, Data).
 
 
+enable_eviction_agent(#{server_reference := ServerReference} = _Opts, AllowConnections) ->
+    emqx_eviction_agent:enable(?MODULE, ServerReference, #{allow_connections => AllowConnections}).
+
 warn_enabled() ->
 warn_enabled() ->
     ?SLOG(warning, #{msg => "node_evacuation_enabled"}),
     ?SLOG(warning, #{msg => "node_evacuation_enabled"}),
     io:format(
     io:format(

+ 1 - 1
apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl

@@ -199,7 +199,7 @@ deinit(Data) ->
     maps:without(Keys, Data).
     maps:without(Keys, Data).
 
 
 multicall(Nodes, F, A) ->
 multicall(Nodes, F, A) ->
-    case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
+    case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of
         {Results, []} ->
         {Results, []} ->
             case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
             case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
                 {_OkResults, []} ->
                 {_OkResults, []} ->

+ 8 - 0
apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl

@@ -5,6 +5,7 @@
 -module(emqx_node_rebalance_status).
 -module(emqx_node_rebalance_status).
 
 
 -export([
 -export([
+    availability_status/0,
     local_status/0,
     local_status/0,
     local_status/1,
     local_status/1,
     global_status/0,
     global_status/0,
@@ -23,6 +24,13 @@
 %% APIs
 %% APIs
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-spec availability_status() -> available | unavailable.
+availability_status() ->
+    case emqx_eviction_agent:enable_status() of
+        {enabled, _Kind, _ServerReference, _Options} -> unavailable;
+        disabled -> available
+    end.
+
 -spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
 -spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
 local_status() ->
 local_status() ->
     Checks = [
     Checks = [

+ 96 - 0
apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl

@@ -0,0 +1,96 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_node_rebalance_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    available_nodes/1,
+    evict_connections/2,
+    evict_sessions/4,
+    connection_counts/1,
+    session_counts/1,
+    enable_rebalance_agent/2,
+    disable_rebalance_agent/2,
+    disconnected_session_counts/1,
+
+    %% Introduced in v2:
+    enable_rebalance_agent/3,
+    disable_rebalance_agent/3,
+    purge_sessions/2,
+
+    %% Introduced in v3:
+    enable_rebalance_agent/4
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx/include/types.hrl").
+
+introduced_in() ->
+    "5.4.0".
+
+-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()).
+available_nodes(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []).
+
+-spec evict_connections([node()], non_neg_integer()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+evict_connections(Nodes, Count) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]).
+
+-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+evict_sessions(Nodes, Count, RecipientNodes, ConnState) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]).
+
+-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+connection_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []).
+
+-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+session_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, session_count, []).
+
+-spec enable_rebalance_agent([node()], pid()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
+enable_rebalance_agent(Nodes, OwnerPid) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]).
+
+-spec disable_rebalance_agent([node()], pid()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
+disable_rebalance_agent(Nodes, OwnerPid) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]).
+
+-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
+disconnected_session_counts(Nodes) ->
+    rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []).
+
+%% Introduced in v2:
+
+-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
+enable_rebalance_agent(Nodes, OwnerPid, Kind) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]).
+
+-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
+    emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
+disable_rebalance_agent(Nodes, OwnerPid, Kind) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]).
+
+-spec purge_sessions([node()], non_neg_integer()) ->
+    emqx_rpc:multicall_result(ok_or_error(disabled)).
+purge_sessions(Nodes, Count) ->
+    rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]).
+
+%% Introduced in v3:
+
+-spec enable_rebalance_agent(
+    [node()], pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options()
+) ->
+    emqx_rpc:multicall_result(ok_or_error(eviction_agent_busy | invalid_coordinator)).
+enable_rebalance_agent(Nodes, OwnerPid, Kind, Options) ->
+    rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind, Options]).

+ 63 - 16
apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl

@@ -16,39 +16,46 @@
 
 
 -import(
 -import(
     emqx_eviction_agent_test_helpers,
     emqx_eviction_agent_test_helpers,
-    [emqtt_connect_many/1, emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
+    [
+        emqtt_connect_many/1,
+        emqtt_connect_many/2,
+        emqtt_try_connect/1,
+        stop_many/1,
+        case_specific_node_name/3,
+        start_cluster/3,
+        stop_cluster/1
+    ]
 ).
 ).
 
 
--define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
-
 all() ->
 all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([]),
-    Config.
+    Apps = emqx_cth_suite:start([emqx], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([]),
-    ok.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_testcase(Case, Config) ->
 init_per_testcase(Case, Config) ->
-    ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
+    NodeNames =
         [
         [
-            {case_specific_node_name(?MODULE, Case, '_donor'), 2883},
-            {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
+            case_specific_node_name(?MODULE, Case, '_donor'),
+            case_specific_node_name(?MODULE, Case, '_recipient')
         ],
         ],
-        ?START_APPS
+    ClusterNodes = start_cluster(
+        Config,
+        NodeNames,
+        [emqx, emqx_eviction_agent, emqx_node_rebalance]
     ),
     ),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [{cluster_nodes, ClusterNodes} | Config].
     [{cluster_nodes, ClusterNodes} | Config].
 
 
 end_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, Config) ->
     ok = snabbkaffe:stop(),
     ok = snabbkaffe:stop(),
-    ok = emqx_eviction_agent_test_helpers:stop_cluster(
-        ?config(cluster_nodes, Config),
-        ?START_APPS
-    ).
+    stop_cluster(?config(cluster_nodes, Config)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Tests
 %% Tests
@@ -227,3 +234,43 @@ t_available_nodes(Config) ->
             [[DonorNode, RecipientNode]]
             [[DonorNode, RecipientNode]]
         )
         )
     ).
     ).
+
+t_before_health_check_over(Config) ->
+    process_flag(trap_exit, true),
+
+    [{DonorNode, DonorPort}, {RecipientNode, _RecipientPort}] = ?config(cluster_nodes, Config),
+
+    Nodes = [DonorNode, RecipientNode],
+
+    Conns = emqtt_connect_many(DonorPort, 50),
+
+    Opts = #{
+        conn_evict_rate => 1,
+        sess_evict_rate => 1,
+        evict_interval => 1000,
+        abs_conn_threshold => 1,
+        abs_sess_threshold => 1,
+        rel_conn_threshold => 1.0,
+        rel_sess_threshold => 1.0,
+        wait_health_check => 2,
+        wait_takeover => 100,
+        nodes => Nodes
+    },
+
+    ?assertWaitEvent(
+        begin
+            ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]),
+            ?assertMatch(
+                ok,
+                emqtt_try_connect([{port, DonorPort}])
+            )
+        end,
+        #{?snk_kind := node_rebalance_enable_started_prohibiting},
+        5000
+    ),
+    ?assertMatch(
+        {error, {use_another_server, #{}}},
+        emqtt_try_connect([{port, DonorPort}])
+    ),
+
+    stop_many(Conns).

+ 23 - 15
apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl

@@ -38,12 +38,13 @@ groups() ->
     ].
     ].
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([emqx_eviction_agent, emqx_node_rebalance]),
-    Config.
+    Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_node_rebalance]),
-    ok.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_group(local, Config) ->
 init_per_group(local, Config) ->
     [{cluster, false} | Config];
     [{cluster, false} | Config];
@@ -56,9 +57,13 @@ end_per_group(_Group, _Config) ->
 init_per_testcase(Case, Config) ->
 init_per_testcase(Case, Config) ->
     case ?config(cluster, Config) of
     case ?config(cluster, Config) of
         true ->
         true ->
-            ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
-                [{case_specific_node_name(?MODULE, Case), 2883}],
-                [emqx_eviction_agent, emqx_node_rebalance]
+            ClusterNodes = emqx_cth_cluster:start(
+                [
+                    {case_specific_node_name(?MODULE, Case), #{
+                        apps => [emqx, emqx_eviction_agent, emqx_node_rebalance]
+                    }}
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Case, Config)}
             ),
             ),
             [{cluster_nodes, ClusterNodes} | Config];
             [{cluster_nodes, ClusterNodes} | Config];
         false ->
         false ->
@@ -68,10 +73,7 @@ init_per_testcase(Case, Config) ->
 end_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, Config) ->
     case ?config(cluster, Config) of
     case ?config(cluster, Config) of
         true ->
         true ->
-            emqx_eviction_agent_test_helpers:stop_cluster(
-                ?config(cluster_nodes, Config),
-                [emqx_eviction_agent, emqx_node_rebalance]
-            );
+            emqx_cth_cluster:stop(?config(cluster_nodes, Config));
         false ->
         false ->
             ok
             ok
     end.
     end.
@@ -94,7 +96,13 @@ t_enable_disable(_Config) ->
     ),
     ),
 
 
     ?assertEqual(
     ?assertEqual(
-        {error, already_enabled},
+        {error, invalid_coordinator},
+        emqx_node_rebalance_agent:enable(self(), other_rebalance)
+    ),
+
+    %% Options update
+    ?assertEqual(
+        ok,
         emqx_node_rebalance_agent:enable(self())
         emqx_node_rebalance_agent:enable(self())
     ),
     ),
 
 
@@ -150,7 +158,7 @@ t_unknown_messages(_Config) ->
 t_rebalance_agent_coordinator_fail(Config) ->
 t_rebalance_agent_coordinator_fail(Config) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
 
 
-    [{Node, _}] = ?config(cluster_nodes, Config),
+    [Node] = ?config(cluster_nodes, Config),
 
 
     CoordinatorPid = spawn_link(
     CoordinatorPid = spawn_link(
         fun() ->
         fun() ->
@@ -189,7 +197,7 @@ t_rebalance_agent_coordinator_fail(Config) ->
 t_rebalance_agent_fail(Config) ->
 t_rebalance_agent_fail(Config) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
 
 
-    [{Node, _}] = ?config(cluster_nodes, Config),
+    [Node] = ?config(cluster_nodes, Config),
 
 
     CoordinatorPid = spawn_link(
     CoordinatorPid = spawn_link(
         fun() ->
         fun() ->

+ 16 - 14
apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl

@@ -13,6 +13,7 @@
 -import(
 -import(
     emqx_mgmt_api_test_util,
     emqx_mgmt_api_test_util,
     [
     [
+        request_api/3,
         request/2,
         request/2,
         request/3,
         request/3,
         uri/1
         uri/1
@@ -24,18 +25,17 @@
     [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
     [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
 ).
 ).
 
 
--define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
-
 all() ->
 all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps(?START_APPS),
-    Config.
+    Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps(?START_APPS),
-    ok.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_testcase(Case, Config) ->
 init_per_testcase(Case, Config) ->
     DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
     DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
@@ -57,7 +57,6 @@ init_per_testcase(Case, Config) ->
     [{cluster_nodes, ClusterNodes} | Config].
     [{cluster_nodes, ClusterNodes} | Config].
 end_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, Config) ->
     Nodes = ?config(cluster_nodes, Config),
     Nodes = ?config(cluster_nodes, Config),
-    erpc:multicall(Nodes, meck, unload, []),
     _ = emqx_cth_cluster:stop(Nodes),
     _ = emqx_cth_cluster:stop(Nodes),
     ok.
     ok.
 
 
@@ -473,28 +472,31 @@ t_start_stop_rebalance(Config) ->
 t_availability_check(Config) ->
 t_availability_check(Config) ->
     [DonorNode | _] = ?config(cluster_nodes, Config),
     [DonorNode | _] = ?config(cluster_nodes, Config),
     ?assertMatch(
     ?assertMatch(
-        {ok, 200, #{}},
-        api_get(["load_rebalance", "availability_check"])
+        {ok, _},
+        api_get_noauth(["load_rebalance", "availability_check"])
     ),
     ),
 
 
     ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
     ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
 
 
     ?assertMatch(
     ?assertMatch(
-        {ok, 503, _},
-        api_get(["load_rebalance", "availability_check"])
+        {error, {_, 503, _}},
+        api_get_noauth(["load_rebalance", "availability_check"])
     ),
     ),
 
 
     ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
     ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
 
 
     ?assertMatch(
     ?assertMatch(
-        {ok, 200, #{}},
-        api_get(["load_rebalance", "availability_check"])
+        {ok, _},
+        api_get_noauth(["load_rebalance", "availability_check"])
     ).
     ).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Helpers
 %% Helpers
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+api_get_noauth(Path) ->
+    request_api(get, uri(Path), emqx_common_test_http:auth_header("invalid", "password")).
+
 api_get(Path) ->
 api_get(Path) ->
     case request(get, uri(Path)) of
     case request(get, uri(Path)) of
         {ok, Code, ResponseBody} ->
         {ok, Code, ResponseBody} ->

+ 65 - 55
apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl

@@ -15,27 +15,38 @@
     [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
     [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
 ).
 ).
 
 
--define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
+-define(START_APPS, [emqx, emqx_eviction_agent, emqx_node_rebalance]).
 
 
 all() ->
 all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps(?START_APPS),
-    Config.
+    Apps = emqx_cth_suite:start(?START_APPS, #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
 end_per_suite(Config) ->
 end_per_suite(Config) ->
-    emqx_common_test_helpers:stop_apps(lists:reverse(?START_APPS)),
-    Config.
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_testcase(Case = t_rebalance, Config) ->
 init_per_testcase(Case = t_rebalance, Config) ->
     _ = emqx_node_rebalance_evacuation:stop(),
     _ = emqx_node_rebalance_evacuation:stop(),
-    ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
+    Nodes =
+        [Node1 | _] =
         [
         [
-            {case_specific_node_name(?MODULE, Case, '_donor'), 2883},
-            {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
+            case_specific_node_name(?MODULE, Case, '_1'),
+            case_specific_node_name(?MODULE, Case, '_2')
         ],
         ],
-        ?START_APPS
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(Node1),
+        listeners => true,
+        apps => ?START_APPS
+    },
+    Cluster = [{Node, Spec} || Node <- Nodes],
+    ClusterNodes = emqx_cth_cluster:start(
+        Cluster,
+        #{work_dir => emqx_cth_suite:work_dir(Case, Config)}
     ),
     ),
     [{cluster_nodes, ClusterNodes} | Config];
     [{cluster_nodes, ClusterNodes} | Config];
 init_per_testcase(_Case, Config) ->
 init_per_testcase(_Case, Config) ->
@@ -46,10 +57,7 @@ init_per_testcase(_Case, Config) ->
 end_per_testcase(t_rebalance, Config) ->
 end_per_testcase(t_rebalance, Config) ->
     _ = emqx_node_rebalance_evacuation:stop(),
     _ = emqx_node_rebalance_evacuation:stop(),
     _ = emqx_node_rebalance:stop(),
     _ = emqx_node_rebalance:stop(),
-    _ = emqx_eviction_agent_test_helpers:stop_cluster(
-        ?config(cluster_nodes, Config),
-        ?START_APPS
-    );
+    _ = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
 end_per_testcase(_Case, _Config) ->
 end_per_testcase(_Case, _Config) ->
     _ = emqx_node_rebalance_evacuation:stop(),
     _ = emqx_node_rebalance_evacuation:stop(),
     _ = emqx_node_rebalance:stop().
     _ = emqx_node_rebalance:stop().
@@ -157,6 +165,8 @@ t_evacuation(_Config) ->
     ).
     ).
 
 
 t_purge(_Config) ->
 t_purge(_Config) ->
+    process_flag(trap_exit, true),
+
     %% start with invalid args
     %% start with invalid args
     ?assertNot(
     ?assertNot(
         emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
         emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
@@ -187,40 +197,44 @@ t_purge(_Config) ->
             atom_to_list(node())
             atom_to_list(node())
         ])
         ])
     ),
     ),
-    with_some_sessions(fun() ->
-        ?assert(
-            emqx_node_rebalance_cli:cli([
-                "start",
-                "--purge",
-                "--purge-rate",
-                "10"
-            ])
-        ),
-
-        %% status
-        ok = emqx_node_rebalance_cli:cli(["status"]),
-        ok = emqx_node_rebalance_cli:cli(["node-status"]),
-        ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
-
-        ?assertMatch(
-            {enabled, #{}},
-            emqx_node_rebalance_purge:status()
-        ),
-
-        %% already enabled
-        ?assertNot(
-            emqx_node_rebalance_cli:cli([
-                "start",
-                "--purge",
-                "--purge-rate",
-                "10"
-            ])
-        ),
-        true = emqx_node_rebalance_cli:cli(["stop"]),
-        ok
-    end),
+
+    Conns = emqtt_connect_many(get_mqtt_port(node(), tcp), 100),
+
+    ?assert(
+        emqx_node_rebalance_cli:cli([
+            "start",
+            "--purge",
+            "--purge-rate",
+            "10"
+        ])
+    ),
+
+    %% status
+    ok = emqx_node_rebalance_cli:cli(["status"]),
+    ok = emqx_node_rebalance_cli:cli(["node-status"]),
+    ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
+
+    ?assertMatch(
+        {enabled, #{}},
+        emqx_node_rebalance_purge:status()
+    ),
+
+    %% already enabled
+    ?assertNot(
+        emqx_node_rebalance_cli:cli([
+            "start",
+            "--purge",
+            "--purge-rate",
+            "10"
+        ])
+    ),
+
     %% stop
     %% stop
 
 
+    true = emqx_node_rebalance_cli:cli(["stop"]),
+
+    %% stop when not started
+
     false = emqx_node_rebalance_cli:cli(["stop"]),
     false = emqx_node_rebalance_cli:cli(["stop"]),
 
 
     ?assertEqual(
     ?assertEqual(
@@ -228,12 +242,13 @@ t_purge(_Config) ->
         emqx_node_rebalance_purge:status()
         emqx_node_rebalance_purge:status()
     ),
     ),
 
 
-    ok.
+    ok = stop_many(Conns).
 
 
 t_rebalance(Config) ->
 t_rebalance(Config) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
 
 
-    [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
+    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
+    DonorPort = get_mqtt_port(DonorNode, tcp),
 
 
     %% start with invalid args
     %% start with invalid args
     ?assertNot(
     ?assertNot(
@@ -364,11 +379,6 @@ emqx_node_rebalance_cli(Node, Args) ->
             Result
             Result
     end.
     end.
 
 
-%% to avoid it finishing too fast
-with_some_sessions(Fn) ->
-    emqx_common_test_helpers:with_mock(
-        emqx_eviction_agent,
-        all_channels_count,
-        fun() -> 100 end,
-        Fn
-    ).
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.

+ 29 - 24
apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl

@@ -15,7 +15,13 @@
 
 
 -import(
 -import(
     emqx_eviction_agent_test_helpers,
     emqx_eviction_agent_test_helpers,
-    [emqtt_connect/1, emqtt_try_connect/1, case_specific_node_name/3]
+    [
+        emqtt_connect/1,
+        emqtt_try_connect/1,
+        case_specific_node_name/3,
+        start_cluster/3,
+        stop_cluster/1
+    ]
 ).
 ).
 
 
 all() -> [{group, one_node}, {group, two_node}].
 all() -> [{group, one_node}, {group, two_node}].
@@ -37,12 +43,13 @@ one_node_cases() ->
     emqx_common_test_helpers:all(?MODULE) -- two_node_cases().
     emqx_common_test_helpers:all(?MODULE) -- two_node_cases().
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([]),
-    Config.
+    Apps = emqx_cth_suite:start([emqx], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([]),
-    ok.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 
 init_per_group(one_node, Config) ->
 init_per_group(one_node, Config) ->
     [{cluster_type, one_node} | Config];
     [{cluster_type, one_node} | Config];
@@ -53,30 +60,23 @@ end_per_group(_Group, _Config) ->
     ok.
     ok.
 
 
 init_per_testcase(Case, Config) ->
 init_per_testcase(Case, Config) ->
-    NodesWithPorts =
+    NodeNames =
         case ?config(cluster_type, Config) of
         case ?config(cluster_type, Config) of
             one_node ->
             one_node ->
-                [{case_specific_node_name(?MODULE, Case, '_evacuated'), 2883}];
+                [case_specific_node_name(?MODULE, Case, '_evacuated')];
             two_node ->
             two_node ->
                 [
                 [
-                    {case_specific_node_name(?MODULE, Case, '_evacuated'), 2883},
-                    {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
+                    case_specific_node_name(?MODULE, Case, '_evacuated'),
+                    case_specific_node_name(?MODULE, Case, '_recipient')
                 ]
                 ]
         end,
         end,
-    ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
-        NodesWithPorts,
-        [emqx_eviction_agent, emqx_node_rebalance],
-        [{emqx, data_dir, case_specific_data_dir(Case, Config)}]
-    ),
+    ClusterNodes = start_cluster(Config, NodeNames, [emqx, emqx_eviction_agent, emqx_node_rebalance]),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [{cluster_nodes, ClusterNodes} | Config].
     [{cluster_nodes, ClusterNodes} | Config].
 
 
 end_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, Config) ->
     ok = snabbkaffe:stop(),
     ok = snabbkaffe:stop(),
-    ok = emqx_eviction_agent_test_helpers:stop_cluster(
-        ?config(cluster_nodes, Config),
-        [emqx_eviction_agent, emqx_node_rebalance]
-    ).
+    stop_cluster(?config(cluster_nodes, Config)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Tests
 %% Tests
@@ -89,10 +89,9 @@ t_agent_busy(Config) ->
 
 
     ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
     ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
 
 
-    ?assertWaitEvent(
-        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
-        #{?snk_kind := eviction_agent_busy},
-        5000
+    ?assertEqual(
+        {error, eviction_agent_busy},
+        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
     ).
     ).
 
 
 t_already_started(Config) ->
 t_already_started(Config) ->
@@ -118,7 +117,13 @@ t_start(Config) ->
     [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
     [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
 
 
     ?assertWaitEvent(
     ?assertWaitEvent(
-        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+        begin
+            rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+            ?assertMatch(
+                ok,
+                emqtt_try_connect([{port, DonorPort}])
+            )
+        end,
         #{?snk_kind := eviction_agent_started},
         #{?snk_kind := eviction_agent_started},
         5000
         5000
     ),
     ),

+ 18 - 18
apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl

@@ -18,7 +18,9 @@
     [
     [
         emqtt_connect/1,
         emqtt_connect/1,
         emqtt_try_connect/1,
         emqtt_try_connect/1,
-        case_specific_node_name/3
+        case_specific_node_name/3,
+        stop_many/1,
+        get_mqtt_port/2
     ]
     ]
 ).
 ).
 
 
@@ -41,11 +43,13 @@ one_node_cases() ->
     emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
     emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([]),
-    Config.
+    Apps = emqx_cth_suite:start([emqx], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([]),
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
     ok.
 
 
 init_per_group(one_node, Config) ->
 init_per_group(one_node, Config) ->
@@ -78,7 +82,7 @@ init_per_testcase(TestCase, Config) ->
     Cluster = [{Node, Spec} || Node <- Nodes],
     Cluster = [{Node, Spec} || Node <- Nodes],
     ClusterNodes = emqx_cth_cluster:start(
     ClusterNodes = emqx_cth_cluster:start(
         Cluster,
         Cluster,
-        #{work_dir => ?config(priv_dir, Config)}
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
     ),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [{cluster_nodes, ClusterNodes} | Config].
     [{cluster_nodes, ClusterNodes} | Config].
@@ -128,20 +132,12 @@ case_specific_data_dir(Case, Config) ->
         PrivDir -> filename:join(PrivDir, atom_to_list(Case))
         PrivDir -> filename:join(PrivDir, atom_to_list(Case))
     end.
     end.
 
 
-get_mqtt_port(Node, Type) ->
-    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
-    Port.
-
 %% to avoid it finishing too fast
 %% to avoid it finishing too fast
 with_some_sessions(Node, Fn) ->
 with_some_sessions(Node, Fn) ->
-    erpc:call(Node, fun() ->
-        emqx_common_test_helpers:with_mock(
-            emqx_eviction_agent,
-            all_channels_count,
-            fun() -> 100 end,
-            Fn
-        )
-    end).
+    Port = get_mqtt_port(Node, tcp),
+    Conns = emqtt_connect_many(Port, 100),
+    _ = erpc:call(Node, Fn),
+    ok = stop_many(Conns).
 
 
 drain_exits([ClientPid | Rest]) ->
 drain_exits([ClientPid | Rest]) ->
     receive
     receive
@@ -189,6 +185,7 @@ t_agent_busy(Config) ->
     ok.
     ok.
 
 
 t_already_started(Config) ->
 t_already_started(Config) ->
+    process_flag(trap_exit, true),
     [Node] = ?config(cluster_nodes, Config),
     [Node] = ?config(cluster_nodes, Config),
     with_some_sessions(Node, fun() ->
     with_some_sessions(Node, fun() ->
         ok = emqx_node_rebalance_purge:start(opts(Config)),
         ok = emqx_node_rebalance_purge:start(opts(Config)),
@@ -216,6 +213,7 @@ t_not_started(Config) ->
     ).
     ).
 
 
 t_start(Config) ->
 t_start(Config) ->
+    process_flag(trap_exit, true),
     [Node] = ?config(cluster_nodes, Config),
     [Node] = ?config(cluster_nodes, Config),
     Port = get_mqtt_port(Node, tcp),
     Port = get_mqtt_port(Node, tcp),
 
 
@@ -233,6 +231,7 @@ t_start(Config) ->
     ok.
     ok.
 
 
 t_non_persistence(Config) ->
 t_non_persistence(Config) ->
+    process_flag(trap_exit, true),
     [Node] = ?config(cluster_nodes, Config),
     [Node] = ?config(cluster_nodes, Config),
     Port = get_mqtt_port(Node, tcp),
     Port = get_mqtt_port(Node, tcp),
 
 
@@ -284,6 +283,7 @@ t_unknown_messages(Config) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 t_already_started_two(Config) ->
 t_already_started_two(Config) ->
+    process_flag(trap_exit, true),
     [Node1, _Node2] = ?config(cluster_nodes, Config),
     [Node1, _Node2] = ?config(cluster_nodes, Config),
     with_some_sessions(Node1, fun() ->
     with_some_sessions(Node1, fun() ->
         ok = emqx_node_rebalance_purge:start(opts(Config)),
         ok = emqx_node_rebalance_purge:start(opts(Config)),

+ 1 - 0
apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl

@@ -32,6 +32,7 @@ init_per_suite(Config) ->
     Apps = [
     Apps = [
         emqx_conf,
         emqx_conf,
         emqx,
         emqx,
+        emqx_eviction_agent,
         emqx_node_rebalance
         emqx_node_rebalance
     ],
     ],
     Cluster = [
     Cluster = [

+ 4 - 0
changes/ee/feat-11971.en.md

@@ -0,0 +1,4 @@
+Made `/api/v5/load_rebalance/availability_check` public, i.e. not requiring authentication. This simplifies load balancer setup.
+
+Made rebalance/evacuation more graceful during the wait health check phase. Connections to nodes marked for eviction are no longer prohibited during this phase.
+During this phase it is unknown whether these nodes are all marked unhealthy by the load balancer, so prohibiting connections to them may cause multiple unsuccessful attempts to reconnect.