emqx_node_rebalance_cli_SUITE.erl 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%%--------------------------------------------------------------------
  4. -module(emqx_node_rebalance_cli_SUITE).
  5. -compile(export_all).
  6. -compile(nowarn_export_all).
  7. -include_lib("eunit/include/eunit.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -import(
  10. emqx_eviction_agent_test_helpers,
  11. [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
  12. ).
  13. -define(START_APPS, [emqx, emqx_node_rebalance]).
  14. all() ->
  15. emqx_common_test_helpers:all(?MODULE).
  16. init_per_suite(Config) ->
  17. Apps = emqx_cth_suite:start(?START_APPS, #{
  18. work_dir => ?config(priv_dir, Config)
  19. }),
  20. [{apps, Apps} | Config].
  21. end_per_suite(Config) ->
  22. emqx_cth_suite:stop(?config(apps, Config)).
  23. init_per_testcase(Case = t_rebalance, Config) ->
  24. _ = emqx_node_rebalance_evacuation:stop(),
  25. Nodes =
  26. [Node1 | _] =
  27. [
  28. case_specific_node_name(?MODULE, Case, '_1'),
  29. case_specific_node_name(?MODULE, Case, '_2')
  30. ],
  31. Spec = #{
  32. role => core,
  33. join_to => emqx_cth_cluster:node_name(Node1),
  34. listeners => true,
  35. apps => ?START_APPS
  36. },
  37. Cluster = [{Node, Spec} || Node <- Nodes],
  38. ClusterNodes = emqx_cth_cluster:start(
  39. Cluster,
  40. #{work_dir => emqx_cth_suite:work_dir(Case, Config)}
  41. ),
  42. [{cluster_nodes, ClusterNodes} | Config];
  43. init_per_testcase(_Case, Config) ->
  44. _ = emqx_node_rebalance_evacuation:stop(),
  45. _ = emqx_node_rebalance:stop(),
  46. Config.
  47. end_per_testcase(t_rebalance, Config) ->
  48. _ = emqx_node_rebalance_evacuation:stop(),
  49. _ = emqx_node_rebalance:stop(),
  50. _ = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
  51. end_per_testcase(_Case, _Config) ->
  52. _ = emqx_node_rebalance_evacuation:stop(),
  53. _ = emqx_node_rebalance:stop().
  54. %%--------------------------------------------------------------------
  55. %% Tests
  56. %%--------------------------------------------------------------------
  57. t_evacuation(_Config) ->
  58. %% usage
  59. ok = emqx_node_rebalance_cli:cli(["foobar"]),
  60. %% status
  61. ok = emqx_node_rebalance_cli:cli(["status"]),
  62. ok = emqx_node_rebalance_cli:cli(["node-status"]),
  63. ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
  64. %% start with invalid args
  65. ?assertNot(
  66. emqx_node_rebalance_cli:cli(["start", "--evacuation", "--foo-bar"])
  67. ),
  68. ?assertNot(
  69. emqx_node_rebalance_cli:cli(["start", "--evacuation", "--conn-evict-rate", "foobar"])
  70. ),
  71. ?assertNot(
  72. emqx_node_rebalance_cli:cli(["start", "--evacuation", "--sess-evict-rate", "foobar"])
  73. ),
  74. ?assertNot(
  75. emqx_node_rebalance_cli:cli(["start", "--evacuation", "--wait-takeover", "foobar"])
  76. ),
  77. ?assertNot(
  78. emqx_node_rebalance_cli:cli([
  79. "start",
  80. "--evacuation",
  81. "--migrate-to",
  82. "nonexistent@node"
  83. ])
  84. ),
  85. ?assertNot(
  86. emqx_node_rebalance_cli:cli([
  87. "start",
  88. "--evacuation",
  89. "--migrate-to",
  90. ""
  91. ])
  92. ),
  93. ?assertNot(
  94. emqx_node_rebalance_cli:cli([
  95. "start",
  96. "--evacuation",
  97. "--unknown-arg"
  98. ])
  99. ),
  100. ?assert(
  101. emqx_node_rebalance_cli:cli([
  102. "start",
  103. "--evacuation",
  104. "--conn-evict-rate",
  105. "10",
  106. "--sess-evict-rate",
  107. "10",
  108. "--wait-takeover",
  109. "10",
  110. "--migrate-to",
  111. atom_to_list(node()),
  112. "--redirect-to",
  113. "srv"
  114. ])
  115. ),
  116. %% status
  117. ok = emqx_node_rebalance_cli:cli(["status"]),
  118. ok = emqx_node_rebalance_cli:cli(["node-status"]),
  119. ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
  120. ?assertMatch(
  121. {enabled, #{}},
  122. emqx_node_rebalance_evacuation:status()
  123. ),
  124. %% already enabled
  125. ?assertNot(
  126. emqx_node_rebalance_cli:cli([
  127. "start",
  128. "--evacuation",
  129. "--conn-evict-rate",
  130. "10",
  131. "--redirect-to",
  132. "srv"
  133. ])
  134. ),
  135. %% stop
  136. true = emqx_node_rebalance_cli:cli(["stop"]),
  137. false = emqx_node_rebalance_cli:cli(["stop"]),
  138. ?assertEqual(
  139. disabled,
  140. emqx_node_rebalance_evacuation:status()
  141. ).
  142. t_purge(_Config) ->
  143. process_flag(trap_exit, true),
  144. %% start with invalid args
  145. ?assertNot(
  146. emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
  147. ),
  148. ?assertNot(
  149. emqx_node_rebalance_cli:cli(["start", "--purge", "--purge-rate", "foobar"])
  150. ),
  151. %% not used by this scenario
  152. ?assertNot(
  153. emqx_node_rebalance_cli:cli(["start", "--purge", "--conn-evict-rate", "1"])
  154. ),
  155. ?assertNot(
  156. emqx_node_rebalance_cli:cli(["start", "--purge", "--sess-evict-rate", "1"])
  157. ),
  158. ?assertNot(
  159. emqx_node_rebalance_cli:cli(["start", "--purge", "--wait-takeover", "1"])
  160. ),
  161. ?assertNot(
  162. emqx_node_rebalance_cli:cli([
  163. "start",
  164. "--purge",
  165. "--migrate-to",
  166. atom_to_list(node())
  167. ])
  168. ),
  169. Conns = emqtt_connect_many(get_mqtt_port(node(), tcp), 100),
  170. ?assert(
  171. emqx_node_rebalance_cli:cli([
  172. "start",
  173. "--purge",
  174. "--purge-rate",
  175. "10"
  176. ])
  177. ),
  178. %% status
  179. ok = emqx_node_rebalance_cli:cli(["status"]),
  180. ok = emqx_node_rebalance_cli:cli(["node-status"]),
  181. ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
  182. ?assertMatch(
  183. {enabled, #{}},
  184. emqx_node_rebalance_purge:status()
  185. ),
  186. %% already enabled
  187. ?assertNot(
  188. emqx_node_rebalance_cli:cli([
  189. "start",
  190. "--purge",
  191. "--purge-rate",
  192. "10"
  193. ])
  194. ),
  195. %% stop
  196. true = emqx_node_rebalance_cli:cli(["stop"]),
  197. %% stop when not started
  198. false = emqx_node_rebalance_cli:cli(["stop"]),
  199. ?assertEqual(
  200. disabled,
  201. emqx_node_rebalance_purge:status()
  202. ),
  203. ok = stop_many(Conns).
  204. t_rebalance(Config) ->
  205. process_flag(trap_exit, true),
  206. [DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
  207. DonorPort = get_mqtt_port(DonorNode, tcp),
  208. %% start with invalid args
  209. ?assertNot(
  210. emqx_node_rebalance_cli(DonorNode, ["start", "--foo-bar"])
  211. ),
  212. ?assertNot(
  213. emqx_node_rebalance_cli(DonorNode, ["start", "--conn-evict-rate", "foobar"])
  214. ),
  215. ?assertNot(
  216. emqx_node_rebalance_cli(DonorNode, ["start", "--abs-conn-threshold", "foobar"])
  217. ),
  218. ?assertNot(
  219. emqx_node_rebalance_cli(DonorNode, ["start", "--rel-conn-threshold", "foobar"])
  220. ),
  221. ?assertNot(
  222. emqx_node_rebalance_cli(DonorNode, ["start", "--sess-evict-rate", "foobar"])
  223. ),
  224. ?assertNot(
  225. emqx_node_rebalance_cli(DonorNode, ["start", "--abs-sess-threshold", "foobar"])
  226. ),
  227. ?assertNot(
  228. emqx_node_rebalance_cli(DonorNode, ["start", "--rel-sess-threshold", "foobar"])
  229. ),
  230. ?assertNot(
  231. emqx_node_rebalance_cli(DonorNode, ["start", "--wait-takeover", "foobar"])
  232. ),
  233. ?assertNot(
  234. emqx_node_rebalance_cli(DonorNode, ["start", "--wait-health-check", "foobar"])
  235. ),
  236. ?assertNot(
  237. emqx_node_rebalance_cli(DonorNode, [
  238. "start",
  239. "--nodes",
  240. "nonexistent@node"
  241. ])
  242. ),
  243. ?assertNot(
  244. emqx_node_rebalance_cli(DonorNode, [
  245. "start",
  246. "--nodes",
  247. ""
  248. ])
  249. ),
  250. ?assertNot(
  251. emqx_node_rebalance_cli(DonorNode, [
  252. "start",
  253. "--nodes",
  254. atom_to_list(RecipientNode)
  255. ])
  256. ),
  257. ?assertNot(
  258. emqx_node_rebalance_cli(DonorNode, [
  259. "start",
  260. "--unknown-arg"
  261. ])
  262. ),
  263. Conns = emqtt_connect_many(DonorPort, 20),
  264. ?assert(
  265. emqx_node_rebalance_cli(DonorNode, [
  266. "start",
  267. "--conn-evict-rate",
  268. "10",
  269. "--abs-conn-threshold",
  270. "10",
  271. "--rel-conn-threshold",
  272. "1.1",
  273. "--sess-evict-rate",
  274. "10",
  275. "--abs-sess-threshold",
  276. "10",
  277. "--rel-sess-threshold",
  278. "1.1",
  279. "--wait-takeover",
  280. "10",
  281. "--nodes",
  282. atom_to_list(DonorNode) ++ "," ++
  283. atom_to_list(RecipientNode)
  284. ])
  285. ),
  286. %% status
  287. ok = emqx_node_rebalance_cli(DonorNode, ["status"]),
  288. ok = emqx_node_rebalance_cli(DonorNode, ["node-status"]),
  289. ok = emqx_node_rebalance_cli(DonorNode, ["node-status", atom_to_list(DonorNode)]),
  290. ?assertMatch(
  291. {enabled, #{}},
  292. rpc:call(DonorNode, emqx_node_rebalance, status, [])
  293. ),
  294. %% already enabled
  295. ?assertNot(
  296. emqx_node_rebalance_cli(DonorNode, ["start"])
  297. ),
  298. %% stop
  299. true = emqx_node_rebalance_cli(DonorNode, ["stop"]),
  300. false = emqx_node_rebalance_cli(DonorNode, ["stop"]),
  301. ?assertEqual(
  302. disabled,
  303. rpc:call(DonorNode, emqx_node_rebalance, status, [])
  304. ),
  305. ok = stop_many(Conns).
  306. %%--------------------------------------------------------------------
  307. %% Helpers
  308. %%--------------------------------------------------------------------
  309. emqx_node_rebalance_cli(Node, Args) ->
  310. case rpc:call(Node, emqx_node_rebalance_cli, cli, [Args]) of
  311. {badrpc, Reason} ->
  312. error(Reason);
  313. Result ->
  314. Result
  315. end.
  316. get_mqtt_port(Node, Type) ->
  317. {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
  318. Port.