|
|
@@ -20,6 +20,7 @@
|
|
|
|
|
|
-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
+-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
@@ -38,26 +39,46 @@
|
|
|
-export([
|
|
|
samplers/0,
|
|
|
samplers/2,
|
|
|
- current_rate/1,
|
|
|
- granularity_adapter/1
|
|
|
+ current_rate/1
|
|
|
]).
|
|
|
|
|
|
--ifdef(TEST).
|
|
|
--export([current_rate_cluster/0]).
|
|
|
--endif.
|
|
|
-
|
|
|
%% for rpc
|
|
|
-export([do_sample/2]).
|
|
|
|
|
|
+%% For tests
|
|
|
+-export([
|
|
|
+ current_rate_cluster/0,
|
|
|
+ sample_interval/1,
|
|
|
+ store/1,
|
|
|
+ format/1,
|
|
|
+ clean/1,
|
|
|
+ lookup/1,
|
|
|
+ sample_nodes/2,
|
|
|
+ randomize/2,
|
|
|
+ randomize/3,
|
|
|
+ sample_fill_gap/2,
|
|
|
+ fill_gaps/2
|
|
|
+]).
|
|
|
+
|
|
|
-define(TAB, ?MODULE).
|
|
|
|
|
|
-%% 1 hour = 60 * 60 * 1000 milliseconds
|
|
|
--define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
|
|
|
-%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
|
|
|
--define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
|
|
|
+-define(ONE_SECOND, 1_000).
|
|
|
+-define(SECONDS, ?ONE_SECOND).
|
|
|
+-define(ONE_MINUTE, 60 * ?SECONDS).
|
|
|
+-define(MINUTES, ?ONE_MINUTE).
|
|
|
+-define(ONE_HOUR, 60 * ?MINUTES).
|
|
|
+-define(HOURS, ?ONE_HOUR).
|
|
|
+-define(ONE_DAY, 24 * ?HOURS).
|
|
|
+-define(DAYS, ?ONE_DAY).
|
|
|
+
|
|
|
+-define(CLEAN_EXPIRED_INTERVAL, 10 * ?MINUTES).
|
|
|
+-define(RETENTION_TIME, 7 * ?DAYS).
|
|
|
+-define(MAX_POSSIBLE_SAMPLES, 1440).
|
|
|
|
|
|
-record(state, {
|
|
|
- last
|
|
|
+ last,
|
|
|
+ clean_timer,
|
|
|
+ extra = []
|
|
|
}).
|
|
|
|
|
|
-record(emqx_monit, {
|
|
|
@@ -79,41 +100,20 @@ create_tables() ->
|
|
|
%% API
|
|
|
|
|
|
samplers() ->
|
|
|
- format(do_sample(all, infinity)).
|
|
|
+ format(sample_fill_gap(all, 0)).
|
|
|
|
|
|
samplers(NodeOrCluster, Latest) ->
|
|
|
- Time = latest2time(Latest),
|
|
|
- case format(do_sample(NodeOrCluster, Time)) of
|
|
|
+ SinceTime = latest2time(Latest),
|
|
|
+ case format(sample_fill_gap(NodeOrCluster, SinceTime)) of
|
|
|
{badrpc, Reason} ->
|
|
|
{badrpc, Reason};
|
|
|
List when is_list(List) ->
|
|
|
- granularity_adapter(List)
|
|
|
+ List
|
|
|
end.
|
|
|
|
|
|
-latest2time(infinity) -> infinity;
|
|
|
+latest2time(infinity) -> 0;
|
|
|
latest2time(Latest) -> erlang:system_time(millisecond) - (Latest * 1000).
|
|
|
|
|
|
-%% When the number of samples exceeds 1000, it affects the rendering speed of dashboard UI.
|
|
|
-%% granularity_adapter is an oversampling of the samples.
|
|
|
-%% Use more granular data and reduce data density.
|
|
|
-%%
|
|
|
-%% [
|
|
|
-%% Data1 = #{time => T1, k1 => 1, k2 => 2},
|
|
|
-%% Data2 = #{time => T2, k1 => 3, k2 => 4},
|
|
|
-%% ...
|
|
|
-%% ]
|
|
|
-%% After granularity_adapter, Merge Data1 Data2
|
|
|
-%%
|
|
|
-%% [
|
|
|
-%% #{time => T2, k1 => 1 + 3, k2 => 2 + 6},
|
|
|
-%% ...
|
|
|
-%% ]
|
|
|
-%%
|
|
|
-granularity_adapter(List) when length(List) > 1000 ->
|
|
|
- granularity_adapter(List, []);
|
|
|
-granularity_adapter(List) ->
|
|
|
- List.
|
|
|
-
|
|
|
current_rate(all) ->
|
|
|
current_rate_cluster();
|
|
|
current_rate(Node) when Node == node() ->
|
|
|
@@ -167,8 +167,9 @@ start_link() ->
|
|
|
|
|
|
init([]) ->
|
|
|
sample_timer(),
|
|
|
- clean_timer(),
|
|
|
- {ok, #state{last = undefined}}.
|
|
|
+ %% clean immediately
|
|
|
+ self() ! clean_expired,
|
|
|
+ {ok, #state{last = undefined, clean_timer = undefined, extra = []}}.
|
|
|
|
|
|
handle_call(current_rate, _From, State = #state{last = Last}) ->
|
|
|
NowTime = erlang:system_time(millisecond),
|
|
|
@@ -189,10 +190,11 @@ handle_info({sample, Time}, State = #state{last = Last}) ->
|
|
|
?tp(dashboard_monitor_flushed, #{}),
|
|
|
sample_timer(),
|
|
|
{noreply, State#state{last = Now}};
|
|
|
-handle_info(clean_expired, State) ->
|
|
|
+handle_info(clean_expired, #state{clean_timer = TrefOld} = State) ->
|
|
|
+ ok = maybe_cancel_timer(TrefOld),
|
|
|
clean(),
|
|
|
- clean_timer(),
|
|
|
- {noreply, State};
|
|
|
+ TrefNew = clean_timer(),
|
|
|
+ {noreply, State#state{clean_timer = TrefNew}};
|
|
|
handle_info(_Info, State = #state{}) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
@@ -205,65 +207,125 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
|
|
|
+%% for testing
|
|
|
+randomize(Count, Data) when is_map(Data) ->
|
|
|
+ MaxAge = 7 * ?DAYS,
|
|
|
+ randomize(Count, Data, MaxAge).
|
|
|
+
|
|
|
+randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
|
|
|
+ Now = erlang:system_time(millisecond) - 1,
|
|
|
+ Interval = sample_interval(Age),
|
|
|
+ NowBase = Now - (Now rem Interval),
|
|
|
+ StartTs = NowBase - Age,
|
|
|
+ lists:foreach(
|
|
|
+ fun(_) ->
|
|
|
+ Ts = StartTs + rand:uniform(Now - StartTs),
|
|
|
+ Record = #emqx_monit{time = Ts, data = Data},
|
|
|
+ case ets:lookup(?TAB, Ts) of
|
|
|
+ [] ->
|
|
|
+ store(Record);
|
|
|
+ [#emqx_monit{data = D} = R] ->
|
|
|
+ store(R#emqx_monit{data = merge_sampler_maps(D, Data)})
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ lists:seq(1, Count)
|
|
|
+ ).
|
|
|
+
|
|
|
+maybe_cancel_timer(Tref) when is_reference(Tref) ->
|
|
|
+ _ = erlang:cancel_timer(Tref),
|
|
|
+ ok;
|
|
|
+maybe_cancel_timer(_) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
do_call(Request) ->
|
|
|
gen_server:call(?MODULE, Request, 5000).
|
|
|
|
|
|
-do_sample(all, Time) ->
|
|
|
- do_sample(emqx:running_nodes(), Time, #{});
|
|
|
-do_sample(Node, Time) when Node == node() ->
|
|
|
- MS = match_spec(Time),
|
|
|
- internal_format(ets:select(?TAB, MS));
|
|
|
-do_sample(Node, Time) ->
|
|
|
+do_sample(Node, infinity) ->
|
|
|
+ %% handle RPC from old version nodes
|
|
|
+ do_sample(Node, 0);
|
|
|
+do_sample(all, Time) when is_integer(Time) ->
|
|
|
+ AllNodes = emqx:running_nodes(),
|
|
|
+ All = sample_nodes(AllNodes, Time),
|
|
|
+ maps:map(fun(_, S) -> adjust_synthetic_cluster_metrics(S) end, All);
|
|
|
+do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
|
|
|
+ do_sample_local(Time);
|
|
|
+do_sample(Node, Time) when is_integer(Time) ->
|
|
|
case emqx_dashboard_proto_v1:do_sample(Node, Time) of
|
|
|
{badrpc, Reason} ->
|
|
|
- {badrpc, {Node, Reason}};
|
|
|
+ {badrpc, #{node => Node, reason => Reason}};
|
|
|
Res ->
|
|
|
Res
|
|
|
end.
|
|
|
|
|
|
-do_sample([], _Time, Samples) ->
|
|
|
- maps:map(
|
|
|
- fun(_TS, Sample) -> adjust_synthetic_cluster_metrics(Sample) end,
|
|
|
- Samples
|
|
|
- );
|
|
|
-do_sample([Node | Nodes], Time, Res) ->
|
|
|
- case do_sample(Node, Time) of
|
|
|
- {badrpc, Reason} ->
|
|
|
- {badrpc, Reason};
|
|
|
- Samplers ->
|
|
|
- do_sample(Nodes, Time, merge_cluster_samplers(Samplers, Res))
|
|
|
- end.
|
|
|
-
|
|
|
-match_spec(infinity) ->
|
|
|
- [{'$1', [], ['$1']}];
|
|
|
-match_spec(Time) ->
|
|
|
- [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
|
|
|
+do_sample_local(Time) ->
|
|
|
+ MS = ets:fun2ms(fun(#emqx_monit{time = T} = A) when T >= Time -> A end),
|
|
|
+ FromDB = ets:select(?TAB, MS),
|
|
|
+ Map = to_ts_data_map(FromDB),
|
|
|
+ %% downsample before return RPC calls for less data to merge by the caller nodes
|
|
|
+ downsample(Time, Map).
|
|
|
|
|
|
-merge_cluster_samplers(NodeSamples, Cluster) ->
|
|
|
- maps:fold(fun merge_cluster_samplers/3, Cluster, NodeSamples).
|
|
|
+sample_nodes(Nodes, Time) ->
|
|
|
+ ResList = concurrently_sample_nodes(Nodes, Time),
|
|
|
+ {Failed, Success} = lists:partition(
|
|
|
+ fun
|
|
|
+ ({badrpc, _}) -> true;
|
|
|
+ (_) -> false
|
|
|
+ end,
|
|
|
+ ResList
|
|
|
+ ),
|
|
|
+ Failed =/= [] andalso
|
|
|
+ ?SLOG(warning, #{msg => "failed_to_sample_monitor_data", errors => Failed}),
|
|
|
+ lists:foldl(fun(I, B) -> merge_samplers(Time, I, B) end, #{}, Success).
|
|
|
+
|
|
|
+concurrently_sample_nodes(Nodes, Time) ->
|
|
|
+ %% emqx_dashboard_proto_v1:do_sample has a timeout (5s),
|
|
|
+ Timeout = ?RPC_TIMEOUT + ?ONE_SECOND,
|
|
|
+ %% call emqx_utils:pmap here instead of a rpc multicall
|
|
|
+ %% to avoid having to introduce a new bpapi proto version
|
|
|
+ emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
|
|
|
+
|
|
|
+merge_samplers(SinceTime, Increment0, Base) ->
|
|
|
+ Increment =
|
|
|
+ case map_size(Increment0) > ?MAX_POSSIBLE_SAMPLES of
|
|
|
+ true ->
|
|
|
+ %% this is a response from older version node
|
|
|
+ downsample(SinceTime, Increment0);
|
|
|
+ false ->
|
|
|
+ Increment0
|
|
|
+ end,
|
|
|
+ maps:fold(fun merge_samplers_loop/3, Base, Increment).
|
|
|
|
|
|
-merge_cluster_samplers(TS, NodeSample, Cluster) ->
|
|
|
- case maps:get(TS, Cluster, undefined) of
|
|
|
+merge_samplers_loop(TS, Increment, Base) when is_map(Increment) ->
|
|
|
+ case maps:get(TS, Base, undefined) of
|
|
|
undefined ->
|
|
|
- Cluster#{TS => NodeSample};
|
|
|
- ClusterSample ->
|
|
|
- Cluster#{TS => merge_cluster_sampler_map(NodeSample, ClusterSample)}
|
|
|
+ Base#{TS => Increment};
|
|
|
+ BaseSample when is_map(BaseSample) ->
|
|
|
+ Base#{TS => merge_sampler_maps(Increment, BaseSample)}
|
|
|
end.
|
|
|
|
|
|
-merge_cluster_sampler_map(M1, M2) ->
|
|
|
- Fun =
|
|
|
- fun
|
|
|
- (Key, Map) when
|
|
|
- %% cluster-synced values
|
|
|
- Key =:= topics;
|
|
|
- Key =:= subscriptions_durable;
|
|
|
- Key =:= disconnected_durable_sessions
|
|
|
- ->
|
|
|
- Map#{Key => maps:get(Key, M1, maps:get(Key, M2, 0))};
|
|
|
- (Key, Map) ->
|
|
|
- Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
|
|
|
- end,
|
|
|
- lists:foldl(Fun, #{}, ?SAMPLER_LIST).
|
|
|
+merge_sampler_maps(M1, M2) when is_map(M1) andalso is_map(M2) ->
|
|
|
+ Fun = fun(Key, Acc) -> merge_values(Key, M1, Acc) end,
|
|
|
+ lists:foldl(Fun, M2, ?SAMPLER_LIST).
|
|
|
+
|
|
|
+%% topics, subscriptions_durable and disconnected_durable_sessions are cluster synced
|
|
|
+merge_values(topics, M1, M2) ->
|
|
|
+ max_values(topics, M1, M2);
|
|
|
+merge_values(subscriptions_durable, M1, M2) ->
|
|
|
+ max_values(subscriptions_durable, M1, M2);
|
|
|
+merge_values(disconnected_durable_sessions, M1, M2) ->
|
|
|
+ max_values(disconnected_durable_sessions, M1, M2);
|
|
|
+merge_values(Key, M1, M2) ->
|
|
|
+ sum_values(Key, M1, M2).
|
|
|
+
|
|
|
+max_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
|
|
|
+ M2#{Key => max(maps:get(Key, M1, 0), maps:get(Key, M2, 0))};
|
|
|
+max_values(_Key, _M1, M2) ->
|
|
|
+ M2.
|
|
|
+
|
|
|
+sum_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
|
|
|
+ M2#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)};
|
|
|
+sum_values(_Key, _M1, M2) ->
|
|
|
+ M2.
|
|
|
|
|
|
merge_cluster_rate(Node, Cluster) ->
|
|
|
Fun =
|
|
|
@@ -310,13 +372,10 @@ adjust_synthetic_cluster_metrics(Metrics0) ->
|
|
|
|
|
|
format({badrpc, Reason}) ->
|
|
|
{badrpc, Reason};
|
|
|
-format(Data) ->
|
|
|
- All = maps:fold(fun format/3, [], Data),
|
|
|
- Compare = fun(#{time_stamp := T1}, #{time_stamp := T2}) -> T1 =< T2 end,
|
|
|
- lists:sort(Compare, All).
|
|
|
-
|
|
|
-format(TimeStamp, Data, All) ->
|
|
|
- [Data#{time_stamp => TimeStamp} | All].
|
|
|
+format(Data0) ->
|
|
|
+ Data1 = maps:to_list(Data0),
|
|
|
+ Data = lists:keysort(1, Data1),
|
|
|
+ lists:map(fun({TimeStamp, V}) -> V#{time_stamp => TimeStamp} end, Data).
|
|
|
|
|
|
cal_rate(_Now, undefined) ->
|
|
|
AllSamples = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
|
|
|
@@ -350,18 +409,83 @@ cal_rate_(Key, {Now, Last, TDelta, Res}) ->
|
|
|
RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
|
|
|
{Now, Last, TDelta, Res#{RateKey => Rate}}.
|
|
|
|
|
|
-granularity_adapter([], Res) ->
|
|
|
- lists:reverse(Res);
|
|
|
-granularity_adapter([Sampler], Res) ->
|
|
|
- granularity_adapter([], [Sampler | Res]);
|
|
|
-granularity_adapter([Sampler1, Sampler2 | Rest], Res) ->
|
|
|
- Fun =
|
|
|
- fun(Key, M) ->
|
|
|
- Value1 = maps:get(Key, Sampler1),
|
|
|
- Value2 = maps:get(Key, Sampler2),
|
|
|
- M#{Key => Value1 + Value2}
|
|
|
+%% Try to keep the total number of recrods around 1000.
|
|
|
+%% When the oldest data point is
|
|
|
+%% < 1h: sample every 10s: 360 data points
|
|
|
+%% < 1d: sample every 1m: 1440 data points
|
|
|
+%% < 3d: sample every 5m: 864 data points
|
|
|
+%% < 7d: sample every 10m: 1008 data points
|
|
|
+sample_interval(Age) when Age =< 60 * ?SECONDS ->
|
|
|
+ %% so far this can happen only during tests
|
|
|
+ ?ONE_SECOND;
|
|
|
+sample_interval(Age) when Age =< ?ONE_HOUR ->
|
|
|
+ 10 * ?SECONDS;
|
|
|
+sample_interval(Age) when Age =< ?ONE_DAY ->
|
|
|
+ ?ONE_MINUTE;
|
|
|
+sample_interval(Age) when Age =< 3 * ?DAYS ->
|
|
|
+ 5 * ?MINUTES;
|
|
|
+sample_interval(_Age) ->
|
|
|
+ 10 * ?MINUTES.
|
|
|
+
|
|
|
+sample_fill_gap(Node, SinceTs) ->
|
|
|
+ Samples = do_sample(Node, SinceTs),
|
|
|
+ fill_gaps(Samples, SinceTs).
|
|
|
+
|
|
|
+fill_gaps(Samples, SinceTs) ->
|
|
|
+ TsList = lists:sort(maps:keys(Samples)),
|
|
|
+ case length(TsList) >= 2 of
|
|
|
+ true ->
|
|
|
+ do_fill_gaps(hd(TsList), tl(TsList), Samples, SinceTs);
|
|
|
+ false ->
|
|
|
+ Samples
|
|
|
+ end.
|
|
|
+
|
|
|
+do_fill_gaps(FirstTs, TsList, Samples, SinceTs) ->
|
|
|
+ Latest = lists:last(TsList),
|
|
|
+ Interval = sample_interval(Latest - SinceTs),
|
|
|
+ StartTs =
|
|
|
+ case round_down(SinceTs, Interval) of
|
|
|
+ T when T =:= 0 orelse T =:= FirstTs ->
|
|
|
+ FirstTs;
|
|
|
+ T ->
|
|
|
+ T
|
|
|
end,
|
|
|
- granularity_adapter(Rest, [lists:foldl(Fun, Sampler2, ?DELTA_SAMPLER_LIST) | Res]).
|
|
|
+ fill_gaps_loop(StartTs, Interval, Latest, Samples).
|
|
|
+
|
|
|
+fill_gaps_loop(T, _Interval, Latest, Samples) when T >= Latest ->
|
|
|
+ Samples;
|
|
|
+fill_gaps_loop(T, Interval, Latest, Samples) ->
|
|
|
+ Samples1 =
|
|
|
+ case is_map_key(T, Samples) of
|
|
|
+ true ->
|
|
|
+ Samples;
|
|
|
+ false ->
|
|
|
+ Samples#{T => #{}}
|
|
|
+ end,
|
|
|
+ fill_gaps_loop(T + Interval, Interval, Latest, Samples1).
|
|
|
+
|
|
|
+downsample(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
|
|
|
+ TsList = ts_list(TsDataMap),
|
|
|
+ Latest = lists:max(TsList),
|
|
|
+ Interval = sample_interval(Latest - SinceTs),
|
|
|
+ downsample_loop(TsList, TsDataMap, Interval, #{});
|
|
|
+downsample(_Since, TsDataMap) ->
|
|
|
+ TsDataMap.
|
|
|
+
|
|
|
+ts_list(TsDataMap) ->
|
|
|
+ maps:keys(TsDataMap).
|
|
|
+
|
|
|
+round_down(Ts, Interval) ->
|
|
|
+ Ts - (Ts rem Interval).
|
|
|
+
|
|
|
+downsample_loop([], _TsDataMap, _Interval, Res) ->
|
|
|
+ Res;
|
|
|
+downsample_loop([Ts | Rest], TsDataMap, Interval, Res) ->
|
|
|
+ Bucket = round_down(Ts, Interval),
|
|
|
+ Agg0 = maps:get(Bucket, Res, #{}),
|
|
|
+ Inc = maps:get(Ts, TsDataMap),
|
|
|
+ Agg = merge_sampler_maps(Inc, Agg0),
|
|
|
+ downsample_loop(Rest, TsDataMap, Interval, Res#{Bucket => Agg}).
|
|
|
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% timer
|
|
|
@@ -381,7 +505,7 @@ clean_timer() ->
|
|
|
next_interval() ->
|
|
|
Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- NextTime = ((Now div Interval) + 1) * Interval,
|
|
|
+ NextTime = round_down(Now, Interval) + Interval,
|
|
|
Remaining = NextTime - Now,
|
|
|
{NextTime, Remaining}.
|
|
|
|
|
|
@@ -410,31 +534,30 @@ delta(LastData, NowData) ->
|
|
|
end,
|
|
|
lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST).
|
|
|
|
|
|
+lookup(Ts) ->
|
|
|
+ ets:lookup(?TAB, Ts).
|
|
|
+
|
|
|
store(MonitData) ->
|
|
|
{atomic, ok} =
|
|
|
mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).
|
|
|
|
|
|
clean() ->
|
|
|
+ clean(?RETENTION_TIME).
|
|
|
+
|
|
|
+clean(Retention) ->
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
|
|
|
- Expired = ets:select(?TAB, ExpiredMS),
|
|
|
- lists:foreach(
|
|
|
- fun(Data) ->
|
|
|
- true = ets:delete_object(?TAB, Data)
|
|
|
- end,
|
|
|
- Expired
|
|
|
- ),
|
|
|
+ MS = ets:fun2ms(fun(#emqx_monit{time = T}) -> Now - T > Retention end),
|
|
|
+ _ = ets:select_delete(?TAB, MS),
|
|
|
ok.
|
|
|
|
|
|
-%% To make it easier to do data aggregation
|
|
|
-internal_format(List) when is_list(List) ->
|
|
|
+%% This data structure should not be changed because it's a RPC contract.
|
|
|
+%% Otherwise dashboard may not work during rolling upgrade.
|
|
|
+to_ts_data_map(List) when is_list(List) ->
|
|
|
Fun =
|
|
|
- fun(Data, All) ->
|
|
|
- maps:merge(internal_format(Data), All)
|
|
|
+ fun(#emqx_monit{time = Time, data = Data}, All) ->
|
|
|
+ All#{Time => Data}
|
|
|
end,
|
|
|
- lists:foldl(Fun, #{}, List);
|
|
|
-internal_format(#emqx_monit{time = Time, data = Data}) ->
|
|
|
- #{Time => Data}.
|
|
|
+ lists:foldl(Fun, #{}, List).
|
|
|
|
|
|
getstats(Key) ->
|
|
|
%% Stats ets maybe not exist when ekka join.
|