Pārlūkot izejas kodu

feat(pulsar producer): add telemetry support

Fixes https://emqx.atlassian.net/browse/EMQX-13365
Thales Macedo Garitezi 1 gadu atpakaļ
vecāks
revīzija
53dabf05d8

+ 1 - 1
apps/emqx_bridge_pulsar/mix.exs

@@ -25,7 +25,7 @@ defmodule EMQXBridgePulsar.MixProject do
     [
       UMP.common_dep(:crc32cer),
       UMP.common_dep(:snappyer),
-      {:pulsar, github: "emqx/pulsar-client-erl", tag: "0.8.6"},
+      {:pulsar, github: "emqx/pulsar-client-erl", tag: "1.0.0"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 1 - 1
apps/emqx_bridge_pulsar/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.6"}}},
+    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "1.0.0"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 208 - 69
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -26,6 +26,9 @@
     on_format_query_result/1
 ]).
 
+-export([on_pulsar_ack/2]).
+-export([handle_telemetry_event/4]).
+
 -type pulsar_client_id() :: atom().
 -type state() :: #{
     client_id := pulsar_client_id(),
@@ -51,6 +54,7 @@
 %% Allocatable resources
 -define(pulsar_client_id, pulsar_client_id).
 -define(pulsar_producers, pulsar_producers).
+-define(telemetry_handler_id, telemetry_handler_id).
 
 -define(HEALTH_CHECK_RETRY_TIMEOUT, 4_000).
 
@@ -71,12 +75,12 @@ query_opts(#{resource_opts := #{query_mode := sync}, parameters := #{sync_timeou
 query_opts(_) ->
     #{}.
 
--spec on_start(resource_id(), config()) -> {ok, state()}.
-on_start(InstanceId, Config) ->
+-spec on_start(connector_resource_id(), config()) -> {ok, state()}.
+on_start(ConnResId, Config) ->
     #{servers := Servers0, ssl := SSL} = Config,
     Servers = format_servers(Servers0),
-    ClientId = make_client_id(InstanceId),
-    ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
+    ClientId = make_client_id(ConnResId),
+    ok = emqx_resource:allocate_resource(ConnResId, ?pulsar_client_id, ClientId),
     SSLOpts = emqx_tls_lib:to_client_opts(SSL),
     ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)),
     ClientOpts = #{
@@ -90,7 +94,7 @@ on_start(InstanceId, Config) ->
                 info,
                 "pulsar_client_started",
                 #{
-                    instance_id => InstanceId,
+                    instance_id => ConnResId,
                     pulsar_hosts => Servers
                 }
             );
@@ -98,7 +102,7 @@ on_start(InstanceId, Config) ->
             RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
             ?SLOG(error, #{
                 msg => "failed_to_start_pulsar_client",
-                instance_id => InstanceId,
+                instance_id => ConnResId,
                 pulsar_hosts => Servers,
                 reason => RedactedReason
             }),
@@ -112,62 +116,78 @@ on_start(InstanceId, Config) ->
     {ok, #{channels => #{}, client_id => ClientId, client_opts => ClientOpts}}.
 
 on_add_channel(
-    InstanceId,
+    ConnResId,
     #{channels := Channels, client_id := ClientId, client_opts := ClientOpts} = State,
-    ChannelId,
+    ActionResId,
     #{parameters := #{message := Message, sync_timeout := SyncTimeout} = Params}
 ) ->
-    case maps:is_key(ChannelId, Channels) of
+    case maps:is_key(ActionResId, Channels) of
         true ->
             {error, channel_already_exists};
         false ->
-            {ok, Producers} = start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params),
+            {ok, Producers} = start_producer(ConnResId, ActionResId, ClientId, ClientOpts, Params),
             Parameters = #{
                 message => compile_message_template(Message),
                 sync_timeout => SyncTimeout,
                 producers => Producers
             },
-            NewChannels = maps:put(ChannelId, Parameters, Channels),
+            NewChannels = maps:put(ActionResId, Parameters, Channels),
             {ok, State#{channels => NewChannels}}
     end.
 
-on_remove_channel(InstanceId, State, ChannelId) ->
-    #{channels := Channels, client_id := ClientId} = State,
-    case maps:find(ChannelId, Channels) of
+on_remove_channel(ConnResId, State, ActionResId) ->
+    #{channels := Channels} = State,
+    case maps:find(ActionResId, Channels) of
         {ok, #{producers := Producers}} ->
-            stop_producers(ClientId, Producers),
-            emqx_resource:deallocate_resource(InstanceId, {?pulsar_producers, ChannelId}),
-            {ok, State#{channels => maps:remove(ChannelId, Channels)}};
+            stop_producers(ActionResId, Producers),
+            emqx_resource:deallocate_resource(ConnResId, {?pulsar_producers, ActionResId}),
+            deallocate_telemetry_handlers(ConnResId, ActionResId),
+            {ok, State#{channels => maps:remove(ActionResId, Channels)}};
         error ->
             {ok, State}
     end.
 
-on_get_channels(InstanceId) ->
-    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+on_get_channels(ConnResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ConnResId).
 
 -spec on_stop(resource_id(), state()) -> ok.
-on_stop(InstanceId, _State) ->
-    Resources0 = emqx_resource:get_allocated_resources(InstanceId),
-    case maps:take(?pulsar_client_id, Resources0) of
-        {ClientId, Resources} ->
-            maps:foreach(
-                fun({?pulsar_producers, _BridgeV2Id}, Producers) ->
-                    stop_producers(ClientId, Producers)
-                end,
-                Resources
-            ),
-            stop_client(ClientId),
-            ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
-            ok;
-        error ->
-            ok
-    end.
+on_stop(ConnResId, _State) ->
+    Resources = emqx_resource:get_allocated_resources(ConnResId),
+    maps:foreach(
+        fun
+            ({?pulsar_producers, ActionResId}, Producers) ->
+                stop_producers(ActionResId, Producers);
+            (_, _) ->
+                ok
+        end,
+        Resources
+    ),
+    maps:foreach(
+        fun
+            ({?telemetry_handler_id, _ActionResId}, TelemetryId) ->
+                deallocate_telemetry_handlers(ConnResId, TelemetryId);
+            (_, _) ->
+                ok
+        end,
+        Resources
+    ),
+    maps:foreach(
+        fun
+            (?pulsar_client_id, ClientId) ->
+                stop_client(ClientId);
+            (_, _) ->
+                ok
+        end,
+        Resources
+    ),
+    ?tp(pulsar_bridge_stopped, #{instance_id => ConnResId}),
+    ok.
 
 %% Note: since Pulsar client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,
 %% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
 -spec on_get_status(resource_id(), state()) -> connected | connecting.
-on_get_status(_InstanceId, State = #{}) ->
+on_get_status(_ConnResId, State = #{}) ->
     #{client_id := ClientId} = State,
     case pulsar_client_sup:find_client(ClientId) of
         {ok, Pid} ->
@@ -183,13 +203,13 @@ on_get_status(_InstanceId, State = #{}) ->
         {error, _} ->
             ?status_connecting
     end;
-on_get_status(_InstanceId, _State) ->
+on_get_status(_ConnResId, _State) ->
     %% If a health check happens just after a concurrent request to
     %% create the bridge is not quite finished, `State = undefined'.
     ?status_connecting.
 
-on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
-    case maps:find(ChannelId, Channels) of
+on_get_channel_status(_ConnResId, ActionResId, #{channels := Channels}) ->
+    case maps:find(ActionResId, Channels) of
         {ok, #{producers := Producers}} ->
             get_producer_status(Producers);
         error ->
@@ -200,21 +220,21 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
     {ok, term()}
     | {error, timeout}
     | {error, term()}.
-on_query(_InstanceId, {ChannelId, Message}, State) ->
+on_query(_ConnResId, {ActionResId, Message}, State) ->
     #{channels := Channels} = State,
-    case maps:find(ChannelId, Channels) of
+    case maps:find(ActionResId, Channels) of
         error ->
             {error, channel_not_found};
         {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} ->
             PulsarMessage = render_message(Message, MessageTmpl),
-            emqx_trace:rendered_action_template(ChannelId, #{
+            emqx_trace:rendered_action_template(ActionResId, #{
                 message => PulsarMessage,
                 sync_timeout => SyncTimeout,
                 is_async => false
             }),
             ?tp_span(
                 "pulsar_producer_query_enter",
-                #{instance_id => _InstanceId, message => Message, mode => sync},
+                #{instance_id => _ConnResId, message => Message, mode => sync},
                 try
                     ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}),
                     pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
@@ -229,33 +249,97 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
     resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state()
 ) ->
     {ok, pid()}.
-on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
+on_query_async(_ConnResId, {ActionResId, Message}, AsyncReplyFn, State) ->
     #{channels := Channels} = State,
-    case maps:find(ChannelId, Channels) of
+    case maps:find(ActionResId, Channels) of
         error ->
             {error, {unrecoverable_error, channel_not_found}};
         {ok, #{message := MessageTmpl, producers := Producers}} ->
             ?tp_span(
                 "pulsar_producer_query_enter",
-                #{instance_id => _InstanceId, message => Message, mode => async},
-                on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
+                #{instance_id => _ConnResId, message => Message, mode => async},
+                on_query_async2(ActionResId, Producers, Message, MessageTmpl, AsyncReplyFn)
             )
     end.
 
-on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
+on_query_async2(ActionResId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
     PulsarMessage = render_message(Message, MessageTmpl),
-    emqx_trace:rendered_action_template(ChannelId, #{
+    emqx_trace:rendered_action_template(ActionResId, #{
         message => PulsarMessage,
         is_async => true
     }),
+    CallbackFn = {fun ?MODULE:on_pulsar_ack/2, [AsyncReplyFn]},
     ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}),
-    pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
+    pulsar:send(Producers, [PulsarMessage], #{callback_fn => CallbackFn}).
 
 on_format_query_result({ok, Info}) ->
     #{result => ok, info => Info};
 on_format_query_result(Result) ->
     Result.
 
+on_pulsar_ack(_ReplyFnAndArgs, {error, Reason}) when
+    Reason =:= expired;
+    Reason =:= overflow
+->
+    %% We already bumped the dropped counter in `handle_telemetry_event/4', so no need to
+    %% call the wrapping callback here (it would bump the failure counter).
+    ok;
+on_pulsar_ack(ReplyFnAndArgs, Result) ->
+    emqx_resource:apply_reply_fun(ReplyFnAndArgs, Result).
+
+%%-------------------------------------------------------------------------------------
+%% `telemetry' API
+%%-------------------------------------------------------------------------------------
+
+%% We *must* match the action id in the event metadata with that in the
+%% handler config; otherwise, multiple pulsar producer actions will install
+%% multiple handlers to the same pulsar events, multiplying the metric counts.
+handle_telemetry_event(
+    [pulsar, dropped],
+    #{counter_inc := Val, reason := queue_full},
+    #{action_id := ID},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
+handle_telemetry_event(
+    [pulsar, dropped],
+    #{counter_inc := Val, reason := expired},
+    #{action_id := ID},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:dropped_expired_inc(ID, Val);
+handle_telemetry_event(
+    [pulsar, queuing],
+    #{gauge_set := Val},
+    #{action_id := ID, partition_topic := PartitionTopic},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_set(ID, PartitionTopic, Val);
+handle_telemetry_event(
+    [pulsar, queuing_bytes],
+    #{gauge_set := Val},
+    #{action_id := ID, partition_topic := PartitionTopic},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_bytes_set(ID, PartitionTopic, Val);
+handle_telemetry_event(
+    [pulsar, retried],
+    #{counter_inc := Val},
+    #{action_id := ID},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:retried_inc(ID, Val);
+handle_telemetry_event(
+    [pulsar, inflight],
+    #{gauge_set := Val},
+    #{action_id := ID, partition_topic := PartitionTopic},
+    #{action_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_set(ID, PartitionTopic, Val);
+handle_telemetry_event(_EventId, _Metrics, _Metadata, _HandlerConfig) ->
+    %% Event that we do not handle
+    ok.
+
 %%-------------------------------------------------------------------------------------
 %% Internal fns
 %%-------------------------------------------------------------------------------------
@@ -270,12 +354,12 @@ format_servers(Servers0) ->
     ).
 
 -spec make_client_id(resource_id()) -> pulsar_client_id().
-make_client_id(InstanceId) ->
-    case emqx_resource:is_dry_run(InstanceId) of
+make_client_id(ConnResId) ->
+    case emqx_resource:is_dry_run(ConnResId) of
         true ->
             pulsar_producer_probe;
         false ->
-            {pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId),
+            {pulsar, Name} = emqx_connector_resource:parse_connector_id(ConnResId),
             ClientIdBin = iolist_to_binary([
                 <<"pulsar:">>,
                 emqx_utils_conv:bin(Name),
@@ -304,22 +388,22 @@ conn_opts(#{authentication := #{jwt := JWT}}) ->
 replayq_dir(ClientId) ->
     filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
 
-producer_name(InstanceId, ChannelId) ->
-    case emqx_resource:is_dry_run(InstanceId) of
+producer_name(ConnResId, ActionResId) ->
+    case emqx_resource:is_dry_run(ConnResId) of
         %% do not create more atom
         true ->
             pulsar_producer_probe_worker;
         false ->
-            ChannelIdBin = emqx_utils_conv:bin(ChannelId),
+            ActionResIdBin = emqx_utils_conv:bin(ActionResId),
             binary_to_atom(
                 iolist_to_binary([
                     <<"producer-">>,
-                    ChannelIdBin
+                    ActionResIdBin
                 ])
             )
     end.
 
-start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
+start_producer(ConnResId, ActionResId, ClientId, ClientOpts, Params) ->
     #{
         conn_opts := ConnOpts,
         ssl_opts := SSLOpts
@@ -342,8 +426,8 @@ start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
     {OffloadMode, ReplayQDir} =
         case BufferMode of
             memory -> {false, false};
-            disk -> {false, replayq_dir(ChannelId)};
-            hybrid -> {true, replayq_dir(ChannelId)}
+            disk -> {false, replayq_dir(ActionResId)};
+            hybrid -> {true, replayq_dir(ActionResId)}
         end,
     MemOLP =
         case os:type() of
@@ -357,7 +441,7 @@ start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
         replayq_seg_bytes => SegmentBytes,
         drop_if_highmem => MemOLP
     },
-    ProducerName = producer_name(InstanceId, ChannelId),
+    ProducerName = producer_name(ConnResId, ActionResId),
     ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
     ProducerOpts0 =
         #{
@@ -369,15 +453,22 @@ start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
             retention_period => RetentionPeriod,
             ssl_opts => SSLOpts,
             strategy => partition_strategy(Strategy),
-            tcp_opts => [{sndbuf, SendBuffer}]
+            tcp_opts => [{sndbuf, SendBuffer}],
+            telemetry_metadata => #{action_id => ActionResId}
         },
     ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
     ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
+    ok = emqx_resource:allocate_resource(
+        ConnResId,
+        {?telemetry_handler_id, ActionResId},
+        ActionResId
+    ),
+    _ = maybe_install_telemetry_handlers(ActionResId),
     try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
         {ok, Producers} ->
             ok = emqx_resource:allocate_resource(
-                InstanceId,
-                {?pulsar_producers, ChannelId},
+                ConnResId,
+                {?pulsar_producers, ActionResId},
                 Producers
             ),
             ?tp(pulsar_producer_producers_allocated, #{}),
@@ -389,7 +480,7 @@ start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
                 error,
                 "failed_to_start_pulsar_producer",
                 #{
-                    instance_id => InstanceId,
+                    instance_id => ConnResId,
                     kind => Kind,
                     reason => emqx_utils:redact(Error, fun is_sensitive_key/1),
                     stacktrace => Stacktrace
@@ -399,6 +490,7 @@ start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
                 pulsar_client_id => ClientId,
                 producers => undefined
             }),
+            _ = uninstall_telemetry_handlers(ActionResId),
             throw(failed_to_start_pulsar_producer)
     end.
 
@@ -417,20 +509,20 @@ stop_client(ClientId) ->
     ),
     ok.
 
--spec stop_producers(pulsar_client_id(), pulsar_producers:producers()) -> ok.
-stop_producers(ClientId, Producers) ->
+-spec stop_producers(action_resource_id(), pulsar_producers:producers()) -> ok.
+stop_producers(ActionResId, Producers) ->
     _ = log_when_error(
         fun() ->
             ok = pulsar:stop_and_delete_supervised_producers(Producers),
             ?tp(pulsar_bridge_producer_stopped, #{
-                pulsar_client_id => ClientId,
+                action_id => ActionResId,
                 producers => Producers
             }),
             ok
         end,
         #{
             msg => "failed_to_delete_pulsar_producer",
-            pulsar_client_id => ClientId
+            action_id => ActionResId
         }
     ),
     ok.
@@ -516,3 +608,50 @@ do_get_error_message(Iterator) ->
         none ->
             error
     end.
+
+maybe_install_telemetry_handlers(ActionResId) ->
+    %% Attach event handlers for telemetry events. If a handler with the
+    %% handler id already exists, the attach_many function does nothing
+    telemetry:attach_many(
+        %% unique handler id
+        ActionResId,
+        [
+            [pulsar, dropped],
+            [pulsar, queuing],
+            [pulsar, queuing_bytes],
+            [pulsar, retried],
+            [pulsar, inflight]
+        ],
+        fun ?MODULE:handle_telemetry_event/4,
+        %% we *must* keep track of the same id that is handed down to
+        %% pulsar producers; otherwise, multiple pulsar producer actions
+        %% will install multiple handlers to the same pulsar events,
+        %% multiplying the metric counts...
+        #{action_id => ActionResId}
+    ).
+
+with_log_at_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.
+
+uninstall_telemetry_handlers(TelemetryId) ->
+    telemetry:detach(TelemetryId).
+
+deallocate_telemetry_handlers(ConnResId, ActionResId) ->
+    _ = with_log_at_error(
+        fun() ->
+            _ = uninstall_telemetry_handlers(ActionResId),
+            emqx_resource:deallocate_resource(ConnResId, {?telemetry_handler_id, ActionResId})
+        end,
+        #{
+            msg => "failed_to_uninstall_telemetry_handlers",
+            action_id => ActionResId
+        }
+    ).

+ 327 - 41
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl

@@ -52,21 +52,18 @@ init_per_group(plain = Type, Config) ->
     PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
     PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")),
     ProxyName = "pulsar_plain",
+    reset_proxy(),
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
-            ConnectorName = ?MODULE,
-            NewConfig =
-                [
-                    {proxy_name, ProxyName},
-                    {pulsar_host, PulsarHost},
-                    {pulsar_port, PulsarPort},
-                    {pulsar_type, Type},
-                    {use_tls, false}
-                    | Config1 ++ Config
-                ],
-            create_connector(ConnectorName, NewConfig),
-            NewConfig;
+            [
+                {proxy_name, ProxyName},
+                {pulsar_host, PulsarHost},
+                {pulsar_port, PulsarPort},
+                {pulsar_type, Type},
+                {use_tls, false}
+                | Config1 ++ Config
+            ];
         false ->
             maybe_skip_without_ci()
     end;
@@ -74,21 +71,18 @@ init_per_group(tls = Type, Config) ->
     PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"),
     PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")),
     ProxyName = "pulsar_tls",
+    reset_proxy(),
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
-            ConnectorName = ?MODULE,
-            NewConfig =
-                [
-                    {proxy_name, ProxyName},
-                    {pulsar_host, PulsarHost},
-                    {pulsar_port, PulsarPort},
-                    {pulsar_type, Type},
-                    {use_tls, true}
-                    | Config1 ++ Config
-                ],
-            create_connector(ConnectorName, NewConfig),
-            NewConfig;
+            [
+                {proxy_name, ProxyName},
+                {pulsar_host, PulsarHost},
+                {pulsar_port, PulsarPort},
+                {pulsar_type, Type},
+                {use_tls, true}
+                | Config1 ++ Config
+            ];
         false ->
             maybe_skip_without_ci()
     end;
@@ -105,9 +99,9 @@ end_per_group(_Group, _Config) ->
     ok.
 
 common_init_per_group() ->
+    reset_proxy(),
     ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
     ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
-    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
     [
@@ -116,6 +110,12 @@ common_init_per_group() ->
         {mqtt_topic, MQTTTopic}
     ].
 
+reset_proxy() ->
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok.
+
 common_end_per_group(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
@@ -131,7 +131,7 @@ end_per_testcase(_Testcase, Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    emqx_bridge_v2_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     stop_consumer(Config),
     %% in CI, apparently this needs more time since the
     %% machines struggle with all the containers running...
@@ -159,21 +159,25 @@ common_init_per_testcase(TestCase, Config0) ->
 %% Helper fns
 %%------------------------------------------------------------------------------
 
-create_connector(Name, Config) ->
-    Connector = pulsar_connector(Config),
-    {ok, _} = emqx_connector:create(?TYPE, Name, Connector).
+create_connector(Config) ->
+    {201, _} = create_connector_api([
+        {connector_type, ?TYPE},
+        {connector_name, ?MODULE},
+        {connector_config, connector_config(Config)}
+    ]),
+    ok.
 
 delete_connector(Name) ->
     ok = emqx_connector:remove(?TYPE, Name).
 
 create_action(Name, Config) ->
-    Action = pulsar_action(Config),
+    Action = action_config(Config),
     {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
 
 delete_action(Name) ->
     ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
 
-pulsar_connector(Config) ->
+connector_config(Config) ->
     PulsarHost = ?config(pulsar_host, Config),
     PulsarPort = ?config(pulsar_port, Config),
     UseTLS = proplists:get_value(use_tls, Config, false),
@@ -201,11 +205,13 @@ pulsar_connector(Config) ->
     },
     emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap).
 
-pulsar_action(Config) ->
+action_config(Config) ->
+    action_config(atom_to_binary(?MODULE), Config).
+
+action_config(ConnectorName, Config) ->
     QueryMode = proplists:get_value(query_mode, Config, <<"sync">>),
-    Name = atom_to_binary(?MODULE),
     InnerConfigMap = #{
-        <<"connector">> => Name,
+        <<"connector">> => ConnectorName,
         <<"enable">> => true,
         <<"parameters">> => #{
             <<"retention_period">> => <<"infinity">>,
@@ -231,7 +237,7 @@ pulsar_action(Config) ->
             <<"metrics_flush_interval">> => <<"300ms">>
         }
     },
-    emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap).
+    emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, <<"some_action">>, InnerConfigMap).
 
 instance_id(Type, Name) ->
     ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
@@ -398,6 +404,39 @@ group_path(Config) ->
             Path
     end.
 
+create_connector_api(Config) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_connector_api(Config)
+    ).
+
+create_action_api(Config) ->
+    create_action_api(Config, _Overrides = #{}).
+
+create_action_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_kind_api(Config, Overrides)
+    ).
+
+update_action_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:update_bridge_api(Config, Overrides)
+    ).
+
+get_combined_metrics(ActionResId, RuleId) ->
+    Metrics = emqx_resource:get_metrics(ActionResId),
+    RuleMetrics = emqx_metrics_worker:get_counters(rule_metrics, RuleId),
+    Metrics#{rule => RuleMetrics}.
+
+reset_combined_metrics(ActionResId, RuleId) ->
+    #{
+        kind := action,
+        type := Type,
+        name := Name
+    } = emqx_bridge_v2:parse_id(ActionResId),
+    ok = emqx_bridge_v2:reset_metrics(actions, Type, Name),
+    ok = emqx_rule_engine:reset_metrics_for_rule(RuleId),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -406,7 +445,8 @@ t_action_probe(matrix) ->
     [[plain], [tls]];
 t_action_probe(Config) when is_list(Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    Action = pulsar_action(Config),
+    create_connector(Config),
+    Action = action_config(Config),
     {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
     ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
@@ -424,6 +464,7 @@ t_action(Config) when is_list(Config) ->
             _ -> <<"async">>
         end,
     Name = atom_to_binary(?FUNCTION_NAME),
+    create_connector(Config),
     create_action(Name, [{query_mode, QueryMode} | Config]),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
@@ -477,8 +518,8 @@ t_multiple_actions_sharing_topic(matrix) ->
 t_multiple_actions_sharing_topic(Config) when is_list(Config) ->
     Type = ?TYPE,
     ConnectorName = <<"c">>,
-    ConnectorConfig = pulsar_connector(Config),
-    ActionConfig = pulsar_action(Config),
+    ConnectorConfig = connector_config(Config),
+    ActionConfig = action_config(ConnectorName, Config),
     ?check_trace(
         begin
             ConnectorParams = [
@@ -572,14 +613,259 @@ t_sync_query_down(Config0) when is_list(Config0) ->
         success_tp_filter =>
             ?match_event(#{?snk_kind := pulsar_echo_consumer_message})
     },
+    ConnectorName = atom_to_binary(?FUNCTION_NAME),
     Config = [
         {connector_type, ?TYPE},
-        {connector_name, ?FUNCTION_NAME},
-        {connector_config, pulsar_connector(Config0)},
+        {connector_name, ConnectorName},
+        {connector_config, connector_config(Config0)},
         {action_type, ?TYPE},
         {action_name, ?FUNCTION_NAME},
-        {action_config, pulsar_action(Config0)}
+        {action_config, action_config(ConnectorName, Config0)}
         | proplists_with([proxy_name, proxy_host, proxy_port], Config0)
     ],
     emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
     ok.
+
+%% Checks that telemetry events emitted by pulsar are reflected in action and rule metrics.
+t_telemetry_metrics(matrix) ->
+    [[plain]]; % matrix lists only the plain variant
+t_telemetry_metrics(Config) when is_list(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    Type = ?TYPE,
+    ConnectorName = <<"c">>,
+    ConnectorConfig = connector_config(Config),
+    ActionConfig = action_config(ConnectorName, Config),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionName1 = <<"a1">>, % a1: the action the rule below is bound to
+    ActionParams1 = [
+        {action_config, ActionConfig},
+        {action_name, ActionName1},
+        {action_type, Type}
+    ],
+    ActionName2 = <<"a2">>, % a2: same ActionConfig; later asserted to record no traffic
+    ActionParams2 = [
+        {action_config, ActionConfig},
+        {action_name, ActionName2},
+        {action_type, Type}
+    ],
+    ?check_trace(
+        begin
+            {201, _} =
+                create_connector_api(ConnectorParams),
+            {201, _} =
+                create_action_api(
+                    ActionParams1,
+                    %% Initially, this will overflow on small messages
+                    #{
+                        <<"parameters">> => #{
+                            <<"buffer">> => #{
+                                <<"mode">> => <<"disk">>,
+                                <<"per_partition_limit">> => <<"2B">>,
+                                <<"segment_bytes">> => <<"1B">>
+                            }
+                        }
+                    }
+                ),
+            {201, _} =
+                create_action_api(ActionParams2),
+            RuleTopic = <<"t/a2">>, % NOTE(review): named a2, yet the rule is bound to a1 - confirm
+            {ok, #{<<"id">> := RuleId}} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
+                    {bridge_name, ActionName1}
+                ]),
+            {ok, C} = emqtt:start_link([]), % NOTE(review): no explicit emqtt:stop(C) in this testcase
+            {ok, _} = emqtt:connect(C),
+            SendMessage = fun() -> % publishes one JSON-encoded payload() to RuleTopic at QoS 1
+                ReqPayload = payload(),
+                ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
+                {ok, _} = emqtt:publish(C, RuleTopic, #{}, ReqPayloadBin, [
+                    {qos, 1}, {retain, false}
+                ]),
+                ok
+            end,
+            SendMessage(), % phase 1: expected to be dropped as queue-full (see assertion below)
+            ActionResId1 = emqx_bridge_v2_testlib:bridge_id(ActionParams1),
+            ActionResId2 = emqx_bridge_v2_testlib:bridge_id(ActionParams2),
+            ?retry( % NOTE(review): presumably interval in ms, then attempt count - confirm
+                100,
+                10,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            'dropped.queue_full' := 1,
+                            'dropped.expired' := 0,
+                            success := 0,
+                            matched := 1,
+                            failed := 0,
+                            received := 0
+                        },
+                        gauges := #{
+                            inflight := 0,
+                            queuing := 0,
+                            queuing_bytes := 0
+                        },
+                        rule := #{
+                            matched := 1,
+                            %% todo: bump action failure count when dropped to mimic common
+                            %% buffer worker behavior.
+                            'actions.failed' := 0,
+                            'actions.failed.unknown' := 0,
+                            'actions.success' := 0
+                        }
+                    },
+                    get_combined_metrics(ActionResId1, RuleId)
+                )
+            ),
+            reset_combined_metrics(ActionResId1, RuleId), % start the next phase from reset metrics
+            %% Now to make it drop expired messages
+            {200, _} =
+                update_action_api(ActionParams1, #{
+                    <<"parameters">> => #{
+                        <<"retention_period">> => <<"10ms">>
+                    }
+                }),
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                SendMessage(), % phase 2: sent while the `down` failure mode is active
+                ?retry(
+                    100,
+                    10,
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                'dropped.queue_full' := 0,
+                                'dropped.expired' := 0,
+                                success := 0,
+                                matched := 1,
+                                failed := 0,
+                                received := 0
+                            },
+                            gauges := #{
+                                inflight := 0,
+                                queuing := 1, % the message is expected to be held in the queue
+                                queuing_bytes := QueuingBytes1
+                            }
+                        } when QueuingBytes1 > 0, % byte count is not pinned, only required positive
+                        get_combined_metrics(ActionResId1, RuleId)
+                    )
+                ),
+                %% Other action is not affected by telemetry events for first action.
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            'dropped.queue_full' := 0,
+                            'dropped.expired' := 0,
+                            success := 0,
+                            matched := 0,
+                            failed := 0,
+                            received := 0
+                        },
+                        gauges := #{
+                            inflight := 0,
+                            queuing := 0,
+                            queuing_bytes := 0
+                        }
+                    },
+                    emqx_resource:get_metrics(ActionResId2)
+                ),
+                ct:sleep(20), % longer than the 10ms retention_period configured above
+                ok
+            end),
+            %% After connection is restored, the request is already expired
+            ?retry(
+                500,
+                20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            'dropped.queue_full' := 0,
+                            'dropped.expired' := 1,
+                            success := 0,
+                            matched := 1,
+                            failed := 0,
+                            received := 0
+                        },
+                        gauges := #{
+                            inflight := 0,
+                            queuing := 0,
+                            queuing_bytes := 0
+                        },
+                        rule := #{
+                            matched := 1,
+                            %% todo: bump action failure count when dropped to mimic common
+                            %% buffer worker behavior.
+                            'actions.failed' := 0,
+                            'actions.failed.unknown' := 0,
+                            'actions.success' := 0
+                        }
+                    },
+                    get_combined_metrics(ActionResId1, RuleId)
+                )
+            ),
+            reset_combined_metrics(ActionResId1, RuleId),
+
+            %% Now, a success.
+            SendMessage(),
+            ?retry(
+                500,
+                20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            'dropped.queue_full' := 0,
+                            'dropped.expired' := 0,
+                            success := 1,
+                            matched := 1,
+                            failed := 0,
+                            received := 0
+                        },
+                        gauges := #{
+                            inflight := 0,
+                            queuing := 0,
+                            queuing_bytes := 0
+                        },
+                        rule := #{
+                            matched := 1,
+                            'actions.failed' := 0,
+                            'actions.failed.unknown' := 0,
+                            'actions.success' := 1
+                        }
+                    },
+                    get_combined_metrics(ActionResId1, RuleId)
+                )
+            ),
+
+            %% Other action is not affected by telemetry events for first action.
+            ?retry(
+                100,
+                10,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            'dropped.queue_full' := 0,
+                            'dropped.expired' := 0,
+                            success := 0,
+                            matched := 0,
+                            failed := 0,
+                            received := 0
+                        },
+                        gauges := #{
+                            inflight := 0,
+                            queuing := 0,
+                            queuing_bytes := 0
+                        }
+                    },
+                    emqx_resource:get_metrics(ActionResId2)
+                )
+            ),
+
+            ok
+        end,
+        [] % NOTE(review): presumably "no additional trace checks" - confirm against ?check_trace
+    ),
+    ok.

+ 1 - 0
changes/ee/feat-14110.en.md

@@ -0,0 +1 @@
+Added support for the Pulsar driver to report metrics. It now reports metrics such as queuing, inflight, and dropped message counts for better observability.