Browse Source

feat(kafka-like producers): add queuing bytes metric telemetry support

https://github.com/kafka4beam/wolff/pull/82
Thales Macedo Garitezi 1 năm trước cách đây
mục cha
commit
efd859562c

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 8 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -928,6 +928,13 @@ handle_telemetry_event(
     #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
+handle_telemetry_event(
+    [wolff, queuing_bytes],
+    #{gauge_set := Val},
+    #{bridge_id := ID, partition_id := PartitionID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_bytes_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
@@ -965,6 +972,7 @@ maybe_install_wolff_telemetry_handlers(TelemetryId) ->
         [
             [wolff, dropped_queue_full],
             [wolff, queuing],
+            [wolff, queuing_bytes],
             [wolff, retried],
             [wolff, inflight]
         ],

+ 2 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -714,6 +714,8 @@ t_create_connector_while_connection_is_down(Config) ->
                 ),
                 ?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)),
                 ?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)),
+                QueuingBytes = emqx_resource_metrics:queuing_bytes_get(ActionId),
+                ?assert(QueuingBytes > 0, #{bytes => QueuingBytes}),
                 ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)),
                 ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)),
                 ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),

+ 2 - 0
changes/ce/feat-14065.en.md

@@ -0,0 +1,2 @@
+Added the new `queuing_bytes` to available data integration metrics.  This metric indicates how much RAM and/or disk resources the buffering of a given action is consuming.
+Currently, only Pulsar Producer action lacks support for this metric.

+ 1 - 1
mix.exs

@@ -274,7 +274,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:influxdb),
     do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}
 
-  def common_dep(:wolff), do: {:wolff, "4.0.2"}
+  def common_dep(:wolff), do: {:wolff, "4.0.3"}
   def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}
 
   def common_dep(:kafka_protocol),