|
|
@@ -58,7 +58,7 @@
|
|
|
-record(update, {name, countdown, interval, func}).
|
|
|
|
|
|
-record(state, {
|
|
|
- timer :: reference(),
|
|
|
+ timer :: reference(),
|
|
|
updates :: [#update{}],
|
|
|
tick_ms :: timeout()
|
|
|
}).
|
|
|
@@ -66,48 +66,53 @@
|
|
|
-type(stats() :: list({atom(), non_neg_integer()})).
|
|
|
|
|
|
%% Connection stats
|
|
|
--define(CONNECTION_STATS, [
|
|
|
- 'connections.count', % current connections
|
|
|
- 'connections.max' % maximum connections connected
|
|
|
-]).
|
|
|
+-define(CONNECTION_STATS,
|
|
|
+ ['connections.count', %% Count of Concurrent Connections
|
|
|
+ 'connections.max' %% Maximum Number of Concurrent Connections
|
|
|
+ ]).
|
|
|
+
|
|
|
+%% Channel stats
|
|
|
+-define(CHANNEL_STATS,
|
|
|
+ ['channels.count', %% Count of Concurrent Channels
|
|
|
+ 'channels.max' %% Maximum Number of Concurrent Channels
|
|
|
+ ]).
|
|
|
|
|
|
%% Session stats
|
|
|
--define(SESSION_STATS, [
|
|
|
- 'sessions.count',
|
|
|
- 'sessions.max',
|
|
|
- 'sessions.persistent.count',
|
|
|
- 'sessions.persistent.max'
|
|
|
-]).
|
|
|
-
|
|
|
-%% Subscribers, Subscriptions stats
|
|
|
--define(PUBSUB_STATS, [
|
|
|
- 'topics.count',
|
|
|
- 'topics.max',
|
|
|
- 'suboptions.count',
|
|
|
- 'suboptions.max',
|
|
|
- 'subscribers.count',
|
|
|
- 'subscribers.max',
|
|
|
- 'subscriptions.count',
|
|
|
- 'subscriptions.max',
|
|
|
- 'subscriptions.shared.count',
|
|
|
- 'subscriptions.shared.max'
|
|
|
-]).
|
|
|
-
|
|
|
--define(ROUTE_STATS, [
|
|
|
- 'routes.count',
|
|
|
- 'routes.max'
|
|
|
-]).
|
|
|
+-define(SESSION_STATS,
|
|
|
+ ['sessions.count', %% Count of Concurrent Sessions
|
|
|
+ 'sessions.max' %% Maximum Number of Concurrent Sessions
|
|
|
+ ]).
|
|
|
+
|
|
|
+%% PubSub stats
|
|
|
+-define(PUBSUB_STATS,
|
|
|
+ ['topics.count',
|
|
|
+ 'topics.max',
|
|
|
+ 'suboptions.count',
|
|
|
+ 'suboptions.max',
|
|
|
+ 'subscribers.count',
|
|
|
+ 'subscribers.max',
|
|
|
+ 'subscriptions.count',
|
|
|
+ 'subscriptions.max',
|
|
|
+ 'subscriptions.shared.count',
|
|
|
+ 'subscriptions.shared.max'
|
|
|
+ ]).
|
|
|
+
|
|
|
+%% Route stats
|
|
|
+-define(ROUTE_STATS,
|
|
|
+ ['routes.count',
|
|
|
+ 'routes.max'
|
|
|
+ ]).
|
|
|
|
|
|
%% Retained stats
|
|
|
--define(RETAINED_STATS, [
|
|
|
- 'retained.count',
|
|
|
- 'retained.max'
|
|
|
-]).
|
|
|
+-define(RETAINED_STATS,
|
|
|
+ ['retained.count',
|
|
|
+ 'retained.max'
|
|
|
+ ]).
|
|
|
|
|
|
-define(TAB, ?MODULE).
|
|
|
-define(SERVER, ?MODULE).
|
|
|
|
|
|
--type opts() :: #{tick_ms := timeout()}.
|
|
|
+-type(opts() :: #{tick_ms := timeout()}).
|
|
|
|
|
|
%% @doc Start stats server
|
|
|
-spec(start_link() -> startlink_ret()).
|
|
|
@@ -122,7 +127,7 @@ start_link(Opts) ->
|
|
|
stop() ->
|
|
|
gen_server:call(?SERVER, stop, infinity).
|
|
|
|
|
|
-%% @doc Generate stats fun
|
|
|
+%% @doc Generate stats fun.
|
|
|
-spec(statsfun(Stat :: atom()) -> fun()).
|
|
|
statsfun(Stat) ->
|
|
|
fun(Val) -> setstat(Stat, Val) end.
|
|
|
@@ -131,7 +136,7 @@ statsfun(Stat) ->
|
|
|
statsfun(Stat, MaxStat) ->
|
|
|
fun(Val) -> setstat(Stat, MaxStat, Val) end.
|
|
|
|
|
|
-%% @doc Get all statistics
|
|
|
+%% @doc Get all statistics.
|
|
|
-spec(getstats() -> stats()).
|
|
|
getstats() ->
|
|
|
case ets:info(?TAB, name) of
|
|
|
@@ -139,7 +144,7 @@ getstats() ->
|
|
|
_ -> ets:tab2list(?TAB)
|
|
|
end.
|
|
|
|
|
|
-%% @doc Get stats by name
|
|
|
+%% @doc Get stats by name.
|
|
|
-spec(getstat(atom()) -> maybe(non_neg_integer())).
|
|
|
getstat(Name) ->
|
|
|
case ets:lookup(?TAB, Name) of
|
|
|
@@ -173,8 +178,7 @@ cancel_update(Name) ->
|
|
|
rec(Name, Secs, UpFun) ->
|
|
|
#update{name = Name, countdown = Secs, interval = Secs, func = UpFun}.
|
|
|
|
|
|
-cast(Msg) ->
|
|
|
- gen_server:cast(?SERVER, Msg).
|
|
|
+cast(Msg) -> gen_server:cast(?SERVER, Msg).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
@@ -182,8 +186,13 @@ cast(Msg) ->
|
|
|
|
|
|
init(#{tick_ms := TickMs}) ->
|
|
|
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
|
|
|
- Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
|
|
- ?ROUTE_STATS, ?RETAINED_STATS]),
|
|
|
+ Stats = lists:append([?CONNECTION_STATS,
|
|
|
+ ?CHANNEL_STATS,
|
|
|
+ ?SESSION_STATS,
|
|
|
+ ?PUBSUB_STATS,
|
|
|
+ ?ROUTE_STATS,
|
|
|
+ ?RETAINED_STATS
|
|
|
+ ]),
|
|
|
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
|
|
{ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}.
|
|
|
|
|
|
@@ -211,16 +220,17 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
|
|
|
|
|
|
handle_cast({update_interval, Update = #update{name = Name}},
|
|
|
State = #state{updates = Updates}) ->
|
|
|
- case lists:keyfind(Name, #update.name, Updates) of
|
|
|
- #update{} ->
|
|
|
- ?LOG(warning, "Duplicated update: ~s", [Name]),
|
|
|
- {noreply, State};
|
|
|
- false ->
|
|
|
- {noreply, State#state{updates = [Update | Updates]}}
|
|
|
- end;
|
|
|
+ NState = case lists:keyfind(Name, #update.name, Updates) of
|
|
|
+ #update{} ->
|
|
|
+ ?LOG(warning, "Duplicated update: ~s", [Name]),
|
|
|
+ State;
|
|
|
+ false -> State#state{updates = [Update|Updates]}
|
|
|
+ end,
|
|
|
+ {noreply, NState};
|
|
|
|
|
|
handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
|
|
|
- {noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}};
|
|
|
+ Updates1 = lists:keydelete(Name, #update.name, Updates),
|
|
|
+ {noreply, State#state{updates = Updates1}};
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
|
@@ -233,7 +243,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
|
|
try UpFun()
|
|
|
catch
|
|
|
_:Error ->
|
|
|
- ?LOG(error, "update ~s failed: ~p", [Name, Error])
|
|
|
+ ?LOG(error, "Update ~s failed: ~p", [Name, Error])
|
|
|
end,
|
|
|
[Update#update{countdown = I} | Acc];
|
|
|
(Update = #update{countdown = C}, Acc) ->
|
|
|
@@ -259,10 +269,9 @@ safe_update_element(Key, Val) ->
|
|
|
try ets:update_element(?TAB, Key, {2, Val}) of
|
|
|
false ->
|
|
|
ets:insert_new(?TAB, {Key, Val});
|
|
|
- true ->
|
|
|
- true
|
|
|
+ true -> true
|
|
|
catch
|
|
|
error:badarg ->
|
|
|
- ?LOG(warning, "Update ~p to ~p failed", [Key, Val])
|
|
|
+ ?LOG(warning, "Failed to update ~p to ~p", [Key, Val])
|
|
|
end.
|
|
|
|