|
|
@@ -243,16 +243,11 @@ do_sample(Node, infinity) ->
|
|
|
%% handle RPC from old version nodes
|
|
|
do_sample(Node, 0);
|
|
|
do_sample(all, Time) when is_integer(Time) ->
|
|
|
- AllNodes = emqx:running_nodes() -- [node()],
|
|
|
- Local = do_sample(node(), Time),
|
|
|
- All = sample_nodes(AllNodes, Time, Local),
|
|
|
+ AllNodes = emqx:running_nodes(),
|
|
|
+ All = sample_nodes(AllNodes, Time, #{}),
|
|
|
maps:map(fun(_, S) -> adjust_synthetic_cluster_metrics(S) end, All);
|
|
|
do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
|
|
|
- MS = ets:fun2ms(fun(#emqx_monit{time = T} = A) when T >= Time -> A end),
|
|
|
- FromDB = ets:select(?TAB, MS),
|
|
|
- Map = to_ts_data_map(FromDB),
|
|
|
- %% downsample before return RPC calls for less data to merge by the caller nodes
|
|
|
- downsample(Time, Map);
|
|
|
+ do_sample_local(Time);
|
|
|
do_sample(Node, Time) when is_integer(Time) ->
|
|
|
case emqx_dashboard_proto_v1:do_sample(Node, Time) of
|
|
|
{badrpc, Reason} ->
|
|
|
@@ -261,6 +256,13 @@ do_sample(Node, Time) when is_integer(Time) ->
|
|
|
Res
|
|
|
end.
|
|
|
|
|
|
+do_sample_local(Time) ->
|
|
|
+ MS = ets:fun2ms(fun(#emqx_monit{time = T} = A) when T >= Time -> A end),
|
|
|
+ FromDB = ets:select(?TAB, MS),
|
|
|
+ Map = to_ts_data_map(FromDB),
|
|
|
+ %% downsample before return RPC calls for less data to merge by the caller nodes
|
|
|
+ downsample(Time, Map).
|
|
|
+
|
|
|
sample_nodes(Nodes, Time, Local) ->
|
|
|
ResList = concurrently_sample_nodes(Nodes, Time),
|
|
|
{Failed, Success} = lists:partition(
|
|
|
@@ -293,19 +295,28 @@ merge_samplers_loop(TS, Increment, Base) when is_map(Increment) ->
|
|
|
end.
|
|
|
|
|
|
merge_sampler_maps(M1, M2) when is_map(M1) andalso is_map(M2) ->
|
|
|
- Fun =
|
|
|
- fun
|
|
|
- (Key, Map) when
|
|
|
- %% cluster-synced values
|
|
|
- Key =:= topics;
|
|
|
- Key =:= subscriptions_durable;
|
|
|
- Key =:= disconnected_durable_sessions
|
|
|
- ->
|
|
|
- Map#{Key => max(maps:get(Key, M1, 0), maps:get(Key, M2, 0))};
|
|
|
- (Key, Map) ->
|
|
|
- Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
|
|
|
- end,
|
|
|
- lists:foldl(Fun, #{}, ?SAMPLER_LIST).
|
|
|
+ Fun = fun(Key, Acc) -> merge_values(Key, M1, Acc) end,
|
|
|
+ lists:foldl(Fun, M2, ?SAMPLER_LIST).
|
|
|
+
|
|
|
+%% topics, subscriptions_durable and disconnected_durable_sessions are cluster synced
|
|
|
+merge_values(topics, M1, M2) ->
|
|
|
+ max_values(topics, M1, M2);
|
|
|
+merge_values(subscriptions_durable, M1, M2) ->
|
|
|
+ max_values(subscriptions_durable, M1, M2);
|
|
|
+merge_values(disconnected_durable_sessions, M1, M2) ->
|
|
|
+ max_values(disconnected_durable_sessions, M1, M2);
|
|
|
+merge_values(Key, M1, M2) ->
|
|
|
+ sum_values(Key, M1, M2).
|
|
|
+
|
|
|
+max_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
|
|
|
+ M2#{Key => max(maps:get(Key, M1, 0), maps:get(Key, M2, 0))};
|
|
|
+max_values(_Key, _M1, M2) ->
|
|
|
+ M2.
|
|
|
+
|
|
|
+sum_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
|
|
|
+ M2#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)};
|
|
|
+sum_values(_Key, _M1, M2) ->
|
|
|
+ M2.
|
|
|
|
|
|
merge_cluster_rate(Node, Cluster) ->
|
|
|
Fun =
|
|
|
@@ -445,13 +456,16 @@ fill_gaps_loop(T, Interval, Latest, Samples) ->
|
|
|
fill_gaps_loop(T + Interval, Interval, Latest, Samples1).
|
|
|
|
|
|
downsample(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
|
|
|
- TsList = lists:sort(maps:keys(TsDataMap)),
|
|
|
- Latest = lists:last(TsList),
|
|
|
+ TsList = ts_list(TsDataMap),
|
|
|
+ Latest = lists:max(TsList),
|
|
|
Interval = sample_interval(Latest - SinceTs),
|
|
|
downsample_loop(TsList, TsDataMap, Interval, #{});
|
|
|
downsample(_Since, TsDataMap) ->
|
|
|
TsDataMap.
|
|
|
|
|
|
+ts_list(TsDataMap) ->
|
|
|
+ maps:keys(TsDataMap).
|
|
|
+
|
|
|
round_down(Ts, Interval) ->
|
|
|
Ts - (Ts rem Interval).
|
|
|
|