|
|
@@ -53,7 +53,7 @@
|
|
|
format/1,
|
|
|
clean/1,
|
|
|
lookup/1,
|
|
|
- sample_nodes/3,
|
|
|
+ sample_nodes/2,
|
|
|
randomize/2,
|
|
|
randomize/3,
|
|
|
sample_fill_gap/2,
|
|
|
@@ -73,6 +73,7 @@
|
|
|
|
|
|
-define(CLEAN_EXPIRED_INTERVAL, 10 * ?MINUTES).
|
|
|
-define(RETENTION_TIME, 7 * ?DAYS).
|
|
|
+-define(MAX_POSSIBLE_SAMPLES, 1440).
|
|
|
|
|
|
-record(state, {
|
|
|
last,
|
|
|
@@ -244,7 +245,7 @@ do_sample(Node, infinity) ->
|
|
|
do_sample(Node, 0);
|
|
|
do_sample(all, Time) when is_integer(Time) ->
|
|
|
AllNodes = emqx:running_nodes(),
|
|
|
- All = sample_nodes(AllNodes, Time, #{}),
|
|
|
+ All = sample_nodes(AllNodes, Time),
|
|
|
maps:map(fun(_, S) -> adjust_synthetic_cluster_metrics(S) end, All);
|
|
|
do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
|
|
|
do_sample_local(Time);
|
|
|
@@ -263,7 +264,7 @@ do_sample_local(Time) ->
|
|
|
%% downsample before return RPC calls for less data to merge by the caller nodes
|
|
|
downsample(Time, Map).
|
|
|
|
|
|
-sample_nodes(Nodes, Time, Local) ->
|
|
|
+sample_nodes(Nodes, Time) ->
|
|
|
ResList = concurrently_sample_nodes(Nodes, Time),
|
|
|
{Failed, Success} = lists:partition(
|
|
|
fun
|
|
|
@@ -274,7 +275,7 @@ sample_nodes(Nodes, Time, Local) ->
|
|
|
),
|
|
|
Failed =/= [] andalso
|
|
|
?SLOG(warning, #{msg => "failed_to_sample_monitor_data", errors => Failed}),
|
|
|
- lists:foldl(fun merge_samplers/2, Local, Success).
|
|
|
+ lists:foldl(fun(I, B) -> merge_samplers(Time, I, B) end, #{}, Success).
|
|
|
|
|
|
concurrently_sample_nodes(Nodes, Time) ->
|
|
|
%% emqx_dashboard_proto_v1:do_sample has a timeout (5s),
|
|
|
@@ -283,7 +284,15 @@ concurrently_sample_nodes(Nodes, Time) ->
|
|
|
%% to avoid having to introduce a new bpapi proto version
|
|
|
emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
|
|
|
|
|
|
-merge_samplers(Increment, Base) ->
|
|
|
+merge_samplers(SinceTime, Increment0, Base) ->
|
|
|
+ Increment =
|
|
|
+ case map_size(Increment0) > ?MAX_POSSIBLE_SAMPLES of
|
|
|
+ true ->
|
|
|
+ %% this response came from a node running an older version
|
|
|
+ downsample(SinceTime, Increment0);
|
|
|
+ false ->
|
|
|
+ Increment0
|
|
|
+ end,
|
|
|
maps:fold(fun merge_samplers_loop/3, Base, Increment).
|
|
|
|
|
|
merge_samplers_loop(TS, Increment, Base) when is_map(Increment) ->
|