Browse Source

Merge pull request #13103 from thalesmg/fix-another-ds-counter-nit-r57-20240523

fix(monitor api): fix cluster metric aggregation
Thales Macedo Garitezi 1 year ago
parent
commit
118c1d2806

+ 1 - 0
apps/emqx/test/emqx_cth_cluster.erl

@@ -109,6 +109,7 @@ start(Nodes, ClusterOpts) ->
     start(NodeSpecs).
 
 start(NodeSpecs) ->
+    emqx_common_test_helpers:clear_screen(),
     ct:pal("(Re)starting nodes:\n  ~p", [NodeSpecs]),
     % 1. Start bare nodes with only basic applications running
     ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),

+ 14 - 13
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -119,7 +119,7 @@ current_rate(all) ->
 current_rate(Node) when Node == node() ->
     try
         {ok, Rate} = do_call(current_rate),
-        {ok, Rate}
+        {ok, adjust_individual_node_metrics(Rate)}
     catch
         _E:R ->
             ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
@@ -156,8 +156,8 @@ current_rate_cluster() ->
     case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
         {badrpc, Reason} ->
             {badrpc, Reason};
-        Rate ->
-            {ok, Rate}
+        Metrics ->
+            {ok, adjust_synthetic_cluster_metrics(Metrics)}
     end.
 
 %% -------------------------------------------------------------------------------------------------
@@ -281,22 +281,23 @@ merge_cluster_rate(Node, Cluster) ->
                 ClusterValue = maps:get(Key, NCluster, 0),
                 NCluster#{Key => Value + ClusterValue}
         end,
-    Metrics = maps:fold(Fun, Cluster, Node),
-    adjust_synthetic_cluster_metrics(Metrics).
+    maps:fold(Fun, Cluster, Node).
 
-adjust_synthetic_cluster_metrics(Metrics0) ->
+adjust_individual_node_metrics(Metrics0) ->
     %% ensure renamed
-    Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0),
-    DSSubs = maps:get(subscriptions_durable, Metrics1, 0),
-    RamSubs = maps:get(subscriptions, Metrics1, 0),
-    DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0),
-    Metrics2 = maps:update_with(
+    emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0).
+
+adjust_synthetic_cluster_metrics(Metrics0) ->
+    DSSubs = maps:get(subscriptions_durable, Metrics0, 0),
+    RamSubs = maps:get(subscriptions, Metrics0, 0),
+    DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics0, 0),
+    Metrics1 = maps:update_with(
         subscriptions,
         fun(Subs) -> Subs + DSSubs end,
         0,
-        Metrics1
+        Metrics0
     ),
-    Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2),
+    Metrics = maps:put(subscriptions_ram, RamSubs, Metrics1),
     maps:update_with(
         connections,
         fun(RamConns) -> RamConns + DisconnectedDSs end,

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -253,7 +253,7 @@ maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
     Rates;
 maybe_reject_cluster_only_metrics(_Node, Rates) ->
     ClusterOnlyMetrics = [
-        durable_subscriptions,
+        subscriptions_durable,
         disconnected_durable_sessions
     ],
     maps:without(ClusterOnlyMetrics, Rates).

+ 89 - 26
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -49,6 +49,8 @@
     "}"
 >>).
 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
 %%--------------------------------------------------------------------
 %% CT boilerplate
 %%--------------------------------------------------------------------
@@ -79,21 +81,37 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_group(persistent_sessions = Group, Config) ->
-    Apps = emqx_cth_suite:start(
+    AppSpecsFn = fun(Enable) ->
+        Port =
+            case Enable of
+                true -> "18083";
+                false -> "0"
+            end,
         [
             emqx_conf,
             {emqx, "durable_sessions {enable = true}"},
             {emqx_retainer, ?BASE_RETAINER_CONF},
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard(
-                "dashboard.listeners.http { enable = true, bind = 18083 }\n"
-                "dashboard.sample_interval = 1s"
+                lists:concat([
+                    "dashboard.listeners.http { bind = " ++ Port ++ " }\n",
+                    "dashboard.sample_interval = 1s\n",
+                    "dashboard.listeners.http.enable = " ++ atom_to_list(Enable)
+                ])
             )
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
-    ),
-    {ok, _} = emqx_common_test_http:create_default_app(),
-    [{apps, Apps} | Config];
+        ]
+    end,
+    NodeSpecs = [
+        {dashboard_monitor1, #{apps => AppSpecsFn(true)}},
+        {dashboard_monitor2, #{apps => AppSpecsFn(false)}}
+    ],
+    Nodes =
+        [N1 | _] = emqx_cth_cluster:start(
+            NodeSpecs,
+            #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+        ),
+    ?ON(N1, {ok, _} = emqx_common_test_http:create_default_app()),
+    [{cluster, Nodes} | Config];
 init_per_group(common = Group, Config) ->
     Apps = emqx_cth_suite:start(
         [
@@ -111,7 +129,11 @@ init_per_group(common = Group, Config) ->
     {ok, _} = emqx_common_test_http:create_default_app(),
     [{apps, Apps} | Config].
 
-end_per_group(_Group, Config) ->
+end_per_group(persistent_sessions, Config) ->
+    Cluster = ?config(cluster, Config),
+    emqx_cth_cluster:stop(Cluster),
+    ok;
+end_per_group(common, Config) ->
     Apps = ?config(apps, Config),
     emqx_cth_suite:stop(Apps),
     ok.
@@ -325,26 +347,36 @@ t_monitor_api_error(_) ->
     ok.
 
 %% Verifies that subscriptions from persistent sessions are correctly accounted for.
-t_persistent_session_stats(_Config) ->
+t_persistent_session_stats(Config) ->
+    [N1, N2 | _] = ?config(cluster, Config),
     %% pre-condition
-    true = emqx_persistent_message:is_persistence_enabled(),
+    true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()),
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
 
     NonPSClient = start_and_connect(#{
+        port => Port1,
         clientid => <<"non-ps">>,
         expiry_interval => 0
     }),
-    PSClient = start_and_connect(#{
-        clientid => <<"ps">>,
+    PSClient1 = start_and_connect(#{
+        port => Port1,
+        clientid => <<"ps1">>,
+        expiry_interval => 30
+    }),
+    PSClient2 = start_and_connect(#{
+        port => Port2,
+        clientid => <<"ps2">>,
         expiry_interval => 30
     }),
     {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2),
     {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2),
     {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2),
     {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2),
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2),
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2),
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2),
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic">>, 2),
     {ok, _} =
         snabbkaffe:block_until(
             ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
@@ -353,7 +385,7 @@ t_persistent_session_stats(_Config) ->
     ?retry(1_000, 10, begin
         ?assertMatch(
             {ok, #{
-                <<"connections">> := 2,
+                <<"connections">> := 3,
                 <<"disconnected_durable_sessions">> := 0,
                 %% N.B.: we currently don't perform any deduplication between persistent
                 %% and non-persistent routes, so we count `commont/topic' twice and get 8
@@ -363,25 +395,25 @@ t_persistent_session_stats(_Config) ->
                 <<"subscriptions_ram">> := 4,
                 <<"subscriptions_durable">> := 4
             }},
-            request(["monitor_current"])
+            ?ON(N1, request(["monitor_current"]))
         )
     end),
     %% Sanity checks
-    PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes),
+    PSRouteCount = ?ON(N1, emqx_persistent_session_ds_router:stats(n_routes)),
     ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
-    PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
+    PSSubCount = ?ON(N1, emqx_persistent_session_bookkeeper:get_subscription_count()),
     ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
 
     %% Now with disconnected but alive persistent sessions
     {ok, {ok, _}} =
         ?wait_async_action(
-            emqtt:disconnect(PSClient),
+            emqtt:disconnect(PSClient1),
             #{?snk_kind := dashboard_monitor_flushed}
         ),
     ?retry(1_000, 10, begin
         ?assertMatch(
             {ok, #{
-                <<"connections">> := 2,
+                <<"connections">> := 3,
                 <<"disconnected_durable_sessions">> := 1,
                 %% N.B.: we currently don't perform any deduplication between persistent
                 %% and non-persistent routes, so we count `commont/topic' twice and get 8
@@ -391,7 +423,28 @@ t_persistent_session_stats(_Config) ->
                 <<"subscriptions_ram">> := 4,
                 <<"subscriptions_durable">> := 4
             }},
-            request(["monitor_current"])
+            ?ON(N1, request(["monitor_current"]))
+        )
+    end),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqtt:disconnect(PSClient2),
+            #{?snk_kind := dashboard_monitor_flushed}
+        ),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                <<"connections">> := 3,
+                <<"disconnected_durable_sessions">> := 2,
+                %% N.B.: we currently don't perform any deduplication between persistent
+                %% and non-persistent routes, so we count `commont/topic' twice and get 8
+                %% instead of 6 here.
+                <<"topics">> := 8,
+                <<"subscriptions">> := 8,
+                <<"subscriptions_ram">> := 4,
+                <<"subscriptions_durable">> := 4
+            }},
+            ?ON(N1, request(["monitor_current"]))
         )
     end),
 
@@ -467,15 +520,21 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
     ok.
 
 start_and_connect(Opts) ->
-    Defaults = #{clean_start => false, expiry_interval => 30},
+    Defaults = #{
+        clean_start => false,
+        expiry_interval => 30,
+        port => 1883
+    },
     #{
         clientid := ClientId,
         clean_start := CleanStart,
-        expiry_interval := EI
+        expiry_interval := EI,
+        port := Port
     } = maps:merge(Defaults, Opts),
     {ok, Client} = emqtt:start_link([
         {clientid, ClientId},
         {clean_start, CleanStart},
+        {port, Port},
         {proto_ver, v5},
         {properties, #{'Session-Expiry-Interval' => EI}}
     ]),
@@ -484,3 +543,7 @@ start_and_connect(Opts) ->
     end),
     {ok, _} = emqtt:connect(Client),
     Client.
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = ?ON(Node, emqx_config:get([listeners, Type, default, bind])),
+    Port.