|
|
@@ -38,14 +38,14 @@
|
|
|
%% in `end_per_suite/1` or `end_per_group/2`) with the result from step 2.
|
|
|
-module(emqx_cth_cluster).
|
|
|
|
|
|
--export([start/2]).
|
|
|
+-export([start/1, start/2, restart/2]).
|
|
|
-export([stop/1, stop_node/1]).
|
|
|
|
|
|
--export([start_bare_node/2]).
|
|
|
+-export([start_bare_nodes/1, start_bare_nodes/2]).
|
|
|
|
|
|
-export([share_load_module/2]).
|
|
|
-export([node_name/1, mk_nodespecs/2]).
|
|
|
--export([start_apps/2, set_node_opts/2]).
|
|
|
+-export([start_apps/2]).
|
|
|
|
|
|
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
|
|
|
|
|
|
@@ -109,9 +109,12 @@ when
|
|
|
}.
|
|
|
start(Nodes, ClusterOpts) ->
|
|
|
NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
|
|
|
- ct:pal("Starting cluster:\n ~p", [NodeSpecs]),
|
|
|
+ start(NodeSpecs).
|
|
|
+
|
|
|
+start(NodeSpecs) ->
|
|
|
+ ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]),
|
|
|
% 1. Start bare nodes with only basic applications running
|
|
|
- _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
|
|
|
+ ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),
|
|
|
% 2. Start applications needed to enable clustering
|
|
|
% Generally, this causes some applications to restart, but we deliberately don't
|
|
|
% start them yet.
|
|
|
@@ -121,6 +124,11 @@ start(Nodes, ClusterOpts) ->
|
|
|
_ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS),
|
|
|
[Node || #{name := Node} <- NodeSpecs].
|
|
|
|
|
|
+restart(Node, Spec) ->
|
|
|
+ ct:pal("Stopping peer node ~p", [Node]),
|
|
|
+ ok = emqx_cth_peer:stop(Node),
|
|
|
+ start([Spec#{boot_type => restart}]).
|
|
|
+
|
|
|
mk_nodespecs(Nodes, ClusterOpts) ->
|
|
|
NodeSpecs = lists:zipwith(
|
|
|
fun(N, {Name, Opts}) -> mk_init_nodespec(N, Name, Opts, ClusterOpts) end,
|
|
|
@@ -282,8 +290,50 @@ allocate_listener_port(Type, #{base_port := BasePort}) ->
|
|
|
allocate_listener_ports(Types, Spec) ->
|
|
|
lists:foldl(fun maps:merge/2, #{}, [allocate_listener_port(Type, Spec) || Type <- Types]).
|
|
|
|
|
|
-start_node_init(Spec = #{name := Node}) ->
|
|
|
- Node = start_bare_node(Node, Spec),
|
|
|
+start_nodes_init(Specs, Timeout) ->
|
|
|
+ Names = lists:map(fun(#{name := Name}) -> Name end, Specs),
|
|
|
+ Nodes = start_bare_nodes(Names, Timeout),
|
|
|
+ lists:foreach(fun node_init/1, Nodes).
|
|
|
+
|
|
|
+start_bare_nodes(Names) ->
|
|
|
+ start_bare_nodes(Names, ?TIMEOUT_NODE_START_MS).
|
|
|
+start_bare_nodes(Names, Timeout) ->
|
|
|
+ Args = erl_flags(),
|
|
|
+ Envs = [],
|
|
|
+ Waits = lists:map(
|
|
|
+ fun(Name) ->
|
|
|
+ WaitTag = {boot_complete, Name},
|
|
|
+ WaitBoot = {self(), WaitTag},
|
|
|
+ {ok, _} = emqx_cth_peer:start(Name, Args, Envs, WaitBoot),
|
|
|
+ WaitTag
|
|
|
+ end,
|
|
|
+ Names
|
|
|
+ ),
|
|
|
+ Deadline = erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, nanosecond),
|
|
|
+ Nodes = wait_boot_complete(Waits, Deadline),
|
|
|
+ lists:foreach(fun(Node) -> pong = net_adm:ping(Node) end, Nodes),
|
|
|
+ Nodes.
|
|
|
+
|
|
|
+wait_boot_complete([], _) ->
|
|
|
+ [];
|
|
|
+wait_boot_complete(Waits, Deadline) ->
|
|
|
+ case erlang:monotonic_time() > Deadline of
|
|
|
+ true ->
|
|
|
+ error({timeout, Waits});
|
|
|
+ false ->
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ receive
|
|
|
+ {{boot_complete, _Name} = Wait, {started, Node, _Pid}} ->
|
|
|
+ ct:pal("~p", [Wait]),
|
|
|
+ [Node | wait_boot_complete(Waits -- [Wait], Deadline)];
|
|
|
+ {{boot_complete, _Name}, Otherwise} ->
|
|
|
+ error({unexpected, Otherwise})
|
|
|
+ after 100 ->
|
|
|
+ wait_boot_complete(Waits, Deadline)
|
|
|
+ end.
|
|
|
+
|
|
|
+node_init(Node) ->
|
|
|
% Make it possible to call `ct:pal` and friends (if running under rebar3)
|
|
|
_ = share_load_module(Node, cthr),
|
|
|
% Enable snabbkaffe trace forwarding
|
|
|
@@ -300,12 +350,6 @@ run_node_phase_apps(Spec = #{name := Node}) ->
|
|
|
ok = start_apps(Node, Spec),
|
|
|
ok.
|
|
|
|
|
|
-set_node_opts(Node, Spec) ->
|
|
|
- erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]).
|
|
|
-
|
|
|
-get_node_opts(Node) ->
|
|
|
- erpc:call(Node, persistent_term, get, [{?MODULE, opts}]).
|
|
|
-
|
|
|
load_apps(Node, #{apps := Apps}) ->
|
|
|
erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
|
|
|
|
|
|
@@ -322,8 +366,12 @@ start_apps(Node, #{apps := Apps} = Spec) ->
|
|
|
ok.
|
|
|
|
|
|
suite_opts(Spec) ->
|
|
|
- maps:with([work_dir], Spec).
|
|
|
+ maps:with([work_dir, boot_type], Spec).
|
|
|
|
|
|
+maybe_join_cluster(_Node, #{boot_type := restart}) ->
|
|
|
+ %% when restart, the node should already be in the cluster
|
|
|
+ %% hence no need to (re)join
|
|
|
+ ok;
|
|
|
maybe_join_cluster(_Node, #{role := replicant}) ->
|
|
|
ok;
|
|
|
maybe_join_cluster(Node, Spec) ->
|
|
|
@@ -352,23 +400,7 @@ stop(Nodes) ->
|
|
|
|
|
|
stop_node(Name) ->
|
|
|
Node = node_name(Name),
|
|
|
- try get_node_opts(Node) of
|
|
|
- Opts ->
|
|
|
- stop_node(Name, Opts)
|
|
|
- catch
|
|
|
- error:{erpc, _} ->
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
-stop_node(Node, #{driver := ct_slave}) ->
|
|
|
- case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of
|
|
|
- {ok, _} ->
|
|
|
- ok;
|
|
|
- {error, Reason, _} when Reason == not_connected; Reason == not_started ->
|
|
|
- ok
|
|
|
- end;
|
|
|
-stop_node(Node, #{driver := slave}) ->
|
|
|
- slave:stop(Node).
|
|
|
+ ok = emqx_cth_peer:stop(Node).
|
|
|
|
|
|
%% Ports
|
|
|
|
|
|
@@ -391,36 +423,12 @@ listener_port(BasePort, wss) ->
|
|
|
|
|
|
%%
|
|
|
|
|
|
--spec start_bare_node(atom(), map()) -> node().
|
|
|
-start_bare_node(Name, Spec = #{driver := ct_slave}) ->
|
|
|
- {ok, Node} = ct_slave:start(
|
|
|
- node_name(Name),
|
|
|
- [
|
|
|
- {kill_if_fail, true},
|
|
|
- {monitor_master, true},
|
|
|
- {init_timeout, 20_000},
|
|
|
- {startup_timeout, 20_000},
|
|
|
- {erl_flags, erl_flags()},
|
|
|
- {env, []}
|
|
|
- ]
|
|
|
- ),
|
|
|
- init_bare_node(Node, Spec);
|
|
|
-start_bare_node(Name, Spec = #{driver := slave}) ->
|
|
|
- {ok, Node} = slave:start_link(host(), Name, ebin_path()),
|
|
|
- init_bare_node(Node, Spec).
|
|
|
-
|
|
|
-init_bare_node(Node, Spec) ->
|
|
|
- pong = net_adm:ping(Node),
|
|
|
- % Preserve node spec right on the remote node
|
|
|
- ok = set_node_opts(Node, Spec),
|
|
|
- Node.
|
|
|
-
|
|
|
erl_flags() ->
|
|
|
- %% One core and redirecting logs to master
|
|
|
- "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
|
|
|
+ %% One core
|
|
|
+ ["+S", "1:1"] ++ ebin_path().
|
|
|
|
|
|
ebin_path() ->
|
|
|
- string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
|
|
+ ["-pa" | lists:filter(fun is_lib/1, code:get_path())].
|
|
|
|
|
|
is_lib(Path) ->
|
|
|
string:prefix(Path, code:lib_dir()) =:= nomatch andalso
|