
perf(buffer_worker): flush metrics periodically inside buffer worker process

Fixes https://emqx.atlassian.net/browse/EMQX-9905

Since calling `telemetry` is costly in a hot path, we now collect
metrics in the buffer worker's state and flush them periodically,
rather than bumping them immediately as each event happens.
Thales Macedo Garitezi 2 years ago
Parent
Commit
7d798c10e9
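
For illustration only, here is a minimal gen_server sketch of the aggregate-then-flush pattern the commit message describes; the module and function names are hypothetical, not the actual buffer worker code, and the telemetry call stands in for whatever metrics sink is used (the real worker calls `emqx_resource_metrics:*_inc/2`).

```erlang
%% Minimal sketch (hypothetical names): hot-path callers only update a
%% counters map in the process state; a periodic timer flushes the
%% accumulated deltas in one go.
-module(metrics_flush_sketch).
-behaviour(gen_server).

-export([start_link/1, bump/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(FLUSH_INTERVAL, 5_000). %% ms; mirrors ?DEFAULT_METRICS_FLUSH_INTERVAL

start_link(Id) ->
    gen_server:start_link(?MODULE, Id, []).

%% Hot path: just a cast plus a map update, no telemetry call.
bump(Pid, Metric) ->
    gen_server:cast(Pid, {bump, Metric, 1}).

init(Id) ->
    {ok, ensure_flush_timer(#{id => Id, counters => #{}})}.

handle_call(_Req, _From, State) ->
    {reply, ok, State}.

handle_cast({bump, Metric, Delta}, State = #{counters := Counters0}) ->
    Counters = maps:update_with(Metric, fun(V) -> V + Delta end, Delta, Counters0),
    {noreply, State#{counters := Counters}};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Periodic flush: emit all accumulated deltas once, reset, re-arm the timer.
handle_info({flush_metrics, Ref}, State = #{tref := {_TRef, Ref}, id := Id, counters := Counters}) ->
    maps:foreach(
        fun(Metric, Delta) ->
            %% Hypothetical sink; in EMQX this would be emqx_resource_metrics:*_inc/2.
            telemetry:execute([sketch, metrics, Metric], #{counter_inc => Delta}, #{id => Id})
        end,
        Counters
    ),
    {noreply, ensure_flush_timer(State#{counters := #{}})};
handle_info(_Info, State) ->
    %% stale flush_metrics messages (old Ref) are simply ignored
    {noreply, State}.

ensure_flush_timer(State) ->
    Ref = make_ref(),
    TRef = erlang:send_after(?FLUSH_INTERVAL, self(), {flush_metrics, Ref}),
    State#{tref => {TRef, Ref}}.
```
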

+ 15 - 11
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -100,17 +100,21 @@
     ?assertMetrics(Pat, true, BridgeID)
 ).
 -define(assertMetrics(Pat, Guard, BridgeID),
-    ?assertMatch(
-        #{
-            <<"metrics">> := Pat,
-            <<"node_metrics">> := [
-                #{
-                    <<"node">> := _,
-                    <<"metrics">> := Pat
-                }
-            ]
-        } when Guard,
-        request_bridge_metrics(BridgeID)
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        ?assertMatch(
+            #{
+                <<"metrics">> := Pat,
+                <<"node_metrics">> := [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> := Pat
+                    }
+                ]
+            } when Guard,
+            request_bridge_metrics(BridgeID)
+        )
     )
 ).
 

+ 11 - 46
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl

@@ -288,6 +288,7 @@ gcp_pubsub_config(Config) ->
             "  pipelining = ~b\n"
             "  resource_opts = {\n"
             "    request_timeout = 500ms\n"
+            "    metrics_flush_interval = 700ms\n"
             "    worker_pool_size = 1\n"
             "    query_mode = ~s\n"
             "    batch_size = ~b\n"
@@ -529,12 +530,14 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     end.
 
 receive_all_events(EventName, Timeout) ->
-    receive_all_events(EventName, Timeout, []).
+    receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
 
-receive_all_events(EventName, Timeout, Acc) ->
+receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+    lists:reverse(Acc);
+receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
     receive
         {telemetry, #{name := [_, _, EventName]} = Event} ->
-            receive_all_events(EventName, Timeout, [Event | Acc])
+            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
     after Timeout ->
         lists:reverse(Acc)
     end.
@@ -557,8 +560,9 @@ wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when
     ok;
 wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
     receive
-        {telemetry, #{name := [_, _, EventName]}} ->
-            wait_n_events(TelemetryTable, ResourceId, NEvents - 1, Timeout, EventName)
+        {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} ->
+            ct:pal("telemetry event: ~p", [Event]),
+            wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName)
     after Timeout ->
         RecordedEvents = ets:tab2list(TelemetryTable),
         CurrentMetrics = current_metrics(ResourceId),
@@ -575,7 +579,6 @@ t_publish_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     ?check_trace(
         create_bridge(Config),
@@ -604,17 +607,6 @@ t_publish_success(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -635,7 +627,6 @@ t_publish_success_local_topic(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     LocalTopic = <<"local/topic">>,
     {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
     assert_empty_metrics(ResourceId),
@@ -654,17 +645,6 @@ t_publish_success_local_topic(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -696,7 +676,6 @@ t_publish_templated(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
-    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     PayloadTemplate = <<
         "{\"payload\": \"${payload}\","
@@ -742,17 +721,6 @@ t_publish_templated(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -1089,9 +1057,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
         %% message as dropped; and since it never considers the
         %% response expired, this succeeds.
         econnrefused ->
-            wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                timeout => 10_000, n_events => 1
-            }),
             %% even waiting, hard to avoid flakiness... simpler to just sleep
             %% a bit until stabilization.
             ct:sleep(200),
@@ -1111,8 +1076,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 CurrentMetrics
             );
         timeout ->
-            wait_until_gauge_is(inflight, 0, _Timeout = 400),
-            wait_until_gauge_is(queuing, 0, _Timeout = 400),
+            wait_until_gauge_is(inflight, 0, _Timeout = 1_000),
+            wait_until_gauge_is(queuing, 0, _Timeout = 1_000),
             assert_metrics(
                 #{
                     dropped => 0,

+ 4 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -103,6 +103,10 @@
 -define(HEALTHCHECK_INTERVAL, 15000).
 -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
 
+%% milliseconds
+-define(DEFAULT_METRICS_FLUSH_INTERVAL, 5_000).
+-define(DEFAULT_METRICS_FLUSH_INTERVAL_RAW, <<"5s">>).
+
 %% milliseconds
 -define(START_TIMEOUT, 5000).
 -define(START_TIMEOUT_RAW, <<"5s">>).

+ 235 - 148
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -80,16 +80,30 @@
 -type health_check_interval() :: timer:time().
 -type state() :: blocked | running.
 -type inflight_key() :: integer().
+-type counters() :: #{
+    dropped_expired => non_neg_integer(),
+    dropped_queue_full => non_neg_integer(),
+    dropped_resource_not_found => non_neg_integer(),
+    dropped_resource_stopped => non_neg_integer(),
+    success => non_neg_integer(),
+    failed => non_neg_integer(),
+    retried_success => non_neg_integer(),
+    retried_failed => non_neg_integer()
+}.
+-type inflight_table() :: ets:tid() | atom() | reference().
 -type data() :: #{
     id := id(),
     index := index(),
-    inflight_tid := ets:tid(),
+    inflight_tid := inflight_table(),
     async_workers := #{pid() => reference()},
     batch_size := pos_integer(),
     batch_time := timer:time(),
+    counters := counters(),
+    metrics_flush_interval := timer:time(),
     queue := replayq:q(),
     resume_interval := timer:time(),
-    tref := undefined | timer:tref()
+    tref := undefined | {timer:tref() | reference(), reference()},
+    metrics_tref := undefined | {timer:tref() | reference(), reference()}
 }.
 
 callback_mode() -> [state_functions, state_enter].
@@ -171,24 +185,29 @@ init({Id, Index, Opts}) ->
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
     InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
-    InflightTID = inflight_new(InflightWinSize, Id, Index),
+    InflightTID = inflight_new(InflightWinSize),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
     RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
     BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
     BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
     DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval),
     ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
-    Data = #{
+    MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
+    Data0 = #{
         id => Id,
         index => Index,
         inflight_tid => InflightTID,
         async_workers => #{},
         batch_size => BatchSize,
         batch_time => BatchTime,
+        counters => #{},
+        metrics_flush_interval => MetricsFlushInterval,
         queue => Queue,
         resume_interval => ResumeInterval,
-        tref => undefined
+        tref => undefined,
+        metrics_tref => undefined
     },
+    Data = ensure_metrics_flush_timer(Data0),
     ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
     {ok, running, Data}.
 
@@ -208,11 +227,16 @@ running(cast, block, St) ->
     {next_state, blocked, St};
 running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
     handle_query_requests(Request0, Data);
-running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
-    flush(St#{tref := undefined});
-running(info, {flush, _Ref}, _St) ->
+running(info, {flush, Ref}, Data = #{tref := {_TRef, Ref}}) ->
+    flush(Data#{tref := undefined});
+running(info, {flush, _Ref}, _Data) ->
     ?tp(discarded_stale_flush, #{}),
     keep_state_and_data;
+running(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
+    Data = flush_metrics(Data0#{metrics_tref := undefined}),
+    {keep_state, Data};
+running(info, {flush_metrics, _Ref}, _Data) ->
+    keep_state_and_data;
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
@@ -241,6 +265,11 @@ blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
 blocked(info, {flush, _Ref}, _Data) ->
     %% ignore stale timer
     keep_state_and_data;
+blocked(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
+    Data = flush_metrics(Data0#{metrics_tref := undefined}),
+    {keep_state, Data};
+blocked(info, {flush_metrics, _Ref}, _Data) ->
+    keep_state_and_data;
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
@@ -310,11 +339,7 @@ pick_cast(Id, Key, Query) ->
 
 resume_from_blocked(Data) ->
     ?tp(buffer_worker_resume_from_blocked_enter, #{}),
-    #{
-        id := Id,
-        index := Index,
-        inflight_tid := InflightTID
-    } = Data,
+    #{inflight_tid := InflightTID} = Data,
     Now = now_(),
     case inflight_get_first_retriable(InflightTID, Now) of
         none ->
@@ -326,10 +351,15 @@ resume_from_blocked(Data) ->
             end;
         {expired, Ref, Batch} ->
             WorkerPid = self(),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
-            IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
+            IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
+            Counters =
+                case IsAcked of
+                    true -> #{dropped_expired => length(Batch)};
+                    false -> #{}
+                end,
+            NData = aggregate_counters(Data, Counters),
             ?tp(buffer_worker_retry_expired, #{expired => Batch}),
-            resume_from_blocked(Data);
+            resume_from_blocked(NData);
         {single, Ref, Query} ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
@@ -339,11 +369,11 @@ resume_from_blocked(Data) ->
         {batch, Ref, NotExpired, Expired} ->
             NumExpired = length(Expired),
             ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
-            emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
+            NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
             ?tp(buffer_worker_retry_expired, #{expired => Expired}),
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            retry_inflight_sync(Ref, NotExpired, Data)
+            retry_inflight_sync(Ref, NotExpired, NData)
     end.
 
 retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
@@ -356,7 +386,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
     QueryOpts = #{simple_query => false},
     Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
-    ReplyResult =
+    {ShouldAck, PostFn, DeltaCounters} =
         case QueryOrBatch of
             ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
                 Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
@@ -364,9 +394,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
             [?QUERY(_, _, _, _) | _] = Batch ->
                 batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
         end,
-    case ReplyResult of
+    Data1 = aggregate_counters(Data0, DeltaCounters),
+    case ShouldAck of
         %% Send failed because resource is down
-        {nack, PostFn} ->
+        nack ->
             PostFn(),
             ?tp(
                 buffer_worker_retry_inflight_failed,
@@ -375,11 +406,11 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
                     query_or_batch => QueryOrBatch
                 }
             ),
-            {keep_state, Data0, {state_timeout, ResumeT, unblock}};
+            {keep_state, Data1, {state_timeout, ResumeT, unblock}};
         %% Send ok or failed but the resource is working
-        {ack, PostFn} ->
+        ack ->
             WorkerPid = self(),
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
+            IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
             %% we need to defer bumping the counters after
             %% `inflight_drop' to avoid the race condition when an
             %% inflight request might get completed concurrently with
@@ -394,7 +425,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
                     query_or_batch => QueryOrBatch
                 }
             ),
-            resume_from_blocked(Data0)
+            resume_from_blocked(Data1)
     end.
 
 %% Called during the `running' state only.
@@ -426,9 +457,9 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
             end,
             Requests
         ),
-    {Overflown, NewQ} = append_queue(Id, Index, Q, Queries),
+    {Overflown, NewQ, DeltaCounters} = append_queue(Id, Index, Q, Queries),
     ok = reply_overflown(Overflown),
-    Data0#{queue := NewQ}.
+    aggregate_counters(Data0#{queue := NewQ}, DeltaCounters).
 
 reply_overflown([]) ->
     ok;
@@ -463,8 +494,6 @@ maybe_flush(Data0) ->
 -spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
 flush(Data0) ->
     #{
-        id := Id,
-        index := Index,
         batch_size := BatchSize,
         inflight_tid := InflightTID,
         queue := Q0
@@ -497,13 +526,13 @@ flush(Data0) ->
             case sieve_expired_requests(Batch, Now) of
                 {[], _AllExpired} ->
                     ok = replayq:ack(Q1, QAckRef),
-                    emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
-                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+                    NumExpired = length(Batch),
+                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                     ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
-                    flush(Data2);
+                    flush(Data3);
                 {NotExpired, Expired} ->
                     NumExpired = length(Expired),
-                    emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
+                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                     IsBatch = (BatchSize > 1),
                     %% We *must* use the new queue, because we currently can't
                     %% `nack' a `pop'.
@@ -513,7 +542,7 @@ flush(Data0) ->
                         #{expired => Expired, not_expired => NotExpired}
                     ),
                     Ref = make_request_ref(),
-                    do_flush(Data2, #{
+                    do_flush(Data3, #{
                         is_batch => IsBatch,
                         batch => NotExpired,
                         ref => Ref,
@@ -548,7 +577,9 @@ do_flush(
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
     Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
-    case reply_caller(Id, Reply, QueryOpts) of
+    {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
+    Data1 = aggregate_counters(Data0, DeltaCounters),
+    case ShouldAck of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
         nack ->
@@ -562,11 +593,10 @@ do_flush(
             %% request will be retried (i.e., it might not have been
             %% inserted during `call_query' if the resource was down
             %% and/or if it was a sync request).
-            inflight_append(InflightTID, InflightItem, Id, Index),
+            inflight_append(InflightTID, InflightItem),
             mark_inflight_as_retriable(InflightTID, Ref),
-            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
-            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(
                 buffer_worker_flush_nack,
                 #{
@@ -576,7 +606,7 @@ do_flush(
                     result => Result
                 }
             ),
-            {next_state, blocked, Data1};
+            {next_state, blocked, Data2};
         %% Success; just ack.
         ack ->
             ok = replayq:ack(Q1, QAckRef),
@@ -588,15 +618,14 @@ do_flush(
             WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
+                    ack_inflight(InflightTID, Ref, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
+                    ack_inflight(InflightTID, Ref, WorkerPid)
             end,
-            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
-            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(
                 buffer_worker_flush_ack,
                 #{
@@ -617,7 +646,7 @@ do_flush(
                     }),
                     ok
             end,
-            {keep_state, Data1}
+            {keep_state, Data2}
     end;
 do_flush(#{queue := Q1} = Data0, #{
     is_batch := true,
@@ -633,7 +662,9 @@ do_flush(#{queue := Q1} = Data0, #{
     } = Data0,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
-    case batch_reply_caller(Id, Result, Batch, QueryOpts) of
+    {ShouldAck, DeltaCounters} = batch_reply_caller(Id, Result, Batch, QueryOpts),
+    Data1 = aggregate_counters(Data0, DeltaCounters),
+    case ShouldAck of
         %% Failed; remove the request from the queue, as we cannot pop
         %% from it again, but we'll retry it using the inflight table.
         nack ->
@@ -647,11 +678,10 @@ do_flush(#{queue := Q1} = Data0, #{
             %% request will be retried (i.e., it might not have been
             %% inserted during `call_query' if the resource was down
             %% and/or if it was a sync request).
-            inflight_append(InflightTID, InflightItem, Id, Index),
+            inflight_append(InflightTID, InflightItem),
             mark_inflight_as_retriable(InflightTID, Ref),
-            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
-            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             ?tp(
                 buffer_worker_flush_nack,
                 #{
@@ -661,7 +691,7 @@ do_flush(#{queue := Q1} = Data0, #{
                     result => Result
                 }
             ),
-            {next_state, blocked, Data1};
+            {next_state, blocked, Data2};
         %% Success; just ack.
         ack ->
             ok = replayq:ack(Q1, QAckRef),
@@ -673,15 +703,14 @@ do_flush(#{queue := Q1} = Data0, #{
             WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
+                    ack_inflight(InflightTID, Ref, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
+                    ack_inflight(InflightTID, Ref, WorkerPid)
             end,
-            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+            {Data2, WorkerMRef} = ensure_async_worker_monitored(Data1, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
-            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             CurrentCount = queue_count(Q1),
             ?tp(
                 buffer_worker_flush_ack,
@@ -691,13 +720,13 @@ do_flush(#{queue := Q1} = Data0, #{
                     queue_count => CurrentCount
                 }
             ),
-            Data2 =
+            Data3 =
                 case {CurrentCount > 0, CurrentCount >= BatchSize} of
                     {false, _} ->
                         ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
                             inflight => inflight_count(InflightTID)
                         }),
-                        Data1;
+                        Data2;
                     {true, true} ->
                         ?tp(buffer_worker_flush_ack_reflush, #{
                             batch_or_query => Batch,
@@ -706,17 +735,18 @@ do_flush(#{queue := Q1} = Data0, #{
                             batch_size => BatchSize
                         }),
                         flush_worker(self()),
-                        Data1;
+                        Data2;
                     {true, false} ->
-                        ensure_flush_timer(Data1)
+                        ensure_flush_timer(Data2)
                 end,
-            {keep_state, Data2}
+            {keep_state, Data3}
     end.
 
 batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
-    {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
+    {ShouldBlock, PostFn, DeltaCounters} =
+        batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
     PostFn(),
-    ShouldBlock.
+    {ShouldBlock, DeltaCounters}.
 
 batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
     %% the `Mod:on_batch_query/3` returns a single result for a batch,
@@ -727,23 +757,25 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
         end,
         Batch
     ),
-    {ShouldAck, PostFns} =
+    {ShouldAck, PostFns, Counters} =
         lists:foldl(
-            fun(Reply, {_ShouldAck, PostFns}) ->
+            fun(Reply, {_ShouldAck, PostFns, OldCounters}) ->
                 %% _ShouldAck should be the same as ShouldAck starting from the second reply
-                {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
-                {ShouldAck, [PostFn | PostFns]}
+                {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(
+                    Id, Reply, QueryOpts
+                ),
+                {ShouldAck, [PostFn | PostFns], merge_counters(OldCounters, DeltaCounters)}
             end,
-            {ack, []},
+            {ack, [], #{}},
             Replies
         ),
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
-    {ShouldAck, PostFn}.
+    {ShouldAck, PostFn, Counters}.
 
 reply_caller(Id, Reply, QueryOpts) ->
-    {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+    {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
     PostFn(),
-    ShouldAck.
+    {ShouldAck, DeltaCounters}.
 
 %% Should only reply to the caller when the decision is final (not
 %% retriable).  See comment on `handle_query_result_pure'.
@@ -752,7 +784,7 @@ reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpt
 reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
     IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     IsUnrecoverableError = is_unrecoverable_error(Result),
-    {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+    {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
     case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
         {ack, {async_return, _}, true, _} ->
             ok = do_reply_caller(ReplyTo, Result);
@@ -765,11 +797,14 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts)
         {ack, _, _, _} ->
             ok = do_reply_caller(ReplyTo, Result)
     end,
-    {ShouldAck, PostFn}.
+    {ShouldAck, PostFn, DeltaCounters}.
 
+%% This is only called by `simple_{,a}sync_query', so we can bump the
+%% counters here.
 handle_query_result(Id, Result, HasBeenSent) ->
-    {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+    {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
     PostFn(),
+    bump_counters(Id, DeltaCounters),
     ShouldBlock.
 
 %% We should always retry (nack), except when:
@@ -778,85 +813,156 @@ handle_query_result(Id, Result, HasBeenSent) ->
 %%   * the result is a success (or at least a delayed result)
 %% We also retry even sync requests.  In that case, we shouldn't reply
 %% the caller until one of those final results above happen.
+-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) ->
+    {ack | nack, function(), counters()}.
 handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
     PostFn = fun() ->
         ?SLOG(error, #{msg => resource_exception, info => Msg}),
         ok
     end,
-    {nack, PostFn};
+    {nack, PostFn, #{}};
 handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
     NotWorking == not_connected; NotWorking == blocked
 ->
-    {nack, fun() -> ok end};
+    {nack, fun() -> ok end, #{}};
 handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
     PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
-        emqx_resource_metrics:dropped_resource_not_found_inc(Id),
         ok
     end,
-    {ack, PostFn};
+    {ack, PostFn, #{dropped_resource_not_found => 1}};
 handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
     PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
-        emqx_resource_metrics:dropped_resource_stopped_inc(Id),
         ok
     end,
-    {ack, PostFn};
+    {ack, PostFn, #{dropped_resource_stopped => 1}};
 handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
     PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
         ok
     end,
-    {nack, PostFn};
+    {nack, PostFn, #{}};
 handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
     case is_unrecoverable_error(Error) of
         true ->
             PostFn =
                 fun() ->
                     ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
-                    inc_sent_failed(Id, HasBeenSent),
                     ok
                 end,
-            {ack, PostFn};
+            Counters =
+                case HasBeenSent of
+                    true -> #{retried_failed => 1};
+                    false -> #{failed => 1}
+                end,
+            {ack, PostFn, Counters};
         false ->
             PostFn =
                 fun() ->
                     ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
                     ok
                 end,
-            {nack, PostFn}
+            {nack, PostFn, #{}}
     end;
 handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
     handle_query_async_result_pure(Id, Result, HasBeenSent);
-handle_query_result_pure(Id, Result, HasBeenSent) ->
+handle_query_result_pure(_Id, Result, HasBeenSent) ->
     PostFn = fun() ->
         assert_ok_result(Result),
-        inc_sent_success(Id, HasBeenSent),
         ok
     end,
-    {ack, PostFn}.
+    Counters =
+        case HasBeenSent of
+            true -> #{retried_success => 1};
+            false -> #{success => 1}
+        end,
+    {ack, PostFn, Counters}.
 
+-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) ->
+    {ack | nack, function(), counters()}.
 handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
     case is_unrecoverable_error(Error) of
         true ->
             PostFn =
                 fun() ->
                     ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
-                    inc_sent_failed(Id, HasBeenSent),
                     ok
                 end,
-            {ack, PostFn};
+            Counters =
+                case HasBeenSent of
+                    true -> #{retried_failed => 1};
+                    false -> #{failed => 1}
+                end,
+            {ack, PostFn, Counters};
         false ->
             PostFn = fun() ->
                 ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}),
                 ok
             end,
-            {nack, PostFn}
+            {nack, PostFn, #{}}
     end;
 handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
-    {ack, fun() -> ok end};
+    {ack, fun() -> ok end, #{}};
 handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
-    {ack, fun() -> ok end}.
+    {ack, fun() -> ok end, #{}}.
+
+-spec aggregate_counters(data(), counters()) -> data().
+aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
+    Counters = merge_counters(OldCounters, DeltaCounters),
+    Data#{counters := Counters}.
+
+-spec merge_counters(counters(), counters()) -> counters().
+merge_counters(OldCounters, DeltaCounters) ->
+    maps:fold(
+        fun(Metric, Val, Acc) ->
+            maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc)
+        end,
+        OldCounters,
+        DeltaCounters
+    ).
+
+-spec flush_metrics(data()) -> data().
+flush_metrics(Data = #{id := Id, counters := Counters}) ->
+    bump_counters(Id, Counters),
+    set_gauges(Data),
+    ensure_metrics_flush_timer(Data#{counters := #{}}).
+
+-spec ensure_metrics_flush_timer(data()) -> data().
+ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_interval := T}) ->
+    Ref = make_ref(),
+    TRef = erlang:send_after(T, self(), {flush_metrics, Ref}),
+    Data#{metrics_tref := {TRef, Ref}}.
+
+-spec bump_counters(id(), counters()) -> ok.
+bump_counters(Id, Counters) ->
+    maps:foreach(
+        fun
+            (dropped_expired, Val) ->
+                emqx_resource_metrics:dropped_expired_inc(Id, Val);
+            (dropped_queue_full, Val) ->
+                emqx_resource_metrics:dropped_queue_full_inc(Id, Val);
+            (failed, Val) ->
+                emqx_resource_metrics:failed_inc(Id, Val);
+            (retried_failed, Val) ->
+                emqx_resource_metrics:retried_failed_inc(Id, Val);
+            (success, Val) ->
+                emqx_resource_metrics:success_inc(Id, Val);
+            (retried_success, Val) ->
+                emqx_resource_metrics:retried_success_inc(Id, Val);
+            (dropped_resource_not_found, Val) ->
+                emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val);
+            (dropped_resource_stopped, Val) ->
+                emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val)
+        end,
+        Counters
+    ).
+
+-spec set_gauges(data()) -> ok.
+set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
+    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
+    ok.
 
 handle_async_worker_down(Data0, Pid) ->
     #{async_workers := AsyncWorkers0} = Data0,
@@ -942,7 +1048,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
             IsRetriable = false,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
-            ok = inflight_append(InflightTID, InflightItem, Id, Index),
+            ok = inflight_append(InflightTID, InflightItem),
             Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
         end,
@@ -978,7 +1084,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
             IsRetriable = false,
             WorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
-            ok = inflight_append(InflightTID, InflightItem, Id, Index),
+            ok = inflight_append(InflightTID, InflightItem),
             Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
             {async_return, Result}
         end,
@@ -1005,7 +1111,6 @@ handle_async_reply1(
         request_ref := Ref,
         inflight_tid := InflightTID,
         resource_id := Id,
-        worker_index := Index,
         buffer_worker := WorkerPid,
         min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
@@ -1018,7 +1123,9 @@ handle_async_reply1(
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
+            IsAcked = ack_inflight(InflightTID, Ref, WorkerPid),
+            %% evaluate the metrics call here since we're not inside the
+            %% buffer worker
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
@@ -1031,7 +1138,6 @@ do_handle_async_reply(
         query_opts := QueryOpts,
         resource_id := Id,
         request_ref := Ref,
-        worker_index := Index,
         buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
@@ -1041,7 +1147,7 @@ do_handle_async_reply(
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
-    {Action, PostFn} = reply_caller_defer_metrics(
+    {Action, PostFn, DeltaCounters} = reply_caller_defer_metrics(
         Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
     ),
 
@@ -1058,7 +1164,7 @@ do_handle_async_reply(
             ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts)
     end.
 
 handle_async_batch_reply(
@@ -1110,7 +1216,6 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
     #{
         resource_id := Id,
-        worker_index := Index,
         buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         request_ref := Ref,
@@ -1130,11 +1235,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
             RealNotExpired0
         ),
     NumExpired = length(RealExpired),
+    %% evaluate the metrics call here since we're not inside the buffer
+    %% worker
     emqx_resource_metrics:late_reply_inc(Id, NumExpired),
     case RealNotExpired of
         [] ->
             %% all expired, no need to update back the inflight batch
-            _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
+            _ = ack_inflight(InflightTID, Ref, WorkerPid),
             ok;
         _ ->
             %% some queries are not expired, put them back to the inflight batch
@@ -1147,7 +1254,6 @@ do_handle_async_batch_reply(
     #{
         buffer_worker := WorkerPid,
         resource_id := Id,
-        worker_index := Index,
         inflight_tid := InflightTID,
         request_ref := Ref,
         min_batch := Batch,
@@ -1155,7 +1261,9 @@ do_handle_async_batch_reply(
     },
     Result
 ) ->
-    {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
+    {Action, PostFn, DeltaCounters} = batch_reply_caller_defer_metrics(
+        Id, Result, Batch, QueryOpts
+    ),
     ?tp(handle_async_reply, #{
         action => Action,
         batch_or_query => Batch,
@@ -1169,16 +1277,18 @@ do_handle_async_batch_reply(
             ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts)
     end.
 
-do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) ->
-    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
+do_async_ack(InflightTID, Ref, Id, PostFn, WorkerPid, DeltaCounters, QueryOpts) ->
+    IsKnownRef = ack_inflight(InflightTID, Ref, WorkerPid),
     case maps:get(simple_query, QueryOpts, false) of
         true ->
-            PostFn();
+            PostFn(),
+            bump_counters(Id, DeltaCounters);
         false when IsKnownRef ->
-            PostFn();
+            PostFn(),
+            bump_counters(Id, DeltaCounters);
         false ->
             ok
     end,
@@ -1222,31 +1332,30 @@ estimate_size(QItem) ->
     erlang:external_size(QItem).
 
 -spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
-    {[queue_query()], replayq:q()}.
+    {[queue_query()], replayq:q(), counters()}.
 append_queue(Id, Index, Q, Queries) ->
     %% this assertion is to ensure that we never append a raw binary
     %% because the marshaller will get lost.
     false = is_binary(hd(Queries)),
     Q0 = replayq:append(Q, Queries),
-    {Overflown, Q2} =
+    {Overflown, Q2, DeltaCounters} =
         case replayq:overflow(Q0) of
             OverflownBytes when OverflownBytes =< 0 ->
-                {[], Q0};
+                {[], Q0, #{}};
             OverflownBytes ->
                 PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
                 {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
-                emqx_resource_metrics:dropped_queue_full_inc(Id, Dropped),
+                Counters = #{dropped_queue_full => Dropped},
                 ?SLOG(info, #{
                     msg => buffer_worker_overflow,
                     resource_id => Id,
                     worker_index => Index,
                     dropped => Dropped
                 }),
-                {Items2, Q1}
+                {Items2, Q1, Counters}
         end,
-    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
     ?tp(
         buffer_worker_appended_to_queue,
         #{
@@ -1256,7 +1365,7 @@ append_queue(Id, Index, Q, Queries) ->
             overflown => length(Overflown)
         }
     ),
-    {Overflown, Q2}.
+    {Overflown, Q2, DeltaCounters}.
 
 %%==============================================================================
 %% the inflight queue for async query
@@ -1266,20 +1375,18 @@ append_queue(Id, Index, Q, Queries) ->
 -define(INITIAL_TIME_REF, initial_time).
 -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
 
-inflight_new(InfltWinSZ, Id, Index) ->
+inflight_new(InfltWinSZ) ->
     TableId = ets:new(
         emqx_resource_buffer_worker_inflight_tab,
         [ordered_set, public, {write_concurrency, true}]
     ),
-    inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
+    inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}),
     %% we use this counter because we might deal with batches as
     %% elements.
-    inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
-    inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index),
-    inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
-    inflight_append(
-        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
-    ),
+    inflight_append(TableId, {?SIZE_REF, 0}),
+    inflight_append(TableId, {?BATCH_COUNT_REF, 0}),
+    inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}),
+    inflight_append(TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}),
     TableId.
 
 -spec inflight_get_first_retriable(ets:tid(), integer()) ->
@@ -1331,38 +1438,32 @@ inflight_num_msgs(InflightTID) ->
     [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
     Size.
 
-inflight_append(undefined, _InflightItem, _Id, _Index) ->
+inflight_append(undefined, _InflightItem) ->
     ok;
 inflight_append(
     InflightTID,
-    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
-    Id,
-    Index
+    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef)
 ) ->
     Batch = mark_as_sent(Batch0),
     InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     BatchSize = length(Batch),
     IsNew andalso inc_inflight(InflightTID, BatchSize),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
 inflight_append(
     InflightTID,
     ?INFLIGHT_ITEM(
         Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
-    ),
-    Id,
-    Index
+    )
 ) ->
     Query = mark_as_sent(Query0),
     InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
     IsNew = ets:insert_new(InflightTID, InflightItem),
     IsNew andalso inc_inflight(InflightTID, 1),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
     ok;
-inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
+inflight_append(InflightTID, {Ref, Data}) ->
     ets:insert(InflightTID, {Ref, Data}),
     %% this is a metadata row being inserted; therefore, we don't bump
     %% the inflight metric.
@@ -1398,6 +1499,8 @@ ensure_async_worker_monitored(
 ensure_async_worker_monitored(Data0, _Result) ->
     {Data0, undefined}.
 
+-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
+    ok.
 store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) ->
     ok;
 store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
@@ -1410,9 +1513,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
     ),
     ok.
 
-ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) ->
+ack_inflight(undefined, _Ref, _WorkerPid) ->
     false;
-ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) ->
+ack_inflight(InflightTID, Ref, WorkerPid) ->
     {Count, Removed} =
         case ets:take(InflightTID, Ref) of
             [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
@@ -1428,12 +1531,6 @@ ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) ->
         flush -> ?MODULE:flush_worker(WorkerPid)
     end,
     IsKnownRef = (Count > 0),
-    case IsKnownRef of
-        true ->
-            emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
-        false ->
-            ok
-    end,
     IsKnownRef.
 
 mark_inflight_items_as_retriable(Data, WorkerMRef) ->
@@ -1496,16 +1593,6 @@ dec_inflight_update(InflightTID, Count) when Count > 0 ->
 
 %%==============================================================================
 
-inc_sent_failed(Id, _HasBeenSent = true) ->
-    emqx_resource_metrics:retried_failed_inc(Id);
-inc_sent_failed(Id, _HasBeenSent) ->
-    emqx_resource_metrics:failed_inc(Id).
-
-inc_sent_success(Id, _HasBeenSent = true) ->
-    emqx_resource_metrics:retried_success_inc(Id);
-inc_sent_success(Id, _HasBeenSent) ->
-    emqx_resource_metrics:success_inc(Id).
-
 call_mode(force_sync, _) -> sync;
 call_mode(async_if_possible, always_sync) -> sync;
 call_mode(async_if_possible, async_if_possible) -> async.

+ 6 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -44,6 +44,7 @@ fields("creation_opts") ->
         {worker_pool_size, fun worker_pool_size/1},
         {health_check_interval, fun health_check_interval/1},
         {resume_interval, fun resume_interval/1},
+        {metrics_flush_interval, fun metrics_flush_interval/1},
         {start_after_created, fun start_after_created/1},
         {start_timeout, fun start_timeout/1},
         {auto_restart_interval, fun auto_restart_interval/1},
@@ -77,6 +78,11 @@ resume_interval(desc) -> ?DESC("resume_interval");
 resume_interval(required) -> false;
 resume_interval(_) -> undefined.
 
+metrics_flush_interval(type) -> emqx_schema:duration_ms();
+metrics_flush_interval(importance) -> ?IMPORTANCE_HIDDEN;
+metrics_flush_interval(required) -> false;
+metrics_flush_interval(_) -> undefined.
+
 health_check_interval(type) -> emqx_schema:duration_ms();
 health_check_interval(desc) -> ?DESC("health_check_interval");
 health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
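
For reference, the new option is hidden from the docs (`?IMPORTANCE_HIDDEN`) but can be passed per resource through the creation options, as the test suites in this commit do. A sketch assuming the `emqx_resource:create_local/5` call shape used in `emqx_resource_SUITE`; the id, group, connector module and the 50 ms value are placeholders, not from this diff:

```erlang
%% Sketch only: pass metrics_flush_interval alongside the other worker opts.
{ok, _} = emqx_resource:create_local(
    <<"my_resource">>,
    <<"default">>,
    my_connector_module,
    #{},
    #{
        query_mode => async,
        batch_size => 1,
        %% flush accumulated buffer-worker metrics every 50 ms (test value)
        metrics_flush_interval => 50
    }
).
```
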

+ 70 - 16
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -316,7 +316,11 @@ t_query_counter_async_query(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{query_mode => async, batch_size => 1}
+        #{
+            query_mode => async,
+            batch_size => 1,
+            metrics_flush_interval => 50
+        }
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
     NMsgs = 1_000,
@@ -350,7 +354,11 @@ t_query_counter_async_query(_) ->
         end
     ),
     #{counters := C} = emqx_resource:get_metrics(?ID),
-    ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C)
+    ),
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter_async_callback(_) ->
@@ -1171,6 +1179,7 @@ t_unblock_only_required_buffer_workers(_) ->
         #{
             query_mode => async,
             batch_size => 5,
+            metrics_flush_interval => 50,
             batch_time => 100
         }
     ),
@@ -1219,6 +1228,7 @@ t_retry_batch(_Config) ->
             batch_size => 5,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1318,6 +1328,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
             worker_pool_size => NumBufferWorkers,
             buffer_mode => volatile_offload,
             buffer_seg_bytes => 100,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1354,10 +1365,16 @@ t_delete_and_re_create_with_same_name(_Config) ->
 
             %% ensure that stuff got enqueued into disk
             tap_metrics(?LINE),
-            Queuing1 = emqx_resource_metrics:queuing_get(?ID),
-            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
-            ?assert(Queuing1 > 0),
-            ?assertEqual(2, Inflight1),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
+            ),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assertEqual(2, emqx_resource_metrics:inflight_get(?ID))
+            ),
 
             %% now, we delete the resource
             process_flag(trap_exit, true),
@@ -1409,6 +1426,7 @@ t_always_overflow(_Config) ->
             batch_size => 1,
             worker_pool_size => 1,
             max_buffer_bytes => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1446,6 +1464,7 @@ t_retry_sync_inflight(_Config) ->
             query_mode => sync,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1496,6 +1515,7 @@ t_retry_sync_inflight_batch(_Config) ->
             batch_size => 2,
             batch_time => 200,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1546,6 +1566,7 @@ t_retry_async_inflight(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1590,6 +1611,7 @@ t_retry_async_inflight_full(_Config) ->
             inflight_window => AsyncInflightWindow,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1653,6 +1675,7 @@ t_async_reply_multi_eval(_Config) ->
             batch_size => 3,
             batch_time => 10,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1667,7 +1690,7 @@ t_async_reply_multi_eval(_Config) ->
         #{}
     ),
     ?retry(
-        ResumeInterval,
+        2 * ResumeInterval,
         TotalTime div ResumeInterval,
         begin
             Metrics = tap_metrics(?LINE),
@@ -1683,7 +1706,7 @@ t_async_reply_multi_eval(_Config) ->
                 failed := Failed
             } = Counters,
             ?assertEqual(TotalQueries, Matched - 1),
-            ?assertEqual(Matched, Success + Dropped + LateReply + Failed)
+            ?assertEqual(Matched, Success + Dropped + LateReply + Failed, #{counters => Counters})
         end
     ).
 
@@ -1700,6 +1723,7 @@ t_retry_async_inflight_batch(_Config) ->
             batch_size => 2,
             batch_time => 200,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1745,6 +1769,7 @@ t_async_pool_worker_death(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => NumBufferWorkers,
+            metrics_flush_interval => 50,
             resume_interval => ResumeInterval
         }
     ),
@@ -1768,8 +1793,11 @@ t_async_pool_worker_death(_Config) ->
             inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
             {ok, _} = snabbkaffe:receive_events(SRef0),
 
-            Inflight0 = emqx_resource_metrics:inflight_get(?ID),
-            ?assertEqual(NumReqs, Inflight0),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                ?assertEqual(NumReqs, emqx_resource_metrics:inflight_get(?ID))
+            ),
 
             %% grab one of the worker pids and kill it
             {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
@@ -1820,6 +1848,7 @@ t_expiration_sync_before_sending(_Config) ->
             query_mode => sync,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1837,6 +1866,7 @@ t_expiration_sync_batch_before_sending(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1853,6 +1883,7 @@ t_expiration_async_before_sending(_Config) ->
             query_mode => async,
             batch_size => 1,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1870,6 +1901,7 @@ t_expiration_async_batch_before_sending(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 50,
             resume_interval => 1_000
         }
     ),
@@ -1950,6 +1982,7 @@ t_expiration_sync_before_sending_partial_batch(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 250,
             resume_interval => 1_000
         }
     ),
@@ -1968,6 +2001,7 @@ t_expiration_async_before_sending_partial_batch(_Config) ->
             batch_size => 2,
             batch_time => 100,
             worker_pool_size => 1,
+            metrics_flush_interval => 250,
             resume_interval => 1_000
         }
     ),
@@ -2057,7 +2091,14 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
                 ],
                 ?of_kind(buffer_worker_flush_potentially_partial, Trace)
             ),
-            wait_until_gauge_is(inflight, 0, 500),
+            wait_until_gauge_is(
+                inflight,
+                #{
+                    expected_value => 0,
+                    timeout => 500,
+                    max_events => 10
+                }
+            ),
             Metrics = tap_metrics(?LINE),
             case QueryMode of
                 async ->
@@ -2933,8 +2974,15 @@ install_telemetry_handler(TestCase) ->
     put({?MODULE, telemetry_table}, Tid),
     Tid.
 
-wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
-    Events = receive_all_events(GaugeName, Timeout),
+wait_until_gauge_is(
+    GaugeName,
+    #{
+        expected_value := ExpectedValue,
+        timeout := Timeout,
+        max_events := MaxEvents
+    }
+) ->
+    Events = receive_all_events(GaugeName, Timeout, MaxEvents),
     case length(Events) > 0 andalso lists:last(Events) of
         #{measurements := #{gauge_set := ExpectedValue}} ->
             ok;
@@ -2948,12 +2996,18 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     end.
 
 receive_all_events(EventName, Timeout) ->
-    receive_all_events(EventName, Timeout, []).
+    receive_all_events(EventName, Timeout, _MaxEvents = 50, _Count = 0, _Acc = []).
+
+receive_all_events(EventName, Timeout, MaxEvents) ->
+    receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []).
 
-receive_all_events(EventName, Timeout, Acc) ->
+receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+    lists:reverse(Acc);
+receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
     receive
         {telemetry, #{name := [_, _, EventName]} = Event} ->
-            receive_all_events(EventName, Timeout, [Event | Acc])
+            ct:pal("telemetry event: ~p", [Event]),
+            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
     after Timeout ->
         lists:reverse(Acc)
     end.