فهرست منبع

perf(dashboard): concurrent fetch of current-rate

zmstone 1 سال پیش
والد
کامیت
fd938d2339
1 فایل‌های تغییر یافته به همراه 23 افزوده شده و 21 حذف شده
  1. 23 21
      apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

+ 23 - 21
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -134,31 +134,28 @@ current_rate(Node) when Node == node() ->
 current_rate(Node) ->
     case emqx_dashboard_proto_v1:current_rate(Node) of
         {badrpc, Reason} ->
-            {badrpc, {Node, Reason}};
+            {badrpc, #{node => Node, reason => Reason}};
         {ok, Rate} ->
             {ok, Rate}
     end.
 
 %% Get the current rate. Not the current sampler data.
 current_rate_cluster() ->
-    Fun =
+    Nodes = mria:cluster_nodes(running),
+    %% each call has a 5s timeout, so it's ok to wait with infinity here
+    L0 = emqx_utils:pmap(fun(Node) -> current_rate(Node) end, Nodes, infinity),
+    {L1, Failed} = lists:partition(
         fun
-            (Node, Cluster) when is_map(Cluster) ->
-                case current_rate(Node) of
-                    {ok, CurrentRate} ->
-                        merge_cluster_rate(CurrentRate, Cluster);
-                    {badrpc, Reason} ->
-                        {badrpc, {Node, Reason}}
-                end;
-            (_Node, Error) ->
-                Error
+            ({ok, _}) -> true;
+            (_) -> false
         end,
-    case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
-        {badrpc, Reason} ->
-            {badrpc, Reason};
-        Metrics ->
-            {ok, adjust_synthetic_cluster_metrics(Metrics)}
-    end.
+        L0
+    ),
+    Failed =/= [] andalso
+        ?LOG(badrpc_log_level(L1), #{msg => "failed_to_sample_current_rate", errors => Failed}),
+    Fun = fun({ok, Result}, Cluster) -> merge_cluster_rate(Result, Cluster) end,
+    Metrics = lists:foldl(Fun, #{}, L1),
+    {ok, adjust_synthetic_cluster_metrics(Metrics)}.
 
 %% -------------------------------------------------------------------------------------------------
 %% gen_server functions
@@ -265,6 +262,10 @@ do_sample_local(Time) ->
     %% downsample before return RPC calls for less data to merge by the caller nodes
     downsample(Time, Map).
 
+%% Log level for failed samples: error when no node succeeded (unlikely), warning otherwise.
+badrpc_log_level([]) -> error;
+badrpc_log_level(_) -> warning.
+
 sample_nodes(Nodes, Time) ->
     ResList = concurrently_sample_nodes(Nodes, Time),
     {Failed, Success} = lists:partition(
@@ -275,15 +276,14 @@ sample_nodes(Nodes, Time) ->
         ResList
     ),
     Failed =/= [] andalso
-        ?LOG(warning, #{msg => "failed_to_sample_monitor_data", errors => Failed}),
+        ?LOG(badrpc_log_level(Success), #{msg => "failed_to_sample_monitor_data", errors => Failed}),
     lists:foldl(fun(I, B) -> merge_samplers(Time, I, B) end, #{}, Success).
 
 concurrently_sample_nodes(Nodes, Time) ->
     %% emqx_dashboard_proto_v1:do_sample has its own timeout (5s).
-    Timeout = ?RPC_TIMEOUT + ?ONE_SECOND,
     %% Call emqx_utils:pmap here instead of an RPC multicall
     %% to avoid having to introduce a new bpapi proto version.
-    emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
+    emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, infinity).
 
 merge_samplers(SinceTime, Increment0, Base) ->
     Increment =
@@ -432,7 +432,9 @@ sample_fill_gap(Node, SinceTs) ->
     Samples = do_sample(Node, SinceTs),
     fill_gaps(Samples, SinceTs).
 
-fill_gaps(Samples, SinceTs) ->
+fill_gaps({badrpc, _} = BadRpc, _) ->
+    BadRpc;
+fill_gaps(Samples, SinceTs) when is_map(Samples) ->
     TsList = lists:sort(maps:keys(Samples)),
     case length(TsList) >= 2 of
         true ->