
Merge pull request #12121 from zhongwencool/cluster-rpc

fix: occasionally return stale view when updating configurations
JianBo He 2 years ago
parent
commit
5b70baf624

+ 34 - 18
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -343,13 +343,8 @@ handle_call(reset, _From, State) ->
     _ = mria:clear_table(?CLUSTER_COMMIT),
     _ = mria:clear_table(?CLUSTER_MFA),
     {reply, ok, State, {continue, ?CATCH_UP}};
-handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
-    case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
-        {atomic, {ok, TnxId, Result}} ->
-            {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
-        {aborted, Error} ->
-            {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
-    end;
+handle_call(?INITIATE(MFA), _From, State) ->
+    do_initiate(MFA, State, 1, #{});
 handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
     Timeout = catch_up(State, true),
     {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
@@ -465,11 +460,40 @@ get_oldest_mfa_id() ->
         Id -> Id
     end.
 
+do_initiate(_MFA, State, Count, Failure) when Count > 10 ->
+    %% refuse to initiate cluster call from this node
+    %% because it's likely that the caller is based on
+    %% a stale view even after we retried 10 times.
+    Error = stale_view_of_cluster_msg(Failure, Count),
+    {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}};
+do_initiate(MFA, State = #{node := Node}, Count, Failure0) ->
+    case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
+        {atomic, {ok, TnxId, Result}} ->
+            {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
+        {atomic, {retry, Failure1}} when Failure0 =:= Failure1 ->
+            %% Useless retry, so we return early.
+            Error = stale_view_of_cluster_msg(Failure0, Count),
+            {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}};
+        {atomic, {retry, Failure1}} ->
+            catch_up(State),
+            do_initiate(MFA, State, Count + 1, Failure1);
+        {aborted, Error} ->
+            {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
+    end.
+
+stale_view_of_cluster_msg(Meta, Count) ->
+    Reason = Meta#{
+        msg => stale_view_of_cluster_state,
+        retry_times => Count
+    },
+    ?SLOG(warning, Reason),
+    Reason.
+
 %% The entry point of a config change transaction.
 init_mfa(Node, MFA) ->
     mnesia:write_lock_table(?CLUSTER_MFA),
     LatestId = get_cluster_tnx_id(),
-    MyTnxId = get_node_tnx_id(node()),
+    MyTnxId = get_node_tnx_id(Node),
     case MyTnxId =:= LatestId of
         true ->
             TnxId = LatestId + 1,
@@ -486,16 +510,8 @@ init_mfa(Node, MFA) ->
                 {false, Error} -> mnesia:abort(Error)
             end;
         false ->
-            %% refuse to initiate cluster call from this node
-            %% because it's likely that the caller is based on
-            %% a stale view.
-            Reason = #{
-                msg => stale_view_of_cluster_state,
-                cluster_tnx_id => LatestId,
-                node_tnx_id => MyTnxId
-            },
-            ?SLOG(warning, Reason),
-            mnesia:abort({error, Reason})
+            Meta = #{cluster_tnx_id => LatestId, node_tnx_id => MyTnxId},
+            {retry, Meta}
     end.
 
 transaction(Func, Args) ->

+ 102 - 12
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -35,9 +35,10 @@ all() ->
         t_commit_ok_apply_fail_on_other_node_then_recover,
         t_del_stale_mfa,
         t_skip_failed_commit,
-        t_fast_forward_commit
+        t_fast_forward_commit,
+        t_commit_concurrency
     ].
-suite() -> [{timetrap, {minutes, 3}}].
+suite() -> [{timetrap, {minutes, 5}}].
 groups() -> [].
 
 init_per_suite(Config) ->
@@ -63,6 +64,7 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_TestCase, Config) ->
+    stop(),
     start(),
     Config.
 
@@ -119,17 +121,101 @@ t_commit_crash_test(_Config) ->
 t_commit_ok_but_apply_fail_on_other_node(_Config) ->
     emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
-    MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
+    Pid = self(),
+    {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
+    {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
+    ?assertEqual(ok, receive_msg(3, test)),
+
+    {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
     {ok, _, ok} = multicall(M, F, A, 1, 1000),
-    {atomic, [Status]} = emqx_cluster_rpc:status(),
-    ?assertEqual(MFA, maps:get(mfa, Status)),
-    ?assertEqual(node(), maps:get(node, Status)),
+    {atomic, AllStatus} = emqx_cluster_rpc:status(),
+    Node = node(),
+    ?assertEqual(
+        [
+            {1, {Node, emqx_cluster_rpc2}},
+            {1, {Node, emqx_cluster_rpc3}},
+            {2, Node}
+        ],
+        lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus])
+    ),
     erlang:send(?NODE2, test),
     Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
-    Res = gen_server:call(?NODE2, Call),
-    ?assertEqual({init_failure, "MFA return not ok"}, Res),
+    Res1 = gen_server:call(?NODE2, Call),
+    Res2 = gen_server:call(?NODE3, Call),
+    %% Node2 is retrying at tnx_id 1 and should not run the next MFA.
+    ?assertEqual(
+        {init_failure, #{
+            msg => stale_view_of_cluster_state,
+            retry_times => 2,
+            cluster_tnx_id => 2,
+            node_tnx_id => 1
+        }},
+        Res1
+    ),
+    ?assertEqual(Res1, Res2),
+    ok.
+
+t_commit_concurrency(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    Pid = self(),
+    {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
+    {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
+    ?assertEqual(ok, receive_msg(3, test)),
+
+    %% call concurrently without stale tnx_id error
+    Workers = lists:seq(1, 256),
+    lists:foreach(
+        fun(Seq) ->
+            {EchoM, EchoF, EchoA} = {?MODULE, echo_delay, [Pid, Seq]},
+            Call = emqx_cluster_rpc:make_initiate_call_req(EchoM, EchoF, EchoA),
+            spawn_link(fun() ->
+                ?assertMatch({ok, _, ok}, gen_server:call(?NODE1, Call, infinity))
+            end),
+            spawn_link(fun() ->
+                ?assertMatch({ok, _, ok}, gen_server:call(?NODE2, Call, infinity))
+            end),
+            spawn_link(fun() ->
+                ?assertMatch({ok, _, ok}, gen_server:call(?NODE3, Call, infinity))
+            end)
+        end,
+        Workers
+    ),
+    %% receive seq msg in order
+    List = lists:sort(receive_seq_msg([])),
+    ?assertEqual(256 * 3 * 3, length(List), List),
+    {atomic, Status} = emqx_cluster_rpc:status(),
+    lists:map(
+        fun(#{tnx_id := TnxId} = S) ->
+            ?assertEqual(256 * 3 + 1, TnxId, S)
+        end,
+        Status
+    ),
+    AllMsgIndex = lists:flatten(lists:duplicate(9, Workers)),
+    Result =
+        lists:foldl(
+            fun(Index, Acc) ->
+                ?assertEqual(true, lists:keymember(Index, 1, Acc), {Index, Acc}),
+                lists:keydelete(Index, 1, Acc)
+            end,
+            List,
+            AllMsgIndex
+        ),
+    ?assertEqual([], Result),
+    receive
+        Unknown -> throw({receive_unknown_msg, Unknown})
+    after 1000 -> ok
+    end,
     ok.
 
+receive_seq_msg(Acc) ->
+    receive
+        {msg, Seq, Time, Pid} ->
+            receive_seq_msg([{Seq, Time, Pid} | Acc])
+    after 3000 ->
+        Acc
+    end.
+
 t_catch_up_status_handle_next_commit(_Config) ->
     emqx_cluster_rpc:reset(),
     {atomic, []} = emqx_cluster_rpc:status(),
@@ -296,9 +382,8 @@ stop() ->
                     erlang:exit(P, kill)
             end
         end
-     || N <- [?NODE1, ?NODE2, ?NODE3]
-    ],
-    gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000).
+     || N <- [?NODE1, ?NODE2, ?NODE3, emqx_cluster_rpc_cleaner]
+    ].
 
 receive_msg(0, _Msg) ->
     ok;
@@ -306,7 +391,7 @@ receive_msg(Count, Msg) when Count > 0 ->
     receive
         Msg ->
             receive_msg(Count - 1, Msg)
-    after 800 ->
+    after 1000 ->
         timeout
     end.
 
@@ -314,6 +399,11 @@ echo(Pid, Msg) ->
     erlang:send(Pid, Msg),
     ok.
 
+echo_delay(Pid, Msg) ->
+    timer:sleep(rand:uniform(150)),
+    erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}),
+    ok.
+
 failed_on_node(Pid) ->
     case Pid =:= self() of
         true -> ok;

+ 1 - 0
changes/ce/fix-12121.en.md

@@ -0,0 +1 @@
+Fixed an issue where updating configurations concurrently on different nodes could occasionally return a stale view.
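
From the caller's perspective, a failed initiate now surfaces as {init_failure, #{msg := stale_view_of_cluster_state, ...}} after the retries are exhausted, instead of aborting on the first stale read. A hedged sketch of handling that shape, using the same make_initiate_call_req/3 plus gen_server:call path the test suite exercises (the wrapper function itself is an assumption, not EMQX API documentation):

%% Hypothetical caller-side wrapper; the return shapes are taken from the
%% server code and tests above, the function name is illustrative only.
initiate_on(Server, M, F, A) ->
    Req = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
    case gen_server:call(Server, Req, infinity) of
        {ok, TnxId, Result} ->
            {ok, TnxId, Result};
        {init_failure, #{msg := stale_view_of_cluster_state} = Reason} ->
            %% The node kept falling behind the cluster even after catching up
            %% repeatedly; give up rather than commit based on a stale view.
            {error, Reason};
        {init_failure, Reason} ->
            {error, Reason}
    end.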