Explorar o código

Fix followed packet parse failure (#2333)

To fix issue#2303(https://github.com/emqx/emqx/issues/2303)
It will report the following error, when a connection sends a TCP frame contained many of MQTT packet and followed a split MQTT packet.
JianBo He %!s(int64=7) %!d(string=hai) anos
pai
achega
8a73c62f66
Modificáronse 2 ficheiros con 54 adicións e 15 borrados
  1. 11 13
      src/emqx_connection.erl
  2. 43 2
      test/emqx_protocol_SUITE.erl

+ 11 - 13
src/emqx_connection.erl

@@ -192,10 +192,10 @@ idle(enter, _, State) ->
 idle(timeout, _Timeout, State) ->
     {stop, idle_timeout, State};
 
-idle(cast, {incoming, Packet, PState}, _State) ->
+idle(cast, {incoming, Packet}, State) ->
     handle_packet(Packet, fun(NState) ->
                               {next_state, connected, reset_parser(NState)}
-                          end, PState);
+                          end, State);
 
 idle(EventType, Content, State) ->
     ?HANDLE(EventType, Content, State).
@@ -208,12 +208,12 @@ connected(enter, _, _State) ->
     keep_state_and_data;
 
 %% Handle Input
-connected(cast, {incoming, Packet = ?PACKET(Type), PState}, _State) ->
+connected(cast, {incoming, Packet = ?PACKET(Type)}, State) ->
     _ = emqx_metrics:received(Packet),
     (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
     handle_packet(Packet, fun(NState) ->
-                              {keep_state, reset_parser(NState)}
-                          end, PState);
+                              {keep_state, NState}
+                          end, State);
 
 %% Handle Output
 connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
@@ -373,14 +373,14 @@ terminate(Reason, _StateName, #state{transport = Transport,
 %% Process incoming data
 
 process_incoming(<<>>, Packets, State) ->
-    {keep_state, State, next_events({Packets, State})};
+    {keep_state, State, next_events(Packets)};
 
 process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
     try emqx_frame:parse(Data, ParseState) of
         {ok, Packet, Rest} ->
             process_incoming(Rest, [Packet|Packets], reset_parser(State));
         {more, NewParseState} ->
-            {keep_state, State#state{parse_state = NewParseState}, next_events({Packets, State})};
+            {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)};
         {error, Reason} ->
             shutdown(Reason, State)
     catch
@@ -392,12 +392,10 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
 reset_parser(State = #state{proto_state = ProtoState}) ->
     State#state{parse_state = emqx_protocol:parser(ProtoState)}.
 
-next_events([]) ->
-    [];
-next_events([{Packet, State}]) ->
-    {next_event, cast, {incoming, Packet, State}};
-next_events({Packets, State}) ->
-    [next_events([{Packet, State}]) || Packet <- lists:reverse(Packets)].
+next_events(Packets) when is_list(Packets) ->
+    [next_events(Packet) || Packet <- lists:reverse(Packets)];
+next_events(Packet) ->
+    {next_event, cast, {incoming, Packet}}.
 
 %%------------------------------------------------------------------------------
 %% Handle incoming packet

+ 43 - 2
test/emqx_protocol_SUITE.erl

@@ -36,7 +36,8 @@ all() ->
      {group, mqtt_common},
      {group, mqttv4},
      {group, mqttv5},
-     {group, acl}
+     {group, acl},
+     {group, frame_partial}
     ].
 
 groups() ->
@@ -51,7 +52,9 @@ groups() ->
       [connect_v5,
        subscribe_v5]},
      {acl, [sequence],
-      [acl_deny_action_ct]}].
+      [acl_deny_action_ct]},
+     {frame_partial, [sequence],
+       [handle_followed_packet]}].
 
 init_per_suite(Config) ->
     [start_apps(App, SchemaFile, ConfigFile) ||
@@ -97,6 +100,44 @@ with_connection(DoFun) ->
     %     emqx_client_sock:close(Sock)
     % end.
 
+handle_followed_packet(_Config) ->
+    ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
+    PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
+    PartialPkt2 = <<48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,
+                    48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>,
+
+    %% This is a PUBLISH message (Qos=1)
+    PubPkt = <<PartialPkt1/binary, PartialPkt2/binary>>,
+    ComplexPkt = <<PubPkt/binary, PubPkt/binary, PubPkt/binary, PartialPkt1/binary>>,
+
+    AssertConnAck = fun(R) -> ?assertEqual({ok, <<32,2,0,0>>}, R) end,
+    AssertPubAck  = fun(R) -> ?assertEqual({ok, <<64,2,0,1>>}, R) end,
+
+    {ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [{active, false}, binary]),
+
+    %% CONNECT
+    ok = gen_tcp:send(Sock, ConnPkt),
+    AssertConnAck(gen_tcp:recv(Sock, 4, 500)),
+
+    %% Once Publish
+    ok = gen_tcp:send(Sock, PubPkt),
+    AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
+
+    %% Complex Packet
+    ok = gen_tcp:send(Sock, ComplexPkt),
+    AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
+    AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
+    AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
+
+    ok = gen_tcp:send(Sock, PartialPkt2),
+    AssertPubAck(gen_tcp:recv(Sock, 4, 500)),
+    gen_tcp:close(Sock).
+
 connect_v4(_) ->
     with_connection(fun([Sock]) ->
                             emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),