Kaynağa Gözat

Support client, session stats

Feng Lee 9 yıl önce
ebeveyn
işleme
fa8882b499
1 değiştirilmiş dosya ile 55 ekleme ve 19 silme
  1. 55 19
      src/emqttd_stats.erl

+ 55 - 19
src/emqttd_stats.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -14,34 +14,39 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc emqttd statistics
 -module(emqttd_stats).
 
--include("emqttd.hrl").
-
 -behaviour(gen_server).
 
--define(SERVER, ?MODULE).
+-author("Feng Lee <feng@emqtt.io>").
+
+-include("emqttd.hrl").
 
 -export([start_link/0, stop/0]).
 
-%% statistics API.
--export([statsfun/1, statsfun/2,
-         getstats/0, getstat/1,
-         setstat/2, setstats/3]).
+%% Client and Session Stats
+-export([set_client_stats/2, get_client_stats/1, del_client_stats/1,
+         set_session_stats/2, get_session_stats/1, del_session_stats/1]).
+
+%% Statistics API.
+-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstats/3]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {tick_tref}).
+-record(state, {tick}).
+
+-type(stats() :: list({atom(), non_neg_integer()})).
 
 -define(STATS_TAB, mqtt_stats).
+-define(CLIENT_STATS_TAB, mqtt_client_stats).
+-define(SESSION_STATS_TAB, mqtt_session_stats).
 
 %% $SYS Topics for Clients
 -define(SYSTOP_CLIENTS, [
-    'clients/count',    % clients connected current
-    'clients/max'       % max clients connected
+    'clients/count', % clients connected current
+    'clients/max'    % max clients connected
 ]).
 
 %% $SYS Topics for Sessions
@@ -75,10 +80,40 @@
 %% @doc Start stats server
 -spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
 start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 stop() ->
-    gen_server:call(?SERVER, stop).
+    gen_server:call(?MODULE, stop).
+
+-spec(set_client_stats(binary(), stats()) -> true).
+set_client_stats(ClientId, Stats) ->
+    ets:insert(?CLIENT_STATS_TAB, {ClientId, [{'$ts', emqttd_time:now_secs()}|Stats]}).
+
+-spec(get_client_stats(binary()) -> stats()).
+get_client_stats(ClientId) ->
+    case ets:lookup(?CLIENT_STATS_TAB, ClientId) of
+        [{_, Stats}] -> Stats;
+        [] -> []
+    end.
+
+-spec(del_client_stats(binary()) -> true).
+del_client_stats(ClientId) ->
+    ets:delete(?CLIENT_STATS_TAB, ClientId).
+
+-spec(set_session_stats(binary(), stats()) -> true).
+set_session_stats(ClientId, Stats) ->
+    ets:insert(?SESSION_STATS_TAB, {ClientId, [{'$ts', emqttd_time:now_secs()}|Stats]}).
+
+-spec(get_session_stats(binary()) -> stats()).
+get_session_stats(ClientId) ->
+    case ets:lookup(?SESSION_STATS_TAB, ClientId) of
+        [{_, Stats}] -> Stats;
+        [] -> []
+    end.
+
+-spec(del_session_stats(binary()) -> true).
+del_session_stats(ClientId) ->
+    ets:delete(?SESSION_STATS_TAB, ClientId).
 
 %% @doc Generate stats fun
 -spec(statsfun(Stat :: atom()) -> fun()).
@@ -118,13 +153,14 @@ setstats(Stat, MaxStat, Val) ->
 
 init([]) ->
     emqttd_time:seed(),
-    ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
+    lists:foreach(
+      fun(Tab) ->
+        Tab = ets:new(Tab, [set, public, named_table, {write_concurrency, true}])
+      end, [?STATS_TAB, ?CLIENT_STATS_TAB, ?SESSION_STATS_TAB]),
     Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
     ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
-    % Create $SYS Topics
-    % [ok = emqttd:create(topic, stats_topic(Topic)) || Topic <- Topics],
     % Tick to publish stats
-    {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
+    {ok, #state{tick = emqttd_broker:start_tick(tick)}, hibernate}.
 
 handle_call(stop, _From, State) ->
     {stop, normal, ok, State};
@@ -154,7 +190,7 @@ handle_info(tick, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
 
-terminate(_Reason, #state{tick_tref = TRef}) ->
+terminate(_Reason, #state{tick = TRef}) ->
     emqttd_broker:stop_tick(TRef).
 
 code_change(_OldVsn, State, _Extra) ->