Просмотр исходного кода

feat: add support for counters and gauges to the Kafka Bridge

This commit adds support for counters and gauges to the Kafka Brige.
The Kafka bridge uses [Wolff](https://github.com/kafka4beam/wolff) for
the  Kafka connection. Wolff does its own batching and does not use the
batching functionality in `emqx_resource_worker` that is used by other
bridge types. Therefore, the counter events have to be generated by
Wolff. We have added
[telemetry](https://github.com/beam-telemetry/telemetry) events to Wolff
that we hook into to change counters and gauges for the Kafka bridge. The
counter called `matched` does not depend on specific functionality of
any bridge type so the updates of this counter is moved higher up in the
call chain then previously so that it also gets updated for Kafka
bridges.
Kjell Winblad 3 лет назад
Родитель
Сommit
57270fb8fc

+ 197 - 0
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -0,0 +1,197 @@
+-module(emqx_resource_metrics).
+
+-export([
+    batching_change/2,
+    batching_get/1,
+    inflight_change/2,
+    inflight_get/1,
+    queuing_change/2,
+    queuing_get/1,
+    dropped_inc/1,
+    dropped_inc/2,
+    dropped_get/1,
+    dropped_other_inc/1,
+    dropped_other_inc/2,
+    dropped_other_get/1,
+    dropped_queue_full_inc/1,
+    dropped_queue_full_inc/2,
+    dropped_queue_full_get/1,
+    dropped_queue_not_enabled_inc/1,
+    dropped_queue_not_enabled_inc/2,
+    dropped_queue_not_enabled_get/1,
+    dropped_resource_not_found_inc/1,
+    dropped_resource_not_found_inc/2,
+    dropped_resource_not_found_get/1,
+    dropped_resource_stopped_inc/1,
+    dropped_resource_stopped_inc/2,
+    dropped_resource_stopped_get/1,
+    failed_inc/1,
+    failed_inc/2,
+    failed_get/1,
+    matched_inc/1,
+    matched_inc/2,
+    matched_get/1,
+    retried_inc/1,
+    retried_inc/2,
+    retried_get/1,
+    retried_failed_inc/1,
+    retried_failed_inc/2,
+    retried_failed_get/1,
+    retried_success_inc/1,
+    retried_success_inc/2,
+    retried_success_get/1,
+    success_inc/1,
+    success_inc/2,
+    success_get/1
+]).
+
+-define(RES_METRICS, resource_metrics).
+
+%% Gauges (value can go both up and down):
+%% --------------------------------------
+
+%% @doc Count of messages that are currently accumulated in memory waiting for
+%% being sent in one batch
+batching_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val).
+
+batching_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
+
+%% @doc Count of messages that are currently queuing. [Gauge]
+queuing_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val).
+
+queuing_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
+
+%% @doc Count of messages that were sent asynchronously but ACKs are not
+%% received. [Gauge]
+inflight_change(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val).
+
+inflight_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
+
+%% Counters (value can only got up):
+%% --------------------------------------
+
+%% @doc Count of messages dropped
+dropped_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped').
+
+dropped_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val).
+
+dropped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped').
+
+%% @doc Count of messages dropped due to other reasons
+dropped_other_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other').
+
+dropped_other_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val).
+
+dropped_other_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
+
+%% @doc Count of messages dropped because the queue was full
+dropped_queue_full_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full').
+
+dropped_queue_full_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val).
+
+dropped_queue_full_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
+
+%% @doc Count of messages dropped because the queue was not enabled
+dropped_queue_not_enabled_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+dropped_queue_not_enabled_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val).
+
+dropped_queue_not_enabled_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+%% @doc Count of messages dropped because the resource was not found
+dropped_resource_not_found_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+dropped_resource_not_found_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val).
+
+dropped_resource_not_found_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+%% @doc Count of messages dropped because the resource was stopped
+dropped_resource_stopped_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+dropped_resource_stopped_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val).
+
+dropped_resource_stopped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+%% @doc Count of how many times this bridge has been matched and queried
+matched_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched').
+
+matched_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val).
+
+matched_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
+
+%% @doc The number of times message sends have been retried
+retried_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried').
+
+retried_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val).
+
+retried_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried').
+
+%% @doc Count of message sends that have failed
+failed_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed').
+
+failed_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val).
+
+failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'failed').
+
+%%% @doc Count of message sends that have failed after having been retried
+retried_failed_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed').
+
+retried_failed_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val).
+
+retried_failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
+
+%% @doc Count messages that were sucessfully sent after at least one retry
+retried_success_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success').
+
+retried_success_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val).
+
+retried_success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success').
+
+%% @doc Count of messages that have been sent successfully
+success_inc(ID) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'success').
+
+success_inc(ID, Val) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val).
+
+success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'success').

+ 32 - 35
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -80,27 +80,23 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
@@ -134,7 +130,7 @@ init({Id, Index, Opts}) ->
             false ->
                 undefined
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
+    emqx_resource_metrics:queuing_change(Id, queue_count(Queue)),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     ok = inflight_new(Name, InfltWinSZ),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
@@ -297,7 +293,7 @@ retry_inflight_sync(
 
 query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
+    emqx_resource_metrics:batching_change(Id, 1),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
@@ -330,7 +326,7 @@ flush(
     QueryOpts = #{
         inflight_name => maps:get(name, St)
     },
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
+    emqx_resource_metrics:batching_change(Id, -length(Batch)),
     Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
@@ -380,18 +376,18 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
     true;
 handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_resource_not_found_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_resource_stopped_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_other_inc(Id),
     BlockWorker;
 handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
     %% the message will be queued in replayq or inflight window,
@@ -425,6 +421,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
                     _ -> QM0
                 end,
             CM = maps:get(callback_mode, Data),
+            emqx_resource_metrics:matched_inc(Id),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
@@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts)
             true ->
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
+                ok = emqx_resource_metrics:inflight_change(Id, 1),
                 ReplyFun = fun ?MODULE:reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = [self(), Id, Name, Ref, Query],
@@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
                 {async_return, inflight_full};
             false ->
                 BatchLen = length(Batch),
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen),
+                ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %%        NOT the message number ququed in the inflight window.
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
+    emqx_resource_metrics:inflight_change(Id, -1),
     case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
             %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+            emqx_resource_metrics:queuing_change(Id, 1),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %%        NOT the message number ququed in the inflight window.
     BatchLen = length(Batch),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen),
+    emqx_resource_metrics:inflight_change(Id, -BatchLen),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
-            %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
+            %% kept in inflight window, not replayq
+            emqx_resource_metrics:queuing_change(Id, BatchLen),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -549,8 +546,8 @@ estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
 maybe_append_queue(Id, undefined, _Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    emqx_resource_metrics:dropped_inc(Id),
+    emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
     undefined;
 maybe_append_queue(Id, Q, Items) ->
     Q2 =
@@ -562,13 +559,13 @@ maybe_append_queue(Id, Q, Items) ->
                 {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+                emqx_resource_metrics:queuing_change(Id, -Dropped),
+                emqx_resource_metrics:dropped_inc(Id),
+                emqx_resource_metrics:dropped_queue_full_inc(Id),
                 ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
                 Q1
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+    emqx_resource_metrics:queuing_change(Id, 1),
     replayq:append(Q2, Items).
 
 get_first_n_from_queue(Q, N) ->
@@ -590,7 +587,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 ->
 drop_head(Q, Id) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
+    emqx_resource_metrics:queuing_change(Id, -1),
     Q1.
 
 %%==============================================================================
@@ -645,18 +642,18 @@ inflight_drop(Name, Ref) ->
 %%==============================================================================
 
 inc_sent_failed(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed');
+    emqx_resource_metrics:failed_inc(Id),
+    emqx_resource_metrics:retried_inc(Id),
+    emqx_resource_metrics:retried_failed_inc(Id);
 inc_sent_failed(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed').
+    emqx_resource_metrics:failed_inc(Id).
 
 inc_sent_success(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success');
+    emqx_resource_metrics:success_inc(Id),
+    emqx_resource_metrics:retried_inc(Id),
+    emqx_resource_metrics:retried_success_inc(Id);
 inc_sent_success(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success').
+    emqx_resource_metrics:success_inc(Id).
 
 call_mode(sync, _) -> sync;
 call_mode(async, always_sync) -> sync;

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,6 +1,6 @@
 {erl_opts, [debug_info]}.
 {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
-       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
+       , {wolff, {git, "https://github.com/kjellwinblad/wolff.git", {branch, "kjell/add_counters_support_ok"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}

+ 2 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -4,7 +4,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_ee_connector
+        emqx_ee_connector,
+        telemetry
     ]},
     {env, []},
     {modules, []},

+ 97 - 2
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -12,7 +12,10 @@
     on_get_status/2
 ]).
 
--export([on_kafka_ack/3]).
+-export([
+    on_kafka_ack/3,
+    handle_telemetry_event/4
+]).
 
 -include_lib("emqx/include/logger.hrl").
 
@@ -30,6 +33,7 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
+    maybe_install_wolff_telemetry_handlers(),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -222,6 +226,7 @@ producers_config(BridgeName, ClientId, Input) ->
             disk -> {false, replayq_dir(ClientId)};
             hybrid -> {true, replayq_dir(ClientId)}
         end,
+    BridgeNameBin = erlang:atom_to_binary(BridgeName),
     #{
         name => make_producer_name(BridgeName),
         partitioner => PartitionStrategy,
@@ -234,7 +239,9 @@ producers_config(BridgeName, ClientId, Input) ->
         required_acks => RequiredAcks,
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
-        compression => Compression
+        compression => Compression,
+        telemetry_meta_data =>
+            #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>}
     }.
 
 replayq_dir(ClientId) ->
@@ -268,3 +275,91 @@ get_required(Field, Config, Throw) ->
     Value = maps:get(Field, Config, none),
     Value =:= none andalso throw(Throw),
     Value.
+
+handle_telemetry_event(
+    [wolff, dropped],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, dropped_queue_full],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, queuing],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, retried],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, inflight],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
+handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) ->
+    %% Event that we do not handle
+    ok.
+
+maybe_install_wolff_telemetry_handlers() ->
+    %% Attach event handlers for Kafka telemetry events. If a handler with the
+    %% handler id already exists, the attach_many function does nothing
+    telemetry:attach_many(
+        %% unique handler id
+        <<"emqx-bridge-kafka-producer-telemetry-handler">>,
+        [
+            [wolff, dropped],
+            [wolff, dropped_queue_full],
+            [wolff, queuing],
+            [wolff, retried],
+            [wolff, failed],
+            [wolff, inflight],
+            [wolff, retried_failed],
+            [wolff, retried_success],
+            [wolff, success]
+        ],
+        fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4,
+        []
+    ).

+ 58 - 13
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> kafka_hosts_string_ssl();
             false -> kafka_hosts_string()
         end,
-    kafka_bridge_rest_api_helper(#{
-        <<"bootstrap_hosts">> => NormalHostsString,
-        <<"authentication">> => <<"none">>
-    }),
     SASLHostsString =
         case UseSSL of
             true -> kafka_hosts_string_ssl_sasl();
@@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
             false -> #{}
         end,
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => NormalHostsString,
+                <<"authentication">> => <<"none">>
+            },
+            SSLSettings
+        )
+    ),
     kafka_bridge_rest_api_helper(
         maps:merge(
             #{
@@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
     ok.
 
 kafka_bridge_rest_api_helper(Config) ->
+    BridgeType = "kafka",
+    BridgeName = "my_kafka_bridge",
+    BridgeID = emqx_bridge_resource:bridge_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
+    ResourceId = emqx_bridge_resource:resource_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
     UrlEscColon = "%3A",
-    BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
+    BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
     BridgesParts = ["bridges"],
-    BridgesPartsId = ["bridges", BridgeIdUrlEnc],
+    BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
     OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
     BridgesPartsOpDisable = OpUrlFun("disable"),
     BridgesPartsOpEnable = OpUrlFun("enable"),
@@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) ->
     case MyKafkaBridgeExists() of
         true ->
             %% Delete the bridge my_kafka_bridge
-            show(
-                '========================================== DELETE ========================================'
-            ),
-            {ok, 204, <<>>} = show(http_delete(BridgesPartsId));
+            {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions));
         false ->
             ok
     end,
     false = MyKafkaBridgeExists(),
     %% Create new Kafka bridge
+    KafkaTopic = "test-topic-one-partition",
     CreateBodyTmp = #{
         <<"type">> => <<"kafka">>,
         <<"name">> => <<"my_kafka_bridge">>,
@@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) ->
                 topic => <<"t/#">>
             },
             <<"kafka">> => #{
-                <<"topic">> => <<"test-topic-one-partition">>
+                <<"topic">> => erlang:list_to_binary(KafkaTopic)
             }
         }
     },
@@ -300,6 +313,34 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
     %% Check that the new bridge is in the list of bridges
     true = MyKafkaBridgeExists(),
+    %% Create a rule that uses the bridge
+    {ok, 201, _Rule} = http_post(
+        ["rules"],
+        #{
+            <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>,
+            <<"enable">> => true,
+            <<"actions">> => [BridgeID],
+            <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
+        }
+    ),
+    %% Get offset before sending message
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    %% Send message to topic and check that it got forwarded to Kafka
+    Body = <<"message from EMQX">>,
+    emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
+    %% Give Kafka some time to get message
+    timer:sleep(100),
+    %% Check that Kafka got message
+    BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    {ok, {_, [KafkaMsg]}} = show(BrodOut),
+    Body = KafkaMsg#kafka_message.value,
+    %% Check crucial counters and gauges
+    1 = emqx_resource_metrics:matched_get(ResourceId),
+    1 = emqx_resource_metrics:success_get(ResourceId),
+    0 = emqx_resource_metrics:dropped_get(ResourceId),
+    0 = emqx_resource_metrics:failed_get(ResourceId),
+    0 = emqx_resource_metrics:inflight_get(ResourceId),
+    0 = emqx_resource_metrics:queuing_get(ResourceId),
     %% Perform operations
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
@@ -309,7 +350,7 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
     %% Cleanup
-    {ok, 204, _} = show(http_delete(BridgesPartsId)),
+    {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
     false = MyKafkaBridgeExists(),
     ok.
 
@@ -325,7 +366,8 @@ publish_with_and_without_ssl(AuthSettings) ->
     publish_helper(#{
         auth_settings => AuthSettings,
         ssl_settings => valid_ssl_settings()
-    }).
+    }),
+    ok.
 
 publish_helper(#{
     auth_settings := AuthSettings,
@@ -345,6 +387,7 @@ publish_helper(#{
     Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
     InstId = emqx_bridge_resource:resource_id("kafka", Name),
+    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
     KafkaTopic = "test-topic-one-partition",
     Conf = config(#{
         "authentication" => AuthSettings,
@@ -353,6 +396,7 @@ publish_helper(#{
         "instance_id" => InstId,
         "ssl" => SSLSettings
     }),
+    emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
     %% To make sure we get unique value
     timer:sleep(1),
     Time = erlang:monotonic_time(),
@@ -371,6 +415,7 @@ publish_helper(#{
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),
+    ok = emqx_bridge_resource:remove(BridgeId),
     ok.
 
 config(Args) ->

+ 2 - 2
mix.exs

@@ -130,7 +130,7 @@ defmodule EMQXUmbrella.MixProject do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
+      {:wolff, github: "kjellwinblad/wolff", branch: "kjell/add_counters_support_ok"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
@@ -516,7 +516,7 @@ defmodule EMQXUmbrella.MixProject do
     |> Path.join("RELEASES")
     |> File.open!([:write, :utf8], fn handle ->
       IO.puts(handle, "%% coding: utf-8")
-      :io.format(handle, '~tp.~n', [release_entry])
+      :io.format(handle, ~c"~tp.~n", [release_entry])
     end)
 
     release