Просмотр исходного кода

chore: supply the code_change logic

JianBo He 5 лет назад
Родитель
Сommit
f1b3bbd7bc
6 измененных файлов с 78 добавлено и 138 удалено
  1. 1 1
      rebar.config
  2. 56 0
      src/emqx_connection.erl
  3. 18 115
      src/emqx_frame.erl
  4. 1 7
      src/emqx_limiter.erl
  5. 1 14
      test/emqx_frame_SUITE.erl
  6. 1 1
      test/emqx_vm_SUITE.erl

+ 1 - 1
rebar.config

@@ -40,7 +40,7 @@
    [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]},
     {deps,
      [{bbmustache, "1.7.0"},
-      {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
+      {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}},
       {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}
      ]},
     {erl_opts, [debug_info]}

+ 56 - 0
src/emqx_connection.erl

@@ -441,6 +441,62 @@ system_continue(Parent, _Debug, State) ->
 system_terminate(Reason, _Parent, _Debug, State) ->
     terminate(Reason, State).
 
+system_code_change(State, _Mod, {down, Vsn}, _Extra)
+    when Vsn == "4.2.0";
+         Vsn == "4.2.1" ->
+    Channel = State#state.channel,
+    NSerialize = emqx_frame:serialize_fun(State#state.serialize),
+    case {State#state.parse_state, element(10, Channel)} of
+        {{none, _}, undefined} ->
+            {ok, State#state{serialize = NSerialize}};
+        {Ps, Quota} ->
+            %% BACKW: e4.2.0-e4.2.1
+            %% We can't recover/reconstruct anonymous function state for
+            %% Parser or Quota consumer. So just close it.
+            ?LOG(error, "Unsupport downgrade connection ~0p, peername: ~0p."
+              " Due to it have an incomplete frame or unsafed quota counter,"
+              " parser_state: ~0p, quota: ~0p."
+              " Force close it now!!!", [self(), State#state.peername, Ps, Quota]),
+            self() ! {close, unsupported_downgrade_connection_state},
+            {ok, State#state{serialize = NSerialize}}
+    end;
+
+system_code_change(State, _Mod, Vsn, _Extra)
+    when Vsn == "4.2.0";
+         Vsn == "4.2.1" ->
+    Channel = State#state.channel,
+    NChannel =
+        case element(10, Channel) of
+            undefined -> Channel;
+            Quoter ->
+                Zone = element(2, Quoter),
+                Cks = element(3, Quoter),
+                NCks = [case Name == overall_messages_routing of
+                            true -> Ck#{consumer => Zone};
+                            _ -> Ck
+                        end || Ck = #{name := Name} <- Cks],
+                setelement(10, Channel, setelement(3, Quoter, NCks))
+        end,
+
+    NParseState =
+        case State#state.parse_state of
+            Ps = {none, _} ->  Ps;
+            Ps when is_function(Ps) ->
+                case erlang:fun_info(Ps, env) of
+                    {_, [Hdr, Opts]} ->
+                        {{len, #{hdr => Hdr, len => {1,0}}}, Opts};
+                    {_, [Bin, Hdr, Len, Opts]} when is_binary(Bin) ->
+                        {{body, #{hdr => Hdr, len => Len, rest => Bin}}, Opts};
+                    {_, [Hdr, Multip, Len, Opts]} ->
+                        {{len, #{hdr => Hdr, len => {Multip, Len}}}, Opts}
+                end
+        end,
+
+    {_, [Ver, MaxSize]} = erlang:fun_info(State#state.serialize, env),
+    NSerialize = #{version => Ver, max_size => MaxSize},
+
+    {ok, State#state{channel = NChannel, parse_state = NParseState, serialize = NSerialize}};
+
 system_code_change(State, _Mod, _OldVsn, _Extra) ->
     {ok, State}.
 

+ 18 - 115
src/emqx_frame.erl

@@ -27,27 +27,16 @@
         , parse/2
         , serialize_fun/0
         , serialize_fun/1
-        , serialize/1
-        , serialize/2
-        ]).
-
-%% The new version APIs to avoid saving
-%% anonymous func
--export([ parse2/1
-        , parse2/2
         , serialize_opts/0
         , serialize_opts/1
         , serialize_pkt/2
+        , serialize/1
+        , serialize/2
         ]).
 
 -export_type([ options/0
              , parse_state/0
              , parse_result/0
-             , serialize_fun/0
-             ]).
-
--export_type([ parse_state2/0
-             , parse_result2/0
              , serialize_opts/0
              ]).
 
@@ -56,17 +45,11 @@
                      version => emqx_types:version()
                     }).
 
--type(parse_state() :: {none, options()} | cont_fun()).
--type(parse_state2() :: {none, options()} | {cont_state(), options()}).
+-type(parse_state() :: {none, options()} | {cont_state(), options()}).
 
--type(parse_result() :: {more, cont_fun()}
+-type(parse_result() :: {more, parse_state()}
                       | {ok, emqx_types:packet(), binary(), parse_state()}).
 
--type(parse_result2() :: {more, parse_state()}
-                       | {ok, emqx_types:packet(), binary(), parse_state()}).
-
--type(cont_fun() :: fun((binary()) -> parse_result())).
-
 -type(cont_state() :: {Stage :: len | body,
                        State ::  #{hdr := #mqtt_packet_header{},
                                    len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
@@ -74,8 +57,6 @@
                                   }
                       }).
 
--type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())).
-
 -type(serialize_opts() :: options()).
 
 -define(none(Options), {none, Options}).
@@ -108,96 +89,13 @@ merge_opts(Options) ->
 %% Parse MQTT Frame
 %%--------------------------------------------------------------------
 
--spec(parse2(binary()) -> parse_result2()).
-parse2(Bin) ->
-    parse2(Bin, initial_parse_state()).
-
--spec(parse2(binary(), parse_state()) -> parse_result2()).
-parse2(<<>>, {none, Options}) ->
-    {more, {none, Options}};
-parse2(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
-      {none, Options = #{strict_mode := StrictMode}}) ->
-    %% Validate header if strict mode.
-    StrictMode andalso validate_header(Type, Dup, QoS, Retain),
-    Header = #mqtt_packet_header{type   = Type,
-                                 dup    = bool(Dup),
-                                 qos    = QoS,
-                                 retain = bool(Retain)
-                                },
-    Header1 = case fixqos(Type, QoS) of
-                  QoS      -> Header;
-                  FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
-              end,
-    parse_remaining_len2(Rest, Header1, Options);
-
-parse2(Bin, {{len, #{hdr := Header,
-                    len := {Multiplier, Length}}
-             }, Options}) when is_binary(Bin) ->
-    parse_remaining_len2(Bin, Header, Multiplier, Length, Options);
-parse2(Bin, {{body, #{hdr := Header,
-                     len := Length,
-                     rest := Rest}
-             }, Options}) when is_binary(Bin) ->
-    parse_frame2(<<Rest/binary, Bin/binary>>, Header, Length, Options).
-
-parse_remaining_len2(<<>>, Header, Options) ->
-    {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
-parse_remaining_len2(Rest, Header, Options) ->
-    parse_remaining_len2(Rest, Header, 1, 0, Options).
-
-parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
-  when Length > MaxSize ->
-    error(frame_too_large);
-parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) ->
-    {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
-%% Match DISCONNECT without payload
-parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
-    Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
-    {ok, Packet, Rest, ?none(Options)};
-%% Match PINGREQ.
-parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
-    parse_frame2(Rest, Header, 0, Options);
-%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
-parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
-    parse_frame2(Rest, Header, 2, Options);
-parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
-    parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
-parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
-                    Options = #{max_size := MaxSize}) ->
-    FrameLen = Value + Len * Multiplier,
-    if
-        FrameLen > MaxSize -> error(frame_too_large);
-        true -> parse_frame2(Rest, Header, FrameLen, Options)
-    end.
-
-parse_frame2(Bin, Header, 0, Options) ->
-    {ok, packet(Header), Bin, ?none(Options)};
-
-parse_frame2(Bin, Header, Length, Options) ->
-    case Bin of
-        <<FrameBin:Length/binary, Rest/binary>> ->
-            case parse_packet(Header, FrameBin, Options) of
-                {Variable, Payload} ->
-                    {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
-                Variable = #mqtt_packet_connect{proto_ver = Ver} ->
-                    {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})};
-                Variable ->
-                    {ok, packet(Header, Variable), Rest, ?none(Options)}
-            end;
-        TooShortBin ->
-            {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
-    end.
-
-%% Deprecated parse funcs
-%% It should be removed after 4.2.x
-
 -spec(parse(binary()) -> parse_result()).
 parse(Bin) ->
     parse(Bin, initial_parse_state()).
 
 -spec(parse(binary(), parse_state()) -> parse_result()).
 parse(<<>>, {none, Options}) ->
-    {more, fun(Bin) -> parse(Bin, {none, Options}) end};
+    {more, {none, Options}};
 parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
       {none, Options = #{strict_mode := StrictMode}}) ->
     %% Validate header if strict mode.
@@ -212,11 +110,19 @@ parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
                   FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
               end,
     parse_remaining_len(Rest, Header1, Options);
-parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
-    Cont(Bin).
+
+parse(Bin, {{len, #{hdr := Header,
+                    len := {Multiplier, Length}}
+             }, Options}) when is_binary(Bin) ->
+    parse_remaining_len(Bin, Header, Multiplier, Length, Options);
+parse(Bin, {{body, #{hdr := Header,
+                     len := Length,
+                     rest := Rest}
+             }, Options}) when is_binary(Bin) ->
+    parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
 
 parse_remaining_len(<<>>, Header, Options) ->
-    {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
+    {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
 parse_remaining_len(Rest, Header, Options) ->
     parse_remaining_len(Rest, Header, 1, 0, Options).
 
@@ -224,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
   when Length > MaxSize ->
     error(frame_too_large);
 parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
-    {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
+    {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
 %% Match DISCONNECT without payload
 parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
     Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
@@ -260,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) ->
                     {ok, packet(Header, Variable), Rest, ?none(Options)}
             end;
         TooShortBin ->
-            {more, fun(BinMore) ->
-                           parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
-                   end}
+            {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
     end.
 
 -compile({inline, [packet/1, packet/2, packet/3]}).
@@ -870,4 +774,3 @@ fixqos(?PUBREL, 0)      -> 1;
 fixqos(?SUBSCRIBE, 0)   -> 1;
 fixqos(?UNSUBSCRIBE, 0) -> 1;
 fixqos(_Type, QoS)      -> QoS.
-

+ 1 - 7
src/emqx_limiter.erl

@@ -128,13 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
         _ ->
             case is_overall_limiter(Name) of
                 true ->
-                    {_, Intv} = case erlang:is_function(Cons) of
-                        true -> %% Compatible with hot-upgrade from e4.2.0, e4.2.1.
-                                %% It should be removed after 4.3.0
-                            {env, [Zone|_]} = erlang:fun_info(Cons, env),
-                            esockd_limiter:consume({Zone, Name}, Tokens);
-                        _ -> esockd_limiter:consume({Cons, Name}, Tokens)
-                    end,
+                    {_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens),
                     {Intv, Cons};
                 _ ->
                     esockd_rate_limit:check(Tokens, Cons)

+ 1 - 14
test/emqx_frame_SUITE.erl

@@ -26,7 +26,6 @@
 
 all() ->
     [{group, parse},
-     {group, parse2},
      {group, connect},
      {group, connack},
      {group, publish},
@@ -45,8 +44,6 @@ groups() ->
       [t_parse_cont,
        t_parse_frame_too_large
       ]},
-     {parse2, [parallel],
-      [t_parse_cont2]},
      {connect, [parallel],
       [t_serialize_parse_v3_connect,
        t_serialize_parse_v4_connect,
@@ -132,16 +129,6 @@ t_parse_frame_too_large(_) ->
     ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
     ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
 
-t_parse_cont2(_) ->
-    Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
-    ParseState = emqx_frame:initial_parse_state(),
-    <<HdrBin:1/binary, LenBin:1/binary, RestBin/binary>> = serialize_to_binary(Packet),
-    {more, ContParse} = emqx_frame:parse2(<<>>, ParseState),
-    {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse),
-    {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1),
-    {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2),
-    {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3).
-
 t_serialize_parse_v3_connect(_) ->
     Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
             113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
@@ -522,7 +509,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) ->
     Ver = maps:get(version, Opts, ?MQTT_PROTO_V4),
     Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)),
     ParseState = emqx_frame:initial_parse_state(Opts),
-    {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState),
+    {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState),
     NPacket.
 
 serialize_to_binary(Packet) ->

+ 1 - 1
test/emqx_vm_SUITE.erl

@@ -80,7 +80,7 @@ t_get_port_info(_Config) ->
     {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
     emqx_vm:get_port_info(),
     ok = gen_tcp:close(Sock),
-    [Port | _] = erlang:ports().
+    [_Port | _] = erlang:ports().
 
 t_transform_port(_Config) ->
     [Port | _] = erlang:ports(),