Forráskód Böngészése

Merge pull request #13616 from zmstone/0814-flaky-test

0814 flaky test
zmstone 1 éve
szülő
commit
c8a4dc7e95

+ 40 - 13
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -71,6 +71,7 @@ init_per_group(persistence_enabled = Group, Config) ->
            {emqx,
                "durable_sessions = {\n"
                "  enable = true\n"
+                "  force_persistence = true\n"
                "  heartbeat_interval = 100ms\n"
                "  renew_streams_interval = 100ms\n"
                "  session_gc_interval = 2s\n"
@@ -113,7 +114,15 @@ end_per_group(_Group, _Config) ->
 
 t_takeover(Config) ->
     process_flag(trap_exit, true),
-    ClientId = atom_to_binary(?FUNCTION_NAME),
+    Vsn = atom_to_list(?config(mqtt_vsn, Config)),
+    Persist =
+        case ?config(persistence_enabled, Config) of
+            true ->
+                "persistent-";
+            false ->
+                "not-persistent-"
+        end,
+    ClientId = iolist_to_binary("t_takeover-" ++ Persist ++ Vsn),
     ClientOpts = [
         {proto_ver, ?config(mqtt_vsn, Config)},
         {clean_start, false}
@@ -176,7 +185,7 @@ t_takeover(Config) ->
 t_takeover_willmsg(Config) ->
     process_flag(trap_exit, true),
     ClientId = atom_to_binary(?FUNCTION_NAME),
-    WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
+    WillTopic = <<ClientId/binary, <<"_willtopic">>/binary>>,
     Middle = ?CNT div 2,
     Client1Msgs = messages(ClientId, 0, Middle),
     Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
@@ -890,16 +899,11 @@ t_kick_session(Config) ->
             {fun start_client/5, [
                 <<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
             ]},
-            %% kick may fail (not found) without this delay
-            {
-                fun(CTX) ->
-                    timer:sleep(300),
-                    CTX
-                end,
-                []
-            },
+            {fun wait_for_chan_reg/2, [ClientId]},
             %% WHEN: client is kicked with kick_session
-            {fun kick_client/2, [ClientId]}
+            {fun kick_client/2, [ClientId]},
+            {fun wait_for_chan_dereg/2, [ClientId]},
+            {fun wait_for_pub_client_down/1, []}
         ]),
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
@@ -911,7 +915,7 @@ t_kick_session(Config) ->
     ),
     #{client := [CPidSub, CPid1]} = FCtx,
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked),
-    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(timer:seconds(1))],
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     %% THEN: payload <<"willpayload_kick">> should be published
     {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>),
@@ -920,6 +924,30 @@ t_kick_session(Config) ->
     ?assert(not is_process_alive(CPid1)),
     ok.
 
+wait_for_chan_reg(CTX, ClientId) ->
+    ?retry(
+        3_000,
+        100,
+        true = is_map(emqx_cm:get_chan_info(ClientId))
+    ),
+    CTX.
+
+wait_for_chan_dereg(CTX, ClientId) ->
+    ?retry(
+        3_000,
+        100,
+        undefined = emqx_cm:get_chan_info(ClientId)
+    ),
+    CTX.
+
+wait_for_pub_client_down(#{client := [_SubClient, PubClient]} = CTX) ->
+    ?retry(
+        3_000,
+        100,
+        false = is_process_alive(PubClient)
+    ),
+    CTX.
+
 %% t_takover_in_cluster(_) ->
 %%     todo.
 
@@ -929,7 +957,6 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) ->
     {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]),
     _ = erlang:spawn_link(fun() ->
         {ok, _} = emqtt:connect(CPid),
-        ct:pal("CLIENT: connected ~p", [CPid]),
         {ok, _, [Qos]} = emqtt:subscribe(CPid, Topic, Qos)
     end),
     Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.

+ 38 - 26
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -56,6 +56,7 @@ init_per_suite(Config) ->
     ),
     meck:new(mria, [non_strict, passthrough, no_link]),
     meck:expect(mria, running_nodes, 0, [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}]),
+    ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
     [{suite_apps, Apps} | Config].
 
 end_per_suite(Config) ->
@@ -227,29 +228,39 @@ t_catch_up_status_handle_next_commit(_Config) ->
 t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
     ets:new(test, [named_table, public]),
-    ets:insert(test, {other_mfa_result, failed}),
-    ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
-    {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
-    {ok, 1, ok} = multicall(M, F, A, 1, 1000),
-    ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
-    ct:pal("333:~p~n", [emqx_cluster_rpc:status()]),
-    {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
-    ?assertEqual([], L),
-    ets:insert(test, {other_mfa_result, ok}),
-    {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
-    ct:sleep(1000),
-    {atomic, NewStatus} = emqx_cluster_rpc:status(),
-    ?assertEqual(3, length(NewStatus)),
-    Pid = self(),
-    Msg = ?FUNCTION_NAME,
-    MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
-    {ok, TnxId, ok} = multicall(M1, F1, A1),
-    {atomic, Query} = emqx_cluster_rpc:query(TnxId),
-    ?assertEqual(MFAEcho, maps:get(mfa, Query)),
-    ?assertEqual(node(), maps:get(initiator, Query)),
-    ?assert(maps:is_key(created_at, Query)),
-    ?assertEqual(ok, receive_msg(3, Msg)),
-    ok.
+    try
+        %% step1: expect initial commits to be zero for all nodes
+        Commits1 = ets:tab2list(cluster_rpc_commit),
+        ct:pal("step1(expect all tnx_id to be zero):~n~p~n", [Commits1]),
+        ct:pal("step1_inspect_status:~n~p~n", [emqx_cluster_rpc:status()]),
+        ?assertEqual([0, 0, 0], lists:map(fun({_RecordName, _Node, ID}) -> ID end, Commits1)),
+        %% step2: insert stub a failure, and cause one node to fail
+        ets:insert(test, {other_mfa_result, failed}),
+        {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]},
+        {ok, 1, ok} = multicall(M, F, A, 1, 1000),
+        Commits2 = ets:tab2list(cluster_rpc_commit),
+        ct:pal("step2(expect node1 to have tnx_id=1):~n~p~n", [Commits2]),
+        ct:pal("step2_inspect_status:~n~p~n", [emqx_cluster_rpc:status()]),
+        {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
+        ?assertEqual([], L),
+        ets:insert(test, {other_mfa_result, ok}),
+        {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
+        ct:sleep(1000),
+        {atomic, NewStatus} = emqx_cluster_rpc:status(),
+        ?assertEqual(3, length(NewStatus)),
+        Pid = self(),
+        Msg = ?FUNCTION_NAME,
+        MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
+        {ok, TnxId, ok} = multicall(M1, F1, A1),
+        {atomic, Query} = emqx_cluster_rpc:query(TnxId),
+        ?assertEqual(MFAEcho, maps:get(mfa, Query)),
+        ?assertEqual(node(), maps:get(initiator, Query)),
+        ?assert(maps:is_key(created_at, Query)),
+        ?assertEqual(ok, receive_msg(3, Msg)),
+        ok
+    after
+        ets:delete(test)
+    end.
 
 t_del_stale_mfa(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
@@ -362,8 +373,6 @@ tnx_ids(Status) ->
 start() ->
     {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
     {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
-    ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
-    ok = emqx_cluster_rpc:reset(),
     %% Ensure all processes are idle status.
     ok = gen_server:call(?NODE2, test),
     ok = gen_server:call(?NODE3, test),
@@ -382,7 +391,10 @@ stop() ->
             end
         end
      || N <- [?NODE2, ?NODE3]
-    ].
+    ],
+    %% erase all commit history, set commit tnx_id back to 0 for all nodes
+    ok = emqx_cluster_rpc:reset(),
+    ok.
 
 receive_msg(0, _Msg) ->
     ok;

+ 1 - 1
apps/emqx_ds_backends/src/emqx_ds_backends.app.src.script

@@ -20,7 +20,7 @@ Backends = case Profile of
                [emqx_ds_builtin_local, emqx_ds_builtin_raft, emqx_fdb_ds]
            end,
 
-io:format(user, "DS backends available for this release (~p): ~p~n", [Profile, Backends]),
+io:format(user, "DS backends available for this release (~p): ~0p~n", [Profile, Backends]),
 
 {application, emqx_ds_backends, [
     {description, "A placeholder application that depends on all available DS backends"},