Explorar o código

test(emqx_resource): add regression test for recursive flushing

Erik Timan %!s(int64=3) %!d(string=hai) anos
pai
achega
2442a4dea7
Modificáronse 1 ficheiros con 61 adicións e 0 borrados
  1. 61 0
      apps/emqx_resource/test/emqx_resource_SUITE.erl

+ 61 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2291,6 +2291,67 @@ t_expiration_retry_batch_multiple_times(_Config) ->
     ),
     ok.
 
+t_recursive_flush(_Config) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 1,
+            batch_time => 10_000,
+            worker_pool_size => 1
+        }
+    ),
+    do_t_recursive_flush().
+
+t_recursive_flush_batch(_Config) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 2,
+            batch_time => 10_000,
+            worker_pool_size => 1
+        }
+    ),
+    do_t_recursive_flush().
+
+do_t_recursive_flush() ->
+    ?check_trace(
+        begin
+            Timeout = 1_000,
+            Pid = spawn_link(fun S() ->
+                emqx_resource:query(?ID, {inc_counter, 1}),
+                S()
+            end),
+            %% we want two reflushes to happen before we analyze the
+            %% trace, so that we get a single full interaction
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout
+            ),
+            unlink(Pid),
+            exit(Pid, kill),
+            ok
+        end,
+        fun(Trace) ->
+            %% check that a recursive flush leads to a new call to flush/1
+            Pairs = ?find_pairs(
+                #{?snk_kind := buffer_worker_flush_ack_reflush},
+                #{?snk_kind := buffer_worker_flush},
+                Trace
+            ),
+            ?assert(lists:any(fun(E) -> E end, [true || {pair, _, _} <- Pairs]))
+        end
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------