emqx_cth_cluster.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc Common Test Helper / Running tests in a cluster
  17. %%
  18. %% This module allows setting up and tearing down clusters of EMQX nodes with
  19. %% the purpose of running integration tests in a distributed environment, but
  20. %% with the same isolation measures that `emqx_cth_suite` provides.
  21. %%
%% In addition to what `emqx_cth_suite` does with respect to isolation, each
  23. %% node in the cluster is started with a separate, unique working directory.
  24. %%
  25. %% What should be started on each node is defined by the same appspecs that are
%% used by `emqx_cth_suite` to start applications on the CT node. However, an
%% additional set of defaults is applied to the appspecs to make sure that the
%% cluster is started in a consistent, interconnected state, with no conflicts
%% between applications.
  30. %%
  31. %% Most of the time, you just need to:
  32. %% 1. Describe the cluster with one or more _nodespecs_.
  33. %% 2. Call `emqx_cth_cluster:start/2` before the testrun (e.g. in `init_per_suite/1`
  34. %% or `init_per_group/2`), providing unique work dir (e.g.
  35. %% `emqx_cth_suite:work_dir/1`). Save the result in a context.
  36. %% 3. Call `emqx_cth_cluster:stop/1` after the testrun concludes (e.g.
  37. %% in `end_per_suite/1` or `end_per_group/2`) with the result from step 2.
-module(emqx_cth_cluster).

%% Cluster lifecycle API
-export([start/1, start/2, restart/1, restart/2]).
-export([stop/1, stop_node/1]).
%% Lower-level helpers, also usable on their own
-export([start_bare_nodes/1, start_bare_nodes/2]).
-export([share_load_module/2]).
-export([node_name/1, mk_nodespecs/2]).
-export([start_apps/2]).

%% Applications that must be running on a node before it can join a cluster.
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).

%% Timeouts for the individual startup / shutdown phases.
-define(TIMEOUT_NODE_START_MS, 15000).
-define(TIMEOUT_APPS_START_MS, 30000).
-define(TIMEOUT_NODE_STOP_S, 15).
-define(TIMEOUT_CLUSTER_WAIT_MS, timer:seconds(10)).
  50. %%
%% Specification of a single node in the cluster to be started.
-type nodespec() :: {_ShortName :: atom(), #{
    % DB Role
    % Default: `core`
    role => core | replicant,
    % DB Backend
    % Default: `mnesia` if there are no replicants in cluster, otherwise `rlog`
    %
    % NOTE
    % Defaults are chosen with the intention of lowering the chance of observing
    % inconsistencies due to data races (i.e. missing mria shards on nodes where some
    % application hasn't been started yet).
    db_backend => mnesia | rlog,
    % Applications to start on the node
    % Default: only applications needed for clustering are started
    %
    % NOTES
    % 1. Apps needed for clustering are started unconditionally.
    %  * It's not possible to redefine their startup order.
    %  * It's possible to add `{ekka, #{start => false}}` appspec though.
    % 2. There are defaults applied to some appspecs if they are present.
    %  * We try to keep `emqx_conf` config consistent with default configuration of
    %    clustering applications.
    apps => [emqx_cth_suite:appspec()],
    base_port => inet:port_number(),
    % Node to join to in clustering phase
    % If set to `undefined` this node won't try to join the cluster
    % Default: no (first core node is used to join to by default)
    join_to => node() | undefined,
    %% Working directory
    %% If this directory is not empty, starting up the node applications will fail
    %% Default: "${ClusterOpts.work_dir}/${nodename}"
    work_dir => file:name()
}}.
  84. -spec start([nodespec()], ClusterOpts) ->
  85. [node()]
  86. when
  87. ClusterOpts :: #{
  88. %% Working directory
  89. %% Everything a test produces should go here. Each node's stuff should go in its
  90. %% own directory.
  91. work_dir := file:name()
  92. }.
  93. start(Nodes, ClusterOpts) ->
  94. NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
  95. start(NodeSpecs).
  96. start(NodeSpecs) ->
  97. ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]),
  98. % 1. Start bare nodes with only basic applications running
  99. ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),
  100. % 2. Start applications needed to enable clustering
  101. % Generally, this causes some applications to restart, but we deliberately don't
  102. % start them yet.
  103. ShouldAppearInRunningNodes = lists:map(fun run_node_phase_cluster/1, NodeSpecs),
  104. IsClustered = lists:member(true, ShouldAppearInRunningNodes),
  105. % 3. Start applications after cluster is formed
  106. % Cluster-joins are complete, so they shouldn't restart in the background anymore.
  107. _ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS),
  108. Nodes = [Node || #{name := Node} <- NodeSpecs],
  109. %% 4. Wait for the nodes to cluster
  110. case IsClustered of
  111. true ->
  112. ok = wait_clustered(Nodes, ?TIMEOUT_CLUSTER_WAIT_MS);
  113. false ->
  114. ok
  115. end,
  116. Nodes.
  117. %% Wait until all nodes see all nodes as mria running nodes
  118. wait_clustered(Nodes, Timeout) ->
  119. Check = fun(Node) ->
  120. Running = erpc:call(Node, mria, running_nodes, []),
  121. case Nodes -- Running of
  122. [] ->
  123. true;
  124. NotRunning ->
  125. {false, NotRunning}
  126. end
  127. end,
  128. wait_clustered(Nodes, Check, deadline(Timeout)).
  129. wait_clustered([], _Check, _Deadline) ->
  130. ok;
  131. wait_clustered([Node | Nodes] = All, Check, Deadline) ->
  132. IsOverdue = is_overdue(Deadline),
  133. case Check(Node) of
  134. true ->
  135. wait_clustered(Nodes, Check, Deadline);
  136. {false, NodesNotRunnging} when IsOverdue ->
  137. error(
  138. {timeout, #{
  139. checking_from_node => Node,
  140. nodes_not_running => NodesNotRunnging
  141. }}
  142. );
  143. {false, Nodes} ->
  144. timer:sleep(100),
  145. wait_clustered(All, Check, Deadline)
  146. end.
  147. restart(NodeSpec) ->
  148. restart(maps:get(name, NodeSpec), NodeSpec).
  149. restart(Node, Spec) ->
  150. ct:pal("Stopping peer node ~p", [Node]),
  151. ok = emqx_cth_peer:stop(Node),
  152. start([Spec#{boot_type => restart}]).
  153. mk_nodespecs(Nodes, ClusterOpts) ->
  154. NodeSpecs = lists:zipwith(
  155. fun(N, {Name, Opts}) -> mk_init_nodespec(N, Name, Opts, ClusterOpts) end,
  156. lists:seq(1, length(Nodes)),
  157. Nodes
  158. ),
  159. CoreNodes = [Node || #{name := Node, role := core} <- NodeSpecs],
  160. Backend =
  161. case length(CoreNodes) of
  162. L when L == length(NodeSpecs) ->
  163. mnesia;
  164. _ ->
  165. rlog
  166. end,
  167. lists:map(
  168. fun(Spec0) ->
  169. Spec1 = maps:merge(#{core_nodes => CoreNodes, db_backend => Backend}, Spec0),
  170. Spec2 = merge_default_appspecs(Spec1, NodeSpecs),
  171. Spec3 = merge_clustering_appspecs(Spec2, NodeSpecs),
  172. Spec3
  173. end,
  174. NodeSpecs
  175. ).
  176. mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
  177. Node = node_name(Name),
  178. BasePort = base_port(N),
  179. WorkDir = maps:get(work_dir, ClusterOpts),
  180. Defaults = #{
  181. name => Node,
  182. role => core,
  183. apps => [],
  184. base_port => BasePort,
  185. work_dir => filename:join([WorkDir, Node])
  186. },
  187. maps:merge(Defaults, NodeOpts).
  188. merge_default_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
  189. Spec#{apps => [mk_node_appspec(App, Spec, NodeSpecs) || App <- Apps]}.
  190. merge_clustering_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
  191. AppsClustering = lists:map(
  192. fun(App) ->
  193. case lists:keyfind(App, 1, Apps) of
  194. AppSpec = {App, _} ->
  195. AppSpec;
  196. false ->
  197. {App, default_appspec(App, Spec, NodeSpecs)}
  198. end
  199. end,
  200. ?APPS_CLUSTERING
  201. ),
  202. AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
  203. Spec#{apps => AppsClustering ++ AppsRest}.
%% Normalize a single appspec into `{App, Opts}` form: user-provided options
%% are merged on top of the cluster-aware defaults; a bare app atom gets the
%% defaults alone.
mk_node_appspec({App, Opts}, Spec, NodeSpecs) ->
    {App, emqx_cth_suite:merge_appspec(default_appspec(App, Spec, NodeSpecs), Opts)};
mk_node_appspec(App, Spec, NodeSpecs) ->
    {App, default_appspec(App, Spec, NodeSpecs)}.
%% Default per-application configuration, derived from the node spec and the
%% layout of the whole cluster. Returns an appspec override map (possibly empty).
default_appspec(gen_rpc, #{name := Node}, NodeSpecs) ->
    %% Every node must know the `gen_rpc` TCP port of every other node.
    NodePorts = lists:foldl(
        fun(#{name := CNode, base_port := Port}, Acc) ->
            Acc#{CNode => {tcp, gen_rpc_port(Port)}}
        end,
        #{},
        NodeSpecs
    ),
    {tcp, Port} = maps:get(Node, NodePorts),
    #{
        override_env => [
            % NOTE
            % This is needed to make sure `gen_rpc` peers will find each other.
            {port_discovery, manual},
            {tcp_server_port, Port},
            {client_config_per_node, {internal, NodePorts}}
        ]
    };
default_appspec(mria, #{role := Role, db_backend := Backend}, _NodeSpecs) ->
    #{
        override_env => [
            {node_role, Role},
            {db_backend, Backend}
        ]
    };
default_appspec(ekka, Spec, _NodeSpecs) ->
    Overrides =
        case get_cluster_seeds(Spec) of
            [_ | _] = Seeds ->
                % NOTE
                % Presumably, this is needed for replicants to find core nodes.
                [{cluster_discovery, {static, [{seeds, Seeds}]}}];
            [] ->
                []
        end,
    #{
        override_env => Overrides
    };
default_appspec(emqx_conf, Spec, _NodeSpecs) ->
    % NOTE
    % This usually sets up a lot of `gen_rpc` / `mria` / `ekka` application envs in
    % `emqx_config:init_load/2` during configuration mapping, so we need to keep them
    % in sync with the values we set up here.
    #{
        name := Node,
        role := Role,
        db_backend := Backend,
        base_port := BasePort,
        work_dir := WorkDir
    } = Spec,
    Cluster =
        case get_cluster_seeds(Spec) of
            [_ | _] = Seeds ->
                % NOTE
                % Presumably, this is needed for replicants to find core nodes.
                #{discovery_strategy => static, static => #{seeds => Seeds}};
            [] ->
                #{}
        end,
    #{
        config => #{
            node => #{
                name => Node,
                role => Role,
                cookie => erlang:get_cookie(),
                % TODO: will it be synced to the same value eventually?
                data_dir => unicode:characters_to_binary(WorkDir),
                db_backend => Backend
            },
            cluster => Cluster,
            rpc => #{
                % NOTE
                % This (along with `gen_rpc` env overrides) is needed to make sure `gen_rpc`
                % peers will find each other.
                protocol => tcp,
                tcp_server_port => gen_rpc_port(BasePort),
                port_discovery => manual
            },
            listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)
        }
    };
default_appspec(emqx, Spec, _NodeSpecs) ->
    #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}};
default_appspec(_App, _, _) ->
    #{}.
%% Nodes this node should use as cluster seeds.
%% NOTE: clause order matters — an explicit `join_to` (including `undefined`,
%% meaning "don't join at all") takes precedence over the computed core nodes.
get_cluster_seeds(#{join_to := undefined}) ->
    [];
get_cluster_seeds(#{join_to := Node}) ->
    [Node];
get_cluster_seeds(#{core_nodes := CoreNodes}) ->
    CoreNodes.
  299. allocate_listener_port(Type, #{base_port := BasePort}) ->
  300. Port = listener_port(BasePort, Type),
  301. #{Type => #{default => #{bind => format("127.0.0.1:~p", [Port])}}}.
  302. allocate_listener_ports(Types, Spec) ->
  303. lists:foldl(fun maps:merge/2, #{}, [allocate_listener_port(Type, Spec) || Type <- Types]).
  304. start_nodes_init(Specs, Timeout) ->
  305. Names = lists:map(fun(#{name := Name}) -> Name end, Specs),
  306. _Nodes = start_bare_nodes(Names, Timeout),
  307. lists:foreach(fun node_init/1, Specs).
  308. start_bare_nodes(Names) ->
  309. start_bare_nodes(Names, ?TIMEOUT_NODE_START_MS).
  310. start_bare_nodes(Names, Timeout) ->
  311. Args = erl_flags(),
  312. Envs = [],
  313. Waits = lists:map(
  314. fun(Name) ->
  315. WaitTag = {boot_complete, Name},
  316. WaitBoot = {self(), WaitTag},
  317. {ok, _} = emqx_cth_peer:start(Name, Args, Envs, WaitBoot),
  318. WaitTag
  319. end,
  320. Names
  321. ),
  322. Deadline = deadline(Timeout),
  323. Nodes = wait_boot_complete(Waits, Deadline),
  324. lists:foreach(fun(Node) -> pong = net_adm:ping(Node) end, Nodes),
  325. Nodes.
  326. deadline(Timeout) ->
  327. erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, native).
  328. is_overdue(Deadline) ->
  329. erlang:monotonic_time() > Deadline.
%% Collect `{boot_complete, Name}` messages from the booting peers until all
%% waits are accounted for; crashes with `{timeout, PendingWaits}` once the
%% deadline passes. Returns node names in the order their messages arrived.
wait_boot_complete([], _) ->
    [];
wait_boot_complete(Waits, Deadline) ->
    case is_overdue(Deadline) of
        true ->
            error({timeout, Waits});
        false ->
            ok
    end,
    receive
        {{boot_complete, _Name} = Wait, {started, Node, _Pid}} ->
            ct:pal("~p", [Wait]),
            [Node | wait_boot_complete(Waits -- [Wait], Deadline)];
        {{boot_complete, _Name}, Otherwise} ->
            %% Peer reported something other than a successful start.
            error({unexpected, Otherwise})
    after 100 ->
        %% Loop so the deadline check above runs even when no messages arrive.
        wait_boot_complete(Waits, Deadline)
    end.
%% Per-node initialization right after the bare node boots: exclusive work
%% directory, CT log forwarding, snabbkaffe trace forwarding, cover start.
node_init(#{name := Node, work_dir := WorkDir}) ->
    %% Create exclusive current directory for the node. Some configurations, like plugin
    %% installation directory, are the same for the whole cluster, and nodes on the same
    %% machine will step on each other's toes...
    ok = filelib:ensure_path(WorkDir),
    ok = erpc:call(Node, file, set_cwd, [WorkDir]),
    %% Make it possible to call `ct:pal` and friends (if running under rebar3)
    _ = share_load_module(Node, cthr),
    %% Enable snabbkaffe trace forwarding
    ok = snabbkaffe:forward_trace(Node),
    when_cover_enabled(fun() -> {ok, _} = cover:start([Node]) end),
    ok.
%% Second startup phase: load the node's apps, start the clustering apps,
%% and (maybe) join the cluster.
%% Returns 'true' if this node should appear in running nodes list.
run_node_phase_cluster(Spec = #{name := Node}) ->
    ok = load_apps(Node, Spec),
    ok = start_apps_clustering(Node, Spec),
    maybe_join_cluster(Node, Spec).
  365. run_node_phase_apps(Spec = #{name := Node}) ->
  366. ok = start_apps(Node, Spec),
  367. ok.
%% Load (but do not start) the node's applications on the remote node.
load_apps(Node, #{apps := Apps}) ->
    erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
  370. start_apps_clustering(Node, #{apps := Apps} = Spec) ->
  371. SuiteOpts = suite_opts(Spec),
  372. AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
  373. _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
  374. ok.
  375. start_apps(Node, #{apps := Apps} = Spec) ->
  376. SuiteOpts = suite_opts(Spec),
  377. AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
  378. _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
  379. ok.
%% Subset of the node spec passed down to `emqx_cth_suite` as suite options.
suite_opts(Spec) ->
    maps:with([work_dir, boot_type], Spec).
  382. %% Returns 'true' if this node should appear in the cluster.
  383. maybe_join_cluster(_Node, #{boot_type := restart}) ->
  384. %% when restart, the node should already be in the cluster
  385. %% hence no need to (re)join
  386. true;
  387. maybe_join_cluster(_Node, #{role := replicant}) ->
  388. true;
  389. maybe_join_cluster(Node, Spec) ->
  390. case get_cluster_seeds(Spec) of
  391. [JoinTo | _] ->
  392. ok = join_cluster(Node, JoinTo),
  393. true;
  394. [] ->
  395. false
  396. end.
  397. join_cluster(Node, JoinTo) ->
  398. case erpc:call(Node, ekka, join, [JoinTo]) of
  399. ok ->
  400. ok;
  401. ignore ->
  402. ok;
  403. Error ->
  404. error({failed_to_join_cluster, #{node => Node, error => Error}})
  405. end.
%%

%% @doc Stop all given nodes in parallel (accepts the list returned by `start`).
stop(Nodes) ->
    _ = emqx_utils:pmap(fun stop_node/1, Nodes, ?TIMEOUT_NODE_STOP_S * 1000),
    ok.
%% @doc Stop a single node, flushing its cover data first when cover is enabled.
stop_node(Name) ->
    Node = node_name(Name),
    when_cover_enabled(fun() -> cover:flush([Node]) end),
    ok = emqx_cth_peer:stop(Node).
  414. %% Ports
  415. base_port(Number) ->
  416. 10000 + Number * 100.
  417. gen_rpc_port(BasePort) ->
  418. BasePort - 1.
  419. listener_port(BasePort, tcp) ->
  420. BasePort;
  421. listener_port(BasePort, ssl) ->
  422. BasePort + 1;
  423. listener_port(BasePort, quic) ->
  424. BasePort + 2;
  425. listener_port(BasePort, ws) ->
  426. BasePort + 3;
  427. listener_port(BasePort, wss) ->
  428. BasePort + 4.
  429. %%
  430. erl_flags() ->
  431. %% One core
  432. ["+S", "1:1"] ++ ebin_path().
  433. ebin_path() ->
  434. ["-pa" | lists:filter(fun is_lib/1, code:get_path())].
  435. is_lib(Path) ->
  436. string:prefix(Path, code:lib_dir()) =:= nomatch andalso
  437. string:str(Path, "_build/default/plugins") =:= 0.
  438. share_load_module(Node, Module) ->
  439. case code:get_object_code(Module) of
  440. {Module, Code, Filename} ->
  441. {module, Module} = erpc:call(Node, code, load_binary, [Module, Filename, Code]),
  442. ok;
  443. error ->
  444. error
  445. end.
  446. -spec node_name(atom()) -> node().
  447. node_name(Name) ->
  448. case string:tokens(atom_to_list(Name), "@") of
  449. [_Name, _Host] ->
  450. %% the name already has a @
  451. Name;
  452. _ ->
  453. list_to_atom(atom_to_list(Name) ++ "@" ++ host())
  454. end.
  455. host() ->
  456. [_, Host] = string:tokens(atom_to_list(node()), "@"),
  457. Host.
  458. %%
  459. format(Format, Args) ->
  460. unicode:characters_to_binary(io_lib:format(Format, Args)).
  461. is_cover_enabled() ->
  462. case os:getenv("ENABLE_COVER_COMPILE") of
  463. "1" -> true;
  464. "true" -> true;
  465. _ -> false
  466. end.
  467. when_cover_enabled(Fun) ->
  468. %% We need to check if cover is enabled to avoid crashes when attempting to start it
  469. %% on the peer.
  470. case is_cover_enabled() of
  471. true ->
  472. Fun();
  473. false ->
  474. ok
  475. end.