|
|
@@ -41,8 +41,20 @@
|
|
|
%% call/cast/info events
|
|
|
-record(tally_subs, {}).
|
|
|
-record(tally_disconnected_sessions, {}).
|
|
|
--record(get_subscription_count, {}).
|
|
|
--record(get_disconnected_session_count, {}).
|
|
|
+
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Stat records & table
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+
|
|
|
+-record(stat_field, {name, value}).
|
|
|
+
|
|
|
+-define(tab, ?MODULE).
|
|
|
+-define(subs_count, subs_count).
|
|
|
+-define(disconnected_session_count, disconnected_session_count).
|
|
|
+-define(subs_count(VALUE), #stat_field{name = ?subs_count, value = VALUE}).
|
|
|
+-define(disconnected_session_count(VALUE), #stat_field{
|
|
|
+ name = ?disconnected_session_count, value = VALUE
|
|
|
+}).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% API
|
|
|
@@ -57,7 +69,12 @@ start_link() ->
|
|
|
get_subscription_count() ->
|
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
|
true ->
|
|
|
- gen_server:call(?MODULE, #get_subscription_count{}, infinity);
|
|
|
+ try ets:lookup(?tab, ?subs_count) of
|
|
|
+ [?subs_count(N)] -> N;
|
|
|
+ [] -> 0
|
|
|
+ catch
|
|
|
+ error:badarg -> 0
|
|
|
+ end;
|
|
|
false ->
|
|
|
0
|
|
|
end.
|
|
|
@@ -67,7 +84,12 @@ get_subscription_count() ->
|
|
|
get_disconnected_session_count() ->
|
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
|
true ->
|
|
|
- gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity);
|
|
|
+ try ets:lookup(?tab, ?disconnected_session_count) of
|
|
|
+ [?disconnected_session_count(N)] -> N;
|
|
|
+ [] -> 0
|
|
|
+ catch
|
|
|
+ error:badarg -> 0
|
|
|
+ end;
|
|
|
false ->
|
|
|
0
|
|
|
end.
|
|
|
@@ -80,8 +102,7 @@ init(_Opts) ->
|
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
|
true ->
|
|
|
State = #{
|
|
|
- subs_count => 0,
|
|
|
- disconnected_session_count => 0
|
|
|
+ tab => ets:new(?tab, [named_table, set, protected, {keypos, #stat_field.name}])
|
|
|
},
|
|
|
{ok, State, {continue, #tally_subs{}}};
|
|
|
false ->
|
|
|
@@ -97,12 +118,6 @@ handle_continue(#tally_disconnected_sessions{}, State0) ->
|
|
|
ensure_disconnected_sessions_tally_timer(),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_call(#get_subscription_count{}, _From, State) ->
|
|
|
- #{subs_count := N} = State,
|
|
|
- {reply, N, State};
|
|
|
-handle_call(#get_disconnected_session_count{}, _From, State) ->
|
|
|
- #{disconnected_session_count := N} = State,
|
|
|
- {reply, N, State};
|
|
|
handle_call(_Call, _From, State) ->
|
|
|
{reply, {error, bad_call}, State}.
|
|
|
|
|
|
@@ -124,13 +139,15 @@ handle_info(_Info, State) ->
|
|
|
%% Internal fns
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-tally_persistent_subscriptions(State0) ->
|
|
|
+tally_persistent_subscriptions(#{tab := Tab} = State) ->
|
|
|
N = emqx_persistent_session_ds_state:total_subscription_count(),
|
|
|
- State0#{subs_count := N}.
|
|
|
+ _ = ets:insert(Tab, ?subs_count(N)),
|
|
|
+ State.
|
|
|
|
|
|
-tally_disconnected_persistent_sessions(State0) ->
|
|
|
+tally_disconnected_persistent_sessions(#{tab := Tab} = State) ->
|
|
|
N = do_tally_disconnected_persistent_sessions(),
|
|
|
- State0#{disconnected_session_count := N}.
|
|
|
+ _ = ets:insert(Tab, ?disconnected_session_count(N)),
|
|
|
+ State.
|
|
|
|
|
|
ensure_subs_tally_timer() ->
|
|
|
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
|