emqx_node_rebalance_api_SUITE.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_node_rebalance_api_SUITE).
  5. -compile(export_all).
  6. -compile(nowarn_export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -import(
  10. emqx_mgmt_api_test_util,
  11. [
  12. request/2,
  13. request/3,
  14. uri/1
  15. ]
  16. ).
  17. -import(
  18. emqx_eviction_agent_test_helpers,
  19. [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
  20. ).
  21. -define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
  22. all() ->
  23. emqx_common_test_helpers:all(?MODULE).
  24. init_per_suite(Config) ->
  25. ok = emqx_common_test_helpers:start_apps(?START_APPS),
  26. Config.
  27. end_per_suite(_Config) ->
  28. ok = emqx_common_test_helpers:stop_apps(?START_APPS),
  29. ok.
  30. init_per_testcase(Case, Config) ->
  31. [{DonorNode, _} | _] =
  32. ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
  33. [
  34. {case_specific_node_name(?MODULE, Case, '_donor'), 2883},
  35. {case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
  36. ],
  37. ?START_APPS,
  38. [{emqx, data_dir, case_specific_data_dir(Case, Config)}]
  39. ),
  40. ok = rpc:call(DonorNode, emqx_mgmt_api_test_util, init_suite, []),
  41. ok = take_auth_header_from(DonorNode),
  42. [{cluster_nodes, ClusterNodes} | Config].
  43. end_per_testcase(_Case, Config) ->
  44. _ = emqx_eviction_agent_test_helpers:stop_cluster(
  45. ?config(cluster_nodes, Config),
  46. ?START_APPS
  47. ).
  48. %%--------------------------------------------------------------------
  49. %% Tests
  50. %%--------------------------------------------------------------------
  51. t_start_evacuation_validation(Config) ->
  52. [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
  53. BadOpts = [
  54. #{conn_evict_rate => <<"conn">>},
  55. #{sess_evict_rate => <<"sess">>},
  56. #{wait_takeover => <<"wait">>},
  57. #{wait_health_check => <<"wait">>},
  58. #{migrate_to => []},
  59. #{migrate_to => <<"migrate_to">>},
  60. #{migrate_to => [<<"bad_node">>]},
  61. #{migrate_to => [<<"bad_node">>, atom_to_binary(DonorNode)]},
  62. #{unknown => <<"Value">>}
  63. ],
  64. lists:foreach(
  65. fun(Opts) ->
  66. ?assertMatch(
  67. {ok, 400, #{}},
  68. api_post(
  69. ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
  70. Opts
  71. ),
  72. Opts
  73. )
  74. end,
  75. BadOpts
  76. ),
  77. ?assertMatch(
  78. {ok, 404, #{}},
  79. api_post(
  80. ["load_rebalance", "bad@node", "evacuation", "start"],
  81. #{}
  82. )
  83. ),
  84. ?assertMatch(
  85. {ok, 200, #{}},
  86. api_post(
  87. ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
  88. #{
  89. conn_evict_rate => 10,
  90. sess_evict_rate => 10,
  91. wait_takeover => <<"10s">>,
  92. wait_health_check => <<"10s">>,
  93. redirect_to => <<"srv">>,
  94. migrate_to => [atom_to_binary(RecipientNode)]
  95. }
  96. )
  97. ),
  98. DonorNodeBin = atom_to_binary(DonorNode),
  99. ?assertMatch(
  100. {ok, 200, #{<<"evacuations">> := [#{<<"node">> := DonorNodeBin}]}},
  101. api_get(["load_rebalance", "global_status"])
  102. ).
  103. t_start_rebalance_validation(Config) ->
  104. process_flag(trap_exit, true),
  105. [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
  106. BadOpts = [
  107. #{conn_evict_rate => <<"conn">>},
  108. #{sess_evict_rate => <<"sess">>},
  109. #{abs_conn_threshold => <<"act">>},
  110. #{rel_conn_threshold => <<"rct">>},
  111. #{abs_sess_threshold => <<"act">>},
  112. #{rel_sess_threshold => <<"rct">>},
  113. #{wait_takeover => <<"wait">>},
  114. #{wait_health_check => <<"wait">>},
  115. #{nodes => <<"nodes">>},
  116. #{nodes => []},
  117. #{nodes => [<<"bad_node">>]},
  118. #{nodes => [<<"bad_node">>, atom_to_binary(DonorNode)]},
  119. #{unknown => <<"Value">>}
  120. ],
  121. lists:foreach(
  122. fun(Opts) ->
  123. ?assertMatch(
  124. {ok, 400, #{}},
  125. api_post(
  126. ["load_rebalance", atom_to_list(DonorNode), "start"],
  127. Opts
  128. )
  129. )
  130. end,
  131. BadOpts
  132. ),
  133. ?assertMatch(
  134. {ok, 404, #{}},
  135. api_post(
  136. ["load_rebalance", "bad@node", "start"],
  137. #{}
  138. )
  139. ),
  140. Conns = emqtt_connect_many(DonorPort, 50),
  141. ?assertMatch(
  142. {ok, 200, #{}},
  143. api_post(
  144. ["load_rebalance", atom_to_list(DonorNode), "start"],
  145. #{
  146. conn_evict_rate => 10,
  147. sess_evict_rate => 10,
  148. wait_takeover => <<"10s">>,
  149. wait_health_check => <<"10s">>,
  150. abs_conn_threshold => 10,
  151. rel_conn_threshold => 1.001,
  152. abs_sess_threshold => 10,
  153. rel_sess_threshold => 1.001,
  154. nodes => [
  155. atom_to_binary(DonorNode),
  156. atom_to_binary(RecipientNode)
  157. ]
  158. }
  159. )
  160. ),
  161. DonorNodeBin = atom_to_binary(DonorNode),
  162. ?assertMatch(
  163. {ok, 200, #{<<"rebalances">> := [#{<<"node">> := DonorNodeBin}]}},
  164. api_get(["load_rebalance", "global_status"])
  165. ),
  166. ok = stop_many(Conns).
  167. t_start_stop_evacuation(Config) ->
  168. [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
  169. StartOpts = maps:merge(
  170. maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()),
  171. #{migrate_to => [atom_to_binary(RecipientNode)]}
  172. ),
  173. ?assertMatch(
  174. {ok, 200, #{}},
  175. api_post(
  176. ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
  177. StartOpts
  178. )
  179. ),
  180. StatusResponse = api_get(["load_rebalance", "status"]),
  181. ?assertMatch(
  182. {ok, 200, _},
  183. StatusResponse
  184. ),
  185. {ok, 200, Status} = StatusResponse,
  186. ?assertMatch(
  187. #{
  188. process := evacuation,
  189. connection_eviction_rate := 100,
  190. session_eviction_rate := 100,
  191. connection_goal := 0,
  192. session_goal := 0,
  193. stats := #{
  194. initial_connected := _,
  195. current_connected := _,
  196. initial_sessions := _,
  197. current_sessions := _
  198. }
  199. },
  200. emqx_node_rebalance_api:translate(local_status_enabled, Status)
  201. ),
  202. DonorNodeBin = atom_to_binary(DonorNode),
  203. GlobalStatusResponse = api_get(["load_rebalance", "global_status"]),
  204. ?assertMatch(
  205. {ok, 200, _},
  206. GlobalStatusResponse
  207. ),
  208. {ok, 200, GlobalStatus} = GlobalStatusResponse,
  209. ?assertMatch(
  210. #{
  211. rebalances := [],
  212. evacuations := [
  213. #{
  214. node := DonorNodeBin,
  215. connection_eviction_rate := 100,
  216. session_eviction_rate := 100,
  217. connection_goal := 0,
  218. session_goal := 0,
  219. stats := #{
  220. initial_connected := _,
  221. current_connected := _,
  222. initial_sessions := _,
  223. current_sessions := _
  224. }
  225. }
  226. ]
  227. },
  228. emqx_node_rebalance_api:translate(global_status, GlobalStatus)
  229. ),
  230. ?assertMatch(
  231. {ok, 200, #{}},
  232. api_post(
  233. ["load_rebalance", atom_to_list(DonorNode), "evacuation", "stop"],
  234. #{}
  235. )
  236. ),
  237. ?assertMatch(
  238. {ok, 200, #{<<"status">> := <<"disabled">>}},
  239. api_get(["load_rebalance", "status"])
  240. ),
  241. ?assertMatch(
  242. {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
  243. api_get(["load_rebalance", "global_status"])
  244. ).
  245. t_start_stop_rebalance(Config) ->
  246. process_flag(trap_exit, true),
  247. [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
  248. ?assertMatch(
  249. {ok, 200, #{<<"status">> := <<"disabled">>}},
  250. api_get(["load_rebalance", "status"])
  251. ),
  252. Conns = emqtt_connect_many(DonorPort, 100),
  253. StartOpts = maps:without(
  254. [nodes],
  255. maps:get(rebalance, emqx_node_rebalance_api:rebalance_example())
  256. ),
  257. ?assertMatch(
  258. {ok, 200, #{}},
  259. api_post(
  260. ["load_rebalance", atom_to_list(DonorNode), "start"],
  261. StartOpts
  262. )
  263. ),
  264. StatusResponse = api_get(["load_rebalance", "status"]),
  265. ?assertMatch(
  266. {ok, 200, _},
  267. StatusResponse
  268. ),
  269. {ok, 200, Status} = StatusResponse,
  270. ?assertMatch(
  271. #{process := rebalance, connection_eviction_rate := 10, session_eviction_rate := 20},
  272. emqx_node_rebalance_api:translate(local_status_enabled, Status)
  273. ),
  274. DonorNodeBin = atom_to_binary(DonorNode),
  275. RecipientNodeBin = atom_to_binary(RecipientNode),
  276. GlobalStatusResponse = api_get(["load_rebalance", "global_status"]),
  277. ?assertMatch(
  278. {ok, 200, _},
  279. GlobalStatusResponse
  280. ),
  281. {ok, 200, GlobalStatus} = GlobalStatusResponse,
  282. ?assertMatch(
  283. {ok, 200, #{
  284. <<"evacuations">> := [],
  285. <<"rebalances">> :=
  286. [
  287. #{
  288. <<"state">> := _,
  289. <<"node">> := DonorNodeBin,
  290. <<"coordinator_node">> := _,
  291. <<"connection_eviction_rate">> := 10,
  292. <<"session_eviction_rate">> := 20,
  293. <<"donors">> := [DonorNodeBin],
  294. <<"recipients">> := [RecipientNodeBin]
  295. }
  296. ]
  297. }},
  298. GlobalStatusResponse
  299. ),
  300. ?assertMatch(
  301. #{
  302. evacuations := [],
  303. rebalances := [
  304. #{
  305. state := _,
  306. node := DonorNodeBin,
  307. coordinator_node := _,
  308. connection_eviction_rate := 10,
  309. session_eviction_rate := 20,
  310. donors := [DonorNodeBin],
  311. recipients := [RecipientNodeBin]
  312. }
  313. ]
  314. },
  315. emqx_node_rebalance_api:translate(global_status, GlobalStatus)
  316. ),
  317. ?assertMatch(
  318. {ok, 200, #{}},
  319. api_post(
  320. ["load_rebalance", atom_to_list(DonorNode), "stop"],
  321. #{}
  322. )
  323. ),
  324. ?assertMatch(
  325. {ok, 200, #{<<"status">> := <<"disabled">>}},
  326. api_get(["load_rebalance", "status"])
  327. ),
  328. ?assertMatch(
  329. {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
  330. api_get(["load_rebalance", "global_status"])
  331. ),
  332. ok = stop_many(Conns).
  333. t_availability_check(Config) ->
  334. [{DonorNode, _} | _] = ?config(cluster_nodes, Config),
  335. ?assertMatch(
  336. {ok, 200, #{}},
  337. api_get(["load_rebalance", "availability_check"])
  338. ),
  339. ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
  340. ?assertMatch(
  341. {ok, 503, _},
  342. api_get(["load_rebalance", "availability_check"])
  343. ),
  344. ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
  345. ?assertMatch(
  346. {ok, 200, #{}},
  347. api_get(["load_rebalance", "availability_check"])
  348. ).
  349. %%--------------------------------------------------------------------
  350. %% Helpers
  351. %%--------------------------------------------------------------------
  352. api_get(Path) ->
  353. case request(get, uri(Path)) of
  354. {ok, Code, ResponseBody} ->
  355. {ok, Code, jiffy:decode(ResponseBody, [return_maps])};
  356. {error, _} = Error ->
  357. Error
  358. end.
  359. api_post(Path, Data) ->
  360. case request(post, uri(Path), Data) of
  361. {ok, Code, ResponseBody} ->
  362. {ok, Code, jiffy:decode(ResponseBody, [return_maps])};
  363. {error, _} = Error ->
  364. Error
  365. end.
  366. take_auth_header_from(Node) ->
  367. meck:new(emqx_common_test_http, [passthrough]),
  368. meck:expect(
  369. emqx_common_test_http,
  370. default_auth_header,
  371. fun() -> rpc:call(Node, emqx_common_test_http, default_auth_header, []) end
  372. ),
  373. ok.
  374. case_specific_data_dir(Case, Config) ->
  375. case ?config(priv_dir, Config) of
  376. undefined -> undefined;
  377. PrivDir -> filename:join(PrivDir, atom_to_list(Case))
  378. end.