Преглед изворног кода

Merge pull request #13067 from thalesmg/fix-ds-monitor-subs-r57-20240517

fix(dssubs): introduce separate gauge for subscriptions from durable sessions
Thales Macedo Garitezi пре 1 година
родитељ
комит
d37edd69ae

+ 11 - 8
apps/emqx/src/emqx_broker_helper.erl

@@ -111,6 +111,11 @@ reclaim_seq(Topic) ->
 stats_fun() ->
 stats_fun() ->
     safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
     safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
     safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
     safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
+    safe_update_stats(
+        durable_subscription_count(),
+        'durable_subscriptions.count',
+        'durable_subscriptions.max'
+    ),
     safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
     safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
 
 
 safe_update_stats(undefined, _Stat, _MaxStat) ->
 safe_update_stats(undefined, _Stat, _MaxStat) ->
@@ -118,15 +123,13 @@ safe_update_stats(undefined, _Stat, _MaxStat) ->
 safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
 safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
     emqx_stats:setstat(Stat, MaxStat, Val).
     emqx_stats:setstat(Stat, MaxStat, Val).
 
 
+%% N.B.: subscriptions from durable sessions are not tied to any particular node.
+%% Therefore, do not sum them with node-local subscriptions.
 subscription_count() ->
 subscription_count() ->
-    NonPSCount = table_size(?SUBSCRIPTION),
-    PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
-    case is_integer(NonPSCount) of
-        true ->
-            NonPSCount + PSCount;
-        false ->
-            PSCount
-    end.
+    table_size(?SUBSCRIPTION).
+
+durable_subscription_count() ->
+    emqx_persistent_session_bookkeeper:get_subscription_count().
 
 
 subscriber_val() ->
 subscriber_val() ->
     sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
     sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).

+ 4 - 0
apps/emqx/src/emqx_stats.erl

@@ -109,6 +109,8 @@
 
 
 %% PubSub stats
 %% PubSub stats
 -define(PUBSUB_STATS, [
 -define(PUBSUB_STATS, [
+    'durable_subscriptions.count',
+    'durable_subscriptions.max',
     'topics.count',
     'topics.count',
     'topics.max',
     'topics.max',
     'suboptions.count',
     'suboptions.count',
@@ -166,6 +168,8 @@ names() ->
     [
     [
         emqx_connections_count,
         emqx_connections_count,
         emqx_connections_max,
         emqx_connections_max,
+        emqx_durable_subscriptions_count,
+        emqx_durable_subscriptions_max,
         emqx_live_connections_count,
         emqx_live_connections_count,
         emqx_live_connections_max,
         emqx_live_connections_max,
         emqx_cluster_sessions_count,
         emqx_cluster_sessions_count,

+ 1 - 0
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -72,6 +72,7 @@
 ]).
 ]).
 
 
 -define(GAUGE_SAMPLER_LIST, [
 -define(GAUGE_SAMPLER_LIST, [
+    durable_subscriptions,
     subscriptions,
     subscriptions,
     topics,
     topics,
     connections,
     connections,

+ 3 - 0
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) ->
     Fun =
     Fun =
         fun
         fun
             %% cluster-synced values
             %% cluster-synced values
+            (durable_subscriptions, V, NCluster) ->
+                NCluster#{durable_subscriptions => V};
             (topics, V, NCluster) ->
             (topics, V, NCluster) ->
                 NCluster#{topics => V};
                 NCluster#{topics => V};
             (retained_msg_count, V, NCluster) ->
             (retained_msg_count, V, NCluster) ->
@@ -416,6 +418,7 @@ getstats(Key) ->
     end.
     end.
 
 
 stats(connections) -> emqx_stats:getstat('connections.count');
 stats(connections) -> emqx_stats:getstat('connections.count');
+stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count');
 stats(live_connections) -> emqx_stats:getstat('live_connections.count');
 stats(live_connections) -> emqx_stats:getstat('live_connections.count');
 stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
 stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
 stats(topics) -> emqx_stats:getstat('topics.count');
 stats(topics) -> emqx_stats:getstat('topics.count');

+ 4 - 1
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -194,8 +194,11 @@ swagger_desc(validation_failed) ->
     swagger_desc_format("Schema validations failed ");
     swagger_desc_format("Schema validations failed ");
 swagger_desc(persisted) ->
 swagger_desc(persisted) ->
     swagger_desc_format("Messages saved to the durable storage ");
     swagger_desc_format("Messages saved to the durable storage ");
+swagger_desc(durable_subscriptions) ->
+    <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(subscriptions) ->
 swagger_desc(subscriptions) ->
-    <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
+    <<"Subscriptions at the time of sampling (not considering durable sessions).",
+        ?APPROXIMATE_DESC>>;
 swagger_desc(topics) ->
 swagger_desc(topics) ->
     <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
     <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(connections) ->
 swagger_desc(connections) ->

+ 2 - 1
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -345,7 +345,8 @@ t_persistent_session_stats(_Config) ->
                 %% and non-persistent routes, so we count `common/topic' twice and get 8
                 %% and non-persistent routes, so we count `common/topic' twice and get 8
                 %% instead of 6 here.
                 %% instead of 6 here.
                 <<"topics">> := 8,
                 <<"topics">> := 8,
-                <<"subscriptions">> := 8
+                <<"durable_subscriptions">> := 4,
+                <<"subscriptions">> := 4
             }},
             }},
             request(["monitor_current"])
             request(["monitor_current"])
         )
         )

+ 13 - 2
apps/emqx_management/src/emqx_mgmt.erl

@@ -270,6 +270,12 @@ get_metrics() ->
 get_metrics(Node) ->
 get_metrics(Node) ->
     unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
     unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
 
 
+aggregated_only_keys() ->
+    [
+        'durable_subscriptions.count',
+        'durable_subscriptions.max'
+    ].
+
 get_stats() ->
 get_stats() ->
     GlobalStatsKeys =
     GlobalStatsKeys =
         [
         [
@@ -294,7 +300,7 @@ get_stats() ->
             emqx:running_nodes()
             emqx:running_nodes()
         )
         )
     ),
     ),
-    GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
+    GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(emqx_stats:getstats())),
     maps:merge(CountStats, GlobalStats).
     maps:merge(CountStats, GlobalStats).
 
 
 delete_keys(List, []) ->
 delete_keys(List, []) ->
@@ -303,7 +309,12 @@ delete_keys(List, [Key | Keys]) ->
     delete_keys(proplists:delete(Key, List), Keys).
     delete_keys(proplists:delete(Key, List), Keys).
 
 
 get_stats(Node) ->
 get_stats(Node) ->
-    unwrap_rpc(emqx_proto_v1:get_stats(Node)).
+    case unwrap_rpc(emqx_proto_v1:get_stats(Node)) of
+        {error, _} = Error ->
+            Error;
+        Stats when is_list(Stats) ->
+            delete_keys(Stats, aggregated_only_keys())
+    end.
 
 
 nodes_info_count(PropList) ->
 nodes_info_count(PropList) ->
     NodeCount =
     NodeCount =

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_nodes.erl

@@ -122,7 +122,7 @@ schema("/nodes/:node/stats") ->
                 responses =>
                 responses =>
                     #{
                     #{
                         200 => mk(
                         200 => mk(
-                            ref(?NODE_STATS_MODULE, node_stats_data),
+                            ref(?NODE_STATS_MODULE, aggregated_data),
                             #{desc => <<"Get node stats successfully">>}
                             #{desc => <<"Get node stats successfully">>}
                         ),
                         ),
                         404 => not_found()
                         404 => not_found()

+ 14 - 7
apps/emqx_management/src/emqx_mgmt_api_stats.erl

@@ -60,8 +60,8 @@ schema("/stats") ->
                     #{
                     #{
                         200 => mk(
                         200 => mk(
                             hoconsc:union([
                             hoconsc:union([
-                                ref(?MODULE, node_stats_data),
-                                array(ref(?MODULE, aggergate_data))
+                                array(ref(?MODULE, per_node_data)),
+                                ref(?MODULE, aggregated_data)
                             ]),
                             ]),
                             #{desc => <<"List stats ok">>}
                             #{desc => <<"List stats ok">>}
                         )
                         )
@@ -82,7 +82,7 @@ fields(aggregate) ->
                 }
                 }
             )}
             )}
     ];
     ];
-fields(node_stats_data) ->
+fields(aggregated_data) ->
     [
     [
         stats_schema('channels.count', <<"sessions.count">>),
         stats_schema('channels.count', <<"sessions.count">>),
         stats_schema('channels.max', <<"session.max">>),
         stats_schema('channels.max', <<"session.max">>),
@@ -106,7 +106,10 @@ fields(node_stats_data) ->
         stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
         stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
         stats_schema(
         stats_schema(
             'subscriptions.count',
             'subscriptions.count',
-            <<"Number of current subscriptions, including shared subscriptions">>
+            <<
+                "Number of current subscriptions, including shared subscriptions,"
+                " but not subscriptions from durable sessions"
+            >>
         ),
         ),
         stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
         stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
         stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
         stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
@@ -116,14 +119,18 @@ fields(node_stats_data) ->
         stats_schema('topics.count', <<"Number of current topics">>),
         stats_schema('topics.count', <<"Number of current topics">>),
         stats_schema('topics.max', <<"Historical maximum number of topics">>)
         stats_schema('topics.max', <<"Historical maximum number of topics">>)
     ];
     ];
-fields(aggergate_data) ->
+fields(per_node_data) ->
     [
     [
         {node,
         {node,
             mk(string(), #{
             mk(string(), #{
                 desc => <<"Node name">>,
                 desc => <<"Node name">>,
                 example => <<"emqx@127.0.0.1">>
                 example => <<"emqx@127.0.0.1">>
-            })}
-    ] ++ fields(node_stats_data).
+            })},
+        stats_schema(
+            'durable_subscriptions.count',
+            <<"Number of current subscriptions from durable sessions in the cluster">>
+        )
+    ] ++ fields(aggregated_data).
 
 
 stats_schema(Name, Desc) ->
 stats_schema(Name, Desc) ->
     {Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.
     {Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.

+ 16 - 5
apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl

@@ -25,12 +25,22 @@ all() ->
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
     meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
-    emqx_mgmt_api_test_util:init_suite(),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _Api} = emqx_common_test_http:create_default_app(),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_) ->
+end_per_suite(Config) ->
+    Apps = proplists:get_value(apps, Config),
     meck:unload(emqx),
     meck:unload(emqx),
-    emqx_mgmt_api_test_util:end_suite().
+    emqx_cth_suite:stop(Apps),
+    ok.
 
 
 t_stats_api(_) ->
 t_stats_api(_) ->
     S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]),
     S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]),
@@ -39,7 +49,8 @@ t_stats_api(_) ->
     SystemStats1 = emqx_mgmt:get_stats(),
     SystemStats1 = emqx_mgmt:get_stats(),
     Fun1 =
     Fun1 =
         fun(Key) ->
         fun(Key) ->
-            ?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1))
+            ?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1)),
+            ?assertNot(is_map_key(<<"durable_subscriptions.count">>, Stats1), #{stats => Stats1})
         end,
         end,
     lists:foreach(Fun1, maps:keys(SystemStats1)),
     lists:foreach(Fun1, maps:keys(SystemStats1)),
     StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),
     StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),

+ 4 - 0
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -331,6 +331,8 @@ emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
 %% pub/sub stats
 %% pub/sub stats
+emqx_collect(K = emqx_durable_subscriptions_count, D) -> gauge_metrics(?MG(K, D));
+emqx_collect(K = emqx_durable_subscriptions_max, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D));
 emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D));
@@ -541,6 +543,8 @@ stats_metric_meta() ->
         {emqx_subscribers_max, gauge, 'subscribers.max'},
         {emqx_subscribers_max, gauge, 'subscribers.max'},
         {emqx_subscriptions_count, gauge, 'subscriptions.count'},
         {emqx_subscriptions_count, gauge, 'subscriptions.count'},
         {emqx_subscriptions_max, gauge, 'subscriptions.max'},
         {emqx_subscriptions_max, gauge, 'subscriptions.max'},
+        {emqx_durable_subscriptions_count, gauge, 'durable_subscriptions.count'},
+        {emqx_durable_subscriptions_max, gauge, 'durable_subscriptions.max'},
         %% delayed
         %% delayed
         {emqx_delayed_count, gauge, 'delayed.count'},
         {emqx_delayed_count, gauge, 'delayed.count'},
         {emqx_delayed_max, gauge, 'delayed.max'}
         {emqx_delayed_max, gauge, 'delayed.max'}

+ 2 - 0
apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl

@@ -402,6 +402,8 @@ assert_json_data__stats(M, Mode) when
         #{
         #{
             emqx_connections_count := _,
             emqx_connections_count := _,
             emqx_connections_max := _,
             emqx_connections_max := _,
+            emqx_durable_subscriptions_count := _,
+            emqx_durable_subscriptions_max := _,
             emqx_live_connections_count := _,
             emqx_live_connections_count := _,
             emqx_live_connections_max := _,
             emqx_live_connections_max := _,
             emqx_sessions_count := _,
             emqx_sessions_count := _,

+ 1 - 0
changes/ce/fix-13067.en.md

@@ -0,0 +1 @@
+Adds a new `durable_subscriptions.count` statistic to track subscriptions that are tied to durable sessions. `subscriptions.count` no longer includes such subscriptions.