Browse Source

fix(wsconn): properly classify outgoing packets

Prior to this commit, outgoing packets ended up treated as "events",
making them postponed by appending to the process mailbox. As an
example, this caused the client to miss DISCONNECT packets, because
this behaviour was causing the WS socket to be closed _before_
DISCONNECT packet had a chance to be sent.
Andrew Mayorov 1 năm trước cách đây
mục cha
commit
28cd383fae
1 tập tin đã thay đổi với 16 bổ sung6 xóa
  1. 16 6
      apps/emqx/src/emqx_ws_connection.erl

+ 16 - 6
apps/emqx/src/emqx_ws_connection.erl

@@ -129,6 +129,8 @@
 %% Info, Stats
 %% Info, Stats
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-type info() :: atom() | {channel, _Info}.
+
 -spec info(pid() | state()) -> emqx_types:infos().
 -spec info(pid() | state()) -> emqx_types:infos().
 info(WsPid) when is_pid(WsPid) ->
 info(WsPid) when is_pid(WsPid) ->
     call(WsPid, info);
     call(WsPid, info);
@@ -140,8 +142,8 @@ info(State = #state{channel = Channel}) ->
     ChanInfo#{sockinfo => SockInfo}.
     ChanInfo#{sockinfo => SockInfo}.
 
 
 -spec info
 -spec info
-    (_Info :: atom(), state()) -> _Value;
-    ([Info], state()) -> [{Info, _Value}] when Info :: atom().
+    (info(), state()) -> _Value;
+    (info(), state()) -> [{atom(), _Value}].
 info(Keys, State) when is_list(Keys) ->
 info(Keys, State) when is_list(Keys) ->
     [{Key, info(Key, State)} || Key <- Keys];
     [{Key, info(Key, State)} || Key <- Keys];
 info(socktype, _State) ->
 info(socktype, _State) ->
@@ -163,7 +165,9 @@ info(postponed, #state{postponed = Postponed}) ->
 info(stats_timer, #state{stats_timer = TRef}) ->
 info(stats_timer, #state{stats_timer = TRef}) ->
     TRef;
     TRef;
 info(idle_timer, #state{idle_timer = TRef}) ->
 info(idle_timer, #state{idle_timer = TRef}) ->
-    TRef.
+    TRef;
+info({channel, Info}, #state{channel = Channel}) ->
+    emqx_channel:info(Info, Channel).
 
 
 -spec stats(pid() | state()) -> emqx_types:stats().
 -spec stats(pid() | state()) -> emqx_types:stats().
 stats(WsPid) when is_pid(WsPid) ->
 stats(WsPid) when is_pid(WsPid) ->
@@ -1004,9 +1008,15 @@ return(State = #state{postponed = Postponed}) ->
 
 
 classify([], Packets, Cmds, Events) ->
 classify([], Packets, Cmds, Events) ->
     {Packets, Cmds, Events};
     {Packets, Cmds, Events};
-classify([Packet | More], Packets, Cmds, Events) when
-    is_record(Packet, mqtt_packet)
-->
+classify([{outgoing, Outgoing} | More], Packets, Cmds, Events) ->
+    case is_list(Outgoing) of
+        true -> NPackets = Outgoing ++ Packets;
+        false -> NPackets = [Outgoing | Packets]
+    end,
+    classify(More, NPackets, Cmds, Events);
+classify([{connack, Packet} | More], Packets, Cmds, Events) ->
+    classify(More, [Packet | Packets], Cmds, Events);
+classify([Packet = #mqtt_packet{} | More], Packets, Cmds, Events) ->
     classify(More, [Packet | Packets], Cmds, Events);
     classify(More, [Packet | Packets], Cmds, Events);
 classify([Cmd = {active, _} | More], Packets, Cmds, Events) ->
 classify([Cmd = {active, _} | More], Packets, Cmds, Events) ->
     classify(More, Packets, [Cmd | Cmds], Events);
     classify(More, Packets, [Cmd | Cmds], Events);