Просмотр исходного кода

fix(monitor api): count persistent routes and subscriptions

Fixes https://emqx.atlassian.net/browse/EMQX-12267
Thales Macedo Garitezi 1 год назад
Родитель
Commit
28355a4cfc

+ 11 - 1
apps/emqx/src/emqx_broker_helper.erl

@@ -110,7 +110,7 @@ reclaim_seq(Topic) ->
 
 stats_fun() ->
     safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
-    safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'),
+    safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
     safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
 
 safe_update_stats(undefined, _Stat, _MaxStat) ->
@@ -118,6 +118,16 @@ safe_update_stats(undefined, _Stat, _MaxStat) ->
 safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
     emqx_stats:setstat(Stat, MaxStat, Val).
 
+subscription_count() ->
+    NonPSCount = table_size(?SUBSCRIPTION),
+    PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
+    case is_integer(NonPSCount) of
+        true ->
+            NonPSCount + PSCount;
+        false ->
+            PSCount
+    end.
+
 subscriber_val() ->
     sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
 

+ 3 - 1
apps/emqx/src/emqx_cm_sup.erl

@@ -53,6 +53,7 @@ init([]) ->
     RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
     DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
+    DSSessionBookkeeper = child_spec(emqx_persistent_session_bookkeeper, 5_000, worker),
     Children =
         [
             Banned,
@@ -62,7 +63,8 @@ init([]) ->
             Registry,
             RegistryKeeper,
             Manager,
-            DSSessionGCSup
+            DSSessionGCSup,
+            DSSessionBookkeeper
         ],
     {ok, {SupFlags, Children}}.
 

+ 107 - 0
apps/emqx/src/emqx_persistent_session_bookkeeper.erl

@@ -0,0 +1,107 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_bookkeeper).
+
+-behaviour(gen_server).
+
+%% API
+-export([
+    start_link/0,
+    get_subscription_count/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%% call/cast/info events
+-record(tally_subs, {}).
+-record(get_subscription_count, {}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []).
+
+%% @doc Gets a cached view of the cluster-global count of persistent subscriptions.
+-spec get_subscription_count() -> non_neg_integer().
+get_subscription_count() ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            gen_server:call(?MODULE, #get_subscription_count{}, infinity);
+        false ->
+            0
+    end.
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            State = #{subs_count => 0},
+            {ok, State, {continue, #tally_subs{}}};
+        false ->
+            ignore
+    end.
+
+handle_continue(#tally_subs{}, State0) ->
+    State = tally_persistent_subscriptions(State0),
+    ensure_subs_tally_timer(),
+    {noreply, State}.
+
+handle_call(#get_subscription_count{}, _From, State) ->
+    #{subs_count := N} = State,
+    {reply, N, State};
+handle_call(_Call, _From, State) ->
+    {reply, {error, bad_call}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#tally_subs{}, State0) ->
+    State = tally_persistent_subscriptions(State0),
+    ensure_subs_tally_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+tally_persistent_subscriptions(State0) ->
+    N = emqx_persistent_session_ds_state:total_subscription_count(),
+    State0#{subs_count := N}.
+
+ensure_subs_tally_timer() ->
+    Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]),
+    _ = erlang:send_after(Timeout, self(), #tally_subs{}),
+    ok.

+ 7 - 0
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -53,6 +53,7 @@
     cold_get_subscription/2,
     fold_subscriptions/3,
     n_subscriptions/1,
+    total_subscription_count/0,
     put_subscription/3,
     del_subscription/2
 ]).
@@ -401,6 +402,12 @@ fold_subscriptions(Fun, Acc, Rec) ->
 n_subscriptions(Rec) ->
     gen_size(?subscriptions, Rec).
 
+-spec total_subscription_count() -> non_neg_integer().
+total_subscription_count() ->
+    mria:async_dirty(?DS_MRIA_SHARD, fun() ->
+        mnesia:foldl(fun(#kv{}, Acc) -> Acc + 1 end, 0, ?subscription_tab)
+    end).
+
 -spec put_subscription(
     emqx_persistent_session_ds:topic_filter(),
     emqx_persistent_session_ds_subs:subscription(),

+ 11 - 1
apps/emqx/src/emqx_router_helper.erl

@@ -189,7 +189,17 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 stats_fun() ->
-    emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)).
+    PSRouteCount = persistent_route_count(),
+    NonPSRouteCount = emqx_router:stats(n_routes),
+    emqx_stats:setstat('topics.count', 'topics.max', PSRouteCount + NonPSRouteCount).
+
+persistent_route_count() ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            emqx_persistent_session_ds_router:stats(n_routes);
+        false ->
+            0
+    end.
 
 cleanup_routes(Node) ->
     emqx_router:cleanup_routes(Node).

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -1713,6 +1713,14 @@ fields("session_persistence") ->
                     desc => ?DESC(session_ds_session_gc_batch_size)
                 }
             )},
+        {"subscription_count_refresh_interval",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"5s">>,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {"message_retention_period",
             sc(
                 timeout_duration(),

+ 2 - 1
apps/emqx/test/emqx_config_SUITE.erl

@@ -475,6 +475,7 @@ zone_global_defaults() ->
                 message_retention_period => 86400000,
                 renew_streams_interval => 5000,
                 session_gc_batch_size => 100,
-                session_gc_interval => 600000
+                session_gc_interval => 600000,
+                subscription_count_refresh_interval => 5000
             }
     }.

+ 108 - 4
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -20,11 +20,13 @@
 -compile(export_all).
 
 -import(emqx_dashboard_SUITE, [auth_header_/0]).
+-import(emqx_common_test_helpers, [on_exit/1]).
 
 -include("emqx_dashboard.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 -define(SERVER, "http://127.0.0.1:18083").
 -define(BASE_PATH, "/api/v5").
@@ -52,10 +54,47 @@
 %%--------------------------------------------------------------------
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, common},
+        {group, persistent_sessions}
+    ].
+
+groups() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    PSTCs = persistent_session_testcases(),
+    [
+        {common, [], AllTCs -- PSTCs},
+        {persistent_sessions, [], PSTCs}
+    ].
+
+persistent_session_testcases() ->
+    [
+        t_persistent_session_stats
+    ].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:clear_screen(),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(persistent_sessions = Group, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            {emqx, "session_persistence {enable = true}"},
+            {emqx_retainer, ?BASE_RETAINER_CONF},
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard(
+                "dashboard.listeners.http { enable = true, bind = 18083 }\n"
+                "dashboard.sample_interval = 1s"
+            )
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
+    [{apps, Apps} | Config];
+init_per_group(common = Group, Config) ->
     Apps = emqx_cth_suite:start(
         [
             emqx,
@@ -67,12 +106,12 @@ init_per_suite(Config) ->
                 "dashboard.sample_interval = 1s"
             )
         ],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
+        #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
     ),
     {ok, _} = emqx_common_test_http:create_default_app(),
     [{apps, Apps} | Config].
 
-end_per_suite(Config) ->
+end_per_group(_Group, Config) ->
     Apps = ?config(apps, Config),
     emqx_cth_suite:stop(Apps),
     ok.
@@ -84,6 +123,7 @@ init_per_testcase(_TestCase, Config) ->
 
 end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop(),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 %%--------------------------------------------------------------------
@@ -272,6 +312,51 @@ t_monitor_api_error(_) ->
         request(["monitor"], "latest=-1"),
     ok.
 
+%% Verifies that subscriptions from persistent sessions are correctly accounted for.
+t_persistent_session_stats(_Config) ->
+    %% pre-condition
+    true = emqx_persistent_message:is_persistence_enabled(),
+
+    NonPSClient = start_and_connect(#{
+        clientid => <<"non-ps">>,
+        expiry_interval => 0
+    }),
+    PSClient = start_and_connect(#{
+        clientid => <<"ps">>,
+        expiry_interval => 30
+    }),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2),
+    {ok, _} =
+        snabbkaffe:block_until(
+            ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
+            infinity
+        ),
+    ?retry(1_000, 10, begin
+        ?assertMatch(
+            {ok, #{
+                %% N.B.: we currently don't perform any deduplication between persistent
+                %% and non-persistent routes, so we count `common/topic' and
+                %% `common/topic/+' twice each and get 8 instead of 6 here.
+                <<"topics">> := 8,
+                <<"subscriptions">> := 8
+            }},
+            request(["monitor_current"])
+        )
+    end),
+    %% Sanity checks
+    PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes),
+    ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
+    PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
+    ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
+    ok.
+
 request(Path) ->
     request(Path, "").
 
@@ -340,3 +425,22 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
     %% manually call monitor update
     _ = emqx_dashboard_monitor:current_rate_cluster(),
     ok.
+
+start_and_connect(Opts) ->
+    Defaults = #{clean_start => false, expiry_interval => 30},
+    #{
+        clientid := ClientId,
+        clean_start := CleanStart,
+        expiry_interval := EI
+    } = maps:merge(Defaults, Opts),
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {clean_start, CleanStart},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => EI}}
+    ]),
+    on_exit(fun() ->
+        catch emqtt:disconnect(Client, ?RC_NORMAL_DISCONNECTION, #{'Session-Expiry-Interval' => 0})
+    end),
+    {ok, _} = emqtt:connect(Client),
+    Client.