Explorar o código

fix(prom): cluster aggre/unaggre labels

JimMoen %!s(int64=2) %!d(string=hai) anos
pai
achega
731efd8b49

+ 36 - 22
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -24,7 +24,7 @@
 
 -behaviour(emqx_prometheus_cluster).
 -export([
-    fetch_data_from_local_node/0,
+    fetch_from_local_node/1,
     fetch_cluster_consistented_data/0,
     aggre_or_zip_init_acc/0,
     logic_sum_metrics/0
@@ -241,20 +241,20 @@ add_collect_family(Name, Data, Callback, Type) ->
     Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
 
 %% behaviour
-fetch_data_from_local_node() ->
+fetch_from_local_node(Mode) ->
     {node(self()), #{
-        stats_data => stats_data(),
-        vm_data => vm_data(),
-        cluster_data => cluster_data(),
+        stats_data => stats_data(Mode),
+        vm_data => vm_data(Mode),
+        cluster_data => cluster_data(Mode),
         %% Metrics
-        emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta()),
-        emqx_message_data => emqx_metric_data(message_metric_meta()),
-        emqx_delivery_data => emqx_metric_data(delivery_metric_meta()),
-        emqx_client_data => emqx_metric_data(client_metric_meta()),
-        emqx_session_data => emqx_metric_data(session_metric_meta()),
-        emqx_olp_data => emqx_metric_data(olp_metric_meta()),
-        emqx_acl_data => emqx_metric_data(acl_metric_meta()),
-        emqx_authn_data => emqx_metric_data(authn_metric_meta())
+        emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta(), Mode),
+        emqx_message_data => emqx_metric_data(message_metric_meta(), Mode),
+        emqx_delivery_data => emqx_metric_data(delivery_metric_meta(), Mode),
+        emqx_client_data => emqx_metric_data(client_metric_meta(), Mode),
+        emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
+        emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
+        emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
+        emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode)
     }}.
 
 fetch_cluster_consistented_data() ->
@@ -495,11 +495,11 @@ stats_metric_meta() ->
         {emqx_delayed_max, counter, 'delayed.max'}
     ].
 
-stats_data() ->
+stats_data(Mode) ->
     Stats = emqx_stats:getstats(),
     lists:foldl(
         fun({Name, _Type, MetricKAtom}, AccIn) ->
-            AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]}
+            AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Stats)}]}
         end,
         #{},
         stats_metric_meta()
@@ -519,11 +519,18 @@ vm_metric_meta() ->
         {emqx_vm_used_memory, gauge, 'used_memory'}
     ].
 
-vm_data() ->
+vm_data(Mode) ->
     VmStats = emqx_mgmt:vm_stats(),
     lists:foldl(
         fun({Name, _Type, MetricKAtom}, AccIn) ->
-            AccIn#{Name => [{[], ?C(MetricKAtom, VmStats)}]}
+            Labels =
+                case Mode of
+                    node ->
+                        [];
+                    _ ->
+                        [{node, node(self())}]
+                end,
+            AccIn#{Name => [{Labels, ?C(MetricKAtom, VmStats)}]}
         end,
         #{},
         vm_metric_meta()
@@ -539,23 +546,23 @@ cluster_metric_meta() ->
         {emqx_cluster_nodes_stopped, gauge, undefined}
     ].
 
-cluster_data() ->
+cluster_data(Mode) ->
     Running = emqx:cluster_nodes(running),
     Stopped = emqx:cluster_nodes(stopped),
     #{
-        emqx_cluster_nodes_running => [{[], length(Running)}],
-        emqx_cluster_nodes_stopped => [{[], length(Stopped)}]
+        emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}],
+        emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}]
     }.
 
 %%========================================
 %% Metrics
 %%========================================
 
-emqx_metric_data(MetricNameTypeKeyL) ->
+emqx_metric_data(MetricNameTypeKeyL, Mode) ->
     Metrics = emqx_metrics:all(),
     lists:foldl(
         fun({Name, _Type, MetricKAtom}, AccIn) ->
-            AccIn#{Name => [{[], ?C(MetricKAtom, Metrics)}]}
+            AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]}
         end,
         #{},
         MetricNameTypeKeyL
@@ -911,6 +918,13 @@ zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) ->
 metrics_name(MetricsAll) ->
     [Name || {Name, _, _} <- MetricsAll].
 
+with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
+    [{node, node(self())} | Labels].
+
 %%--------------------------------------------------------------------
 %% bpapi
 

+ 37 - 24
apps/emqx_prometheus/src/emqx_prometheus_auth.erl

@@ -26,7 +26,7 @@
 %% for bpapi
 -behaviour(emqx_prometheus_cluster).
 -export([
-    fetch_data_from_local_node/0,
+    fetch_from_local_node/1,
     fetch_cluster_consistented_data/0,
     aggre_or_zip_init_acc/0,
     logic_sum_metrics/0
@@ -126,10 +126,10 @@ collect_metrics(Name, Metrics) ->
     collect_auth(Name, Metrics).
 
 %% behaviour
-fetch_data_from_local_node() ->
+fetch_from_local_node(Mode) ->
     {node(self()), #{
-        authn_data => authn_data(),
-        authz_data => authz_data()
+        authn_data => authn_data(Mode),
+        authz_data => authz_data(Mode)
     }}.
 
 fetch_cluster_consistented_data() ->
@@ -224,38 +224,41 @@ authn_metric_meta() ->
 authn_metric(names) ->
     emqx_prometheus_cluster:metric_names(authn_metric_meta()).
 
--spec authn_data() -> #{Key => [Point]} when
+-spec authn_data(atom()) -> #{Key => [Point]} when
     Key :: authn_metric_name(),
     Point :: {[Label], Metric},
     Label :: IdLabel,
     IdLabel :: {id, AuthnName :: binary()},
     Metric :: number().
-authn_data() ->
+authn_data(Mode) ->
     Authns = emqx_config:get([authentication]),
     lists:foldl(
         fun(Key, AccIn) ->
-            AccIn#{Key => authn_backend_to_points(Key, Authns)}
+            AccIn#{Key => authn_backend_to_points(Mode, Key, Authns)}
         end,
         #{},
         authn_metric(names)
     ).
 
--spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when
+-spec authn_backend_to_points(atom(), Key, list(Authn)) -> list(Point) when
     Key :: authn_metric_name(),
     Authn :: map(),
     Point :: {[Label], Metric},
     Label :: IdLabel,
     IdLabel :: {id, AuthnName :: binary()},
     Metric :: number().
-authn_backend_to_points(Key, Authns) ->
-    do_authn_backend_to_points(Key, Authns, []).
+authn_backend_to_points(Mode, Key, Authns) ->
+    do_authn_backend_to_points(Mode, Key, Authns, []).
 
-do_authn_backend_to_points(_K, [], AccIn) ->
+do_authn_backend_to_points(_Mode, _K, [], AccIn) ->
     lists:reverse(AccIn);
-do_authn_backend_to_points(K, [Authn | Rest], AccIn) ->
+do_authn_backend_to_points(Mode, K, [Authn | Rest], AccIn) ->
     Id = authenticator_id(Authn),
-    Point = {[{id, Id}], do_metric(K, Authn, lookup_authn_metrics_local(Id))},
-    do_authn_backend_to_points(K, Rest, [Point | AccIn]).
+    Point = {
+        with_node_label(Mode, [{id, Id}]),
+        do_metric(K, Authn, lookup_authn_metrics_local(Id))
+    },
+    do_authn_backend_to_points(Mode, K, Rest, [Point | AccIn]).
 
 lookup_authn_metrics_local(Id) ->
     case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of
@@ -317,38 +320,41 @@ authz_metric_meta() ->
 authz_metric(names) ->
     emqx_prometheus_cluster:metric_names(authz_metric_meta()).
 
--spec authz_data() -> #{Key => [Point]} when
+-spec authz_data(atom()) -> #{Key => [Point]} when
     Key :: authz_metric_name(),
     Point :: {[Label], Metric},
     Label :: TypeLabel,
     TypeLabel :: {type, AuthZType :: binary()},
     Metric :: number().
-authz_data() ->
+authz_data(Mode) ->
     Authzs = emqx_config:get([authorization, sources]),
     lists:foldl(
         fun(Key, AccIn) ->
-            AccIn#{Key => authz_backend_to_points(Key, Authzs)}
+            AccIn#{Key => authz_backend_to_points(Mode, Key, Authzs)}
         end,
         #{},
         authz_metric(names)
     ).
 
--spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when
+-spec authz_backend_to_points(atom(), Key, list(Authz)) -> list(Point) when
     Key :: authz_metric_name(),
     Authz :: map(),
     Point :: {[Label], Metric},
     Label :: TypeLabel,
     TypeLabel :: {type, AuthZType :: binary()},
     Metric :: number().
-authz_backend_to_points(Key, Authzs) ->
-    do_authz_backend_to_points(Key, Authzs, []).
+authz_backend_to_points(Mode, Key, Authzs) ->
+    do_authz_backend_to_points(Mode, Key, Authzs, []).
 
-do_authz_backend_to_points(_K, [], AccIn) ->
+do_authz_backend_to_points(_Mode, _K, [], AccIn) ->
     lists:reverse(AccIn);
-do_authz_backend_to_points(K, [Authz | Rest], AccIn) ->
+do_authz_backend_to_points(Mode, K, [Authz | Rest], AccIn) ->
     Type = maps:get(type, Authz),
-    Point = {[{type, Type}], do_metric(K, Authz, lookup_authz_metrics_local(Type))},
-    do_authz_backend_to_points(K, Rest, [Point | AccIn]).
+    Point = {
+        with_node_label(Mode, [{type, Type}]),
+        do_metric(K, Authz, lookup_authz_metrics_local(Type))
+    },
+    do_authz_backend_to_points(Mode, K, Rest, [Point | AccIn]).
 
 lookup_authz_metrics_local(Type) ->
     case emqx_authz_api_sources:lookup_from_local_node(Type) of
@@ -481,3 +487,10 @@ do_metric(emqx_authn_enable, #{enable := B}, _) ->
     emqx_prometheus_cluster:boolean_to_number(B);
 do_metric(K, _, Metrics) ->
     ?MG0(K, Metrics).
+
+with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
+    [{node, node(self())} | Labels].

+ 45 - 45
apps/emqx_prometheus/src/emqx_prometheus_cluster.erl

@@ -23,7 +23,7 @@
     collect_json_data/2,
 
     aggre_cluster/3,
-    with_node_name_label/2,
+    %% with_node_name_label/2,
 
     point_to_map_fun/1,
 
@@ -34,7 +34,7 @@
 
 -callback fetch_cluster_consistented_data() -> map().
 
--callback fetch_data_from_local_node() -> {node(), map()}.
+-callback fetch_from_local_node(atom()) -> {node(), map()}.
 
 -callback aggre_or_zip_init_acc() -> map().
 
@@ -46,23 +46,23 @@
 raw_data(Module, undefined) ->
     %% TODO: for push gateway, the format mode should be configurable
     raw_data(Module, ?PROM_DATA_MODE__NODE);
-raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
-    AllNodesMetrics = aggre_cluster(Module),
+raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) ->
+    AllNodesMetrics = aggre_cluster(Module, Mode),
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(AllNodesMetrics, Cluster);
-raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
-    AllNodesMetrics = with_node_name_label(Module),
+raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) ->
+    AllNodesMetrics = zip_cluster_data(Module, Mode),
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(AllNodesMetrics, Cluster);
-raw_data(Module, ?PROM_DATA_MODE__NODE) ->
-    {_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(),
+raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) ->
+    {_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode),
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(LocalNodeMetrics, Cluster).
 
-metrics_data_from_all_nodes(Module) ->
+fetch_data_from_all_nodes(Module, Mode) ->
     Nodes = mria:running_nodes(),
     _ResL = emqx_prometheus_proto_v2:raw_prom_data(
-        Nodes, Module, fetch_data_from_local_node, []
+        Nodes, Module, fetch_from_local_node, [Mode]
     ).
 
 collect_json_data(Data, Func) when is_function(Func, 3) ->
@@ -76,17 +76,13 @@ collect_json_data(Data, Func) when is_function(Func, 3) ->
 collect_json_data(_, _) ->
     error(badarg).
 
-aggre_cluster(Module) ->
+aggre_cluster(Module, Mode) ->
     do_aggre_cluster(
         Module:logic_sum_metrics(),
-        metrics_data_from_all_nodes(Module),
+        fetch_data_from_all_nodes(Module, Mode),
         Module:aggre_or_zip_init_acc()
     ).
 
-with_node_name_label(Module) ->
-    ResL = metrics_data_from_all_nodes(Module),
-    do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()).
-
 aggre_cluster(LogicSumKs, ResL, Init) ->
     do_aggre_cluster(LogicSumKs, ResL, Init).
 
@@ -121,61 +117,65 @@ aggre_metric(LogicSumKs, NodeMetrics, AccIn0) ->
 
 do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) ->
     lists:foldl(
-        fun({Labels, Metric}, AccIn) ->
-            NMetric =
-                case lists:member(K, LogicSumKs) of
-                    true ->
-                        logic_sum(Metric, ?PG0(Labels, AccIn));
-                    false ->
-                        Metric + ?PG0(Labels, AccIn)
-                end,
-            [{Labels, NMetric} | AccIn]
+        fun(Point = {_Labels, _Metric}, AccIn) ->
+            sum(K, LogicSumKs, Point, AccIn)
         end,
         AccL,
         NodeMetrics
     ).
 
-with_node_name_label(ResL, Init) ->
-    do_with_node_name_label(ResL, Init).
+sum(K, LogicSumKs, {Labels, Metric} = Point, MetricAccL) ->
+    case lists:keytake(Labels, 1, MetricAccL) of
+        {value, {Labels, MetricAcc}, NMetricAccL} ->
+            NPoint = {Labels, do_sum(K, LogicSumKs, Metric, MetricAcc)},
+            [NPoint | NMetricAccL];
+        false ->
+            [Point | MetricAccL]
+    end.
+
+do_sum(K, LogicSumKs, Metric, MetricAcc) ->
+    case lists:member(K, LogicSumKs) of
+        true ->
+            logic_sum(Metric, MetricAcc);
+        false ->
+            Metric + MetricAcc
+    end.
+
+zip_cluster_data(Module, Mode) ->
+    zip_cluster(
+        fetch_data_from_all_nodes(Module, Mode),
+        Module:aggre_or_zip_init_acc()
+    ).
 
-do_with_node_name_label([], AccIn) ->
+zip_cluster([], AccIn) ->
     AccIn;
-do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) ->
-    do_with_node_name_label(
+zip_cluster([{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
+    zip_cluster(
         Rest,
         maps:fold(
             fun(K, V, AccIn0) ->
                 AccIn0#{
-                    K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0))
+                    K => do_zip_cluster(V, ?MG(K, AccIn0))
                 }
             end,
             AccIn,
             NodeMetric
         )
     );
-do_with_node_name_label([{_, _} | Rest], AccIn) ->
-    do_with_node_name_label(Rest, AccIn).
+zip_cluster([{_, _} | Rest], AccIn) ->
+    zip_cluster(Rest, AccIn).
 
-zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
+do_zip_cluster(NodeMetrics, AccIn0) ->
     lists:foldl(
         fun(K, AccIn) ->
-            NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
+            AccMetricL = ?MG(K, AccIn),
+            NAccL = ?MG(K, NodeMetrics) ++ AccMetricL,
             AccIn#{K => NAccL}
         end,
         AccIn0,
         maps:keys(NodeMetrics)
     ).
 
-do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
-    lists:foldl(
-        fun({Labels, Metric}, AccIn) ->
-            NLabels = [{node, NodeName} | Labels],
-            [{NLabels, Metric} | AccIn]
-        end,
-        AccL,
-        NodeMetrics
-    ).
-
 point_to_map_fun(Key) ->
     fun({Lables, Metric}, AccIn2) ->
         LablesKVMap = maps:from_list(Lables),

+ 33 - 23
apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl

@@ -31,7 +31,7 @@
 %% for bpapi
 -behaviour(emqx_prometheus_cluster).
 -export([
-    fetch_data_from_local_node/0,
+    fetch_from_local_node/1,
     fetch_cluster_consistented_data/0,
     aggre_or_zip_init_acc/0,
     logic_sum_metrics/0
@@ -69,13 +69,13 @@
 %% Callback for emqx_prometheus_cluster
 %%--------------------------------------------------------------------
 
-fetch_data_from_local_node() ->
+fetch_from_local_node(Mode) ->
     Rules = emqx_rule_engine:get_rules(),
     Bridges = emqx_bridge:list(),
     {node(self()), #{
-        rule_metric_data => rule_metric_data(Rules),
-        action_metric_data => action_metric_data(Bridges),
-        connector_metric_data => connector_metric_data(Bridges)
+        rule_metric_data => rule_metric_data(Mode, Rules),
+        action_metric_data => action_metric_data(Mode, Bridges),
+        connector_metric_data => connector_metric_data(Mode, Bridges)
     }}.
 
 fetch_cluster_consistented_data() ->
@@ -325,26 +325,26 @@ rule_metric_meta() ->
 rule_metric(names) ->
     emqx_prometheus_cluster:metric_names(rule_metric_meta()).
 
-rule_metric_data(Rules) ->
+rule_metric_data(Mode, Rules) ->
     lists:foldl(
         fun(#{id := Id} = Rule, AccIn) ->
-            merge_acc_with_rules(Id, get_metric(Rule), AccIn)
+            merge_acc_with_rules(Mode, Id, get_metric(Rule), AccIn)
         end,
         maps:from_keys(rule_metric(names), []),
         Rules
     ).
 
-merge_acc_with_rules(Id, RuleMetrics, PointsAcc) ->
+merge_acc_with_rules(Mode, Id, RuleMetrics, PointsAcc) ->
     maps:fold(
         fun(K, V, AccIn) ->
-            AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]}
+            AccIn#{K => [rule_point(Mode, Id, V) | ?MG(K, AccIn)]}
         end,
         PointsAcc,
         RuleMetrics
     ).
 
-rule_point(Id, V) ->
-    {[{id, Id}], V}.
+rule_point(Mode, Id, V) ->
+    {with_node_label(Mode, [{id, Id}]), V}.
 
 get_metric(#{id := Id, enable := Bool} = _Rule) ->
     case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
@@ -393,27 +393,27 @@ action_metric_meta() ->
 action_metric(names) ->
     emqx_prometheus_cluster:metric_names(action_metric_meta()).
 
-action_metric_data(Bridges) ->
+action_metric_data(Mode, Bridges) ->
     lists:foldl(
         fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
             Id = emqx_bridge_resource:bridge_id(Type, Name),
-            merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
+            merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn)
         end,
         maps:from_keys(action_metric(names), []),
         Bridges
     ).
 
-merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) ->
+merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) ->
     maps:fold(
         fun(K, V, AccIn) ->
-            AccIn#{K => [action_point(Id, V) | ?MG(K, AccIn)]}
+            AccIn#{K => [action_point(Mode, Id, V) | ?MG(K, AccIn)]}
         end,
         PointsAcc,
         BridgeMetrics
     ).
 
-action_point(Id, V) ->
-    {[{id, Id}], V}.
+action_point(Mode, Id, V) ->
+    {with_node_label(Mode, [{id, Id}]), V}.
 
 get_bridge_metric(Type, Name) ->
     case emqx_bridge:get_metrics(Type, Name) of
@@ -453,27 +453,27 @@ connector_metric_meta() ->
 connectr_metric(names) ->
     emqx_prometheus_cluster:metric_names(connector_metric_meta()).
 
-connector_metric_data(Bridges) ->
+connector_metric_data(Mode, Bridges) ->
     lists:foldl(
         fun(#{type := Type, name := Name} = Bridge, AccIn) ->
             Id = emqx_bridge_resource:bridge_id(Type, Name),
-            merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
+            merge_acc_with_connectors(Mode, Id, get_connector_status(Bridge), AccIn)
         end,
         maps:from_keys(connectr_metric(names), []),
         Bridges
     ).
 
-merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) ->
+merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
     maps:fold(
         fun(K, V, AccIn) ->
-            AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]}
+            AccIn#{K => [connector_point(Mode, Id, V) | ?MG(K, AccIn)]}
         end,
         PointsAcc,
         ConnectorMetrics
     ).
 
-connector_point(Id, V) ->
-    {[{id, Id}], V}.
+connector_point(Mode, Id, V) ->
+    {with_node_label(Mode, [{id, Id}]), V}.
 
 get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
     Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
@@ -532,3 +532,13 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
 zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
     ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
     lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Helper funcs
+
+with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
+    Labels;
+with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
+    [{node, node(self())} | Labels].