| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_node_rebalance_api_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -import(
- emqx_mgmt_api_test_util,
- [
- request_api/3,
- request/2,
- request/3,
- uri/1
- ]
- ).
- -import(
- emqx_eviction_agent_test_helpers,
- [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
- ).
%% Common Test entry point: the list of cases to run is computed by the shared
%% EMQX test helper from this suite module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
%% Starts `emqx` and `emqx_node_rebalance` on the CT node, using the suite's
%% priv_dir as work directory. The started-apps handle is stored in Config
%% under `apps` so that end_per_suite/1 can stop exactly these applications.
init_per_suite(Config) ->
    SuiteApps = [emqx, emqx_node_rebalance],
    StartOpts = #{work_dir => ?config(priv_dir, Config)},
    Started = emqx_cth_suite:start(SuiteApps, StartOpts),
    [{apps, Started} | Config].
%% Stops the applications that init_per_suite/1 started (stored under `apps`).
end_per_suite(Config) ->
    Started = ?config(apps, Config),
    emqx_cth_suite:stop(Started).
%% Boots a fresh two-node core cluster for every test case:
%%  * a donor node and a recipient node, both named after the test case;
%%  * both join the donor and run the apps listed in app_specs/0 with listeners;
%%  * the management API test utilities are initialised on the first node;
%%  * emqx_common_test_http:default_auth_header/0 is mocked to use that node's
%%    header (see take_auth_header_from/1).
%% The started node list is stored in Config under `cluster_nodes`
%% (donor first, then recipient).
init_per_testcase(Case, Config) ->
    [DonorNode, RecipientNode] =
        [case_specific_node_name(?MODULE, Case, Suffix) || Suffix <- ['_donor', '_recipient']],
    NodeSpec = #{
        role => core,
        join_to => emqx_cth_cluster:node_name(DonorNode),
        listeners => true,
        apps => app_specs()
    },
    ClusterNodes = emqx_cth_cluster:start(
        [{DonorNode, NodeSpec}, {RecipientNode, NodeSpec}],
        #{work_dir => ?config(priv_dir, Config)}
    ),
    [FirstNode | _] = ClusterNodes,
    ok = rpc:call(FirstNode, emqx_mgmt_api_test_util, init_suite, []),
    ok = take_auth_header_from(FirstNode),
    [{cluster_nodes, ClusterNodes} | Config].
%% Tears down the per-case cluster started by init_per_testcase/2. The result
%% of the stop call is deliberately ignored; the callback always returns `ok`.
end_per_testcase(_Case, Config) ->
    _ = emqx_cth_cluster:stop(?config(cluster_nodes, Config)),
    ok.
- %%--------------------------------------------------------------------
- %% Tests
- %%--------------------------------------------------------------------
%% Validates the `evacuation/start` endpoint:
%%  * each malformed option set is rejected with HTTP 400;
%%  * an unknown node yields HTTP 404;
%%  * a well-formed request migrating to the recipient succeeds, after which the
%%    donor is the only entry under `evacuations` in `global_status`.
t_start_evacuation_validation(Config) ->
    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
    DonorNodeBin = atom_to_binary(DonorNode),
    StartPath = ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
    BadOpts = [
        #{conn_evict_rate => <<"conn">>},
        #{sess_evict_rate => <<"sess">>},
        #{wait_takeover => <<"wait">>},
        #{wait_health_check => <<"wait">>},
        #{migrate_to => []},
        #{migrate_to => <<"migrate_to">>},
        #{migrate_to => [<<"bad_node">>]},
        #{migrate_to => [<<"bad_node">>, DonorNodeBin]},
        #{unknown => <<"Value">>}
    ],
    lists:foreach(
        fun(Opts) ->
            %% Opts doubles as the assertion comment so a failure names the
            %% option set that was unexpectedly accepted.
            ?assertMatch({ok, 400, #{}}, api_post(StartPath, Opts), Opts)
        end,
        BadOpts
    ),
    ?assertMatch(
        {ok, 404, #{}},
        api_post(["load_rebalance", "bad@node", "evacuation", "start"], #{})
    ),
    GoodOpts = #{
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        wait_takeover => <<"10s">>,
        wait_health_check => <<"10s">>,
        redirect_to => <<"srv">>,
        migrate_to => [atom_to_binary(RecipientNode)]
    },
    ?assertMatch({ok, 200, #{}}, api_post(StartPath, GoodOpts)),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [#{<<"node">> := DonorNodeBin}]}},
        api_get(["load_rebalance", "global_status"])
    ).
%% TODO: this case is disabled through its `skipped_` prefix (presumably so
%% that all/0 does not collect it); rename it back to
%% `t_start_purge_validation` once the purge feature is officially released.
%%
%% Validates the `purge/start` endpoint:
%%  * each malformed option set is rejected with HTTP 400;
%%  * an unknown node yields HTTP 404;
%%  * with 100 clients connected to the first node, `purge_rate => 10` starts
%%    a purge that shows up in both `global_status` and `status`;
%%  * the purge is then stopped and the clients are disconnected.
skipped_t_start_purge_validation(Config) ->
    [Node1 | _] = ?config(cluster_nodes, Config),
    Port1 = get_mqtt_port(Node1, tcp),
    Node1Str = atom_to_list(Node1),
    Node1Bin = atom_to_binary(Node1),
    PurgePath = fun(Action) -> ["load_rebalance", Node1Str, "purge", Action] end,
    BadOpts = [
        #{purge_rate => <<"conn">>},
        #{purge_rate => 0},
        #{purge_rate => -1},
        #{purge_rate => 1.1},
        #{unknown => <<"Value">>}
    ],
    lists:foreach(
        fun(Opts) ->
            %% Opts doubles as the assertion comment for easier diagnosis.
            ?assertMatch({ok, 400, #{}}, api_post(PurgePath("start"), Opts), Opts)
        end,
        BadOpts
    ),
    ?assertMatch(
        {ok, 404, #{}},
        api_post(["load_rebalance", "bad@node", "purge", "start"], #{})
    ),
    %% NOTE(review): presumably trap exits so that clients dropped by the purge
    %% cannot kill the test process if they are linked to it — confirm.
    process_flag(trap_exit, true),
    Conns = emqtt_connect_many(Port1, 100),
    ?assertMatch({ok, 200, #{}}, api_post(PurgePath("start"), #{purge_rate => 10})),
    ?assertMatch(
        {ok, 200, #{<<"purges">> := [#{<<"node">> := Node1Bin}]}},
        api_get(["load_rebalance", "global_status"])
    ),
    ?assertMatch(
        {ok, 200, #{
            <<"process">> := <<"purge">>,
            <<"purge_rate">> := 10,
            <<"session_goal">> := 0,
            <<"state">> := <<"purging">>,
            <<"stats">> :=
                #{
                    <<"current_sessions">> := _,
                    <<"initial_sessions">> := 100
                }
        }},
        api_get(["load_rebalance", "status"])
    ),
    ?assertMatch({ok, 200, #{}}, api_post(PurgePath("stop"), #{})),
    ok = stop_many(Conns),
    ok.
%% Validates the rebalance `start` endpoint:
%%  * each malformed option set is rejected with HTTP 400;
%%  * an unknown node yields HTTP 404;
%%  * with 50 clients connected to the donor, a well-formed request covering
%%    both nodes succeeds, and the donor is the only entry under `rebalances`
%%    in `global_status`.
%% Each rejected option set is passed as the ?assertMatch/3 comment, as in the
%% evacuation and purge validation cases, so a failure names the offending input.
t_start_rebalance_validation(Config) ->
    %% NOTE(review): presumably trap exits so that MQTT clients disconnected by
    %% the rebalance cannot kill the test process if they are linked to it.
    process_flag(trap_exit, true),
    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
    DonorPort = get_mqtt_port(DonorNode, tcp),
    BadOpts = [
        #{conn_evict_rate => <<"conn">>},
        #{sess_evict_rate => <<"sess">>},
        #{abs_conn_threshold => <<"act">>},
        #{rel_conn_threshold => <<"rct">>},
        #{abs_sess_threshold => <<"act">>},
        #{rel_sess_threshold => <<"rct">>},
        #{wait_takeover => <<"wait">>},
        #{wait_health_check => <<"wait">>},
        #{nodes => <<"nodes">>},
        #{nodes => []},
        #{nodes => [<<"bad_node">>]},
        #{nodes => [<<"bad_node">>, atom_to_binary(DonorNode)]},
        #{unknown => <<"Value">>}
    ],
    lists:foreach(
        fun(Opts) ->
            ?assertMatch(
                {ok, 400, #{}},
                api_post(
                    ["load_rebalance", atom_to_list(DonorNode), "start"],
                    Opts
                ),
                Opts
            )
        end,
        BadOpts
    ),
    ?assertMatch(
        {ok, 404, #{}},
        api_post(
            ["load_rebalance", "bad@node", "start"],
            #{}
        )
    ),
    Conns = emqtt_connect_many(DonorPort, 50),
    ?assertMatch(
        {ok, 200, #{}},
        api_post(
            ["load_rebalance", atom_to_list(DonorNode), "start"],
            #{
                conn_evict_rate => 10,
                sess_evict_rate => 10,
                wait_takeover => <<"10s">>,
                wait_health_check => <<"10s">>,
                abs_conn_threshold => 10,
                rel_conn_threshold => 1.001,
                abs_sess_threshold => 10,
                rel_sess_threshold => 1.001,
                nodes => [
                    atom_to_binary(DonorNode),
                    atom_to_binary(RecipientNode)
                ]
            }
        )
    ),
    DonorNodeBin = atom_to_binary(DonorNode),
    ?assertMatch(
        {ok, 200, #{<<"rebalances">> := [#{<<"node">> := DonorNodeBin}]}},
        api_get(["load_rebalance", "global_status"])
    ),
    ok = stop_many(Conns).
%% End-to-end evacuation lifecycle driven through the HTTP API:
%%  1. start an evacuation of the donor towards the recipient, using the API's
%%     own example options with `migrate_to` overridden;
%%  2. local `status` and `global_status` (after
%%     emqx_node_rebalance_api:translate/2) report eviction rates of 100,
%%     zero goals and the expected stats keys;
%%  3. after `stop`, `status` is `disabled` and `global_status` lists nothing.
t_start_stop_evacuation(Config) ->
    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
    DonorNodeBin = atom_to_binary(DonorNode),
    EvacuationPath = fun(Action) ->
        ["load_rebalance", atom_to_list(DonorNode), "evacuation", Action]
    end,
    StatusPath = ["load_rebalance", "status"],
    GlobalStatusPath = ["load_rebalance", "global_status"],
    ExampleOpts = maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()),
    StartOpts = ExampleOpts#{migrate_to => [atom_to_binary(RecipientNode)]},
    ?assertMatch({ok, 200, #{}}, api_post(EvacuationPath("start"), StartOpts)),
    StatusResponse = api_get(StatusPath),
    ?assertMatch({ok, 200, _}, StatusResponse),
    {ok, 200, Status} = StatusResponse,
    ?assertMatch(
        #{
            process := evacuation,
            connection_eviction_rate := 100,
            session_eviction_rate := 100,
            connection_goal := 0,
            session_goal := 0,
            stats := #{
                initial_connected := _,
                current_connected := _,
                initial_sessions := _,
                current_sessions := _
            }
        },
        emqx_node_rebalance_api:translate(local_status_enabled, Status)
    ),
    GlobalStatusResponse = api_get(GlobalStatusPath),
    ?assertMatch({ok, 200, _}, GlobalStatusResponse),
    {ok, 200, GlobalStatus} = GlobalStatusResponse,
    ?assertMatch(
        #{
            rebalances := [],
            evacuations := [
                #{
                    node := DonorNodeBin,
                    connection_eviction_rate := 100,
                    session_eviction_rate := 100,
                    connection_goal := 0,
                    session_goal := 0,
                    stats := #{
                        initial_connected := _,
                        current_connected := _,
                        initial_sessions := _,
                        current_sessions := _
                    }
                }
            ]
        },
        emqx_node_rebalance_api:translate(global_status, GlobalStatus)
    ),
    ?assertMatch({ok, 200, #{}}, api_post(EvacuationPath("stop"), #{})),
    ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
        api_get(GlobalStatusPath)
    ).
%% End-to-end rebalance lifecycle driven through the HTTP API:
%%  1. `status` is `disabled` before anything starts;
%%  2. with 100 clients on the donor, a rebalance is started using the API's
%%     own example options with `nodes` removed;
%%  3. `status` and `global_status` report the running rebalance with the
%%     example's eviction rates (10 and 20) and the donor/recipient sets. This
%%     is checked both on the raw JSON and after
%%     emqx_node_rebalance_api:translate/2;
%%  4. after `stop`, both status endpoints report nothing running.
t_start_stop_rebalance(Config) ->
    process_flag(trap_exit, true),
    [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
    DonorNodeStr = atom_to_list(DonorNode),
    DonorNodeBin = atom_to_binary(DonorNode),
    RecipientNodeBin = atom_to_binary(RecipientNode),
    StatusPath = ["load_rebalance", "status"],
    GlobalStatusPath = ["load_rebalance", "global_status"],
    DonorPort = get_mqtt_port(DonorNode, tcp),
    ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
    Conns = emqtt_connect_many(DonorPort, 100),
    ExampleOpts = maps:get(rebalance, emqx_node_rebalance_api:rebalance_example()),
    StartOpts = maps:without([nodes], ExampleOpts),
    ?assertMatch(
        {ok, 200, #{}},
        api_post(["load_rebalance", DonorNodeStr, "start"], StartOpts)
    ),
    StatusResponse = api_get(StatusPath),
    ?assertMatch({ok, 200, _}, StatusResponse),
    {ok, 200, Status} = StatusResponse,
    ?assertMatch(
        #{process := rebalance, connection_eviction_rate := 10, session_eviction_rate := 20},
        emqx_node_rebalance_api:translate(local_status_enabled, Status)
    ),
    GlobalStatusResponse = api_get(GlobalStatusPath),
    ?assertMatch({ok, 200, _}, GlobalStatusResponse),
    {ok, 200, GlobalStatus} = GlobalStatusResponse,
    %% Raw JSON view: binary keys as returned by the endpoint.
    ?assertMatch(
        {ok, 200, #{
            <<"evacuations">> := [],
            <<"rebalances">> :=
                [
                    #{
                        <<"state">> := _,
                        <<"node">> := DonorNodeBin,
                        <<"coordinator_node">> := _,
                        <<"connection_eviction_rate">> := 10,
                        <<"session_eviction_rate">> := 20,
                        <<"donors">> := [DonorNodeBin],
                        <<"recipients">> := [RecipientNodeBin]
                    }
                ]
        }},
        GlobalStatusResponse
    ),
    %% Same data after translation to atom keys.
    ?assertMatch(
        #{
            evacuations := [],
            rebalances := [
                #{
                    state := _,
                    node := DonorNodeBin,
                    coordinator_node := _,
                    connection_eviction_rate := 10,
                    session_eviction_rate := 20,
                    donors := [DonorNodeBin],
                    recipients := [RecipientNodeBin]
                }
            ]
        },
        emqx_node_rebalance_api:translate(global_status, GlobalStatus)
    ),
    ?assertMatch({ok, 200, #{}}, api_post(["load_rebalance", DonorNodeStr, "stop"], #{})),
    ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
        api_get(GlobalStatusPath)
    ),
    ok = stop_many(Conns).
%% The availability check is queried with invalid credentials (see
%% api_get_noauth/1). It is expected to answer `ok` normally, fail with HTTP 503
%% while an evacuation runs on the donor, and succeed again once it is stopped.
t_availability_check(Config) ->
    [DonorNode | _] = ?config(cluster_nodes, Config),
    CheckPath = ["load_rebalance", "availability_check"],
    ?assertMatch({ok, _}, api_get_noauth(CheckPath)),
    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
    ?assertMatch({error, {_, 503, _}}, api_get_noauth(CheckPath)),
    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
    ?assertMatch({ok, _}, api_get_noauth(CheckPath)).
- %%--------------------------------------------------------------------
- %% Helpers
- %%--------------------------------------------------------------------
%% GET `Path` with deliberately invalid basic-auth credentials. This is used to
%% exercise endpoints that must be reachable without authentication. The
%% result of request_api/3 is returned unchanged.
api_get_noauth(Path) ->
    BogusAuth = emqx_common_test_http:auth_header("invalid", "password"),
    request_api(get, uri(Path), BogusAuth).
%% GET `Path` through emqx_mgmt_api_test_util:request/2.
%% Returns `{ok, Code, Body}`. Body is the JSON-decoded response (maps with
%% binary keys), or the raw response body if it is not valid JSON. Transport
%% errors are passed through as `{error, _}`.
%% Decoding mirrors api_post/2: a non-JSON body (e.g. a plain-text error page)
%% reaches the caller's assertion together with its status code instead of
%% crashing inside this helper.
api_get(Path) ->
    case request(get, uri(Path)) of
        {ok, Code, ResponseBody} ->
            Res =
                case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
                    {ok, Decoded} -> Decoded;
                    {error, _} -> ResponseBody
                end,
            {ok, Code, Res};
        {error, _} = Error ->
            Error
    end.
%% POST `Data` to `Path` through emqx_mgmt_api_test_util:request/3.
%% Returns `{ok, Code, Body}`. Body is the JSON-decoded response (maps with
%% binary keys) when decoding succeeds, otherwise the raw response body.
%% `{error, _}` results are returned unchanged.
api_post(Path, Data) ->
    Result = request(post, uri(Path), Data),
    case Result of
        {error, _} ->
            Result;
        {ok, Code, RawBody} ->
            Body =
                case emqx_utils_json:safe_decode(RawBody, [return_maps]) of
                    {ok, Json} -> Json;
                    {error, _} -> RawBody
                end,
            {ok, Code, Body}
    end.
%% Mocks emqx_common_test_http (passthrough for all other functions) so that
%% default_auth_header/0 returns whatever that function yields on `Node`,
%% fetched via rpc on every call. Requests issued from the CT node therefore
%% carry a header produced on `Node`.
%% NOTE(review): the mock is not unloaded explicitly anywhere visible; this
%% presumably relies on meck tearing down when the test-case process exits.
take_auth_header_from(Node) ->
    Mocked = emqx_common_test_http,
    meck:new(Mocked, [passthrough]),
    FetchRemoteHeader = fun() -> rpc:call(Node, Mocked, default_auth_header, []) end,
    meck:expect(Mocked, default_auth_header, FetchRemoteHeader),
    ok.
%% Returns `<priv_dir>/<Case>`, or `undefined` when Config carries no
%% `priv_dir` entry.
%% NOTE(review): not referenced anywhere in the visible part of this suite.
case_specific_data_dir(Case, Config) ->
    case ?config(priv_dir, Config) of
        PrivDir when PrivDir =/= undefined ->
            filename:join([PrivDir, atom_to_list(Case)]);
        undefined ->
            undefined
    end.
%% Application specs for each cluster node started in init_per_testcase/2:
%%  * `emqx`: this suite module is installed as config loader before start,
%%    and `boot_modules` is overridden to `[broker, listeners]`;
%%  * `emqx_retainer`: started with the retainer enabled;
%%  * `emqx_node_rebalance`: started with defaults.
app_specs() ->
    EmqxSpec = #{
        before_start => fun() -> emqx_app:set_config_loader(?MODULE) end,
        override_env => [{boot_modules, [broker, listeners]}]
    },
    RetainerSpec = #{config => #{retainer => #{enable => true}}},
    [{emqx, EmqxSpec}, {emqx_retainer, RetainerSpec}, emqx_node_rebalance].
%% Returns the port that the `default` listener of the given Type (e.g. `tcp`)
%% is bound to on `Node`, read from that node's config via erpc.
%% The original code only accepted an `{IP, Port}` bind value.
%% NOTE(review): EMQX `bind` values can presumably also be a bare integer port
%% (configured without an address), so both shapes are accepted here — confirm
%% against the listener schema.
get_mqtt_port(Node, Type) ->
    case erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]) of
        {_IP, Port} -> Port;
        Port when is_integer(Port) -> Port
    end.
|