
fix(kafka_producer): prevent multiple producers from multiplying each other's metrics

Thales Macedo Garitezi, 3 years ago
commit 61246c43c4
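
Why the metrics multiply: telemetry invokes every attached handler for a given event and hands each handler its own HandlerConfig as the fourth argument; it does no filtering by emitter. So when several Kafka producer bridges attach handlers for the same [wolff, ...] events, each bridge's handler fires on every bridge's events unless the handler itself filters. A standalone sketch of that fan-out (module name and bridge ids are made up for illustration, not EMQX code):

    %% telemetry_fanout_demo: shows that one event fires every attached handler.
    -module(telemetry_fanout_demo).
    -export([run/0, handle/4]).

    %% No filtering here: the handler accepts any bridge_id in the event
    %% metadata, which is exactly the situation the commit fixes.
    handle(_Event, #{counter_inc := Val}, #{bridge_id := EventBridge}, #{bridge_id := MyBridge}) ->
        io:format("handler for ~p counted +~p from ~p~n", [MyBridge, Val, EventBridge]).

    run() ->
        _ = telemetry:attach_many(h_a, [[wolff, dropped]], fun ?MODULE:handle/4, #{bridge_id => <<"bridge_a">>}),
        _ = telemetry:attach_many(h_b, [[wolff, dropped]], fun ?MODULE:handle/4, #{bridge_id => <<"bridge_b">>}),
        %% A single drop reported on behalf of bridge_a is counted by BOTH handlers:
        telemetry:execute([wolff, dropped], #{counter_inc => 1}, #{bridge_id => <<"bridge_a">>}),
        telemetry:detach(h_a),
        telemetry:detach(h_b).
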

+ 37 - 24
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -33,7 +33,10 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
-    _ = maybe_install_wolff_telemetry_handlers(InstId),
+    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
+    BridgeType = kafka,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -137,7 +140,7 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
     {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
-    ok.
+    {async_return, ok}.
 
 compile_message_template(#{
     key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
@@ -299,62 +302,72 @@ get_required(Field, Config, Throw) ->
     Value =:= none andalso throw(Throw),
     Value.
 
+%% we *must* match the bridge id in the event metadata with that in
+%% the handler config; otherwise, multiple kafka producer bridges will
+%% install multiple handlers to the same wolff events, multiplying the
+%% metric counts...
 handle_telemetry_event(
     [wolff, dropped],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_inc(ID, Val);
 handle_telemetry_event(
     [wolff, dropped_queue_full],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
 handle_telemetry_event(
     [wolff, queuing],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:queuing_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_inc(ID, Val);
 handle_telemetry_event(
     [wolff, failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, inflight],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:inflight_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried_failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, retried_success],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
 handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
     %% Event that we do not handle
     ok.
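
Besides the handler-config matching, the clauses for [wolff, queuing] and [wolff, inflight] now read the gauge_set measurement and forward the real per-partition value instead of hard-coding 0, while the counter-style events keep counter_inc. Roughly, the event shapes these clauses expect look like the following (illustrative values typed into a shell; the exact resource id format is an assumption here):

    %% Counter-style events: a `counter_inc' measurement, bridge_id metadata.
    Meta = #{bridge_id => <<"bridge:kafka:my_producer">>}.
    telemetry:execute([wolff, success], #{counter_inc => 1}, Meta).
    telemetry:execute([wolff, dropped], #{counter_inc => 1}, Meta).
    %% Gauge-style events: a `gauge_set' measurement plus a partition_id.
    telemetry:execute([wolff, queuing],  #{gauge_set => 5}, Meta#{partition_id => 0}).
    telemetry:execute([wolff, inflight], #{gauge_set => 3}, Meta#{partition_id => 0}).
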
@@ -367,17 +380,12 @@ uninstall_telemetry_handlers(InstanceID) ->
     HandlerID = telemetry_handler_id(InstanceID),
     telemetry:detach(HandlerID).
 
-maybe_install_wolff_telemetry_handlers(InstanceID) ->
+maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
         telemetry_handler_id(InstanceID),
-        %% Note: we don't handle `[wolff, success]' because,
-        %% currently, we already increment the success counter for
-        %% this resource at `emqx_rule_runtime:handle_action' when
-        %% the response is `ok' and we would double increment it
-        %% here.
         [
             [wolff, dropped],
             [wolff, dropped_queue_full],
@@ -386,8 +394,13 @@ maybe_install_wolff_telemetry_handlers(InstanceID) ->
             [wolff, failed],
             [wolff, inflight],
             [wolff, retried_failed],
-            [wolff, retried_success]
+            [wolff, retried_success],
+            [wolff, success]
         ],
         fun ?MODULE:handle_telemetry_event/4,
-        []
+        %% we *must* keep track of the same id that is handed down to
+        %% wolff producers; otherwise, multiple kafka producer bridges
+        %% will install multiple handlers to the same wolff events,
+        %% multiplying the metric counts...
+        #{bridge_id => ResourceID}
     ).
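
Put together, the pattern the commit lands on is: store the bridge's resource id in the telemetry handler config at attach time, and require the event metadata to carry that same id in every metric clause, so a handler installed by one bridge silently ignores wolff events emitted for another. A condensed sketch (abridged from the diff above, not the full module):

    attach(InstanceID, ResourceID) ->
        telemetry:attach_many(
            telemetry_handler_id(InstanceID),
            [[wolff, dropped], [wolff, success]],   %% abridged event list
            fun ?MODULE:handle_telemetry_event/4,
            #{bridge_id => ResourceID}              %% passed to every callback
        ).

    %% Repeating `ID' in both the metadata and the handler config means this
    %% clause only fires for this handler's own bridge...
    handle_telemetry_event([wolff, success], #{counter_inc := Val},
                           #{bridge_id := ID}, #{bridge_id := ID}) when is_integer(Val) ->
        emqx_resource_metrics:success_inc(ID, Val);
    %% ...everything else falls through to the catch-all and is not counted.
    handle_telemetry_event(_Event, _Measurements, _Metadata, _HandlerConfig) ->
        ok.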

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -390,7 +390,7 @@ t_failed_creation_then_fix(_Config) ->
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
-    ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
@@ -455,7 +455,7 @@ publish_helper(#{
     StartRes = ?PRODUCER:on_start(InstId, Conf),
     {ok, State} = StartRes,
     OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
-    ok = OnQueryRes,
+    {async_return, ok} = OnQueryRes,
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),