Feng 10 лет назад
Родитель
Сommit
4ed62d018e
4 измененных файлов с 136 добавлено и 133 удалено
  1. 12 9
      src/emqttd_cm.erl
  2. 1 3
      src/emqttd_pubsub.erl
  3. 105 103
      src/emqttd_session.erl
  4. 18 18
      src/emqttd_sm.erl

+ 12 - 9
src/emqttd_cm.erl

@@ -37,8 +37,6 @@
 
 -behaviour(gen_server2).
 
--define(SERVER, ?MODULE).
-
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -47,6 +45,9 @@
 
 -define(CM_POOL, ?MODULE).
 
+-define(LOG(Level, Format, Args, Client),
+            lager:Level("CM(~s): " ++ Format, [Client#mqtt_client.client_id|Args])).
+
 %%%=============================================================================
 %%% API
 %%%=============================================================================
@@ -102,15 +103,16 @@ init([Id, StatsFun]) ->
 
 handle_call(Req, _From, State) ->
     lager:error("unexpected request: ~p", [Req]),
-    {reply, {error, badreq}, State}.
+    {reply, {error, unsupported_req}, State}.
 
-handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) ->
+handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
+                                             client_pid = Pid}}, State) ->
 	case ets:lookup(mqtt_client, ClientId) of
         [#mqtt_client{client_pid = Pid}] ->
-			lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]),
             ignore;
 		[#mqtt_client{client_pid = OldPid}] ->
-			lager:warning("ClientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]);
+            %% TODO: should cancel monitor
+            ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client);
 		[] -> 
             ok
 	end,
@@ -121,10 +123,10 @@ handle_cast({unregister, ClientId, Pid}, State) ->
 	case ets:lookup(mqtt_client, ClientId) of
 	[#mqtt_client{client_pid = Pid}] ->
 		ets:delete(mqtt_client, ClientId);
-	[_] -> 
+	[_] ->
 		ignore;
 	[] ->
-		lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid])
+        ?LOG(error, "Cannot find registered: ~p", [Pid], State)
 	end,
 	{noreply, setstats(State)};
 
@@ -137,7 +139,8 @@ handle_info(Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, #state{id = Id}) ->
-    gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
+    gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}),
+    ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 1 - 3
src/emqttd_pubsub.erl

@@ -24,7 +24,6 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
-
 -module(emqttd_pubsub).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -48,8 +47,7 @@
          publish/1]).
 
 %% Local node
--export([dispatch/2,
-         match/1]).
+-export([dispatch/2, match/1]).
 
 -behaviour(gen_server2).
 

+ 105 - 103
src/emqttd_session.erl

@@ -44,7 +44,6 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
-
 -module(emqttd_session).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -53,16 +52,15 @@
 
 -include("emqttd_protocol.hrl").
 
+-behaviour(gen_server2).
+
 %% Session API
--export([start_link/3, resume/3, destroy/2]).
+-export([start_link/3, resume/3, info/1, destroy/2]).
 
 %% PubSub APIs
--export([publish/2,
-         puback/2, pubrec/2, pubrel/2, pubcomp/2,
+-export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2,
          subscribe/2, subscribe/3, unsubscribe/2]).
 
--behaviour(gen_server2).
-
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -135,6 +133,10 @@
 
 -define(PUBSUB_TIMEOUT, 60000).
 
+-define(LOG(Level, Format, Args, State),
+            lager:Level([{client, State#session.client_id}],
+                        "Session(~s): " ++ Format, [State#session.client_id | Args])).
+
 %%------------------------------------------------------------------------------
 %% @doc Start a session.
 %% @end
@@ -151,6 +153,13 @@ start_link(CleanSess, ClientId, ClientPid) ->
 resume(SessPid, ClientId, ClientPid) ->
     gen_server2:cast(SessPid, {resume, ClientId, ClientPid}).
 
+%%------------------------------------------------------------------------------
+%% @doc Session Info.
+%% @end
+%%------------------------------------------------------------------------------
+info(SessPid) ->
+    gen_server2:call(SessPid, info).
+
 %%------------------------------------------------------------------------------
 %% @doc Destroy a session.
 %% @end
@@ -171,7 +180,7 @@ subscribe(SessPid, TopicTable) ->
 subscribe(SessPid, PacketId, TopicTable) ->
     From   = self(),
     AckFun = fun(GrantedQos) ->
-                 From ! {suback, PacketId, GrantedQos}
+               From ! {suback, PacketId, GrantedQos}
              end,
     gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
 
@@ -246,12 +255,15 @@ init([CleanSess, ClientId, ClientPid]) ->
             expired_after     = emqttd_opts:g(expired_after, SessEnv) * 3600,
             collect_interval  = emqttd_opts:g(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
-    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
+    emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
     %% start statistics
     {ok, start_collector(Session), hibernate}.
 
 prioritise_call(Msg, _From, _Len, _State) ->
-    case Msg of _  -> 0 end.
+    case Msg of 
+        info -> 10;
+        _    -> 0
+    end.
 
 prioritise_cast(Msg, _Len, _State) ->
     case Msg of
@@ -269,13 +281,16 @@ prioritise_cast(Msg, _Len, _State) ->
 prioritise_info(Msg, _Len, _State) ->
     case Msg of
         {'EXIT', _, _}  -> 10;
-        session_expired -> 10;
+        expired         -> 10;
         {timeout, _, _} -> 5;
         collect_info    -> 2;
         {dispatch, _}   -> 1;
         _               -> 0
     end.
 
+handle_call(info, _From, State) ->
+    {reply, sess_info(State), State};
+
 handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
                 Session = #session{client_id         = ClientId,
                                    awaiting_rel      = AwaitingRel,
@@ -292,11 +307,11 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
     end;
 
 handle_call(Req, _From, State) ->
-    lager:error("Unexpected Request: ~p", [Req]),
-    {reply, ok, State}.
+    ?LOG(critical, "Unexpected Request: ~p", [Req], State),
+    {reply, {error, unsupported_req}, State}.
 
-handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
-                client_id = ClientId, subscriptions = Subscriptions}) ->
+handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
+                                                                 subscriptions = Subscriptions}) ->
 
     TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
 
@@ -312,18 +327,16 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
 
             emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
 
-            lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
-                        [ClientId, TopicTable, GrantedQos]),
+            ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
 
             Subscriptions1 =
             lists:foldl(fun({Topic, Qos}, Acc) ->
                             case lists:keyfind(Topic, 1, Acc) of
                                 {Topic, Qos} ->
-                                    lager:warning([{client, ClientId}], "Session(~s): "
-                                                    "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc;
+                                    ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
+                                    Acc;
                                 {Topic, OldQos} ->
-                                    lager:warning([{client, ClientId}], "Session(~s): "
-                                                    "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]),
+                                    ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
                                     lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
                                 false ->
                                     %%TODO: the design is ugly, rewrite later...:(
@@ -353,41 +366,31 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
                         {Topic, _Qos} ->
                             lists:keydelete(Topic, 1, Acc);
                         false ->
-                            lager:warning([{client, ClientId}], "Session(~s) not subscribe ~s", [ClientId, Topic]), Acc
+                            Acc
                     end
                 end, Subscriptions, Topics),
 
     noreply(Session#session{subscriptions = Subscriptions1});
 
 handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
-    lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]),
+    ?LOG(warning, "destroyed", [], Session),
     {stop, {shutdown, destroy}, Session};
 
-handle_cast({resume, ClientId, ClientPid}, Session) ->
-
-    #session{client_id      = ClientId,
-             client_pid     = OldClientPid,
-             inflight_queue = InflightQ,
-             awaiting_ack   = AwaitingAck,
-             awaiting_comp  = AwaitingComp,
-             expired_timer  = ETimer} = Session,
+handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = ClientId,
+                                                              client_pid     = OldClientPid,
+                                                              inflight_queue = InflightQ,
+                                                              awaiting_ack   = AwaitingAck,
+                                                              awaiting_comp  = AwaitingComp,
+                                                              expired_timer  = ETimer} = Session) ->
 
-    lager:info([{client, ClientId}], "Session(~s) resumed by ~p", [ClientId, ClientPid]),
+    ?LOG(info, "resumed by ~p", [ClientPid], Session),
 
-    %% cancel expired timer
+    %% Cancel expired timer
     cancel_timer(ETimer),
 
-    %% Kickout old client
-    if
-        OldClientPid == undefined ->
-            ok;
-        OldClientPid == ClientPid ->
-            ok; %% ??
-        true ->
-            lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
-                            [ClientId, ClientPid, OldClientPid]),
-            unlink(OldClientPid),
-            OldClientPid ! {stop, duplicate_id, ClientPid}
+    case kick(ClientId, OldClientPid, ClientPid) of
+        ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], Session);
+        ignore -> ok
     end,
 
     true = link(ClientPid),
@@ -416,19 +419,18 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
     noreply(dequeue(Session2));
 
 %% PUBACK
-handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) ->
+handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) ->
     case maps:find(PktId, AwaitingAck) of
         {ok, TRef} ->
             cancel_timer(TRef),
             noreply(dequeue(acked(PktId, Session)));
         error ->
-            lager:error([{client, ClientId}], "Session(~s) cannot find PUBACK ~w", [ClientId, PktId]),
+            ?LOG(error, "Cannot find PUBACK: ~p", [PktId], Session),
             noreply(Session)
     end;
 
 %% PUBREC
-handle_cast({pubrec, PktId}, Session = #session{client_id         = ClientId,
-                                                awaiting_ack      = AwaitingAck,
+handle_cast({pubrec, PktId}, Session = #session{awaiting_ack      = AwaitingAck,
                                                 awaiting_comp     = AwaitingComp,
                                                 await_rel_timeout = Timeout}) ->
     case maps:find(PktId, AwaitingAck) of
@@ -439,37 +441,36 @@ handle_cast({pubrec, PktId}, Session = #session{client_id         = ClientId,
             Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
             noreply(dequeue(Session1));
         error ->
-            lager:error([{client, ClientId}], "Session(~s) cannot find PUBREC ~w", [ClientId, PktId]),
+            ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session),
             noreply(Session)
     end;
 
 %% PUBREL
-handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId,
-                                                awaiting_rel = AwaitingRel}) ->
+handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:find(PktId, AwaitingRel) of
         {ok, {Msg, TRef}} ->
             cancel_timer(TRef),
             emqttd_pubsub:publish(Msg),
             noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
         error ->
-            lager:error([{client, ClientId}], "Session(~s) cannot find PUBREL ~w", [ClientId, PktId]),
+            ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
             noreply(Session)
     end;
 
 %% PUBCOMP
-handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) ->
+handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) ->
     case maps:find(PktId, AwaitingComp) of
         {ok, TRef} ->
             cancel_timer(TRef),
             noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
         error ->
-            lager:error("Session(~s) cannot find PUBCOMP ~w", [ClientId, PktId]),
+            ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session),
             noreply(Session)
     end;
 
 handle_cast(Msg, State) ->
-    lager:error("Unexpected Msg: ~p, State: ~p", [Msg, State]),
-    {noreply, State}.
+    ?LOG(critical, "Unexpected Msg: ~p", [Msg], State),
+    noreply(State).
 
 %% Queue messages when client is offline
 handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
@@ -483,14 +484,15 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
     ClientPid ! {deliver, Msg},
     noreply(Session);
 
-handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{message_queue = MsgQ})
+handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
+            Session = #session{message_queue = MsgQ})
     when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
 
     case check_inflight(Session) of
-        true ->
-            {noreply, deliver(Msg, Session)};
+        true  ->
+            noreply(deliver(Msg, Session));
         false ->
-            {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
+            noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
     end;
 
 handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
@@ -498,81 +500,70 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde
     %% just remove awaiting
     noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
 
-handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id      = ClientId,
-                                                               inflight_queue = InflightQ,
+handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
                                                                awaiting_ack   = AwaitingAck}) ->
-    lager:info("Awaiting Ack Timeout: ~p:", [PktId]),
     case maps:find(PktId, AwaitingAck) of
         {ok, _TRef} ->
             case lists:keyfind(PktId, 1, InflightQ) of
                 {_, Msg} ->
                     noreply(redeliver(Msg, Session));
                 false ->
-                    lager:error([{client, ClientId}], "Session(~s):"
-                                    "Awaiting timeout but Cannot find PktId :~p", [ClientId, PktId]),
+                    ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session),
                     noreply(dequeue(Session))
                 end;
         error ->
-            lager:error([{client, ClientId}], "Session(~s):"
-                           "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
+            ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session),
             noreply(Session)
     end;
 
-handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
-                                                               awaiting_rel = AwaitingRel}) ->
+handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:find(PktId, AwaitingRel) of
-        {ok, {Msg, _TRef}} ->
-            lager:error([{client, ClientId}], "Session(~s) AwaitingRel Timout!~n"
-                            "Drop Message:~p", [ClientId, Msg]),
+        {ok, {_Msg, _TRef}} ->
+            ?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
             noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
         error ->
-            lager:error([{client, ClientId}], "Session(~s) cannot find AwaitingRel ~w", [ClientId, PktId]),
-            {noreply, Session, hibernate}
+            ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session),
+            noreply(Session)
     end;
 
-handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId,
-                                                                awaiting_comp = Awaiting}) ->
+handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) ->
     case maps:find(PktId, Awaiting) of
         {ok, _TRef} ->
-            lager:error([{client, ClientId}], "Session(~s) "
-                            "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]),
+            ?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
             noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
         error ->
-            lager:error([{client, ClientId}], "Session(~s) "
-                            "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]),
+            ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session),
             noreply(Session)
     end;
 
 handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
-    emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
-    {noreply, start_collector(Session), hibernate};
+    emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
+    noreply(start_collector(Session));
 
 handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
                                                              client_pid = ClientPid}) ->
     {stop, normal, Session};
 
 handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess    = false,
-                                                            client_id     = ClientId,
                                                             client_pid    = ClientPid,
                                                             expired_after = Expires}) ->
-    lager:info("Session(~s): unlink with client ~p: reason=~p",
-                   [ClientId, ClientPid, Reason]),
-    TRef = timer(Expires, session_expired),
+    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session),
+    TRef = timer(Expires, expired),
+    erlang:garbage_collect(), %%TODO: ???
     noreply(Session#session{client_pid = undefined, expired_timer = TRef});
 
-handle_info({'EXIT', Pid, Reason}, Session = #session{client_id  = ClientId,
-                                                      client_pid = ClientPid}) ->
+handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) ->
 
-    lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
-                    [ClientId, ClientPid, Pid, Reason]),
+    ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
+         [ClientPid, Pid, Reason], Session),
     noreply(Session);
 
-handle_info(session_expired, Session = #session{client_id = ClientId}) ->
-    lager:error("Session(~s) expired, shutdown now.", [ClientId]),
+handle_info(expired, Session) ->
+    ?LOG(info, "expired, shutdown now.", [], Session),
     {stop, {shutdown, expired}, Session};
 
-handle_info(Info, Session = #session{client_id = ClientId}) ->
-    lager:error("Session(~s) unexpected info: ~p", [ClientId, Info]),
+handle_info(Info, Session) ->
+    ?LOG(critical, "Unexpected info: ~p", [Info], Session),
     {noreply, Session}.
 
 terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
@@ -585,6 +576,17 @@ code_change(_OldVsn, Session, _Extra) ->
 %%% Internal functions
 %%%=============================================================================
 
+%%------------------------------------------------------------------------------
+%% Kick old client out
+%%------------------------------------------------------------------------------
+kick(_ClientId, undefined, _Pid) ->
+    ignore;
+kick(_ClientId, Pid, Pid) ->
+    ignore;
+kick(ClientId, OldPid, Pid) ->
+    unlink(OldPid),
+    OldPid ! {shutdown, conflict, {ClientId, Pid}}.
+
 %%------------------------------------------------------------------------------
 %% Check inflight and awaiting_rel
 %%------------------------------------------------------------------------------
@@ -658,7 +660,7 @@ acked(PktId, Session = #session{client_id      = ClientId,
         {_, Msg} ->
             emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]);
         false ->
-            lager:error("Session(~s): Cannot find acked message: ~p", [PktId])
+            ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session)
     end,
     Session#session{awaiting_ack   = maps:remove(PktId, Awaiting),
                     inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.
@@ -687,15 +689,15 @@ start_collector(Session = #session{collect_interval = Interval}) ->
     TRef = erlang:send_after(timer:seconds(Interval), self(), collect_info),
     Session#session{collect_timer = TRef}.
 
-info(#session{clean_sess      = CleanSess,
-              subscriptions   = Subscriptions,
-              inflight_queue  = InflightQueue,
-              max_inflight    = MaxInflight,
-              message_queue   = MessageQueue,
-              awaiting_rel    = AwaitingRel,
-              awaiting_ack    = AwaitingAck,
-              awaiting_comp   = AwaitingComp,
-              timestamp       = CreatedAt}) ->
+sess_info(#session{clean_sess      = CleanSess,
+                   subscriptions   = Subscriptions,
+                   inflight_queue  = InflightQueue,
+                   max_inflight    = MaxInflight,
+                   message_queue   = MessageQueue,
+                   awaiting_rel    = AwaitingRel,
+                   awaiting_ack    = AwaitingAck,
+                   awaiting_comp   = AwaitingComp,
+                   timestamp       = CreatedAt}) ->
     Stats = emqttd_mqueue:stats(MessageQueue),
     [{clean_sess,     CleanSess},
      {subscriptions,  Subscriptions},

+ 18 - 18
src/emqttd_sm.erl

@@ -57,7 +57,10 @@
 
 -define(SM_POOL, ?MODULE).
 
--define(SESSION_TIMEOUT, 60000).
+-define(CALL_TIMEOUT, 60000).
+
+-define(LOG(Level, Format, Args, Session),
+            lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])).
 
 %%%=============================================================================
 %%% Mnesia callbacks
@@ -113,7 +116,7 @@ start_session(CleanSess, ClientId) ->
 lookup_session(ClientId) ->
     case mnesia:dirty_read(session, ClientId) of
         [Session] -> Session;
-        [] -> undefined
+        []        -> undefined
     end.
 
 %%------------------------------------------------------------------------------
@@ -137,13 +140,11 @@ register_session(CleanSess, ClientId, Info) ->
 unregister_session(CleanSess, ClientId) ->
     ets:delete(sesstab(CleanSess), {ClientId, self()}).
 
-sesstab(true)  ->
-    mqtt_transient_session;
-sesstab(false) ->
-    mqtt_persistent_session.
+sesstab(true)  -> mqtt_transient_session;
+sesstab(false) -> mqtt_persistent_session.
 
 call(SM, Req) ->
-    gen_server2:call(SM, Req, ?SESSION_TIMEOUT). %%infinity).
+    gen_server2:call(SM, Req, ?CALL_TIMEOUT). %%infinity).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -223,8 +224,8 @@ create_session(CleanSess, ClientId, ClientPid) ->
             case insert_session(Session) of
                 {aborted, {conflict, ConflictPid}} ->
                     %% Conflict with othe node?
-                    lager:error("Session(~s): Conflict with ~p!", [ClientId, ConflictPid]),
-                    {error, conflict};
+                    lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]),
+                    {error, mnesia_conflict};
                 {atomic, ok} ->
                     erlang:monitor(process, SessPid),
                     {ok, SessPid}
@@ -245,8 +246,8 @@ insert_session(Session = #mqtt_session{client_id = ClientId}) ->
       end).
 
 %% Local node
-resume_session(#mqtt_session{client_id = ClientId,
-                             sess_pid  = SessPid}, ClientPid)
+resume_session(Session = #mqtt_session{client_id = ClientId,
+                                       sess_pid  = SessPid}, ClientPid)
     when node(SessPid) =:= node() ->
 
     case is_process_alive(SessPid) of
@@ -254,7 +255,7 @@ resume_session(#mqtt_session{client_id = ClientId,
             emqttd_session:resume(SessPid, ClientId, ClientPid),
             {ok, SessPid};
         false ->
-            lager:error("Session(~s): Cannot resume ~p, it seems already dead!", [ClientId, SessPid]),
+            ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session),
             {error, session_died}
     end;
 
@@ -265,12 +266,11 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}
         ok ->
             {ok, SessPid};
         {badrpc, nodedown} ->
-            lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]),
+            ?LOG(error, "Session died for node '~s' down", [Node], Session),
             remove_session(Session),
             {error, session_nodedown};
         {badrpc, Reason} ->
-            lager:error("Session(~s): Failed to resume from node ~s for ~p",
-                            [ClientId, Node, Reason]),
+            ?LOG(error, "Failed to resume from node ~s for ~p", [Node, Reason], Session),
             {error, Reason}
     end.
 
@@ -288,11 +288,11 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
         ok ->
             remove_session(Session);
         {badrpc, nodedown} ->
-            lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]),
+            ?LOG(error, "Node '~s' down", [Node], Session),
             remove_session(Session); 
         {badrpc, Reason} ->
-            lager:error("Session(~s): Failed to destory ~p on remote node ~p for ~s",
-                            [ClientId, SessPid, Node, Reason]),
+            ?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
+                 [SessPid, Node, Reason], Session),
             {error, Reason}
      end.