Просмотр исходного кода

test: attempt to fix flaky test

https://github.com/emqx/emqx/actions/runs/9662725303/job/26653594859?pr=13328#step:6:186

```
%%% emqx_resource_SUITE ==> t_expiration_retry: FAILED
%%% emqx_resource_SUITE ==> {{panic,
     #{msg => "Unexpected result",
       result =>
           {run_stage_failed,error,
               {badmatch,{ok,timeout}},
               [{emqx_resource_SUITE,'-do_t_expiration_retry/0-fun-12-',0,
                    [{file,
                         "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"},
                     {line,2569}]},
                {emqx_resource_SUITE,do_t_expiration_retry,0,
                    [{file,
                         "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"},
                     {line,2518}]}]}}},
 [{emqx_resource_SUITE,do_t_expiration_retry,0,
      [{file,"/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"},
       {line,2594}]},
  {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]},
  {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1302}]},
  {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1234}]}]}
```
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
954adc71c4
1 измененных файлов с 35 добавлено и 22 удалено
  1. 35 22
      apps/emqx_resource/test/emqx_resource_SUITE.erl

+ 35 - 22
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -29,6 +29,9 @@
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
 -define(TELEMETRY_PREFIX, emqx, resource).
+-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
+    {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
+).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) ->
             resume_interval => 300
         }
     ),
-    do_t_expiration_retry().
+    do_t_expiration_retry(#{is_batch => false}).
 
 t_expiration_retry_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
@@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) ->
             resume_interval => 300
         }
     ),
-    do_t_expiration_retry().
+    do_t_expiration_retry(#{is_batch => true}).
 
-do_t_expiration_retry() ->
+do_t_expiration_retry(Context) ->
+    IsBatch = maps:get(is_batch, Context),
     ResumeInterval = 300,
     ?check_trace(
+        #{timetrap => 10_000},
         begin
             ok = emqx_resource:simple_sync_query(?ID, block),
 
-            {ok, SRef0} = snabbkaffe:subscribe(
-                ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
-                1,
-                200
-            ),
-            TimeoutMS = 100,
+            TimeoutMS = 200,
             %% the request that expires must be first, so it's the
             %% head of the inflight table (and retriable).
             {ok, SRef1} = snabbkaffe:subscribe(
@@ -2542,6 +2542,8 @@ do_t_expiration_retry() ->
                     )
                 )
             end),
+            %% This second message must be enqueued while the resource is blocked by the
+            %% previous message.
             Pid1 =
                 spawn_link(fun() ->
                     receive
@@ -2556,22 +2558,33 @@ do_t_expiration_retry() ->
                         )
                     )
                 end),
+            ?tp("waiting for first message to be appended to the queue", #{}),
             {ok, _} = snabbkaffe:receive_events(SRef1),
-            Pid1 ! go,
-            {ok, _} = snabbkaffe:receive_events(SRef0),
 
-            {ok, _} =
-                ?block_until(
-                    #{?snk_kind := buffer_worker_retry_expired},
-                    ResumeInterval * 10
-                ),
+            ?tp("waiting for first message to expire during blocked retries", #{}),
+            {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}),
 
-            {ok, {ok, _}} =
-                ?wait_async_action(
-                    emqx_resource:simple_sync_query(?ID, resume),
-                    #{?snk_kind := buffer_worker_retry_inflight_succeeded},
-                    ResumeInterval * 5
-                ),
+            %% Now we wait until the worker tries the second message at least once before
+            %% unblocking it.
+            Pid1 ! go,
+            ?tp("waiting for second message to be retried and be nacked while blocked", #{}),
+            case IsBatch of
+                false ->
+                    {ok, _} = ?block_until(#{
+                        ?snk_kind := buffer_worker_flush_nack,
+                        batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _)
+                    });
+                true ->
+                    {ok, _} = ?block_until(#{
+                        ?snk_kind := buffer_worker_flush_nack,
+                        batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _]
+                    })
+            end,
+
+            %% Bypass the buffer worker and unblock the resource.
+            ok = emqx_resource:simple_sync_query(?ID, resume),
+            ?tp("waiting for second message to be retried and be acked, unblocking", #{}),
+            {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}),
 
             ok
         end,