| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_node_rebalance_api_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -import(
- emqx_mgmt_api_test_util,
- [
- request/2,
- request/3,
- uri/1
- ]
- ).
- -import(
- emqx_eviction_agent_test_helpers,
- [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
- ).
- -define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
%% Common Test callback. The list of test cases is whatever
%% emqx_common_test_helpers:all/1 reports for this module.
all() -> emqx_common_test_helpers:all(?MODULE).
%% Common Test callback. Starts the applications listed in ?START_APPS on the
%% test node and returns the suite configuration unchanged.
init_per_suite(SuiteConfig) ->
    ok = emqx_common_test_helpers:start_apps(?START_APPS),
    SuiteConfig.
%% Common Test callback. Stops the applications listed in ?START_APPS; the
%% assertive match makes the callback fail if stopping does not return `ok'.
end_per_suite(_SuiteConfig) ->
    ok = emqx_common_test_helpers:stop_apps(?START_APPS).
%% Common Test callback. Starts a two-node cluster (donor on port 2883,
%% recipient on port 3883) with node names derived from the test case name,
%% initialises the management API test utilities on the donor node and
%% redirects the local default auth header lookup to that node.
%%
%% The started nodes are stored in the returned configuration under
%% `cluster_nodes' as a list of {Node, Port} pairs, donor first.
init_per_testcase(Case, Config) ->
    NodeSpecs = [
        {case_specific_node_name(?MODULE, Case, '_donor'), 2883},
        {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
    ],
    Env = [{emqx, data_dir, case_specific_data_dir(Case, Config)}],
    ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(NodeSpecs, ?START_APPS, Env),
    %% The first node of the cluster is the one all API requests are served by.
    [{DonorNode, _} | _] = ClusterNodes,
    ok = rpc:call(DonorNode, emqx_mgmt_api_test_util, init_suite, []),
    ok = take_auth_header_from(DonorNode),
    [{cluster_nodes, ClusterNodes} | Config].
%% Common Test callback. Stops the cluster that init_per_testcase/2 stored
%% under `cluster_nodes' in the configuration.
end_per_testcase(_Case, Config) ->
    ClusterNodes = ?config(cluster_nodes, Config),
    _ = emqx_eviction_agent_test_helpers:stop_cluster(ClusterNodes, ?START_APPS).
- %%--------------------------------------------------------------------
- %% Tests
- %%--------------------------------------------------------------------
%% Checks request validation of the evacuation start endpoint.
%%
%% Every option map in InvalidOpts must be answered with 400, an unknown node
%% in the path with 404 and a well-formed request with 200. Afterwards the
%% donor node must be the only entry under "evacuations" in the global status.
t_start_evacuation_validation(Config) ->
    [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
    DonorNodeBin = atom_to_binary(DonorNode),
    StartPath = ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
    InvalidOpts = [
        #{conn_evict_rate => <<"conn">>},
        #{sess_evict_rate => <<"sess">>},
        #{wait_takeover => <<"wait">>},
        #{wait_health_check => <<"wait">>},
        #{migrate_to => []},
        #{migrate_to => <<"migrate_to">>},
        #{migrate_to => [<<"bad_node">>]},
        #{migrate_to => [<<"bad_node">>, DonorNodeBin]},
        #{unknown => <<"Value">>}
    ],
    %% The option map under test is passed as the assertion comment.
    _ = [
        ?assertMatch({ok, 400, #{}}, api_post(StartPath, Opts), Opts)
     || Opts <- InvalidOpts
    ],
    ?assertMatch(
        {ok, 404, #{}},
        api_post(["load_rebalance", "bad@node", "evacuation", "start"], #{})
    ),
    ValidOpts = #{
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        wait_takeover => <<"10s">>,
        wait_health_check => <<"10s">>,
        redirect_to => <<"srv">>,
        migrate_to => [atom_to_binary(RecipientNode)]
    },
    ?assertMatch({ok, 200, #{}}, api_post(StartPath, ValidOpts)),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [#{<<"node">> := DonorNodeBin}]}},
        api_get(["load_rebalance", "global_status"])
    ).
%% Checks request validation of the rebalance start endpoint.
%%
%% Every option map in BadOpts must be answered with 400 and an unknown node
%% in the path with 404. After 50 clients are connected to the donor port, a
%% well-formed request must be answered with 200 and the donor node must be
%% the only entry under "rebalances" in the global status.
t_start_rebalance_validation(Config) ->
    %% Trap exits so that exit signals from linked processes arrive as
    %% messages instead of terminating the test process.
    process_flag(trap_exit, true),

    [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),

    BadOpts = [
        #{conn_evict_rate => <<"conn">>},
        #{sess_evict_rate => <<"sess">>},
        #{abs_conn_threshold => <<"act">>},
        #{rel_conn_threshold => <<"rct">>},
        #{abs_sess_threshold => <<"act">>},
        #{rel_sess_threshold => <<"rct">>},
        #{wait_takeover => <<"wait">>},
        #{wait_health_check => <<"wait">>},
        #{nodes => <<"nodes">>},
        #{nodes => []},
        #{nodes => [<<"bad_node">>]},
        #{nodes => [<<"bad_node">>, atom_to_binary(DonorNode)]},
        #{unknown => <<"Value">>}
    ],
    lists:foreach(
        fun(Opts) ->
            %% Opts is passed as the assertion comment, as in
            %% t_start_evacuation_validation/1, so that a failure identifies
            %% the option map that was not rejected.
            ?assertMatch(
                {ok, 400, #{}},
                api_post(
                    ["load_rebalance", atom_to_list(DonorNode), "start"],
                    Opts
                ),
                Opts
            )
        end,
        BadOpts
    ),
    ?assertMatch(
        {ok, 404, #{}},
        api_post(
            ["load_rebalance", "bad@node", "start"],
            #{}
        )
    ),

    Conns = emqtt_connect_many(DonorPort, 50),

    ?assertMatch(
        {ok, 200, #{}},
        api_post(
            ["load_rebalance", atom_to_list(DonorNode), "start"],
            #{
                conn_evict_rate => 10,
                sess_evict_rate => 10,
                wait_takeover => <<"10s">>,
                wait_health_check => <<"10s">>,
                abs_conn_threshold => 10,
                rel_conn_threshold => 1.001,
                abs_sess_threshold => 10,
                rel_sess_threshold => 1.001,
                nodes => [
                    atom_to_binary(DonorNode),
                    atom_to_binary(RecipientNode)
                ]
            }
        )
    ),

    DonorNodeBin = atom_to_binary(DonorNode),
    ?assertMatch(
        {ok, 200, #{<<"rebalances">> := [#{<<"node">> := DonorNodeBin}]}},
        api_get(["load_rebalance", "global_status"])
    ),

    ok = stop_many(Conns).
%% Starts an evacuation of the donor node through the API, verifies the local
%% and global status reports while it is running, stops it again and checks
%% that both status endpoints report an idle state afterwards.
t_start_stop_evacuation(Config) ->
    [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
    DonorNodeBin = atom_to_binary(DonorNode),
    DonorNodeStr = atom_to_list(DonorNode),
    StatusPath = ["load_rebalance", "status"],
    GlobalStatusPath = ["load_rebalance", "global_status"],

    %% Start from the API's own example payload, overriding only the target.
    ExampleOpts = maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()),
    StartOpts = maps:merge(ExampleOpts, #{migrate_to => [atom_to_binary(RecipientNode)]}),
    ?assertMatch(
        {ok, 200, #{}},
        api_post(["load_rebalance", DonorNodeStr, "evacuation", "start"], StartOpts)
    ),

    LocalReply = api_get(StatusPath),
    ?assertMatch({ok, 200, _}, LocalReply),
    {ok, 200, LocalStatus} = LocalReply,
    ?assertMatch(
        #{
            process := evacuation,
            connection_eviction_rate := 100,
            session_eviction_rate := 100,
            connection_goal := 0,
            session_goal := 0,
            stats := #{
                initial_connected := _,
                current_connected := _,
                initial_sessions := _,
                current_sessions := _
            }
        },
        emqx_node_rebalance_api:translate(local_status_enabled, LocalStatus)
    ),

    GlobalReply = api_get(GlobalStatusPath),
    ?assertMatch({ok, 200, _}, GlobalReply),
    {ok, 200, GlobalStatus} = GlobalReply,
    ?assertMatch(
        #{
            rebalances := [],
            evacuations := [
                #{
                    node := DonorNodeBin,
                    connection_eviction_rate := 100,
                    session_eviction_rate := 100,
                    connection_goal := 0,
                    session_goal := 0,
                    stats := #{
                        initial_connected := _,
                        current_connected := _,
                        initial_sessions := _,
                        current_sessions := _
                    }
                }
            ]
        },
        emqx_node_rebalance_api:translate(global_status, GlobalStatus)
    ),

    ?assertMatch(
        {ok, 200, #{}},
        api_post(["load_rebalance", DonorNodeStr, "evacuation", "stop"], #{})
    ),
    ?assertMatch(
        {ok, 200, #{<<"status">> := <<"disabled">>}},
        api_get(StatusPath)
    ),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
        api_get(GlobalStatusPath)
    ).
%% Starts a rebalance coordinated by the donor node through the API while 100
%% clients are connected to the donor port, verifies the local and global
%% status reports (raw JSON and translated form), stops the rebalance and
%% checks that both status endpoints report an idle state afterwards.
t_start_stop_rebalance(Config) ->
    %% Trap exits so that exit signals from linked processes arrive as
    %% messages instead of terminating the test process.
    process_flag(trap_exit, true),

    [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
    DonorNodeBin = atom_to_binary(DonorNode),
    RecipientNodeBin = atom_to_binary(RecipientNode),
    DonorNodeStr = atom_to_list(DonorNode),
    StatusPath = ["load_rebalance", "status"],
    GlobalStatusPath = ["load_rebalance", "global_status"],

    %% Nothing is running before the rebalance is requested.
    ?assertMatch(
        {ok, 200, #{<<"status">> := <<"disabled">>}},
        api_get(StatusPath)
    ),

    Conns = emqtt_connect_many(DonorPort, 100),

    %% The API's example payload is used as is, except for the `nodes' key.
    ExampleOpts = maps:get(rebalance, emqx_node_rebalance_api:rebalance_example()),
    StartOpts = maps:without([nodes], ExampleOpts),
    ?assertMatch(
        {ok, 200, #{}},
        api_post(["load_rebalance", DonorNodeStr, "start"], StartOpts)
    ),

    LocalReply = api_get(StatusPath),
    ?assertMatch({ok, 200, _}, LocalReply),
    {ok, 200, LocalStatus} = LocalReply,
    ?assertMatch(
        #{process := rebalance, connection_eviction_rate := 10, session_eviction_rate := 20},
        emqx_node_rebalance_api:translate(local_status_enabled, LocalStatus)
    ),

    GlobalReply = api_get(GlobalStatusPath),
    ?assertMatch({ok, 200, _}, GlobalReply),
    {ok, 200, GlobalStatus} = GlobalReply,
    %% Raw decoded JSON: binary keys.
    ?assertMatch(
        {ok, 200, #{
            <<"evacuations">> := [],
            <<"rebalances">> :=
                [
                    #{
                        <<"state">> := _,
                        <<"node">> := DonorNodeBin,
                        <<"coordinator_node">> := _,
                        <<"connection_eviction_rate">> := 10,
                        <<"session_eviction_rate">> := 20,
                        <<"donors">> := [DonorNodeBin],
                        <<"recipients">> := [RecipientNodeBin]
                    }
                ]
        }},
        GlobalReply
    ),
    %% Same payload after translation: atom keys.
    ?assertMatch(
        #{
            evacuations := [],
            rebalances := [
                #{
                    state := _,
                    node := DonorNodeBin,
                    coordinator_node := _,
                    connection_eviction_rate := 10,
                    session_eviction_rate := 20,
                    donors := [DonorNodeBin],
                    recipients := [RecipientNodeBin]
                }
            ]
        },
        emqx_node_rebalance_api:translate(global_status, GlobalStatus)
    ),

    ?assertMatch(
        {ok, 200, #{}},
        api_post(["load_rebalance", DonorNodeStr, "stop"], #{})
    ),
    ?assertMatch(
        {ok, 200, #{<<"status">> := <<"disabled">>}},
        api_get(StatusPath)
    ),
    ?assertMatch(
        {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
        api_get(GlobalStatusPath)
    ),

    ok = stop_many(Conns).
%% The availability check endpoint must answer 200 while no evacuation is
%% running on the donor node, 503 while one is running, and 200 again once
%% the evacuation has been stopped.
t_availability_check(Config) ->
    [{DonorNode, _} | _] = ?config(cluster_nodes, Config),
    CheckPath = ["load_rebalance", "availability_check"],

    ?assertMatch({ok, 200, #{}}, api_get(CheckPath)),

    %% Evacuation is toggled directly on the donor node, bypassing the API.
    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
    ?assertMatch({ok, 503, _}, api_get(CheckPath)),

    ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
    ?assertMatch({ok, 200, #{}}, api_get(CheckPath)).
- %%--------------------------------------------------------------------
- %% Helpers
- %%--------------------------------------------------------------------
%% Sends a GET request to the URI built from Path.
%% Returns {ok, StatusCode, DecodedBody} with the JSON body decoded into maps,
%% or the {error, _} tuple of the request unchanged.
api_get(Path) ->
    decode_response(request(get, uri(Path))).

%% Sends a POST request carrying Data to the URI built from Path.
%% Returns {ok, StatusCode, DecodedBody} with the JSON body decoded into maps,
%% or the {error, _} tuple of the request unchanged.
api_post(Path, Data) ->
    decode_response(request(post, uri(Path), Data)).

%% Shared result handling for api_get/1 and api_post/2: decodes the body of a
%% successful request with jiffy (objects as maps) and passes errors through.
decode_response({ok, Code, ResponseBody}) ->
    {ok, Code, jiffy:decode(ResponseBody, [return_maps])};
decode_response({error, _} = Error) ->
    Error.
%% Mocks emqx_common_test_http (passthrough) on the local node so that
%% default_auth_header/0 is answered by an rpc call to the same function on
%% Node. Always returns `ok'.
take_auth_header_from(Node) ->
    Mocked = emqx_common_test_http,
    RemoteHeader = fun() -> rpc:call(Node, Mocked, default_auth_header, []) end,
    meck:new(Mocked, [passthrough]),
    meck:expect(Mocked, default_auth_header, RemoteHeader),
    ok.
%% Returns a per-test-case directory: the `priv_dir' entry of Config joined
%% with the test case name, or `undefined' when Config has no `priv_dir'.
case_specific_data_dir(Case, Config) ->
    data_dir_under(?config(priv_dir, Config), Case).

%% Joins the case name onto the private directory unless none is configured.
data_dir_under(undefined, _Case) ->
    undefined;
data_dir_under(PrivDir, Case) ->
    filename:join(PrivDir, atom_to_list(Case)).
|