Преглед изворни кода

Merge pull request #14023 from thalesmg/20241018-r58-monitor-stats

fix(dashboard monitor): don't sum data for local aggregation
Thales Macedo Garitezi пре 1 година
родитељ
комит
c886c42cd5

+ 53 - 12
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -214,7 +214,7 @@ all_data() ->
 inplace_downsample() ->
     All = all_data(),
     Now = erlang:system_time(millisecond),
-    Compacted = compact(Now, All, []),
+    Compacted = compact(Now, All),
     {Deletes, Writes} = compare(All, Compacted, [], []),
     {atomic, ok} = mria:transaction(
         mria:local_content_shard(),
@@ -255,18 +255,23 @@ compare([{T0, _} | _] = All, [{T1, Data1} | Compacted], Deletes, Writes) when T0
     compare(All, Compacted, Deletes, [{T1, Data1} | Writes]).
 
 %% compact the data points to a smaller set of buckets
-compact(_Now, [], Acc) ->
+%% Pre-condition: data fed to this function must be sorted chronologically.
+compact(Now, Data) ->
+    Gauges = gauges(),
+    compact(Now, Data, Gauges, []).
+
+compact(_Now, [], _Gauges, Acc) ->
     lists:reverse(Acc);
-compact(Now, [{Time, Data} | Rest], Acc) ->
+compact(Now, [{Time, Data} | Rest], Gauges, Acc) ->
     Interval = sample_interval(Now - Time),
     Bucket = round_down(Time, Interval),
-    NewAcc = merge_to_bucket(Bucket, Data, Acc),
-    compact(Now, Rest, NewAcc).
+    NewAcc = merge_to_bucket(Bucket, Data, Gauges, Acc),
+    compact(Now, Rest, Gauges, NewAcc).
 
-merge_to_bucket(Bucket, Data, [{Bucket, Data0} | Acc]) ->
-    NewData = merge_sampler_maps(Data, Data0),
+merge_to_bucket(Bucket, Data, Gauges, [{Bucket, Data0} | Acc]) ->
+    NewData = merge_local_sampler_maps(Data0, Data, Gauges),
     [{Bucket, NewData} | Acc];
-merge_to_bucket(Bucket, Data, Acc) ->
+merge_to_bucket(Bucket, Data, _Gauges, Acc) ->
     [{Bucket, Data} | Acc].
 
 %% for testing
@@ -277,6 +282,7 @@ randomize(Count, Data) when is_map(Data) ->
 randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
     Now = erlang:system_time(millisecond) - 1,
     StartTs = Now - Age,
+    Gauges = gauges(),
     lists:foreach(
         fun(_) ->
             Ts = round_down(StartTs + rand:uniform(Age), timer:seconds(10)),
@@ -285,7 +291,7 @@ randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
                 [] ->
                     store(Record);
                 [#emqx_monit{data = D} = R] ->
-                    store(R#emqx_monit{data = merge_sampler_maps(D, Data)})
+                    store(R#emqx_monit{data = merge_local_sampler_maps(Data, D, Gauges)})
             end
         end,
         lists:seq(1, Count)
@@ -322,7 +328,7 @@ do_sample_local(Time) ->
     FromDB = ets:select(?TAB, MS),
     Map = to_ts_data_map(FromDB),
     %% downsample before return RPC calls for less data to merge by the caller nodes
-    downsample(Time, Map).
+    downsample_local(Time, Map).
 
 %% log error level when there is no success (unlikely to happen), and warning otherwise
 badrpc_log_level([]) -> error;
@@ -370,6 +376,11 @@ merge_sampler_maps(M1, M2) when is_map(M1) andalso is_map(M2) ->
     Fun = fun(Key, Acc) -> merge_values(Key, M1, Acc) end,
     lists:foldl(Fun, M2, ?SAMPLER_LIST).
 
+%% `M1' is assumed to be newer data compared to anything `M2' has seen.
+merge_local_sampler_maps(M1, M2, Gauges) when is_map(M1) andalso is_map(M2) ->
+    Fun = fun(Key, Acc) -> merge_local_values(Key, M1, Acc, Gauges) end,
+    lists:foldl(Fun, M2, ?SAMPLER_LIST).
+
 %% topics, subscriptions_durable and disconnected_durable_sessions are cluster synced
 merge_values(topics, M1, M2) ->
     max_values(topics, M1, M2);
@@ -380,6 +391,15 @@ merge_values(disconnected_durable_sessions, M1, M2) ->
 merge_values(Key, M1, M2) ->
     sum_values(Key, M1, M2).
 
+merge_local_values(Key, M1, M2, Gauges) when
+    is_map_key(Key, Gauges) andalso
+        (is_map_key(Key, M1) orelse is_map_key(Key, M2))
+->
+    %% First argument is assumed to be from a newer timestamp, so we keep the latest.
+    M2#{Key => maps:get(Key, M1, maps:get(Key, M2, 0))};
+merge_local_values(Key, M1, M2, _Gauges) ->
+    merge_values(Key, M1, M2).
+
 max_values(Key, M1, M2) when is_map_key(Key, M1) orelse is_map_key(Key, M2) ->
     M2#{Key => max(maps:get(Key, M1, 0), maps:get(Key, M2, 0))};
 max_values(_Key, _M1, M2) ->
@@ -497,7 +517,7 @@ sample_fill_gap(Node, SinceTs) ->
 fill_gaps({badrpc, _} = BadRpc, _) ->
     BadRpc;
 fill_gaps(Samples, SinceTs) when is_map(Samples) ->
-    TsList = lists:sort(maps:keys(Samples)),
+    TsList = ts_list(Samples),
     case length(TsList) >= 2 of
         true ->
             do_fill_gaps(hd(TsList), tl(TsList), Samples, SinceTs);
@@ -537,8 +557,17 @@ downsample(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
 downsample(_Since, TsDataMap) ->
     TsDataMap.
 
+downsample_local(SinceTs, TsDataMap) when map_size(TsDataMap) >= 2 ->
+    TsList = ts_list(TsDataMap),
+    Latest = lists:max(TsList),
+    Interval = sample_interval(Latest - SinceTs),
+    Gauges = gauges(),
+    downsample_local_loop(TsList, Gauges, TsDataMap, Interval, #{});
+downsample_local(_Since, TsDataMap) ->
+    TsDataMap.
+
 ts_list(TsDataMap) ->
-    maps:keys(TsDataMap).
+    lists:sort(maps:keys(TsDataMap)).
 
 round_down(Ts, Interval) ->
     Ts - (Ts rem Interval).
@@ -552,6 +581,18 @@ downsample_loop([Ts | Rest], TsDataMap, Interval, Res) ->
     Agg = merge_sampler_maps(Inc, Agg0),
     downsample_loop(Rest, TsDataMap, Interval, Res#{Bucket => Agg}).
 
+downsample_local_loop([], _Gauges, _TsDataMap, _Interval, Res) ->
+    Res;
+downsample_local_loop([Ts | Rest], Gauges, TsDataMap, Interval, Res) ->
+    Bucket = round_down(Ts, Interval),
+    Agg0 = maps:get(Bucket, Res, #{}),
+    Inc = maps:get(Ts, TsDataMap),
+    Agg = merge_local_sampler_maps(Inc, Agg0, Gauges),
+    downsample_local_loop(Rest, Gauges, TsDataMap, Interval, Res#{Bucket => Agg}).
+
+gauges() ->
+    maps:from_keys(?GAUGE_SAMPLER_LIST, true).
+
 %% -------------------------------------------------------------------------------------------------
 %% timer
 

+ 265 - 61
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -84,36 +84,15 @@ end_per_suite(_Config) ->
 init_per_group(persistent_sessions = Group, Config) ->
     case emqx_ds_test_helpers:skip_if_norepl() of
         false ->
-            AppSpecsFn = fun(Enable) ->
-                Port =
-                    case Enable of
-                        true -> "18083";
-                        false -> "0"
-                    end,
-                [
-                    emqx_conf,
-                    {emqx, "durable_sessions {enable = true}"},
-                    {emqx_retainer, ?BASE_RETAINER_CONF},
-                    emqx_management,
-                    emqx_mgmt_api_test_util:emqx_dashboard(
-                        lists:concat([
-                            "dashboard.listeners.http { bind = " ++ Port ++ " }\n",
-                            "dashboard.sample_interval = 1s\n",
-                            "dashboard.listeners.http.enable = " ++ atom_to_list(Enable)
-                        ])
-                    )
-                ]
-            end,
+            Port = 18083,
             NodeSpecs = [
-                {dashboard_monitor1, #{apps => AppSpecsFn(true)}},
-                {dashboard_monitor2, #{apps => AppSpecsFn(false)}}
+                {dashboard_monitor1, #{apps => cluster_node_appspec(true, Port)}},
+                {dashboard_monitor2, #{apps => cluster_node_appspec(false, Port)}}
             ],
-            Nodes =
-                [N1 | _] = emqx_cth_cluster:start(
-                    NodeSpecs,
-                    #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
-                ),
-            ?ON(N1, {ok, _} = emqx_common_test_http:create_default_app()),
+            Nodes = emqx_cth_cluster:start(
+                NodeSpecs,
+                #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+            ),
             [{cluster, Nodes} | Config];
         Yes ->
             Yes
@@ -144,11 +123,29 @@ end_per_group(common, Config) ->
     emqx_cth_suite:stop(Apps),
     ok.
 
+init_per_testcase(t_smoke_test_monitor_multiple_windows = TestCase, Config) ->
+    Port = 28083,
+    NodeSpecs = [
+        {smoke_multiple_windows1, #{apps => cluster_node_appspec(true, Port)}},
+        {smoke_multiple_windows2, #{apps => cluster_node_appspec(false, Port)}}
+    ],
+    Nodes =
+        [N1 | _] = emqx_cth_cluster:start(
+            NodeSpecs,
+            #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+        ),
+    ok = snabbkaffe:start_trace(),
+    [{nodes, Nodes}, {api_node, N1} | Config];
 init_per_testcase(_TestCase, Config) ->
     ok = snabbkaffe:start_trace(),
     ct:timetrap({seconds, 30}),
     Config.
 
+end_per_testcase(t_smoke_test_monitor_multiple_windows, Config) ->
+    Nodes = ?config(nodes, Config),
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
 end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:call_janitor(),
@@ -159,13 +156,9 @@ end_per_testcase(_TestCase, _Config) ->
 %%--------------------------------------------------------------------
 
 t_empty_table(_Config) ->
-    sys:suspend(whereis(emqx_dashboard_monitor)),
-    try
-        emqx_dashboard_monitor:clean(0),
-        ?assertEqual({ok, []}, request(["monitor"], "latest=20000"))
-    after
-        sys:resume(whereis(emqx_dashboard_monitor))
-    end.
+    pause_monitor_process(),
+    clean_data(),
+    ?assertEqual({ok, []}, request(["monitor"], "latest=20000")).
 
 t_pmap_nodes(_Config) ->
     MaxAge = timer:hours(1),
@@ -173,22 +166,23 @@ t_pmap_nodes(_Config) ->
     Interval = emqx_dashboard_monitor:sample_interval(MaxAge),
     StartTs = round_down(Now - MaxAge, Interval),
     DataPoints = 5,
-    ok = emqx_dashboard_monitor:clean(0),
-    ok = insert_data_points(DataPoints, StartTs, Now),
+    clean_data(),
+    LastVal = insert_data_points(DataPoints, fun sent_n/1, StartTs, Now),
     Nodes = [node(), node(), node()],
     %% this function calls emqx_utils:pmap to do the job
     Data0 = emqx_dashboard_monitor:sample_nodes(Nodes, StartTs),
     Data1 = emqx_dashboard_monitor:fill_gaps(Data0, StartTs),
     Data = emqx_dashboard_monitor:format(Data1),
     ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
-    ?assertEqual(DataPoints * length(Nodes), sum_value(Data, sent)).
+    ?assertEqual(LastVal * length(Nodes), maps:get(sent, lists:last(Data))).
 
 t_inplace_downsample(_Config) ->
-    ok = emqx_dashboard_monitor:clean(0),
+    clean_data(),
     %% -20s to ensure the oldest data point will not expire during the test
     SinceT = 7 * timer:hours(24) - timer:seconds(20),
-    Total = 10000,
-    emqx_dashboard_monitor:randomize(Total, #{sent => 1}, SinceT),
+    Total = 10_000,
+    ConnectionGauge = 3,
+    emqx_dashboard_monitor:randomize(Total, #{sent => 1, connections => ConnectionGauge}, SinceT),
     %% assert original data (before downsample)
     All0 = emqx_dashboard_monitor:all_data(),
     AllSent0 = lists:map(fun({_, #{sent := S}}) -> S end, All0),
@@ -198,11 +192,13 @@ t_inplace_downsample(_Config) ->
     ok = gen_server:call(emqx_dashboard_monitor, dummy, infinity),
     All1 = emqx_dashboard_monitor:all_data(),
     All = drop_dummy_data_points(All1),
-    AllSent = lists:map(fun({_, #{sent := S}}) -> S end, All),
-    ?assertEqual(Total, lists:sum(AllSent)),
     %% check timestamps are not random after downsample
     ExpectedIntervals = [timer:minutes(10), timer:minutes(5), timer:minutes(1), timer:seconds(10)],
     ok = check_intervals(ExpectedIntervals, All),
+    %% Gauges, such as `connections', are not summed.
+    AllConnections = lists:map(fun({_Ts, #{connections := C}}) -> C end, All),
+    DistinctConnections = lists:usort(AllConnections),
+    ?assertEqual([ConnectionGauge], DistinctConnections),
     ok.
 
 %% there might be some data points added while downsample is running
@@ -226,7 +222,7 @@ check_intervals([Interval | Rest], [{Ts, _} | RestData] = All) ->
     end.
 
 t_randomize(_Config) ->
-    ok = emqx_dashboard_monitor:clean(0),
+    clean_data(),
     emqx_dashboard_monitor:randomize(1, #{sent => 100}),
     Since = integer_to_list(7 * timer:hours(24)),
     {ok, Samplers} = request(["monitor"], "latest=" ++ Since),
@@ -249,23 +245,52 @@ t_downsample_1h(_Config) ->
     MaxAge = timer:hours(1),
     test_downsample(MaxAge, 10).
 
-sent_1() -> #{sent => 1}.
+%% Since the monitor process is running, and tests like `t_downsample_*' expect some
+%% degree of determinism, we need to pause that process to avoid having it insert a rogue
+%% point amidst the "carefully crafted" dataset.
+pause_monitor_process() ->
+    ok = sys:suspend(emqx_dashboard_monitor),
+    on_exit(fun() -> ok = sys:resume(emqx_dashboard_monitor) end),
+    ok.
+
+sent_n(N) -> #{sent => N}.
+sent_1() -> sent_n(1).
+
+%% a gauge
+connections_n(N) -> #{connections => N}.
+connections_1() -> connections_n(1).
 
 round_down(Ts, Interval) ->
     Ts - (Ts rem Interval).
 
 test_downsample(MaxAge, DataPoints) ->
+    ok = pause_monitor_process(),
     Now = erlang:system_time(millisecond) - 1,
     Interval = emqx_dashboard_monitor:sample_interval(MaxAge),
     StartTs = round_down(Now - MaxAge, Interval),
-    ok = emqx_dashboard_monitor:clean(0),
+    clean_data(),
     %% insert the start mark for deterministic test boundary
-    ok = write(StartTs, sent_1()),
-    ok = insert_data_points(DataPoints - 1, StartTs, Now),
+    ok = write(StartTs, connections_1()),
+    TsMax = round_down(Now, Interval),
+    LastVal = insert_data_points(DataPoints - 1, fun connections_n/1, StartTs, TsMax),
+    AllData = emqx_dashboard_monitor:all_data(),
     Data = emqx_dashboard_monitor:format(emqx_dashboard_monitor:sample_fill_gap(all, StartTs)),
-    ?assertEqual(StartTs, maps:get(time_stamp, hd(Data))),
+    ?assertEqual(StartTs, maps:get(time_stamp, hd(Data)), #{
+        expected_one_of => StartTs,
+        start_ts => StartTs,
+        interval => Interval,
+        data => Data,
+        all_data => AllData,
+        now => Now
+    }),
     ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
-    ?assertEqual(DataPoints, sum_value(Data, sent)),
+    ?assertEqual(LastVal, maps:get(connections, lists:last(Data)), #{
+        data => Data,
+        interval => Interval,
+        start_ts => StartTs,
+        all_data => AllData,
+        now => Now
+    }),
     ok.
 
 sum_value(Data, Key) ->
@@ -280,21 +305,51 @@ check_sample_intervals(_Interval, _, []) ->
     ok;
 check_sample_intervals(Interval, #{time_stamp := T}, [First | Rest]) ->
     #{time_stamp := T2} = First,
-    ?assertEqual(T + Interval, T2),
+    ?assertEqual(T + Interval, T2, #{
+        t => T,
+        interval => Interval,
+        diff => T + Interval - T2,
+        rest => Rest
+    }),
     check_sample_intervals(Interval, First, Rest).
 
-insert_data_points(0, _TsMin, _TsMax) ->
-    ok;
-insert_data_points(N, TsMin, TsMax) when N > 0 ->
-    Data = sent_1(),
-    FakeTs = TsMin + rand:uniform(TsMax - TsMin),
+insert_data_points(N, MkPointFn, TsMin, TsMax) ->
+    insert_data_points(N, MkPointFn, {_LastTs = 0, _LastVal = 0}, N, TsMin, TsMax).
+
+insert_data_points(0, _MkPointFn, {_LastTs, LastVal}, _InitialN, _TsMin, _TsMax) ->
+    LastVal;
+insert_data_points(N, MkPointFn, {LastTs, LastVal}, InitialN, TsMin, TsMax) when N > 0 ->
+    %% assert
+    true = TsMax - TsMin > 1,
+    %% + 2 because we don't want to insert 1.  It's used as a special "beginning-of-test"
+    %% marker.
+    Val = InitialN - N + 2,
+    Data = MkPointFn(Val),
+    FakeTs =
+        case N of
+            1 ->
+                %% Last point should be `TsMax', otherwise the resulting
+                %% `sample_interval' for this dataset might be different from the one
+                %% expected in the test case.
+                TsMax;
+            _ ->
+                TsMin + rand:uniform(TsMax - TsMin - 1)
+        end,
     case read(FakeTs) of
         [] ->
             ok = write(FakeTs, Data),
-            insert_data_points(N - 1, TsMin, TsMax);
+            {NewLastTs, NewLastVal} =
+                case FakeTs >= LastTs of
+                    true -> {FakeTs, Val};
+                    false -> {LastTs, LastVal}
+                end,
+            insert_data_points(N - 1, MkPointFn, {NewLastTs, NewLastVal}, InitialN, TsMin, TsMax);
+        _ when N =:= 1 ->
+            %% clashed, but trying again won't help because `FakeTs = TsMax'.  shouldn't happen.
+            ct:fail("failed to generate data set: last timestamp is taken!");
         _ ->
             %% clashed, try again
-            insert_data_points(N, TsMin, TsMax)
+            insert_data_points(N, MkPointFn, {LastTs, LastVal}, InitialN, TsMin, TsMax)
     end.
 
 read(Ts) ->
@@ -370,7 +425,7 @@ t_handle_old_monitor_data(_Config) ->
     ok.
 
 t_monitor_api(_) ->
-    emqx_dashboard_monitor:clean(0),
+    clean_data(),
     {ok, _} =
         snabbkaffe:block_until(
             ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
@@ -660,6 +715,101 @@ t_persistent_session_stats(Config) ->
 
     ok.
 
+%% Checks that we get consistent data when changing the requested time window for
+%% `/monitor'.
+t_smoke_test_monitor_multiple_windows(Config) ->
+    [N1, N2 | _] = ?config(nodes, Config),
+    %% pre-condition
+    true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()),
+
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
+    NonPSClient = start_and_connect(#{
+        port => Port1,
+        clientid => <<"non-ps">>,
+        expiry_interval => 0
+    }),
+    PSClient1 = start_and_connect(#{
+        port => Port1,
+        clientid => <<"ps1">>,
+        expiry_interval => 30
+    }),
+    PSClient2 = start_and_connect(#{
+        port => Port2,
+        clientid => <<"ps2">>,
+        expiry_interval => 30
+    }),
+    {ok, _} =
+        snabbkaffe:block_until(
+            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
+            infinity
+        ),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 3,
+                <<"live_connections">> := 3
+            }},
+            get_latest_from_window(Config, {hours, 1})
+        )
+    end),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 3,
+                <<"live_connections">> := 3
+            }},
+            get_latest_from_window(Config, {hours, 6})
+        )
+    end),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 3,
+                <<"live_connections">> := 3
+            }},
+            get_latest_from_window(Config, {days, 7})
+        )
+    end),
+    %% Stop one memory and one persistent client
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            begin
+                ok = emqtt:stop(NonPSClient),
+                ok = emqtt:stop(PSClient1)
+            end,
+            #{?snk_kind := dashboard_monitor_flushed}
+        ),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 2,
+                <<"live_connections">> := 1
+            }},
+            get_latest_from_window(Config, {hours, 1})
+        )
+    end),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 2,
+                <<"live_connections">> := 1
+            }},
+            get_latest_from_window(Config, {hours, 6})
+        )
+    end),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 2,
+                <<"live_connections">> := 1
+            }},
+            get_latest_from_window(Config, {days, 7})
+        )
+    end),
+    ok = emqtt:stop(PSClient2),
+    ok.
+
 request(Path) ->
     request(Path, "").
 
@@ -667,12 +817,40 @@ request(Path, QS) ->
     Url = url(Path, QS),
     do_request_api(get, {Url, [auth_header_()]}).
 
+get_latest_from_window(Config, Window) ->
+    WindowS = integer_to_list(window_in_seconds(Window)),
+    case get_req_cluster(Config, ["monitor"], "latest=" ++ WindowS) of
+        {ok, Points} when is_list(Points) ->
+            {ok, lists:last(Points)};
+        Error ->
+            Error
+    end.
+
+window_in_seconds({hours, N}) ->
+    N * 3_600;
+window_in_seconds({days, N}) ->
+    N * 86_400.
+
+get_req_cluster(Config, Path, QS) ->
+    APINode = ?config(api_node, Config),
+    Port = get_http_dashboard_port(APINode),
+    Host = host(Port),
+    Url = url(Host, Path, QS),
+    Auth = ?ON(APINode, auth_header_()),
+    do_request_api(get, {Url, [Auth]}).
+
+host(Port) ->
+    "http://127.0.0.1:" ++ integer_to_list(Port).
+
 url(Parts, QS) ->
+    url(?SERVER, Parts, QS).
+
+url(Host, Parts, QS) ->
     case QS of
         "" ->
-            ?SERVER ++ filename:join([?BASE_PATH | Parts]);
+            Host ++ filename:join([?BASE_PATH | Parts]);
         _ ->
-            ?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
+            Host ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
     end.
 
 do_request_api(Method, Request) ->
@@ -757,3 +935,29 @@ start_and_connect(Opts) ->
 get_mqtt_port(Node, Type) ->
     {_IP, Port} = ?ON(Node, emqx_config:get([listeners, Type, default, bind])),
     Port.
+
+get_http_dashboard_port(Node) ->
+    ?ON(Node, emqx_config:get([dashboard, listeners, http, bind])).
+
+cluster_node_appspec(Enable, Port0) ->
+    Port =
+        case Enable of
+            true -> integer_to_list(Port0);
+            false -> "0"
+        end,
+    [
+        emqx_conf,
+        {emqx, "durable_sessions {enable = true}"},
+        {emqx_retainer, ?BASE_RETAINER_CONF},
+        emqx_management,
+        emqx_mgmt_api_test_util:emqx_dashboard(
+            lists:concat([
+                "dashboard.listeners.http { bind = " ++ Port ++ " }\n",
+                "dashboard.sample_interval = 1s\n",
+                "dashboard.listeners.http.enable = " ++ atom_to_list(Enable)
+            ])
+        )
+    ].
+
+clean_data() ->
+    ok = emqx_dashboard_monitor:clean(-1).

+ 10 - 0
changes/ce/fix-14023.en.md

@@ -0,0 +1,10 @@
+Fixed an issue with the `GET /monitor` HTTP API where the returned data could contain values greater than the actual ones, depending on the requested time window.  For data points within a 1-hour window, this distortion is merely visual in the dashboard.  For data points older than 1 hour, the data is permanently distorted.
+
+The impacted gauges are:
+
+- `disconnected_durable_sessions`
+- `subscriptions_durable`
+- `subscriptions`
+- `topics`
+- `connections`
+- `live_connections`