Просмотр исходного кода

Merge pull request #14065 from thalesmg/20241023-r58-data-buffer-metrics

feat(buffer worker): report queued bytes as a metric
Thales Macedo Garitezi 1 год назад
Родитель
Commit
1f887d4bce

+ 91 - 1
apps/emqx_bridge/include/emqx_bridge.hrl

@@ -14,9 +14,95 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
+-define(EMPTY_METRICS_V1,
+    ?METRICS_V1(
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+    )
+).
+
+-define(METRICS_V1(
+    Dropped,
+    DroppedOther,
+    DroppedExpired,
+    DroppedQueueFull,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Retried,
+    LateReply,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'dropped' => Dropped,
+        'dropped.other' => DroppedOther,
+        'dropped.expired' => DroppedExpired,
+        'dropped.queue_full' => DroppedQueueFull,
+        'dropped.resource_not_found' => DroppedResourceNotFound,
+        'dropped.resource_stopped' => DroppedResourceStopped,
+        'matched' => Matched,
+        'queuing' => Queued,
+        'retried' => Retried,
+        'late_reply' => LateReply,
+        'failed' => SentFailed,
+        'inflight' => SentInflight,
+        'success' => SentSucc,
+        rate => RATE,
+        rate_last5m => RATE_5,
+        rate_max => RATE_MAX,
+        received => Rcvd
+    }
+).
+
+-define(metrics_v1(
+    Dropped,
+    DroppedOther,
+    DroppedExpired,
+    DroppedQueueFull,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Retried,
+    LateReply,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'dropped' := Dropped,
+        'dropped.other' := DroppedOther,
+        'dropped.expired' := DroppedExpired,
+        'dropped.queue_full' := DroppedQueueFull,
+        'dropped.resource_not_found' := DroppedResourceNotFound,
+        'dropped.resource_stopped' := DroppedResourceStopped,
+        'matched' := Matched,
+        'queuing' := Queued,
+        'retried' := Retried,
+        'late_reply' := LateReply,
+        'failed' := SentFailed,
+        'inflight' := SentInflight,
+        'success' := SentSucc,
+        rate := RATE,
+        rate_last5m := RATE_5,
+        rate_max := RATE_MAX,
+        received := Rcvd
+    }
+).
+
 -define(EMPTY_METRICS,
     ?METRICS(
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
     )
 ).
 
@@ -29,6 +115,7 @@
     DroppedResourceStopped,
     Matched,
     Queued,
+    QueuedBytes,
     Retried,
     LateReply,
     SentFailed,
@@ -48,6 +135,7 @@
         'dropped.resource_stopped' => DroppedResourceStopped,
         'matched' => Matched,
         'queuing' => Queued,
+        'queuing_bytes' => QueuedBytes,
         'retried' => Retried,
         'late_reply' => LateReply,
         'failed' => SentFailed,
@@ -69,6 +157,7 @@
     DroppedResourceStopped,
     Matched,
     Queued,
+    QueuedBytes,
     Retried,
     LateReply,
     SentFailed,
@@ -88,6 +177,7 @@
         'dropped.resource_stopped' := DroppedResourceStopped,
         'matched' := Matched,
         'queuing' := Queued,
+        'queuing_bytes' := QueuedBytes,
         'retried' := Retried,
         'late_reply' := LateReply,
         'failed' := SentFailed,

+ 6 - 6
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -893,20 +893,20 @@ collect_metrics(Bridges) ->
     [#{node => Node, metrics => Metrics} || {Node, Metrics} <- Bridges].
 
 aggregate_metrics(AllMetrics) ->
-    InitMetrics = ?EMPTY_METRICS,
+    InitMetrics = ?EMPTY_METRICS_V1,
     lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics).
 
 aggregate_metrics(
     #{
-        metrics := ?metrics(
+        metrics := ?metrics_v1(
             M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
         )
     },
-    ?metrics(
+    ?metrics_v1(
         N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
     )
 ) ->
-    ?METRICS(
+    ?METRICS_V1(
         M1 + N1,
         M2 + N2,
         M3 + N3,
@@ -980,7 +980,7 @@ format_metrics(#{
 }) ->
     Queued = maps:get('queuing', Gauges, 0),
     SentInflight = maps:get('inflight', Gauges, 0),
-    ?METRICS(
+    ?METRICS_V1(
         Dropped,
         DroppedOther,
         DroppedExpired,
@@ -1003,7 +1003,7 @@ format_metrics(_Metrics) ->
     %% Empty metrics: can happen when a node joins another and a
     %% bridge is not yet replicated to it, so the counters map is
     %% empty.
-    ?METRICS(
+    ?METRICS_V1(
         _Dropped = 0,
         _DroppedOther = 0,
         _DroppedExpired = 0,

+ 7 - 3
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -1296,6 +1296,7 @@ format_metrics(#{
     }
 }) ->
     Queued = maps:get('queuing', Gauges, 0),
+    QueuedBytes = maps:get('queuing_bytes', Gauges, 0),
     SentInflight = maps:get('inflight', Gauges, 0),
     ?METRICS(
         Dropped,
@@ -1306,6 +1307,7 @@ format_metrics(#{
         DroppedResourceStopped,
         Matched,
         Queued,
+        QueuedBytes,
         Retried,
         LateReply,
         SentFailed,
@@ -1332,6 +1334,7 @@ empty_metrics() ->
         _DroppedResourceStopped = 0,
         _Matched = 0,
         _Queued = 0,
+        _QueuedBytes = 0,
         _Retried = 0,
         _LateReply = 0,
         _SentFailed = 0,
@@ -1365,11 +1368,11 @@ aggregate_metrics(AllMetrics) ->
 aggregate_metrics(
     #{
         metrics := ?metrics(
-            M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
+            M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18
         )
     },
     ?metrics(
-        N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
+        N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18
     )
 ) ->
     ?METRICS(
@@ -1389,7 +1392,8 @@ aggregate_metrics(
         M14 + N14,
         M15 + N15,
         M16 + N16,
-        M17 + N17
+        M17 + N17,
+        M18 + N18
     ).
 
 fill_defaults(ConfRootKey, Type, RawConf) ->

+ 13 - 2
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -1491,8 +1491,19 @@ t_metrics(Config) ->
 
     ?assertMatch(
         {ok, 200, #{
-            <<"metrics">> := #{<<"matched">> := 0},
-            <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _]
+            <<"metrics">> := #{
+                <<"matched">> := 0,
+                <<"queuing_bytes">> := 0
+            },
+            <<"node_metrics">> := [
+                #{
+                    <<"metrics">> := #{
+                        <<"matched">> := 0,
+                        <<"queuing_bytes">> := 0
+                    }
+                }
+                | _
+            ]
         }},
         request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
     ),

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.2"},
+    {wolff, "4.0.3"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 8 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -928,6 +928,13 @@ handle_telemetry_event(
     #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
+handle_telemetry_event(
+    [wolff, queuing_bytes],
+    #{gauge_set := Val},
+    #{bridge_id := ID, partition_id := PartitionID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_bytes_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
@@ -965,6 +972,7 @@ maybe_install_wolff_telemetry_handlers(TelemetryId) ->
         [
             [wolff, dropped_queue_full],
             [wolff, queuing],
+            [wolff, queuing_bytes],
             [wolff, retried],
             [wolff, inflight]
         ],

+ 2 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -714,6 +714,8 @@ t_create_connector_while_connection_is_down(Config) ->
                 ),
                 ?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)),
                 ?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)),
+                QueuingBytes = emqx_resource_metrics:queuing_bytes_get(ActionId),
+                ?assert(QueuingBytes > 0, #{bytes => QueuingBytes}),
                 ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)),
                 ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)),
                 ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),

+ 6 - 0
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -273,6 +273,7 @@ init({Id, Index, Opts}) ->
     QueueOpts = replayq_opts(Id, Index, Opts),
     Queue = replayq:open(QueueOpts),
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
+    emqx_resource_metrics:queuing_bytes_set(Id, Index, queue_bytes(Queue)),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
     InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightTID = inflight_new(InflightWinSize),
@@ -375,6 +376,7 @@ terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
     %% since we want volatile queues, this will be 0 after
     %% termination.
     emqx_resource_metrics:queuing_set(Id, Index, 0),
+    emqx_resource_metrics:queuing_bytes_set(Id, Index, 0),
     gproc_pool:disconnect_worker(Id, {Id, Index}),
     ok.
 
@@ -1207,6 +1209,7 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte
 -spec set_gauges(data()) -> ok.
 set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
+    emqx_resource_metrics:queuing_bytes_set(Id, Index, queue_bytes(Q)),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok.
 
@@ -2117,6 +2120,9 @@ assert_ok_result(R) ->
 queue_count(Q) ->
     replayq:count(Q).
 
+queue_bytes(Q) ->
+    replayq:bytes(Q).
+
 disk_queue_dir(Id, Index) ->
     QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
     QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),

+ 16 - 0
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -31,6 +31,8 @@
     inflight_get/1,
     queuing_set/3,
     queuing_get/1,
+    queuing_bytes_set/3,
+    queuing_bytes_get/1,
     dropped_inc/1,
     dropped_inc/2,
     dropped_get/1,
@@ -92,6 +94,7 @@ events() ->
             inflight,
             matched,
             queuing,
+            queuing_bytes,
             received,
             retried_failed,
             retried_success,
@@ -218,6 +221,8 @@ handle_gauge_telemetry_event(Event, ID, WorkerID, Val) ->
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
         queuing ->
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
+        queuing_bytes ->
+            emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing_bytes', Val);
         _ ->
             ok
     end.
@@ -237,6 +242,17 @@ queuing_set(ID, WorkerID, Val) ->
 queuing_get(ID) ->
     emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing').
 
+%% @doc Number of bytes currently queued. [Gauge]
+queuing_bytes_set(ID, WorkerID, Val) ->
+    telemetry:execute(
+        [?TELEMETRY_PREFIX, queuing_bytes],
+        #{gauge_set => Val},
+        #{resource_id => ID, worker_id => WorkerID}
+    ).
+
+queuing_bytes_get(ID) ->
+    emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'queuing_bytes').
+
 %% @doc Count of batches of messages that were sent asynchronously but
 %% ACKs are not yet received. [Gauge]
 inflight_set(ID, WorkerID, Val) ->

+ 14 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1432,8 +1432,10 @@ t_delete_and_re_create_with_same_name(_Config) ->
     ),
     %% pre-condition: we should have just created a new queue
     Queuing0 = emqx_resource_metrics:queuing_get(?ID),
+    QueuingBytes0 = emqx_resource_metrics:queuing_bytes_get(?ID),
     Inflight0 = emqx_resource_metrics:inflight_get(?ID),
     ?assertEqual(0, Queuing0),
+    ?assertEqual(0, QueuingBytes0),
     ?assertEqual(0, Inflight0),
     ?check_trace(
         begin
@@ -1445,7 +1447,8 @@ t_delete_and_re_create_with_same_name(_Config) ->
                 _Timeout = 5_000
             ),
             %% ensure replayq offloads to disk
-            Payload = binary:copy(<<"a">>, 119),
+            NumBytes = 119,
+            Payload = binary:copy(<<"a">>, NumBytes),
             lists:foreach(
                 fun(N) ->
                     spawn_link(fun() ->
@@ -1468,6 +1471,13 @@ t_delete_and_re_create_with_same_name(_Config) ->
                 _Attempts0 = 20,
                 ?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
             ),
+            ?retry(
+                _Sleep = 300,
+                _Attempts0 = 20,
+                %% `> NumBytes' because replayq reports total usage, not just payload, so
+                %% headers and metadata are included.
+                ?assert(emqx_resource_metrics:queuing_bytes_get(?ID) > NumBytes)
+            ),
             ?retry(
                 _Sleep = 300,
                 _Attempts0 = 20,
@@ -1501,8 +1511,10 @@ t_delete_and_re_create_with_same_name(_Config) ->
 
             %% it shouldn't have anything enqueued, as it's a fresh resource
             Queuing2 = emqx_resource_metrics:queuing_get(?ID),
-            Inflight2 = emqx_resource_metrics:queuing_get(?ID),
+            QueuingBytes2 = emqx_resource_metrics:queuing_bytes_get(?ID),
+            Inflight2 = emqx_resource_metrics:inflight_get(?ID),
             ?assertEqual(0, Queuing2),
+            ?assertEqual(0, QueuingBytes2),
             ?assertEqual(0, Inflight2),
 
             ok

+ 2 - 0
changes/ce/feat-14065.en.md

@@ -0,0 +1,2 @@
+Added a new `queuing_bytes` metric to the available data integration metrics. This metric indicates how much RAM and/or disk space the buffering of a given action is consuming.
+Currently, only the Pulsar Producer action lacks support for this metric.

+ 1 - 1
mix.exs

@@ -274,7 +274,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:influxdb),
     do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}
 
-  def common_dep(:wolff), do: {:wolff, "4.0.2"}
+  def common_dep(:wolff), do: {:wolff, "4.0.3"}
   def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}
 
   def common_dep(:kafka_protocol),