Bladeren bron

feat: cluster-rpc failed fast when some nodes is down

Zhongwen Deng 3 jaren geleden
bovenliggende
commit
1175008a74
1 gewijzigde bestanden met toevoegingen van 26 en 14 verwijderingen
  1. 26 14
      apps/emqx_conf/src/emqx_cluster_rpc.erl

+ 26 - 14
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -72,6 +72,7 @@
 -define(TIMEOUT, timer:minutes(1)).
 -define(APPLY_KIND_REPLICATE, replicate).
 -define(APPLY_KIND_INITIATE, initiate).
+-define(IS_ACTION(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).
 
 -type tnx_id() :: pos_integer().
 
@@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) ->
 %% the result is expected to be `ok | {ok, _}' to indicate success,
 %% and `{error, _}' to indicate failure.
 %%
-%% The excpetion of the MFA evaluation is captured and translated
+%% The exception of the MFA evaluation is captured and translated
 %% into an `{error, _}' tuple.
 %% This call tries to wait for all peer nodes to be in-sync before
 %% returning the result.
 %%
 %% In case of partial success, an `error' level log is emitted
-%% but the initial localy apply result is returned.
+%% but the initial locally apply result is returned.
 -spec multicall(module(), atom(), list()) -> term().
 multicall(M, F, A) ->
     multicall(M, F, A, all, timer:minutes(2)).
@@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
             Result;
         {init_failure, Error} ->
             Error;
-        {peers_lagging, TnxId, Res, Nodes} ->
+        {Action, TnxId, Res, Nodes} when ?IS_ACTION(Action) ->
             %% The init MFA return ok, but some other nodes failed.
             ?SLOG(error, #{
                 msg => "cluster_rpc_peers_lagging",
-                lagging_nodes => Nodes,
+                action => Action,
+                nodes => Nodes,
                 tnx_id => TnxId
             }),
             Res
@@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
             InitRes;
         {init_failure, Error0} ->
             {init_failure, Error0};
-        {peers_lagging, Nodes} ->
+        {Action, Nodes} when ?IS_ACTION(Action) ->
             {ok, TnxId0, MFARes} = InitRes,
-            {peers_lagging, TnxId0, MFARes, Nodes}
+            {Action, TnxId0, MFARes, Nodes}
     end.
 
 -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
     emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
 
 wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
-    case lagging_node(TnxId) of
+    Lagging = lagging_node(TnxId),
+    Stopped = stopped_nodes(),
+    case Lagging -- Stopped of
+        [] when Stopped =:= [] ->
+            ok;
+        [] ->
+            {stopped_nodes, Stopped};
         [_ | _] when Remain > 0 ->
             ok = timer:sleep(Delay),
             wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
-        [] ->
-            ok;
-        Nodes ->
-            {peers_lagging, Nodes}
+        [_ | _] ->
+            {peers_lagging, Lagging}
     end.
 
 wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
@@ -527,10 +533,13 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
         false when Remain > 0 ->
             wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
         false ->
-            case lagging_node(TnxId) of
+            Lagging = lagging_node(TnxId),
+            Stopped = stopped_nodes(),
+            case Lagging -- Stopped of
                 %% All commit but The succeedNum > length(nodes()).
-                [] -> ok;
-                Nodes -> {peers_lagging, Nodes}
+                [] when Stopped =:= [] -> ok;
+                [] -> {stopped_nodes, Stopped};
+                [_ | _] -> {peers_lagging, Lagging}
             end
     end.
 
@@ -548,6 +557,9 @@ commit_status_trans(Operator, TnxId) ->
     Result = '$2',
     mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
 
+stopped_nodes() ->
+    ekka_cluster:info(stopped_nodes).
+
 get_retry_ms() ->
     emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).