Feng Lee 10 лет назад
Родитель
Сommit
5a612657be
4 измененных файлов с 121 добавлено и 9 удалено
  1. 8 1
      src/emqttd_client.erl
  2. 87 3
      src/emqttd_ctl.erl
  3. 2 1
      src/emqttd_plugins.erl
  4. 24 4
      src/emqttd_session.erl

+ 8 - 1
src/emqttd_client.erl

@@ -24,6 +24,7 @@
 %%%
 %%%
 %%% @end
 %%% @end
 %%%-----------------------------------------------------------------------------
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_client).
 -module(emqttd_client).
 
 
 -author("Feng Lee <feng@emqtt.io>").
 -author("Feng Lee <feng@emqtt.io>").
@@ -33,7 +34,7 @@
 -include("emqttd_protocol.hrl").
 -include("emqttd_protocol.hrl").
 
 
 %% API Function Exports
 %% API Function Exports
--export([start_link/2, info/1]).
+-export([start_link/2, info/1, kick/1]).
 
 
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
@@ -60,6 +61,9 @@ start_link(SockArgs, PktOpts) ->
 info(Pid) ->
 info(Pid) ->
     gen_server:call(Pid, info, infinity).
     gen_server:call(Pid, info, infinity).
 
 
+kick(Pid) ->
+    gen_server:call(Pid, kick).
+
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
     % Transform if ssl.
     % Transform if ssl.
     {ok, NewSock} = esockd_connection:accept(SockArgs),
     {ok, NewSock} = esockd_connection:accept(SockArgs),
@@ -84,6 +88,9 @@ handle_call(info, _From, State = #state{conn_name = ConnName,
                                         proto_state = ProtoState}) ->
                                         proto_state = ProtoState}) ->
     {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
     {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
 
 
+handle_call(kick, _From, State) ->
+    {stop, {shutdown, kick}, ok, State};
+
 handle_call(Req, _From, State = #state{peername = Peername}) ->
 handle_call(Req, _From, State = #state{peername = Peername}) ->
     lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     {reply, {error, unsupported_request}, State}.    
     {reply, {error, unsupported_request}, State}.    

+ 87 - 3
src/emqttd_ctl.erl

@@ -42,6 +42,8 @@
          stats/1,
          stats/1,
          metrics/1,
          metrics/1,
          cluster/1,
          cluster/1,
+         clients/1,
+         sessions/1,
          listeners/1,
          listeners/1,
          bridges/1,
          bridges/1,
          plugins/1,
          plugins/1,
@@ -135,6 +137,41 @@ stats([]) ->
     
     
 metrics([]) ->
 metrics([]) ->
     [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
     [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
+
+clients(["list"]) ->
+    dump(client, mqtt_client);
+
+clients(["show", ClientId]) ->
+    case emqttd_cm:lookup(list_to_binary(ClientId)) of
+        undefined ->
+            ?PRINT_MSG("Not Found.~n");
+        Client -> 
+            print(client, Client)
+    end;
+
+clients(["kick", ClientId]) ->
+    case emqttd_cm:lookup(list_to_binary(ClientId)) of
+        undefined ->
+            ?PRINT_MSG("Not Found.~n");
+        #mqtt_client{client_pid = Pid} -> 
+            emqttd_client:kick(Pid)
+    end.
+
+sessions(["list"]) ->
+    dump(session, mqtt_transient_session),
+    dump(session, mqtt_persistent_session);
+
+sessions(["show", ClientId0]) ->
+    ClientId = list_to_binary(ClientId0),
+    case {ets:lookup(mqtt_transient_session, ClientId),
+          ets:lookup(mqtt_persistent_session, ClientId)} of
+        {[], []} ->
+            ?PRINT_MSG("Not Found.~n");
+        {[SessInfo], _} -> 
+            print(session, SessInfo);
+        {_, [SessInfo]} -> 
+            print(session, SessInfo)
+    end.
     
     
 listeners([]) ->
 listeners([]) ->
     lists:foreach(fun({{Protocol, Port}, Pid}) ->
     lists:foreach(fun({{Protocol, Port}, Pid}) ->
@@ -178,9 +215,7 @@ bridges(["stop", SNode, Topic]) ->
     end.
     end.
 
 
 plugins(["list"]) ->
 plugins(["list"]) ->
-    lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
-            ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active])
-        end, emqttd_plugins:list());
+    lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list());
 
 
 plugins(["load", Name]) ->
 plugins(["load", Name]) ->
     case emqttd_plugins:load(list_to_atom(Name)) of
     case emqttd_plugins:load(list_to_atom(Name)) of
@@ -223,6 +258,7 @@ stop_trace(Who, Name) ->
             ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
             ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
     end.
     end.
 
 
+
 node_name(SNode) ->
 node_name(SNode) ->
     SNode1 =
     SNode1 =
     case string:tokens(SNode, "@") of
     case string:tokens(SNode, "@") of
@@ -260,3 +296,51 @@ parse_opt(bridge, queue, Len) ->
 parse_opt(_Cmd, Opt, _Val) ->
 parse_opt(_Cmd, Opt, _Val) ->
     ?PRINT("Bad Option: ~s~n", [Opt]).
     ?PRINT("Bad Option: ~s~n", [Opt]).
 
 
+dump(Type, Table) ->
+    dump(Type, Table, ets:first(Table)).
+
+dump(_Type, _Table, '$end_of_table') ->
+    ok;
+dump(Type, Table, Key) ->
+    case ets:lookup(Table, Key) of
+        [Record] -> print(Type, Record);
+        [] -> ignore
+    end,
+    dump(Type, Table, ets:next(Table, Key)).
+
+print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess,
+                           username = Username, peername = Peername,
+                           connected_at = ConnectedAt}) ->
+    ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
+            [ClientId, CleanSess, Username,
+             emqttd_net:format(Peername),
+             emqttd_util:now_to_secs(ConnectedAt)]);
+
+print(session, {ClientId, SessInfo}) ->
+    InfoKeys = [clean_sess, 
+                max_inflight,
+                inflight_queue,
+                message_queue,
+                awaiting_rel,
+                awaiting_ack,
+                awaiting_comp,
+                created_at,
+                subscriptions],
+    ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
+           "message_queue=~w, awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
+           "created_at=~w, subscriptions=~s)~n",
+            [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]);
+
+print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
+    ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
+               [Name, Ver, Descr, Active]).
+
+format(created_at, Val) ->
+    emqttd_util:now_to_secs(Val);
+
+format(subscriptions, List) ->
+    string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
+
+format(_, Val) ->
+    Val.
+

+ 2 - 1
src/emqttd_plugins.erl

@@ -173,7 +173,8 @@ set_config([{AppName, Envs} | Config]) ->
 start_app(App, SuccFun) ->
 start_app(App, SuccFun) ->
     case application:ensure_all_started(App) of
     case application:ensure_all_started(App) of
         {ok, Started} ->
         {ok, Started} ->
-            lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]),
+            lager:info("Started Apps: ~p", [Started]),
+            lager:info("Load plugin ~p successfully", [App]),
             SuccFun(App),
             SuccFun(App),
             {ok, Started};
             {ok, Started};
         {error, {ErrApp, Reason}} ->
         {error, {ErrApp, Reason}} ->

+ 24 - 4
src/emqttd_session.erl

@@ -126,6 +126,10 @@
         expired_after = 172800,
         expired_after = 172800,
 
 
         expired_timer,
         expired_timer,
+
+        collect_interval,
+
+        collect_timer,
         
         
         timestamp}).
         timestamp}).
 
 
@@ -231,9 +235,11 @@ init([CleanSess, ClientId, ClientPid]) ->
             await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
             await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
             max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
             max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
             expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
             expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
+            collect_interval  = emqttd_opts:g(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
             timestamp         = os:timestamp()},
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
-    {ok, Session, hibernate}.
+    %% start statistics
+    {ok, start_collector(Session), hibernate}.
 
 
 handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
 handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
                                                                subscriptions = Subscriptions}) ->
                                                                subscriptions = Subscriptions}) ->
@@ -483,6 +489,10 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie
             noreply(Session)
             noreply(Session)
     end;
     end;
 
 
+handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
+    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
+    {noreply, start_collector(Session), hibernate};
+
 handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
 handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
                                                              client_pid = ClientPid}) ->
                                                              client_pid = ClientPid}) ->
     {stop, normal, Session};
     {stop, normal, Session};
@@ -630,7 +640,15 @@ cancel_timer(Ref) ->
 noreply(State) ->
 noreply(State) ->
     {noreply, State, hibernate}.
     {noreply, State, hibernate}.
 
 
-info(#session{subscriptions     = Subscriptions,
+start_collector(Session = #session{collect_interval = 0}) ->
+    Session;
+
+start_collector(Session = #session{collect_interval = Interval}) ->
+    TRef = erlang:send_after(Interval * 1000, self(), collect_info),
+    Session#session{collect_timer = TRef}.
+
+info(#session{clean_sess        = CleanSess,
+              subscriptions     = Subscriptions,
               inflight_queue    = InflightQueue,
               inflight_queue    = InflightQueue,
               max_inflight      = MaxInflight,
               max_inflight      = MaxInflight,
               message_queue     = MessageQueue,
               message_queue     = MessageQueue,
@@ -638,9 +656,11 @@ info(#session{subscriptions     = Subscriptions,
               awaiting_ack      = AwaitingAck,
               awaiting_ack      = AwaitingAck,
               awaiting_comp     = AwaitingComp,
               awaiting_comp     = AwaitingComp,
               timestamp         = CreatedAt}) ->
               timestamp         = CreatedAt}) ->
-    [{pid, self()}, {subscriptions, Subscriptions},
+    [{pid, self()}, 
+     {clean_sess, CleanSess},
+     {subscriptions, Subscriptions},
      {max_inflight, MaxInflight},
      {max_inflight, MaxInflight},
-     {inflight_queue, lists:length(InflightQueue)},
+     {inflight_queue, length(InflightQueue)},
      {message_queue, emqttd_mqueue:len(MessageQueue)},
      {message_queue, emqttd_mqueue:len(MessageQueue)},
      {awaiting_rel, maps:size(AwaitingRel)},
      {awaiting_rel, maps:size(AwaitingRel)},
      {awaiting_ack, maps:size(AwaitingAck)},
      {awaiting_ack, maps:size(AwaitingAck)},