Просмотр исходного кода

fix(prom_di): use bridge_v2 metrics api

JimMoen 2 лет назад
Родитель
Сommit
a752119d05

+ 3 - 0
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -1177,6 +1177,9 @@ format_resource(
         )
     ).
 
+%% FIXME:
+%% missing metrics:
+%% 'retried.success' and 'retried.failed'
 format_metrics(#{
     counters := #{
         'dropped' := Dropped,

+ 34 - 35
apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl

@@ -69,21 +69,24 @@
 %% Callback for emqx_prometheus_cluster
 %%--------------------------------------------------------------------
 
+-define(ROOT_KEY_ACTIONS, actions).
+
 fetch_from_local_node(Mode) ->
     Rules = emqx_rule_engine:get_rules(),
-    Bridges = emqx_bridge:list(),
+    Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
+    Connectors = emqx_connector:list(),
     {node(self()), #{
         rule_metric_data => rule_metric_data(Mode, Rules),
         action_metric_data => action_metric_data(Mode, Bridges),
-        connector_metric_data => connector_metric_data(Mode, Bridges)
+        connector_metric_data => connector_metric_data(Mode, Connectors)
     }}.
 
 fetch_cluster_consistented_data() ->
     Rules = emqx_rule_engine:get_rules(),
-    Bridges = emqx_bridge:list(),
+    Connectors = emqx_connector:list(),
     (maybe_collect_schema_registry())#{
         rules_ov_data => rules_ov_data(Rules),
-        connectors_ov_data => connectors_ov_data(Bridges)
+        connectors_ov_data => connectors_ov_data(Connectors)
     }.
 
 aggre_or_zip_init_acc() ->
@@ -293,10 +296,10 @@ connectors_ov_metric_meta() ->
 connectors_ov_metric(names) ->
     emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
 
-connectors_ov_data(Brdiges) ->
+connectors_ov_data(Connectors) ->
     #{
         %% Both Bridge V1 and V2
-        emqx_connectors_count => erlang:length(Brdiges)
+        emqx_connectors_count => erlang:length(Connectors)
     }.
 
 %%========================================
@@ -416,29 +419,25 @@ action_point(Mode, Id, V) ->
     {with_node_label(Mode, [{id, Id}]), V}.
 
 get_bridge_metric(Type, Name) ->
-    case emqx_bridge:get_metrics(Type, Name) of
-        #{counters := Counters, gauges := Gauges} ->
-            #{
-                emqx_action_matched => ?MG0(matched, Counters),
-                emqx_action_dropped => ?MG0(dropped, Counters),
-                emqx_action_success => ?MG0(success, Counters),
-                emqx_action_failed => ?MG0(failed, Counters),
-                emqx_action_inflight => ?MG0(inflight, Gauges),
-                emqx_action_received => ?MG0(received, Counters),
-                emqx_action_late_reply => ?MG0(late_reply, Counters),
-                emqx_action_retried => ?MG0(retried, Counters),
-                emqx_action_retried_success => ?MG0('retried.success', Counters),
-                emqx_action_retried_failed => ?MG0('retried.failed', Counters),
-                emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters),
-                emqx_action_dropped_resource_not_found => ?MG0(
-                    'dropped.resource_not_found', Counters
-                ),
-                emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
-                emqx_action_dropped_other => ?MG0('dropped.other', Counters),
-                emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
-                emqx_action_queuing => ?MG0(queuing, Gauges)
-            }
-    end.
+    #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name),
+    #{
+        emqx_action_matched => ?MG0(matched, Counters),
+        emqx_action_dropped => ?MG0(dropped, Counters),
+        emqx_action_success => ?MG0(success, Counters),
+        emqx_action_failed => ?MG0(failed, Counters),
+        emqx_action_inflight => ?MG0(inflight, Gauges),
+        emqx_action_received => ?MG0(received, Counters),
+        emqx_action_late_reply => ?MG0(late_reply, Counters),
+        emqx_action_retried => ?MG0(retried, Counters),
+        emqx_action_retried_success => ?MG0('retried.success', Counters),
+        emqx_action_retried_failed => ?MG0('retried.failed', Counters),
+        emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters),
+        emqx_action_dropped_resource_not_found => ?MG0('dropped.resource_not_found', Counters),
+        emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
+        emqx_action_dropped_other => ?MG0('dropped.other', Counters),
+        emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
+        emqx_action_queuing => ?MG0(queuing, Gauges)
+    }.
 
 %%====================
 %% Connector Metric
@@ -453,14 +452,14 @@ connector_metric_meta() ->
 connectr_metric(names) ->
     emqx_prometheus_cluster:metric_names(connector_metric_meta()).
 
-connector_metric_data(Mode, Bridges) ->
+connector_metric_data(Mode, Connectors) ->
     lists:foldl(
-        fun(#{type := Type, name := Name} = Bridge, AccIn) ->
-            Id = emqx_bridge_resource:bridge_id(Type, Name),
-            merge_acc_with_connectors(Mode, Id, get_connector_status(Bridge), AccIn)
+        fun(#{type := Type, name := Name} = Connector, AccIn) ->
+            Id = emqx_connector_resource:connector_id(Type, Name),
+            merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn)
         end,
         maps:from_keys(connectr_metric(names), []),
-        Bridges
+        Connectors
     ).
 
 merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
@@ -475,7 +474,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
 connector_point(Mode, Id, V) ->
     {with_node_label(Mode, [{id, Id}]), V}.
 
-get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
+get_connector_status(#{resource_data := ResourceData} = _Connector) ->
     Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
     Status = ?MG(status, ResourceData),
     #{