|
|
@@ -18,6 +18,8 @@
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
+-include("emqx_router.hrl").
|
|
|
+-include("emqx_shared_sub.hrl").
|
|
|
-include("logger.hrl").
|
|
|
-include("types.hrl").
|
|
|
|
|
|
@@ -33,6 +35,9 @@
|
|
|
reclaim_seq/1
|
|
|
]).
|
|
|
|
|
|
+%% Stats fun
|
|
|
+-export([stats_fun/0]).
|
|
|
+
|
|
|
%% gen_server callbacks
|
|
|
-export([
|
|
|
init/1,
|
|
|
@@ -99,6 +104,30 @@ create_seq(Topic) ->
|
|
|
reclaim_seq(Topic) ->
|
|
|
emqx_sequence:reclaim(?SUBSEQ, Topic).
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Stats fun
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+stats_fun() ->
|
|
|
+ safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
|
|
|
+ safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'),
|
|
|
+ safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
|
|
|
+
|
|
|
+safe_update_stats(undefined, _Stat, _MaxStat) ->
|
|
|
+ ok;
|
|
|
+safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
|
|
|
+ emqx_stats:setstat(Stat, MaxStat, Val).
|
|
|
+
|
|
|
+subscriber_val() ->
|
|
|
+ sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
|
|
|
+
|
|
|
+sum_subscriber(undefined, undefined) -> undefined;
|
|
|
+sum_subscriber(undefined, V2) when is_integer(V2) -> V2;
|
|
|
+sum_subscriber(V1, undefined) when is_integer(V1) -> V1;
|
|
|
+sum_subscriber(V1, V2) when is_integer(V1), is_integer(V2) -> V1 + V2.
|
|
|
+
|
|
|
+table_size(Tab) when is_atom(Tab) -> ets:info(Tab, size).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -115,7 +144,7 @@ init([]) ->
|
|
|
%% SubMon: SubPid -> SubId
|
|
|
ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]),
|
|
|
%% Stats timer
|
|
|
- ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
|
|
|
+ ok = emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0),
|
|
|
{ok, #{pmon => emqx_pmon:new()}}.
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|