|
|
@@ -52,6 +52,7 @@
|
|
|
-define(TIMEOUT_NODE_START_MS, 15000).
|
|
|
-define(TIMEOUT_APPS_START_MS, 30000).
|
|
|
-define(TIMEOUT_NODE_STOP_S, 15).
|
|
|
+-define(TIMEOUT_CLUSTER_WAIT_MS, timer:seconds(10)).
|
|
|
|
|
|
%%
|
|
|
|
|
|
@@ -91,11 +92,7 @@
|
|
|
%% Working directory
|
|
|
%% If this directory is not empty, starting up the node applications will fail
|
|
|
%% Default: "${ClusterOpts.work_dir}/${nodename}"
|
|
|
- work_dir => file:name(),
|
|
|
-
|
|
|
- % Tooling to manage nodes
|
|
|
- % Default: `ct_slave`.
|
|
|
- driver => ct_slave | slave
|
|
|
+ work_dir => file:name()
|
|
|
}}.
|
|
|
|
|
|
-spec start([nodespec()], ClusterOpts) ->
|
|
|
@@ -118,11 +115,52 @@ start(NodeSpecs) ->
|
|
|
% 2. Start applications needed to enable clustering
|
|
|
% Generally, this causes some applications to restart, but we deliberately don't
|
|
|
% start them yet.
|
|
|
- _ = lists:foreach(fun run_node_phase_cluster/1, NodeSpecs),
|
|
|
+ ShouldAppearInRunningNodes = lists:map(fun run_node_phase_cluster/1, NodeSpecs),
|
|
|
+ IsClustered = lists:member(true, ShouldAppearInRunningNodes),
|
|
|
% 3. Start applications after cluster is formed
|
|
|
% Cluster-joins are complete, so they shouldn't restart in the background anymore.
|
|
|
_ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS),
|
|
|
- [Node || #{name := Node} <- NodeSpecs].
|
|
|
+ Nodes = [Node || #{name := Node} <- NodeSpecs],
|
|
|
+ %% 4. Wait for the nodes to cluster
|
|
|
+ case IsClustered of
|
|
|
+ true ->
|
|
|
+ ok = wait_clustered(Nodes, ?TIMEOUT_CLUSTER_WAIT_MS);
|
|
|
+ false ->
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ Nodes.
|
|
|
+
|
|
|
+%% Blocks until every node in `Nodes` lists all of `Nodes` in its own
+%% `mria:running_nodes()`. `Timeout` is in milliseconds; it is turned into
+%% a deadline that is handed over to wait_clustered/3 together with the
+%% per-node check.
+wait_clustered(Nodes, Timeout) ->
+    %% The check returns 'true' when the remote node sees everybody, or
+    %% `{false, Missing}` with the nodes it does not see as running yet.
+    SeesEveryone = fun(Node) ->
+        case Nodes -- erpc:call(Node, mria, running_nodes, []) of
+            [] -> true;
+            Missing -> {false, Missing}
+        end
+    end,
+    wait_clustered(Nodes, SeesEveryone, deadline(Timeout)).
|
|
|
+
|
|
|
+%% Runs `Check` against the nodes one at a time, in list order.
+%%  * `Check(Node)` returns 'true': move on to the next node.
+%%  * `Check(Node)` returns `{false, NotRunning}`: sleep 100 ms and check the
+%%    same node again, unless `Deadline` has already passed, in which case
+%%    `error({timeout, #{checking_from_node, nodes_not_running}})` is raised.
+%% Returns 'ok' once every node has passed the check.
+wait_clustered([], _Check, _Deadline) ->
+    ok;
+wait_clustered([Node | Nodes] = All, Check, Deadline) ->
+    %% The clock is sampled before the check is run.
+    IsOverdue = is_overdue(Deadline),
+    case Check(Node) of
+        true ->
+            wait_clustered(Nodes, Check, Deadline);
+        {false, NodesNotRunning} when IsOverdue ->
+            error(
+                {timeout, #{
+                    checking_from_node => Node,
+                    nodes_not_running => NodesNotRunning
+                }}
+            );
+        {false, _NodesNotRunning} ->
+            %% This pattern must not reuse `Nodes`: that variable is already
+            %% bound by the function head, so `{false, Nodes}` would only match
+            %% when the not-running list equals the remaining tail and would
+            %% crash with `case_clause` otherwise.
+            timer:sleep(100),
+            wait_clustered(All, Check, Deadline)
+    end.
|
|
|
|
|
|
restart(Node, Spec) ->
|
|
|
ct:pal("Stopping peer node ~p", [Node]),
|
|
|
@@ -162,8 +200,7 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
|
|
|
role => core,
|
|
|
apps => [],
|
|
|
base_port => BasePort,
|
|
|
- work_dir => filename:join([WorkDir, Node]),
|
|
|
- driver => ct_slave
|
|
|
+ work_dir => filename:join([WorkDir, Node])
|
|
|
},
|
|
|
maps:merge(Defaults, NodeOpts).
|
|
|
|
|
|
@@ -309,15 +346,21 @@ start_bare_nodes(Names, Timeout) ->
|
|
|
end,
|
|
|
Names
|
|
|
),
|
|
|
- Deadline = erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, nanosecond),
|
|
|
+ Deadline = deadline(Timeout),
|
|
|
Nodes = wait_boot_complete(Waits, Deadline),
|
|
|
lists:foreach(fun(Node) -> pong = net_adm:ping(Node) end, Nodes),
|
|
|
Nodes.
|
|
|
|
|
|
+%% Turns a timeout given in milliseconds into an absolute point on the
+%% monotonic clock. `erlang:monotonic_time/0` reports native time units, so
+%% the timeout is converted to `native` as well; the result can be compared
+%% directly against `erlang:monotonic_time()`.
+deadline(Timeout) ->
+    erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, native).
|
|
|
+
|
|
|
+%% 'true' once the monotonic clock reads strictly later than `Deadline`.
+is_overdue(Deadline) ->
+    Now = erlang:monotonic_time(),
+    Now > Deadline.
|
|
|
+
|
|
|
wait_boot_complete([], _) ->
|
|
|
[];
|
|
|
wait_boot_complete(Waits, Deadline) ->
|
|
|
- case erlang:monotonic_time() > Deadline of
|
|
|
+ case is_overdue(Deadline) of
|
|
|
true ->
|
|
|
error({timeout, Waits});
|
|
|
false ->
|
|
|
@@ -340,11 +383,11 @@ node_init(Node) ->
|
|
|
ok = snabbkaffe:forward_trace(Node),
|
|
|
ok.
|
|
|
|
|
|
+%% Runs load_apps/2 and start_apps_clustering/2 for the node, then returns the result of maybe_join_cluster/2: 'true' if this node should appear in running nodes list.
|
|
|
run_node_phase_cluster(Spec = #{name := Node}) ->
|
|
|
ok = load_apps(Node, Spec),
|
|
|
ok = start_apps_clustering(Node, Spec),
|
|
|
- ok = maybe_join_cluster(Node, Spec),
|
|
|
- ok.
|
|
|
+ maybe_join_cluster(Node, Spec).
|
|
|
|
|
|
run_node_phase_apps(Spec = #{name := Node}) ->
|
|
|
ok = start_apps(Node, Spec),
|
|
|
@@ -368,18 +411,20 @@ start_apps(Node, #{apps := Apps} = Spec) ->
|
|
|
%% Restricts a node spec to its work_dir and boot_type keys; keys missing from the spec are simply absent from the result.
suite_opts(Spec) ->
|
|
|
maps:with([work_dir, boot_type], Spec).
|
|
|
|
|
|
+%% Returns 'true' if this node should appear in the cluster: it is being restarted, it is a replicant, or it has just been joined to the first cluster seed. Returns 'false' if the spec has no seeds to join.
|
|
|
maybe_join_cluster(_Node, #{boot_type := restart}) ->
|
|
|
%% when restart, the node should already be in the cluster
|
|
|
%% hence no need to (re)join
|
|
|
- ok;
|
|
|
+ true;
|
|
|
maybe_join_cluster(_Node, #{role := replicant}) ->
|
|
|
- ok;
|
|
|
+ true;
|
|
|
maybe_join_cluster(Node, Spec) ->
|
|
|
case get_cluster_seeds(Spec) of
|
|
|
[JoinTo | _] ->
|
|
|
- ok = join_cluster(Node, JoinTo);
|
|
|
+ ok = join_cluster(Node, JoinTo),
|
|
|
+ true;
|
|
|
[] ->
|
|
|
- ok
|
|
|
+ false
|
|
|
end.
|
|
|
|
|
|
join_cluster(Node, JoinTo) ->
|