Просмотр исходного кода

refactor(conn): not saving anonymous func

JianBo He 5 лет назад
Родитель
Сommit
3b1074d11f
6 измененных файлов с 157 добавлено и 57 удалено
  1. 7 46
      src/emqx.appup.src
  2. 5 5
      src/emqx_connection.erl
  3. 124 0
      src/emqx_frame.erl
  4. 2 0
      src/emqx_limiter.erl
  5. 5 5
      src/emqx_ws_connection.erl
  6. 14 1
      test/emqx_frame_SUITE.erl

+ 7 - 46
src/emqx.appup.src

@@ -1,48 +1,9 @@
 %% -*-: erlang -*-
-{DefaultLen, DefaultSize} =
-    case WordSize = erlang:system_info(wordsize) of
-        8 -> % arch_64
-            {10000, cuttlefish_bytesize:parse("64MB")};
-        4 -> % arch_32
-            {1000, cuttlefish_bytesize:parse("32MB")}
-    end,
-{"4.2.3",
-  [
-    {"4.2.2", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []}
-    ]},
-    {"4.2.1", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_channel, brutal_purge, soft_purge, []},
-      {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_json, brutal_purge, soft_purge, []}
-    ]},
-    {"4.2.0", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_channel, brutal_purge, soft_purge, []},
-      {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_json, brutal_purge, soft_purge, []},
-      {apply, {application, set_env,
-                [emqx, force_shutdown_policy,
-                 #{message_queue_len => DefaultLen,
-                   max_heap_size => DefaultSize div WordSize}]}}
-    ]}
-  ],
-  [
-    {"4.2.2", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []}
-    ]},
-    {"4.2.1", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_channel, brutal_purge, soft_purge, []},
-      {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_json, brutal_purge, soft_purge, []}
-    ]},
-    {"4.2.0", [
-      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_channel, brutal_purge, soft_purge, []},
-      {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
-      {load_module, emqx_json, brutal_purge, soft_purge, []}
-    ]}
-  ]
+{VSN,
+ [
+   {<<".*">>, []}
+ ],
+ [
+   {<<".*">>, []}
+ ]
 }.

+ 5 - 5
src/emqx_connection.erl

@@ -80,8 +80,8 @@
           limit_timer :: maybe(reference()),
           %% Parse State
           parse_state :: emqx_frame:parse_state(),
-          %% Serialize function
-          serialize :: emqx_frame:serialize_fun(),
+          %% Serialize options
+          serialize :: emqx_frame:serialize_opts(),
           %% Channel State
           channel :: emqx_channel:channel(),
           %% GC State
@@ -203,7 +203,7 @@ init_state(Transport, Socket, Options) ->
     Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
     FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
-    Serialize = emqx_frame:serialize_fun(),
+    Serialize = emqx_frame:serialize_opts(),
     Channel = emqx_channel:init(ConnInfo, Options),
     GcState = emqx_zone:init_gc_state(Zone),
     StatsTimer = emqx_zone:stats_timer(Zone),
@@ -337,7 +337,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
 handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
            State = #state{idle_timer = IdleTimer}) ->
     ok = emqx_misc:cancel_timer(IdleTimer),
-    Serialize = emqx_frame:serialize_fun(ConnPkt),
+    Serialize = emqx_frame:serialize_opts(ConnPkt),
     NState = State#state{serialize  = Serialize,
                          idle_timer = undefined
                         },
@@ -578,7 +578,7 @@ handle_outgoing(Packet, State) ->
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
-        case Serialize(Packet) of
+        case emqx_frame:serialize_pkt(Packet, Serialize) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
                          [emqx_packet:format(Packet)]),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),

+ 124 - 0
src/emqx_frame.erl

@@ -31,26 +31,53 @@
         , serialize/2
         ]).
 
+%% The new version APIs to avoid saving
+%% anonymous func
+-export([ parse2/1
+        , parse2/2
+        , serialize_opts/0
+        , serialize_opts/1
+        , serialize_pkt/2
+        ]).
+
 -export_type([ options/0
              , parse_state/0
              , parse_result/0
              , serialize_fun/0
              ]).
 
+-export_type([ parse_state2/0
+             , parse_result2/0
+             , serialize_opts/0
+             ]).
+
 -type(options() :: #{strict_mode => boolean(),
                      max_size => 1..?MAX_PACKET_SIZE,
                      version => emqx_types:version()
                     }).
 
 -type(parse_state() :: {none, options()} | cont_fun()).
+-type(parse_state2() :: {none, options()} | {cont_state(), options()}).
 
 -type(parse_result() :: {more, cont_fun()}
                       | {ok, emqx_types:packet(), binary(), parse_state()}).
 
+-type(parse_result2() :: {more, parse_state()}
+                       | {ok, emqx_types:packet(), binary(), parse_state()}).
+
 -type(cont_fun() :: fun((binary()) -> parse_result())).
 
+-type(cont_state() :: {Stage :: len | body,
+                       State ::  #{hdr := #mqtt_packet_header{},
+                                   len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
+                                   rest => binary()
+                                  }
+                      }).
+
 -type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())).
 
+-type(serialize_opts() :: options()).
+
 -define(none(Options), {none, Options}).
 
 -define(DEFAULT_OPTIONS,
@@ -81,6 +108,89 @@ merge_opts(Options) ->
 %% Parse MQTT Frame
 %%--------------------------------------------------------------------
 
+-spec(parse2(binary()) -> parse_result2()).
+parse2(Bin) ->
+    parse2(Bin, initial_parse_state()).
+
+-spec(parse2(binary(), parse_state()) -> parse_result2()).
+parse2(<<>>, {none, Options}) ->
+    {more, {none, Options}};
+parse2(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
+      {none, Options = #{strict_mode := StrictMode}}) ->
+    %% Validate header if strict mode.
+    StrictMode andalso validate_header(Type, Dup, QoS, Retain),
+    Header = #mqtt_packet_header{type   = Type,
+                                 dup    = bool(Dup),
+                                 qos    = QoS,
+                                 retain = bool(Retain)
+                                },
+    Header1 = case fixqos(Type, QoS) of
+                  QoS      -> Header;
+                  FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
+              end,
+    parse_remaining_len2(Rest, Header1, Options);
+
+parse2(Bin, {{len, #{hdr := Header,
+                    len := {Multiplier, Length}}
+             }, Options}) when is_binary(Bin) ->
+    parse_remaining_len2(Bin, Header, Multiplier, Length, Options);
+parse2(Bin, {{body, #{hdr := Header,
+                     len := Length,
+                     rest := Rest}
+             }, Options}) when is_binary(Bin) ->
+    parse_frame2(<<Rest/binary, Bin/binary>>, Header, Length, Options).
+
+parse_remaining_len2(<<>>, Header, Options) ->
+    {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
+parse_remaining_len2(Rest, Header, Options) ->
+    parse_remaining_len2(Rest, Header, 1, 0, Options).
+
+parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
+  when Length > MaxSize ->
+    error(frame_too_large);
+parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) ->
+    {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
+%% Match DISCONNECT without payload
+parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
+    Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
+    {ok, Packet, Rest, ?none(Options)};
+%% Match PINGREQ.
+parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
+    parse_frame2(Rest, Header, 0, Options);
+%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
+parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
+    parse_frame2(Rest, Header, 2, Options);
+parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
+    parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
+parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
+                    Options = #{max_size := MaxSize}) ->
+    FrameLen = Value + Len * Multiplier,
+    if
+        FrameLen > MaxSize -> error(frame_too_large);
+        true -> parse_frame2(Rest, Header, FrameLen, Options)
+    end.
+
+parse_frame2(Bin, Header, 0, Options) ->
+    {ok, packet(Header), Bin, ?none(Options)};
+
+parse_frame2(Bin, Header, Length, Options) ->
+    case Bin of
+        <<FrameBin:Length/binary, Rest/binary>> ->
+            case parse_packet(Header, FrameBin, Options) of
+                {Variable, Payload} ->
+                    {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
+                Variable = #mqtt_packet_connect{proto_ver = Ver} ->
+                    {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})};
+                Variable ->
+                    {ok, packet(Header, Variable), Rest, ?none(Options)}
+            end;
+        TooShortBin ->
+            {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
+    end.
+
+%% Deprecated parse funcs
+%% It should be removed after 4.2.x
+
 -spec(parse(binary()) -> parse_result()).
 parse(Bin) ->
     parse(Bin, initial_parse_state()).
@@ -443,6 +553,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) ->
         end
     end.
 
+serialize_opts() ->
+    ?DEFAULT_OPTIONS.
+
+serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
+    MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
+    #{version => ProtoVer, max_size => MaxSize}.
+
+serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) ->
+    IoData = serialize(Packet, Ver),
+    case is_too_large(IoData, MaxSize) of
+        true -> <<>>;
+        false -> IoData
+    end.
+
 -spec(serialize(emqx_types:packet()) -> iodata()).
 serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
 

+ 2 - 0
src/emqx_limiter.erl

@@ -53,6 +53,8 @@
 
 -type(limiter() :: #limiter{}).
 
+-dialyzer({nowarn_function, [consume/3]}).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------

+ 5 - 5
src/emqx_ws_connection.erl

@@ -70,8 +70,8 @@
           limit_timer :: maybe(reference()),
           %% Parse State
           parse_state :: emqx_frame:parse_state(),
-          %% Serialize Fun
-          serialize :: emqx_frame:serialize_fun(),
+          %% Serialize options
+          serialize :: emqx_frame:serialize_opts(),
           %% Channel
           channel :: emqx_channel:channel(),
           %% GC State
@@ -231,7 +231,7 @@ websocket_init([Req, Opts]) ->
     MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
     FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
-    Serialize = emqx_frame:serialize_fun(),
+    Serialize = emqx_frame:serialize_opts(),
     Channel = emqx_channel:init(ConnInfo, Opts),
     GcState = emqx_zone:init_gc_state(Zone),
     StatsTimer = emqx_zone:stats_timer(Zone),
@@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) ->
     handle_info(Msg, State);
 
 websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
-    Serialize = emqx_frame:serialize_fun(ConnPkt),
+    Serialize = emqx_frame:serialize_opts(ConnPkt),
     NState = State#state{serialize = Serialize},
     handle_incoming(Packet, cancel_idle_timer(NState));
 
@@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
-        case Serialize(Packet) of
+        case emqx_frame:serialize_pkt(Packet, Serialize) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
                          [emqx_packet:format(Packet)]),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),

+ 14 - 1
test/emqx_frame_SUITE.erl

@@ -26,6 +26,7 @@
 
 all() ->
     [{group, parse},
+     {group, parse2},
      {group, connect},
      {group, connack},
      {group, publish},
@@ -44,6 +45,8 @@ groups() ->
       [t_parse_cont,
        t_parse_frame_too_large
       ]},
+     {parse2, [parallel],
+      [t_parse_cont2]},
      {connect, [parallel],
       [t_serialize_parse_v3_connect,
        t_serialize_parse_v4_connect,
@@ -129,6 +132,16 @@ t_parse_frame_too_large(_) ->
     ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
     ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
 
+t_parse_cont2(_) ->
+    Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
+    ParseState = emqx_frame:initial_parse_state(),
+    <<HdrBin:1/binary, LenBin:1/binary, RestBin/binary>> = serialize_to_binary(Packet),
+    {more, ContParse} = emqx_frame:parse2(<<>>, ParseState),
+    {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse),
+    {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1),
+    {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2),
+    {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3).
+
 t_serialize_parse_v3_connect(_) ->
     Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
             113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
@@ -509,7 +522,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) ->
     Ver = maps:get(version, Opts, ?MQTT_PROTO_V4),
     Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)),
     ParseState = emqx_frame:initial_parse_state(Opts),
-    {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState),
+    {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState),
     NPacket.
 
 serialize_to_binary(Packet) ->