Feng пре 10 година
родитељ
комит
2af91ea140
1 измењених фајлова са 21 додато и 23 уклоњено
  1. 21 23
      src/emqttd_session.erl

+ 21 - 23
src/emqttd_session.erl

@@ -81,9 +81,6 @@
         %% Client Pid bind with session
         %% Client Pid bind with session
         client_pid  :: pid(),
         client_pid  :: pid(),
 
 
-        %% Client Monitor
-        client_mon  :: reference(),
-
         %% Last packet id of the session
         %% Last packet id of the session
 		packet_id = 1,
 		packet_id = 1,
         
         
@@ -224,7 +221,8 @@ unsubscribe(SessPid, Topics) ->
 %%%=============================================================================
 %%%=============================================================================
 
 
 init([CleanSess, ClientId, ClientPid]) ->
 init([CleanSess, ClientId, ClientPid]) ->
-    %% process_flag(trap_exit, true),
+    process_flag(trap_exit, true),
+    true    = link(ClientPid),
     QEnv    = emqttd:env(mqtt, queue),
     QEnv    = emqttd:env(mqtt, queue),
     SessEnv = emqttd:env(mqtt, session),
     SessEnv = emqttd:env(mqtt, session),
     Session = #session{
     Session = #session{
@@ -245,10 +243,8 @@ init([CleanSess, ClientId, ClientPid]) ->
             collect_interval  = emqttd_opts:g(collect_interval, SessEnv, 0),
             collect_interval  = emqttd_opts:g(collect_interval, SessEnv, 0),
             timestamp         = os:timestamp()},
             timestamp         = os:timestamp()},
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
-    %% monitor client
-    MRef = erlang:monitor(process, ClientPid),
     %% start statistics
     %% start statistics
-    {ok, start_collector(Session#session{client_mon = MRef}), hibernate}.
+    {ok, start_collector(Session), hibernate}.
 
 
 prioritise_call(Msg, _From, _Len, _State) ->
 prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of _  -> 0 end.
     case Msg of _  -> 0 end.
@@ -268,7 +264,6 @@ prioritise_cast(Msg, _Len, _State) ->
 
 
 prioritise_info(Msg, _Len, _State) ->
 prioritise_info(Msg, _Len, _State) ->
     case Msg of
     case Msg of
-        {'DOWN', _, _, _, _} -> 10;
         {'EXIT', _, _}  -> 10;
         {'EXIT', _, _}  -> 10;
         session_expired -> 10;
         session_expired -> 10;
         {timeout, _, _} -> 5;
         {timeout, _, _} -> 5;
@@ -368,7 +363,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
 
 
     #session{client_id      = ClientId,
     #session{client_id      = ClientId,
              client_pid     = OldClientPid,
              client_pid     = OldClientPid,
-             client_mon     = MRef,
              inflight_queue = InflightQ,
              inflight_queue = InflightQ,
              awaiting_ack   = AwaitingAck,
              awaiting_ack   = AwaitingAck,
              awaiting_comp  = AwaitingComp,
              awaiting_comp  = AwaitingComp,
@@ -388,10 +382,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
         true ->
         true ->
             lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
             lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
                             [ClientId, ClientPid, OldClientPid]),
                             [ClientId, ClientPid, OldClientPid]),
-            OldClientPid ! {stop, duplicate_id, ClientPid},
-            erlang:demonitor(MRef, [flush])
+            unlink(OldClientPid),
+            OldClientPid ! {stop, duplicate_id, ClientPid}
     end,
     end,
 
 
+    true = link(ClientPid),
+
     %% Redeliver PUBREL
     %% Redeliver PUBREL
     [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
     [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
 
 
@@ -402,7 +398,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
     [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
     [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
 
 
     Session1 = Session#session{client_pid    = ClientPid,
     Session1 = Session#session{client_pid    = ClientPid,
-                               client_mon    = erlang:monitor(process, ClientPid),
                                awaiting_ack  = #{},
                                awaiting_ack  = #{},
                                awaiting_comp = #{},
                                awaiting_comp = #{},
                                expired_timer = undefined},
                                expired_timer = undefined},
@@ -548,21 +543,24 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id =
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
     emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
     {noreply, start_collector(Session), hibernate};
     {noreply, start_collector(Session), hibernate};
 
 
-handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true,
-                                                                       client_pid = ClientPid}) ->
+handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
+                                                             client_pid = ClientPid}) ->
     {stop, normal, Session};
     {stop, normal, Session};
 
 
-handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false,
-                                                                       client_pid = ClientPid,
-                                                                       expired_after = Expires}) ->
+handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess    = false,
+                                                            client_id     = ClientId,
+                                                            client_pid    = ClientPid,
+                                                            expired_after = Expires}) ->
+    lager:info("Session(~s): unlink with client ~p: reason=~p",
+                   [ClientId, ClientPid, Reason]),
     TRef = timer(Expires, session_expired),
     TRef = timer(Expires, session_expired),
-    noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef});
+    noreply(Session#session{client_pid = undefined, expired_timer = TRef});
+
+handle_info({'EXIT', Pid, Reason}, Session = #session{client_id  = ClientId,
+                                                      client_pid = ClientPid}) ->
 
 
-handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id  = ClientId,
-                                                                      client_pid = ClientPid}) ->
-    lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: "
-                    "client_pid=~p, down_pid=~p, reason=~p",
-                        [ClientId, ClientPid, Pid, Reason]),
+    lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
+                    [ClientId, ClientPid, Pid, Reason]),
     noreply(Session);
     noreply(Session);
 
 
 handle_info(session_expired, Session = #session{client_id = ClientId}) ->
 handle_info(session_expired, Session = #session{client_id = ClientId}) ->