emqx_node_rebalance_api_SUITE.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_node_rebalance_api_SUITE).
  5. -compile(export_all).
  6. -compile(nowarn_export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -import(
  10. emqx_mgmt_api_test_util,
  11. [
  12. request_api/3,
  13. request/2,
  14. request/3,
  15. uri/1
  16. ]
  17. ).
  18. -import(
  19. emqx_eviction_agent_test_helpers,
  20. [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
  21. ).
  %% Test cases of this suite, as collected by emqx_common_test_helpers:all/1.
  all() -> emqx_common_test_helpers:all(?MODULE).
  %% Starts emqx and emqx_node_rebalance on the CT node itself, with the
  %% suite's priv_dir as work directory. The started apps are stored in
  %% Config under `apps` for end_per_suite/1.
  init_per_suite(Config) ->
      WorkDir = ?config(priv_dir, Config),
      Apps = emqx_cth_suite:start([emqx, emqx_node_rebalance], #{work_dir => WorkDir}),
      [{apps, Apps} | Config].
  %% Stops the applications that init_per_suite/1 recorded under `apps` and
  %% returns the result of emqx_cth_suite:stop/1.
  end_per_suite(Config) ->
      StartedApps = ?config(apps, Config),
      emqx_cth_suite:stop(StartedApps).
  %% Boots a fresh two-node core cluster (donor + recipient) for each test
  %% case and returns Config extended with `{cluster_nodes, ClusterNodes}`.
  %%
  %% Both nodes are started with `join_to` pointing at the donor, listeners
  %% enabled and the applications from app_specs/0. On the first returned
  %% node, the management API test setup is then run. The CT node's default
  %% HTTP auth header is redirected to that node via take_auth_header_from/1.
  %%
  %% If this post-start setup fails, the freshly started nodes are stopped
  %% before the original exception is re-raised. Common Test does not call
  %% end_per_testcase/2 for a case whose init_per_testcase/2 crashed, so the
  %% nodes would otherwise outlive the case.
  init_per_testcase(Case, Config) ->
      DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
      RecipientNode = case_specific_node_name(?MODULE, Case, '_recipient'),
      Spec = #{
          role => core,
          join_to => emqx_cth_cluster:node_name(DonorNode),
          listeners => true,
          apps => app_specs()
      },
      Cluster = [{Node, Spec} || Node <- [DonorNode, RecipientNode]],
      ClusterNodes =
          [Node1 | _] = emqx_cth_cluster:start(
              Cluster,
              #{work_dir => ?config(priv_dir, Config)}
          ),
      try
          ok = rpc:call(Node1, emqx_mgmt_api_test_util, init_suite, []),
          ok = take_auth_header_from(Node1)
      catch
          Class:Reason:Stacktrace ->
              %% Clean up the peers, then fail exactly as before.
              _ = emqx_cth_cluster:stop(ClusterNodes),
              erlang:raise(Class, Reason, Stacktrace)
      end,
      [{cluster_nodes, ClusterNodes} | Config].
  %% Stops the per-case cluster started by init_per_testcase/2. The result
  %% of emqx_cth_cluster:stop/1 is deliberately ignored.
  end_per_testcase(_Case, Config) ->
      _ = emqx_cth_cluster:stop(?config(cluster_nodes, Config)),
      ok.
  53. %%--------------------------------------------------------------------
  54. %% Tests
  55. %%--------------------------------------------------------------------
  %% Validation of POST load_rebalance/<node>/evacuation/start.
  %%
  %% - Each malformed option map must be rejected with 400. The map is
  %%   attached to the assertion so a failure identifies it.
  %% - An unknown node name must yield 404.
  %% - A well-formed request migrating to the recipient must return 200 and
  %%   make the donor appear under `evacuations` in the global status.
  t_start_evacuation_validation(Config) ->
      [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
      DonorNodeBin = atom_to_binary(DonorNode),
      StartPath = ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
      InvalidOptions = [
          #{conn_evict_rate => <<"conn">>},
          #{sess_evict_rate => <<"sess">>},
          #{wait_takeover => <<"wait">>},
          #{wait_health_check => <<"wait">>},
          #{migrate_to => []},
          #{migrate_to => <<"migrate_to">>},
          #{migrate_to => [<<"bad_node">>]},
          #{migrate_to => [<<"bad_node">>, DonorNodeBin]},
          #{unknown => <<"Value">>}
      ],
      ExpectRejected = fun(BadOpts) ->
          ?assertMatch({ok, 400, #{}}, api_post(StartPath, BadOpts), BadOpts)
      end,
      lists:foreach(ExpectRejected, InvalidOptions),
      ?assertMatch(
          {ok, 404, #{}},
          api_post(["load_rebalance", "bad@node", "evacuation", "start"], #{})
      ),
      ValidOptions = #{
          conn_evict_rate => 10,
          sess_evict_rate => 10,
          wait_takeover => <<"10s">>,
          wait_health_check => <<"10s">>,
          redirect_to => <<"srv">>,
          migrate_to => [atom_to_binary(RecipientNode)]
      },
      ?assertMatch({ok, 200, #{}}, api_post(StartPath, ValidOptions)),
      ?assertMatch(
          {ok, 200, #{<<"evacuations">> := [#{<<"node">> := DonorNodeBin}]}},
          api_get(["load_rebalance", "global_status"])
      ).
  %% TODO: rename back to t_start_purge_validation (drop the `skipped_`
  %% prefix) once the feature is officially released.
  %% Validation and lifecycle of POST load_rebalance/<node>/purge/start.
  %%
  %% - Malformed option maps must be rejected with 400.
  %% - An unknown node must yield 404.
  %% - With 100 MQTT clients connected to the first node, a purge at rate 10
  %%   must start (200) and appear under `purges` in the global status.
  %% - The local status must report the purge and its stats.
  %% - The purge is then stopped and the clients are disconnected.
  skipped_t_start_purge_validation(Config) ->
      [Node1 | _] = ?config(cluster_nodes, Config),
      Port1 = get_mqtt_port(Node1, tcp),
      Node1Str = atom_to_list(Node1),
      PurgeStartPath = ["load_rebalance", Node1Str, "purge", "start"],
      InvalidOptions = [
          #{purge_rate => <<"conn">>},
          #{purge_rate => 0},
          #{purge_rate => -1},
          #{purge_rate => 1.1},
          #{unknown => <<"Value">>}
      ],
      ExpectRejected = fun(BadOpts) ->
          ?assertMatch({ok, 400, #{}}, api_post(PurgeStartPath, BadOpts), BadOpts)
      end,
      lists:foreach(ExpectRejected, InvalidOptions),
      ?assertMatch(
          {ok, 404, #{}},
          api_post(["load_rebalance", "bad@node", "purge", "start"], #{})
      ),
      process_flag(trap_exit, true),
      Conns = emqtt_connect_many(Port1, 100),
      ?assertMatch({ok, 200, #{}}, api_post(PurgeStartPath, #{purge_rate => 10})),
      Node1Bin = atom_to_binary(Node1),
      ?assertMatch(
          {ok, 200, #{<<"purges">> := [#{<<"node">> := Node1Bin}]}},
          api_get(["load_rebalance", "global_status"])
      ),
      ?assertMatch(
          {ok, 200, #{
              <<"process">> := <<"purge">>,
              <<"purge_rate">> := 10,
              <<"session_goal">> := 0,
              <<"state">> := <<"purging">>,
              <<"stats">> := #{
                  <<"current_sessions">> := _,
                  <<"initial_sessions">> := 100
              }
          }},
          api_get(["load_rebalance", "status"])
      ),
      ?assertMatch(
          {ok, 200, #{}},
          api_post(["load_rebalance", Node1Str, "purge", "stop"], #{})
      ),
      ok = stop_many(Conns),
      ok.
  %% Validation of POST load_rebalance/<node>/start.
  %%
  %% - Every malformed option set must be rejected with 400. The offending
  %%   options are attached to the assertion so a failure names them, as in
  %%   the evacuation and purge validation tests.
  %% - An unknown node must yield 404.
  %% - With 50 MQTT clients connected to the donor, a well-formed request
  %%   must succeed and be listed under `rebalances` in the global status.
  t_start_rebalance_validation(Config) ->
      %% NOTE(review): presumably keeps exits of the MQTT client processes
      %% from killing the test process — confirm.
      process_flag(trap_exit, true),
      [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
      DonorPort = get_mqtt_port(DonorNode, tcp),
      BadOpts = [
          #{conn_evict_rate => <<"conn">>},
          #{sess_evict_rate => <<"sess">>},
          #{abs_conn_threshold => <<"act">>},
          #{rel_conn_threshold => <<"rct">>},
          #{abs_sess_threshold => <<"act">>},
          #{rel_sess_threshold => <<"rct">>},
          #{wait_takeover => <<"wait">>},
          #{wait_health_check => <<"wait">>},
          #{nodes => <<"nodes">>},
          #{nodes => []},
          #{nodes => [<<"bad_node">>]},
          #{nodes => [<<"bad_node">>, atom_to_binary(DonorNode)]},
          #{unknown => <<"Value">>}
      ],
      lists:foreach(
          fun(Opts) ->
              ?assertMatch(
                  {ok, 400, #{}},
                  api_post(
                      ["load_rebalance", atom_to_list(DonorNode), "start"],
                      Opts
                  ),
                  %% Report which option set was unexpectedly accepted.
                  Opts
              )
          end,
          BadOpts
      ),
      ?assertMatch(
          {ok, 404, #{}},
          api_post(
              ["load_rebalance", "bad@node", "start"],
              #{}
          )
      ),
      Conns = emqtt_connect_many(DonorPort, 50),
      ?assertMatch(
          {ok, 200, #{}},
          api_post(
              ["load_rebalance", atom_to_list(DonorNode), "start"],
              #{
                  conn_evict_rate => 10,
                  sess_evict_rate => 10,
                  wait_takeover => <<"10s">>,
                  wait_health_check => <<"10s">>,
                  abs_conn_threshold => 10,
                  rel_conn_threshold => 1.001,
                  abs_sess_threshold => 10,
                  rel_sess_threshold => 1.001,
                  nodes => [
                      atom_to_binary(DonorNode),
                      atom_to_binary(RecipientNode)
                  ]
              }
          )
      ),
      DonorNodeBin = atom_to_binary(DonorNode),
      ?assertMatch(
          {ok, 200, #{<<"rebalances">> := [#{<<"node">> := DonorNodeBin}]}},
          api_get(["load_rebalance", "global_status"])
      ),
      ok = stop_many(Conns).
  %% Full evacuation lifecycle through the REST API.
  %%
  %% 1. Start an evacuation of the donor towards the recipient, using the
  %%    API example options plus `migrate_to`.
  %% 2. Check the local status and the global status, asserted on the
  %%    translated responses.
  %% 3. Stop the evacuation, then check that the local status is `disabled`
  %%    and the global status lists no evacuations or rebalances.
  t_start_stop_evacuation(Config) ->
      [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
      DonorNodeStr = atom_to_list(DonorNode),
      StatusPath = ["load_rebalance", "status"],
      GlobalStatusPath = ["load_rebalance", "global_status"],
      ExampleOpts = maps:get(
          evacuation,
          emqx_node_rebalance_api:rebalance_evacuation_example()
      ),
      StartOpts = ExampleOpts#{migrate_to => [atom_to_binary(RecipientNode)]},
      ?assertMatch(
          {ok, 200, #{}},
          api_post(["load_rebalance", DonorNodeStr, "evacuation", "start"], StartOpts)
      ),
      StatusResponse = api_get(StatusPath),
      ?assertMatch({ok, 200, _}, StatusResponse),
      {ok, 200, Status} = StatusResponse,
      ?assertMatch(
          #{
              process := evacuation,
              connection_eviction_rate := 100,
              session_eviction_rate := 100,
              connection_goal := 0,
              session_goal := 0,
              stats := #{
                  initial_connected := _,
                  current_connected := _,
                  initial_sessions := _,
                  current_sessions := _
              }
          },
          emqx_node_rebalance_api:translate(local_status_enabled, Status)
      ),
      DonorNodeBin = atom_to_binary(DonorNode),
      GlobalStatusResponse = api_get(GlobalStatusPath),
      ?assertMatch({ok, 200, _}, GlobalStatusResponse),
      {ok, 200, GlobalStatus} = GlobalStatusResponse,
      ?assertMatch(
          #{
              rebalances := [],
              evacuations := [
                  #{
                      node := DonorNodeBin,
                      connection_eviction_rate := 100,
                      session_eviction_rate := 100,
                      connection_goal := 0,
                      session_goal := 0,
                      stats := #{
                          initial_connected := _,
                          current_connected := _,
                          initial_sessions := _,
                          current_sessions := _
                      }
                  }
              ]
          },
          emqx_node_rebalance_api:translate(global_status, GlobalStatus)
      ),
      ?assertMatch(
          {ok, 200, #{}},
          api_post(["load_rebalance", DonorNodeStr, "evacuation", "stop"], #{})
      ),
      ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
      ?assertMatch(
          {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
          api_get(GlobalStatusPath)
      ).
  %% Full rebalance lifecycle through the REST API, with 100 MQTT clients
  %% connected to the donor.
  %%
  %% 1. The local status starts as `disabled`.
  %% 2. A rebalance is started from the API example options, minus `nodes`.
  %% 3. The local status and the global status (raw and translated) must
  %%    report it with the donor/recipient split.
  %% 4. After stopping it, the local status is `disabled` again and the
  %%    global status is empty.
  t_start_stop_rebalance(Config) ->
      process_flag(trap_exit, true),
      [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
      DonorPort = get_mqtt_port(DonorNode, tcp),
      DonorNodeStr = atom_to_list(DonorNode),
      StatusPath = ["load_rebalance", "status"],
      GlobalStatusPath = ["load_rebalance", "global_status"],
      ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
      Conns = emqtt_connect_many(DonorPort, 100),
      ExampleOpts = maps:get(rebalance, emqx_node_rebalance_api:rebalance_example()),
      %% NOTE(review): without `nodes` the rebalance presumably spans the whole
      %% cluster, hence the donor/recipient expectations below — confirm.
      StartOpts = maps:remove(nodes, ExampleOpts),
      ?assertMatch(
          {ok, 200, #{}},
          api_post(["load_rebalance", DonorNodeStr, "start"], StartOpts)
      ),
      StatusResponse = api_get(StatusPath),
      ?assertMatch({ok, 200, _}, StatusResponse),
      {ok, 200, Status} = StatusResponse,
      ?assertMatch(
          #{process := rebalance, connection_eviction_rate := 10, session_eviction_rate := 20},
          emqx_node_rebalance_api:translate(local_status_enabled, Status)
      ),
      DonorNodeBin = atom_to_binary(DonorNode),
      RecipientNodeBin = atom_to_binary(RecipientNode),
      GlobalStatusResponse = api_get(GlobalStatusPath),
      ?assertMatch({ok, 200, _}, GlobalStatusResponse),
      {ok, 200, GlobalStatus} = GlobalStatusResponse,
      ?assertMatch(
          {ok, 200, #{
              <<"evacuations">> := [],
              <<"rebalances">> := [
                  #{
                      <<"state">> := _,
                      <<"node">> := DonorNodeBin,
                      <<"coordinator_node">> := _,
                      <<"connection_eviction_rate">> := 10,
                      <<"session_eviction_rate">> := 20,
                      <<"donors">> := [DonorNodeBin],
                      <<"recipients">> := [RecipientNodeBin]
                  }
              ]
          }},
          GlobalStatusResponse
      ),
      ?assertMatch(
          #{
              evacuations := [],
              rebalances := [
                  #{
                      state := _,
                      node := DonorNodeBin,
                      coordinator_node := _,
                      connection_eviction_rate := 10,
                      session_eviction_rate := 20,
                      donors := [DonorNodeBin],
                      recipients := [RecipientNodeBin]
                  }
              ]
          },
          emqx_node_rebalance_api:translate(global_status, GlobalStatus)
      ),
      ?assertMatch(
          {ok, 200, #{}},
          api_post(["load_rebalance", DonorNodeStr, "stop"], #{})
      ),
      ?assertMatch({ok, 200, #{<<"status">> := <<"disabled">>}}, api_get(StatusPath)),
      ?assertMatch(
          {ok, 200, #{<<"evacuations">> := [], <<"rebalances">> := []}},
          api_get(GlobalStatusPath)
      ),
      ok = stop_many(Conns).
  %% The availability-check endpoint must answer successfully without valid
  %% credentials while the node is idle. While an evacuation is running on
  %% the donor, it must answer 503. Once the evacuation is stopped, it must
  %% succeed again.
  t_availability_check(Config) ->
      [DonorNode | _] = ?config(cluster_nodes, Config),
      CheckAvailability = fun() ->
          api_get_noauth(["load_rebalance", "availability_check"])
      end,
      ?assertMatch({ok, _}, CheckAvailability()),
      ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
      ?assertMatch({error, {_, 503, _}}, CheckAvailability()),
      ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
      ?assertMatch({ok, _}, CheckAvailability()).
  424. %%--------------------------------------------------------------------
  425. %% Helpers
  426. %%--------------------------------------------------------------------
  %% GETs Path with deliberately invalid credentials (user "invalid",
  %% password "password"). Returns request_api/3's result unchanged. Used by
  %% t_availability_check/1 to probe an endpoint without valid auth.
  api_get_noauth(Path) ->
      BadAuthHeader = emqx_common_test_http:auth_header("invalid", "password"),
      request_api(get, uri(Path), BadAuthHeader).
  %% Issues an authenticated GET for Path against the management API.
  %%
  %% Returns one of:
  %% - `{ok, Code, Body}`. Body is the JSON response decoded to maps when
  %%   the response body is valid JSON, and the raw response body otherwise.
  %% - `{error, Reason}`, passed through from the underlying request.
  %%
  %% Decoding mirrors api_post/2. Previously a non-JSON body (for example an
  %% error page) made the decoder throw, which hid the status code from the
  %% calling assertion.
  api_get(Path) ->
      case request(get, uri(Path)) of
          {ok, Code, ResponseBody} ->
              Res =
                  case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
                      {ok, Decoded} -> Decoded;
                      {error, _} -> ResponseBody
                  end,
              {ok, Code, Res};
          {error, _} = Error ->
              Error
      end.
  %% Issues an authenticated POST of Data to Path against the management API.
  %% Returns `{ok, Code, Body}` or passes `{error, Reason}` through. Body is
  %% produced by decode_response_body/1.
  api_post(Path, Data) ->
      case request(post, uri(Path), Data) of
          {error, _} = Error ->
              Error;
          {ok, Code, ResponseBody} ->
              {ok, Code, decode_response_body(ResponseBody)}
      end.

  %% Decodes a JSON response body to maps, or returns the body unchanged
  %% when it is not valid JSON.
  decode_response_body(ResponseBody) ->
      case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
          {error, _} -> ResponseBody;
          {ok, Decoded} -> Decoded
      end.
  %% Mocks emqx_common_test_http on the calling node. Every function passes
  %% through except default_auth_header/0, which instead fetches the header
  %% from Node via rpc. Returns ok.
  take_auth_header_from(Node) ->
      Mod = emqx_common_test_http,
      meck:new(Mod, [passthrough]),
      RemoteAuthHeader = fun() -> rpc:call(Node, Mod, default_auth_header, []) end,
      meck:expect(Mod, default_auth_header, RemoteAuthHeader),
      ok.
  %% Per-test-case data directory `<priv_dir>/<Case>`. Returns `undefined`
  %% when Config has no priv_dir. Not referenced elsewhere in the visible
  %% file.
  case_specific_data_dir(Case, Config) ->
      join_case_dir(?config(priv_dir, Config), Case).

  %% Joins the case name under PrivDir. The atom is only converted when
  %% there is a directory to join it to.
  join_case_dir(undefined, _Case) -> undefined;
  join_case_dir(PrivDir, Case) -> filename:join(PrivDir, atom_to_list(Case)).
  %% Application specs for each cluster node started in
  %% init_per_testcase/2:
  %% - emqx: this suite is installed as config loader before start, and
  %%   only the broker and listeners boot modules are enabled.
  %% - emqx_retainer: retainer enabled.
  %% - emqx_node_rebalance: default spec.
  app_specs() ->
      EmqxSpec = #{
          before_start => fun() -> emqx_app:set_config_loader(?MODULE) end,
          override_env => [{boot_modules, [broker, listeners]}]
      },
      RetainerSpec = #{config => #{retainer => #{enable => true}}},
      [{emqx, EmqxSpec}, {emqx_retainer, RetainerSpec}, emqx_node_rebalance].
  %% Returns the port that listener `Type`/`default` is bound to on Node.
  %%
  %% Previously an `{IP, Port}` bind value was required, and anything else
  %% crashed with badmatch. A bare port number is now accepted too.
  %% NOTE(review): the listener schema is assumed to allow a port-only bind
  %% — confirm.
  get_mqtt_port(Node, Type) ->
      Bind = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
      bind_port(Bind).

  %% Extracts the port from a listener `bind` value: either `{IP, Port}` or
  %% a bare integer port.
  bind_port({_IP, Port}) -> Port;
  bind_port(Port) when is_integer(Port) -> Port.