Explorar el Código

Merge pull request #13889 from zmstone/0927-perf-monitor-and-metrics-api

perf: concurrent nodes query for `/monitor_current` and `/metrics` api
zmstone hace 1 año
padre
commit
7a8db0dcb0

+ 1 - 1
apps/emqx/src/proto/emqx_proto_v1.erl

@@ -54,7 +54,7 @@ get_stats(Node) ->
 
 -spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}.
 get_metrics(Node) ->
-    rpc:call(Node, emqx_metrics, all, []).
+    rpc:call(Node, emqx_metrics, all, [], timer:seconds(5)).
 
 -spec clean_authz_cache(node(), emqx_types:clientid()) ->
     ok

+ 28 - 24
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -74,6 +74,7 @@
 -define(CLEAN_EXPIRED_INTERVAL, 10 * ?MINUTES).
 -define(RETENTION_TIME, 7 * ?DAYS).
 -define(MAX_POSSIBLE_SAMPLES, 1440).
+-define(LOG(LEVEL, DATA), ?SLOG(LEVEL, DATA, #{tag => "DASHBOARD"})).
 
 -record(state, {
     last,
@@ -120,8 +121,8 @@ current_rate(Node) when Node == node() ->
     try
         do_call(current_rate)
     catch
-        _E:R ->
-            ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
+        _E:R:Stacktrace ->
+            ?LOG(warning, #{msg => "dashboard_monitor_error", reason => R, stacktrace => Stacktrace}),
             %% Rate map 0, ensure api will not crash.
             %% When joining cluster, dashboard monitor restart.
             Rate0 = [
@@ -133,31 +134,28 @@ current_rate(Node) when Node == node() ->
 current_rate(Node) ->
     case emqx_dashboard_proto_v1:current_rate(Node) of
         {badrpc, Reason} ->
-            {badrpc, {Node, Reason}};
+            {badrpc, #{node => Node, reason => Reason}};
         {ok, Rate} ->
             {ok, Rate}
     end.
 
 %% Get the current rate. Not the current sampler data.
 current_rate_cluster() ->
-    Fun =
+    Nodes = mria:cluster_nodes(running),
+    %% each call has 5s timeout, so it's ok to wait infinity here
+    L0 = emqx_utils:pmap(fun(Node) -> current_rate(Node) end, Nodes, infinity),
+    {L1, Failed} = lists:partition(
         fun
-            (Node, Cluster) when is_map(Cluster) ->
-                case current_rate(Node) of
-                    {ok, CurrentRate} ->
-                        merge_cluster_rate(CurrentRate, Cluster);
-                    {badrpc, Reason} ->
-                        {badrpc, {Node, Reason}}
-                end;
-            (_Node, Error) ->
-                Error
+            ({ok, _}) -> true;
+            (_) -> false
         end,
-    case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
-        {badrpc, Reason} ->
-            {badrpc, Reason};
-        Metrics ->
-            {ok, adjust_synthetic_cluster_metrics(Metrics)}
-    end.
+        L0
+    ),
+    Failed =/= [] orelse
+        ?LOG(badrpc_log_level(L1), #{msg => "failed_to_sample_current_rate", errors => Failed}),
+    Fun = fun({ok, Result}, Cluster) -> merge_cluster_rate(Result, Cluster) end,
+    Metrics = lists:foldl(Fun, #{}, L1),
+    {ok, adjust_synthetic_cluster_metrics(Metrics)}.
 
 %% -------------------------------------------------------------------------------------------------
 %% gen_server functions
@@ -264,6 +262,10 @@ do_sample_local(Time) ->
     %% downsample before return RPC calls for less data to merge by the caller nodes
     downsample(Time, Map).
 
+%% log error level when there is no success (unlikely to happen), and warning otherwise
+badrpc_log_level([]) -> error;
+badrpc_log_level(_) -> warning.
+
 sample_nodes(Nodes, Time) ->
     ResList = concurrently_sample_nodes(Nodes, Time),
     {Failed, Success} = lists:partition(
@@ -274,15 +276,14 @@ sample_nodes(Nodes, Time) ->
         ResList
     ),
     Failed =/= [] andalso
-        ?SLOG(warning, #{msg => "failed_to_sample_monitor_data", errors => Failed}),
+        ?LOG(badrpc_log_level(Success), #{msg => "failed_to_sample_monitor_data", errors => Failed}),
     lists:foldl(fun(I, B) -> merge_samplers(Time, I, B) end, #{}, Success).
 
 concurrently_sample_nodes(Nodes, Time) ->
     %% emqx_dashboard_proto_v1:do_sample has a timeout (5s),
-    Timeout = ?RPC_TIMEOUT + ?ONE_SECOND,
     %% call emqx_utils:pmap here instead of a rpc multicall
     %% to avoid having to introduce a new bpapi proto version
-    emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
+    emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, infinity).
 
 merge_samplers(SinceTime, Increment0, Base) ->
     Increment =
@@ -428,10 +429,13 @@ sample_interval(_Age) ->
     10 * ?MINUTES.
 
 sample_fill_gap(Node, SinceTs) ->
-    Samples = do_sample(Node, SinceTs),
+    %% make a remote call so it can be mocked for testing
+    Samples = ?MODULE:do_sample(Node, SinceTs),
     fill_gaps(Samples, SinceTs).
 
-fill_gaps(Samples, SinceTs) ->
+fill_gaps({badrpc, _} = BadRpc, _) ->
+    BadRpc;
+fill_gaps(Samples, SinceTs) when is_map(Samples) ->
     TsList = lists:sort(maps:keys(Samples)),
     case length(TsList) >= 2 of
         true ->

+ 19 - 0
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -273,6 +273,25 @@ t_monitor_sampler_format(_Config) ->
     [?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST],
     ok.
 
+t_sample_specific_node_but_badrpc(_Config) ->
+    meck:new(emqx_dashboard_monitor, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(
+        emqx_dashboard_monitor,
+        do_sample,
+        fun(_Node, _Time) -> {badrpc, test} end
+    ),
+    ?assertMatch(
+        {error, {404, #{<<"code">> := <<"NOT_FOUND">>}}},
+        request(["monitor", "nodes", "a@b.net"], "latest=1000")
+    ),
+    %% arguably, it should be a 503
+    ?assertMatch(
+        {error, {400, #{<<"code">> := <<"BAD_REQUEST">>}}},
+        request(["monitor", "nodes", atom_to_list(node())], "latest=1000")
+    ),
+    meck:unload(emqx_dashboard_monitor),
+    ok.
+
 t_handle_old_monitor_data(_Config) ->
     Now = erlang:system_time(second),
     FakeOldData = maps:from_list(

+ 82 - 7
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -21,6 +21,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hocon_types.hrl").
 -include_lib("emqx/include/emqx_metrics.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -import(hoconsc, [mk/2, ref/2]).
 
@@ -40,6 +41,9 @@
 %% http handlers
 -export([metrics/2]).
 
+%% test
+-export([cluster_metrics/1]).
+
 %%--------------------------------------------------------------------
 %% minirest behaviour callbacks
 %%--------------------------------------------------------------------
@@ -58,17 +62,70 @@ paths() ->
 metrics(get, #{query_string := Qs}) ->
     case maps:get(<<"aggregate">>, Qs, false) of
         true ->
+            %% sum counters from all nodes into one single map
             {200, emqx_mgmt:get_metrics()};
         false ->
-            Data = [
-                maps:from_list(
-                    emqx_mgmt:get_metrics(Node) ++ [{node, Node}]
-                )
-             || Node <- emqx:running_nodes()
-            ],
+            Node = parse_node(maps:get(<<"node">>, Qs, no_value)),
+            %% return metrics for individual nodes
+            %% make a remote call to make test easier
+            Data = ?MODULE:cluster_metrics(Node),
             {200, Data}
     end.
 
+parse_node(no_value) ->
+    no_value;
+parse_node(Node) ->
+    try
+        binary_to_existing_atom(Node)
+    catch
+        _:_ ->
+            invalid
+    end.
+
+%% if no success (unlikely to happen though), log error level
+%% otherwise warning.
+badrpc_log_level([]) -> error;
+badrpc_log_level(_) -> warning.
+
+cluster_metrics(no_value) ->
+    cluster_metrics(emqx:running_nodes());
+cluster_metrics(invalid) ->
+    %% invalid node name is equivalent to a node running but rpc failed
+    [];
+cluster_metrics(Node) when is_atom(Node) ->
+    cluster_metrics([Node]);
+cluster_metrics(Nodes) when is_list(Nodes) ->
+    %% each call has 5 seconds timeout, so it's ok to use infinity here
+    L1 = emqx_utils:pmap(
+        fun(Node) ->
+            case emqx_mgmt:get_metrics(Node) of
+                {error, Reason} ->
+                    #{node => Node, reason => Reason};
+                Result when is_list(Result) ->
+                    [{node, Node} | Result]
+            end
+        end,
+        Nodes,
+        infinity
+    ),
+    {OK, Failed} =
+        lists:partition(
+            fun
+                (R) when is_list(R) ->
+                    true;
+                (E) when is_map(E) ->
+                    false
+            end,
+            L1
+        ),
+    Failed =/= [] orelse
+        ?SLOG(
+            badrpc_log_level(OK),
+            #{msg => "failed_to_fetch_metrics", errors => Failed},
+            #{tag => "MGMT"}
+        ),
+    lists:map(fun maps:from_list/1, OK).
+
 %%--------------------------------------------------------------------
 %% swagger defines
 %%--------------------------------------------------------------------
@@ -88,7 +145,23 @@ schema("/metrics") ->
                                 #{
                                     in => query,
                                     required => false,
-                                    desc => <<"Whether to aggregate all nodes Metrics">>
+                                    desc => <<
+                                        "Whether to aggregate all nodes Metrics. "
+                                        "Default value is 'true'."
+                                    >>
+                                }
+                            )},
+                        {node,
+                            mk(
+                                binary(),
+                                #{
+                                    in => query,
+                                    required => false,
+                                    desc => <<
+                                        "Specify which specific node to fetch data from. "
+                                        "If not provided, return values for all nodes. "
+                                        "This parameter only works when 'aggregate' is 'false'."
+                                    >>
                                 }
                             )}
                     ],
@@ -96,7 +169,9 @@ schema("/metrics") ->
                     #{
                         200 => hoconsc:union(
                             [
+                                %% aggregate=true
                                 ref(?MODULE, aggregated_metrics),
+                                %% aggregate=false
                                 hoconsc:array(ref(?MODULE, node_metrics))
                             ]
                         )

+ 39 - 0
apps/emqx_management/test/emqx_mgmt_api_metrics_SUITE.erl

@@ -44,6 +44,45 @@ t_metrics_api(_) ->
     AggregateMetrics = emqx_mgmt:get_metrics(),
     match_helper(AggregateMetrics, MetricsFromAPI).
 
+t_metrics_api_cluster_partial_fail(_) ->
+    meck:new(emqx_mgmt_api_metrics, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(
+        emqx_mgmt_api_metrics,
+        cluster_metrics,
+        fun(_) ->
+            Nodes = [node(), 'emqx@127.0.0.8', 'emqx@127.0.0.7'],
+            meck:passthrough([Nodes])
+        end
+    ),
+    try
+        {ok, MetricsResponse} = request_helper("metrics?aggregate=false"),
+        [MetricsFromAPI] = emqx_utils_json:decode(MetricsResponse, [return_maps]),
+        AggregateMetrics = emqx_mgmt:get_metrics(),
+        match_helper(AggregateMetrics#{node => atom_to_binary(node())}, MetricsFromAPI)
+    after
+        meck:unload(emqx_mgmt_api_metrics)
+    end.
+
+t_metrics_api_cluster_all_fail(_) ->
+    meck:new(emqx_mgmt_api_metrics, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(
+        emqx_mgmt_api_metrics,
+        cluster_metrics,
+        fun(_) ->
+            Nodes = ['emqx@127.0.0.8', 'emqx@127.0.0.7'],
+            meck:passthrough([Nodes])
+        end
+    ),
+    try
+        ?assertEqual({ok, "[]"}, request_helper("metrics?aggregate=false"))
+    after
+        meck:unload(emqx_mgmt_api_metrics)
+    end.
+
+t_metrics_api_cluster_bad_nodename(_) ->
+    Qs = "?aggregate=false&node=notanexistingatom",
+    ?assertEqual({ok, "[]"}, request_helper("metrics" ++ Qs)).
+
 t_single_node_metrics_api(_) ->
     {ok, MetricsResponse} = request_helper("metrics"),
     [MetricsFromAPI] = emqx_utils_json:decode(MetricsResponse, [return_maps]),