Просмотр исходного кода

fix(emqx_frame): poor large frame concatenation performance

piror to this change, binary concatenation eats most of the CPU
Zaiming Shi 4 лет назад
Родитель
Сommit
de43da881a
2 измененных файлов с 55 добавлено и 20 удалено
  1. 1 1
      src/emqx_connection.erl
  2. 54 19
      src/emqx_frame.erl

+ 1 - 1
src/emqx_connection.erl

@@ -475,7 +475,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
         E : C : S ->
             ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
     end,
-    ?tp(debug, terminate, #{}),
+    ?tp(info, terminate, #{reason => Reason}),
     maybe_raise_excption(Reason).
 
 %% close socket, discard new state, always return ok.

+ 54 - 19
src/emqx_frame.erl

@@ -40,6 +40,8 @@
              , serialize_opts/0
              ]).
 
+-define(Q(BYTES, Q), {BYTES, Q}).
+
 -type(options() :: #{strict_mode => boolean(),
                      max_size => 1..?MAX_PACKET_SIZE,
                      version => emqx_types:version()
@@ -50,12 +52,12 @@
 -type(parse_result() :: {more, parse_state()}
                       | {ok, emqx_types:packet(), binary(), parse_state()}).
 
--type(cont_state() :: {Stage :: len | body,
-                       State ::  #{hdr := #mqtt_packet_header{},
-                                   len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
-                                   rest => binary()
-                                  }
-                      }).
+-type(cont_state() ::
+      {Stage :: len | body,
+       State ::  #{hdr := #mqtt_packet_header{},
+                   len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
+                   rest => binary() | ?Q(non_neg_integer(), queue:queue(binary()))
+                  }}).
 
 -type(serialize_opts() :: options()).
 
@@ -117,9 +119,19 @@ parse(Bin, {{len, #{hdr := Header,
     parse_remaining_len(Bin, Header, Multiplier, Length, Options);
 parse(Bin, {{body, #{hdr := Header,
                      len := Length,
-                     rest := Rest}
+                     rest := Body}
              }, Options}) when is_binary(Bin) ->
-    parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
+    BodyBytes = body_bytes(Body),
+    {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin),
+    NewBody = append_body(Body, NewBodyPart),
+    parse_frame(NewBody, Tail, Header, Length, Options).
+
+%% split given binary with the first N bytes
+split(N, Bin) when N =< 0 ->
+    {Bin, <<>>};
+split(N, Bin) when N =< size(Bin) ->
+    <<H:N/binary, T/binary>> = Bin,
+    {H, T}.
 
 parse_remaining_len(<<>>, Header, Options) ->
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
@@ -132,7 +144,8 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
 parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
     {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
 %% Match DISCONNECT without payload
-parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
+parse_remaining_len(<<0:8, Rest/binary>>,
+                    Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
     Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
     {ok, Packet, Rest, ?none(Options)};
 %% Match PINGREQ.
@@ -149,16 +162,35 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Opti
 parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
                     Options = #{max_size := MaxSize}) ->
     FrameLen = Value + Len * Multiplier,
-    if
-        FrameLen > MaxSize -> error(frame_too_large);
-        true -> parse_frame(Rest, Header, FrameLen, Options)
+    case FrameLen > MaxSize of
+        true -> error(frame_too_large);
+        false -> parse_frame(Rest, Header, FrameLen, Options)
     end.
 
-parse_frame(Bin, Header, 0, Options) ->
-    {ok, packet(Header), Bin, ?none(Options)};
-parse_frame(Bin, Header, Length, Options) ->
-    case Bin of
-        <<FrameBin:Length/binary, Rest/binary>> ->
+body_bytes(B) when is_binary(B) -> size(B);
+body_bytes(?Q(Bytes, _)) -> Bytes.
+
+append_body(H, T) when is_binary(H) andalso size(H) < 1024 ->
+    <<H/binary, T/binary>>;
+append_body(H, T) when is_binary(H) ->
+    Bytes = size(H) + size(T),
+    ?Q(Bytes, queue:from_list([H, T]));
+append_body(?Q(Bytes, Q), T) ->
+    ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
+
+flatten_body(Body, Tail) when is_binary(Body) -> <<Body/binary, Tail/binary>>;
+flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]).
+
+parse_frame(Body, Header, Length, Options) ->
+    %% already appended
+    parse_frame(Body, _SplitTail = <<>>, Header, Length, Options).
+
+parse_frame(Body, Tail, Header, 0, Options) ->
+    {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)};
+parse_frame(Body, Tail, Header, Length, Options) ->
+    case body_bytes(Body) >= Length of
+        true ->
+            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body, Tail),
             case parse_packet(Header, FrameBin, Options) of
                 {Variable, Payload} ->
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
@@ -167,8 +199,11 @@ parse_frame(Bin, Header, Length, Options) ->
                 Variable ->
                     {ok, packet(Header, Variable), Rest, ?none(Options)}
             end;
-        TooShortBin ->
-            {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
+        false ->
+            {more, {{body, #{hdr => Header,
+                             len => Length,
+                             rest => append_body(Body, Tail)
+                            }}, Options}}
     end.
 
 -compile({inline, [packet/1, packet/2, packet/3]}).