Просмотр исходного кода

Merge pull request #236 from emqtt/dev

Session collection, plugin improvement, shrunk bootstrap memory
Feng Lee 10 лет назад
Родитель
Сommit
59a3cd93dc

+ 12 - 0
CHANGELOG.md

@@ -2,6 +2,18 @@
 emqttd ChangeLog
 ==================
 
+0.10.0-alpha (2015-08-10)
+-------------------------
+
+New Plugin Architecture
+
+Web Dashboard
+
+MySQL Authentication and ACL Plugin
+
+Session Statistics
+
+
 0.9.3-alpha (2015-07-25)
 -------------------------
 

+ 5 - 2
Makefile

@@ -34,8 +34,11 @@ rel: compile
 plugins:
 	@for plugin in ./plugins/* ; do \
 	if [ -d $${plugin} ]; then \
-		echo "copy $${plugin}"; \
-		cp -R $${plugin} $(DIST)/plugins/ && rm -rf $(DIST)/$${plugin}/src/ ; \
+		mkdir -p $(DIST)/$${plugin}/ ; \
+		cp -R $${plugin}/ebin $(DIST)/$${plugin}/ ; \
+		[ -d "$${plugin}/priv" ] && cp -R $${plugin}/priv $(DIST)/$${plugin}/ ; \
+		[ -d "$${plugin}/etc" ] && cp -R $${plugin}/etc $(DIST)/$${plugin}/ ; \
+		echo "$${plugin} copied" ; \
 	fi \
 	done
 

+ 5 - 1
PLUGIN.md

@@ -1,4 +1,8 @@
 
-Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design).
+git submodule init 
+
+or
 
+git submodule update --remote
 
+Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design).

+ 1 - 1
plugins/emqttd_dashboard

@@ -1 +1 @@
-Subproject commit 2d3c9aeabeb5289b9ae27c503f017ad71bd81174
+Subproject commit dd202346fcfce6b0ae8da76bb7233e91db996bfa

+ 1 - 1
plugins/emysql

@@ -1 +1 @@
-Subproject commit 38927104b44b3f8d237bcf3a2b50f2e0608291b3
+Subproject commit 3305c1ad951e091c198ae10ca852752ca598e5b0

+ 11 - 8
rel/files/emqttd.config

@@ -87,9 +87,6 @@
         ]},
         %% Session
         {session, [
-            %% Expired after 2 days
-            {expired_after, 48},
-
             %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
             %% 0 means no limit
             {max_inflight, 100},
@@ -104,7 +101,13 @@
             {await_rel_timeout, 8},
 
             %% Max Packets that Awaiting PUBREL, 0 means no limit
-            {max_awaiting_rel, 0}
+            {max_awaiting_rel, 0},
+
+            %% Statistics Collection Interval(seconds)
+            {collect_interval, 10},
+
+            %% Expired after 2 days
+            {expired_after, 48}
 
         ]},
         %% Session
@@ -176,12 +179,12 @@
             %% Size of acceptor pool
             {acceptors, 16},
             %% Maximum number of concurrent clients
-            {max_clients, 1024},
+            {max_clients, 512},
             %% Socket Access Control
             {access, [{allow, all}]},
             %% Socket Options
             {sockopts, [
-                {backlog, 1024}
+                {backlog, 512}
                 %Set buffer if hight thoughtput
                 %{recbuf, 4096},
                 %{sndbuf, 4096}
@@ -192,7 +195,7 @@
             %% Size of acceptor pool
             {acceptors, 4},
             %% Maximum number of concurrent clients
-            {max_clients, 1024},
+            {max_clients, 512},
             %% Socket Access Control
             {access, [{allow, all}]},
             %% SSL certificate and key files
@@ -226,7 +229,7 @@
             %% Size of acceptor pool
             {acceptors, 4},
             %% Maximum number of concurrent clients
-            {max_clients, 512},
+            {max_clients, 64},
             %% Socket Access Control
             {access, [{allow, all}]},
             %% Socket Options

+ 45 - 0
rel/files/emqttd_ctl

@@ -242,6 +242,45 @@ case "$1" in
             exit 1
         fi
         ;;
+    clients)
+        # Make sure the local node IS running
+        RES=`$NODETOOL ping`
+        if [ "$RES" != "pong" ]; then
+            echo "emqttd is not running!"
+            exit 1
+        fi
+        if [ $# -eq 2 -a $2 = "list" ]; then
+            $NODETOOL rpc emqttd_ctl clients list
+        elif [ $# -eq 3 ]; then
+            shift
+            $NODETOOL rpc emqttd_ctl clients $@
+        else
+            echo "Usage: "
+            echo "$SCRIPT clients list"
+            echo "$SCRIPT clients show <ClientId>"
+            echo "$SCRIPT clients kick <ClientId>"
+            exit 1
+        fi
+        ;;
+    sessions)
+        # Make sure the local node IS running
+        RES=`$NODETOOL ping`
+        if [ "$RES" != "pong" ]; then
+            echo "emqttd is not running!"
+            exit 1
+        fi
+        if [ $# -eq 2 -a $2 = "list" ]; then
+            $NODETOOL rpc emqttd_ctl sessions list
+        elif [ $# -eq 3 ]; then
+            shift
+            $NODETOOL rpc emqttd_ctl sessions $@
+        else
+            echo "Usage: "
+            echo "$SCRIPT sessions list"
+            echo "$SCRIPT sessions show <ClientId>"
+            exit 1
+        fi
+        ;;
     plugins)
         # Make sure the local node IS running
         RES=`$NODETOOL ping`
@@ -309,6 +348,12 @@ case "$1" in
         echo "  metrics                       #query broker metrics"
 		echo "  cluster [<Node>]              #query or cluster nodes"
         echo "  ----------------------------------------------------------------"
+        echo "  clients list                  #list all clients"
+        echo "  clients show <ClientId>       #show a client"
+        echo "  clients kick <ClientId>       #kick a client"
+        echo "  session list                  #list all sessions"
+        echo "  session show <ClientId>       #show a sessions"
+        echo "  ----------------------------------------------------------------"
         echo "  plugins list                  #query loaded plugins"
         echo "  plugins load <Plugin>         #load plugin"
         echo "  plugins unload <Plugin>       #unload plugin"

+ 4 - 5
rel/files/vm.args

@@ -22,10 +22,10 @@
 
 ## Enable kernel poll and a few async threads
 +K true
-+A 32
++A 16
 
 ## max process numbers
-+P 1000000
++P 8192
 
 ##-------------------------------------------------------------------------
 ## Env
@@ -36,8 +36,7 @@
 
 -env ERTS_MAX_PORTS 4096
 
-#-env ERL_MAX_ETS_TABLES 1024
+-env ERL_MAX_ETS_TABLES 1024
 
 ## Tweak GC to run more often
-##-env ERL_FULLSWEEP_AFTER 1000
-#
+-env ERL_FULLSWEEP_AFTER 1000

+ 8 - 1
src/emqttd_client.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_client).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -33,7 +34,7 @@
 -include("emqttd_protocol.hrl").
 
 %% API Function Exports
--export([start_link/2, info/1]).
+-export([start_link/2, info/1, kick/1]).
 
 -behaviour(gen_server).
 
@@ -60,6 +61,9 @@ start_link(SockArgs, PktOpts) ->
 info(Pid) ->
     gen_server:call(Pid, info, infinity).
 
+kick(Pid) ->
+    gen_server:call(Pid, kick).
+
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
     % Transform if ssl.
     {ok, NewSock} = esockd_connection:accept(SockArgs),
@@ -84,6 +88,9 @@ handle_call(info, _From, State = #state{conn_name = ConnName,
                                         proto_state = ProtoState}) ->
     {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
 
+handle_call(kick, _From, State) ->
+    {stop, {shutdown, kick}, ok, State};
+
 handle_call(Req, _From, State = #state{peername = Peername}) ->
     lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     {reply, {error, unsupported_request}, State}.    

+ 87 - 3
src/emqttd_ctl.erl

@@ -42,6 +42,8 @@
          stats/1,
          metrics/1,
          cluster/1,
+         clients/1,
+         sessions/1,
          listeners/1,
          bridges/1,
          plugins/1,
@@ -135,6 +137,41 @@ stats([]) ->
     
 metrics([]) ->
     [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
+
+clients(["list"]) ->
+    dump(client, mqtt_client);
+
+clients(["show", ClientId]) ->
+    case emqttd_cm:lookup(list_to_binary(ClientId)) of
+        undefined ->
+            ?PRINT_MSG("Not Found.~n");
+        Client -> 
+            print(client, Client)
+    end;
+
+clients(["kick", ClientId]) ->
+    case emqttd_cm:lookup(list_to_binary(ClientId)) of
+        undefined ->
+            ?PRINT_MSG("Not Found.~n");
+        #mqtt_client{client_pid = Pid} -> 
+            emqttd_client:kick(Pid)
+    end.
+
+sessions(["list"]) ->
+    dump(session, mqtt_transient_session),
+    dump(session, mqtt_persistent_session);
+
+sessions(["show", ClientId0]) ->
+    ClientId = list_to_binary(ClientId0),
+    case {ets:lookup(mqtt_transient_session, ClientId),
+          ets:lookup(mqtt_persistent_session, ClientId)} of
+        {[], []} ->
+            ?PRINT_MSG("Not Found.~n");
+        {[SessInfo], _} -> 
+            print(session, SessInfo);
+        {_, [SessInfo]} -> 
+            print(session, SessInfo)
+    end.
     
 listeners([]) ->
     lists:foreach(fun({{Protocol, Port}, Pid}) ->
@@ -178,9 +215,7 @@ bridges(["stop", SNode, Topic]) ->
     end.
 
 plugins(["list"]) ->
-    lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
-            ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active])
-        end, emqttd_plugins:list());
+    lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list());
 
 plugins(["load", Name]) ->
     case emqttd_plugins:load(list_to_atom(Name)) of
@@ -223,6 +258,7 @@ stop_trace(Who, Name) ->
             ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
     end.
 
+
 node_name(SNode) ->
     SNode1 =
     case string:tokens(SNode, "@") of
@@ -260,3 +296,51 @@ parse_opt(bridge, queue, Len) ->
 parse_opt(_Cmd, Opt, _Val) ->
     ?PRINT("Bad Option: ~s~n", [Opt]).
 
+dump(Type, Table) ->
+    dump(Type, Table, ets:first(Table)).
+
+dump(_Type, _Table, '$end_of_table') ->
+    ok;
+dump(Type, Table, Key) ->
+    case ets:lookup(Table, Key) of
+        [Record] -> print(Type, Record);
+        [] -> ignore
+    end,
+    dump(Type, Table, ets:next(Table, Key)).
+
+print(client, #mqtt_client{client_id = ClientId, clean_sess = CleanSess,
+                           username = Username, peername = Peername,
+                           connected_at = ConnectedAt}) ->
+    ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
+            [ClientId, CleanSess, Username,
+             emqttd_net:format(Peername),
+             emqttd_util:now_to_secs(ConnectedAt)]);
+
+print(session, {ClientId, SessInfo}) ->
+    InfoKeys = [clean_sess, 
+                max_inflight,
+                inflight_queue,
+                message_queue,
+                awaiting_rel,
+                awaiting_ack,
+                awaiting_comp,
+                created_at,
+                subscriptions],
+    ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
+           "message_queue=~w, awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
+           "created_at=~w, subscriptions=~s)~n",
+            [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]);
+
+print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
+    ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
+               [Name, Ver, Descr, Active]).
+
+format(created_at, Val) ->
+    emqttd_util:now_to_secs(Val);
+
+format(subscriptions, List) ->
+    string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
+
+format(_, Val) ->
+    Val.
+

+ 4 - 1
src/emqttd_mqueue.erl

@@ -58,7 +58,8 @@
 
 -export([new/3, name/1,
          is_empty/1, is_full/1,
-         len/1, in/2, out/1]).
+         len/1, max_len/1,
+         in/2, out/1]).
 
 -define(LOW_WM, 0.2).
 
@@ -108,6 +109,8 @@ is_full(_MQ) -> false.
 
 len(#mqueue{len = Len}) -> Len.
 
+max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
+
 %%------------------------------------------------------------------------------
 %% @doc Queue one message.
 %% @end

+ 2 - 1
src/emqttd_plugins.erl

@@ -173,7 +173,8 @@ set_config([{AppName, Envs} | Config]) ->
 start_app(App, SuccFun) ->
     case application:ensure_all_started(App) of
         {ok, Started} ->
-            lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]),
+            lager:info("Started Apps: ~p", [Started]),
+            lager:info("Load plugin ~p successfully", [App]),
             SuccFun(App),
             {ok, Started};
         {error, {ErrApp, Reason}} ->

+ 0 - 47
src/emqttd_qos.erl

@@ -1,47 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd Qos Functions.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
--module(emqttd_qos).
-
--include("emqttd_protocol.hrl").
-
--export([a/1, i/1]).
-
-a(?QOS_0) -> qos0;
-a(?QOS_1) -> qos1;
-a(?QOS_2) -> qos2;
-a(qos0)   -> qos0;
-a(qos1)   -> qos1;
-a(qos2)   -> qos2.
-
-i(?QOS_0) -> ?QOS_0;
-i(?QOS_1) -> ?QOS_1;
-i(?QOS_2) -> ?QOS_2;
-i(qos0)   -> ?QOS_0;
-i(qos1)   -> ?QOS_1;
-i(qos2)   -> ?QOS_2.
-
-

+ 41 - 3
src/emqttd_session.erl

@@ -126,6 +126,10 @@
         expired_after = 172800,
 
         expired_timer,
+
+        collect_interval,
+
+        collect_timer,
         
         timestamp}).
 
@@ -231,8 +235,11 @@ init([CleanSess, ClientId, ClientPid]) ->
             await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
             max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
             expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
+            collect_interval  = emqttd_opts:g(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
-    {ok, Session, hibernate}.
+    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
+    %% start statistics
+    {ok, start_collector(Session), hibernate}.
 
 handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
                                                                subscriptions = Subscriptions}) ->
@@ -482,6 +489,10 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie
             noreply(Session)
     end;
 
+handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
+    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
+    {noreply, start_collector(Session), hibernate};
+
 handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
                                                              client_pid = ClientPid}) ->
     {stop, normal, Session};
@@ -510,8 +521,8 @@ handle_info(Info, Session = #session{client_id = ClientId}) ->
     lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
     {noreply, Session}.
 
-terminate(_Reason, _Session) ->
-    ok.
+terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
+    emqttd_sm:unregister_session(CleanSess, ClientId).
 
 code_change(_OldVsn, Session, _Extra) ->
     {ok, Session}.
@@ -629,3 +640,30 @@ cancel_timer(Ref) ->
 noreply(State) ->
     {noreply, State, hibernate}.
 
+start_collector(Session = #session{collect_interval = 0}) ->
+    Session;
+
+start_collector(Session = #session{collect_interval = Interval}) ->
+    TRef = erlang:send_after(Interval * 1000, self(), collect_info),
+    Session#session{collect_timer = TRef}.
+
+info(#session{clean_sess        = CleanSess,
+              subscriptions     = Subscriptions,
+              inflight_queue    = InflightQueue,
+              max_inflight      = MaxInflight,
+              message_queue     = MessageQueue,
+              awaiting_rel      = AwaitingRel,
+              awaiting_ack      = AwaitingAck,
+              awaiting_comp     = AwaitingComp,
+              timestamp         = CreatedAt}) ->
+    [{pid, self()}, 
+     {clean_sess, CleanSess},
+     {subscriptions, Subscriptions},
+     {max_inflight, MaxInflight},
+     {inflight_queue, length(InflightQueue)},
+     {message_queue, emqttd_mqueue:len(MessageQueue)},
+     {awaiting_rel, maps:size(AwaitingRel)},
+     {awaiting_ack, maps:size(AwaitingAck)},
+     {awaiting_comp, maps:size(AwaitingComp)},
+     {created_at, CreatedAt}].
+

+ 44 - 4
src/emqttd_sm.erl

@@ -42,6 +42,8 @@
 
 -export([start_session/2, lookup_session/1]).
 
+-export([register_session/3, unregister_session/2]).
+
 -behaviour(gen_server).
 
 %% gen_server Function Exports
@@ -57,6 +59,7 @@
 %%%=============================================================================
 
 mnesia(boot) ->
+    %% global session...
     ok = emqttd_mnesia:create_table(session, [
                 {type, ordered_set},
                 {ram_copies, [node()]},
@@ -107,6 +110,34 @@ lookup_session(ClientId) ->
         [] -> undefined
     end.
 
+%%------------------------------------------------------------------------------
+%% @doc Register a session with info.
+%% @end
+%%------------------------------------------------------------------------------
+-spec register_session(CleanSess, ClientId, Info) -> ok when
+    CleanSess :: boolean(),
+    ClientId :: binary(),
+    Info :: [tuple()].
+register_session(true, ClientId, Info) ->
+    ets:insert(mqtt_transient_session, {ClientId, Info});
+
+register_session(false, ClientId, Info) ->
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    gen_server:cast(SM, {register, ClientId, Info}).
+
+%%------------------------------------------------------------------------------
+%% @doc Unregister a session.
+%% @end
+%%------------------------------------------------------------------------------
+-spec unregister_session(CleanSess, ClientId) -> ok when
+    CleanSess :: boolean(),
+    ClientId  :: binary().
+unregister_session(true, ClientId) ->
+    ets:delete(mqtt_transient_session, ClientId);
+unregister_session(false, ClientId) ->
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    gen_server:cast(SM, {unregister, ClientId}).
+
 call(SM, Req) -> gen_server:call(SM, Req, infinity).
 
 %%%=============================================================================
@@ -143,7 +174,17 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
-handle_cast(_Msg, State) ->
+%% persistent session
+handle_cast({register, ClientId, Info}, State) ->
+    ets:insert(mqtt_persistent_session, {ClientId, Info}),
+    {noreply, setstats(State)};
+
+handle_cast({unregister, ClientId}, State) ->
+    ets:delete(mqtt_persistent_session, ClientId),
+    {noreply, setstats(State)};
+
+handle_cast(Msg, State) ->
+    lager:critical("Unexpected Msg: ~p", [Msg]),
     {noreply, State}.
 
 handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
@@ -275,7 +316,6 @@ remove_session(Session) ->
             mnesia:delete_object(session, Session, write)
         end).
 
-setstats(State = #state{statsfun = _StatsFun}) ->
-    State.
-
+setstats(State = #state{statsfun = StatsFun}) ->
+    StatsFun(ets:info(mqtt_persistent_session, size)), State.
 

+ 6 - 0
src/emqttd_sm_sup.erl

@@ -43,6 +43,7 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
+    init_session_ets(),
     Schedulers = erlang:system_info(schedulers),
     gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
     StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
@@ -55,3 +56,8 @@ init([]) ->
                  end, lists:seq(1, Schedulers)),
     {ok, {{one_for_all, 10, 100}, Children}}.
 
+init_session_ets() ->
+    Tables = [mqtt_transient_session, mqtt_persistent_session],
+    Attrs  = [ordered_set, named_table, public, {write_concurrency, true}],
+    lists:foreach(fun(Tab) -> ets:new(Tab, Attrs) end, Tables).
+