Просмотр исходного кода

Refix code about mqtt spec (#2806)

Refix code about mqtt spec
tigercl 6 лет назад
Родитель
Сommit
749ef823ab
3 измененных файлов с 10 добавлено и 16 удалено
  1. 6 13
      src/emqx_channel.erl
  2. 2 2
      src/emqx_cm.erl
  3. 2 1
      src/emqx_connection.erl

+ 6 - 13
src/emqx_channel.erl

@@ -280,7 +280,7 @@ handle_in(?CONNECT_PACKET(
             handle_out(connack, ReasonCode, NChannel)
     end;
 
-handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{proto_ver = Ver}) ->
+handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{proto_ver = Ver}) ->
     case pipeline([fun validate_in/2,
                    fun process_alias/2,
                    fun check_publish/2], Packet, Channel) of
@@ -289,11 +289,7 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{pro
         {error, ReasonCode, NChannel} ->
             ?LOG(warning, "Cannot publish message to ~s due to ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode, Ver)]),
-            case QoS of
-                ?QOS_0 -> handle_out(puberr, ReasonCode, NChannel);
-                ?QOS_1 -> handle_out(puback, {PacketId, ReasonCode}, NChannel);
-                ?QOS_2 -> handle_out(pubrec, {PacketId, ReasonCode}, NChannel)
-            end
+                 handle_out(disconnect, ReasonCode, NChannel)
     end;
 
 %%TODO: How to handle the ReasonCode?
@@ -845,13 +841,10 @@ check_will_retain(#mqtt_packet_connect{will_retain = true},
 %%--------------------------------------------------------------------
 
 enrich_client(ConnPkt, Channel) ->
-    case pipeline([fun set_username/2,
-                   fun maybe_use_username_as_clientid/2,
-                   fun maybe_assign_clientid/2,
-                   fun set_rest_client_fields/2], ConnPkt, Channel) of
-        {ok, NConnPkt, NChannel} -> {ok, NConnPkt, NChannel};
-        Error -> Error
-    end.
+    pipeline([fun set_username/2,
+              fun maybe_use_username_as_clientid/2,
+              fun maybe_assign_clientid/2,
+              fun set_rest_client_fields/2], ConnPkt, Channel).
 
 maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = #{username := undefined}}) ->
     {ok, Channel};

+ 2 - 2
src/emqx_cm.erl

@@ -189,7 +189,7 @@ takeover_session(ClientId) ->
             takeover_session(ClientId, ChanPid);
         ChanPids ->
             [ChanPid|StalePids] = lists:reverse(ChanPids),
-            ?LOG(error, "[SM] More than one channel found: ~p", [ChanPids]),
+            ?LOG(error, "More than one channel found: ~p", [ChanPids]),
             lists:foreach(fun(StalePid) ->
                                   catch discard_session(ClientId, StalePid)
                           end, StalePids),
@@ -220,7 +220,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
                           discard_session(ClientId, ChanPid)
                       catch
                           _:Error:_Stk ->
-                              ?LOG(error, "[SM] Failed to discard ~p: ~p", [ChanPid, Error])
+                              ?LOG(error, "Failed to discard ~p: ~p", [ChanPid, Error])
                       end
               end, ChanPids)
     end.

+ 2 - 1
src/emqx_connection.erl

@@ -257,7 +257,8 @@ connected(enter, _PrevSt, State = #state{chan_state = ChanState}) ->
 
 connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) ->
     ?LOG(warning, "Unexpected connect: ~p", [Packet]),
-    shutdown(unexpected_incoming_connect, State);
+    Shutdown = fun(NewSt) -> shutdown(?RC_PROTOCOL_ERROR, NewSt) end,
+    handle_outgoing(?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Shutdown, State);
 
 connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
     handle_incoming(Packet, fun keep_state/1, State);