فهرست منبع

fix issue #357 - Cannot kick transient client out when clientId collision

Feng 10 سال پیش
والد
کامیت
59b401c506
2فایلهای تغییر یافته به همراه56 افزوده شده و 15 حذف شده
  1. 44 15
      src/emqttd_cm.erl
  2. 12 0
      src/emqttd_session.erl

+ 44 - 15
src/emqttd_cm.erl

@@ -41,6 +41,9 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
+%% gen_server2 priorities
+-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
+
 -record(state, {id, statsfun}).
 
 -define(CM_POOL, ?MODULE).
@@ -101,8 +104,21 @@ init([Id, StatsFun]) ->
     gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
     {ok, #state{id = Id, statsfun = StatsFun}}.
 
+prioritise_call(_Req, _From, _Len, _State) ->
+    1.
+
+prioritise_cast(Msg, _Len, _State) ->
+    case Msg of
+        {register, _Client}           -> 2;
+        {unregister, _ClientId, _Pid} -> 3;
+        _                             -> 1
+    end.
+
+prioritise_info(_Msg, _Len, _State) ->
+    1.
+
 handle_call(Req, _From, State) ->
-    lager:error("unexpected request: ~p", [Req]),
+    lager:error("Unexpected request: ~p", [Req]),
     {reply, {error, unsupported_req}, State}.
 
 handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
@@ -110,32 +126,45 @@ handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
 	case ets:lookup(mqtt_client, ClientId) of
         [#mqtt_client{client_pid = Pid}] ->
             ignore;
-		[#mqtt_client{client_pid = OldPid}] ->
-            %% TODO: should cancel monitor
-            ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client);
-		[] -> 
+        [#mqtt_client{client_pid = _OldPid, client_mon = MRef}] ->
+            %% demonitor
+            erlang:demonitor(MRef, [flush]);
+        [] ->
             ok
 	end,
-    ets:insert(mqtt_client, Client),
+    ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}),
     {noreply, setstats(State)};
 
 handle_cast({unregister, ClientId, Pid}, State) ->
 	case ets:lookup(mqtt_client, ClientId) of
-	[#mqtt_client{client_pid = Pid}] ->
-		ets:delete(mqtt_client, ClientId);
-	[_] ->
-		ignore;
-	[] ->
-        lager:warning("CM(~s): Cannot find registered pid ~p", [ClientId, Pid])
-	end,
-	{noreply, setstats(State)};
+        [#mqtt_client{client_pid = Pid, client_mon = MRef}] ->
+            erlang:demonitor(MRef, [flush]),
+            ets:delete(mqtt_client, ClientId),
+            {noreply, setstats(State)};
+        [_] ->
+            {noreply, State};
+        [] ->
+            lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]),
+            {noreply, State}
+    end;
 
 handle_cast(Msg, State) ->
     lager:error("Unexpected Msg: ~p", [Msg]),
     {noreply, State}.
 
+handle_info({'DOWN', MRef, process, DownPid, Reason}, State) ->
+    MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'},
+    case ets:match_object(mqtt_client, MP) of
+        [Client] ->
+            ?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client),
+            ets:delete_object(mqtt_client, Client);
+        [] ->
+            ignore
+    end,
+    {noreply, setstats(State)};
+
 handle_info(Info, State) ->
-    lager:error("Unexpected Msg: ~p", [Info]),
+    lager:error("Unexpected Info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, #state{id = Id}) ->

+ 12 - 0
src/emqttd_session.erl

@@ -378,6 +378,7 @@ handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
 
 handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = ClientId,
                                                               client_pid     = OldClientPid,
+                                                              clean_sess     = CleanSess,
                                                               inflight_queue = InflightQ,
                                                               awaiting_ack   = AwaitingAck,
                                                               awaiting_comp  = AwaitingComp,
@@ -405,10 +406,21 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = C
     [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
 
     Session1 = Session#session{client_pid    = ClientPid,
+                               clean_sess    = false,
                                awaiting_ack  = #{},
                                awaiting_comp = #{},
                                expired_timer = undefined},
 
+    %% CleanSess: true -> false?
+    if
+        CleanSess =:= true  ->
+            ?LOG(warning, "CleanSess changed to false.", [], Session),
+            emqttd_sm:unregister_session(CleanSess, ClientId),
+            emqttd_sm:register_session(false, ClientId, sess_info(Session1));
+        CleanSess =:= false ->
+            ok
+    end,
+
     %% Redeliver inflight messages
     Session2 =
     lists:foldl(fun({_Id, Msg}, Sess) ->