Feng 10 лет назад
Родитель
Сommit
c85617c080

+ 36 - 32
include/emqttd.hrl

@@ -1,24 +1,24 @@
-%%------------------------------------------------------------------------------
-%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
-%% 
-%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%% of this software and associated documentation files (the "Software"), to deal
-%% in the Software without restriction, including without limitation the rights
-%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the Software is
-%% furnished to do so, subject to the following conditions:
-%% 
-%% The above copyright notice and this permission notice shall be included in all
-%% copies or substantial portions of the Software.
-%% 
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%% SOFTWARE.
-%%------------------------------------------------------------------------------
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
 %%% @doc
 %%% MQTT Broker Header.
 %%%
@@ -73,7 +73,7 @@
 %%------------------------------------------------------------------------------
 -record(mqtt_queue, {
     name     :: binary(),
-    subpid   :: pid(),
+    qpid     :: pid(),
     qos = 0  :: 0 | 1 | 2
 }).
 
@@ -83,12 +83,15 @@
 %% MQTT Client
 %%------------------------------------------------------------------------------
 -record(mqtt_client, {
-    client_id   :: binary() | undefined,
-    username    :: binary() | undefined,
-    ipaddress   :: inet:ip_address(),
-    clean_sess  :: boolean(),
-    client_pid  :: pid(),
-    proto_ver   :: 3 | 4
+    client_id     :: binary() | undefined,
+    client_pid    :: pid(),
+    username      :: binary() | undefined,
+    peername      :: {inet:ip_address(), integer()},
+    clean_sess    :: boolean(),
+    proto_ver     :: 3 | 4,
+    keepalive = 0,
+    will_topic    :: undefined | binary(),
+    connected_at  :: erlang:timestamp()
 }).
 
 -type mqtt_client() :: #mqtt_client{}.
@@ -98,8 +101,9 @@
 %%------------------------------------------------------------------------------
 -record(mqtt_session, {
     client_id,
-    session_pid,
-    subscriptions = []
+    sess_pid,
+    persistent,
+    on_node
 }).
 
 -type mqtt_session() :: #mqtt_session{}.
@@ -111,8 +115,8 @@
 -type mqtt_pktid() :: 1..16#ffff | undefined.
 
 -record(mqtt_message, {
-    msgid           :: mqtt_msgid(),      %% Unique Message ID
-    pktid           :: 1..16#ffff,        %% PacketId
+    msgid           :: mqtt_msgid(),      %% Global unique message ID
+    pktid           :: mqtt_pktid(),      %% PacketId
     topic           :: binary(),          %% Topic that the message is published to
     from            :: binary() | atom(), %% ClientId of publisher
     qos    = 0      :: 0 | 1 | 2,         %% Message QoS

+ 0 - 1
src/emqttd.erl

@@ -230,4 +230,3 @@ is_running(Node) ->
         Pid when is_pid(Pid) -> true
     end.
 
-

+ 2 - 2
src/emqttd_access_control.erl

@@ -102,8 +102,8 @@ check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
     allow;
 check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
     case M:check_acl({Client, PubSub, Topic}, State) of
-        allow -> allow;
-        deny  -> deny;
+        allow  -> allow;
+        deny   -> deny;
         ignore -> check_acl(Client, PubSub, Topic, AclMods)
     end.
 

+ 2 - 2
src/emqttd_access_rule.erl

@@ -114,9 +114,9 @@ match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) ->
     true;
 match_who(#mqtt_client{username = Username}, {user, Username}) ->
     true;
-match_who(#mqtt_client{ipaddress = undefined}, {ipaddr, _Tup}) ->
+match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) ->
     false;
-match_who(#mqtt_client{ipaddress = IP}, {ipaddr, {_CDIR, Start, End}}) ->
+match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) ->
     I = esockd_access:atoi(IP),
     I >= Start andalso I =< End;
 match_who(_Client, _Who) ->

+ 1 - 0
src/emqttd_app.erl

@@ -70,6 +70,7 @@ print_vsn() ->
 start_servers(Sup) ->
     Servers = [{"emqttd trace", emqttd_trace},
                {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
+               {"emqttd client manager", {supervisor, emqttd_cm_sup}},
                {"emqttd session manager", {supervisor, emqttd_sm_sup}},
                {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
                {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},

+ 2 - 2
src/emqttd_auth_clientid.erl

@@ -101,9 +101,9 @@ init(Opts) ->
 
 check(#mqtt_client{client_id = undefined}, _Password, []) ->
     {error, "ClientId undefined"};
-check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) ->
+check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, []) ->
     check_clientid_only(ClientId, IpAddress);
-check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) ->
+check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, [{password, no}|_]) ->
     check_clientid_only(ClientId, IpAddress);
 check(_Client, undefined, [{password, yes}|_]) ->
     {error, "Password undefined"};

+ 23 - 6
src/emqttd_bridge.erl

@@ -46,7 +46,8 @@
 -record(state, {node, subtopic,
                 qos,
                 topic_suffix       = <<>>,
-                topic_prefix       = <<>>,  
+                topic_prefix       = <<>>,
+                mqueue             = emqttd_mqueue:mqueue(),
                 max_queue_len      = 0,
                 ping_down_interval = ?PING_DOWN_INTERVAL,
                 status             = up}).
@@ -81,8 +82,11 @@ init([Node, SubTopic, Options]) ->
         true -> 
             true = erlang:monitor_node(Node, true),
             State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
+            MQueue = emqttd_mqueue:new(qname(Node, SubTopic),
+                                       [{max_len, State#state.max_queue_len}],
+                                       emqttd_alarm:alarm_fun()),
             emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
-            {ok, State};
+            {ok, State#state{mqueue = MQueue}};
         false -> 
             {stop, {cannot_connect, Node}}
     end.
@@ -100,15 +104,19 @@ parse_opts([{max_queue_len, Len} | Opts], State) ->
 parse_opts([{ping_down_interval, Interval} | Opts], State) ->
     parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
 
+qname(Node, SubTopic) when is_atom(Node) ->
+    qname(atom_to_list(Node), SubTopic);
+qname(Node, SubTopic) ->
+    list_to_binary(["Bridge:", Node, ":", SubTopic]).
+
 handle_call(_Request, _From, State) ->
     {reply, error, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) ->
-    lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]),
-    {noreply, State};
+handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) ->
+    {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
 
 handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
     rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
@@ -124,7 +132,7 @@ handle_info({nodeup, Node}, State = #state{node = Node}) ->
     case emqttd:is_running(Node) of
         true -> 
             lager:warning("Bridge Node Up: ~p", [Node]),
-            {noreply, State#state{status = up}};
+            {noreply, dequeue(State#state{status = up})};
         false ->
             self() ! {nodedown, Node},
             {noreply, State#state{status = down}}
@@ -159,6 +167,15 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%=============================================================================
 
+dequeue(State = #state{mqueue = MQ}) ->
+    case emqttd_mqueue:out(MQ) of
+        {empty, MQ1} ->
+            State#state{mqueue = MQ1};
+        {{value, Msg}, MQ1} ->
+            handle_info({dispatch, Msg}, State),
+            dequeue(State#state{mqueue = MQ1})
+    end.
+
 transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix,
                                                      topic_suffix = Suffix}) ->
     Msg#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

+ 0 - 1
src/emqttd_bridge_sup.erl

@@ -48,7 +48,6 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-%%TODO: bridges...
 -spec bridges() -> [{tuple(), pid()}].
 bridges() ->
     [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} 

+ 12 - 6
src/emqttd_broker.erl

@@ -58,7 +58,7 @@
 
 -define(BROKER_TAB, mqtt_broker).
 
--record(state, {started_at, sys_interval, tick_tref}).
+-record(state, {started_at, sys_interval, heartbeat, tick_tref}).
 
 %% $SYS Topics of Broker
 -define(SYSTOP_BROKERS, [
@@ -224,7 +224,9 @@ init([]) ->
     emqttd_pubsub:create(<<"$SYS/brokers">>),
     [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
     % Tick
-    {ok, #state{started_at = os:timestamp(), tick_tref = start_tick(tick)}, hibernate}.
+    {ok, #state{started_at = os:timestamp(),
+                heartbeat  = start_tick(1000, heartbeat),
+                tick_tref  = start_tick(tick)}, hibernate}.
 
 handle_call(uptime, _From, State) ->
     {reply, uptime(State), State};
@@ -260,18 +262,22 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info(heartbeat, State) ->
+    publish(uptime, list_to_binary(uptime(State))),
+    publish(datetime, list_to_binary(datetime())),
+    {noreply, State, hibernate};
+
 handle_info(tick, State) ->
     retain(brokers),
-    retain(version, list_to_binary(version())),
+    retain(version,  list_to_binary(version())),
     retain(sysdescr, list_to_binary(sysdescr())),
-    publish(uptime, list_to_binary(uptime(State))),
-    publish(datetime, list_to_binary(datetime())),
     {noreply, State, hibernate};
 
 handle_info(_Info, State) ->
     {noreply, State}.
 
-terminate(_Reason, #state{tick_tref = TRef}) ->
+terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) ->
+    stop_tick(Hb),
     stop_tick(TRef).
 
 code_change(_OldVsn, State, _Extra) ->

+ 19 - 28
src/emqttd_client.erl

@@ -41,7 +41,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          code_change/3, terminate/2]).
 
-%%Client State...
+%% Client State...
 -record(state, {transport,
                 socket,
                 peername,
@@ -49,7 +49,7 @@
                 await_recv,
                 conn_state,
                 conserve,
-                parse_state,
+                parser,
                 proto_state,
                 packet_opts,
                 keepalive}).
@@ -57,18 +57,16 @@
 start_link(SockArgs, PktOpts) ->
     {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}.
 
-%%TODO: rename?
 info(Pid) ->
-    gen_server:call(Pid, info).
+    gen_server:call(Pid, info, infinity).
 
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
-    %transform if ssl.
+    % Transform if ssl.
     {ok, NewSock} = esockd_connection:accept(SockArgs),
     {ok, Peername} = emqttd_net:peername(Sock),
     {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
     lager:info("Connect from ~s", [ConnStr]),
     SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
-    ParserState = emqttd_parser:init(PacketOpts),
     ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
     State = control_throttle(#state{transport    = Transport,
                                     socket       = NewSock,
@@ -78,17 +76,16 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
                                     conn_state   = running,
                                     conserve     = false,
                                     packet_opts  = PacketOpts,
-                                    parse_state  = ParserState,
+                                    parser       = emqttd_parser:new(PacketOpts),
                                     proto_state  = ProtoState}),
     gen_server:enter_loop(?MODULE, [], State, 10000).
 
-%%TODO: Not enough...
-handle_call(info, _From, State = #state{conn_name=ConnName,
+handle_call(info, _From, State = #state{conn_name = ConnName,
                                         proto_state = ProtoState}) ->
     {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
 
 handle_call(Req, _From, State = #state{peername = Peername}) ->
-    lager:critical("Client ~s: unexpected request - ~p",[emqttd_net:format(Peername), Req]),
+    lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     {reply, {error, unsupported_request}, State}.    
 
 handle_cast(Msg, State = #state{peername = Peername}) ->
@@ -100,8 +97,6 @@ handle_info(timeout, State) ->
     
 handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
                                                         conn_name=ConnName}) ->
-    %% need transfer data???
-    %% emqttd_client:transfer(NewPid, Data),
     lager:error("Shutdown for duplicate clientid: ~s, conn:~s", 
                 [emqttd_protocol:clientid(ProtoState), ConnName]), 
     stop({shutdown, duplicate_id}, State);
@@ -124,8 +119,7 @@ handle_info({inet_reply, _Ref, ok}, State) ->
 handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
     lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
     emqttd_metrics:inc('bytes/received', size(Data)),
-    process_received_bytes(Data,
-                           control_throttle(State #state{await_recv = false}));
+    received(Data, control_throttle(State #state{await_recv = false}));
 
 handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
     network_error(Reason, State);
@@ -170,24 +164,22 @@ code_change(_OldVsn, State, _Extra) ->
 %-------------------------------------------------------
 % receive and parse tcp data
 %-------------------------------------------------------
-process_received_bytes(<<>>, State) ->
+received(<<>>, State) ->
     {noreply, State, hibernate};
 
-process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
-                                             parse_state = ParseState,
-                                             proto_state = ProtoState,
-                                             conn_name   = ConnStr}) ->
-    case emqttd_parser:parse(Bytes, ParseState) of
-    {more, ParseState1} ->
-        {noreply,
-         control_throttle(State #state{parse_state = ParseState1}),
-         hibernate};
+received(Bytes, State = #state{packet_opts = PacketOpts,
+                               parser = Parser,
+                               proto_state = ProtoState,
+                               conn_name   = ConnStr}) ->
+    case Parser(Bytes) of
+    {more, NewParser} ->
+        {noreply, control_throttle(State #state{parser = NewParser}), hibernate};
     {ok, Packet, Rest} ->
         received_stats(Packet),
         case emqttd_protocol:received(Packet, ProtoState) of
         {ok, ProtoState1} ->
-            process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts),
-                                                     proto_state = ProtoState1});
+            received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
+                                       proto_state = ProtoState1});
         {error, Error} ->
             lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
             stop({shutdown, Error}, State);
@@ -201,7 +193,6 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
         stop({shutdown, Error}, State)
     end.
 
-%%----------------------------------------------------------------------------
 network_error(Reason, State = #state{peername = Peername}) ->
     lager:warning("Client ~s: MQTT detected network error '~p'",
                     [emqttd_net:format(Peername), Reason]),
@@ -244,4 +235,4 @@ inc(?DISCONNECT) ->
     emqttd_metrics:inc('packets/disconnect');
 inc(_) ->
     ignore.
-    
+

+ 151 - 0
src/emqttd_cm.erl

@@ -0,0 +1,151 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% MQTT Client Manager
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_cm).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-include("emqttd.hrl").
+
+-behaviour(gen_server).
+
+-define(SERVER, ?MODULE).
+
+%% API Exports 
+-export([start_link/2, pool/0]).
+
+-export([lookup/1, register/1, unregister/1]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {id, statsfun}).
+
+-define(CM_POOL, cm_pool).
+
+%%%=============================================================================
+%%% API
+%%%=============================================================================
+
+%%------------------------------------------------------------------------------
+%% @doc Start client manager
+%% @end
+%%------------------------------------------------------------------------------
+-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
+        Id :: pos_integer(),
+        StatsFun :: fun().
+start_link(Id, StatsFun) ->
+    gen_server:start_link(?MODULE, [Id, StatsFun], []).
+
+pool() -> ?CM_POOL.
+
+%%------------------------------------------------------------------------------
+%% @doc Lookup client pid with clientId
+%% @end
+%%------------------------------------------------------------------------------
+-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined.
+lookup(ClientId) when is_binary(ClientId) ->
+    case ets:lookup(mqtt_client, ClientId) of
+	[Client] -> Client;
+	[] -> undefined
+	end.
+
+%%------------------------------------------------------------------------------
+%% @doc Register clientId with pid.
+%% @end
+%%------------------------------------------------------------------------------
+-spec register(Client :: mqtt_client()) -> ok.
+register(Client = #mqtt_client{client_id = ClientId}) ->
+    CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
+    gen_server:cast(CmPid, {register, Client}).
+
+%%------------------------------------------------------------------------------
+%% @doc Unregister clientId with pid.
+%% @end
+%%------------------------------------------------------------------------------
+-spec unregister(ClientId :: binary()) -> ok.
+unregister(ClientId) when is_binary(ClientId) ->
+    CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
+    gen_server:cast(CmPid, {unregister, ClientId, self()}).
+
+%%%=============================================================================
+%%% gen_server callbacks
+%%%=============================================================================
+
+init([Id, StatsFun]) ->
+    gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
+    {ok, #state{id = Id, statsfun = StatsFun}}.
+
+handle_call(Req, _From, State) ->
+    lager:error("unexpected request: ~p", [Req]),
+    {reply, {error, badreq}, State}.
+
+handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) ->
+    lager:info("CM register ~s with ~p", [ClientId, Pid]),
+	case ets:lookup(mqtt_client, ClientId) of
+        [#mqtt_client{client_pid = Pid}] ->
+			lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]),
+            ignore;
+		[#mqtt_client{client_pid = OldPid}] ->
+			lager:error("ClientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]);
+		[] -> 
+            ok
+	end,
+    ets:insert(mqtt_client, Client),
+    {noreply, setstats(State)};
+
+handle_cast({unregister, ClientId, Pid}, State) ->
+    lager:info("CM unregister ~s with ~p", [ClientId, Pid]),
+	case ets:lookup(mqtt_client, ClientId) of
+	[#mqtt_client{client_pid = Pid}] ->
+		ets:delete(mqtt_client, ClientId);
+	[_] -> 
+		ignore;
+	[] ->
+		lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
+	end,
+	{noreply, setstats(State)};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{id = Id}) ->
+    gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+setstats(State = #state{statsfun = StatsFun}) ->
+    StatsFun(ets:info(mqtt_client, size)), State.
+

+ 59 - 0
src/emqttd_cm_sup.erl

@@ -0,0 +1,59 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd client manager supervisor.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_cm_sup).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-include("emqttd.hrl").
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    ets:new(mqtt_client, [ordered_set, named_table, public,
+                          {keypos, 2}, {write_concurrency, true}]),
+    Schedulers = erlang:system_info(schedulers),
+    gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),
+    StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
+    Children = lists:map(
+                 fun(I) ->
+                    Name = {emqttd_cm, I},
+                    gproc_pool:add_worker(emqttd_cm:pool(), Name, I),
+                    {Name, {emqttd_cm, start_link, [I, StatsFun]},
+                                permanent, 10000, worker, [emqttd_cm]}
+                 end, lists:seq(1, Schedulers)),
+    {ok, {{one_for_all, 10, 100}, Children}}.
+
+

+ 2 - 2
src/emqttd_guid.erl

@@ -31,14 +31,14 @@
 %%% 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
 %%% 2. NodeId:    encode node() to 2 bytes integer
 %%% 3. Pid:       encode pid to 4 bytes integer
-%%% 4. Sequence:  2 bytes sequence per pid
+%%% 4. Sequence:  2 bytes sequence in one process
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
 
 -module(emqttd_guid).
 
--export([gen/0]).
+-export([gen/0, new/0]).
 
 -define(MAX_SEQ, 16#FFFF).
 

+ 3 - 1
src/emqttd_http.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_http).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -46,13 +47,14 @@ handle_request('POST', "/mqtt/publish", Req) ->
     lager:info("HTTP Publish: ~p", [Params]),
 	case authorized(Req) of
 	true ->
+        ClientId = get_value("client", Params, http),
         Qos = int(get_value("qos", Params, "0")),
         Retain = bool(get_value("retain", Params,  "0")),
         Topic = list_to_binary(get_value("topic", Params)),
         Payload = list_to_binary(get_value("message", Params)),
         case {validate(qos, Qos), validate(topic, Topic)} of
             {true, true} ->
-                Msg = emqttd_message:make(http, Qos, Topic, Payload),
+                Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
                 emqttd_pubsub:publish(Msg#mqtt_message{retain  = Retain}),
                 Req:ok({"text/plan", <<"ok\n">>});
            {false, _} ->

+ 11 - 4
src/emqttd_message.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_message).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -150,7 +151,6 @@ set_flag(retain, Msg = #mqtt_message{retain = false}) ->
     Msg#mqtt_message{retain = true};
 set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
 
-
 %%------------------------------------------------------------------------------
 %% @doc Unset dup, retain flag
 %% @end
@@ -170,7 +170,14 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
 %% @doc Format MQTT Message
 %% @end
 %%------------------------------------------------------------------------------
-format(#mqtt_message{msgid=MsgId, pktid = PktId, from = From, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) ->
-    io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
-              [MsgId, PktId, From, Qos, Retain, Dup, Topic]).
+format(#mqtt_message{msgid=MsgId,
+                     pktid = PktId,
+                     from = From,
+                     qos=Qos,
+                     retain=Retain,
+                     dup=Dup,
+                     topic=Topic}) ->
+    io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, "
+                    "Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
+                      [MsgId, PktId, From, Qos, Retain, Dup, Topic]).
 

+ 0 - 2
src/emqttd_metrics.erl

@@ -78,7 +78,6 @@
     {counter, 'messages/received'},      % Messages received
     {counter, 'messages/sent'},          % Messages sent
     {gauge,   'messages/retained'},      % Messagea retained
-    {gauge,   'messages/stored/count'},  % Messages stored
     {counter, 'messages/dropped'}        % Messages dropped
 ]).
 
@@ -236,4 +235,3 @@ create_metric({counter, Name}) ->
 metric_topic(Metric) ->
     emqttd_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).
 
-

+ 1 - 0
src/emqttd_mnesia.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_mnesia).
 
 -author('feng@emqtt.io').

+ 6 - 2
src/emqttd_mod_presence.erl

@@ -24,12 +24,15 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_mod_presence).
 
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
+-behaviour(emqttd_gen_mod).
+
 -export([load/1, unload/1]).
 
 -export([client_connected/3, client_disconnected/3]).
@@ -41,7 +44,7 @@ load(Opts) ->
 
 client_connected(ConnAck, #mqtt_client{client_id  = ClientId,
                                        username   = Username,
-                                       ipaddress  = IpAddress,
+                                       peername   = {IpAddress, _},
                                        clean_sess = CleanSess,
                                        proto_ver  = ProtoVer}, Opts) ->
     Sess = case CleanSess of
@@ -80,7 +83,8 @@ topic(connected, ClientId) ->
 topic(disconnected, ClientId) ->
     emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
 
-reason(Reason) when is_atom(Reason) -> Reason;
+reason(Reason) when is_atom(Reason)    -> Reason;
 reason({Error, _}) when is_atom(Error) -> Error;
 reason(_) -> internal_error.
 
+

+ 2 - 2
src/emqttd_mod_rewrite.erl

@@ -49,7 +49,7 @@ load(Opts) ->
                        {?MODULE, rewrite, [subscribe, Sections]}),
     emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
                        {?MODULE, rewrite, [unsubscribe, Sections]}),
-    emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish},
+    emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish},
                        {?MODULE, rewrite, [publish, Sections]}).
 
 rewrite(_ClientId, TopicTable, subscribe, Sections) ->
@@ -85,7 +85,7 @@ reload(File) ->
 unload(_) ->
     emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}),
     emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
-    emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}).
+    emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}).
 
 %%%=============================================================================
 %%% Internal functions

+ 1 - 0
src/emqttd_mod_sup.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_mod_sup).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 1 - 0
src/emqttd_packet.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_packet).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 8 - 4
src/emqttd_parser.erl

@@ -33,21 +33,25 @@
 -include("emqttd_protocol.hrl").
 
 %% API
--export([init/1, parse/2]).
+-export([new/1, parse/2]).
 
 -record(mqtt_packet_limit, {max_packet_size}).
 
 -type option() :: {atom(),  any()}.
 
+-type parser() :: fun( (binary()) -> any() ).
+
 %%------------------------------------------------------------------------------
 %% @doc Initialize a parser
 %% @end
 %%------------------------------------------------------------------------------
--spec init(Opts :: [option()]) -> {none, #mqtt_packet_limit{}}.
-init(Opts) -> {none, limit(Opts)}.
+-spec new(Opts :: [option()]) -> parser().
+new(Opts) ->
+    fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
 
 limit(Opts) ->
-    #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
+    #mqtt_packet_limit{max_packet_size = 
+                        proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
 
 %%------------------------------------------------------------------------------
 %% @doc Parse MQTT Packet

+ 1 - 0
src/emqttd_pooler_sup.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_pooler_sup).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 87 - 60
src/emqttd_protocol.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_protocol).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -40,20 +41,20 @@
 -export([handle/2]).
 
 %% Protocol State
--record(proto_state, {
-        peername,
-        sendfun,
-        connected = false, %received CONNECT action?
-        proto_ver,
-        proto_name,
-        username,
-		client_id,
-		clean_sess,
-        session,    
-		will_msg,
-        max_clientid_len = ?MAX_CLIENTID_LEN,
-        client_pid
-}).
+-record(proto_state, {peername,
+                      sendfun,
+                      connected = false, %received CONNECT action?
+                      proto_ver,
+                      proto_name,
+                      username,
+                      client_id,
+                      clean_sess,
+                      session,
+                      will_msg,
+                      keepalive,
+                      max_clientid_len = ?MAX_CLIENTID_LEN,
+                      client_pid,
+                      connected_at}).
 
 -type proto_state() :: #proto_state{}.
 
@@ -61,6 +62,7 @@
 %% @doc Init protocol
 %% @end
 %%------------------------------------------------------------------------------
+
 init(Peername, SendFun, Opts) ->
     MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
 	#proto_state{peername         = Peername,
@@ -68,32 +70,50 @@ init(Peername, SendFun, Opts) ->
                  max_clientid_len = MaxLen,
                  client_pid       = self()}.
 
-info(#proto_state{proto_ver    = ProtoVer,
+info(#proto_state{client_id	   = ClientId,
+                  username     = Username,
+                  peername     = Peername,
+                  proto_ver    = ProtoVer,
                   proto_name   = ProtoName,
-				  client_id	   = ClientId,
+                  keepalive    = KeepAlive,
 				  clean_sess   = CleanSess,
-				  will_msg	   = WillMsg}) ->
-	[{proto_ver,  ProtoVer},
+				  will_msg	   = WillMsg,
+                  connected_at = ConnectedAt}) ->
+    [{client_id, ClientId},
+     {username, Username},
+     {peername, Peername},
+     {proto_ver, ProtoVer},
      {proto_name, ProtoName},
-	 {client_id,   ClientId},
-	 {clean_sess, CleanSess},
-	 {will_msg,   WillMsg}].
+     {keepalive, KeepAlive},
+     {clean_sess, CleanSess},
+     {will_msg,	WillMsg},
+     {connected_at, ConnectedAt}].
 
 clientid(#proto_state{client_id = ClientId}) ->
     ClientId.
 
-client(#proto_state{peername = {Addr, _Port},
-                    client_id = ClientId,
-                    username = Username,
+client(#proto_state{client_id  = ClientId,
+                    peername   = Peername,
+                    username   = Username,
                     clean_sess = CleanSess,
                     proto_ver  = ProtoVer,
-                    client_pid = Pid}) ->
-    #mqtt_client{client_id   = ClientId,
+                    keepalive  = Keepalive,
+                    will_msg   = WillMsg,
+                    client_pid = Pid,
+                    connected_at = Time}) ->
+    WillTopic = if
+                    WillMsg =:= undefined -> undefined;
+                    true -> WillMsg#mqtt_message.topic
+                end,
+    #mqtt_client{client_id  = ClientId,
+                 client_pid = Pid,
                  username   = Username,
-                 ipaddress  = Addr,
+                 peername   = Peername,
                  clean_sess = CleanSess,
                  proto_ver  = ProtoVer,
-                 client_pid = Pid}.
+                 keepalive  = Keepalive,
+                 will_topic = WillTopic,
+                 connected_at = Time}.
 
 %% CONNECT – Client requests a connection to a Server
 
@@ -111,7 +131,7 @@ received(_Packet, State = #proto_state{connected = false}) ->
 
 received(Packet = ?PACKET(_Type), State) ->
     trace(recv, Packet, State),
-	case validate_packet(Packet) of	
+	case validate_packet(Packet) of
 	ok ->
         handle(Packet, State);
 	{error, Reason} ->
@@ -121,18 +141,21 @@ received(Packet = ?PACKET(_Type), State) ->
 handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
 
     #mqtt_packet_connect{proto_ver  = ProtoVer,
-                         proto_name = ProtoName, 
+                         proto_name = ProtoName,
                          username   = Username,
                          password   = Password,
                          clean_sess = CleanSess,
-                         keep_alive = KeepAlive,
-                         client_id   = ClientId} = Var,
+                         keep_alive  = KeepAlive,
+                         client_id  = ClientId} = Var,
 
     State1 = State0#proto_state{proto_ver  = ProtoVer,
-                                proto_name = ProtoName, 
+                                proto_name = ProtoName,
                                 username   = Username,
                                 client_id  = ClientId,
-                                clean_sess = CleanSess},
+                                clean_sess = CleanSess,
+                                keepalive  = KeepAlive,
+                                will_msg   = willmsg(Var),
+                                connected_at = os:timestamp()},
 
     trace(recv, Packet, State1),
 
@@ -142,16 +165,20 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
             case emqttd_access_control:auth(client(State1), Password) of
                 ok ->
                     %% Generate clientId if null
-                    State2 = State1#proto_state{client_id = clientid(ClientId, State1)},
-
-                    %%Starting session
-                    {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
-
-                    %% Start keepalive
-                    start_keepalive(KeepAlive),
-
-                    %% ACCEPT
-                    {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}};
+                    State2 = maybe_set_clientid(State1),
+
+                    %% Start session
+                    case emqttd_sm:start_session(CleanSess, clientid(State2)) of
+                        {ok, Session} ->
+                            %% Register the client
+                            emqttd_cm:register(client(State2)),
+                            %% Start keepalive
+                            start_keepalive(KeepAlive),
+                            %% ACCEPT
+                            {?CONNACK_ACCEPT, State2#proto_state{session = Session}};
+                        {error, Error} ->
+                            exit({shutdown, Error})
+                    end;
                 {error, Reason}->
                     lager:error("~s@~s: username '~s', login failed - ~s",
                                     [ClientId, emqttd_net:format(Peername), Username, Reason]),
@@ -177,8 +204,6 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
     end,
 	{ok, State};
 
-
-
 handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
     emqttd_session:puback(Session, PacketId),
     {ok, State};
@@ -239,8 +264,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = Cl
         ok ->
             send(?PUBACK_PACKET(?PUBACK, PacketId), State);
         {error, Error} ->
-            %%TODO: log format...
-            lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error])
+            lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error])
     end;
 
 publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
@@ -248,15 +272,15 @@ publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = Cl
         ok ->
             send(?PUBACK_PACKET(?PUBREC, PacketId), State);
         {error, Error} ->
-            %%TODO: log format...
-            lager:error("Client ~s: publish qos2 error ~p", [ClientId, Error])
+            lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error])
     end.
 
 -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
 send(Msg, State) when is_record(Msg, mqtt_message) ->
 	send(emqttd_message:to_packet(Msg), State);
 
-send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
+send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername})
+        when is_record(Packet, mqtt_packet) ->
     trace(send, Packet, State),
     sent_stats(Packet),
     Data = emqttd_serialiser:serialise(Packet),
@@ -280,26 +304,29 @@ redeliver({?PUBREL, PacketId}, State) ->
 shutdown(duplicate_id, _State) ->
     quiet; %%
 
-shutdown(_, #proto_state{client_id = undefined}) ->
+shutdown(Error, #proto_state{client_id = undefined}) ->
+    lager:info("Protocol shutdown ~p", [Error]),
     ignore;
 
 shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
 	lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
                    [ClientId, emqttd_net:format(Peername), Error]),
     send_willmsg(ClientId, WillMsg),
+    emqttd_cm:unregister(ClientId),
     emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]).
 
 willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
     emqttd_message:from_packet(Packet).
 
-%% generate a clientId
-clientid(undefined, State) ->
-    clientid(<<>>, State);
-%%TODO: <<>> is not right.
-clientid(<<>>, #proto_state{peername = Peername}) ->
-    {_, _, MicroSecs} = os:timestamp(),
-    iolist_to_binary(["emqttd_", base64:encode(emqttd_net:format(Peername)), integer_to_list(MicroSecs)]);
-clientid(ClientId, _State) -> ClientId.
+%% Generate a client if if nulll
+maybe_set_clientid(State = #proto_state{client_id = NullId})
+        when NullId =:= undefined orelse NullId =:= <<>> ->
+    {_, NPid, _} = emqttd_guid:new(),
+    ClientId = iolist_to_binary(["emqttd_", integer_to_list(NPid)]),
+    State#proto_state{client_id = ClientId};
+
+maybe_set_clientid(State) ->
+    State.
 
 send_willmsg(_ClientId, undefined) ->
     ignore;

+ 24 - 23
src/emqttd_pubsub.erl

@@ -39,17 +39,19 @@
 -boot_mnesia({mnesia, [boot]}).
 -copy_mnesia({mnesia, [copy]}).
 
--behaviour(gen_server).
-
 %% API Exports 
 -export([start_link/2]).
 
 -export([create/1,
          subscribe/1,
          unsubscribe/1,
-         publish/1,
-         %local node
-         dispatch/2, match/1]).
+         publish/1]).
+
+%% Local node
+-export([dispatch/2,
+         match/1]).
+
+-behaviour(gen_server).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -62,6 +64,7 @@
 %%%=============================================================================
 %%% Mnesia callbacks
 %%%=============================================================================
+
 mnesia(boot) ->
     %% p2p queue table
     ok = emqttd_mnesia:create_table(queue, [
@@ -111,6 +114,7 @@ start_link(Id, Opts) ->
 create(<<"$Q/", _Queue/binary>>) ->
     %% protecte from queue
     {error, cannot_create_queue};
+
 create(Topic) when is_binary(Topic) ->
     TopicR = #mqtt_topic{topic = Topic, node = node()},
     case mnesia:transaction(fun add_topic/1, [TopicR]) of
@@ -124,7 +128,8 @@ create(Topic) when is_binary(Topic) ->
 %% @doc Subscribe topic
 %% @end
 %%------------------------------------------------------------------------------
--spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when
+-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> 
+    {ok, Qos | list(Qos)} | {error, any()} when
     Topic   :: binary(),
     Qos     :: mqtt_qos().
 subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
@@ -158,15 +163,14 @@ cast(Msg) ->
 %%------------------------------------------------------------------------------
 -spec publish(Msg :: mqtt_message()) -> ok.
 publish(#mqtt_message{from = From} = Msg) ->
-
     trace(publish, From, Msg),
-
-    Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg),
+    Msg1 = #mqtt_message{topic = Topic}
+               = emqttd_broker:foldl_hooks('message.publish', [], Msg),
 
     %% Retain message first. Don't create retained topic.
     case emqttd_retained:retain(Msg1) of
         ok ->
-            %TODO: why unset 'retain' flag?
+            %% TODO: why unset 'retain' flag?
             publish(Topic, emqttd_message:unset_flag(Msg1));
         ignore ->
             publish(Topic, Msg1)
@@ -174,12 +178,12 @@ publish(#mqtt_message{from = From} = Msg) ->
 
 publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
     lists:foreach(
-        fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) -> 
+        fun(#mqtt_queue{qpid = QPid, qos = SubQos}) -> 
             Msg1 = if
                 Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
                 true -> Msg
             end,
-            SubPid ! {dispatch, Msg1}
+            QPid ! {dispatch, Msg1}
         end, mnesia:dirty_read(queue, Queue));
     
 publish(Topic, Msg) when is_binary(Topic) ->
@@ -197,7 +201,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
 -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
 dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) ->
     Subscribers = mnesia:dirty_read(subscriber, Topic),
-    setstats(dropped, Subscribers =:= []), %%TODO:...
+    setstats(dropped, Subscribers =:= []),
     lists:foreach(
         fun(#mqtt_subscriber{subpid=SubPid, qos = SubQos}) ->
                 Msg1 = if
@@ -220,12 +224,11 @@ match(Topic) when is_binary(Topic) ->
 init([Id, _Opts]) ->
     process_flag(min_heap_size, 1024*1024),
     gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
-    %%TODO: gb_trees to replace maps?
     {ok, #state{id = Id, submap = maps:new()}}.
 
 handle_call({subscribe, SubPid, Topics}, _From, State) ->
     TopicSubs = lists:map(fun({<<"$Q/", _/binary>> = Queue, Qos}) ->
-                            #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos};
+                            #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos};
                              ({Topic, Qos}) ->
                             {#mqtt_topic{topic = Topic, node = node()},
                              #mqtt_subscriber{topic = Topic, subpid = SubPid, qos = Qos}}
@@ -252,7 +255,7 @@ handle_call({subscribe, SubPid, <<"$Q/", _/binary>> = Queue, Qos}, _From, State)
         [OldQueueR] -> lager:error("Queue is overwrited by ~p: ~p", [SubPid, OldQueueR]);
         [] -> ok
     end,
-    QueueR = #mqtt_queue{name = Queue, subpid = SubPid, qos = Qos},
+    QueueR = #mqtt_queue{name = Queue, qpid = SubPid, qos = Qos},
     case mnesia:transaction(fun add_queue/1, [QueueR]) of
         {atomic, ok} ->
             setstats(queues),
@@ -279,7 +282,7 @@ handle_call(Req, _From, State) ->
 handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
 
     TopicSubs = lists:map(fun(<<"$Q/", _/binary>> = Queue) ->
-                                #mqtt_queue{name = Queue, subpid = SubPid};
+                                #mqtt_queue{name = Queue, qpid = SubPid};
                              (Topic) ->
                                 {#mqtt_topic{topic = Topic, node = node()},
                                  #mqtt_subscriber{topic = Topic, subpid = SubPid, _ = '_'}}
@@ -300,7 +303,7 @@ handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
     {noreply, State};
 
 handle_cast({unsubscribe, SubPid, <<"$Q/", _/binary>> = Queue}, State) ->
-    QueueR = #mqtt_queue{name = Queue, subpid = SubPid},
+    QueueR = #mqtt_queue{name = Queue, qpid = SubPid},
     case mnesia:transaction(fun remove_queue/1, [QueueR]) of
         {atomic, _} -> 
             setstats(queues);
@@ -329,7 +332,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
             Node = node(),
             F = fun() -> 
                     %% remove queue...
-                    Queues = mnesia:match_object(queue, #mqtt_queue{subpid = DownPid, _ = '_'}, write),
+                    Queues = mnesia:match_object(queue, #mqtt_queue{qpid = DownPid, _ = '_'}, write),
                     lists:foreach(fun(QueueR) ->
                                 mnesia:delete_object(queue, QueueR, write) 
                         end, Queues),
@@ -420,9 +423,9 @@ monitor_subscriber(SubPid, State = #state{submap = SubMap}) ->
     end,
     State#state{submap = NewSubMap}.
 
-remove_queue(#mqtt_queue{name = Name, subpid = Pid}) ->
+remove_queue(#mqtt_queue{name = Name, qpid = Pid}) ->
     case mnesia:wread({queue, Name}) of
-        [R = #mqtt_queue{subpid = Pid}] -> 
+        [R = #mqtt_queue{qpid = Pid}] -> 
             mnesia:delete(queue, R, write);
         _ ->
             ok
@@ -463,13 +466,11 @@ setstats(subscribers) ->
     emqttd_stats:setstats('subscribers/count', 'subscribers/max',
                            mnesia:table_info(subscriber, size)).
 
-%%TODO: queue dropped?
 setstats(dropped, false) ->
     ignore;
 setstats(dropped, true) ->
     emqttd_metrics:inc('messages/dropped').
 
-
 %%%=============================================================================
 %%% Trace functions
 %%%=============================================================================

+ 1 - 0
src/emqttd_pubsub_sup.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_pubsub_sup).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 4 - 3
src/emqttd_retained.erl

@@ -24,6 +24,7 @@
 %%% 
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_retained).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -73,7 +74,7 @@ retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
 retain(Msg = #mqtt_message{topic = Topic,
                            retain = true,
                            payload = Payload}) ->
-    TabSize = mnesia:table_info(message, size),
+    TabSize = mnesia:table_info(retained, size),
     case {TabSize < limit(table), size(Payload) < limit(payload)} of
         {true, true} ->
             Retained = #mqtt_retained{topic = Topic, message = Msg},
@@ -83,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic,
        {false, _}->
             lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
        {_, false}->
-            lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
+            lager:error("Dropped retained message(topic=~s, payload_size=~p) for payload is too big!", [Topic, size(Payload)])
     end, ok.
 
 limit(table) ->
@@ -107,7 +108,7 @@ env() ->
 -spec dispatch(Topic, CPid) -> any() when
         Topic  :: binary(),
         CPid   :: pid().
-dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
+dispatch(Topic, CPid) when is_binary(Topic) ->
     Msgs =
     case emqttd_topic:wildcard(Topic) of
         false ->

+ 1 - 0
src/emqttd_serialiser.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_serialiser).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 159 - 75
src/emqttd_sm.erl

@@ -22,16 +22,6 @@
 %%% @doc
 %%% emqttd session manager.
 %%%
-%%% The Session state in the Server consists of:
-%%% The existence of a Session, even if the rest of the Session state is empty.
-%%% The Client’s subscriptions.
-%%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not 
-%%% been completely acknowledged.
-%%% QoS 1 and QoS 2 messages pending transmission to the Client.
-%%% QoS 2 messages which have been received from the Client, but have not been
-%%% completely acknowledged.
-%%% Optionally, QoS 0 messages pending transmission to the Client.
-%%%
 %%% @end
 %%%-----------------------------------------------------------------------------
 
@@ -41,14 +31,18 @@
 
 -include("emqttd.hrl").
 
--behaviour(gen_server).
+%% Mnesia Callbacks
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
 
 %% API Function Exports
--export([start_link/2, pool/0, table/0]).
+-export([start_link/2, pool/0]).
 
--export([lookup_session/1,
-         start_session/2,
-         destroy_session/1]).
+-export([start_session/2, lookup_session/1]).
+
+-behaviour(gen_server).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -58,7 +52,20 @@
 
 -define(SM_POOL, sm_pool).
 
--define(SESSION_TAB, mqtt_session).
+%%%=============================================================================
+%%% Mnesia callbacks
+%%%=============================================================================
+
+mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(session, [
+                {type, ordered_set},
+                {ram_copies, [node()]},
+                {record_name, mqtt_session},
+                {attributes, record_info(fields, mqtt_session)},
+                {index, [sess_pid]}]);
+
+mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(session).
 
 %%%=============================================================================
 %%% API
@@ -69,8 +76,8 @@
 %% @end
 %%------------------------------------------------------------------------------
 -spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
-        Id :: pos_integer(),
-        StatsFun :: {fun(), fun()}.
+        Id       :: pos_integer(),
+        StatsFun :: fun().
 start_link(Id, StatsFun) ->
     gen_server:start_link(?MODULE, [Id, StatsFun], []).
 
@@ -80,43 +87,27 @@ start_link(Id, StatsFun) ->
 %%------------------------------------------------------------------------------
 pool() -> ?SM_POOL.
 
-%%------------------------------------------------------------------------------
-%% @doc Table name.
-%% @end
-%%------------------------------------------------------------------------------
-table() -> ?SESSION_TAB.
-
 %%------------------------------------------------------------------------------
 %% @doc Start a session
 %% @end
 %%------------------------------------------------------------------------------
-
 -spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}.
 start_session(CleanSess, ClientId) ->
     SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
     call(SM, {start_session, {CleanSess, ClientId, self()}}).
 
 %%------------------------------------------------------------------------------
-%% @doc Lookup Session Pid
+%% @doc Lookup a Session
 %% @end
 %%------------------------------------------------------------------------------
 -spec lookup_session(binary()) -> pid() | undefined.
 lookup_session(ClientId) ->
-    case ets:lookup(?SESSION_TAB, ClientId) of
-        [{_Clean, _, SessPid, _}] -> SessPid;
+    case mnesia:dirty_read(session, ClientId) of
+        [Session] -> Session;
         [] -> undefined
     end.
 
-%%------------------------------------------------------------------------------
-%% @doc Destroy a session
-%% @end
-%%------------------------------------------------------------------------------
--spec destroy_session(binary()) -> ok.
-destroy_session(ClientId) ->
-    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
-    call(SM, {destroy_session, ClientId}).
-
-call(SM, Req) -> gen_server:call(SM, Req).
+call(SM, Req) -> gen_server:call(SM, Req, infinity).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -126,37 +117,28 @@ init([Id, StatsFun]) ->
     gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
     {ok, #state{id = Id, statsfun = StatsFun}}.
 
+%% persistent session
 handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
-    Reply =
-    case ets:lookup(?SESSION_TAB, ClientId) of
-        [{_Clean, _, SessPid, _MRef}] ->
-            emqttd_session:resume(SessPid, ClientId, ClientPid),
-            {ok, SessPid};
-        [] ->
-            new_session(false, ClientId, ClientPid)
-    end,
-    {reply, Reply, setstats(State)};
+    case lookup_session(ClientId) of
+        undefined ->
+            %% create session locally
+            {reply, create_session(false, ClientId, ClientPid), State};
+        Session ->
+            {reply, resume_session(Session, ClientPid), State}
+    end;
 
 handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
-    case ets:lookup(?SESSION_TAB, ClientId) of
-        [{_Clean, _, SessPid, MRef}] ->
-            erlang:demonitor(MRef, [flush]),
-            emqttd_session:destroy(SessPid, ClientId);
-        [] ->
-            ok
-    end,
-    {reply, new_session(true, ClientId, ClientPid), setstats(State)};
-
-handle_call({destroy_session, ClientId}, _From, State) ->
-    case ets:lookup(?SESSION_TAB, ClientId) of
-        [{_Clean, _, SessPid, MRef}] ->
-            emqttd_session:destroy(SessPid, ClientId),
-            erlang:demonitor(MRef, [flush]),
-            ets:delete(?SESSION_TAB, ClientId);
-        [] ->
-            ignore
-    end,
-    {reply, ok, setstats(State)};
+    case lookup_session(ClientId) of
+        undefined ->
+            {reply, create_session(true, ClientId, ClientPid), State};
+        Session ->
+            case destroy_session(Session) of
+                ok ->
+                    {reply, create_session(true, ClientId, ClientPid), State};
+                {error, Error} ->
+                    {reply, {error, Error}, State}
+            end
+    end;
 
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
@@ -164,8 +146,11 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
-	ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}),
+handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
+    mnesia:transaction(fun() ->
+                [mnesia:delete_object(session, Sess, write) || Sess 
+                    <- mnesia:index_read(session, DownPid, #mqtt_session.sess_pid)]
+        end),
     {noreply, setstats(State)};
 
 handle_info(_Info, State) ->
@@ -181,17 +166,116 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%=============================================================================
 
-new_session(CleanSess, ClientId, ClientPid) ->
+create_session(CleanSess, ClientId, ClientPid) ->
     case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
         {ok, SessPid} ->
-            MRef = erlang:monitor(process, SessPid),
-            ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}),
-            {ok, SessPid};
+            Session = #mqtt_session{client_id  = ClientId,
+                                    sess_pid   = SessPid,
+                                    persistent = not CleanSess,
+                                    on_node    = node()},
+            case insert_session(Session) of
+                {aborted, {conflict, Node}} ->
+                    %% conflict with othe node?
+                    lager:critical("Session ~s conflict with node ~p!", [ClientId, Node]),
+                    {error, conflict};
+                {atomic, ok} ->
+                    erlang:monitor(process, SessPid),
+                    {ok, SessPid}
+            end;
         {error, Error} ->
             {error, Error}
     end.
 
-setstats(State = #state{statsfun = {CFun, SFun}}) ->
-    CFun(ets:info(?SESSION_TAB, size)),
-    SFun(ets:select_count(?SESSION_TAB,  [{{false, '_', '_', '_'}, [], [true]}])),
+insert_session(Session = #mqtt_session{client_id = ClientId}) ->
+    mnesia:transaction(fun() ->
+                case mnesia:wread({session, ClientId}) of
+                    [] ->
+                        mnesia:write(session, Session, write);
+                    [#mqtt_session{on_node = Node}] ->
+                        mnesia:abort({conflict, Node})
+                end
+        end).
+
+%% local node
+resume_session(#mqtt_session{client_id = ClientId,
+                             sess_pid  = SessPid,
+                             on_node   = Node}, ClientPid)
+        when Node =:= node() ->
+    case is_process_alive(SessPid) of 
+        true ->
+            emqttd_session:resume(SessPid, ClientId, ClientPid),
+            {ok, SessPid};
+        false ->
+            lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]),
+            {error, session_died}
+    end;
+
+%% remote node
+resume_session(Session = #mqtt_session{client_id = ClientId,
+                                       sess_pid = SessPid,
+                                       on_node = Node}, ClientPid) ->
+    case emqttd:is_running(Node) of
+        true ->
+            case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of
+                ok ->
+                    {ok, SessPid};
+                {badrpc, Reason} ->
+                    lager:critical("Resume session ~s on remote node ~p failed for ~p",
+                                    [ClientId, Node, Reason]),
+                    {error, list_to_atom("session_" ++ atom_to_list(Reason))}
+            end;
+        false ->
+            lager:critical("Session ~s died for node ~p down!", [ClientId, Node]),
+            remove_session(Session),
+            {error, session_node_down}
+    end.
+
+%% local node
+destroy_session(Session = #mqtt_session{client_id = ClientId,
+                                        sess_pid  = SessPid,
+                                        on_node   = Node}) when Node =:= node() ->
+    case is_process_alive(SessPid) of
+        true ->
+            emqttd_session:destroy(SessPid, ClientId);
+        false ->
+            lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid])
+    end,
+    case remove_session(Session) of
+        {atomic, ok} -> ok;
+        {aborted, Error} -> {error, Error}
+    end;
+
+%% remote node
+destroy_session(Session = #mqtt_session{client_id = ClientId,
+                                        sess_pid  = SessPid,
+                                        on_node   = Node}) ->
+    case emqttd:is_running(Node) of
+        true ->
+            case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
+                ok ->
+                    case remove_session(Session) of
+                        {atomic, ok} -> ok;
+                        {aborted, Error} -> {error, Error}
+                    end;
+                {badrpc, Reason} ->
+                    lager:critical("Destroy session ~s on remote node ~p failed for ~p",
+                                    [ClientId, Node, Reason]),
+                    {error, list_to_atom("session_" ++ atom_to_list(Reason))}
+             end;
+        false ->
+            lager:error("Session ~s died for node ~p down!", [ClientId, Node]),
+            case remove_session(Session) of
+                {atomic, ok} -> ok;
+                {aborted, Error} -> {error, Error}
+            end
+    end.
+
+remove_session(Session) ->
+    mnesia:transaction(fun() ->
+            mnesia:delete_object(session, Session, write)
+        end).
+
+setstats(State = #state{statsfun = _StatsFun}) ->
     State.
+
+

+ 2 - 7
src/emqttd_sm_sup.erl

@@ -43,20 +43,15 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2},
-                                {write_concurrency, true}]),
     Schedulers = erlang:system_info(schedulers),
     gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
+    StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
     Children = lists:map(
                  fun(I) ->
                     Name = {emqttd_sm, I},
                     gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
-                    {Name, {emqttd_sm, start_link, [I, statsfun()]},
+                    {Name, {emqttd_sm, start_link, [I, StatsFun]},
                                 permanent, 10000, worker, [emqttd_sm]}
                  end, lists:seq(1, Schedulers)),
     {ok, {{one_for_all, 10, 100}, Children}}.
 
-statsfun() ->
-    {emqttd_stats:statsfun('clients/count', 'clients/max'),
-     emqttd_stats:statsfun('sessions/count', 'sessions/max')}.
-

+ 1 - 1
src/emqttd_throttle.erl

@@ -28,5 +28,5 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
-%% TODO:... 0.9.0...
+%% TODO:... 0.10.0...
 

+ 1 - 0
src/emqttd_topic.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_topic).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 0 - 1
src/emqttd_trace.erl

@@ -52,7 +52,6 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-
 %%------------------------------------------------------------------------------
 %% @doc Start to trace client or topic.
 %% @end

+ 5 - 0
src/emqttd_vm.erl

@@ -27,6 +27,8 @@
 
 -module(emqttd_vm).
 
+-export([schedulers/0]).
+
 -export([microsecs/0]).
 
 -export([loads/0, scheduler_usage/1]).
@@ -164,6 +166,9 @@
                       sndbuf,
                       tos]).
 
+schedulers() ->
+    erlang:system_info(schedulers).
+
 microsecs() ->
     {Mega, Sec, Micro} = erlang:now(),
     (Mega * 1000000 + Sec) * 1000000 + Micro.

+ 9 - 9
src/emqttd_ws_client.erl

@@ -43,7 +43,7 @@
          terminate/2, code_change/3]).
 
 %% WebSocket Loop State
--record(wsocket_state, {request, client_pid, packet_opts, parser_state}).
+-record(wsocket_state, {request, client_pid, packet_opts, parser}).
 
 %% Client State
 -record(client_state, {ws_pid, request, proto_state, keepalive}).
@@ -59,7 +59,7 @@ start_link(Req) ->
     ReentryWs(#wsocket_state{request      = Req,
                              client_pid   = ClientPid,
                              packet_opts  = PktOpts,
-                             parser_state = emqttd_parser:init(PktOpts)}).
+                             parser       = emqttd_parser:new(PktOpts)}).
 
 %%------------------------------------------------------------------------------
 %% @private
@@ -77,14 +77,14 @@ ws_loop(<<>>, State, _ReplyChannel) ->
     State;
 ws_loop([<<>>], State, _ReplyChannel) ->
     State;
-ws_loop(Data, State = #wsocket_state{request = Req,
+ws_loop(Data, State = #wsocket_state{request    = Req,
                                      client_pid = ClientPid,
-                                     parser_state = ParserState}, ReplyChannel) ->
+                                     parser     = Parser}, ReplyChannel) ->
     Peer = Req:get(peer),
     lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
-    case emqttd_parser:parse(iolist_to_binary(Data), ParserState) of
-    {more, ParserState1} ->
-        State#wsocket_state{parser_state = ParserState1};
+    case Parser(iolist_to_binary(Data)) of
+    {more, NewParser} ->
+        State#wsocket_state{parser = NewParser};
     {ok, Packet, Rest} ->
         gen_server:cast(ClientPid, {received, Packet}),
         ws_loop(Rest, reset_parser(State), ReplyChannel);
@@ -93,8 +93,8 @@ ws_loop(Data, State = #wsocket_state{request = Req,
         exit({shutdown, Error})
     end.
 
-reset_parser(State = #wsocket_state{packet_opts  = PktOpts}) ->
-    State#wsocket_state{parser_state = emqttd_parser:init(PktOpts)}.
+reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
+    State#wsocket_state{parser = emqttd_parser:new (PktOpts)}.
 
 %%%=============================================================================
 %%% gen_fsm callbacks