Explorar el Código

fix: topics and retained metrics are cluster consistented

JimMoen hace 2 años
padre
commit
4a1d1b6aeb
Se han modificado 1 ficheros con 43 adiciones y 7 borrados
  1. 43 7
      apps/emqx_prometheus/src/emqx_prometheus.erl

+ 43 - 7
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -192,6 +192,11 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
     %% TODO: license expiry epoch and cert expiry epoch should be cached
     %% TODO: license expiry epoch and cert expiry epoch should be cached
     ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)),
     ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)),
+    ok = add_collect_family(
+        Callback,
+        stats_metric_cluster_consistened_meta(),
+        ?MG(stats_data_cluster_consistented, RawData)
+    ),
     ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)),
     ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)),
     ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)),
     ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)),
 
 
@@ -214,8 +219,8 @@ collect_mf(_Registry, _Callback) ->
 collect(<<"json">>) ->
 collect(<<"json">>) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
     (maybe_license_collect_json_data(RawData))#{
     (maybe_license_collect_json_data(RawData))#{
-        stats => collect_json_data(?MG(stats_data, RawData)),
-        metrics => collect_json_data(?MG(vm_data, RawData)),
+        stats => collect_stats_json_data(RawData),
+        metrics => collect_vm_json_data(?MG(vm_data, RawData)),
         packets => collect_json_data(?MG(emqx_packet_data, RawData)),
         packets => collect_json_data(?MG(emqx_packet_data, RawData)),
         messages => collect_json_data(?MG(emqx_message_data, RawData)),
         messages => collect_json_data(?MG(emqx_message_data, RawData)),
         delivery => collect_json_data(?MG(emqx_delivery_data, RawData)),
         delivery => collect_json_data(?MG(emqx_delivery_data, RawData)),
@@ -259,6 +264,7 @@ fetch_from_local_node(Mode) ->
 
 
 fetch_cluster_consistented_data() ->
 fetch_cluster_consistented_data() ->
     (maybe_license_fetch_data())#{
     (maybe_license_fetch_data())#{
+        stats_data_cluster_consistented => stats_data_cluster_consistented(),
         cert_data => cert_data()
         cert_data => cert_data()
     }.
     }.
 
 
@@ -477,8 +483,6 @@ stats_metric_meta() ->
         {emqx_channels_count, counter, 'channels.count'},
         {emqx_channels_count, counter, 'channels.count'},
         {emqx_channels_max, counter, 'channels.max'},
         {emqx_channels_max, counter, 'channels.max'},
         %% pub/sub stats
         %% pub/sub stats
-        {emqx_topics_count, counter, 'topics.count'},
-        {emqx_topics_max, counter, 'topics.max'},
         {emqx_suboptions_count, counter, 'suboptions.count'},
         {emqx_suboptions_count, counter, 'suboptions.count'},
         {emqx_suboptions_max, counter, 'suboptions.max'},
         {emqx_suboptions_max, counter, 'suboptions.max'},
         {emqx_subscribers_count, counter, 'subscribers.count'},
         {emqx_subscribers_count, counter, 'subscribers.count'},
@@ -487,14 +491,21 @@ stats_metric_meta() ->
         {emqx_subscriptions_max, counter, 'subscriptions.max'},
         {emqx_subscriptions_max, counter, 'subscriptions.max'},
         {emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'},
         {emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'},
         {emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'},
         {emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'},
-        %% retained
-        {emqx_retained_count, counter, 'retained.count'},
-        {emqx_retained_max, counter, 'retained.max'},
         %% delayed
         %% delayed
         {emqx_delayed_count, counter, 'delayed.count'},
         {emqx_delayed_count, counter, 'delayed.count'},
         {emqx_delayed_max, counter, 'delayed.max'}
         {emqx_delayed_max, counter, 'delayed.max'}
     ].
     ].
 
 
+stats_metric_cluster_consistened_meta() ->
+    [
+        %% topics
+        {emqx_topics_max, counter, 'topics.max'},
+        {emqx_topics_count, counter, 'topics.count'},
+        %% retained
+        {emqx_retained_count, counter, 'retained.count'},
+        {emqx_retained_max, counter, 'retained.max'}
+    ].
+
 stats_data(Mode) ->
 stats_data(Mode) ->
     Stats = emqx_stats:getstats(),
     Stats = emqx_stats:getstats(),
     lists:foldl(
     lists:foldl(
@@ -505,6 +516,16 @@ stats_data(Mode) ->
         stats_metric_meta()
         stats_metric_meta()
     ).
     ).
 
 
+stats_data_cluster_consistented() ->
+    Stats = emqx_stats:getstats(),
+    lists:foldl(
+        fun({Name, _Type, MetricKAtom}, AccIn) ->
+            AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]}
+        end,
+        #{},
+        stats_metric_cluster_consistened_meta()
+    ).
+
 %%========================================
 %%========================================
 %% Erlang VM
 %% Erlang VM
 %%========================================
 %%========================================
@@ -875,10 +896,25 @@ date_to_expiry_epoch(DateTime) ->
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %% merge / zip formatting funcs for type `application/json`
 %% merge / zip formatting funcs for type `application/json`
 
 
+collect_stats_json_data(RawData) ->
+    StatsData = ?MG(stats_data, RawData),
+    StatsClData = ?MG(stats_data_cluster_consistented, RawData),
+    D = maps:merge(StatsData, StatsClData),
+    collect_json_data(D).
+
 %% always return json array
 %% always return json array
 collect_cert_json_data(Data) ->
 collect_cert_json_data(Data) ->
     collect_json_data_(Data).
     collect_json_data_(Data).
 
 
+collect_vm_json_data(Data) ->
+    DataListPerNode = collect_json_data_(Data),
+    case {?GET_PROM_DATA_MODE(), DataListPerNode} of
+        {?PROM_DATA_MODE__NODE, [NData | _]} ->
+            NData;
+        {_, _} ->
+            DataListPerNode
+    end.
+
 collect_json_data(Data0) ->
 collect_json_data(Data0) ->
     DataListPerNode = collect_json_data_(Data0),
     DataListPerNode = collect_json_data_(Data0),
     case {?GET_PROM_DATA_MODE(), DataListPerNode} of
     case {?GET_PROM_DATA_MODE(), DataListPerNode} of