|
|
@@ -21,7 +21,8 @@
|
|
|
%% API
|
|
|
-export([
|
|
|
start_link/0,
|
|
|
- get_subscription_count/0
|
|
|
+ get_subscription_count/0,
|
|
|
+ get_disconnected_session_count/0
|
|
|
]).
|
|
|
|
|
|
%% `gen_server' API
|
|
|
@@ -39,7 +40,9 @@
|
|
|
|
|
|
%% call/cast/info events
|
|
|
-record(tally_subs, {}).
|
|
|
+-record(tally_disconnected_sessions, {}).
|
|
|
-record(get_subscription_count, {}).
|
|
|
+-record(get_disconnected_session_count, {}).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% API
|
|
|
@@ -59,6 +62,16 @@ get_subscription_count() ->
|
|
|
0
|
|
|
end.
|
|
|
|
|
|
+%% @doc Gets a cached view of the cluster-global count of disconnected persistent sessions.
|
|
|
+-spec get_disconnected_session_count() -> non_neg_integer().
|
|
|
+get_disconnected_session_count() ->
|
|
|
+ case emqx_persistent_message:is_persistence_enabled() of
|
|
|
+ true ->
|
|
|
+ gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity);
|
|
|
+ false ->
|
|
|
+ 0
|
|
|
+ end.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% `gen_server' API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -66,7 +79,10 @@ get_subscription_count() ->
|
|
|
init(_Opts) ->
|
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
|
true ->
|
|
|
- State = #{subs_count => 0},
|
|
|
+ State = #{
|
|
|
+ subs_count => 0,
|
|
|
+ disconnected_session_count => 0
|
|
|
+ },
|
|
|
{ok, State, {continue, #tally_subs{}}};
|
|
|
false ->
|
|
|
ignore
|
|
|
@@ -75,11 +91,18 @@ init(_Opts) ->
|
|
|
handle_continue(#tally_subs{}, State0) ->
|
|
|
State = tally_persistent_subscriptions(State0),
|
|
|
ensure_subs_tally_timer(),
|
|
|
+ {noreply, State, {continue, #tally_disconnected_sessions{}}};
|
|
|
+handle_continue(#tally_disconnected_sessions{}, State0) ->
|
|
|
+ State = tally_disconnected_persistent_sessions(State0),
|
|
|
+ ensure_disconnected_sessions_tally_timer(),
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_call(#get_subscription_count{}, _From, State) ->
|
|
|
#{subs_count := N} = State,
|
|
|
{reply, N, State};
|
|
|
+handle_call(#get_disconnected_session_count{}, _From, State) ->
|
|
|
+ #{disconnected_session_count := N} = State,
|
|
|
+ {reply, N, State};
|
|
|
handle_call(_Call, _From, State) ->
|
|
|
{reply, {error, bad_call}, State}.
|
|
|
|
|
|
@@ -90,6 +113,10 @@ handle_info(#tally_subs{}, State0) ->
|
|
|
State = tally_persistent_subscriptions(State0),
|
|
|
ensure_subs_tally_timer(),
|
|
|
{noreply, State};
|
|
|
+handle_info(#tally_disconnected_sessions{}, State0) ->
|
|
|
+ State = tally_disconnected_persistent_sessions(State0),
|
|
|
+ ensure_disconnected_sessions_tally_timer(),
|
|
|
+ {noreply, State};
|
|
|
handle_info(_Info, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
@@ -101,7 +128,38 @@ tally_persistent_subscriptions(State0) ->
|
|
|
N = emqx_persistent_session_ds_state:total_subscription_count(),
|
|
|
State0#{subs_count := N}.
|
|
|
|
|
|
+tally_disconnected_persistent_sessions(State0) ->
|
|
|
+ N = do_tally_disconnected_persistent_sessions(),
|
|
|
+ State0#{disconnected_session_count := N}.
|
|
|
+
|
|
|
ensure_subs_tally_timer() ->
|
|
|
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
|
|
|
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
|
|
|
ok.
|
|
|
+
|
|
|
+ensure_disconnected_sessions_tally_timer() ->
|
|
|
+ Timeout = emqx_config:get([durable_sessions, disconnected_session_count_refresh_interval]),
|
|
|
+ _ = erlang:send_after(Timeout, self(), #tally_disconnected_sessions{}),
|
|
|
+ ok.
|
|
|
+
|
|
|
+do_tally_disconnected_persistent_sessions() ->
|
|
|
+ Iter = emqx_persistent_session_ds_state:make_session_iterator(),
|
|
|
+ do_tally_disconnected_persistent_sessions(Iter, 0).
|
|
|
+
|
|
|
+do_tally_disconnected_persistent_sessions('$end_of_table', N) ->
|
|
|
+ N;
|
|
|
+do_tally_disconnected_persistent_sessions(Iter0, N) ->
|
|
|
+ case emqx_persistent_session_ds_state:session_iterator_next(Iter0, 1) of
|
|
|
+ {[], _} ->
|
|
|
+ N;
|
|
|
+ {[{Id, _Meta}], Iter} ->
|
|
|
+ case is_live_session(Id) of
|
|
|
+ true ->
|
|
|
+ do_tally_disconnected_persistent_sessions(Iter, N);
|
|
|
+ false ->
|
|
|
+ do_tally_disconnected_persistent_sessions(Iter, N + 1)
|
|
|
+ end
|
|
|
+ end.
|
|
|
+
|
|
|
+is_live_session(Id) ->
|
|
|
+ [] =/= emqx_cm_registry:lookup_channels(Id).
|