Просмотр исходного кода

Merge pull request #6120 from zmstone/test-fix-flaky-test-t_rest_clienit_inf

test: fix flaky test emqx_stomp_SUITE:t_rest_clienit_inf
JianBo He 4 лет назад
Родитель
Сommit
3c1ae385ee
2 измененных файлов с 22 добавлено и 5 удалено
  1. 13 1
      apps/emqx/src/emqx_pool.erl
  2. 9 4
      apps/emqx_gateway/test/emqx_stomp_SUITE.erl

+ 13 - 1
apps/emqx/src/emqx_pool.erl

@@ -32,7 +32,7 @@
         ]).
 
 -ifdef(TEST).
--export([worker/0]).
+-export([worker/0, flush_async_tasks/0]).
 -endif.
 
 %% gen_server callbacks
@@ -139,3 +139,15 @@ run({F, A}) when is_function(F), is_list(A) ->
 run(Fun) when is_function(Fun) ->
     Fun().
 
+-ifdef(TEST).
+%% This help function creates a large enough number of async tasks
+%% to force flush the pool workers.
+%% The number of tasks should be large enough to ensure all workers have
+%% the chance to work on at least one of the tasks.
+flush_async_tasks() ->
+    Ref = make_ref(),
+    Self = self(),
+    L = lists:seq(1, 997),
+    lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L),
+    lists:foreach(fun(I) -> receive {done, Ref, I} -> ok end end, L).
+-endif.

+ 9 - 4
apps/emqx_gateway/test/emqx_stomp_SUITE.erl

@@ -97,7 +97,8 @@ t_connect(_) ->
     %                    {ok, Data} = gen_tcp:recv(Sock, 0),
     %                    {ok, #stomp_frame{command = <<"ERROR">>,
     %                                      headers = _,
-    %                                      body    = <<"Login or passcode error!">>}, _, _} = parse(Data)
+    %                                      body    = <<"Login or passcode error!">>}, _, _} =
+    %                                                   parse(Data)
     %                end),
 
     %% Connect will be failed, because of bad version
@@ -109,9 +110,12 @@ t_connect(_) ->
                                       {<<"passcode">>, <<"guest">>},
                                       {<<"heart-beat">>, <<"1000,2000">>}])),
         {ok, Data} = gen_tcp:recv(Sock, 0),
-        {ok, #stomp_frame{command = <<"ERROR">>,
-                          headers = _,
-                          body    = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data)
+        {ok,
+         #stomp_frame{command = <<"ERROR">>,
+                      headers = _,
+                      body    = <<"Login Failed: Supported protocol versions < 1.2">>},
+         _,
+         _ } = parse(Data)
     end).
 
 t_heartbeat(_) ->
@@ -403,6 +407,7 @@ t_rest_clienit_info(_) ->
 
         %% kickout
         {204, _} = request(delete, ClientPath),
+        ok = emqx_pool:flush_async_tasks(),
         {200, Clients2} = request(get, "/gateway/stomp/clients"),
         ?assertEqual(0, length(maps:get(data, Clients2)))
     end).