Просмотр исходного кода

fix(emqx_frame): no need to split incoming bytes

Prior to this commit, there was a bug in emqx_frame:split/2
the tail number of bytes was used for header number of bytes
whens split. As a result, if the tail happens to be longer
then haeder, the parsing state becomes invalid and it crashes
when the next packet arrives

The split was a over-engineered micro-optimization, so it
has been deleted instead of fixed
Zaiming Shi 4 лет назад
Родитель
Сommit
979e495a1e
3 измененных файлов с 30 добавлено и 22 удалено
  1. 2 0
      src/emqx.appup.src
  2. 8 21
      src/emqx_frame.erl
  3. 20 1
      test/emqx_frame_SUITE.erl

+ 2 - 0
src/emqx.appup.src

@@ -3,6 +3,7 @@
  [
  [
    {"4.3.1", [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
@@ -30,6 +31,7 @@
  [
  [
    {"4.3.1", [
    {"4.3.1", [
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},

+ 8 - 21
src/emqx_frame.erl

@@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header,
                      len := Length,
                      len := Length,
                      rest := Body}
                      rest := Body}
              }, Options}) when is_binary(Bin) ->
              }, Options}) when is_binary(Bin) ->
-    BodyBytes = body_bytes(Body),
-    {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin),
-    NewBody = append_body(Body, NewBodyPart),
-    parse_frame(NewBody, Tail, Header, Length, Options).
-
-%% split given binary with the first N bytes
-split(N, Bin) when N =< 0 ->
-    {Bin, <<>>};
-split(N, Bin) when N =< size(Bin) ->
-    <<H:N/binary, T/binary>> = Bin,
-    {H, T}.
+    NewBody = append_body(Body, Bin),
+    parse_frame(NewBody, Header, Length, Options).
 
 
 parse_remaining_len(<<>>, Header, Options) ->
 parse_remaining_len(<<>>, Header, Options) ->
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
@@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) ->
 append_body(?Q(Bytes, Q), T) ->
 append_body(?Q(Bytes, Q), T) ->
     ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
     ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
 
 
-flatten_body(Body, Tail) when is_binary(Body) -> <<Body/binary, Tail/binary>>;
-flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]).
+flatten_body(Body) when is_binary(Body) -> Body;
+flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
 
 
+parse_frame(Body, Header, 0, Options) ->
+    {ok, packet(Header), flatten_body(Body), ?none(Options)};
 parse_frame(Body, Header, Length, Options) ->
 parse_frame(Body, Header, Length, Options) ->
-    %% already appended
-    parse_frame(Body, _SplitTail = <<>>, Header, Length, Options).
-
-parse_frame(Body, Tail, Header, 0, Options) ->
-    {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)};
-parse_frame(Body, Tail, Header, Length, Options) ->
     case body_bytes(Body) >= Length of
     case body_bytes(Body) >= Length of
         true ->
         true ->
-            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body, Tail),
+            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
             case parse_packet(Header, FrameBin, Options) of
             case parse_packet(Header, FrameBin, Options) of
                 {Variable, Payload} ->
                 {Variable, Payload} ->
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
@@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) ->
         false ->
         false ->
             {more, {{body, #{hdr => Header,
             {more, {{body, #{hdr => Header,
                              len => Length,
                              len => Length,
-                             rest => append_body(Body, Tail)
+                             rest => Body
                             }}, Options}}
                             }}, Options}}
     end.
     end.
 
 

+ 20 - 1
test/emqx_frame_SUITE.erl

@@ -58,7 +58,8 @@ groups() ->
        t_serialize_parse_connack_v5
        t_serialize_parse_connack_v5
       ]},
       ]},
      {publish, [parallel],
      {publish, [parallel],
-      [t_serialize_parse_qos0_publish,
+      [t_parse_sticky_frames,
+       t_serialize_parse_qos0_publish,
        t_serialize_parse_qos1_publish,
        t_serialize_parse_qos1_publish,
        t_serialize_parse_qos2_publish,
        t_serialize_parse_qos2_publish,
        t_serialize_parse_publish_v5
        t_serialize_parse_publish_v5
@@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) ->
     Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
     Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
     ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
     ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
 
 
+t_parse_sticky_frames(_) ->
+    Payload = lists:duplicate(10, 0),
+    P = #mqtt_packet{header = #mqtt_packet_header{type   = ?PUBLISH,
+                                                  dup    = false,
+                                                  qos    = ?QOS_0,
+                                                  retain = false},
+                     variable = #mqtt_packet_publish{topic_name = <<"a/b">>,
+                                                     packet_id  = undefined},
+                     payload  = iolist_to_binary(Payload)
+                    },
+    Bin = serialize_to_binary(P),
+    Size = size(Bin),
+    <<H:(Size-2)/binary, TailTwoBytes/binary>> = Bin,
+    {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes
+    %% feed 3 bytes as if the next 1 byte belongs to the next packet.
+    {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1),
+    ?assertMatch({none, _}, PState2).
+
 t_serialize_parse_qos0_publish(_) ->
 t_serialize_parse_qos0_publish(_) ->
     Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
     Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
     Packet = #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
     Packet = #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,