Просмотр исходного кода

fix(emqx_channel): fix race condition in session takeover

A session must not enqueue messages while another process is taking over
the client id, because the session has already handed over its message
queue as part of the session state.

Without this fix, messages arriving after `{takeover, 'begin'}` to a
channel with no connection (i.e., a persistent session) would be lost.
Tobias Lindahl 4 года назад
Родитель
Коммит
2348e612fa
1 изменённый файл: 11 добавлено и 6 удалено
  1. 11 6
      src/emqx_channel.erl

+ 11 - 6
src/emqx_channel.erl

@@ -720,20 +720,25 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 
 
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
       -> {ok, channel()} | {ok, replies(), channel()}).
       -> {ok, channel()} | {ok, replies(), channel()}).
-handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
-                                            session    = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
-    NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
-    {ok, Channel#channel{session = NSession}};
-
 handle_deliver(Delivers, Channel = #channel{takeover = true,
 handle_deliver(Delivers, Channel = #channel{takeover = true,
                                             pendings = Pendings,
                                             pendings = Pendings,
                                             session = Session,
                                             session = Session,
                                             clientinfo = #{clientid := ClientId}}) ->
                                             clientinfo = #{clientid := ClientId}}) ->
+    %% NOTE: Order is important here. While the takeover is in
+    %% progress, the session cannot enqueue messages, since it already
+    %% passed on the queue to the new connection in the session state.
     NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
     NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
     {ok, Channel#channel{pendings = NPendings}};
     {ok, Channel#channel{pendings = NPendings}};
 
 
+handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
+                                            takeover   = false,
+                                            session    = Session,
+                                            clientinfo = #{clientid := ClientId}}) ->
+    NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
+    {ok, Channel#channel{session = NSession}};
+
 handle_deliver(Delivers, Channel = #channel{session = Session,
 handle_deliver(Delivers, Channel = #channel{session = Session,
+                                            takeover = false,
                                             clientinfo = #{clientid := ClientId}}) ->
                                             clientinfo = #{clientid := ClientId}}) ->
     case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
     case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
         {ok, Publishes, NSession} ->
         {ok, Publishes, NSession} ->