Browse Source

refactor: cosmetic changes and add test case

zmstone 1 year ago
parent
commit
6bd4b2e558

+ 41 - 35
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -20,6 +20,7 @@
 
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
 -behaviour(gen_server).
 
@@ -42,13 +43,12 @@
     downsample/1
 ]).
 
--ifdef(TEST).
--export([current_rate_cluster/0]).
--endif.
-
 %% for rpc
 -export([do_sample/2]).
 
+%% For tests
+-export([current_rate_cluster/0, sample_interval/1, store/1, format/1, clean/1, lookup/1]).
+
 -define(TAB, ?MODULE).
 
 -define(ONE_SECOND, 1_000).
@@ -91,15 +91,15 @@ samplers() ->
     format(do_sample(all, infinity)).
 
 samplers(NodeOrCluster, Latest) ->
-    Time = latest2time(Latest),
-    case format(do_sample(NodeOrCluster, Time)) of
+    SinceTime = latest2time(Latest),
+    case format(do_sample(NodeOrCluster, SinceTime)) of
         {badrpc, Reason} ->
             {badrpc, Reason};
         List when is_list(List) ->
             List
     end.
 
-latest2time(infinity) -> infinity;
+latest2time(infinity) -> 0;
 latest2time(Latest) -> erlang:system_time(millisecond) - (Latest * 1000).
 
 current_rate(all) ->
@@ -204,18 +204,21 @@ maybe_cancel_timer(_) ->
 do_call(Request) ->
     gen_server:call(?MODULE, Request, 5000).
 
-do_sample(all, Time) ->
+do_sample(Node, infinity) ->
+    %% handle RPC from old version nodes
+    do_sample(Node, 0);
+do_sample(all, Time) when is_integer(Time) ->
     AllNodes = emqx:running_nodes() -- [node()],
     Local = do_sample(node(), Time),
     All = sample_nodes(AllNodes, Time, Local),
     maps:map(fun(_, S) -> adjust_synthetic_cluster_metrics(S) end, All);
-do_sample(Node, Time) when Node == node() ->
-    MS = match_spec(Time),
+do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
+    MS = ets:fun2ms(fun(#emqx_monit{time = T} = A) when T >= Time -> A end),
     FromDB = ets:select(?TAB, MS),
     Map = to_ts_data_map(FromDB),
     %% downsample before return RPC calls for less data to merge by the caller nodes
     downsample(Map);
-do_sample(Node, Time) ->
+do_sample(Node, Time) when is_integer(Time) ->
     case emqx_dashboard_proto_v1:do_sample(Node, Time) of
         {badrpc, Reason} ->
             {badrpc, #{node => Node, reason => Reason}};
@@ -243,11 +246,6 @@ concurrently_sample_nodes(Nodes, Time) ->
     %% to avoid having to introduce a new bpapi proto version
     emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
 
-match_spec(infinity) ->
-    [{'$1', [], ['$1']}];
-match_spec(Time) ->
-    [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
-
 merge_samplers(Increment, Base) ->
     maps:fold(fun merge_samplers_loop/3, Base, Increment).
 
@@ -362,6 +360,9 @@ cal_rate_(Key, {Now, Last, TDelta, Res}) ->
 %% < 1d: sample every 1m: 1440 data points
 %% < 3d: sample every 5m: 864 data points
 %% < 7d: sample every 10m: 1008 data points
+sample_interval(Age) when Age =< 60 * ?SECONDS ->
+    %% so far this can happen only during tests
+    ?ONE_SECOND;
 sample_interval(Age) when Age =< ?ONE_HOUR ->
     10 * ?SECONDS;
 sample_interval(Age) when Age =< ?ONE_DAY ->
@@ -379,24 +380,29 @@ downsample(TsDataMap) when map_size(TsDataMap) >= 2 ->
 downsample(TsDataMap) ->
     TsDataMap.
 
-downsample_loop([], _TsDataMap, _Interval, Res, _LastRoundDown) ->
+round_down(Ts, Interval) ->
+    Ts - (Ts rem Interval).
+
+downsample_loop([], _TsDataMap, _Interval, Res, _LastBucket) ->
     Res;
-downsample_loop([Ts | Rest], TsDataMap, Interval, Res, LastRoundDown) ->
-    RoundDown = Ts - (Ts rem Interval),
-    Res1 = maybe_inject_missing_data_points(Res, LastRoundDown, RoundDown, Interval),
-    Agg0 = maps:get(RoundDown, Res1, #{}),
+downsample_loop([Ts | Rest], TsDataMap, Interval, Res, LastBucket) ->
+    Bucket = round_down(Ts, Interval),
+    Res1 = maybe_inject_missing_data_points(Res, LastBucket, Bucket, Interval),
+    Agg0 = maps:get(Bucket, Res1, #{}),
     Inc = maps:get(Ts, TsDataMap),
     Agg = merge_sampler_maps(Inc, Agg0),
-    downsample_loop(Rest, TsDataMap, Interval, Res1#{RoundDown => Agg}, RoundDown).
+    downsample_loop(Rest, TsDataMap, Interval, Res1#{Bucket => Agg}, Bucket).
 
-maybe_inject_missing_data_points(Res, 0, _Stop, _Interval) ->
+maybe_inject_missing_data_points(Res, 0, _Current, _Interval) ->
     Res;
-maybe_inject_missing_data_points(Res, T, Stop, Interval) ->
-    case T + Interval >= Stop of
+maybe_inject_missing_data_points(Res, Last, Current, Interval) ->
+    Next = Last + Interval,
+    case Next >= Current of
         true ->
             Res;
         false ->
-            maybe_inject_missing_data_points(Res#{T => #{}}, T + Interval, Stop, Interval)
+            NewRes = Res#{Next => #{}},
+            maybe_inject_missing_data_points(NewRes, Next, Current, Interval)
     end.
 
 %% -------------------------------------------------------------------------------------------------
@@ -417,7 +423,7 @@ clean_timer() ->
 next_interval() ->
     Interval = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL) * 1000,
     Now = erlang:system_time(millisecond),
-    NextTime = ((Now div Interval) + 1) * Interval,
+    NextTime = round_down(Now, Interval) + Interval,
     Remaining = NextTime - Now,
     {NextTime, Remaining}.
 
@@ -446,20 +452,20 @@ delta(LastData, NowData) ->
         end,
     lists:foldl(Fun, NowData, ?DELTA_SAMPLER_LIST).
 
+lookup(Ts) ->
+    ets:lookup(?TAB, Ts).
+
 store(MonitData) ->
     {atomic, ok} =
         mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).
 
 clean() ->
+    clean(?RETENTION_TIME).
+
+clean(Retention) ->
     Now = erlang:system_time(millisecond),
-    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
-    Expired = ets:select(?TAB, ExpiredMS),
-    lists:foreach(
-        fun(Data) ->
-            true = ets:delete_object(?TAB, Data)
-        end,
-        Expired
-    ),
+    MS = ets:fun2ms(fun(#emqx_monit{time = T}) -> Now - T > Retention end),
+    _ = ets:select_delete(?TAB, MS),
     ok.
 
 %% This data structure should not be changed because it's a RPC contract.

+ 72 - 27
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -158,32 +158,75 @@ end_per_testcase(_TestCase, _Config) ->
 %% Test Cases
 %%--------------------------------------------------------------------
 
-t_monitor_samplers_all(_Config) ->
-    {ok, _} =
-        snabbkaffe:block_until(
-            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
-            infinity
-        ),
-    Size = mnesia:table_info(emqx_dashboard_monitor, size),
-    All = emqx_dashboard_monitor:samplers(all, infinity),
-    All2 = emqx_dashboard_monitor:samplers(),
-    ?assert(erlang:length(All) == Size),
-    ?assert(erlang:length(All2) == Size),
-    ok.
+t_empty_table(_Config) ->
+    sys:suspend(whereis(emqx_dashboard_monitor)),
+    try
+        emqx_dashboard_monitor:clean(0),
+        ?assertEqual({ok, []}, request(["monitor"], "latest=20000"))
+    after
+        sys:resume(whereis(emqx_dashboard_monitor))
+    end.
 
-t_monitor_samplers_latest(_Config) ->
-    {ok, _} =
-        snabbkaffe:block_until(
-            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
-            infinity
-        ),
-    ?retry(1_000, 10, begin
-        Samplers = emqx_dashboard_monitor:samplers(node(), 2),
-        Latest = emqx_dashboard_monitor:samplers(node(), 1),
-        ?assert(erlang:length(Samplers) == 2, #{samplers => Samplers}),
-        ?assert(erlang:length(Latest) == 1, #{latest => Latest}),
-        ?assert(hd(Latest) == lists:nth(2, Samplers))
-    end),
+t_downsample_7d(_Config) ->
+    MaxAge = 7 * timer:hours(24),
+    test_downsample(MaxAge, 10).
+
+t_downsample_3d(_Config) ->
+    MaxAge = 3 * timer:hours(24),
+    test_downsample(MaxAge, 10).
+
+t_downsample_1d(_Config) ->
+    MaxAge = timer:hours(24),
+    test_downsample(MaxAge, 10).
+
+t_downsample_1h(_Config) ->
+    MaxAge = timer:hours(1),
+    test_downsample(MaxAge, 10).
+
+sent_1() -> #{sent => 1}.
+
+test_downsample(MaxAge, DataPoints) ->
+    Now = erlang:system_time(millisecond) - 1,
+    Interval = emqx_dashboard_monitor:sample_interval(MaxAge),
+    NowBase = Now - (Now rem Interval),
+    StartTs = NowBase - MaxAge,
+    emqx_dashboard_monitor:clean(0),
+    ?assertEqual([], ets:tab2list(emqx_dashboard_monitor)),
+    %% insert the start mark for deterministic test boundary
+    ok = write(StartTs, sent_1()),
+    ok = insert_data_points(DataPoints - 1, StartTs, Now),
+    Data = emqx_dashboard_monitor:format(emqx_dashboard_monitor:do_sample(all, StartTs)),
+    ?assertEqual(StartTs, maps:get(time_stamp, hd(Data))),
+    TotalSent = check_sample_intervals(Interval, hd(Data), tl(Data), _Index = 1, _Total = 1),
+    ?assertEqual(DataPoints, TotalSent).
+
+check_sample_intervals(_Interval, _, [], _Index, Total) ->
+    Total;
+check_sample_intervals(Interval, #{time_stamp := T} = Prev, [First | Rest], Index, Total) ->
+    NewTotal = Total + maps:get(sent, Prev, 0),
+    #{time_stamp := T2} = First,
+    ?assertEqual({Index, T + Interval}, {Index, T2}),
+    check_sample_intervals(Interval, First, Rest, Index + 1, NewTotal).
+
+insert_data_points(0, _TsMin, _TsMax) ->
+    ok;
+insert_data_points(N, TsMin, TsMax) when N > 0 ->
+    Data = sent_1(),
+    FakeTs = TsMin + rand:uniform(TsMax - TsMin),
+    case read(FakeTs) of
+        [] ->
+            ok = write(FakeTs, Data),
+            insert_data_points(N - 1, TsMin, TsMax);
+        _ ->
+            %% clashed, try again
+            insert_data_points(N, TsMin, TsMax)
+    end.
+
+read(Ts) ->
+    emqx_dashboard_monitor:lookup(Ts).
+
+write(Time, Data) ->
+    {atomic, ok} = emqx_dashboard_monitor:store({emqx_monit, Time, Data}),
     ok.
 
 t_monitor_sampler_format(_Config) ->
@@ -233,12 +276,14 @@ t_handle_old_monitor_data(_Config) ->
     ok.
 
 t_monitor_api(_) ->
+    emqx_dashboard_monitor:clean(0),
     {ok, _} =
         snabbkaffe:block_until(
             ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
-            infinity
+            infinity,
+            0
         ),
-    {ok, Samplers} = request(["monitor"], "latest=200"),
+    {ok, Samplers} = request(["monitor"], "latest=20"),
     ?assert(erlang:length(Samplers) >= 2, #{samplers => Samplers}),
     Fun =
         fun(Sampler) ->