Просмотр исходного кода

fix(rebalance): add wait_health_check timeout to node evacuation

Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
Ilya Averyanov 2 года назад
Родитель
Коммит
7f2de66dab

+ 3 - 3
apps/emqx_node_rebalance/src/emqx_node_rebalance.erl

@@ -48,8 +48,8 @@
 -type start_opts() :: #{
     conn_evict_rate => pos_integer(),
     sess_evict_rate => pos_integer(),
-    wait_health_check => pos_integer(),
-    wait_takeover => pos_integer(),
+    wait_health_check => number(),
+    wait_takeover => number(),
     abs_conn_threshold => pos_integer(),
     rel_conn_threshold => number(),
     abs_sess_threshold => pos_integer(),
@@ -438,7 +438,7 @@ is_node_available() ->
     node().
 
 all_nodes() ->
-    mria_mnesia:running_nodes().
+    emqx:running_nodes().
 
 seconds(Sec) ->
     round(timer:seconds(Sec)).

+ 11 - 2
apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl

@@ -202,10 +202,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
     }}.
 
 '/load_rebalance/availability_check'(get, #{}) ->
-    case emqx_eviction_agent:status() of
+    case emqx_node_rebalance_status:local_status() of
         disabled ->
             {200, #{}};
-        {enabled, _Stats} ->
+        _ ->
             error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
     end.
 
@@ -434,6 +434,14 @@ fields(rebalance_start) ->
     ];
 fields(rebalance_evacuation_start) ->
     [
+        {"wait_health_check",
+            mk(
+                emqx_schema:timeout_duration_s(),
+                #{
+                    desc => ?DESC(wait_health_check),
+                    required => false
+                }
+            )},
         {"conn_evict_rate",
             mk(
                 pos_integer(),
@@ -714,6 +722,7 @@ rebalance_example() ->
 
 rebalance_evacuation_example() ->
     #{
+        wait_health_check => 10,
         conn_evict_rate => 100,
         sess_evict_rate => 100,
         redirect_to => <<"othernode:1883">>,

+ 5 - 2
apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl

@@ -103,6 +103,7 @@ cli(_) ->
         [
             {
                 "rebalance start --evacuation \\\n"
+                "    [--wait-health-check Secs] \\\n"
                 "    [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
                 "    [--conn-evict-rate CountPerSec] \\\n"
                 "    [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
@@ -182,8 +183,6 @@ collect_args(["--migrate-to", MigrateTo | Args], Map) ->
 %% rebalance
 collect_args(["--nodes", Nodes | Args], Map) ->
     collect_args(Args, Map#{"--nodes" => Nodes});
-collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
-    collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
 collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) ->
     collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres});
 collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) ->
@@ -193,6 +192,8 @@ collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) ->
 collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) ->
     collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres});
 %% common
+collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
+    collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
 collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) ->
     collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate});
 collect_args(["--wait-takeover", WaitTakeover | Args], Map) ->
@@ -207,6 +208,8 @@ validate_evacuation([], Map) ->
     {ok, Map};
 validate_evacuation([{"--evacuation", _} | Rest], Map) ->
     validate_evacuation(Rest, Map);
+validate_evacuation([{"--wait-health-check", _} | _] = Opts, Map) ->
+    validate_pos_int(wait_health_check, Opts, Map, fun validate_evacuation/2);
 validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
     validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)});
 validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->

+ 44 - 22
apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl

@@ -53,10 +53,11 @@
     server_reference => emqx_eviction_agent:server_reference(),
     conn_evict_rate => pos_integer(),
     sess_evict_rate => pos_integer(),
-    wait_takeover => pos_integer(),
-    migrate_to => migrate_to()
+    wait_takeover => number(),
+    migrate_to => migrate_to(),
+    wait_health_check => number()
 }.
--type start_error() :: already_started | eviction_agent_busy.
+-type start_error() :: already_started.
 -type stats() :: #{
     initial_conns := non_neg_integer(),
     initial_sessions := non_neg_integer(),
@@ -97,7 +98,7 @@ available_nodes(Nodes) when is_list(Nodes) ->
 
 callback_mode() -> handle_event_function.
 
-%% states: disabled, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
+%% states: disabled, waiting_health_check, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
 
 init([]) ->
     case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
@@ -119,25 +120,20 @@ init([]) ->
 %% start
 handle_event(
     {call, From},
-    {start, #{server_reference := ServerReference} = Opts},
+    {start, #{wait_health_check := WaitHealthCheck} = Opts},
     disabled,
     #{} = Data
 ) ->
-    case emqx_eviction_agent:enable(?MODULE, ServerReference) of
-        ok ->
-            NewData = init_data(Data, Opts),
-            ok = emqx_node_rebalance_evacuation_persist:save(Opts),
-            ?SLOG(warning, #{
-                msg => "node_evacuation_started",
-                opts => Opts
-            }),
-            {next_state, evicting_conns, NewData, [
-                {state_timeout, 0, evict_conns},
-                {reply, From, ok}
-            ]};
-        {error, eviction_agent_busy} ->
-            {keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
-    end;
+    ?SLOG(warning, #{
+        msg => "node_evacuation_started",
+        opts => Opts
+    }),
+    NewData = init_data(Data, Opts),
+    ok = emqx_node_rebalance_evacuation_persist:save(Opts),
+    {next_state, waiting_health_check, NewData, [
+        {state_timeout, seconds(WaitHealthCheck), start_eviction},
+        {reply, From, ok}
+    ]};
 handle_event({call, From}, {start, _Opts}, _State, #{}) ->
     {keep_state_and_data, [{reply, From, {error, already_started}}]};
 %% stop
@@ -167,6 +163,27 @@ handle_event({call, From}, status, State, #{migrate_to := MigrateTo} = Data) ->
     {keep_state_and_data, [
         {reply, From, {enabled, Stats#{state => State, migrate_to => migrate_to(MigrateTo)}}}
     ]};
+%% start eviction
+handle_event(
+    state_timeout,
+    start_eviction,
+    waiting_health_check,
+    #{server_reference := ServerReference} = Data
+) ->
+    case emqx_eviction_agent:enable(?MODULE, ServerReference) of
+        ok ->
+            ?tp(debug, eviction_agent_started, #{
+                data => Data
+            }),
+            {next_state, evicting_conns, Data, [
+                {state_timeout, 0, evict_conns}
+            ]};
+        {error, eviction_agent_busy} ->
+            ?tp(warning, eviction_agent_busy, #{
+                data => Data
+            }),
+            {next_state, disabled, deinit(Data)}
+    end;
 %% conn eviction
 handle_event(
     state_timeout,
@@ -270,12 +287,14 @@ default_opts() ->
         conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE,
         sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE,
         wait_takeover => ?DEFAULT_WAIT_TAKEOVER,
+        wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK,
         migrate_to => undefined
     }.
 
 init_data(Data0, Opts) ->
     Data1 = maps:merge(Data0, Opts),
-    {enabled, #{connections := ConnCount, sessions := SessCount}} = emqx_eviction_agent:status(),
+    ConnCount = emqx_eviction_agent:connection_count(),
+    SessCount = emqx_eviction_agent:session_count(),
     Data1#{
         initial_conns => ConnCount,
         current_conns => ConnCount,
@@ -305,4 +324,7 @@ is_node_available() ->
     node().
 
 all_nodes() ->
-    mria_mnesia:running_nodes() -- [node()].
+    emqx:running_nodes() -- [node()].
+
+seconds(Sec) ->
+    round(timer:seconds(Sec)).

+ 5 - 13
apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl

@@ -21,24 +21,16 @@
 %% APIs
 %%--------------------------------------------------------------------
 
-%% do not persist `migrate_to`:
-%% * after restart there is nothing to migrate
-%% * this value may be invalid after node was offline
--type persisted_start_opts() :: #{
-    server_reference => emqx_eviction_agent:server_reference(),
-    conn_evict_rate => pos_integer(),
-    sess_evict_rate => pos_integer(),
-    wait_takeover => pos_integer()
-}.
 -type start_opts() :: #{
     server_reference => emqx_eviction_agent:server_reference(),
     conn_evict_rate => pos_integer(),
     sess_evict_rate => pos_integer(),
-    wait_takeover => pos_integer(),
-    migrate_to => emqx_node_rebalance_evacuation:migrate_to()
+    wait_takeover => number(),
+    migrate_to => emqx_node_rebalance_evacuation:migrate_to(),
+    wait_health_check => number()
 }.
 
--spec save(persisted_start_opts()) -> ok_or_error(term()).
+-spec save(start_opts()) -> ok_or_error(term()).
 save(
     #{
         server_reference := ServerReference,
@@ -50,7 +42,7 @@ save(
     (is_binary(ServerReference) orelse ServerReference =:= undefined) andalso
         is_integer(ConnEvictRate) andalso ConnEvictRate > 0 andalso
         is_integer(SessEvictRate) andalso SessEvictRate > 0 andalso
-        is_integer(WaitTakeover) andalso WaitTakeover >= 0
+        is_number(WaitTakeover) andalso WaitTakeover >= 0
 ->
     Filepath = evacuation_filepath(),
     case filelib:ensure_dir(Filepath) of

+ 2 - 0
apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl

@@ -69,6 +69,7 @@ t_start_evacuation_validation(Config) ->
         #{sess_evict_rate => <<"sess">>},
         #{redirect_to => 123},
         #{wait_takeover => <<"wait">>},
+        #{wait_health_check => <<"wait">>},
         #{migrate_to => []},
         #{migrate_to => <<"migrate_to">>},
         #{migrate_to => [<<"bad_node">>]},
@@ -103,6 +104,7 @@ t_start_evacuation_validation(Config) ->
                 conn_evict_rate => 10,
                 sess_evict_rate => 10,
                 wait_takeover => 10,
+                wait_health_check => 10,
                 redirect_to => <<"srv">>,
                 migrate_to => [atom_to_binary(RecipientNode)]
             }

+ 18 - 6
apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl

@@ -86,11 +86,13 @@ end_per_testcase(_Case, Config) ->
 
 t_agent_busy(Config) ->
     [{DonorNode, _DonorPort}] = ?config(cluster_nodes, Config),
+
     ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
 
-    ?assertEqual(
-        {error, eviction_agent_busy},
-        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
+    ?assertWaitEvent(
+        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+        #{?snk_kind := eviction_agent_busy},
+        5000
     ).
 
 t_already_started(Config) ->
@@ -115,7 +117,12 @@ t_start(Config) ->
 
     [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
 
-    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+    ?assertWaitEvent(
+        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+        #{?snk_kind := eviction_agent_started},
+        5000
+    ),
+
     ?assertMatch(
         {error, {use_another_server, #{}}},
         emqtt_try_connect([{port, DonorPort}])
@@ -126,7 +133,11 @@ t_persistence(Config) ->
 
     [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
 
-    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+    ?assertWaitEvent(
+        rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
+        #{?snk_kind := eviction_agent_started},
+        5000
+    ),
 
     ?assertMatch(
         {error, {use_another_server, #{}}},
@@ -179,7 +190,7 @@ t_conn_evicted(Config) ->
     ?assertWaitEvent(
         ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
         #{?snk_kind := node_evacuation_evict_conn},
-        1000
+        5000
     ),
 
     ?assertMatch(
@@ -251,6 +262,7 @@ opts(Config) ->
         conn_evict_rate => 10,
         sess_evict_rate => 10,
         wait_takeover => 1,
+        wait_health_check => 1,
         migrate_to => migrate_to(Config)
     }.
 

+ 1 - 1
rel/i18n/emqx_node_rebalance_api.hocon

@@ -49,7 +49,7 @@ param_node.label:
 """Node name"""
 
 wait_health_check.desc:
-"""Time to wait before starting the rebalance process, in seconds"""
+"""Time to wait before starting the rebalance/evacuation process, in seconds"""
 
 wait_health_check.label:
 """Wait health check"""