Переглянути джерело

feat(mgmt): add a 'node' parameter to /metrics query string

zmstone 1 рік тому
батько
коміт
440de2322d

+ 47 - 5
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -41,6 +41,9 @@
 %% http handlers
 -export([metrics/2]).
 
+%% exported only so that tests can mock it (called via ?MODULE: in metrics/2)
+-export([cluster_metrics/1]).
+
 %%--------------------------------------------------------------------
 %% minirest behaviour callbacks
 %%--------------------------------------------------------------------
@@ -59,18 +62,39 @@ paths() ->
 metrics(get, #{query_string := Qs}) ->
     case maps:get(<<"aggregate">>, Qs, false) of
         true ->
+            %% sum counters from all nodes into one single map
             {200, emqx_mgmt:get_metrics()};
         false ->
-            Data = cluster_metrics(emqx:running_nodes()),
+            Node = parse_node(maps:get(<<"node">>, Qs, no_value)),
+            %% return metrics for individual nodes
+            %% use a fully qualified call so that tests can mock cluster_metrics/1
+            Data = ?MODULE:cluster_metrics(Node),
             {200, Data}
     end.
 
+parse_node(no_value) ->
+    no_value;
+parse_node(Node) ->
+    try
+        binary_to_existing_atom(Node)
+    catch
+        _:_ ->
+            invalid
+    end.
+
 %% if there was no success at all (unlikely to happen though), log at error level,
 %% otherwise log at warning level.
 badrpc_log_level([]) -> error;
 badrpc_log_level(_) -> warning.
 
-cluster_metrics(Nodes) ->
+cluster_metrics(no_value) ->
+    cluster_metrics(emqx:running_nodes());
+cluster_metrics(invalid) ->
+    %% an invalid node name is treated like a node whose rpc failed: no metrics returned
+    [];
+cluster_metrics(Node) when is_atom(Node) ->
+    cluster_metrics([Node]);
+cluster_metrics(Nodes) when is_list(Nodes) ->
     %% each call has 5 seconds timeout, so it's ok to use infinity here
     L1 = emqx_utils:pmap(
         fun(Node) ->
@@ -78,7 +102,7 @@ cluster_metrics(Nodes) ->
                 {error, Reason} ->
                     #{node => Node, reason => Reason};
                 Result when is_list(Result) ->
-                    Result
+                    [{node, Node} | Result]
             end
         end,
         Nodes,
@@ -100,7 +124,7 @@ cluster_metrics(Nodes) ->
             #{msg => "failed_to_fetch_metrics", errors => Failed},
             #{tag => "MGMT"}
         ),
-    lists:map(fun({Node, L}) when is_list(L) -> maps:from_list([{node, Node} | L]) end, OK).
+    lists:map(fun maps:from_list/1, OK).
 
 %%--------------------------------------------------------------------
 %% swagger defines
@@ -121,7 +145,23 @@ schema("/metrics") ->
                                 #{
                                     in => query,
                                     required => false,
-                                    desc => <<"Whether to aggregate all nodes Metrics">>
+                                    desc => <<
+                                        "Whether to aggregate all nodes Metrics. "
+                                        "Default value is 'false'."
+                                    >>
+                                }
+                            )},
+                        {node,
+                            mk(
+                                binary(),
+                                #{
+                                    in => query,
+                                    required => false,
+                                    desc => <<
+                                        "Specify which specific node to fetch data from. "
+                                        "If not provided, return values for all nodes. "
+                                        "This parameter only works when 'aggregate' is 'false'."
+                                    >>
                                 }
                             )}
                     ],
@@ -129,7 +169,9 @@ schema("/metrics") ->
                     #{
                         200 => hoconsc:union(
                             [
+                                %% aggregate=true
                                 ref(?MODULE, aggregated_metrics),
+                                %% aggregate=false
                                 hoconsc:array(ref(?MODULE, node_metrics))
                             ]
                         )

+ 39 - 0
apps/emqx_management/test/emqx_mgmt_api_metrics_SUITE.erl

@@ -44,6 +44,45 @@ t_metrics_api(_) ->
     AggregateMetrics = emqx_mgmt:get_metrics(),
     match_helper(AggregateMetrics, MetricsFromAPI).
 
+t_metrics_api_cluster_partial_fail(_) ->
+    meck:new(emqx_mgmt_api_metrics, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(
+        emqx_mgmt_api_metrics,
+        cluster_metrics,
+        fun(_) ->
+            Nodes = [node(), 'emqx@127.0.0.8', 'emqx@127.0.0.7'],
+            meck:passthrough([Nodes])
+        end
+    ),
+    try
+        {ok, MetricsResponse} = request_helper("metrics?aggregate=false"),
+        [MetricsFromAPI] = emqx_utils_json:decode(MetricsResponse, [return_maps]),
+        AggregateMetrics = emqx_mgmt:get_metrics(),
+        match_helper(AggregateMetrics#{node => atom_to_binary(node())}, MetricsFromAPI)
+    after
+        meck:unload(emqx_mgmt_api_metrics)
+    end.
+
+t_metrics_api_cluster_all_fail(_) ->
+    meck:new(emqx_mgmt_api_metrics, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(
+        emqx_mgmt_api_metrics,
+        cluster_metrics,
+        fun(_) ->
+            Nodes = ['emqx@127.0.0.8', 'emqx@127.0.0.7'],
+            meck:passthrough([Nodes])
+        end
+    ),
+    try
+        ?assertEqual({ok, "[]"}, request_helper("metrics?aggregate=false"))
+    after
+        meck:unload(emqx_mgmt_api_metrics)
+    end.
+
+t_metrics_api_cluster_bad_nodename(_) ->
+    Qs = "?aggregate=false&node=notanexistingatom",
+    ?assertEqual({ok, "[]"}, request_helper("metrics" ++ Qs)).
+
 t_single_node_metrics_api(_) ->
     {ok, MetricsResponse} = request_helper("metrics"),
     [MetricsFromAPI] = emqx_utils_json:decode(MetricsResponse, [return_maps]),