Просмотр исходного кода

refactor(emqx_resource): add more trace points for flushing

Erik Timan 3 лет назад
Родитель
Сommit
dcf70e0e68
1 измененных файлов с 18 добавлено и 4 удалено
  1. 18 4
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 18 - 4
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -468,7 +468,10 @@ flush(Data0) ->
         queue := Q0
     } = Data0,
     Data1 = cancel_flush_timer(Data0),
-    case {queue_count(Q0), is_inflight_full(InflightTID)} of
+    CurrentCount = queue_count(Q0),
+    IsFull = is_inflight_full(InflightTID),
+    ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
+    case {CurrentCount, IsFull} of
         {0, _} ->
             {keep_state, Data1};
         {_, true} ->
@@ -595,8 +598,12 @@ do_flush(
                     result => Result
                 }
             ),
-            case queue_count(Q1) > 0 of
+            CurrentCount = queue_count(Q1),
+            case CurrentCount > 0 of
                 true ->
+                    ?tp(buffer_worker_flush_ack_reflush, #{
+                        batch_or_query => Request, result => Result, queue_count => CurrentCount
+                    }),
                     flush_worker(self());
                 false ->
                     ok
@@ -666,19 +673,26 @@ do_flush(Data0, #{
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            CurrentCount = queue_count(Q1),
             ?tp(
                 buffer_worker_flush_ack,
                 #{
                     batch_or_query => Batch,
-                    result => Result
+                    result => Result,
+                    queue_count => CurrentCount
                 }
             ),
-            CurrentCount = queue_count(Q1),
             Data2 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
                         Data1;
                     {true, true} ->
+                        ?tp(buffer_worker_flush_ack_reflush, #{
+                            batch_or_query => Batch,
+                            result => Result,
+                            queue_count => CurrentCount,
+                            batch_size => BatchSize
+                        }),
                         flush_worker(self()),
                         Data1;
                     {true, false} ->