
Merge pull request #9619 from thalesmg/refactor-gauges-v50

refactor(metrics): use absolute gauge values rather than deltas (v5.0)
Thales Macedo Garitezi, 3 years ago
Commit
7e02eac3bc
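
The gist of the change, as an illustrative sketch rather than part of the diff: instead of workers bumping a shared counter by deltas, each worker reports its own absolute gauge value under a {Id, Metric, WorkerId} key, and readers sum across workers. The names below come from the diff itself; the handler is assumed to be already started.

    %% before: gauges maintained as deltas on a shared counter
    %%   emqx_metrics_worker:inc(Name, Id, 'queuing', +1),
    %%   emqx_metrics_worker:inc(Name, Id, 'queuing', -1),
    %% after: each worker sets its absolute value, readers sum across workers
    ok = emqx_metrics_worker:set_gauge(Name, Id, worker_1, 'queuing', 5),
    ok = emqx_metrics_worker:set_gauge(Name, Id, worker_2, 'queuing', 7),
    12 = emqx_metrics_worker:get_gauge(Name, Id, 'queuing').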

+ 116 - 9
apps/emqx/src/emqx_metrics_worker.erl

@@ -18,6 +18,8 @@
 
 -behaviour(gen_server).
 
+-include_lib("stdlib/include/ms_transform.hrl").
+
 %% API functions
 -export([
     start_link/1,
@@ -30,6 +32,11 @@
     inc/3,
     inc/4,
     get/3,
+    get_gauge/3,
+    set_gauge/5,
+    shift_gauge/5,
+    get_gauges/2,
+    delete_gauges/2,
     get_rate/2,
     get_counters/2,
     create_metrics/3,
@@ -68,14 +75,21 @@
     last5m := float()
 }.
 -type metrics() :: #{
-    counters := #{atom() => integer()},
-    rate := #{atom() => rate()}
+    counters := #{metric_name() => integer()},
+    gauges := #{metric_name() => integer()},
+    rate := #{metric_name() => rate()}
 }.
 -type handler_name() :: atom().
+%% metric_id() is actually a resource id
 -type metric_id() :: binary() | atom().
+-type metric_name() :: atom().
+-type worker_id() :: term().
 
 -define(CntrRef(Name), {?MODULE, Name}).
 -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
+-define(GAUGE_TABLE(NAME),
+    list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge")
+).
 
 -record(rate, {
     max = 0 :: number(),
@@ -112,11 +126,12 @@ child_spec(ChldName, Name) ->
         modules => [emqx_metrics_worker]
     }.
 
--spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}.
+-spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}.
 create_metrics(Name, Id, Metrics) ->
     create_metrics(Name, Id, Metrics, Metrics).
 
--spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}.
+-spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) ->
+    ok | {error, term()}.
 create_metrics(Name, Id, Metrics, RateMetrics) ->
     gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}).
 
@@ -135,7 +150,7 @@ has_metrics(Name, Id) ->
         _ -> true
     end.
 
--spec get(handler_name(), metric_id(), atom() | integer()) -> number().
+-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number().
 get(Name, Id, Metric) ->
     case get_ref(Name, Id) of
         not_found ->
@@ -167,16 +182,102 @@ reset_counters(Name, Id) ->
 
 -spec get_metrics(handler_name(), metric_id()) -> metrics().
 get_metrics(Name, Id) ->
-    #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
+    #{
+        rate => get_rate(Name, Id),
+        counters => get_counters(Name, Id),
+        gauges => get_gauges(Name, Id)
+    }.
 
 -spec inc(handler_name(), metric_id(), atom()) -> ok.
 inc(Name, Id, Metric) ->
     inc(Name, Id, Metric, 1).
 
--spec inc(handler_name(), metric_id(), atom(), integer()) -> ok.
+-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
     counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
 
+-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
+set_gauge(Name, Id, WorkerId, Metric, Val) ->
+    Table = ?GAUGE_TABLE(Name),
+    try
+        true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}),
+        ok
+    catch
+        error:badarg ->
+            ok
+    end.
+
+-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
+shift_gauge(Name, Id, WorkerId, Metric, Val) ->
+    Table = ?GAUGE_TABLE(Name),
+    try
+        _ = ets:update_counter(
+            Table,
+            {Id, Metric, WorkerId},
+            Val,
+            {{Id, Metric, WorkerId}, 0}
+        ),
+        ok
+    catch
+        error:badarg ->
+            ok
+    end.
+
+-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer().
+get_gauge(Name, Id, Metric) ->
+    Table = ?GAUGE_TABLE(Name),
+    MatchSpec =
+        ets:fun2ms(
+            fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric ->
+                Val
+            end
+        ),
+    try
+        lists:sum(ets:select(Table, MatchSpec))
+    catch
+        error:badarg ->
+            0
+    end.
+
+-spec get_gauges(handler_name(), metric_id()) -> map().
+get_gauges(Name, Id) ->
+    Table = ?GAUGE_TABLE(Name),
+    MatchSpec =
+        ets:fun2ms(
+            fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id ->
+                {Metric, Val}
+            end
+        ),
+    try
+        lists:foldr(
+            fun({Metric, Val}, Acc) ->
+                maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc)
+            end,
+            #{},
+            ets:select(Table, MatchSpec)
+        )
+    catch
+        error:badarg ->
+            #{}
+    end.
+
+-spec delete_gauges(handler_name(), metric_id()) -> ok.
+delete_gauges(Name, Id) ->
+    Table = ?GAUGE_TABLE(Name),
+    MatchSpec =
+        ets:fun2ms(
+            fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id ->
+                true
+            end
+        ),
+    try
+        _ = ets:select_delete(Table, MatchSpec),
+        ok
+    catch
+        error:badarg ->
+            ok
+    end.
+
 start_link(Name) ->
     gen_server:start_link({local, Name}, ?MODULE, Name, []).
 
@@ -185,6 +286,7 @@ init(Name) ->
     %% the rate metrics
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
     persistent_term:put(?CntrRef(Name), #{}),
+    _ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]),
     {ok, #state{}}.
 
 handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
@@ -220,7 +322,10 @@ handle_call(
     _From,
     State = #state{metric_ids = MIDs, rates = Rates}
 ) ->
-    {reply, delete_counters(get_self_name(), Id), State#state{
+    Name = get_self_name(),
+    delete_counters(Name, Id),
+    delete_gauges(Name, Id),
+    {reply, ok, State#state{
         metric_ids = sets:del_element(Id, MIDs),
         rates =
             case Rates of
@@ -233,7 +338,9 @@ handle_call(
     _From,
     State = #state{rates = Rates}
 ) ->
-    {reply, reset_counters(get_self_name(), Id), State#state{
+    Name = get_self_name(),
+    delete_gauges(Name, Id),
+    {reply, reset_counters(Name, Id), State#state{
         rates =
             case Rates of
                 undefined ->
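
A hedged usage sketch of the gauge API added above (handler and resource ids are placeholders; the metrics worker is assumed to be running): setting a gauge overwrites that worker's value, shifting adjusts it by a delta, and reads sum across all workers of the same resource.

    ok = emqx_metrics_worker:set_gauge(my_handler, <<"res1">>, worker_a, inflight, 5),
    ok = emqx_metrics_worker:set_gauge(my_handler, <<"res1">>, worker_b, inflight, 7),
    %% shift_gauge/5 only adjusts worker_b's value: 7 - 2 = 5
    ok = emqx_metrics_worker:shift_gauge(my_handler, <<"res1">>, worker_b, inflight, -2),
    %% readers see the sum across workers
    10 = emqx_metrics_worker:get_gauge(my_handler, <<"res1">>, inflight),
    #{inflight := 10} = emqx_metrics_worker:get_gauges(my_handler, <<"res1">>),
    %% clearing a resource id removes all of its per-worker gauge rows
    ok = emqx_metrics_worker:delete_gauges(my_handler, <<"res1">>).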

+ 9 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -65,7 +65,8 @@
 -export([clear_screen/0]).
 -export([with_mock/4]).
 -export([
-    on_exit/1
+    on_exit/1,
+    call_janitor/0
 ]).
 
 %% Toxiproxy API
@@ -933,6 +934,13 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
 %% Testcase teardown utilities
 %%-------------------------------------------------------------------------------
 
+%% stop the janitor gracefully to ensure proper cleanup order and less
+%% noise in the logs.
+call_janitor() ->
+    Janitor = get_or_spawn_janitor(),
+    exit(Janitor, normal),
+    ok.
+
 get_or_spawn_janitor() ->
     case get({?MODULE, janitor_proc}) of
         undefined ->

+ 206 - 33
apps/emqx/test/emqx_metrics_worker_SUITE.erl

@@ -47,7 +47,8 @@ end_per_testcase(_, _Config) ->
 
 t_get_metrics(_) ->
     Metrics = [a, b, c],
-    ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
+    Id = <<"testid">>,
+    ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
     %% all the metrics are set to zero at start
     ?assertMatch(
         #{
@@ -56,18 +57,22 @@ t_get_metrics(_) ->
                 b := #{current := 0.0, max := 0.0, last5m := 0.0},
                 c := #{current := 0.0, max := 0.0, last5m := 0.0}
             },
+            gauges := #{},
             counters := #{
                 a := 0,
                 b := 0,
                 c := 0
             }
         },
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
+        emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, a),
+    ok = emqx_metrics_worker:inc(?NAME, Id, b),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
     ct:sleep(1500),
     ?LET(
         #{
@@ -76,27 +81,73 @@ t_get_metrics(_) ->
                 b := #{current := CurrB, max := MaxB, last5m := _},
                 c := #{current := CurrC, max := MaxC, last5m := _}
             },
+            gauges := #{
+                inflight := Inflight,
+                queuing := Queuing
+            },
             counters := #{
                 a := 1,
                 b := 1,
                 c := 2
             }
         },
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
+        emqx_metrics_worker:get_metrics(?NAME, Id),
         {
             ?assert(CurrA > 0),
             ?assert(CurrB > 0),
             ?assert(CurrC > 0),
             ?assert(MaxA > 0),
             ?assert(MaxB > 0),
-            ?assert(MaxC > 0)
+            ?assert(MaxC > 0),
+            ?assert(Inflight == 12),
+            ?assert(Queuing == 9)
         }
     ),
-    ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
+    ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
+
+t_clear_metrics(_Config) ->
+    Metrics = [a, b, c],
+    Id = <<"testid">>,
+    ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
+    ?assertMatch(
+        #{
+            rate := #{
+                a := #{current := 0.0, max := 0.0, last5m := 0.0},
+                b := #{current := 0.0, max := 0.0, last5m := 0.0},
+                c := #{current := 0.0, max := 0.0, last5m := 0.0}
+            },
+            gauges := #{},
+            counters := #{
+                a := 0,
+                b := 0,
+                c := 0
+            }
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id)
+    ),
+    ok = emqx_metrics_worker:inc(?NAME, Id, a),
+    ok = emqx_metrics_worker:inc(?NAME, Id, b),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
+    ct:sleep(1500),
+    ok = emqx_metrics_worker:clear_metrics(?NAME, Id),
+    ?assertEqual(
+        #{
+            counters => #{},
+            gauges => #{},
+            rate => #{current => 0.0, last5m => 0.0, max => 0.0}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id)
+    ),
+    ok.
 
 t_reset_metrics(_) ->
     Metrics = [a, b, c],
-    ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
+    Id = <<"testid">>,
+    ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
     %% all the metrics are set to zero at start
     ?assertMatch(
         #{
@@ -105,20 +156,24 @@ t_reset_metrics(_) ->
                 b := #{current := 0.0, max := 0.0, last5m := 0.0},
                 c := #{current := 0.0, max := 0.0, last5m := 0.0}
             },
+            gauges := #{},
             counters := #{
                 a := 0,
                 b := 0,
                 c := 0
             }
         },
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
+        emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, a),
+    ok = emqx_metrics_worker:inc(?NAME, Id, b),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
     ct:sleep(1500),
-    ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>),
+    ok = emqx_metrics_worker:reset_metrics(?NAME, Id),
     ?LET(
         #{
             rate := #{
@@ -126,68 +181,83 @@ t_reset_metrics(_) ->
                 b := #{current := CurrB, max := MaxB, last5m := _},
                 c := #{current := CurrC, max := MaxC, last5m := _}
             },
+            gauges := Gauges,
             counters := #{
                 a := 0,
                 b := 0,
                 c := 0
             }
         },
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
+        emqx_metrics_worker:get_metrics(?NAME, Id),
         {
             ?assert(CurrA == 0),
             ?assert(CurrB == 0),
             ?assert(CurrC == 0),
             ?assert(MaxA == 0),
             ?assert(MaxB == 0),
-            ?assert(MaxC == 0)
+            ?assert(MaxC == 0),
+            ?assertEqual(#{}, Gauges)
         }
     ),
-    ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
+    ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
 
 t_get_metrics_2(_) ->
     Metrics = [a, b, c],
+    Id = <<"testid">>,
     ok = emqx_metrics_worker:create_metrics(
         ?NAME,
-        <<"testid">>,
+        Id,
         Metrics,
         [a]
     ),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
+    ok = emqx_metrics_worker:inc(?NAME, Id, a),
+    ok = emqx_metrics_worker:inc(?NAME, Id, b),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
     ?assertMatch(
         #{
             rate := Rate = #{
                 a := #{current := _, max := _, last5m := _}
             },
+            gauges := #{},
             counters := #{
                 a := 1,
                 b := 1,
                 c := 1
             }
         } when map_size(Rate) =:= 1,
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
+        emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
-    ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
+    ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
 
 t_recreate_metrics(_) ->
-    ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
+    Id = <<"testid">>,
+    ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a]),
+    ok = emqx_metrics_worker:inc(?NAME, Id, a),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
     ?assertMatch(
         #{
             rate := R = #{
                 a := #{current := _, max := _, last5m := _}
             },
+            gauges := #{
+                inflight := 12,
+                queuing := 9
+            },
             counters := C = #{
                 a := 1
             }
         } when map_size(R) == 1 andalso map_size(C) == 1,
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
+        emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
     %% we create the metrics again, to add some counters
-    ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
-    ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
+    ok = emqx_metrics_worker:create_metrics(?NAME, Id, [a, b, c]),
+    ok = emqx_metrics_worker:inc(?NAME, Id, b),
+    ok = emqx_metrics_worker:inc(?NAME, Id, c),
     ?assertMatch(
         #{
             rate := R = #{
@@ -195,13 +265,17 @@ t_recreate_metrics(_) ->
                 b := #{current := _, max := _, last5m := _},
                 c := #{current := _, max := _, last5m := _}
             },
+            gauges := #{
+                inflight := 12,
+                queuing := 9
+            },
             counters := C = #{
                 a := 1, b := 1, c := 1
             }
         } when map_size(R) == 3 andalso map_size(C) == 3,
-        emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
+        emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
-    ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
+    ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
 
 t_inc_matched(_) ->
     Metrics = ['rules.matched'],
@@ -238,3 +312,102 @@ t_rate(_) ->
     ct:sleep(3000),
     ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>),
     ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>).
+
+t_get_gauge(_Config) ->
+    Metric = 'queueing',
+    %% unknown handler name (inexistent table)
+    ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)),
+    %% unknown resource id
+    ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown_id, Metric)),
+
+    Id = <<"some id">>,
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
+
+    ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+    ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, unknown, Metric)),
+
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
+    ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1),
+    ?assertEqual(4, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+
+    ?assertEqual(0, emqx_metrics_worker:get_gauge(?NAME, Id, another_metric)),
+
+    ok.
+
+t_get_gauges(_Config) ->
+    %% unknown handler name (inexistent table)
+    ?assertEqual(#{}, emqx_metrics_worker:get_gauges(unknown_name, unknown_id)),
+    %% unknown resource id
+    ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown_id)),
+
+    Metric = 'queuing',
+    Id = <<"some id">>,
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
+
+    ?assertEqual(#{queuing => 2}, emqx_metrics_worker:get_gauges(?NAME, Id)),
+    ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, unknown)),
+
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
+    ?assertEqual(#{queuing => 5}, emqx_metrics_worker:get_gauges(?NAME, Id)),
+
+    AnotherMetric = 'inflight',
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 1),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10),
+    ?assertEqual(#{queuing => 4, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)),
+
+    ok.
+
+t_delete_gauge(_Config) ->
+    %% unknown handler name (inexistent table)
+    ?assertEqual(ok, emqx_metrics_worker:delete_gauges(unknown_name, unknown_id)),
+    %% unknown resource id
+    ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, unknown_id)),
+
+    Metric = 'queuing',
+    AnotherMetric = 'inflight',
+    Id = <<"some id">>,
+    AnotherId = <<"another id">>,
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, Metric, 2),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, Metric, 3),
+    ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, AnotherMetric, 10),
+    ok = emqx_metrics_worker:set_gauge(?NAME, AnotherId, worker_id1, AnotherMetric, 10),
+    ?assertEqual(#{queuing => 5, inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, Id)),
+
+    ?assertEqual(ok, emqx_metrics_worker:delete_gauges(?NAME, Id)),
+
+    ?assertEqual(#{}, emqx_metrics_worker:get_gauges(?NAME, Id)),
+    ?assertEqual(#{inflight => 10}, emqx_metrics_worker:get_gauges(?NAME, AnotherId)),
+
+    ok.
+
+t_shift_gauge(_Config) ->
+    Metric = 'queueing',
+    Id = <<"some id">>,
+    AnotherId = <<"another id">>,
+
+    %% unknown handler name (inexistent table)
+    ?assertEqual(
+        ok, emqx_metrics_worker:shift_gauge(unknown_name, unknown_id, worker_id0, Metric, 2)
+    ),
+    ?assertEqual(0, emqx_metrics_worker:get_gauge(unknown_name, unknown_id, Metric)),
+
+    %% empty resource id
+    ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 2)),
+    ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, AnotherId, worker_id0, Metric, 2)),
+    ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+    ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
+
+    ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id0, Metric, 3)),
+    ?assertEqual(5, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+
+    ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, 10)),
+    ?assertEqual(15, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+
+    ?assertEqual(ok, emqx_metrics_worker:shift_gauge(?NAME, Id, worker_id1, Metric, -4)),
+    ?assertEqual(11, emqx_metrics_worker:get_gauge(?NAME, Id, Metric)),
+
+    ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
+
+    ok.

+ 2 - 3
apps/emqx/test/emqx_test_janitor.erl

@@ -49,8 +49,7 @@ push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
 
 init(Parent) ->
     process_flag(trap_exit, true),
-    Ref = monitor(process, Parent),
-    {ok, #{callbacks => [], owner => {Ref, Parent}}}.
+    {ok, #{callbacks => [], owner => Parent}}.
 
 terminate(_Reason, #{callbacks := Callbacks}) ->
     lists:foreach(fun(Fun) -> Fun() end, Callbacks).
@@ -63,7 +62,7 @@ handle_call(_Req, _From, State) ->
 handle_cast(_Req, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', Ref, process, Parent, _Reason}, State = #{owner := {Ref, Parent}}) ->
+handle_info({'EXIT', Parent, _Reason}, State = #{owner := Parent}) ->
     {stop, normal, State};
 handle_info(_Msg, State) ->
     {noreply, State}.
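
Why the call_janitor/0 helper added above stops the janitor cleanly (an explanatory note, not part of the diff): the janitor traps exits, so an exit signal sent by its owner arrives as a message instead of killing the process, and gen_server then runs terminate/2, which executes the registered callbacks.

    %% in call_janitor/0 the owning test process does:
    exit(Janitor, normal),
    %% since the janitor called process_flag(trap_exit, true), this is delivered as
    %%   {'EXIT', Parent, normal}
    %% which matches the handle_info/2 clause above, returns {stop, normal, State},
    %% and terminate/2 then runs every pushed on-exit callback.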

+ 1 - 1
apps/emqx_bridge/i18n/emqx_bridge_schema.conf

@@ -195,7 +195,7 @@ emqx_bridge_schema {
 
     metric_sent_inflight {
                    desc {
-                         en: """Count of messages that were sent asynchronously but ACKs are not received."""
+                         en: """Count of messages that were sent asynchronously but ACKs are not yet received."""
                          zh: """已异步地发送但没有收到 ACK 的消息个数。"""
                         }
                    label: {

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 4 - 3
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -686,7 +686,6 @@ format_resp(
 
 format_metrics(#{
     counters := #{
-        'batching' := Batched,
         'dropped' := Dropped,
         'dropped.other' := DroppedOther,
         'dropped.queue_full' := DroppedQueueFull,
@@ -694,17 +693,19 @@ format_metrics(#{
         'dropped.resource_not_found' := DroppedResourceNotFound,
         'dropped.resource_stopped' := DroppedResourceStopped,
         'matched' := Matched,
-        'queuing' := Queued,
         'retried' := Retried,
         'failed' := SentFailed,
-        'inflight' := SentInflight,
         'success' := SentSucc,
         'received' := Rcvd
     },
+    gauges := Gauges,
     rate := #{
         matched := #{current := Rate, last5m := Rate5m, max := RateMax}
     }
 }) ->
+    Batched = maps:get('batching', Gauges, 0),
+    Queued = maps:get('queuing', Gauges, 0),
+    SentInflight = maps:get('inflight', Gauges, 0),
     ?METRICS(
         Batched,
         Dropped,

+ 19 - 9
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 
 -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
+-import(emqx_common_test_helpers, [on_exit/1]).
 
 -include("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -124,6 +125,7 @@ init_per_testcase(_, Config) ->
     Config.
 end_per_testcase(_, _Config) ->
     clear_resources(),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 clear_resources() ->
@@ -672,6 +674,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
         <<"name">> := ?BRIDGE_NAME_EGRESS
     } = jsx:decode(Bridge),
     BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+    on_exit(fun() ->
+        %% delete the bridge
+        {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
+        {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+        ok
+    end),
     %% we now test if the bridge works as expected
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@@ -733,15 +741,20 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
 
     %% verify the metrics of the bridge, the message should be queued
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    Decoded1 = jsx:decode(BridgeStr1),
+    ?assertMatch(
+        Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
+        maps:get(<<"status">>, Decoded1)
+    ),
     %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
-            <<"status">> := Status,
-            <<"metrics">> := #{
-                <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
-            }
-        } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>),
-        jsx:decode(BridgeStr1)
+            <<"matched">> := Matched,
+            <<"success">> := 1,
+            <<"failed">> := 0,
+            <<"queuing">> := 2
+        } when Matched >= 3,
+        maps:get(<<"metrics">>, Decoded1)
     ),
 
     %% start the listener 1883 to make the bridge reconnected
@@ -766,9 +779,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     %% also verify the 2 messages have been sent to the remote broker
     assert_mqtt_msg_received(RemoteTopic, Payload1),
     assert_mqtt_msg_received(RemoteTopic, Payload2),
-    %% delete the bridge
-    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
-    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
     ok.
 
 assert_mqtt_msg_received(Topic, Payload) ->

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 0 - 3
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -141,9 +141,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
             'dropped.resource_not_found',
             'dropped.resource_stopped',
             'dropped.other',
-            'queuing',
-            'batching',
-            'inflight',
             'received'
         ],
         [matched]

+ 66 - 21
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -24,11 +24,12 @@
 ]).
 
 -export([
-    batching_change/2,
+    batching_set/3,
+    batching_shift/3,
     batching_get/1,
-    inflight_change/2,
+    inflight_set/3,
     inflight_get/1,
-    queuing_change/2,
+    queuing_set/3,
     queuing_get/1,
     dropped_inc/1,
     dropped_inc/2,
@@ -114,8 +115,6 @@ handle_telemetry_event(
     _HandlerConfig
 ) ->
     case Event of
-        batching ->
-            emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val);
         dropped_other ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
@@ -133,12 +132,8 @@ handle_telemetry_event(
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
         failed ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
-        inflight ->
-            emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val);
         matched ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
-        queuing ->
-            emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val);
         retried_failed ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
@@ -152,6 +147,34 @@ handle_telemetry_event(
         _ ->
             ok
     end;
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    _Measurements = #{gauge_set := Val},
+    _Metadata = #{resource_id := ID, worker_id := WorkerID},
+    _HandlerConfig
+) ->
+    case Event of
+        batching ->
+            emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
+        inflight ->
+            emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
+        queuing ->
+            emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
+        _ ->
+            ok
+    end;
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    _Measurements = #{gauge_shift := Val},
+    _Metadata = #{resource_id := ID, worker_id := WorkerID},
+    _HandlerConfig
+) ->
+    case Event of
+        batching ->
+            emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
+        _ ->
+            ok
+    end;
 handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
     ok.
 
@@ -160,26 +183,48 @@ handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
 
 %% @doc Count of messages that are currently accumulated in memory waiting for
 %% being sent in one batch
-batching_change(ID, Val) ->
-    telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}).
+batching_set(ID, WorkerID, Val) ->
+    telemetry:execute(
+        [?TELEMETRY_PREFIX, batching],
+        #{gauge_set => Val},
+        #{resource_id => ID, worker_id => WorkerID}
+    ).
+
+batching_shift(_ID, _WorkerID = undefined, _Val) ->
+    ok;
+batching_shift(ID, WorkerID, Val) ->
+    telemetry:execute(
+        [?TELEMETRY_PREFIX, batching],
+        #{gauge_shift => Val},
+        #{resource_id => ID, worker_id => WorkerID}
+    ).
 
 batching_get(ID) ->
-    emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
+    emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching').
 
-%% @doc Count of messages that are currently queuing. [Gauge]
-queuing_change(ID, Val) ->
-    telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}).
+%% @doc Count of batches of messages that are currently
+%% queuing. [Gauge]
+queuing_set(ID, WorkerID, Val) ->
+    telemetry:execute(
+        [?TELEMETRY_PREFIX, queuing],
+        #{gauge_set => Val},
+        #{resource_id => ID, worker_id => WorkerID}
+    ).
 
 queuing_get(ID) ->
-    emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
+    emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing').
 
-%% @doc Count of messages that were sent asynchronously but ACKs are not
-%% received. [Gauge]
-inflight_change(ID, Val) ->
-    telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}).
+%% @doc Count of batches of messages that were sent asynchronously but
+%% ACKs are not yet received. [Gauge]
+inflight_set(ID, WorkerID, Val) ->
+    telemetry:execute(
+        [?TELEMETRY_PREFIX, inflight],
+        #{gauge_set => Val},
+        #{resource_id => ID, worker_id => WorkerID}
+    ).
 
 inflight_get(ID) ->
-    emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
+    emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'inflight').
 
 %% Counters (value can only got up):
 %% --------------------------------------
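
A sketch of the resulting flow (ids and values are illustrative): a buffer worker emits a gauge_set telemetry event, the handler above turns it into an absolute per-worker gauge in emqx_metrics_worker, and the getter returns the value summed over all workers.

    %% producer side, e.g. from a buffer worker after touching its queue:
    ok = emqx_resource_metrics:queuing_set(ResId, WorkerIdx, replayq:count(Q)),
    %% the telemetry handler above then effectively performs
    %%   emqx_metrics_worker:set_gauge(?RES_METRICS, ResId, WorkerIdx, 'queuing', Val)
    %% and consumers read the sum across workers:
    Queuing = emqx_resource_metrics:queuing_get(ResId).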

+ 143 - 88
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -52,7 +52,7 @@
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 
--export([reply_after_query/6, batch_reply_after_query/6]).
+-export([reply_after_query/7, batch_reply_after_query/7]).
 
 -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
 
@@ -90,13 +90,27 @@ async_query(Id, Request, Opts) ->
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
-    Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
+    %% Note: since calling this function implies bypassing the
+    %% buffer workers, and each buffer worker index is used when
+    %% collecting gauge metrics, we use this dummy index.  If this
+    %% call ends up calling buffering functions, that's a bug and
+    %% would mess up the metrics anyway.  `undefined' is ignored by
+    %% `emqx_resource_metrics:*_shift/3'.
+    Index = undefined,
+    Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
-    Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
+    %% Note: since calling this function implies bypassing the
+    %% buffer workers, and each buffer worker index is used when
+    %% collecting gauge metrics, we use this dummy index.  If this
+    %% call ends up calling buffering functions, that's a bug and
+    %% would mess up the metrics anyway.  `undefined' is ignored by
+    %% `emqx_resource_metrics:*_shift/3'.
+    Index = undefined,
+    Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
@@ -133,9 +147,11 @@ init({Id, Index, Opts}) ->
             false ->
                 undefined
         end,
-    emqx_resource_metrics:queuing_change(Id, queue_count(Queue)),
+    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
+    emqx_resource_metrics:batching_set(Id, Index, 0),
+    emqx_resource_metrics:inflight_set(Id, Index, 0),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
-    ok = inflight_new(Name, InfltWinSZ),
+    ok = inflight_new(Name, InfltWinSZ, Id, Index),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
     St = #{
         id => Id,
@@ -158,10 +174,12 @@ running(cast, resume, _St) ->
     keep_state_and_data;
 running(cast, block, St) ->
     {next_state, blocked, St};
-running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+running(
+    cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St
+) when
     is_list(Batch)
 ->
-    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
+    Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {next_state, blocked, St#{queue := Q1}};
 running({call, From}, {query, Request, _Opts}, St) ->
     query_or_acc(From, Request, St);
@@ -180,28 +198,39 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
     {keep_state_and_data, {state_timeout, ResumeT, resume}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
-blocked(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+blocked(
+    cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St
+) when
     is_list(Batch)
 ->
-    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
+    Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {keep_state, St#{queue := Q1}};
 blocked(cast, resume, St) ->
     do_resume(St);
 blocked(state_timeout, resume, St) ->
     do_resume(St);
-blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
+blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) ->
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}};
-blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
+    {keep_state, St#{
+        queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))])
+    }};
+blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) ->
     ReplyFun = maps:get(async_reply_fun, Opts, undefined),
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)),
     {keep_state, St#{
-        queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
+        queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
     }}.
 
-terminate(_Reason, #{id := Id, index := Index}) ->
+terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
+    GaugeFns =
+        [
+            fun emqx_resource_metrics:batching_set/3,
+            fun emqx_resource_metrics:inflight_set/3
+        ],
+    lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns),
+    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
     gproc_pool:disconnect_worker(Id, {Id, Index}).
 
 code_change(_OldVsn, State, _Extra) ->
@@ -240,24 +269,33 @@ do_resume(#{id := Id, name := Name} = St) ->
 
 retry_queue(#{queue := undefined} = St) ->
     {next_state, running, St};
-retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) ->
+retry_queue(
+    #{
+        queue := Q,
+        id := Id,
+        index := Index,
+        enable_batch := false,
+        resume_interval := ResumeT
+    } = St
+) ->
     case get_first_n_from_queue(Q, 1) of
         [] ->
             {next_state, running, St};
         [?QUERY(_, Request, HasSent) = Query] ->
             QueryOpts = #{inflight_name => maps:get(name, St)},
-            Result = call_query(configured, Id, Query, QueryOpts),
+            Result = call_query(configured, Id, Index, Query, QueryOpts),
             case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of
                 true ->
                     {keep_state, St, {state_timeout, ResumeT, resume}};
                 false ->
-                    retry_queue(St#{queue := drop_head(Q, Id)})
+                    retry_queue(St#{queue := drop_head(Q, Id, Index)})
             end
     end;
 retry_queue(
     #{
         queue := Q,
         id := Id,
+        index := Index,
         enable_batch := true,
         batch_size := BatchSize,
         resume_interval := ResumeT
@@ -268,7 +306,7 @@ retry_queue(
             {next_state, running, St};
         Batch0 ->
             QueryOpts = #{inflight_name => maps:get(name, St)},
-            Result = call_query(configured, Id, Batch0, QueryOpts),
+            Result = call_query(configured, Id, Index, Batch0, QueryOpts),
             %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
             %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
             Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0],
@@ -276,41 +314,55 @@ retry_queue(
                 true ->
                     {keep_state, St, {state_timeout, ResumeT, resume}};
                 false ->
-                    retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)})
+                    retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)})
             end
     end.
 
 retry_inflight_sync(
-    Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0
+    Id,
+    Ref,
+    ?QUERY(_, _, HasSent) = Query,
+    Name,
+    #{index := Index, resume_interval := ResumeT} = St0
 ) ->
-    Result = call_query(sync, Id, Query, #{}),
+    Result = call_query(sync, Id, Index, Query, #{}),
     case handle_query_result(Id, Result, HasSent, false) of
         %% Send failed because resource down
         true ->
             {keep_state, St0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
-            inflight_drop(Name, Ref),
+            inflight_drop(Name, Ref, Id, Index),
             do_resume(St0)
     end.
 
-query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
+query_or_acc(
+    From,
+    Request,
+    #{
+        enable_batch := true,
+        acc := Acc,
+        acc_left := Left,
+        index := Index,
+        id := Id
+    } = St0
+) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
-    emqx_resource_metrics:batching_change(Id, 1),
+    emqx_resource_metrics:batching_shift(Id, Index, 1),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
         false -> {keep_state, ensure_flush_timer(St)}
     end;
-query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) ->
+query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) ->
     QueryOpts = #{
         inflight_name => maps:get(name, St)
     },
-    Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts),
+    Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts),
     case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
         true ->
             Query = ?QUERY(From, Request, false),
-            {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
+            {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}};
         false ->
             {keep_state, St}
     end.
@@ -320,6 +372,7 @@ flush(#{acc := []} = St) ->
 flush(
     #{
         id := Id,
+        index := Index,
         acc := Batch0,
         batch_size := Size,
         queue := Q0
@@ -329,12 +382,12 @@ flush(
     QueryOpts = #{
         inflight_name => maps:get(name, St)
     },
-    emqx_resource_metrics:batching_change(Id, -length(Batch)),
-    Result = call_query(configured, Id, Batch, QueryOpts),
+    emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)),
+    Result = call_query(configured, Id, Index, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
-            Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]),
+            Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]),
             {next_state, blocked, St1#{queue := Q1}};
         false ->
             {keep_state, St1}
@@ -412,7 +465,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) ->
     inc_sent_success(Id, HasSent),
     BlockWorker.
 
-call_query(QM0, Id, Query, QueryOpts) ->
+call_query(QM0, Id, Index, Query, QueryOpts) ->
     ?tp(call_query_enter, #{id => Id, query => Query}),
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
@@ -423,7 +476,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
                 end,
             CM = maps:get(callback_mode, Data),
             emqx_resource_metrics:matched_inc(Id),
-            apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
+            apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
             emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@@ -452,10 +505,10 @@ call_query(QM0, Id, Query, QueryOpts) ->
     end
 ).
 
-apply_query_fun(sync, Mod, Id, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
+apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
     ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
-apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
+apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
@@ -464,21 +517,20 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts)
             true ->
                 {async_return, inflight_full};
             false ->
-                ok = emqx_resource_metrics:inflight_change(Id, 1),
-                ReplyFun = fun ?MODULE:reply_after_query/6,
+                ReplyFun = fun ?MODULE:reply_after_query/7,
                 Ref = make_message_ref(),
-                Args = [self(), Id, Name, Ref, Query],
-                ok = inflight_append(Name, Ref, Query),
+                Args = [self(), Id, Index, Name, Ref, Query],
+                ok = inflight_append(Name, Ref, Query, Id, Index),
                 Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
                 {async_return, Result}
         end,
         Request
     );
-apply_query_fun(sync, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
+apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
     ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
-apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
+apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
@@ -487,55 +539,46 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
             true ->
                 {async_return, inflight_full};
             false ->
-                BatchLen = length(Batch),
-                ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
-                ReplyFun = fun ?MODULE:batch_reply_after_query/6,
+                ReplyFun = fun ?MODULE:batch_reply_after_query/7,
                 Ref = make_message_ref(),
-                Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
+                Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
                 Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
-                ok = inflight_append(Name, Ref, Batch),
+                ok = inflight_append(Name, Ref, Batch, Id, Index),
                 Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt),
                 {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
-    %% NOTE: 'inflight' is message count that sent async but no ACK received,
-    %%        NOT the message number ququed in the inflight window.
-    emqx_resource_metrics:inflight_change(Id, -1),
+reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
+    %% NOTE: 'inflight' is the count of messages that were sent async
+    %% but received no ACK, NOT the number of messages queued in the
+    %% inflight window.
     case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
         true ->
-            %% we marked these messages are 'queuing' although they are actually
-            %% keeped in inflight window, not replayq
-            emqx_resource_metrics:queuing_change(Id, 1),
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref)
+            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
     end.
 
-batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
-    %% NOTE: 'inflight' is message count that sent async but no ACK received,
-    %%        NOT the message number ququed in the inflight window.
-    BatchLen = length(Batch),
-    emqx_resource_metrics:inflight_change(Id, -BatchLen),
+batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
+    %% NOTE: 'inflight' is the count of messages that were sent async
+    %% but received no ACK, NOT the number of messages queued in the
+    %% inflight window.
     case batch_reply_caller(Id, Result, Batch) of
         true ->
-            %% we marked these messages are 'queuing' although they are actually
-            %% kept in inflight window, not replayq
-            emqx_resource_metrics:queuing_change(Id, BatchLen),
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref)
+            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
     end.
 
-drop_inflight_and_resume(Pid, Name, Ref) ->
+drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
     case inflight_is_full(Name) of
         true ->
-            inflight_drop(Name, Ref),
+            inflight_drop(Name, Ref, Id, Index),
             ?MODULE:resume(Pid);
         false ->
-            inflight_drop(Name, Ref)
+            inflight_drop(Name, Ref, Id, Index)
     end.
 
 %%==============================================================================
@@ -548,10 +591,10 @@ queue_item_marshaller(Bin) when is_binary(Bin) ->
 estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
-maybe_append_queue(Id, undefined, _Items) ->
+maybe_append_queue(Id, _Index, undefined, _Items) ->
     emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
     undefined;
-maybe_append_queue(Id, Q, Items) ->
+maybe_append_queue(Id, Index, Q, Items) ->
     Q2 =
         case replayq:overflow(Q) of
             Overflow when Overflow =< 0 ->
@@ -561,13 +604,13 @@ maybe_append_queue(Id, Q, Items) ->
                 {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
-                emqx_resource_metrics:queuing_change(Id, -Dropped),
                 emqx_resource_metrics:dropped_queue_full_inc(Id),
                 ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
                 Q1
         end,
-    emqx_resource_metrics:queuing_change(Id, 1),
-    replayq:append(Q2, Items).
+    Q3 = replayq:append(Q2, Items),
+    emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
+    Q3.
 
 get_first_n_from_queue(Q, N) ->
     get_first_n_from_queue(Q, N, []).
@@ -580,23 +623,23 @@ get_first_n_from_queue(Q, N, Acc) when N > 0 ->
         ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc])
     end.
 
-drop_first_n_from_queue(Q, 0, _Id) ->
+drop_first_n_from_queue(Q, 0, _Id, _Index) ->
     Q;
-drop_first_n_from_queue(Q, N, Id) when N > 0 ->
-    drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id).
+drop_first_n_from_queue(Q, N, Id, Index) when N > 0 ->
+    drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index).
 
-drop_head(Q, Id) ->
-    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
-    ok = replayq:ack(Q1, AckRef),
-    emqx_resource_metrics:queuing_change(Id, -1),
-    Q1.
+drop_head(Q, Id, Index) ->
+    {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
+    ok = replayq:ack(NewQ, AckRef),
+    emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)),
+    NewQ.
 
 %%==============================================================================
 %% the inflight queue for async query
 -define(SIZE_REF, -1).
-inflight_new(Name, InfltWinSZ) ->
+inflight_new(Name, InfltWinSZ, Id, Index) ->
     _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}),
+    inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
     ok.
 
 inflight_get_first(Name) ->
@@ -617,27 +660,39 @@ inflight_is_full(undefined) ->
     false;
 inflight_is_full(Name) ->
     [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
+    Size = inflight_size(Name),
+    Size >= MaxSize.
+
+inflight_size(Name) ->
+    %% Note: we subtract 1 because there's a metadata row that holds
+    %% the maximum size value.
+    MetadataRowCount = 1,
     case ets:info(Name, size) of
-        Size when Size > MaxSize -> true;
-        _ -> false
+        undefined -> 0;
+        Size -> max(0, Size - MetadataRowCount)
     end.
 
-inflight_append(undefined, _Ref, _Query) ->
+inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
     ok;
-inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch) ->
+inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) ->
     ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
     ok;
-inflight_append(Name, Ref, ?QUERY(From, Req, _)) ->
+inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) ->
     ets:insert(Name, {Ref, ?QUERY(From, Req, true)}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
     ok;
-inflight_append(Name, Ref, Data) ->
+inflight_append(Name, Ref, Data, _Id, _Index) ->
     ets:insert(Name, {Ref, Data}),
+    %% this is a metadata row being inserted; therefore, we don't bump
+    %% the inflight metric.
     ok.
 
-inflight_drop(undefined, _) ->
+inflight_drop(undefined, _, _Id, _Index) ->
     ok;
-inflight_drop(Name, Ref) ->
+inflight_drop(Name, Ref, Id, Index) ->
     ets:delete(Name, Ref),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
     ok.
 
 %%==============================================================================
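
A worked example of the inflight accounting above (sizes are illustrative): the inflight ETS table always holds one metadata row with the window size under ?SIZE_REF, so the gauge reported through inflight_set/3 is the raw table size minus that row.

    %% with async_inflight_window = 2 and one in-flight batch, the table contains:
    %%   {-1,  {max_size, 2}}                  %% ?SIZE_REF metadata row
    %%   {Ref, [?QUERY(From, Req, true)]}      %% one tracked batch
    %% ets:info(Name, size) =:= 2, so inflight_size(Name) =:= max(0, 2 - 1) =:= 1,
    %% inflight_is_full(Name) returns false, and the 'inflight' gauge is set to 1.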

+ 7 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -39,6 +39,7 @@ groups() ->
 init_per_testcase(_, Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
     Config.
+
 end_per_testcase(_, _Config) ->
     _ = emqx_resource:remove(?ID).
 
@@ -503,7 +504,10 @@ t_stop_start(_) ->
     ),
 
     %% add some metrics to test their persistence
-    emqx_resource_metrics:batching_change(?ID, 5),
+    WorkerID0 = <<"worker:0">>,
+    WorkerID1 = <<"worker:1">>,
+    emqx_resource_metrics:batching_set(?ID, WorkerID0, 2),
+    emqx_resource_metrics:batching_set(?ID, WorkerID1, 3),
     ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
 
     {ok, _} = emqx_resource:check_and_recreate(
@@ -537,7 +541,8 @@ t_stop_start(_) ->
     ?assert(is_process_alive(Pid1)),
 
     %% now stop while resetting the metrics
-    emqx_resource_metrics:batching_change(?ID, 5),
+    emqx_resource_metrics:batching_set(?ID, WorkerID0, 1),
+    emqx_resource_metrics:batching_set(?ID, WorkerID1, 4),
     ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
     ok = emqx_resource:stop(?ID),
     ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),

+ 3 - 3
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,9 +1,9 @@
 {erl_opts, [debug_info]}.
 {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.33.0"}}}
-       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}}
-       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
+       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.4"}}}
+       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
-       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}
+       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 45 - 25
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -33,7 +33,10 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
-    _ = maybe_install_wolff_telemetry_handlers(InstId),
+    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
+    BridgeType = kafka,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -137,7 +140,7 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
     {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
-    ok.
+    {async_return, ok}.
 
 compile_message_template(#{
     key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
@@ -304,62 +307,79 @@ get_required(Field, Config, Throw) ->
     Value =:= none andalso throw(Throw),
     Value.
 
+%% we *must* match the bridge id in the event metadata with that in
+%% the handler config; otherwise, multiple kafka producer bridges will
+%% install multiple handlers to the same wolff events, multiplying the
+%% metric counts.
 handle_telemetry_event(
     [wolff, dropped],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_inc(ID, Val);
 handle_telemetry_event(
     [wolff, dropped_queue_full],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
-    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+    %% When wolff emits a `dropped_queue_full' event due to replayq
+    %% overflow, it also emits a `dropped' event (at the time of
+    %% writing, wolff is 1.7.4).  Since we already bump `dropped' when
+    %% `dropped.queue_full' occurs, we have to correct it here.  This
+    %% correction will have to be dropped if wolff stops also emitting
+    %% `dropped'.
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val),
+    emqx_resource_metrics:dropped_inc(ID, -Val);
 handle_telemetry_event(
     [wolff, queuing],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    _HandlerConfig
+    #{gauge_set := Val},
+    #{bridge_id := ID, partition_id := PartitionID},
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
-    emqx_resource_metrics:queuing_change(ID, Val);
+    emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_inc(ID, Val);
 handle_telemetry_event(
     [wolff, failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, inflight],
-    #{counter_inc := Val},
-    #{bridge_id := ID},
-    _HandlerConfig
+    #{gauge_set := Val},
+    #{bridge_id := ID, partition_id := PartitionID},
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
-    emqx_resource_metrics:inflight_change(ID, Val);
+    emqx_resource_metrics:inflight_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried_failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, retried_success],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
 handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
     %% Event that we do not handle
     ok.
@@ -372,17 +392,12 @@ uninstall_telemetry_handlers(InstanceID) ->
     HandlerID = telemetry_handler_id(InstanceID),
     telemetry:detach(HandlerID).
 
-maybe_install_wolff_telemetry_handlers(InstanceID) ->
+maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
         telemetry_handler_id(InstanceID),
-        %% Note: we don't handle `[wolff, success]' because,
-        %% currently, we already increment the success counter for
-        %% this resource at `emqx_rule_runtime:handle_action' when
-        %% the response is `ok' and we would double increment it
-        %% here.
         [
             [wolff, dropped],
             [wolff, dropped_queue_full],
@@ -391,8 +406,13 @@ maybe_install_wolff_telemetry_handlers(InstanceID) ->
             [wolff, failed],
             [wolff, inflight],
             [wolff, retried_failed],
-            [wolff, retried_success]
+            [wolff, retried_success],
+            [wolff, success]
         ],
         fun ?MODULE:handle_telemetry_event/4,
-        []
+        %% we *must* keep track of the same id that is handed down to
+        %% wolff producers; otherwise, multiple kafka producer bridges
+        %% will install multiple handlers to the same wolff events,
+        %% multiplying the metric counts...
+        #{bridge_id => ResourceID}
     ).

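The key change in this handler module is that the handler config now carries the bridge's resource id and every clause matches the event metadata's bridge_id against it. Without that match, each Kafka producer bridge would attach its own handler to the same global wolff events and every bridge's handler would bump metrics for every event, multiplying the counts. A self-contained sketch of the pattern (module and event names below are invented for illustration, not the bridge code):

    -module(telemetry_match_sketch).
    -export([attach/1, handle/4]).

    attach(ResourceID) ->
        telemetry:attach_many(
            %% unique handler id per resource
            {?MODULE, ResourceID},
            [[demo, queuing]],
            fun ?MODULE:handle/4,
            %% store the id this handler is responsible for ...
            #{bridge_id => ResourceID}
        ).

    %% ... and require the event metadata to carry the same id, so a handler
    %% installed for bridge A silently ignores events emitted on behalf of bridge B.
    handle([demo, queuing], #{gauge_set := Val}, #{bridge_id := ID}, #{bridge_id := ID}) when
        is_integer(Val)
    ->
        io:format("bridge ~p queuing=~p~n", [ID, Val]);
    handle(_Event, _Measurements, _Meta, _HandlerConfig) ->
        ok.

Note also that queuing and inflight now arrive as gauge_set measurements keyed by partition_id, matching the absolute-gauge API, while the counter-style events (dropped, failed, retried, success) keep using counter_inc.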
+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -390,7 +390,7 @@ t_failed_creation_then_fix(_Config) ->
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
-    ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
@@ -455,7 +455,7 @@ publish_helper(#{
     StartRes = ?PRODUCER:on_start(InstId, Conf),
     {ok, State} = StartRes,
     OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
-    ok = OnQueryRes,
+    {async_return, ok} = OnQueryRes,
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),

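These test changes follow from on_query/3 now returning {async_return, ok}: the send is handed to wolff, and the definitive outcome arrives later via the on_kafka_ack/3 callback and the [wolff, success] telemetry event, so the caller can no longer treat the immediate return as a completed query. A hedged sketch of how a caller might distinguish the two shapes (illustrative only; the real dispatch lives in the resource worker):

    interpret_result(ok) ->
        completed;
    interpret_result({async_return, ok}) ->
        %% reply is pending; success or failure is reported asynchronously
        pending;
    interpret_result({error, Reason}) ->
        {failed, Reason}.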
+ 56 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -124,6 +124,7 @@ init_per_testcase(TestCase, Config0) when
             delete_all_bridges(),
             Tid = install_telemetry_handler(TestCase),
             Config = generate_config(Config0),
+            put(telemetry_table, Tid),
             [{telemetry_table, Tid} | Config];
         false ->
             {skip, no_batching}
@@ -133,12 +134,14 @@ init_per_testcase(TestCase, Config0) ->
     delete_all_bridges(),
     Tid = install_telemetry_handler(TestCase),
     Config = generate_config(Config0),
+    put(telemetry_table, Tid),
     [{telemetry_table, Tid} | Config].
 
 end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop(),
     delete_all_bridges(),
     ok = emqx_connector_web_hook_server:stop(),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -392,7 +395,11 @@ assert_metrics(ExpectedMetrics, ResourceId) ->
             maps:keys(ExpectedMetrics)
         ),
     CurrentMetrics = current_metrics(ResourceId),
-    ?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}),
+    TelemetryTable = get(telemetry_table),
+    RecordedEvents = ets:tab2list(TelemetryTable),
+    ?assertEqual(ExpectedMetrics, Metrics, #{
+        current_metrics => CurrentMetrics, recorded_events => RecordedEvents
+    }),
     ok.
 
 assert_empty_metrics(ResourceId) ->
@@ -553,6 +560,7 @@ t_publish_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     ?check_trace(
         create_bridge(Config),
@@ -581,6 +589,17 @@ t_publish_success(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -600,6 +619,7 @@ t_publish_success_local_topic(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     LocalTopic = <<"local/topic">>,
     {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
     assert_empty_metrics(ResourceId),
@@ -618,6 +638,17 @@ t_publish_success_local_topic(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -648,6 +679,7 @@ t_publish_templated(Config) ->
     ResourceId = ?config(resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    QueryMode = ?config(query_mode, Config),
     Topic = <<"t/topic">>,
     PayloadTemplate = <<
         "{\"payload\": \"${payload}\","
@@ -693,6 +725,17 @@ t_publish_templated(Config) ->
     ),
     %% to avoid test flakiness
     wait_telemetry_event(TelemetryTable, success, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,
@@ -1046,7 +1089,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
         %% response expired, this succeeds.
         {econnrefused, async, _} ->
             wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                timeout => 10_000, n_events => 2
+                timeout => 10_000, n_events => 1
             }),
             CurrentMetrics = current_metrics(ResourceId),
             RecordedEvents = ets:tab2list(TelemetryTable),
@@ -1283,6 +1326,17 @@ t_unrecoverable_error(Config) ->
         end
     ),
     wait_telemetry_event(TelemetryTable, failed, ResourceId),
+    ExpectedInflightEvents =
+        case QueryMode of
+            sync -> 1;
+            async -> 3
+        end,
+    wait_telemetry_event(
+        TelemetryTable,
+        inflight,
+        ResourceId,
+        #{n_events => ExpectedInflightEvents, timeout => 5_000}
+    ),
     assert_metrics(
         #{
             batching => 0,

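Because gauges such as inflight are published asynchronously from telemetry handlers, the suite now waits for a given number of recorded inflight events (1 in sync mode, 3 in async mode) before asserting the final metric snapshot, and it dumps the recorded events on assertion failure to ease debugging. A hypothetical sketch of such a wait helper, assuming the test's telemetry handler appends {EventName, ResourceId} entries to an ETS table (the real helper in the suite may record richer terms):

    wait_for_events(Table, EventName, ResourceId, #{n_events := N, timeout := Timeout}) ->
        Deadline = erlang:monotonic_time(millisecond) + Timeout,
        wait_for_events_loop(Table, EventName, ResourceId, N, Deadline).

    wait_for_events_loop(Table, EventName, ResourceId, N, Deadline) ->
        Seen = length([x || {E, R} <- ets:tab2list(Table), E =:= EventName, R =:= ResourceId]),
        case Seen >= N of
            true ->
                ok;
            false ->
                erlang:monotonic_time(millisecond) < Deadline orelse
                    error({timeout_waiting_for_telemetry, EventName, ResourceId}),
                timer:sleep(100),
                wait_for_events_loop(Table, EventName, ResourceId, N, Deadline)
        end.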
+ 3 - 3
mix.exs

@@ -132,10 +132,10 @@ defmodule EMQXUmbrella.MixProject do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.7.0"},
-      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
+      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
+      {:brod, github: "kafka4beam/brod", tag: "3.16.7"},
       {:snappyer, "1.2.8", override: true},
       {:supervisor3, "1.1.11", override: true}
     ]

+ 3 - 2
scripts/merge-i18n.escript

@@ -10,8 +10,9 @@ main(_) ->
     Conf = [merge(Conf0, Cfgs1),
             io_lib:nl()
            ],
-    ok = filelib:ensure_dir("apps/emqx_dashboard/priv/i18n.conf"),
-    ok = file:write_file("apps/emqx_dashboard/priv/i18n.conf", Conf).
+    OutputFile = "apps/emqx_dashboard/priv/i18n.conf",
+    ok = filelib:ensure_dir(OutputFile),
+    ok = file:write_file(OutputFile, Conf).
 
 merge(BaseConf, Cfgs) ->
     lists:foldl(