Feng Lee 10 лет назад
Родитель
Сommit
a0f90b3ac6

+ 1 - 1
apps/emqtt/include/emqtt.hrl

@@ -57,7 +57,7 @@
 
 -record(mqtt_message, {
     topic           :: binary(),                 %% topic published to
-    from            :: mqtt_clientid() | atom(), %% from clientid
+    from            :: binary() | atom(), %% from clientid
     qos    = ?QOS_0 :: mqtt_qos(),
     retain = false  :: boolean(),
     dup    = false  :: boolean(),

+ 1 - 1
apps/emqtt/include/emqtt_packet.hrl

@@ -162,7 +162,7 @@
     #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}).
 
 -define(CONNACK_PACKET(ReturnCode),
-    #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?CONNACK},
                  variable = #mqtt_packet_connack{return_code = ReturnCode}}).
 
 -define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload),

+ 4 - 1
apps/emqtt/src/emqtt_message.erl

@@ -32,7 +32,7 @@
 
 -include("emqtt_packet.hrl").
 
--export([from_packet/1, to_packet/1]).
+-export([from_packet/1, from_packet/2, to_packet/1]).
 
 -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
 
@@ -70,6 +70,9 @@ from_packet(#mqtt_packet_connect{will_retain = Retain,
                   dup     = false,
                   payload = Msg}.
 
+from_packet(ClientId, Packet) ->
+    Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}.
+
 %%------------------------------------------------------------------------------
 %% @doc Message to packet
 %% @end

+ 2 - 2
apps/emqttd/src/emqttd_bridge.erl

@@ -106,11 +106,11 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}) ->
+handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) ->
     lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]),
     {noreply, State};
 
-handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) ->
+handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
     rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
     {noreply, State};
 

+ 2 - 10
apps/emqttd/src/emqttd_client.erl

@@ -106,16 +106,8 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
                 [emqttd_protocol:clientid(ProtoState), ConnName]), 
     stop({shutdown, duplicate_id}, State);
 
-%%TODO: ok??
-handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
-    ProtoState1 =
-    lists:foldl(fun(Message, PState) ->
-            {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
-        end, ProtoState, Messages),
-    {noreply, State#state{proto_state = ProtoState1}};
-
-handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
-    {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
+handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) ->
+    {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
     {noreply, State#state{proto_state = ProtoState1}};
 
 handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->

+ 26 - 21
apps/emqttd/src/emqttd_mqwin.erl

@@ -25,42 +25,47 @@
 %%% @end
 %%%-----------------------------------------------------------------------------
 
--module(emqttd_mqwin).
+-module(emqttd_inflight).
 
 -author("Feng Lee <feng@emqtt.io>").
 
--export([new/2, len/1, in/2, ack/2]).
+-include_lib("emqtt/include/emqtt.hrl").
 
--define(WIN_SIZE, 100).
+-export([new/2, is_full/1, len/1, in/2, ack/2]).
 
--record(mqwin, {name,
-                w     = [], %% window list
-                len   = 0,  %% current window len
-                size  = ?WIN_SIZE}).
+-define(MAX_SIZE, 100).
 
--type mqwin() :: #mqwin{}.
+-record(inflight, {name, q = [], len = 0, size = ?MAX_SIZE}).
 
--export_type([mqwin/0]).
+-type inflight() :: #inflight{}.
 
-new(Name, Opts) ->
-    WinSize = emqttd_opts:g(inflight_window, Opts, ?WIN_SIZE),
-    #mqwin{name = Name, size = WinSize}.
+-export_type([inflight/0]).
 
-len(#mqwin{len = Len}) ->
+new(Name, Max) ->
+    #inflight{name = Name, size = Max}.
+
+is_full(#inflight{size = 0}) ->
+    false;
+is_full(#inflight{len = Len, size = Size}) when Len < Size ->
+    false;
+is_full(_Inflight) ->
+    true.
+
+len(#inflight{len = Len}) ->
     Len.
 
-in(_Msg, #mqwin{len = Len, size = Size})
+in(_Msg, #inflight{len = Len, size = Size})
     when Len =:= Size -> {error, full};
 
-in(Msg, Win = #mqwin{w = W, len = Len}) ->
-    {ok, Win#mqwin{w = [Msg|W], len = Len +1}}.
+in(Msg = #mqtt_message{msgid = MsgId}, Inflight = #inflight{q = Q, len = Len}) ->
+    {ok, Inflight#inflight{q = [{MsgId, Msg}|Q], len = Len +1}}.
     
-ack(MsgId, QWin = #mqwin{w = W, len = Len}) ->
-    case lists:keyfind(MsgId, 2, W) of
+ack(MsgId, Inflight = #inflight{q = Q, len = Len}) ->
+    case lists:keyfind(MsgId, 1, Q) of
         false ->
-            lager:error("qwin(~s) cannot find msgid: ~p", [MsgId]), QWin;
+            lager:error("Inflight(~s) cannot find msgid: ~p", [MsgId]),
+            Inflight;
         _Msg ->
-            QWin#mqwin{w = lists:keydelete(MsgId, 2, W), len = Len - 1}
+            Inflight#inflight{q = lists:keydelete(MsgId, 1, Q), len = Len - 1}
     end.
 
-

+ 14 - 6
apps/emqttd/src/emqttd_mqueue.erl

@@ -56,7 +56,7 @@
 
 -export([new/2, name/1,
          is_empty/1, is_full/1,
-         len/1, in/2, out/2]).
+         len/1, in/2, out/1]).
 
 -define(LOW_WM, 0.2).
 
@@ -112,22 +112,30 @@ len(#mqueue{len = Len}) -> Len.
 %% @doc Queue one message.
 %% @end
 %%------------------------------------------------------------------------------
--spec in(mqtt_message(), mqueue()) -> mqueue().
+
+-spec in({new | old, mqtt_message()}, mqueue()) -> mqueue().
 
 %% drop qos0
-in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
+in({_, #mqtt_message{qos = ?QOS_0}}, MQ = #mqueue{qos0 = false}) ->
     MQ;
 
 %% simply drop the oldest one if queue is full, improve later
-in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen})
+in({new, Msg}, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen})
     when Len =:= MaxLen ->
     {{value, OldMsg}, Q2} = queue:out(Q),
     lager:error("queue(~s) drop message: ~p", [Name, OldMsg]),
     MQ#mqueue{q = queue:in(Msg, Q2)};
 
-in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
+in({old, Msg}, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen})
+    when Len =:= MaxLen ->
+    lager:error("queue(~s) drop message: ~p", [Name, Msg]), MQ;
+
+in({new, Msg}, MQ = #mqueue{q = Q, len = Len}) ->
     maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
 
+in({old, Msg}, MQ = #mqueue{q = Q, len = Len}) ->
+    MQ#mqueue{q = queue:in_r(Msg, Q), len = Len + 1}.
+
 out(MQ = #mqueue{len = 0}) ->
     {empty, MQ};
 
@@ -143,7 +151,7 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = f
 maybe_set_alarm(MQ) ->
     MQ.
 
-maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alarm = true})
+maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm = true})
     when Len =< LowWM ->
     emqttd_alarm:clear_alarm({queue_high_watermark, Name}),
     MQ#mqueue{alarm = false};

+ 2 - 2
apps/emqttd/src/emqttd_msg_store.erl

@@ -123,7 +123,7 @@ redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
 dispatch(_CPid, []) ->
     ignore;
 dispatch(CPid, Msgs) when is_list(Msgs) ->
-    CPid ! {dispatch, {self(), [Msg || Msg <- Msgs]}};
+    CPid ! {dispatch, [Msg || Msg <- Msgs]};
 dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
-    CPid ! {dispatch, {self(), Msg}}.
+    CPid ! {dispatch, Msg}.
 

+ 38 - 47
apps/emqttd/src/emqttd_protocol.erl

@@ -51,8 +51,7 @@
         username,
 		clientid,
 		clean_sess,
-        sessmod,
-        session,    %% session state or session pid
+        session,    
 		will_msg,
         max_clientid_len = ?MAX_CLIENTID_LEN,
         client_pid
@@ -152,13 +151,13 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
                     emqttd_cm:register(client(State2)),
 
                     %%Starting session
-                    {ok, SessMod, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
+                    {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
 
                     %% Start keepalive
                     start_keepalive(KeepAlive),
 
                     %% ACCEPT
-                    {?CONNACK_ACCEPT, State2#proto_state{sessmod = SessMod, session = Session, will_msg = willmsg(Var)}};
+                    {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}};
                 {error, Reason}->
                     lager:error("~s@~s: username '~s', login failed - ~s",
                                     [ClientId, emqttd_net:format(Peername), Username, Reason]),
@@ -177,7 +176,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
        State = #proto_state{clientid = ClientId, session = Session}) ->
     case check_acl(publish, Topic, State) of
         allow -> 
-            do_publish(Session, ClientId, ?QOS_0, Packet);
+            do_publish(Session, ClientId, Packet);
         deny -> 
             lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
     end,
@@ -187,7 +186,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
          State = #proto_state{clientid = ClientId, session = Session}) ->
     case check_acl(publish, Topic, State) of
         allow -> 
-            do_publish(Session, ClientId, ?QOS_1, Packet),
+            do_publish(Session, ClientId, Packet),
             send(?PUBACK_PACKET(?PUBACK, PacketId), State);
         deny -> 
             lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@@ -198,26 +197,28 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
          State = #proto_state{clientid = ClientId, session = Session}) ->
     case check_acl(publish, Topic, State) of
         allow -> 
-            NewSession = do_publish(Session, ClientId, ?QOS_2, Packet), 
-            send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
+            do_publish(Session, ClientId, Packet), 
+            send(?PUBACK_PACKET(?PUBREC, PacketId), State);
         deny -> 
             lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
             {ok, State}
     end;
 
-handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) 
-    when Type >= ?PUBACK andalso Type =< ?PUBCOMP ->
-    NewSession = emqttd_session:puback(Session, {Type, PacketId}),
-    NewState = State#proto_state{session = NewSession},
-    if 
-        Type =:= ?PUBREC ->
-            send(?PUBREL_PACKET(PacketId), NewState);
-        Type =:= ?PUBREL ->
-            send(?PUBACK_PACKET(?PUBCOMP, PacketId), NewState);
-        true ->
-            ok
-    end,
-	{ok, NewState};
+handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
+    emqttd_session:puback(Session, PacketId),
+    {ok, State};
+
+handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
+    emqttd_session:pubrec(Session, PacketId),
+    send(?PUBREL_PACKET(PacketId), State);
+
+handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
+    emqttd_session:pubrel(Session, PacketId),
+    send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);
+
+handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) ->
+    emqttd_session:pubcomp(Session, PacketId),
+    {ok, State};
 
 %% protect from empty topic list
 handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
@@ -233,13 +234,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
         false ->
             TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
             %%TODO: GrantedQos should be renamed.
-            {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
-            send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
+            {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
+            send(?SUBACK_PACKET(PacketId, GrantedQos), State)
     end;
 
-handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) ->
-    {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]),
-    {ok, State#proto_state{session = NewSession}};
+handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
+    {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
+    {ok, State};
 
 %% protect from empty topic list
 handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
@@ -247,34 +248,24 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
 
 handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
     Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
-    {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1),
-    send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
+    ok = emqttd_session:unsubscribe(Session, Topics1),
+    send(?UNSUBACK_PACKET(PacketId), State);
 
 handle(?PACKET(?PINGREQ), State) ->
     send(?PACKET(?PINGRESP), State);
 
 handle(?PACKET(?DISCONNECT), State) ->
-    %%TODO: how to handle session?
     % clean willmsg
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
-do_publish(Session, ClientId, Qos, Packet) ->
-    Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)),
-    emqttd_session:publish(Session, ClientId, {Qos, Message}).
-
--spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
-%% qos0 message
-send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
-	send(emqtt_message:to_packet(Message), State);
+do_publish(Session, ClientId, Packet) ->
+    Msg = emqtt_message:from_packet(ClientId, Packet),
+    Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
+    emqttd_session:publish(Session, Msg1).
 
-%% message from session
-send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
-	send(emqtt_message:to_packet(Message), State);
-%% message(qos1, qos2) not from session
-send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) 
-    when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
-    {Message1, NewSession} = emqttd_session:await_ack(Session, Message),
-	send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
+-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
+send(Msg, State) when is_record(Msg, mqtt_message) ->
+	send(emqtt_message:to_packet(Msg), State);
 
 send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
     trace(send, Packet, State),
@@ -331,8 +322,8 @@ clientid(ClientId, _State) -> ClientId.
 send_willmsg(_ClientId, undefined) ->
     ignore;
 %%TODO:should call session...
-send_willmsg(ClientId, WillMsg) -> 
-    emqttd_pubsub:publish(ClientId, WillMsg).
+send_willmsg(ClientId, WillMsg) ->
+    emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}).
 
 start_keepalive(0) -> ignore;
 

+ 2 - 2
apps/emqttd/src/emqttd_pubsub.erl

@@ -177,7 +177,7 @@ publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
                 Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
                 true -> Msg
             end,
-            SubPid ! {dispatch, {self(), Msg1}}
+            SubPid ! {dispatch, Msg1}
         end, mnesia:dirty_read(queue, Queue));
     
 publish(_From, Topic, Msg) when is_binary(Topic) ->
@@ -202,7 +202,7 @@ dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) ->
                     Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
                     true -> Msg
                 end,
-                SubPid ! {dispatch, {self(), Msg1}}
+                SubPid ! {dispatch, Msg1}
             end, Subscribers), 
     length(Subscribers).
 

+ 278 - 269
apps/emqttd/src/emqttd_session.erl

@@ -53,31 +53,26 @@
 
 -include_lib("emqtt/include/emqtt_packet.hrl").
 
-%% Start gen_server
--export([start_link/2, resume/3, destroy/2]).
-
-%% Init Session State
--export([new/1]).
+%% Session API
+-export([start_link/3, resume/3, destroy/2]).
 
 %% PubSub APIs
--export([publish/3,
-         puback/2,
-         subscribe/2,
-         unsubscribe/2,
-         await/2,
-         dispatch/2]).
+-export([publish/2,
+         puback/2, pubrec/2, pubrel/2, pubcomp/2,
+         subscribe/2, unsubscribe/2]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
 -record(session, {
-        %% ClientId: Identifier of Session
-        clientid    :: binary(),
 
         %% Clean Session Flag
         clean_sess = true,
 
+        %% ClientId: Identifier of Session
+        clientid    :: binary(),
+
         %% Client Pid linked with session
         client_pid  :: pid(),
 
@@ -91,7 +86,7 @@
         %% QoS 1 and QoS 2 messages which have been sent to the Client,
         %% but have not been completely acknowledged.
         %% Client <- Broker
-        inflight_window :: emqttd_mqwin:mqwin(),
+        inflight_queue :: emqttd_inflight:inflight(),
 
         %% All qos1, qos2 messages published to when client is disconnected.
         %% QoS 1 and QoS 2 messages pending transmission to the Client.
@@ -129,167 +124,121 @@
         
         timestamp}).
 
--type session() :: #session{}.
-
-%%%=============================================================================
-%%% Session API
-%%%=============================================================================
-
 %%------------------------------------------------------------------------------
-%% @doc Start a session process.
+%% @doc Start a session.
 %% @end
 %%------------------------------------------------------------------------------
-start_link(ClientId, ClientPid) ->
-    gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
+start_link(CleanSess, ClientId, ClientPid) ->
+    gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
 
 %%------------------------------------------------------------------------------
 %% @doc Resume a session.
 %% @end
 %%------------------------------------------------------------------------------
-resume(Session, _ClientId, _ClientPid) when is_record(Session, session) ->
-    Session;
-resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid.
+resume(Session, ClientId, ClientPid) when is_pid(Session) ->
+    gen_server:cast(Session, {resume, ClientId, ClientPid}).
 
 %%------------------------------------------------------------------------------
 %% @doc Destroy a session.
 %% @end
 %%------------------------------------------------------------------------------
--spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
-destroy(SessPid, ClientId)  when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {destroy, ClientId}), SessPid.
-
-%%------------------------------------------------------------------------------
-%% @doc Init Session State.
-%% @end
-%%------------------------------------------------------------------------------
--spec new(binary()) -> session().
-new(ClientId) ->
-    QEnv = emqttd:env(mqtt, queue),
-    SessEnv = emqttd:env(mqtt, session),
-    #session{
-        clientid         = ClientId,
-        clean_sess       = true,
-        subscriptions    = [],
-        inflight_window  = emqttd_mqwin:new(ClientId, QEnv),
-        pending_queue    = emqttd_mqueue:new(ClientId, QEnv),
-        awaiting_rel     = #{},  
-        awaiting_ack     = #{},
-        awaiting_comp    = #{},
-        unack_retries    = emqttd_opts:g(unack_retries, SessEnv),
-        unack_timeout    = emqttd_opts:g(unack_timeout, SessEnv),
-        await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
-        max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
-        expired_after    = emqttd_opts:g(expired_after, SessEnv) * 3600
-    }.
+-spec destroy(Session:: pid(), ClientId :: binary()) -> ok.
+destroy(Session, ClientId)  when is_pid(Session) ->
+    gen_server:call(Session, {destroy, ClientId}).
 
 %%------------------------------------------------------------------------------
 %% @doc Publish message
 %% @end
 %%------------------------------------------------------------------------------
--spec publish(session() | pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session() | pid().
-publish(Session, ClientId, {?QOS_0, Message}) ->
+-spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok.
+publish(Session, Msg = #mqtt_message{qos = ?QOS_0}) when is_pid(Session) ->
     %% publish qos0 directly
-    emqttd_pubsub:publish(ClientId, Message), Session;
+    emqttd_pubsub:publish(Msg);
 
-publish(Session, ClientId, {?QOS_1, Message}) ->
+publish(Session, Msg = #mqtt_message{qos = ?QOS_1}) when is_pid(Session) ->
     %% publish qos1 directly, and client will puback
-	emqttd_pubsub:publish(ClientId, Message), Session;
-
-publish(Session = #session{awaiting_rel      = AwaitingRel,
-                           await_rel_timeout = Timeout,
-                           max_awaiting_rel  = MaxLen}, ClientId,
-        {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
-    case maps:size(AwaitingRel) >= MaxLen of
-        true -> lager:error([{clientid, ClientId}], "Session ~s "
-                                " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]);
-        false ->
-            %% store in awaiting_rel
-            TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}),
-            Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)};
-    end;
-publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), SessPid.
+	emqttd_pubsub:publish(Msg);
+
+publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) when is_pid(Session) ->
+    %% publish qos2 by session 
+    gen_server:cast(Session, {publish, Msg}).
 
 %%------------------------------------------------------------------------------
 %% @doc PubAck message
 %% @end
 %%------------------------------------------------------------------------------
+-spec puback(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
+puback(Session, MsgId) when is_pid(Session) ->
+    gen_server:cast(Session, {puback, MsgId}).
 
--spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
-puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
-    case maps:is_key(PacketId, Awaiting) of
-        true -> ok;
-        false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId])
-    end,
-    Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)};
+-spec pubrec(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
+pubrec(Session, MsgId) when is_pid(Session) ->
+    gen_server:cast(Session, {pubrec, MsgId}).
 
-puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {puback, {?PUBACK, PacketId});
-
-%% PUBREC
-puback(Session = #session{clientid = ClientId, 
-                                  awaiting_ack = AwaitingAck,
-                                  awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) ->
-    case maps:is_key(PacketId, AwaitingAck) of
-        true -> ok;
-        false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId])
-    end,
-    Session#session{awaiting_ack   = maps:remove(PacketId, AwaitingAck),
-                      awaiting_comp  = maps:put(PacketId, true, AwaitingComp)};
-
-puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {puback, {?PUBREC, PacketId});
-
-%% PUBREL
-puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
-    case maps:find(PacketId, Awaiting) of
-        {ok, {Msg, TRef}} ->
-            catch erlang:cancel_timer(TRef),
-            emqttd_pubsub:publish(ClientId, Msg);
-        error ->
-            lager:error("Session ~s cannot find PUBREL PacketId '~p'!", [ClientId, PacketId])
-    end,
-    Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)};
+-spec pubrel(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
+pubrel(Session, MsgId) when is_pid(Session) ->
+    gen_server:cast(Session, {pubrel, MsgId}).
 
-puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {puback, {?PUBREL, PacketId});
-
-%% PUBCOMP
-puback(Session = #session{clientid = ClientId, 
-                                  awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) ->
-    case maps:is_key(PacketId, AwaitingComp) of
-        true -> ok;
-        false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId])
-    end,
-    Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)};
-
-puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
-    gen_server:cast(SessPid, {puback, {?PUBCOMP, PacketId});
-
-wait_ack
-
-timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
-    case maps:find(MsgId, Awaiting) of
-        {ok, {Msg, _TRef}} ->
-            lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]),
-            Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)};
-        error ->
-            lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]),
-            Session
-    end.
+-spec pubcomp(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
+pubcomp(Session, MsgId) when is_pid(Session) ->
+    gen_server:cast(Session, {pubcomp, MsgId}).
 
 %%------------------------------------------------------------------------------
 %% @doc Subscribe Topics
 %% @end
 %%------------------------------------------------------------------------------
--spec subscribe(session() | pid(), [{binary(), mqtt_qos()}]) -> {ok, session() | pid(), [mqtt_qos()]}.
-subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
+-spec subscribe(Session :: pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
+subscribe(Session, Topics) when is_pid(Session) ->
+    gen_server:call(Session, {subscribe, Topics}).
+
+%%------------------------------------------------------------------------------
+%% @doc Unsubscribe Topics
+%% @end
+%%------------------------------------------------------------------------------
+-spec unsubscribe(Session :: pid(), [Topic :: binary()]) -> ok.
+unsubscribe(Session, Topics) when is_pid(Session) ->
+    gen_server:call(Session, {unsubscribe, Topics}).
+
+%%%=============================================================================
+%%% gen_server callbacks
+%%%=============================================================================
+init([CleanSess, ClientId, ClientPid]) ->
+    if
+        CleanSess =:= false ->
+            process_flag(trap_exit, true),
+            true = link(ClientPid);
+        CleanSess =:= true ->
+            ok
+    end,
+    QEnv = emqttd:env(mqtt, queue),
+    SessEnv = emqttd:env(mqtt, session),
+    PendingQ = emqttd_mqueue:new(ClientId, QEnv),
+    InflightQ = emqttd_inflight:new(ClientId, emqttd_opts:g(max_inflight, SessEnv)),
+    Session = #session{
+        clean_sess       = CleanSess,
+        clientid         = ClientId,
+        client_pid       = ClientPid,
+        subscriptions    = [],
+        inflight_queue   = InflightQ,
+        pending_queue    = PendingQ,
+        awaiting_rel     = #{},
+        awaiting_ack     = #{},
+        awaiting_comp    = #{},
+        unack_retries    = emqttd_opts:g(unack_retries, SessEnv),
+        unack_timeout    = emqttd_opts:g(unack_timeout, SessEnv),
+        await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
+        max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessEnv),
+        expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
+        timestamp         = os:timestamp()
+    },
+    {ok, Session, hibernate}.
+
+handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) ->
 
     %% subscribe first and don't care if the subscriptions have been existed
     {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
 
-    lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
+    lager:info([{client, ClientId}], "Session ~s subscribe ~p. Granted QoS: ~p",
                [ClientId, Topics, GrantedQos]),
 
     Subscriptions1 =
@@ -310,19 +259,9 @@ subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}
                         [{Topic, Qos} | Acc]
                 end
         end, Subscriptions, Topics),
+    {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
 
-    {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos};
-
-subscribe(SessPid, Topics) when is_pid(SessPid) ->
-    {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
-    {ok, SessPid, GrantedQos}.
-
-%%------------------------------------------------------------------------------
-%% @doc Unsubscribe Topics
-%% @end
-%%------------------------------------------------------------------------------
--spec unsubscribe(session() | pid(), [binary()]) -> {ok, session() | pid()}.
-unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
+handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) ->
 
     %%unsubscribe from topic tree
     ok = emqttd_pubsub:unsubscribe(Topics),
@@ -338,63 +277,11 @@ unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscription
                     end
                 end, Subscriptions, Topics),
 
-    {ok, Session#session{subscriptions = Subscriptions1}};
-
-unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
-    gen_server:call(SessPid, {unsubscribe, Topics}),
-    {ok, SessPid}.
-
-%%------------------------------------------------------------------------------
-%% @doc Destroy Session
-%% @end
-%%------------------------------------------------------------------------------
-
-% message(qos1) is awaiting ack
-await_ack(Msg = #mqtt_message{qos = ?QOS_1}, Session = #session{message_id = MsgId,
-                                                                  inflight_queue = InflightQ,
-                                                                  awaiting_ack = Awaiting,
-                                                                  unack_retry_after = Time,
-                                                                  max_unack_retries = Retries}) ->
-    %% assign msgid before send
-    Msg1 = Msg#mqtt_message{msgid = MsgId},
-    TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}),
-    Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting),
-    {Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1} | InflightQ],
-                                        awaiting_ack = Awaiting1})}.
-
-% message(qos2) is awaiting ack
-await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = MsgId, awaiting_ack = Awaiting},)
-    when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
-    %%assign msgid before send
-    Message1 = Message#mqtt_message{msgid = MsgId, dup = false},
-    Message2 =
-    if
-        Qos =:= ?QOS_2 -> Message1#mqtt_message{dup = false};
-        true -> Message1
-    end,
-    Awaiting1 = maps:put(MsgId, Message2, Awaiting),
-    {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}.
-
-
-%%%=============================================================================
-%%% gen_server callbacks
-%%%=============================================================================
-init([ClientId, ClientPid]) ->
-    process_flag(trap_exit, true),
-    true = link(ClientPid),
-    Session = emqttd_session:new(ClientId),
-    {ok, Session#session{clean_sess = false,
-                         client_pid = ClientPid,
-                         timestamp = os:timestamp()}, hibernate}.
-
-
-handle_call({subscribe, Topics}, _From, Session) ->
-    {ok, NewSession, GrantedQos} = subscribe(Session, Topics),
-    {reply, {ok, GrantedQos}, NewSession};
+    {reply, ok, Session#session{subscriptions = Subscriptions1}};
 
-handle_call({unsubscribe, Topics}, _From, Session) ->
-    {ok, NewSession} = unsubscribe(Session, Topics),
-    {reply, ok, NewSession};
+handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) ->
+    lager:warning("Session ~s destroyed", [ClientId]),
+    {stop, {shutdown, destroy}, ok, Session};
 
 handle_call(Req, _From, State) ->
     lager:error("Unexpected Request: ~p", [Req]),
@@ -403,10 +290,10 @@ handle_call(Req, _From, State) ->
 handle_cast({resume, ClientId, ClientPid}, State = #session{
                                                       clientid      = ClientId,
                                                       client_pid    = OldClientPid,
-                                                      msg_queue     = Queue,
+                                                      pending_queue     = Queue,
                                                       awaiting_ack  = AwaitingAck,
                                                       awaiting_comp = AwaitingComp,
-                                                      expire_timer  = ETimer}) ->
+                                                      expired_timer  = ETimer}) ->
 
     lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
 
@@ -426,8 +313,8 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{
     emqttd_util:cancel_timer(ETimer),
 
     %% redelivery PUBREL
-    lists:foreach(fun(PacketId) ->
-                ClientPid ! {redeliver, {?PUBREL, PacketId}}
+    lists:foreach(fun(MsgId) ->
+                ClientPid ! {redeliver, {?PUBREL, MsgId}}
         end, maps:keys(AwaitingComp)),
 
     %% redelivery messages that awaiting PUBACK or PUBREC
@@ -442,45 +329,114 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{
         end, emqttd_queue:all(Queue)),
 
     {noreply, State#session{client_pid   = ClientPid,
-                            msg_queue    = emqttd_queue:clear(Queue),
-                            expire_timer = undefined}, hibernate};
+                            %%TODO:
+                            pending_queue = emqttd_queue:clear(Queue),
+                            expired_timer = undefined}, hibernate};
 
-handle_cast({publish, ClientId, {?QOS_2, Message}}, Session) ->
-    {noreply, publish(Session, ClientId, {?QOS_2, Message})};
+handle_cast({publish, Message = #mqtt_message{qos = ?QOS_2}}, Session) ->
+    {noreply, publish_qos2(Message, Session)};
 
-handle_cast({puback, {PubAck, PacketId}, Session) ->
-    {noreply, puback(Session, {PubAck, PacketId})};
 
-handle_cast({destroy, ClientId}, Session = #session{clientid = ClientId}) ->
-    lager:warning("Session ~s destroyed", [ClientId]),
-    {stop, normal, Session};
+handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) ->
+    case maps:find(MsgId, Awaiting) of
+        {ok, {_, TRef}} ->
+            catch erlang:cancel_timer(TRef),
+            {noreply, dispatch(Session#session{inflight_queue = emqttd_inflight:ack(MsgId, Q),
+                                               awaiting_ack   = maps:remove(MsgId, Awaiting)})};
+        error ->
+            lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
+
+%% PUBREC
+handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId,
+                                                awaiting_ack = AwaitingAck,
+                                                awaiting_comp = AwaitingComp,
+                                                await_rel_timeout = Timeout}) ->
+    case maps:find(MsgId, AwaitingAck) of
+        {ok, {_, TRef}} ->
+            catch erlang:cancel_timer(TRef),
+            TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}),
+            {noreply, dispatch(Session#session{awaiting_ack  = maps:remove(MsgId, AwaitingAck),
+                                               awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)})};
+        error ->
+            lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
+
+handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
+    case maps:find(MsgId, Awaiting) of
+        {ok, {Msg, TRef}} ->
+            catch erlang:cancel_timer(TRef),
+            emqttd_pubsub:publish(Msg),
+            {noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}};
+        error ->
+            lager:error("Session ~s cannot find PUBREL'~p'!", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
+
+%% PUBCOMP
+handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) ->
+    case maps:is_key(MsgId, AwaitingComp) of
+        true -> 
+            {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}};
+        false ->
+            lager:error("Session ~s cannot find PUBREC MsgId '~p'", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
 
 handle_cast(Msg, State) ->
-    lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), 
+    lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
     {noreply, State}.
 
-handle_info({dispatch, {_From, Messages}}, Session) when is_list(Messages) ->
-    F = fun(Message, S) -> dispatch(Message, S) end,
-    {noreply, lists:foldl(F, Session, Messages)};
+handle_info({dispatch, MsgList}, Session) when is_list(MsgList) ->
+    NewSession = lists:foldl(fun(Msg, S) -> 
+                                     dispatch({new, Msg}, S)
+                             end, Session, MsgList),
+    {noreply, NewSession};
+
+handle_info({dispatch, {old, Msg}}, Session) when is_record(Msg, mqtt_message) ->
+    {noreply, dispatch({old, Msg}, Session)};
 
-handle_info({dispatch, {_From, Message}}, State) ->
-    {noreply, dispatch(Message, State)};
+handle_info({dispatch, Msg}, Session) when is_record(Msg, mqtt_message) ->
+    {noreply, dispatch({new, Msg}, Session)};
 
-handle_info({'EXIT', ClientPid, Reason}, Session = #session{clientid = ClientId,
-                                                            client_pid = ClientPid}) ->
-    lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
-    {noreply, start_expire_timer(Session#session{client_pid = undefined})};
+handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
+                                                            clientid = ClientId,
+                                                            client_pid = ClientPid,
+                                                            expired_after = Expires}) ->
+    %%TODO: Clean puback, pubrel, pubcomp timers
+    lager:info("Session ~s: client ~p exited for ~p", [ClientId, ClientPid, Reason]),
+    TRef = timer(Expires * 1000, session_expired),
+    {noreply, Session#session{expired_timer = TRef}};
 
 handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
-    lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
+    lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
     {noreply, State};
 
 handle_info(session_expired, State = #session{clientid = ClientId}) ->
-    lager:warning("Session ~s expired!", [ClientId]),
+    lager:error("Session ~s expired, shutdown now!", [ClientId]),
     {stop, {shutdown, expired}, State};
 
-handle_info({timeout, awaiting_rel, MsgId}, Session) ->
-    {noreply, timeout(awaiting_rel, MsgId, Session)};
+handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
+    case maps:find(MsgId, Awaiting) of
+        {ok, {Msg, _TRef}} ->
+            lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]),
+            {noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}};
+        error ->
+            lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
+
+handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = Awaiting}) ->
+    case maps:find(MsgId, Awaiting) of
+        {ok, _TRef} ->
+            lager:error([{client, ClientId}], "Session ~s Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]),
+            {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}};
+        error ->
+            lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]),
+            {noreply, Session}
+    end;
 
 handle_info(Info, Session) ->
     lager:critical("Unexpected Info: ~p, Session: ~p", [Info, Session]),
@@ -492,53 +448,106 @@ terminate(_Reason, _Session) ->
 code_change(_OldVsn, Session, _Extra) ->
     {ok, Session}.
 
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+%%------------------------------------------------------------------------------
+%% @private
+%% @doc Plubish Qos2 message from client -> broker, and then wait for pubrel.
+%% @end
+%%------------------------------------------------------------------------------
+
+publish_qos2(Message = #mqtt_message{qos = ?QOS_2,msgid = MsgId}, Session = #session{clientid = ClientId,
+                                                                                     awaiting_rel = AwaitingRel,
+                                                                                     await_rel_timeout = Timeout}) ->
+
+    case check_awaiting_rel(Session) of
+        true -> 
+            TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
+            Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)};
+        false ->
+            lager:error([{clientid, ClientId}], "Session ~s "
+                                " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]),
+            Session
+    end.
+
+check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
+    true;
+check_awaiting_rel(#session{awaiting_rel = AwaitingRel,
+                            max_awaiting_rel = MaxLen}) ->
+    maps:size(AwaitingRel) < MaxLen.
+
 %%%=============================================================================
 %%% Dispatch message from broker -> client.
 %%%=============================================================================
 
+dispatch(Session = #session{client_pid = undefined}) ->
+    %% do nothing
+    Session;
+
+dispatch(Session = #session{pending_queue = PendingQ}) ->
+    case emqttd_mqueue:out(PendingQ) of
+        {empty, _Q} ->
+            Session;
+        {{value, Msg}, Q1} ->
+            self() ! {dispatch, {old, Msg}},
+            Session#session{pending_queue = Q1}
+        end.
+
 %% queued the message if client is offline
-dispatch(Msg, Session = #session{client_pid = undefined}) ->
-    queue(Msg, Session);
+dispatch({Type, Msg}, Session = #session{client_pid = undefined,
+                                         pending_queue= PendingQ}) ->
+    Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)};
 
 %% dispatch qos0 directly to client process
-dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
-    ClientPid ! {dispatch, {self(), Msg}}, Session;
-
-%% dispatch qos1/2 messages and wait for puback
-dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{clientid = ClientId,
-                                                            message_id = MsgId,
-                                                            pending_queue = Q,
-                                                            inflight_window = Win})
-    when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
-
-    case emqttd_mqwin:is_full(InflightWin) of
-        true  ->
-            lager:error("Session ~s inflight window is full!", [ClientId]),
-            Session#session{pending_queue = emqttd_mqueue:in(Msg, Q)};
-        false ->
-            Msg1 = Msg#mqtt_message{msgid = MsgId},
-            Msg2 =
-            if
-                Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false};
-                true -> Msg1
-            end,
-            ClientPid ! {dispatch, {self(), Msg2}},
-            NewWin = emqttd_mqwin:in(Msg2, Win),
-            await_ack(Msg2, next_msgid(Session#session{inflight_window = NewWin}))
+dispatch({_Type, Msg} = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
+    ClientPid ! {deliver,  Msg}, Session;
+
+%% dispatch qos1/2 message and wait for puback
+dispatch({Type, Msg = #mqtt_message{qos = Qos}}, Session = #session{clientid = ClientId,
+                                                                    client_pid = ClientPid,
+                                                                    message_id = MsgId,
+                                                                    pending_queue = PendingQ,
+                                                                    inflight_queue= InflightQ})
+    when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 ->
+    %% assign id first
+    Msg1 = Msg#mqtt_message{msgid = MsgId},
+    Msg2 =
+    if
+        Qos =:= ?QOS_1 -> Msg1;
+        Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false}
+    end,
+    case emqttd_inflight:in(Msg1, InflightQ) of
+        {error, full} ->
+            lager:error("Session ~s inflight queue is full!", [ClientId]),
+            Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)};
+        {ok, InflightQ1} ->
+            ClientPid ! {deliver, Msg1},
+            await_ack(Msg1, next_msgid(Session#session{inflight_queue = InflightQ1}))
     end.
 
-queue(Msg, Session = #session{pending_queue= Queue}) ->
-    Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}.
+deliver(Msg, Session) ->
+    ok.
+
+await(Msg, Session) ->
+    ok.
+
+% message(qos1/2) is awaiting ack
+await_ack(Msg = #mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting,
+                                                                unack_retries = Retries,
+                                                                unack_timeout = Timeout}) ->
+    
+    TRef = timer(Timeout * 1000, {retry, MsgId}),
+    Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting),
+    Session#session{awaiting_ack = Awaiting1}.
 
-next_msgid(State = #session{message_id = 16#ffff}) ->
-    State#session{message_id = 1};
+timer(Timeout, TimeoutMsg) ->
+    erlang:send_after(Timeout * 1000, self(), TimeoutMsg).
 
-next_msgid(State = #session{message_id = MsgId}) ->
-    State#session{message_id = MsgId + 1}.
+next_msgid(Session = #session{message_id = 16#ffff}) ->
+    Session#session{message_id = 1};
 
-start_expire_timer(Session = #session{expired_after = Expires,
-                                      expired_timer = OldTimer}) ->
-    emqttd_util:cancel_timer(OldTimer),
-    Timer = erlang:send_after(Expires * 1000, self(), session_expired),
-    Session#session{expired_timer = Timer}.
+next_msgid(Session = #session{message_id = MsgId}) ->
+    Session#session{message_id = MsgId + 1}.
 

+ 8 - 7
apps/emqttd/src/emqttd_session_sup.erl

@@ -30,7 +30,7 @@
 
 -behavior(supervisor).
 
--export([start_link/0, start_session/2]).
+-export([start_link/0, start_session/3]).
 
 -export([init/1]).
 
@@ -46,16 +46,17 @@ start_link() ->
 %% @doc Start a session
 %% @end
 %%------------------------------------------------------------------------------
--spec start_session(binary(), pid()) -> {ok, pid()}.
-start_session(ClientId, ClientPid) ->
-    supervisor:start_child(?MODULE, [ClientId, ClientPid]).
+-spec start_session(boolean(), binary(), pid()) -> {ok, pid()}.
+start_session(CleanSess, ClientId, ClientPid) ->
+    supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]).
 
 %%%=============================================================================
 %%% Supervisor callbacks
 %%%=============================================================================
 
 init([]) ->
-    {ok, {{simple_one_for_one, 10, 10},
-          [{session, {emqttd_session_proc, start_link, []},
-              transient, 10000, worker, [emqttd_session_proc]}]}}.
+    {ok, {{simple_one_for_one, 0, 1},
+          [{session, {emqttd_session, start_link, []},
+              transient, 10000, worker, [emqttd_session]}]}}.
+
 

+ 30 - 26
apps/emqttd/src/emqttd_sm.erl

@@ -90,20 +90,10 @@ table() -> ?SESSION_TAB.
 %% @end
 %%------------------------------------------------------------------------------
 
--spec start_session(CleanSess :: boolean(), binary()) -> {ok, module(), record() | pid()}.
-start_session(true, ClientId) ->
-    %% destroy old session if existed
-    ok = destroy_session(ClientId),
-    {ok, emqttd_session, emqttd_session:new(ClientId)};
-
-start_session(false, ClientId) ->
-    SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
-    case call(SmPid, {start_session, ClientId, self()}) of
-        {ok, SessPid} ->
-            {ok, emqttd_session_proc, SessPid};
-        {error, Error} ->
-            {error, Error}
-    end.
+-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}.
+start_session(CleanSess, ClientId) ->
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    call(SM, {start_session, {CleanSess, ClientId, self()}}).
 
 %%------------------------------------------------------------------------------
 %% @doc Lookup Session Pid
@@ -122,10 +112,10 @@ lookup_session(ClientId) ->
 %%------------------------------------------------------------------------------
 -spec destroy_session(binary()) -> ok.
 destroy_session(ClientId) ->
-    SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
-    call(SmPid, {destroy_session, ClientId}).
+    SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
+    call(SM, {destroy_session, ClientId}).
 
-call(SmPid, Req) -> gen_server:call(SmPid, Req).
+call(SM, Req) -> gen_server:call(SM, Req).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -135,23 +125,27 @@ init([Id, StatsFun]) ->
     gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
     {ok, #state{id = Id, statsfun = StatsFun}}.
 
-handle_call({start_session, ClientId, ClientPid}, _From, State) ->
+handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
     Reply =
     case ets:lookup(?SESSION_TAB, ClientId) of
         [{_, SessPid, _MRef}] ->
-            emqttd_session_proc:resume(SessPid, ClientId, ClientPid), 
+            emqttd_session:resume(SessPid, ClientId, ClientPid),
             {ok, SessPid};
         [] ->
-            case emqttd_session_sup:start_session(ClientId, ClientPid) of
-            {ok, SessPid} ->
-                ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
-                {ok, SessPid};
-            {error, Error} ->
-                {error, Error}
-            end
+            new_session(false, ClientId, ClientPid)
     end,
     {reply, Reply, setstats(State)};
 
+handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
+    case ets:lookup(?SESSION_TAB, ClientId) of
+        [{_, SessPid, MRef}] ->
+            erlang:demonitor(MRef, [flush]),
+            emqttd_session:destroy_session(SessPid, ClientId);
+        [] ->
+            ok
+    end,
+    {reply, new_session(true, ClientId, ClientPid), setstats(State)};
+
 handle_call({destroy_session, ClientId}, _From, State) ->
     case ets:lookup(?SESSION_TAB, ClientId) of
         [{_, SessPid, MRef}] ->
@@ -186,6 +180,16 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%=============================================================================
 
+new_session(CleanSess, ClientId, ClientPid) ->
+    case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
+        {ok, SessPid} ->
+            ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
+            {ok, SessPid};
+        {error, Error} ->
+            {error, Error}
+    end.
+
 setstats(State = #state{statsfun = StatsFun}) ->
     StatsFun(ets:info(?SESSION_TAB, size)), State.
 
+

+ 2 - 9
apps/emqttd/src/emqttd_ws_client.erl

@@ -130,15 +130,8 @@ handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
-    ProtoState1 =
-    lists:foldl(fun(Message, PState) ->
-            {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
-        end, ProtoState, Messages),
-    {noreply, State#state{proto_state = ProtoState1}};
-
-handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
-    {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
+handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) ->
+    {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
     {noreply, State#state{proto_state = ProtoState1}};
 
 handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->

+ 7 - 5
rel/files/emqttd.config

@@ -90,6 +90,10 @@
             %% Expired after 2 days
             {expired_after, 48},
 
+            %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
+            %% 0 means no limit
+            {max_inflight, 100},
+
             %% Max retries for unack Qos1/2 messages
             {unack_retries, 3},
 
@@ -99,16 +103,14 @@
             %% Awaiting PUBREL Timeout
             {await_rel_timeout, 8},
 
-            %% Max Packets that Awaiting PUBREL
-            {max_awaiting_rel, 100}
+            %% Max Packets that Awaiting PUBREL, 0 means no limit
+            {max_awaiting_rel, 0}
+
         ]},
         {queue, [
             %% Max queue length
             {max_length, 1000},
 
-            %% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
-            {inflight_window, 100},
-
             %% Low watermark of queued messsages
             {low_watermark, 0.2},