Jelajahi Sumber

fix: dashboard monitor fills gaps before returning to browser

Avoid filling gaps when downsampling, because nodes running an older
version may not be happy about it.
zmstone 1 tahun lalu
induk
melakukan
3682c2d7d6

+ 58 - 33
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -39,8 +39,7 @@
 -export([
     samplers/0,
     samplers/2,
-    current_rate/1,
-    downsample/1
+    current_rate/1
 ]).
 
 %% for rpc
@@ -56,7 +55,9 @@
     lookup/1,
     sample_nodes/3,
     randomize/2,
-    randomize/3
+    randomize/3,
+    sample_fill_gap/2,
+    fill_gaps/2
 ]).
 
 -define(TAB, ?MODULE).
@@ -98,11 +99,11 @@ create_tables() ->
 %% API
 
 samplers() ->
-    format(do_sample(all, infinity)).
+    format(sample_fill_gap(all, 0)).
 
 samplers(NodeOrCluster, Latest) ->
     SinceTime = latest2time(Latest),
-    case format(do_sample(NodeOrCluster, SinceTime)) of
+    case format(sample_fill_gap(NodeOrCluster, SinceTime)) of
         {badrpc, Reason} ->
             {badrpc, Reason};
         List when is_list(List) ->
@@ -206,11 +207,11 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
 %% Internal functions
 
 %% for testing
-randomize(Count, Data) ->
+randomize(Count, Data) when is_map(Data) ->
     MaxAge = 7 * ?DAYS,
     randomize(Count, Data, MaxAge).
 
-randomize(Count, Data, Age) ->
+randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
     Now = erlang:system_time(millisecond) - 1,
     Interval = sample_interval(Age),
     NowBase = Now - (Now rem Interval),
@@ -251,7 +252,7 @@ do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
     FromDB = ets:select(?TAB, MS),
     Map = to_ts_data_map(FromDB),
     %% downsample before return RPC calls for less data to merge by the caller nodes
-    downsample(Map);
+    downsample(Time, Map);
 do_sample(Node, Time) when is_integer(Time) ->
     case emqx_dashboard_proto_v1:do_sample(Node, Time) of
         {badrpc, Reason} ->
@@ -283,15 +284,15 @@ concurrently_sample_nodes(Nodes, Time) ->
 merge_samplers(Increment, Base) ->
     maps:fold(fun merge_samplers_loop/3, Base, Increment).
 
-merge_samplers_loop(TS, Increment, Base) ->
+merge_samplers_loop(TS, Increment, Base) when is_map(Increment) ->
     case maps:get(TS, Base, undefined) of
         undefined ->
             Base#{TS => Increment};
-        BaseSample ->
+        BaseSample when is_map(BaseSample) ->
             Base#{TS => merge_sampler_maps(Increment, BaseSample)}
     end.
 
-merge_sampler_maps(M1, M2) ->
+merge_sampler_maps(M1, M2) when is_map(M1) andalso is_map(M2) ->
     Fun =
         fun
             (Key, Map) when
@@ -406,38 +407,62 @@ sample_interval(Age) when Age =< 3 * ?DAYS ->
 sample_interval(_Age) ->
     10 * ?MINUTES.
 
-downsample(TsDataMap) when map_size(TsDataMap) >= 2 ->
-    [Oldest | _] = TsList = lists:sort(maps:keys(TsDataMap)),
+sample_fill_gap(Node, SinceTs) ->
+    Samples = do_sample(Node, SinceTs),
+    fill_gaps(Samples, SinceTs).
+
+fill_gaps(Samples, SinceTs) ->
+    TsList = lists:sort(maps:keys(Samples)),
+    case length(TsList) >= 2 of
+        true ->
+            do_fill_gaps(hd(TsList), tl(TsList), Samples, SinceTs);
+        false ->
+            Samples
+    end.
+
+do_fill_gaps(FirstTs, TsList, Samples, SinceTs) ->
+    Latest = lists:last(TsList),
+    Interval = sample_interval(Latest - SinceTs),
+    StartTs =
+        case round_down(SinceTs, Interval) of
+            T when T =:= 0 orelse T =:= FirstTs ->
+                FirstTs;
+            T ->
+                T
+        end,
+    fill_gaps_loop(StartTs, Interval, Latest, Samples).
+
+fill_gaps_loop(T, _Interval, Latest, Samples) when T >= Latest ->
+    Samples;
+fill_gaps_loop(T, Interval, Latest, Samples) ->
+    Samples1 =
+        case is_map_key(T, Samples) of
+            true ->
+                Samples;
+            false ->
+                Samples#{T => #{}}
+        end,
+    fill_gaps_loop(T + Interval, Interval, Latest, Samples1).
+
+downsample(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
+    TsList = lists:sort(maps:keys(TsDataMap)),
     Latest = lists:last(TsList),
-    Interval = sample_interval(Latest - Oldest),
-    downsample_loop(TsList, TsDataMap, Interval, #{}, 0);
-downsample(TsDataMap) ->
+    Interval = sample_interval(Latest - SinceTs),
+    downsample_loop(TsList, TsDataMap, Interval, #{});
+downsample(_Since, TsDataMap) ->
     TsDataMap.
 
 round_down(Ts, Interval) ->
     Ts - (Ts rem Interval).
 
-downsample_loop([], _TsDataMap, _Interval, Res, _LastBucket) ->
+downsample_loop([], _TsDataMap, _Interval, Res) ->
     Res;
-downsample_loop([Ts | Rest], TsDataMap, Interval, Res, LastBucket) ->
+downsample_loop([Ts | Rest], TsDataMap, Interval, Res) ->
     Bucket = round_down(Ts, Interval),
-    Res1 = maybe_inject_missing_data_points(Res, LastBucket, Bucket, Interval),
-    Agg0 = maps:get(Bucket, Res1, #{}),
+    Agg0 = maps:get(Bucket, Res, #{}),
     Inc = maps:get(Ts, TsDataMap),
     Agg = merge_sampler_maps(Inc, Agg0),
-    downsample_loop(Rest, TsDataMap, Interval, Res1#{Bucket => Agg}, Bucket).
-
-maybe_inject_missing_data_points(Res, 0, _Current, _Interval) ->
-    Res;
-maybe_inject_missing_data_points(Res, Last, Current, Interval) ->
-    Next = Last + Interval,
-    case Next >= Current of
-        true ->
-            Res;
-        false ->
-            NewRes = Res#{Next => #{}},
-            maybe_inject_missing_data_points(NewRes, Next, Current, Interval)
-    end.
+    downsample_loop(Rest, TsDataMap, Interval, Res#{Bucket => Agg}).
 
 %% -------------------------------------------------------------------------------------------------
 %% timer

+ 43 - 21
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -168,19 +168,20 @@ t_empty_table(_Config) ->
     end.
 
 t_pmap_nodes(_Config) ->
-    MaxAge = 3 * timer:hours(24),
+    MaxAge = timer:hours(1),
     Now = erlang:system_time(millisecond) - 1,
     Interval = emqx_dashboard_monitor:sample_interval(MaxAge),
-    NowBase = Now - (Now rem Interval),
-    StartTs = NowBase - MaxAge,
+    StartTs = round_down(Now - MaxAge, Interval),
     DataPoints = 5,
     ok = emqx_dashboard_monitor:clean(0),
     ok = insert_data_points(DataPoints, StartTs, Now),
     Nodes = [node(), node(), node()],
-    Data = emqx_dashboard_monitor:format(emqx_dashboard_monitor:sample_nodes(Nodes, StartTs, #{})),
-    #{sent := Total0} = hd(Data),
-    TotalSent = check_sample_intervals(Interval, hd(Data), tl(Data), _Index = 1, Total0),
-    ?assertEqual(DataPoints * length(Nodes), TotalSent).
+    %% this function calls emqx_utils:pmap to do the job
+    Data0 = emqx_dashboard_monitor:sample_nodes(Nodes, StartTs, #{}),
+    Data1 = emqx_dashboard_monitor:fill_gaps(Data0, StartTs),
+    Data = emqx_dashboard_monitor:format(Data1),
+    ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
+    ?assertEqual(DataPoints * length(Nodes), sum_value(Data, sent)).
 
 t_randomize(_Config) ->
     ok = emqx_dashboard_monitor:clean(0),
@@ -208,28 +209,37 @@ t_downsample_1h(_Config) ->
 
 sent_1() -> #{sent => 1}.
 
+round_down(Ts, Interval) ->
+    Ts - (Ts rem Interval).
+
 test_downsample(MaxAge, DataPoints) ->
     Now = erlang:system_time(millisecond) - 1,
     Interval = emqx_dashboard_monitor:sample_interval(MaxAge),
-    NowBase = Now - (Now rem Interval),
-    StartTs = NowBase - MaxAge,
+    StartTs = round_down(Now - MaxAge, Interval),
     ok = emqx_dashboard_monitor:clean(0),
     %% insert the start mark for deterministic test boundary
     ok = write(StartTs, sent_1()),
     ok = insert_data_points(DataPoints - 1, StartTs, Now),
-    Data = emqx_dashboard_monitor:format(emqx_dashboard_monitor:do_sample(all, StartTs)),
+    Data = emqx_dashboard_monitor:format(emqx_dashboard_monitor:sample_fill_gap(all, StartTs)),
     ?assertEqual(StartTs, maps:get(time_stamp, hd(Data))),
-    #{sent := Total0} = hd(Data),
-    TotalSent = check_sample_intervals(Interval, hd(Data), tl(Data), _Index = 1, Total0),
-    ?assertEqual(DataPoints, TotalSent).
-
-check_sample_intervals(_Interval, _, [], _Index, Total) ->
-    Total;
-check_sample_intervals(Interval, #{time_stamp := T} = Prev, [First | Rest], Index, Total) ->
-    NewTotal = Total + maps:get(sent, Prev, 0),
+    ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
+    ?assertEqual(DataPoints, sum_value(Data, sent)),
+    ok.
+
+sum_value(Data, Key) ->
+    sum_value(Data, Key, 0).
+
+sum_value([], _, V) ->
+    V;
+sum_value([D | Rest], Key, V) ->
+    sum_value(Rest, Key, maps:get(Key, D, 0) + V).
+
+check_sample_intervals(_Interval, _, []) ->
+    ok;
+check_sample_intervals(Interval, #{time_stamp := T}, [First | Rest]) ->
     #{time_stamp := T2} = First,
-    ?assertEqual({Index, T + Interval}, {Index, T2}),
-    check_sample_intervals(Interval, First, Rest, Index + 1, NewTotal).
+    ?assertEqual(T + Interval, T2),
+    check_sample_intervals(Interval, First, Rest).
 
 insert_data_points(0, _TsMin, _TsMax) ->
     ok;
@@ -311,7 +321,19 @@ t_monitor_api(_) ->
     Fun =
         fun(Sampler) ->
             Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
-            [?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST]
+            case Keys =:= [time_stamp] of
+                true ->
+                    %% this is a dummy data point filling the gap
+                    ok;
+                false ->
+                    lists:all(
+                        fun(K) ->
+                            lists:member(K, Keys)
+                        end,
+                        ?SAMPLER_LIST
+                    ) orelse
+                        ct:fail(Keys)
+            end
         end,
     [Fun(Sampler) || Sampler <- Samplers],
     {ok, NodeSamplers} = request(["monitor", "nodes", node()]),