|
|
@@ -214,7 +214,7 @@ all_data() ->
|
|
|
inplace_downsample() ->
|
|
|
All = all_data(),
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- Compacted = compact(Now, All, []),
|
|
|
+ Compacted = compact(Now, All),
|
|
|
{Deletes, Writes} = compare(All, Compacted, [], []),
|
|
|
{atomic, ok} = mria:transaction(
|
|
|
mria:local_content_shard(),
|
|
|
@@ -255,18 +255,23 @@ compare([{T0, _} | _] = All, [{T1, Data1} | Compacted], Deletes, Writes) when T0
|
|
|
compare(All, Compacted, Deletes, [{T1, Data1} | Writes]).
|
|
|
|
|
|
%% compact the data points to a smaller set of buckets
|
|
|
-compact(_Now, [], Acc) ->
|
|
|
+%% Pre-condition: data fed to this function must be sorted chronologically.
|
|
|
+compact(Now, Data) ->
|
|
|
+ Gauges = gauges(),
|
|
|
+ compact(Now, Data, Gauges, []).
|
|
|
+
|
|
|
+compact(_Now, [], _Gauges, Acc) ->
|
|
|
lists:reverse(Acc);
|
|
|
-compact(Now, [{Time, Data} | Rest], Acc) ->
|
|
|
+compact(Now, [{Time, Data} | Rest], Gauges, Acc) ->
|
|
|
Interval = sample_interval(Now - Time),
|
|
|
Bucket = round_down(Time, Interval),
|
|
|
- NewAcc = merge_to_bucket(Bucket, Data, Acc),
|
|
|
- compact(Now, Rest, NewAcc).
|
|
|
+ NewAcc = merge_to_bucket(Bucket, Data, Gauges, Acc),
|
|
|
+ compact(Now, Rest, Gauges, NewAcc).
|
|
|
|
|
|
-merge_to_bucket(Bucket, Data, [{Bucket, Data0} | Acc]) ->
|
|
|
- NewData = merge_sampler_maps(Data, Data0),
|
|
|
+merge_to_bucket(Bucket, Data, Gauges, [{Bucket, Data0} | Acc]) ->
|
|
|
+ NewData = merge_local_sampler_maps(Data0, Data, Gauges),
|
|
|
[{Bucket, NewData} | Acc];
|
|
|
-merge_to_bucket(Bucket, Data, Acc) ->
|
|
|
+merge_to_bucket(Bucket, Data, _Gauges, Acc) ->
|
|
|
[{Bucket, Data} | Acc].
|
|
|
|
|
|
%% for testing
|
|
|
@@ -277,6 +282,7 @@ randomize(Count, Data) when is_map(Data) ->
|
|
|
randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
|
|
|
Now = erlang:system_time(millisecond) - 1,
|
|
|
StartTs = Now - Age,
|
|
|
+ Gauges = gauges(),
|
|
|
lists:foreach(
|
|
|
fun(_) ->
|
|
|
Ts = round_down(StartTs + rand:uniform(Age), timer:seconds(10)),
|
|
|
@@ -285,7 +291,7 @@ randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
|
|
|
[] ->
|
|
|
store(Record);
|
|
|
[#emqx_monit{data = D} = R] ->
|
|
|
- store(R#emqx_monit{data = merge_sampler_maps(D, Data)})
|
|
|
+ store(R#emqx_monit{data = merge_local_sampler_maps(Data, D, Gauges)})
|
|
|
end
|
|
|
end,
|
|
|
lists:seq(1, Count)
|
|
|
@@ -370,6 +376,7 @@ merge_sampler_maps(M1, M2) when is_map(M1) andalso is_map(M2) ->
|
|
|
Fun = fun(Key, Acc) -> merge_values(Key, M1, Acc) end,
|
|
|
lists:foldl(Fun, M2, ?SAMPLER_LIST).
|
|
|
|
|
|
+%% `M1' is assumed to be newer data compared to anything `M2' has seen.
|
|
|
merge_local_sampler_maps(M1, M2, Gauges) when is_map(M1) andalso is_map(M2) ->
|
|
|
Fun = fun(Key, Acc) -> merge_local_values(Key, M1, Acc, Gauges) end,
|
|
|
lists:foldl(Fun, M2, ?SAMPLER_LIST).
|
|
|
@@ -391,7 +398,7 @@ merge_local_values(Key, M1, M2, Gauges) when
|
|
|
%% First argument is assumed to be from a newer timestamp, so we keep the latest.
|
|
|
M2#{Key => maps:get(Key, M1, maps:get(Key, M2, 0))};
|
|
|
merge_local_values(Key, M1, M2, _Gauges) ->
|
|
|
- sum_values(Key, M1, M2).
|
|
|
+ merge_values(Key, M1, M2).
|
|
|
|
|
|
max_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
|
|
|
M2#{Key => max(maps:get(Key, M1, 0), maps:get(Key, M2, 0))};
|
|
|
@@ -554,7 +561,7 @@ downsample_local(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
|
|
|
TsList = ts_list(TsDataMap),
|
|
|
Latest = lists:max(TsList),
|
|
|
Interval = sample_interval(Latest - SinceTs),
|
|
|
- Gauges = maps:from_keys(?GAUGE_SAMPLER_LIST, true),
|
|
|
+ Gauges = gauges(),
|
|
|
downsample_local_loop(TsList, Gauges, TsDataMap, Interval, #{});
|
|
|
downsample_local(_Since, TsDataMap) ->
|
|
|
TsDataMap.
|
|
|
@@ -583,6 +590,9 @@ downsample_local_loop([Ts | Rest], Gauges, TsDataMap, Interval, Res) ->
|
|
|
Agg = merge_local_sampler_maps(Inc, Agg0, Gauges),
|
|
|
downsample_local_loop(Rest, Gauges, TsDataMap, Interval, Res#{Bucket => Agg}).
|
|
|
|
|
|
+gauges() ->
|
|
|
+ maps:from_keys(?GAUGE_SAMPLER_LIST, true).
|
|
|
+
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% timer
|
|
|
|