Browse Source

perf(mgmt): get metrics from clsuter nodes concurrently

zmstone 1 năm trước cách đây
mục cha
commit
2f7110c944

+ 1 - 1
apps/emqx/src/proto/emqx_proto_v1.erl

@@ -54,7 +54,7 @@ get_stats(Node) ->
 
 -spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}.
 get_metrics(Node) ->
-    rpc:call(Node, emqx_metrics, all, []).
+    rpc:call(Node, emqx_metrics, all, [], timer:seconds(5)).
 
 -spec clean_authz_cache(node(), emqx_types:clientid()) ->
     ok

+ 39 - 6
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -21,6 +21,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hocon_types.hrl").
 -include_lib("emqx/include/emqx_metrics.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -import(hoconsc, [mk/2, ref/2]).
 
@@ -60,15 +61,47 @@ metrics(get, #{query_string := Qs}) ->
         true ->
             {200, emqx_mgmt:get_metrics()};
         false ->
-            Data = [
-                maps:from_list(
-                    emqx_mgmt:get_metrics(Node) ++ [{node, Node}]
-                )
-             || Node <- emqx:running_nodes()
-            ],
+            Data = cluster_metrics(emqx:running_nodes()),
             {200, Data}
     end.
 
+%% if no success (unlikely to happen though), log error level
+%% otherwise warning.
+badrpc_log_level([]) -> error;
+badrpc_log_level(_) -> warning.
+
+cluster_metrics(Nodes) ->
+    %% each call has 5 seconds timeout, so it's ok to use infinity here
+    L1 = emqx_utils:pmap(
+        fun(Node) ->
+            case emqx_mgmt:get_metrics(Node) of
+                {error, Reason} ->
+                    #{node => Node, reason => Reason};
+                Result when is_list(Result) ->
+                    Result
+            end
+        end,
+        Nodes,
+        infinity
+    ),
+    {OK, Failed} =
+        lists:partition(
+            fun
+                (R) when is_list(R) ->
+                    true;
+                (E) when is_map(E) ->
+                    false
+            end,
+            L1
+        ),
+    Failed =/= [] orelse
+        ?SLOG(
+            badrpc_log_level(OK),
+            #{msg => "failed_to_fetch_metrics", errors => Failed},
+            #{tag => "MGMT"}
+        ),
+    lists:map(fun({Node, L}) when is_list(L) -> maps:from_list([{node, Node} | L]) end, OK).
+
 %%--------------------------------------------------------------------
 %% swagger defines
 %%--------------------------------------------------------------------