Просмотр исходного кода

test(cluster_rpc): reset commit when processes are down

zmstone 1 год назад
Родитель
Commit
76c74f30f9
1 изменённый файл: 38 добавлено и 26 удалено
  1. 38 26
      apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

+ 38 - 26
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -56,6 +56,7 @@ init_per_suite(Config) ->
     ),
     meck:new(mria, [non_strict, passthrough, no_link]),
     meck:expect(mria, running_nodes, 0, [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}]),
+    ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
     [{suite_apps, Apps} | Config].
 
 end_per_suite(Config) ->
@@ -227,29 +228,39 @@ t_catch_up_status_handle_next_commit(_Config) ->
 t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
     ets:new(test, [named_table, public]),
-    ets:insert(test, {other_mfa_result, failed}),
-    ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
-    {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
-    {ok, 1, ok} = multicall(M, F, A, 1, 1000),
-    ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
-    ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
-    {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
-    ?assertEqual([], L),
-    ets:insert(test, {other_mfa_result, ok}),
-    {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
-    ct:sleep(1000),
-    {atomic, NewStatus} = emqx_cluster_rpc:status(),
-    ?assertEqual(3, length(NewStatus)),
-    Pid = self(),
-    Msg = ?FUNCTION_NAME,
-    MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
-    {ok, TnxId, ok} = multicall(M1, F1, A1),
-    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
-    ?assertEqual(MFAEcho, maps:get(mfa, Query)),
-    ?assertEqual(node(), maps:get(initiator, Query)),
-    ?assert(maps:is_key(created_at, Query)),
-    ?assertEqual(ok, receive_msg(3, Msg)),
-    ok.
+    try
+        %% step1: expect initial commits to be zero for all nodes
+        Commits1 = ets:tab2list(cluster_rpc_commit),
+        ct:pal("step1(expect all tnx_id to be zero):~n~p~n", [Commits1]),
+        ct:pal("step1_inspect_status:~n~p~n", [emqx_cluster_rpc:status()]),
+        ?assertEqual([0, 0, 0], lists:map(fun({_RecordName, _Node, ID}) -> ID end, Commits1)),
+        %% step2: insert a stubbed failure result, causing one node to fail
+        ets:insert(test, {other_mfa_result, failed}),
+        {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
+        {ok, 1, ok} = multicall(M, F, A, 1, 1000),
+        Commits2 = ets:tab2list(cluster_rpc_commit),
+        ct:pal("step2(expect node1 to have tnx_id=1):~n~p~n", [Commits2]),
+        ct:pal("step2_inspect_status:~n~p~n", [emqx_cluster_rpc:status()]),
+        {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
+        ?assertEqual([], L),
+        ets:insert(test, {other_mfa_result, ok}),
+        {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
+        ct:sleep(1000),
+        {atomic, NewStatus} = emqx_cluster_rpc:status(),
+        ?assertEqual(3, length(NewStatus)),
+        Pid = self(),
+        Msg = ?FUNCTION_NAME,
+        MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
+        {ok, TnxId, ok} = multicall(M1, F1, A1),
+        {atomic, Query} = emqx_cluster_rpc:query(TnxId),
+        ?assertEqual(MFAEcho, maps:get(mfa, Query)),
+        ?assertEqual(node(), maps:get(initiator, Query)),
+        ?assert(maps:is_key(created_at, Query)),
+        ?assertEqual(ok, receive_msg(3, Msg)),
+        ok
+    after
+        ets:delete(test)
+    end.
 
 t_del_stale_mfa(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
@@ -362,8 +373,6 @@ tnx_ids(Status) ->
 start() ->
     {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
     {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
-    ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
-    ok = emqx_cluster_rpc:reset(),
     %% Ensure all processes are idle status.
     ok = gen_server:call(?NODE2, test),
     ok = gen_server:call(?NODE3, test),
@@ -382,7 +391,10 @@ stop() ->
             end
         end
      || N <- [?NODE2, ?NODE3]
-    ].
+    ],
+    %% erase all commit history, set commit tnx_id back to 0 for all nodes
+    ok = emqx_cluster_rpc:reset(),
+    ok.
 
 receive_msg(0, _Msg) ->
     ok;