Explorar el Código

Merge pull request #9984 from olcai/add-regression-test-for-resource-buffer-worker-flush

Add regression test for resource buffer worker flush
Erik Timan hace 3 años
padre
commit
be6ef9379d

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
     {description, "Manager for all external resources"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {mod, {emqx_resource_app, []}},
     {applications, [
     {applications, [

+ 18 - 4
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -468,7 +468,10 @@ flush(Data0) ->
         queue := Q0
         queue := Q0
     } = Data0,
     } = Data0,
     Data1 = cancel_flush_timer(Data0),
     Data1 = cancel_flush_timer(Data0),
-    case {queue_count(Q0), is_inflight_full(InflightTID)} of
+    CurrentCount = queue_count(Q0),
+    IsFull = is_inflight_full(InflightTID),
+    ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
+    case {CurrentCount, IsFull} of
         {0, _} ->
         {0, _} ->
             {keep_state, Data1};
             {keep_state, Data1};
         {_, true} ->
         {_, true} ->
@@ -595,8 +598,12 @@ do_flush(
                     result => Result
                     result => Result
                 }
                 }
             ),
             ),
-            case queue_count(Q1) > 0 of
+            CurrentCount = queue_count(Q1),
+            case CurrentCount > 0 of
                 true ->
                 true ->
+                    ?tp(buffer_worker_flush_ack_reflush, #{
+                        batch_or_query => Request, result => Result, queue_count => CurrentCount
+                    }),
                     flush_worker(self());
                     flush_worker(self());
                 false ->
                 false ->
                     ok
                     ok
@@ -666,19 +673,26 @@ do_flush(Data0, #{
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            CurrentCount = queue_count(Q1),
             ?tp(
             ?tp(
                 buffer_worker_flush_ack,
                 buffer_worker_flush_ack,
                 #{
                 #{
                     batch_or_query => Batch,
                     batch_or_query => Batch,
-                    result => Result
+                    result => Result,
+                    queue_count => CurrentCount
                 }
                 }
             ),
             ),
-            CurrentCount = queue_count(Q1),
             Data2 =
             Data2 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
                     {false, _} ->
                         Data1;
                         Data1;
                     {true, true} ->
                     {true, true} ->
+                        ?tp(buffer_worker_flush_ack_reflush, #{
+                            batch_or_query => Batch,
+                            result => Result,
+                            queue_count => CurrentCount,
+                            batch_size => BatchSize
+                        }),
                         flush_worker(self()),
                         flush_worker(self()),
                         Data1;
                         Data1;
                     {true, false} ->
                     {true, false} ->

+ 61 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -2291,6 +2291,67 @@ t_expiration_retry_batch_multiple_times(_Config) ->
     ),
     ),
     ok.
     ok.
 
 
+t_recursive_flush(_Config) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 1,
+            batch_time => 10_000,
+            worker_pool_size => 1
+        }
+    ),
+    do_t_recursive_flush().
+
+t_recursive_flush_batch(_Config) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 2,
+            batch_time => 10_000,
+            worker_pool_size => 1
+        }
+    ),
+    do_t_recursive_flush().
+
+do_t_recursive_flush() ->
+    ?check_trace(
+        begin
+            Timeout = 1_000,
+            Pid = spawn_link(fun S() ->
+                emqx_resource:query(?ID, {inc_counter, 1}),
+                S()
+            end),
+            %% we want two reflushes to happen before we analyze the
+            %% trace, so that we get a single full interaction
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(2, #{?snk_kind := buffer_worker_flush_ack_reflush}), Timeout
+            ),
+            unlink(Pid),
+            exit(Pid, kill),
+            ok
+        end,
+        fun(Trace) ->
+            %% check that a recursive flush leads to a new call to flush/1
+            Pairs = ?find_pairs(
+                #{?snk_kind := buffer_worker_flush_ack_reflush},
+                #{?snk_kind := buffer_worker_flush},
+                Trace
+            ),
+            ?assert(lists:any(fun(E) -> E end, [true || {pair, _, _} <- Pairs]))
+        end
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Helpers
 %% Helpers
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------