Просмотр исходного кода

Merge pull request #6393 from emqx/fix-takeover-race-on-enqueued-messages

Tobias Lindahl 4 лет назад
Родитель
Сommit
92f116afa4
1 измененных файлов с 15 добавлено и 9 удалено
  1. 15 9
      apps/emqx/src/emqx_channel.erl

+ 15 - 9
apps/emqx/src/emqx_channel.erl

@@ -782,7 +782,21 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
       -> {ok, channel()} | {ok, replies(), channel()}).
+
+handle_deliver(Delivers, Channel = #channel{takeover = true,
+                                            pendings = Pendings,
+                                            session = Session,
+                                            clientinfo = #{clientid := ClientId}}) ->
+    %% NOTE: Order is important here. While the takeover is in
+    %% progress, the session cannot enqueue messages, since it already
+    %% passed on the queue to the new connection in the session state.
+    NPendings = lists:append(
+                  Pendings,
+                  emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
+    {ok, Channel#channel{pendings = NPendings}};
+
 handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
+                                            takeover   = false,
                                             session    = Session,
                                             clientinfo = #{clientid := ClientId}}) ->
     Delivers1 = maybe_nack(Delivers),
@@ -793,16 +807,8 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
     maybe_mark_as_delivered(Session, Delivers),
     {ok, NChannel};
 
-handle_deliver(Delivers, Channel = #channel{takeover = true,
-                                            pendings = Pendings,
-                                            session = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
-    NPendings = lists:append(
-                  Pendings,
-                  emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
-    {ok, Channel#channel{pendings = NPendings}};
-
 handle_deliver(Delivers, Channel = #channel{session = Session,
+                                            takeover   = false,
                                             clientinfo = #{clientid := ClientId}
                                            }) ->
     case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of