Просмотр исходного кода

Ensure the 'CONNACK' and 'DISCONNECT' packets delivered.

Feng Lee 6 лет назад
Родитель
Сommit
03744ead30
3 измененных файлов с 12 добавлено и 3 удалено
  1. 3 0
      src/emqx_channel.erl
  2. 7 3
      src/emqx_protocol.erl
  3. 2 0
      src/emqx_ws_channel.erl

+ 3 - 0
src/emqx_channel.erl

@@ -516,6 +516,9 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
                             State#state{proto_state = NProtoState});
         {error, Reason, NProtoState} ->
             shutdown(Reason, State#state{proto_state = NProtoState});
+        {error, Reason, OutPacket, NProtoState} ->
+            Shutdown = fun(NewSt) -> shutdown(Reason, NewSt) end,
+            handle_outgoing(OutPacket, Shutdown, State#state{proto_state = NProtoState});
         {stop, Error, NProtoState} ->
             stop(Error, State#state{proto_state = NProtoState})
     end.

+ 7 - 3
src/emqx_protocol.erl

@@ -416,11 +416,15 @@ handle_out({unsuback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ?MQ
 handle_out({unsuback, PacketId, _ReasonCodes}, PState) ->
     {ok, ?UNSUBACK_PACKET(PacketId), PState};
 
-handle_out({disconnect, ReasonCode}, PState) ->
-    {ok, PState};
+handle_out({disconnect, ReasonCode}, PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
+    Reason = emqx_reason_codes:name(ReasonCode),
+    {error, Reason, ?DISCONNECT_PACKET(ReasonCode), PState};
+
+handle_out({disconnect, ReasonCode}, PState = #protocol{proto_ver = ProtoVer}) ->
+    {error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState};
 
 handle_out(Packet, PState) ->
-    io:format("Out: ~p~n", [Packet]),
+    ?LOG(error, "Unexpected out:~p", [Packet]),
     {ok, PState}.
 
 %%--------------------------------------------------------------------

+ 2 - 0
src/emqx_ws_channel.erl

@@ -436,6 +436,8 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
             SuccFun(enqueue(OutPackets, State#state{proto_state = NProtoState}));
         {error, Reason, NProtoState} ->
             stop(Reason, State#state{proto_state = NProtoState});
+        {error, Reason, OutPacket, NProtoState} ->
+            stop(Reason, enqueue(OutPacket, State#state{proto_state = NProtoState}));
         {stop, Error, NProtoState} ->
             stop(Error, State#state{proto_state = NProtoState})
     end.