|
@@ -18,7 +18,7 @@
|
|
|
|
|
|
|
|
-include("emqx.hrl").
|
|
-include("emqx.hrl").
|
|
|
|
|
|
|
|
--export([start_link/0]).
|
|
|
|
|
|
|
+-export([start_link/0, start_link/1, stop/0]).
|
|
|
|
|
|
|
|
%% Stats API.
|
|
%% Stats API.
|
|
|
-export([getstats/0, getstat/1]).
|
|
-export([getstats/0, getstat/1]).
|
|
@@ -31,7 +31,8 @@
|
|
|
code_change/3]).
|
|
code_change/3]).
|
|
|
|
|
|
|
|
-record(update, {name, countdown, interval, func}).
|
|
-record(update, {name, countdown, interval, func}).
|
|
|
--record(state, {timer, updates :: [#update{}]}).
|
|
|
|
|
|
|
+-record(state, {timer, updates :: [#update{}],
|
|
|
|
|
+ tick_ms :: timeout()}).
|
|
|
|
|
|
|
|
-type(stats() :: list({atom(), non_neg_integer()})).
|
|
-type(stats() :: list({atom(), non_neg_integer()})).
|
|
|
|
|
|
|
@@ -77,10 +78,20 @@
|
|
|
-define(TAB, ?MODULE).
|
|
-define(TAB, ?MODULE).
|
|
|
-define(SERVER, ?MODULE).
|
|
-define(SERVER, ?MODULE).
|
|
|
|
|
|
|
|
|
|
+-type opts() :: #{tick_ms := timeout()}.
|
|
|
|
|
+
|
|
|
%% @doc Start stats server
|
|
%% @doc Start stats server
|
|
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
|
|
start_link() ->
|
|
start_link() ->
|
|
|
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
|
|
|
|
|
+ start_link(#{tick_ms => timer:seconds(1)}).
|
|
|
|
|
+
|
|
|
|
|
+-spec(start_link(opts()) -> emqx_types:startlink_ret()).
|
|
|
|
|
+start_link(Opts) ->
|
|
|
|
|
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Opts, []).
|
|
|
|
|
+
|
|
|
|
|
+-spec(stop() -> ok).
|
|
|
|
|
+stop() ->
|
|
|
|
|
+ gen_server:call(?SERVER, stop, infinity).
|
|
|
|
|
|
|
|
%% @doc Generate stats fun
|
|
%% @doc Generate stats fun
|
|
|
-spec(statsfun(Stat :: atom()) -> fun()).
|
|
-spec(statsfun(Stat :: atom()) -> fun()).
|
|
@@ -140,16 +151,18 @@ cast(Msg) ->
|
|
|
%% gen_server callbacks
|
|
%% gen_server callbacks
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
-init([]) ->
|
|
|
|
|
|
|
+init(#{tick_ms := TickMs}) ->
|
|
|
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
|
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
|
|
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
|
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
|
|
?ROUTE_STATS, ?RETAINED_STATS]),
|
|
?ROUTE_STATS, ?RETAINED_STATS]),
|
|
|
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
|
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
|
|
- {ok, start_timer(#state{updates = []}), hibernate}.
|
|
|
|
|
|
|
+ {ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}.
|
|
|
|
|
|
|
|
-start_timer(State) ->
|
|
|
|
|
- State#state{timer = emqx_misc:start_timer(timer:seconds(1), tick)}.
|
|
|
|
|
|
|
+start_timer(#state{tick_ms = Ms} = State) ->
|
|
|
|
|
+ State#state{timer = emqx_misc:start_timer(Ms, tick)}.
|
|
|
|
|
|
|
|
|
|
+handle_call(stop, _From, State) ->
|
|
|
|
|
+ {stop, normal, _Reply = ok, State};
|
|
|
handle_call(Req, _From, State) ->
|
|
handle_call(Req, _From, State) ->
|
|
|
emqx_logger:error("[Stats] unexpected call: ~p", [Req]),
|
|
emqx_logger:error("[Stats] unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
{reply, ignored, State}.
|
|
@@ -201,7 +214,7 @@ handle_info(Info, State) ->
|
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
|
|
terminate(_Reason, #state{timer = TRef}) ->
|
|
terminate(_Reason, #state{timer = TRef}) ->
|
|
|
- timer:cancel(TRef).
|
|
|
|
|
|
|
+ emqx_misc:cancel_timer(TRef).
|
|
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
{ok, State}.
|