Просмотр исходного кода

test(emqx_broker): fix flaky tests

Zaiming Shi 4 лет назад
Родитель
Сommit
3fa442f4a4
1 измененных файлов с 112 добавлено и 41 удалено
  1. 112 41
      test/emqx_broker_SUITE.erl

+ 112 - 41
test/emqx_broker_SUITE.erl

@@ -37,23 +37,35 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+init_per_testcase(Case, Config) ->
+    ?MODULE:Case({init, Config}).
+
+end_per_testcase(Case, Config) ->
+    ?MODULE:Case({'end', Config}).
+
 %%--------------------------------------------------------------------
 %% PubSub Test
 %%--------------------------------------------------------------------
 
-t_subscribed(_) ->
+t_subscribed({init, Config}) ->
     emqx_broker:subscribe(<<"topic">>),
+    Config;
+t_subscribed(Config) when is_list(Config) ->
     ?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
-    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
+    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
+t_subscribed({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"topic">>).
 
-t_subscribed_2(_) ->
+t_subscribed_2({init, Config}) ->
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
-    %?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)),
-    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
+    Config;
+t_subscribed_2(Config) when is_list(Config) ->
+    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
+t_subscribed_2({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"topic">>).
 
-t_subopts(_) ->
+t_subopts({init, Config}) -> Config;
+t_subopts(Config) when is_list(Config) ->
     ?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
     ?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
     ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
@@ -70,42 +82,54 @@ t_subopts(_) ->
 
     ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})),
     ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>},
-                 emqx_broker:get_subopts(self(), <<"topic">>)),
+                 emqx_broker:get_subopts(self(), <<"topic">>));
+t_subopts({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"topic">>).
 
-t_topics(_) ->
+t_topics({init, Config}) ->
     Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
-    ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>),
-    ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>),
-    ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>),
+    [{topics, Topics} | Config];
+t_topics(Config) when is_list(Config) ->
+    Topics = [T1, T2, T3] = proplists:get_value(topics, Config),
+    ok = emqx_broker:subscribe(T1, <<"clientId">>),
+    ok = emqx_broker:subscribe(T2, <<"clientId">>),
+    ok = emqx_broker:subscribe(T3, <<"clientId">>),
     Topics1 = emqx_broker:topics(),
     ?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
                                        case lists:member(Topic, Topics1) of
                                            true -> Acc;
                                            false -> false
                                        end
-                                   end, true, Topics)),
-    emqx_broker:unsubscribe(lists:nth(1, Topics)),
-    emqx_broker:unsubscribe(lists:nth(2, Topics)),
-    emqx_broker:unsubscribe(lists:nth(3, Topics)).
+                                   end, true, Topics));
+t_topics({'end', Config}) ->
+    Topics = proplists:get_value(topics, Config),
+    lists:foreach(fun(T) -> emqx_broker:unsubscribe(T) end, Topics).
 
-t_subscribers(_) ->
+t_subscribers({init, Config}) ->
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
-    ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)),
+    Config;
+t_subscribers(Config) when is_list(Config) ->
+    ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>));
+t_subscribers({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"topic">>).
 
-t_subscriptions(_) ->
+t_subscriptions({init, Config}) ->
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
-    ok = timer:sleep(100),
+    Config;
+t_subscriptions(Config) when is_list(Config) ->
+    ct:sleep(100),
     ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
                  proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
     ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
-                 proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))),
+                 proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>)));
+t_subscriptions({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"topic">>).
 
-t_sub_pub(_) ->
+t_sub_pub({init, Config}) ->
     ok = emqx_broker:subscribe(<<"topic">>),
-    ct:sleep(10),
+    Config;
+t_sub_pub(Config) when is_list(Config) ->
+    ct:sleep(100),
     emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
     ?assert(
         receive
@@ -115,16 +139,22 @@ t_sub_pub(_) ->
                 false
         after 100 ->
             false
-        end).
+        end);
+t_sub_pub({'end', _Config}) ->
+    ok = emqx_broker:unsubscribe(<<"topic">>).
 
-t_nosub_pub(_) ->
+t_nosub_pub({init, Config}) -> Config;
+t_nosub_pub({'end', _Config}) -> ok;
+t_nosub_pub(Config) when is_list(Config) ->
     ?assertEqual(0, emqx_metrics:val('messages.dropped')),
     emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
     ?assertEqual(1, emqx_metrics:val('messages.dropped')).
 
-t_shared_subscribe(_) ->
+t_shared_subscribe({init, Config}) ->
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
-    ct:sleep(10),
+    ct:sleep(100),
+    Config;
+t_shared_subscribe(Config) when is_list(Config) ->
     emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
     ?assert(receive
                 {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
@@ -134,9 +164,12 @@ t_shared_subscribe(_) ->
                     false
             after 100 ->
                 false
-            end),
+            end);
+t_shared_subscribe({'end', _Config}) ->
     emqx_broker:unsubscribe(<<"$share/group/topic">>).
 
+t_shared_subscribe_2({init, Config}) -> Config;
+t_shared_subscribe_2({'end', _Config}) -> ok;
 t_shared_subscribe_2(_) ->
     {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
     {ok, _} = emqtt:connect(ConnPid),
@@ -158,6 +191,8 @@ t_shared_subscribe_2(_) ->
     emqtt:disconnect(ConnPid),
     emqtt:disconnect(ConnPid2).
 
+t_shared_subscribe_3({init, Config}) -> Config;
+t_shared_subscribe_3({'end', _Config}) -> ok;
 t_shared_subscribe_3(_) ->
     {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
     {ok, _} = emqtt:connect(ConnPid),
@@ -174,11 +209,13 @@ t_shared_subscribe_3(_) ->
     emqtt:disconnect(ConnPid),
     emqtt:disconnect(ConnPid2).
 
-t_shard(_) ->
+t_shard({init, Config}) ->
     ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
     ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
     emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
-    ct:sleep(10),
+    Config;
+t_shard(Config) when is_list(Config) ->
+    ct:sleep(100),
     emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
     ?assert(
         receive
@@ -188,23 +225,57 @@ t_shard(_) ->
                 false
         after 100 ->
             false
-        end),
+        end);
+t_shard({'end', _Config}) ->
+    emqx_broker:unsubscribe(<<"topic">>),
     ok = meck:unload(emqx_broker_helper).
 
-t_stats_fun(_) ->
-    N = emqx_stats:getstat('subscribers.count'),
-    N = emqx_stats:getstat('subscriptions.count'),
-    N = emqx_stats:getstat('suboptions.count'),
+t_stats_fun({init, Config}) ->
+    Parent = self(),
+    F = fun Loop() ->
+                N1 = emqx_stats:getstat('subscribers.count'),
+                N2 = emqx_stats:getstat('subscriptions.count'),
+                N3 = emqx_stats:getstat('suboptions.count'),
+                case N1 + N2 + N3 =:= 0 of
+                    true ->
+                        Parent ! {ready, self()},
+                        exit(normal);
+                    false ->
+                        receive
+                            stop ->
+                                exit(normal)
+                        after
+                            100 ->
+                                Loop()
+                        end
+                end
+        end,
+    Pid = spawn_link(F),
+    receive
+        {ready, P} when P =:= Pid->
+            Config
+    after
+        5000 ->
+            Pid ! stop,
+            ct:fail("timedout_waiting_for_sub_stats_to_reach_zero")
+    end;
+t_stats_fun(Config) when is_list(Config) ->
     ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
     ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
+    %% ensure stats refreshed
     emqx_broker:stats_fun(),
-    ct:sleep(10),
-    ?assertEqual(N + 2, emqx_stats:getstat('subscribers.count')),
-    ?assertEqual(N + 2, emqx_stats:getstat('subscribers.max')),
-    ?assertEqual(N + 2, emqx_stats:getstat('subscriptions.count')),
-    ?assertEqual(N + 2, emqx_stats:getstat('subscriptions.max')),
-    ?assertEqual(N + 2, emqx_stats:getstat('suboptions.count')),
-    ?assertEqual(N + 2, emqx_stats:getstat('suboptions.max')).
+    %% emqx_stats:set_stat is a gen_server cast
+    %% make a synced call sync
+    ignored = gen_server:call(emqx_stats, call, infinity),
+    ?assertEqual(2, emqx_stats:getstat('subscribers.count')),
+    ?assertEqual(2, emqx_stats:getstat('subscribers.max')),
+    ?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
+    ?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
+    ?assertEqual(2, emqx_stats:getstat('suboptions.count')),
+    ?assertEqual(2, emqx_stats:getstat('suboptions.max'));
+t_stats_fun({'end', _Config}) ->
+    ok = emqx_broker:unsubscribe(<<"topic">>),
+    ok = emqx_broker:unsubscribe(<<"topic2">>).
 
 recv_msgs(Count) ->
     recv_msgs(Count, []).