Przeglądaj źródła

session statistics

Feng 10 lat temu
rodzic
commit
d35be39df7
4 zmienionych plików z 74 dodań i 4 usunięć
  1. 4 1
      src/emqttd_mqueue.erl
  2. 20 2
      src/emqttd_session.erl
  3. 44 1
      src/emqttd_sm.erl
  4. 6 0
      src/emqttd_sm_sup.erl

+ 4 - 1
src/emqttd_mqueue.erl

@@ -58,7 +58,8 @@
 
 -export([new/3, name/1,
          is_empty/1, is_full/1,
-         len/1, in/2, out/1]).
+         len/1, max_len/1,
+         in/2, out/1]).
 
 -define(LOW_WM, 0.2).
 
@@ -108,6 +109,8 @@ is_full(_MQ) -> false.
 
 len(#mqueue{len = Len}) -> Len.
 
+max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
+
 %%------------------------------------------------------------------------------
 %% @doc Queue one message.
 %% @end

+ 20 - 2
src/emqttd_session.erl

@@ -232,6 +232,7 @@ init([CleanSess, ClientId, ClientPid]) ->
             max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
             expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
             timestamp         = os:timestamp()},
+    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
     {ok, Session, hibernate}.
 
 handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
@@ -510,8 +511,8 @@ handle_info(Info, Session = #session{client_id = ClientId}) ->
     lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
     {noreply, Session}.
 
-terminate(_Reason, _Session) ->
-    ok.
+terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
+    emqttd_sm:unregister_session(CleanSess, ClientId).
 
 code_change(_OldVsn, Session, _Extra) ->
     {ok, Session}.
@@ -629,3 +630,20 @@ cancel_timer(Ref) ->
 noreply(State) ->
     {noreply, State, hibernate}.
 
+info(#session{subscriptions     = Subscriptions,
+              inflight_queue    = InflightQueue,
+              max_inflight      = MaxInflight,
+              message_queue     = MessageQueue,
+              awaiting_rel      = AwaitingRel,
+              awaiting_ack      = AwaitingAck,
+              awaiting_comp     = AwaitingComp,
+              timestamp         = CreatedAt}) ->
+    [{pid, self()}, {subscriptions, Subscriptions},
+     {max_inflight, MaxInflight},
+     {inflight_queue, lists:length(InflightQueue)},
+     {message_queue, emqttd_mqueue:len(MessageQueue)},
+     {awaiting_rel, maps:size(AwaitingRel)},
+     {awaiting_ack, maps:size(AwaitingAck)},
+     {awaiting_comp, maps:size(AwaitingComp)},
+     {created_at, CreatedAt}].
+

+ 44 - 1
src/emqttd_sm.erl

@@ -42,6 +42,8 @@
 
 -export([start_session/2, lookup_session/1]).
 
+-export([register_session/3, unregister_session/2]).
+
 -behaviour(gen_server).
 
 %% gen_server Function Exports
@@ -108,6 +110,29 @@ lookup_session(ClientId) ->
         [] -> undefined
     end.
 
+%%------------------------------------------------------------------------------
+%% @doc Register a session with info.
+%% @end
+%%------------------------------------------------------------------------------
+-spec register_session(CleanSess, ClientId, Info) -> ok when
+    CleanSess :: boolean(),
+    ClientId :: binary(),
+    Info :: [tuple()].
+register_session(CleanSess, ClientId, Info) ->
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    gen_server:cast(SM, {register, CleanSess, ClientId, Info}).
+
+%%------------------------------------------------------------------------------
+%% @doc Unregister a session.
+%% @end
+%%------------------------------------------------------------------------------
+-spec unregister_session(CleanSess, ClientId) -> ok when
+    CleanSess :: boolean(),
+    ClientId :: binary().
+unregister_session(CleanSess, ClientId) ->
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    gen_server:cast(SM, {unregister, CleanSess, ClientId}).
+
 call(SM, Req) -> gen_server:call(SM, Req, infinity).
 
 %%%=============================================================================
@@ -144,7 +169,25 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
-handle_cast(_Msg, State) ->
+%% transient session
+handle_cast({register, true, ClientId, Info}, State) ->
+    ets:insert(mqtt_transient_session, {ClientId, Info}),
+    {noreply, State};
+
+handle_cast({register, false, ClientId, Info}, State) ->
+    ets:insert(mqtt_persistent_session, {ClientId, Info}),
+    {noreply, setstats(State)};
+
+handle_cast({unregister, true, ClientId}, State) ->
+    ets:delete(mqtt_transient_session, ClientId),
+    {noreply, State};
+
+handle_cast({unregister, false, ClientId}, State) ->
+    ets:delete(mqtt_persistent_session, ClientId),
+    {noreply, State};
+
+handle_cast(Msg, State) ->
+    lager:critical("Unexpected Msg: ~p", [Msg]),
     {noreply, State}.
 
 handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->

+ 6 - 0
src/emqttd_sm_sup.erl

@@ -43,6 +43,7 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
+    init_session_ets(),
     Schedulers = erlang:system_info(schedulers),
     gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
     StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
@@ -55,3 +56,8 @@ init([]) ->
                  end, lists:seq(1, Schedulers)),
     {ok, {{one_for_all, 10, 100}, Children}}.
 
+init_session_ets() ->
+    Tables = [mqtt_transient_session, mqtt_persistent_session],
+    Attrs  = [ordered_set, named_table, public, {write_concurrency, true}],
+    lists:foreach(fun(Tab) -> ets:new(Tab, Attrs) end, Tables).
+