Explorar o código

Merge pull request #9128 from thalesmg/kjell/jira-EMQX-7312/kafka_rule_engine_counters_ok

feat: add support for counters and gauges to the Kafka Bridge
Thales Macedo Garitezi %!s(int64=3) %!d(string=hai) anos
pai
achega
6cbb5aa172

+ 1 - 1
Makefile

@@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
 export EMQX_DASHBOARD_VERSION ?= v1.0.9
-export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.4
+export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.5
 export EMQX_REL_FORM ?= tgz
 export QUICER_DOWNLOAD_FROM_RELEASE = 1
 ifeq ($(OS),Windows_NT)

+ 36 - 11
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -23,6 +23,7 @@
 -include("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include("emqx_dashboard/include/emqx_dashboard.hrl").
 
 %% output functions
@@ -511,15 +512,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     %% we now test if the bridge works as expected
     LocalTopic = <<"local_topic/1">>,
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
-    Payload = <<"hello">>,
+    Payload0 = <<"hello">>,
     emqx:subscribe(RemoteTopic),
     timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+    emqx:publish(emqx_message:make(LocalTopic, Payload0)),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    assert_mqtt_msg_received(RemoteTopic, Payload),
+    assert_mqtt_msg_received(RemoteTopic, Payload0),
 
     %% verify the metrics of the bridge
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
@@ -540,20 +541,43 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
 
     %% stop the listener 1883 to make the bridge disconnected
     ok = emqx_listeners:stop_listener('tcp:default'),
+    ct:sleep(1500),
 
     %% PUBLISH 2 messages to the 'local' broker, the message should
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+    ok = snabbkaffe:start_trace(),
+    {ok, SRef} =
+        snabbkaffe:subscribe(
+            fun
+                (
+                    #{
+                        ?snk_kind := call_query_enter,
+                        query := {query, _From, {send_message, #{}}, _Sent}
+                    }
+                ) ->
+                    true;
+                (_) ->
+                    false
+            end,
+            _NEvents = 2,
+            _Timeout = 1_000
+        ),
+    Payload1 = <<"hello2">>,
+    Payload2 = <<"hello3">>,
+    emqx:publish(emqx_message:make(LocalTopic, Payload1)),
+    emqx:publish(emqx_message:make(LocalTopic, Payload2)),
+    {ok, _} = snabbkaffe:receive_events(SRef),
+    ok = snabbkaffe:stop(),
 
     %% verify the metrics of the bridge, the message should be queued
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
             <<"status">> := Status,
             <<"metrics">> := #{
-                <<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
+                <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
             }
-        } when Status == <<"connected">> orelse Status == <<"connecting">>,
+        } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>),
         jsx:decode(BridgeStr1)
     ),
 
@@ -562,22 +586,23 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     timer:sleep(1500),
     %% verify the metrics of the bridge, the 2 queued messages should have been sent
     {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
             <<"status">> := <<"connected">>,
             <<"metrics">> := #{
-                <<"matched">> := 3,
+                <<"matched">> := Matched,
                 <<"success">> := 3,
                 <<"failed">> := 0,
                 <<"queuing">> := 0,
                 <<"retried">> := _
             }
-        },
+        } when Matched >= 3,
         jsx:decode(BridgeStr2)
     ),
     %% also verify the 2 messages have been sent to the remote broker
-    assert_mqtt_msg_received(RemoteTopic, Payload),
-    assert_mqtt_msg_received(RemoteTopic, Payload),
+    assert_mqtt_msg_received(RemoteTopic, Payload1),
+    assert_mqtt_msg_received(RemoteTopic, Payload2),
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),

+ 2 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -9,7 +9,8 @@
         stdlib,
         gproc,
         jsx,
-        emqx
+        emqx,
+        telemetry
     ]},
     {env, []},
     {modules, []},

+ 9 - 0
apps/emqx_resource/src/emqx_resource_app.erl

@@ -23,9 +23,18 @@
 -export([start/2, stop/1]).
 
 start(_StartType, _StartArgs) ->
+    %% since the handler is generic and executed in the process
+    %% emitting the event, we need to install only a single handler
+    %% for the whole app.
+    TelemetryHandlerID = telemetry_handler_id(),
+    ok = emqx_resource_metrics:install_telemetry_handler(TelemetryHandlerID),
     emqx_resource_sup:start_link().
 
 stop(_State) ->
+    TelemetryHandlerID = telemetry_handler_id(),
+    ok = emqx_resource_metrics:uninstall_telemetry_handler(TelemetryHandlerID),
     ok.
 
 %% internal functions
+telemetry_handler_id() ->
+    <<"emqx-resource-app-telemetry-handler">>.

+ 4 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -507,13 +507,16 @@ start_resource(Data, From) ->
 
 stop_resource(#data{state = undefined, id = ResId} = _Data) ->
     _ = maybe_clear_alarm(ResId),
+    ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
     ok;
 stop_resource(Data) ->
     %% We don't care the return value of the Mod:on_stop/2.
     %% The callback mod should make sure the resource is stopped after on_stop/2
     %% is returned.
+    ResId = Data#data.id,
     _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state),
-    _ = maybe_clear_alarm(Data#data.id),
+    _ = maybe_clear_alarm(ResId),
+    ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
     ok.
 
 make_test_id() ->

+ 319 - 0
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -0,0 +1,319 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_metrics).
+
+-export([
+    events/0,
+    install_telemetry_handler/1,
+    uninstall_telemetry_handler/1,
+    handle_telemetry_event/4
+]).
+
+-export([
+    batching_change/2,
+    batching_get/1,
+    inflight_change/2,
+    inflight_get/1,
+    queuing_change/2,
+    queuing_get/1,
+    dropped_inc/1,
+    dropped_inc/2,
+    dropped_get/1,
+    dropped_other_inc/1,
+    dropped_other_inc/2,
+    dropped_other_get/1,
+    dropped_queue_full_inc/1,
+    dropped_queue_full_inc/2,
+    dropped_queue_full_get/1,
+    dropped_queue_not_enabled_inc/1,
+    dropped_queue_not_enabled_inc/2,
+    dropped_queue_not_enabled_get/1,
+    dropped_resource_not_found_inc/1,
+    dropped_resource_not_found_inc/2,
+    dropped_resource_not_found_get/1,
+    dropped_resource_stopped_inc/1,
+    dropped_resource_stopped_inc/2,
+    dropped_resource_stopped_get/1,
+    failed_inc/1,
+    failed_inc/2,
+    failed_get/1,
+    matched_inc/1,
+    matched_inc/2,
+    matched_get/1,
+    retried_inc/1,
+    retried_inc/2,
+    retried_get/1,
+    retried_failed_inc/1,
+    retried_failed_inc/2,
+    retried_failed_get/1,
+    retried_success_inc/1,
+    retried_success_inc/2,
+    retried_success_get/1,
+    success_inc/1,
+    success_inc/2,
+    success_get/1
+]).
+
+-define(RES_METRICS, resource_metrics).
+-define(TELEMETRY_PREFIX, emqx, resource).
+
+-spec events() -> [telemetry:event_name()].
+events() ->
+    [
+        [?TELEMETRY_PREFIX, Event]
+     || Event <- [
+            batching,
+            dropped_other,
+            dropped_queue_full,
+            dropped_queue_not_enabled,
+            dropped_resource_not_found,
+            dropped_resource_stopped,
+            failed,
+            inflight,
+            matched,
+            queuing,
+            retried_failed,
+            retried_success,
+            success
+        ]
+    ].
+
+-spec install_telemetry_handler(binary()) -> ok.
+install_telemetry_handler(HandlerID) ->
+    _ = telemetry:attach_many(
+        HandlerID,
+        events(),
+        fun ?MODULE:handle_telemetry_event/4,
+        _HandlerConfig = #{}
+    ),
+    ok.
+
+-spec uninstall_telemetry_handler(binary()) -> ok.
+uninstall_telemetry_handler(HandlerID) ->
+    _ = telemetry:detach(HandlerID),
+    ok.
+
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    _Measurements = #{counter_inc := Val},
+    _Metadata = #{resource_id := ID},
+    _HandlerConfig
+) ->
+    case Event of
+        batching ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'batching', Val);
+        dropped_other ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
+        dropped_queue_full ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
+        dropped_queue_not_enabled ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val);
+        dropped_resource_not_found ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val);
+        dropped_resource_stopped ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
+        failed ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
+        inflight ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'inflight', Val);
+        matched ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
+        queuing ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'queuing', Val);
+        retried_failed ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.failed', Val);
+        retried_success ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val),
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried.success', Val);
+        success ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
+        _ ->
+            ok
+    end;
+handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
+    ok.
+
+%% Gauges (value can go both up and down):
+%% --------------------------------------
+
+%% @doc Count of messages that are currently accumulated in memory waiting for
+%% being sent in one batch
+batching_change(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, batching], #{counter_inc => Val}, #{resource_id => ID}).
+
+batching_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'batching').
+
+%% @doc Count of messages that are currently queuing. [Gauge]
+queuing_change(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, queuing], #{counter_inc => Val}, #{resource_id => ID}).
+
+queuing_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'queuing').
+
+%% @doc Count of messages that were sent asynchronously but ACKs are not
+%% received. [Gauge]
+inflight_change(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, inflight], #{counter_inc => Val}, #{resource_id => ID}).
+
+inflight_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'inflight').
+
+%% Counters (value can only got up):
+%% --------------------------------------
+
+%% @doc Count of messages dropped
+dropped_inc(ID) ->
+    dropped_inc(ID, 1).
+
+dropped_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped], #{counter_inc => Val}, #{resource_id => ID}).
+
+dropped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped').
+
+%% @doc Count of messages dropped due to other reasons
+dropped_other_inc(ID) ->
+    dropped_other_inc(ID, 1).
+
+dropped_other_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_other], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+dropped_other_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
+
+%% @doc Count of messages dropped because the queue was full
+dropped_queue_full_inc(ID) ->
+    dropped_queue_full_inc(ID, 1).
+
+dropped_queue_full_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_full], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+dropped_queue_full_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
+
+%% @doc Count of messages dropped because the queue was not enabled
+dropped_queue_not_enabled_inc(ID) ->
+    dropped_queue_not_enabled_inc(ID, 1).
+
+dropped_queue_not_enabled_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+dropped_queue_not_enabled_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
+
+%% @doc Count of messages dropped because the resource was not found
+dropped_resource_not_found_inc(ID) ->
+    dropped_resource_not_found_inc(ID, 1).
+
+dropped_resource_not_found_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_not_found], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+dropped_resource_not_found_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_not_found').
+
+%% @doc Count of messages dropped because the resource was stopped
+dropped_resource_stopped_inc(ID) ->
+    dropped_resource_stopped_inc(ID, 1).
+
+dropped_resource_stopped_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, dropped_resource_stopped], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+dropped_resource_stopped_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.resource_stopped').
+
+%% @doc Count of how many times this bridge has been matched and queried
+matched_inc(ID) ->
+    matched_inc(ID, 1).
+
+matched_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, matched], #{counter_inc => Val}, #{resource_id => ID}).
+
+matched_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
+
+%% @doc The number of times message sends have been retried
+retried_inc(ID) ->
+    retried_inc(ID, 1).
+
+retried_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, retried], #{counter_inc => Val}, #{resource_id => ID}).
+
+retried_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried').
+
+%% @doc Count of message sends that have failed
+failed_inc(ID) ->
+    failed_inc(ID, 1).
+
+failed_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, failed], #{counter_inc => Val}, #{resource_id => ID}).
+
+failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'failed').
+
+%%% @doc Count of message sends that have failed after having been retried
+retried_failed_inc(ID) ->
+    retried_failed_inc(ID, 1).
+
+retried_failed_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, retried_failed], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+retried_failed_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
+
+%% @doc Count messages that were sucessfully sent after at least one retry
+retried_success_inc(ID) ->
+    retried_success_inc(ID, 1).
+
+retried_success_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, retried_success], #{counter_inc => Val}, #{
+        resource_id => ID
+    }).
+
+retried_success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.success').
+
+%% @doc Count of messages that have been sent successfully
+success_inc(ID) ->
+    success_inc(ID, 1).
+
+success_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, success], #{counter_inc => Val}, #{resource_id => ID}).
+
+success_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'success').

+ 28 - 37
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -80,27 +80,23 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request, false), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
@@ -134,7 +130,7 @@ init({Id, Index, Opts}) ->
             false ->
                 undefined
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
+    emqx_resource_metrics:queuing_change(Id, queue_count(Queue)),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     ok = inflight_new(Name, InfltWinSZ),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
@@ -297,7 +293,7 @@ retry_inflight_sync(
 
 query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
+    emqx_resource_metrics:batching_change(Id, 1),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
@@ -330,7 +326,7 @@ flush(
     QueryOpts = #{
         inflight_name => maps:get(name, St)
     },
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching', -length(Batch)),
+    emqx_resource_metrics:batching_change(Id, -length(Batch)),
     Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
@@ -380,18 +376,15 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
     true;
 handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
+    emqx_resource_metrics:dropped_resource_not_found_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
+    emqx_resource_metrics:dropped_resource_stopped_inc(Id),
     BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
+    emqx_resource_metrics:dropped_other_inc(Id),
     BlockWorker;
 handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
     %% the message will be queued in replayq or inflight window,
@@ -417,6 +410,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) ->
     BlockWorker.
 
 call_query(QM0, Id, Query, QueryOpts) ->
+    ?tp(call_query_enter, #{id => Id, query => Query}),
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
             QM =
@@ -425,10 +419,13 @@ call_query(QM0, Id, Query, QueryOpts) ->
                     _ -> QM0
                 end,
             CM = maps:get(callback_mode, Data),
+            emqx_resource_metrics:matched_inc(Id),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
+            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
+            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(not_connected, "resource not connected");
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
@@ -464,7 +461,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts)
             true ->
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
+                ok = emqx_resource_metrics:inflight_change(Id, 1),
                 ReplyFun = fun ?MODULE:reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = [self(), Id, Name, Ref, Query],
@@ -488,7 +485,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
                 {async_return, inflight_full};
             false ->
                 BatchLen = length(Batch),
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen),
+                ok = emqx_resource_metrics:inflight_change(Id, BatchLen),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -503,12 +500,12 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts)
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %%        NOT the message number ququed in the inflight window.
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
+    emqx_resource_metrics:inflight_change(Id, -1),
     case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
             %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+            emqx_resource_metrics:queuing_change(Id, 1),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -518,12 +515,12 @@ batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is message count that sent async but no ACK received,
     %%        NOT the message number ququed in the inflight window.
     BatchLen = length(Batch),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen),
+    emqx_resource_metrics:inflight_change(Id, -BatchLen),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
             %% we marked these messages are 'queuing' although they are actually
-            %% keeped in inflight window, not replayq
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
+            %% kept in inflight window, not replayq
+            emqx_resource_metrics:queuing_change(Id, BatchLen),
             ?MODULE:block(Pid);
         false ->
             drop_inflight_and_resume(Pid, Name, Ref)
@@ -549,8 +546,7 @@ estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
 maybe_append_queue(Id, undefined, _Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
     undefined;
 maybe_append_queue(Id, Q, Items) ->
     Q2 =
@@ -562,13 +558,12 @@ maybe_append_queue(Id, Q, Items) ->
                 {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+                emqx_resource_metrics:queuing_change(Id, -Dropped),
+                emqx_resource_metrics:dropped_queue_full_inc(Id),
                 ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
                 Q1
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+    emqx_resource_metrics:queuing_change(Id, 1),
     replayq:append(Q2, Items).
 
 get_first_n_from_queue(Q, N) ->
@@ -590,7 +585,7 @@ drop_first_n_from_queue(Q, N, Id) when N > 0 ->
 drop_head(Q, Id) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
+    emqx_resource_metrics:queuing_change(Id, -1),
     Q1.
 
 %%==============================================================================
@@ -644,19 +639,15 @@ inflight_drop(Name, Ref) ->
 
 %%==============================================================================
 
-inc_sent_failed(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.failed');
+inc_sent_failed(Id, _HasSent = true) ->
+    emqx_resource_metrics:retried_failed_inc(Id);
 inc_sent_failed(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed').
+    emqx_resource_metrics:failed_inc(Id).
 
-inc_sent_success(Id, true) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried.success');
+inc_sent_success(Id, _HasSent = true) ->
+    emqx_resource_metrics:retried_success_inc(Id);
 inc_sent_success(Id, _HasSent) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'success').
+    emqx_resource_metrics:success_inc(Id).
 
 call_mode(sync, _) -> sync;
 call_mode(async, always_sync) -> sync;

+ 22 - 4
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -100,6 +100,15 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
     after 1000 ->
         {error, timeout}
     end;
+on_query(_InstId, get_incorrect_status_count, #{pid := Pid}) ->
+    ReqRef = make_ref(),
+    From = {self(), ReqRef},
+    Pid ! {From, get_incorrect_status_count},
+    receive
+        {ReqRef, Count} -> {ok, Count}
+    after 1000 ->
+        {error, timeout}
+    end;
 on_query(_InstId, get_counter, #{pid := Pid}) ->
     ReqRef = make_ref(),
     From = {self(), ReqRef},
@@ -157,9 +166,15 @@ spawn_counter_process(Name, Register) ->
     Pid.
 
 counter_loop() ->
-    counter_loop(#{counter => 0, status => running}).
-
-counter_loop(#{counter := Num, status := Status} = State) ->
+    counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}).
+
+counter_loop(
+    #{
+        counter := Num,
+        status := Status,
+        incorrect_status_count := IncorrectCount
+    } = State
+) ->
     NewState =
         receive
             block ->
@@ -179,10 +194,13 @@ counter_loop(#{counter := Num, status := Status} = State) ->
                 State#{counter => Num + N};
             {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
                 FromPid ! {ReqRef, incorrect_status},
-                State;
+                State#{incorrect_status_count := IncorrectCount + 1};
             {get, ReplyFun} ->
                 apply_reply(ReplyFun, Num),
                 State;
+            {{FromPid, ReqRef}, get_incorrect_status_count} ->
+                FromPid ! {ReqRef, IncorrectCount},
+                State;
             {{FromPid, ReqRef}, get} ->
                 FromPid ! {ReqRef, Num},
                 State

+ 26 - 3
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -420,10 +420,18 @@ t_query_counter_async_inflight(_) ->
 
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
     ct:pal("metrics: ~p", [C]),
+    {ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count),
+    %% The `simple_sync_query' we just did also increases the matched
+    %% count, hence the + 1.
+    ExtraSimpleCallCount = IncorrectStatusCount + 1,
     ?assertMatch(
         #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
-            M == Ss + Dp - Rs,
-        C
+            M == Ss + Dp - Rs + ExtraSimpleCallCount,
+        C,
+        #{
+            metrics => C,
+            extra_simple_call_count => ExtraSimpleCallCount
+        }
     ),
     ?assert(
         lists:all(
@@ -494,6 +502,10 @@ t_stop_start(_) ->
         #{<<"name">> => <<"test_resource">>}
     ),
 
+    %% add some metrics to test their persistence
+    emqx_resource_metrics:batching_change(?ID, 5),
+    ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
+
     {ok, _} = emqx_resource:check_and_recreate(
         ?ID,
         ?TEST_RESOURCE,
@@ -505,6 +517,9 @@ t_stop_start(_) ->
 
     ?assert(is_process_alive(Pid0)),
 
+    %% metrics are reset when recreating
+    ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
+
     ok = emqx_resource:stop(?ID),
 
     ?assertNot(is_process_alive(Pid0)),
@@ -519,7 +534,15 @@ t_stop_start(_) ->
 
     {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
 
-    ?assert(is_process_alive(Pid1)).
+    ?assert(is_process_alive(Pid1)),
+
+    %% now stop while resetting the metrics
+    emqx_resource_metrics:batching_change(?ID, 5),
+    ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
+    ok = emqx_resource:stop(?ID),
+    ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
+
+    ok.
 
 t_stop_start_local(_) ->
     {error, _} = emqx_resource:check_and_create_local(

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,6 +1,6 @@
 {erl_opts, [debug_info]}.
 {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
-       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
+       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}

+ 2 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -4,7 +4,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_ee_connector
+        emqx_ee_connector,
+        telemetry
     ]},
     {env, []},
     {modules, []},

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -145,7 +145,7 @@ fields(producer_opts) ->
             })}
     ];
 fields(producer_mqtt_opts) ->
-    [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}];
+    [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}];
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},

+ 113 - 5
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -12,7 +12,10 @@
     on_get_status/2
 ]).
 
--export([on_kafka_ack/3]).
+-export([
+    on_kafka_ack/3,
+    handle_telemetry_event/4
+]).
 
 -include_lib("emqx/include/logger.hrl").
 
@@ -30,6 +33,7 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
+    _ = maybe_install_wolff_telemetry_handlers(InstId),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -85,20 +89,27 @@ on_start(InstId, Config) ->
             throw(failed_to_start_kafka_producer)
     end.
 
-on_stop(_InstId, #{client_id := ClientID, producers := Producers}) ->
-    with_log_at_error(
+on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
+    _ = with_log_at_error(
         fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
         #{
             msg => "failed_to_delete_kafka_producer",
             client_id => ClientID
         }
     ),
-    with_log_at_error(
+    _ = with_log_at_error(
         fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
         #{
             msg => "failed_to_delete_kafka_client",
             client_id => ClientID
         }
+    ),
+    with_log_at_error(
+        fun() -> uninstall_telemetry_handlers(InstanceID) end,
+        #{
+            msg => "failed_to_uninstall_telemetry_handlers",
+            client_id => ClientID
+        }
     ).
 
 %% @doc The callback API for rule-engine (or bridge without rules)
@@ -222,6 +233,9 @@ producers_config(BridgeName, ClientId, Input) ->
             disk -> {false, replayq_dir(ClientId)};
             hybrid -> {true, replayq_dir(ClientId)}
         end,
+    %% TODO: change this once we add kafka source
+    BridgeType = kafka,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     #{
         name => make_producer_name(BridgeName),
         partitioner => PartitionStrategy,
@@ -234,7 +248,8 @@ producers_config(BridgeName, ClientId, Input) ->
         required_acks => RequiredAcks,
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
-        compression => Compression
+        compression => Compression,
+        telemetry_meta_data => #{bridge_id => ResourceID}
     }.
 
 replayq_dir(ClientId) ->
@@ -268,3 +283,96 @@ get_required(Field, Config, Throw) ->
     Value = maps:get(Field, Config, none),
     Value =:= none andalso throw(Throw),
     Value.
+
+handle_telemetry_event(
+    [wolff, dropped],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, dropped_queue_full],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, queuing],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_change(ID, Val);
+handle_telemetry_event(
+    [wolff, retried],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, inflight],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_change(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_failed],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_failed_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, retried_success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    _HandlerConfig
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
+    %% Event that we do not handle
+    ok.
+
+-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary().
+telemetry_handler_id(InstanceID) ->
+    <<"emqx-bridge-kafka-producer-", InstanceID/binary>>.
+
+uninstall_telemetry_handlers(InstanceID) ->
+    HandlerID = telemetry_handler_id(InstanceID),
+    telemetry:detach(HandlerID).
+
+maybe_install_wolff_telemetry_handlers(InstanceID) ->
+    %% Attach event handlers for Kafka telemetry events. If a handler with the
+    %% handler id already exists, the attach_many function does nothing
+    telemetry:attach_many(
+        %% unique handler id
+        telemetry_handler_id(InstanceID),
+        %% Note: we don't handle `[wolff, success]' because,
+        %% currently, we already increment the success counter for
+        %% this resource at `emqx_rule_runtime:handle_action' when
+        %% the response is `ok' and we would double increment it
+        %% here.
+        [
+            [wolff, dropped],
+            [wolff, dropped_queue_full],
+            [wolff, queuing],
+            [wolff, retried],
+            [wolff, failed],
+            [wolff, inflight],
+            [wolff, retried_failed],
+            [wolff, retried_success]
+        ],
+        fun ?MODULE:handle_telemetry_event/4,
+        []
+    ).

+ 84 - 14
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -184,10 +184,6 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> kafka_hosts_string_ssl();
             false -> kafka_hosts_string()
         end,
-    kafka_bridge_rest_api_helper(#{
-        <<"bootstrap_hosts">> => NormalHostsString,
-        <<"authentication">> => <<"none">>
-    }),
     SASLHostsString =
         case UseSSL of
             true -> kafka_hosts_string_ssl_sasl();
@@ -204,6 +200,15 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
             true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
             false -> #{}
         end,
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => NormalHostsString,
+                <<"authentication">> => <<"none">>
+            },
+            SSLSettings
+        )
+    ),
     kafka_bridge_rest_api_helper(
         maps:merge(
             #{
@@ -243,10 +248,20 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
     ok.
 
 kafka_bridge_rest_api_helper(Config) ->
+    BridgeType = "kafka",
+    BridgeName = "my_kafka_bridge",
+    BridgeID = emqx_bridge_resource:bridge_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
+    ResourceId = emqx_bridge_resource:resource_id(
+        erlang:list_to_binary(BridgeType),
+        erlang:list_to_binary(BridgeName)
+    ),
     UrlEscColon = "%3A",
-    BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
+    BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
     BridgesParts = ["bridges"],
-    BridgesPartsId = ["bridges", BridgeIdUrlEnc],
+    BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
     OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
     BridgesPartsOpDisable = OpUrlFun("disable"),
     BridgesPartsOpEnable = OpUrlFun("enable"),
@@ -268,15 +283,13 @@ kafka_bridge_rest_api_helper(Config) ->
     case MyKafkaBridgeExists() of
         true ->
             %% Delete the bridge my_kafka_bridge
-            show(
-                '========================================== DELETE ========================================'
-            ),
-            {ok, 204, <<>>} = show(http_delete(BridgesPartsId));
+            {ok, 204, <<>>} = show(http_delete(BridgesPartsIdDeleteAlsoActions));
         false ->
             ok
     end,
     false = MyKafkaBridgeExists(),
     %% Create new Kafka bridge
+    KafkaTopic = "test-topic-one-partition",
     CreateBodyTmp = #{
         <<"type">> => <<"kafka">>,
         <<"name">> => <<"my_kafka_bridge">>,
@@ -288,7 +301,7 @@ kafka_bridge_rest_api_helper(Config) ->
                 topic => <<"t/#">>
             },
             <<"kafka">> => #{
-                <<"topic">> => <<"test-topic-one-partition">>
+                <<"topic">> => erlang:list_to_binary(KafkaTopic)
             }
         }
     },
@@ -300,6 +313,59 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
     %% Check that the new bridge is in the list of bridges
     true = MyKafkaBridgeExists(),
+    %% Create a rule that uses the bridge
+    {ok, 201, _Rule} = http_post(
+        ["rules"],
+        #{
+            <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>,
+            <<"enable">> => true,
+            <<"actions">> => [BridgeID],
+            <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
+        }
+    ),
+    %% counters should be empty before
+    ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
+    %% Get offset before sending message
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    %% Send message to topic and check that it got forwarded to Kafka
+    Body = <<"message from EMQX">>,
+    emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
+    %% Give Kafka some time to get message
+    timer:sleep(100),
+    %% Check that Kafka got message
+    BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    {ok, {_, [KafkaMsg]}} = show(BrodOut),
+    Body = KafkaMsg#kafka_message.value,
+    %% Check crucial counters and gauges
+    ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)),
+    ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
+    ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
     %% Perform operations
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
@@ -309,7 +375,7 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
     %% Cleanup
-    {ok, 204, _} = show(http_delete(BridgesPartsId)),
+    {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
     false = MyKafkaBridgeExists(),
     ok.
 
@@ -325,7 +391,8 @@ publish_with_and_without_ssl(AuthSettings) ->
     publish_helper(#{
         auth_settings => AuthSettings,
         ssl_settings => valid_ssl_settings()
-    }).
+    }),
+    ok.
 
 publish_helper(#{
     auth_settings := AuthSettings,
@@ -345,6 +412,7 @@ publish_helper(#{
     Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
     InstId = emqx_bridge_resource:resource_id("kafka", Name),
+    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
     KafkaTopic = "test-topic-one-partition",
     Conf = config(#{
         "authentication" => AuthSettings,
@@ -353,6 +421,7 @@ publish_helper(#{
         "instance_id" => InstId,
         "ssl" => SSLSettings
     }),
+    emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
     %% To make sure we get unique value
     timer:sleep(1),
     Time = erlang:monotonic_time(),
@@ -371,6 +440,7 @@ publish_helper(#{
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),
+    ok = emqx_bridge_resource:remove(BridgeId),
     ok.
 
 config(Args) ->
@@ -407,7 +477,7 @@ hocon_config_template() ->
 """
 bootstrap_hosts = \"{{ kafka_hosts_string }}\"
 enable = true
-authentication = {{{ authentication }}} 
+authentication = {{{ authentication }}}
 ssl = {{{ ssl }}}
 producer = {
     mqtt {

+ 3 - 1
mix.exs

@@ -63,6 +63,7 @@ defmodule EMQXUmbrella.MixProject do
       {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
       {:observer_cli, "1.7.1"},
       {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
+      {:telemetry, "1.1.0"},
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
@@ -130,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.4", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.7.0"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
@@ -207,6 +208,7 @@ defmodule EMQXUmbrella.MixProject do
       redbug: :permanent,
       xmerl: :permanent,
       hocon: :load,
+      telemetry: :permanent,
       emqx: :load,
       emqx_conf: :load,
       emqx_machine: :permanent

+ 1 - 0
rebar.config

@@ -71,6 +71,7 @@
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
+    , {telemetry, "1.1.0"}
     ]}.
 
 {xref_ignores,

+ 1 - 0
rebar.config.erl

@@ -360,6 +360,7 @@ relx_apps(ReleaseType, Edition) ->
         redbug,
         xmerl,
         {hocon, load},
+        telemetry,
         % started by emqx_machine
         {emqx, load},
         {emqx_conf, load},