|
@@ -146,7 +146,7 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
|
|
|
%% The init MFA return ok, but some other nodes failed.
|
|
%% The init MFA return ok, but some other nodes failed.
|
|
|
?SLOG(error, #{
|
|
?SLOG(error, #{
|
|
|
msg => "cluster_rpc_peers_lagging",
|
|
msg => "cluster_rpc_peers_lagging",
|
|
|
- status=> Status,
|
|
|
|
|
|
|
+ status => Status,
|
|
|
nodes => Nodes,
|
|
nodes => Nodes,
|
|
|
tnx_id => TnxId
|
|
tnx_id => TnxId
|
|
|
}),
|
|
}),
|
|
@@ -511,7 +511,7 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
|
|
|
emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
|
|
emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
|
|
|
|
|
|
|
|
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
|
|
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
|
|
|
- Lagging = lagging_node(TnxId),
|
|
|
|
|
|
|
+ Lagging = lagging_nodes(TnxId),
|
|
|
Stopped = stopped_nodes(),
|
|
Stopped = stopped_nodes(),
|
|
|
case Lagging -- Stopped of
|
|
case Lagging -- Stopped of
|
|
|
[] when Stopped =:= [] ->
|
|
[] when Stopped =:= [] ->
|
|
@@ -533,17 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
|
|
|
false when Remain > 0 ->
|
|
false when Remain > 0 ->
|
|
|
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
|
|
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
|
|
|
false ->
|
|
false ->
|
|
|
- Lagging = lagging_node(TnxId),
|
|
|
|
|
- Stopped = stopped_nodes(),
|
|
|
|
|
- case Lagging -- Stopped of
|
|
|
|
|
- %% All commit but The succeedNum > length(nodes()).
|
|
|
|
|
- [] when Stopped =:= [] -> ok;
|
|
|
|
|
- [] -> {stopped_nodes, Stopped};
|
|
|
|
|
- [_ | _] -> {peers_lagging, Lagging}
|
|
|
|
|
|
|
+ case lagging_nodes(TnxId) of
|
|
|
|
|
+ [] ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ Lagging ->
|
|
|
|
|
+ case stopped_nodes() of
|
|
|
|
|
+ [] -> {peers_lagging, Lagging};
|
|
|
|
|
+ Stopped -> {stopped_nodes, Stopped}
|
|
|
|
|
+ end
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-lagging_node(TnxId) ->
|
|
|
|
|
|
|
+lagging_nodes(TnxId) ->
|
|
|
{atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
|
|
{atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
|
|
|
Nodes.
|
|
Nodes.
|
|
|
|
|
|