Browse Source

Better zone (#2548)

Gilbert 6 years ago
parent
commit
0857b976ed

+ 10 - 0
etc/emqx.conf

@@ -723,6 +723,11 @@ zone.external.flapping_banned_expiry_interval = 1h
 ## Default: false
 zone.external.use_username_as_clientid = false
 
+## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
+##
+## Value: true | false
+zone.external.ignore_loop_deliver = false
+
 ##--------------------------------------------------------------------
 ## Internal Zone
 
@@ -818,6 +823,11 @@ zone.internal.flapping_banned_expiry_interval = 1h
 ## Default: false
 zone.internal.use_username_as_clientid = false
 
+## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
+##
+## Value: true | false
+zone.internal.ignore_loop_deliver = false
+
 ##--------------------------------------------------------------------
 ## Listeners
 ##--------------------------------------------------------------------

+ 1 - 1
priv/emqx.schema

@@ -646,7 +646,7 @@ end}.
 ]}.
 
 %% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
-{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [
+{mapping, "mqtt.ignore_loop_deliver", "emqx.ignore_loop_deliver", [
   {default, true},
   {datatype, {enum, [true, false]}}
 ]}.

+ 13 - 13
src/emqx_connection.erl

@@ -45,6 +45,7 @@
         ]).
 
 -record(state, {
+          zone,
           transport,
           socket,
           peername,
@@ -55,12 +56,10 @@
           parse_state,
           gc_state,
           keepalive,
-          enable_stats,
           stats_timer,
           rate_limit,
           pub_limit,
-          limit_timer,
-          idle_timeout
+          limit_timer
          }).
 
 -define(ACTIVE_N, 100).
@@ -152,7 +151,6 @@ init({Transport, RawSocket, Options}) ->
     RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
     PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
     ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
-    EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
     IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
     SendFun = fun(Packet, SeriaOpts) ->
                       Data = emqx_frame:serialize(Packet, SeriaOpts),
@@ -171,7 +169,8 @@ init({Transport, RawSocket, Options}) ->
     ParseState = emqx_protocol:parser(ProtoState),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     GcState = emqx_gc:init(GcPolicy),
-    State = #state{transport    = Transport,
+    State = #state{zone         = Zone,
+                   transport    = Transport,
                    socket       = Socket,
                    peername     = Peername,
                    conn_state   = running,
@@ -180,9 +179,7 @@ init({Transport, RawSocket, Options}) ->
                    pub_limit    = PubLimit,
                    proto_state  = ProtoState,
                    parse_state  = ParseState,
-                   gc_state     = GcState,
-                   enable_stats = EnableStats,
-                   idle_timeout = IdleTimout},
+                   gc_state     = GcState},
     ok = emqx_misc:init_proc_mng_policy(Zone),
     gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}],
                           idle, State, self(), [IdleTimout]).
@@ -470,11 +467,14 @@ activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) ->
 %%------------------------------------------------------------------------------
 %% Ensure stats timer
 
-ensure_stats_timer(State = #state{enable_stats = true,
-                                  stats_timer = undefined,
-                                  idle_timeout = IdleTimeout}) ->
-    State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
-
+ensure_stats_timer(State = #state{zone = Zone, stats_timer = undefined}) ->
+    case emqx_zone:get_env(Zone, enable_stats, true) of
+        true ->
+            IdleTimeout = emqx_zone:get_env(Zone, idle_timeout, 30000),
+            State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
+        false ->
+            State
+    end;
 ensure_stats_timer(State) -> State.
 
 %%------------------------------------------------------------------------------

+ 108 - 100
src/emqx_protocol.erl

@@ -54,20 +54,14 @@
           session,
           clean_start,
           topic_aliases,
-          packet_size,
           will_topic,
           will_msg,
           keepalive,
           is_bridge,
-          enable_ban,
-          enable_acl,
-          enable_flapping_detect,
-          acl_deny_action,
           recv_stats,
           send_stats,
           connected,
           connected_at,
-          ignore_loop,
           topic_alias_maximum,
           conn_mod,
           credentials,
@@ -106,16 +100,10 @@ init(SocketOpts = #{ sockname := Sockname
             username               = init_username(Peercert, Options),
             clean_start            = false,
             topic_aliases          = #{},
-            packet_size            = emqx_zone:get_env(Zone, max_packet_size),
             is_bridge              = false,
-            enable_ban             = emqx_zone:get_env(Zone, enable_ban, false),
-            enable_acl             = emqx_zone:get_env(Zone, enable_acl),
-            enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false),
-            acl_deny_action        = emqx_zone:get_env(Zone, acl_deny_action, ignore),
             recv_stats             = #{msg => 0, pkt => 0},
             send_stats             = #{msg => 0, pkt => 0},
             connected              = false,
-            ignore_loop            = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
             topic_alias_maximum    = #{to_client => 0, from_client => 0},
             conn_mod               = maps:get(conn_mod, SocketOpts, undefined),
             credentials            = #{},
@@ -138,18 +126,18 @@ set_username(_Username, PState) ->
 %% API
 %%------------------------------------------------------------------------------
 
-info(PState = #pstate{conn_props    = ConnProps,
+info(PState = #pstate{zone          = Zone,
+                      conn_props    = ConnProps,
                       ack_props     = AckProps,
                       session       = Session,
                       topic_aliases = Aliases,
-                      will_msg      = WillMsg,
-                      enable_acl    = EnableAcl}) ->
+                      will_msg      = WillMsg}) ->
     maps:merge(attrs(PState), #{conn_props => ConnProps,
                                 ack_props => AckProps,
                                 session => Session,
                                 topic_aliases => Aliases,
                                 will_msg => WillMsg,
-                                enable_acl => EnableAcl
+                                enable_acl => emqx_zone:get_env(Zone, enable_acl, false)
                                }).
 
 attrs(#pstate{zone         = Zone,
@@ -244,7 +232,8 @@ stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
 session(#pstate{session = SPid}) ->
     SPid.
 
-parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
+parser(#pstate{zone = Zone, proto_ver = Ver}) ->
+    Size = emqx_zone:get_env(Zone, max_packet_size),
     emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
 
 %%------------------------------------------------------------------------------
@@ -430,31 +419,37 @@ process(?CONNECT_PACKET(
               {ReasonCode, PState1}
       end);
 
-process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
+process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
+        PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
     case check_publish(Packet, PState) of
         ok ->
             do_publish(Packet, PState);
         {error, ReasonCode} ->
             ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode)]),
-            do_acl_deny_action(Packet, ReasonCode, PState)
+            AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
+            ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState},
+            do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm)
     end;
 
-process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
+process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
+        PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
     case check_publish(Packet, PState) of
         ok ->
             do_publish(Packet, PState);
         {error, ReasonCode} ->
-            ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s",
-                [Topic, emqx_reason_codes:text(ReasonCode)]),
+            ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]),
             case deliver({puback, PacketId, ReasonCode}, PState) of
                 {ok, PState1} ->
-                    do_acl_deny_action(Packet, ReasonCode, PState1);
+                    AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
+                    ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
+                    do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
                 Error -> Error
             end
     end;
 
-process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
+process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
+        PState = #pstate{zone = Zone, proto_ver = ProtoVer}) ->
     case check_publish(Packet, PState) of
         ok ->
             do_publish(Packet, PState);
@@ -463,7 +458,9 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
                  [Topic, emqx_reason_codes:text(ReasonCode)]),
             case deliver({pubrec, PacketId, ReasonCode}, PState) of
                 {ok, PState1} ->
-                    do_acl_deny_action(Packet, ReasonCode, PState1);
+                    AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
+                    ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
+                    do_acl_deny_action(AclDenyAction, Packet, ReasonCode, ErrorTerm);
                 Error -> Error
             end
     end;
@@ -491,7 +488,7 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
     {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
 
 process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
-        PState = #pstate{session = SPid, credentials = Credentials}) ->
+        PState = #pstate{zone = Zone, proto_ver = ProtoVer, session = SPid, credentials = Credentials}) ->
     case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
         {ok, TopicFilters} ->
             TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
@@ -509,7 +506,9 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
                  [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
             case deliver({suback, PacketId, ReasonCodes}, PState) of
                 {ok, PState1} ->
-                    do_acl_deny_action(Packet, ReasonCodes, PState1);
+                    AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
+                    ErrorTerm = {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState1},
+                    do_acl_deny_action(AclDenyAction, Packet, ReasonCodes, ErrorTerm);
                 Error ->
                     Error
             end
@@ -813,16 +812,13 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
 check_flapping(#mqtt_packet_connect{}, PState) ->
     do_flapping_detect(connect, PState).
 
-check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
-    ok;
 check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
-             #pstate{peername = Peername}) ->
-    case emqx_banned:check(#{client_id => ClientId,
-                             username  => Username,
-                             peername  => Peername}) of
-        true  -> {error, ?RC_BANNED};
-        false -> ok
-    end.
+             #pstate{zone = Zone, peername = Peername}) ->
+    Credentials = #{client_id => ClientId,
+                    username  => Username,
+                    peername  => Peername},
+    EnableBan = emqx_zone:get_env(Zone, enable_ban, false),
+    do_check_banned(EnableBan, Credentials).
 
 check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
     ok;
@@ -833,14 +829,14 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
             {error, ?RC_TOPIC_NAME_INVALID}
     end.
 
-check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
-    ok;
-check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
-    case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
-        allow -> ok;
-        deny ->
+check_will_acl(#mqtt_packet_connect{will_topic = WillTopic},
+               #pstate{zone = Zone, credentials = Credentials}) ->
+    EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
+    case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of
+        ok -> ok;
+        Other ->
             ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
-            {error, ?RC_NOT_AUTHORIZED}
+            Other
     end.
 
 check_publish(Packet, PState) ->
@@ -852,14 +848,13 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
                #pstate{zone = Zone}) ->
     emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
 
-check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
-        when IsSuper orelse (not EnableAcl) ->
+check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}})
+        when IsSuper ->
     ok;
-check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
-    case emqx_access_control:check_acl(Credentials, publish, Topic) of
-        allow -> ok;
-        deny -> {error, ?RC_NOT_AUTHORIZED}
-    end.
+check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
+              #pstate{zone = Zone, credentials = Credentials}) ->
+    EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
+    do_acl_check(EnableAcl, publish, Credentials, Topic).
 
 run_check_steps([], _Packet, _PState) ->
     ok;
@@ -879,17 +874,18 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
             {error, TopicFilter1}
     end.
 
-check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
-        when IsSuper orelse (not EnableAcl) ->
+check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}})
+        when IsSuper ->
     {ok, TopicFilters};
-check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
+check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
+    EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
     lists:foldr(
-      fun({Topic, SubOpts}, {Ok, Acc}) ->
-              case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
-                  allow -> {Ok, [{Topic, SubOpts}|Acc]};
-                  deny  ->
-                      {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
-              end
+      fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl ->
+              AllowTerm = {Ok, [{Topic, SubOpts}|Acc]},
+              DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]},
+              do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm);
+         (TopicFilter, Acc) ->
+              {ok, [TopicFilter | Acc]}
       end, {ok, []}, TopicFilters).
 
 trace(recv, Packet) ->
@@ -950,52 +946,45 @@ flag(true)  -> 1.
 %% Execute actions in case acl deny
 
 do_flapping_detect(Action, #pstate{zone = Zone,
-                                   client_id = ClientId,
-                                   enable_flapping_detect = true}) ->
-    BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
-    Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
-    Until = erlang:system_time(second) + BanExpiryInterval,
-    case emqx_flapping:check(Action, ClientId, Threshold) of
-        flapping ->
-            emqx_banned:add(#banned{who = {client_id, ClientId},
-                                    reason = <<"flapping">>,
-                                    by = <<"flapping_checker">>,
-                                    until = Until}),
-            ok;
-        _Other ->
-            ok
-    end;
-do_flapping_detect(_Action, _PState) ->
-    ok.
-
-do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
-                   ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
-                                                        acl_deny_action = disconnect}) ->
-    {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
-
-do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload),
-                   ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
-                                                        acl_deny_action = disconnect}) ->
+                                   client_id = ClientId}) ->
+    ok = case emqx_zone:get_env(Zone, enable_flapping_detect, false) of
+             true ->
+                 Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
+                 case emqx_flapping:check(Action, ClientId, Threshold) of
+                     flapping ->
+                         BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
+                         Until = erlang:system_time(second) + BanExpiryInterval,
+                         emqx_banned:add(#banned{who = {client_id, ClientId},
+                                                 reason = <<"flapping">>,
+                                                 by = <<"flapping_checker">>,
+                                                 until = Until}),
+                         ok;
+                     _Other ->
+                         ok
+                 end;
+             _EnableFlappingDetect -> ok
+         end.
+
+do_acl_deny_action(disconnect, ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
+                   ?RC_NOT_AUTHORIZED, ErrorTerm) ->
+    ErrorTerm;
+
+do_acl_deny_action(disconnect, ?PUBLISH_PACKET(QoS, _Topic, _PacketId, _Payload),
+                   ?RC_NOT_AUTHORIZED, ErrorTerm = {_Error, _CodeName, PState})
+  when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
     deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
-    {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
+    ErrorTerm;
 
-do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload),
-                   ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
-                                                        acl_deny_action = disconnect}) ->
-    deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
-    {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
-
-do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
-                   ReasonCodes, PState = #pstate{proto_ver = ProtoVer,
-                                                 acl_deny_action = disconnect}) ->
+do_acl_deny_action(disconnect, ?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
+                   ReasonCodes, ErrorTerm = {_Error, _CodeName, PState}) ->
     case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
         true ->
             deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
-            {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
+            ErrorTerm;
         false ->
             {ok, PState}
     end;
-do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) ->
+do_acl_deny_action(_OtherAction, _PubSupPacket, _ReasonCode, {_Error, _CodeName, PState}) ->
     {ok, PState}.
 
 %% Reason code compat
@@ -1006,9 +995,8 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
 reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
     [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
 
-raw_topic_filters(#pstate{proto_ver = ProtoVer,
-                          is_bridge = IsBridge,
-                          ignore_loop = IgnoreLoop}, RawTopicFilters) ->
+raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridge}, RawTopicFilters) ->
+    IgnoreLoop = emqx_zone:get_env(Zone, ignore_loop_deliver, false),
     case ProtoVer < ?MQTT_PROTO_V5 of
         true ->
             IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
@@ -1022,3 +1010,23 @@ raw_topic_filters(#pstate{proto_ver = ProtoVer,
 
 mountpoint(Credentials) ->
     maps:get(mountpoint, Credentials, undefined).
+
+do_check_banned(_EnableBan = true, Credentials) ->
+    case emqx_banned:check(Credentials) of
+        true  -> {error, ?RC_BANNED};
+        false -> ok
+    end;
+do_check_banned(_EnableBan, Credentials) -> ok.
+
+do_acl_check(_EnableAcl = true, Action, Credentials, Topic) ->
+    AllowTerm = ok,
+    DenyTerm = {error, ?RC_NOT_AUTHORIZED},
+    do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm);
+do_acl_check(_EnableAcl, _Action, _Credentials, _Topic) ->
+    ok.
+
+do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) ->
+    case emqx_access_control:check_acl(Credentials, Action, Topic) of
+        allow -> AllowTerm;
+        deny -> DenyTerm
+    end.

+ 41 - 59
src/emqx_session.erl

@@ -84,6 +84,9 @@
 -import(emqx_zone, [get_env/2, get_env/3]).
 
 -record(state, {
+          %% zone
+          zone :: atom(),
+
           %% Idle timeout
           idle_timeout :: pos_integer(),
 
@@ -111,24 +114,15 @@
           %% Next packet id of the session
           next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
 
-          %% Max subscriptions
-          max_subscriptions :: non_neg_integer(),
-
           %% Client’s Subscriptions.
           subscriptions :: map(),
 
-          %% Upgrade QoS?
-          upgrade_qos = false :: boolean(),
-
           %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
           inflight :: emqx_inflight:inflight(),
 
           %% Max Inflight Size. DEPRECATED: Get from inflight
           %% max_inflight = 32 :: non_neg_integer(),
 
-          %% Retry interval for redelivering QoS1/2 messages
-          retry_interval = 20000 :: timeout(),
-
           %% Retry Timer
           retry_timer :: maybe(reference()),
 
@@ -141,12 +135,6 @@
           %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
           awaiting_rel :: map(),
 
-          %% Max Packets Awaiting PUBREL
-          max_awaiting_rel = 100 :: non_neg_integer(),
-
-          %% Awaiting PUBREL Timeout
-          await_rel_timeout = 20000 :: timeout(),
-
           %% Awaiting PUBREL Timer
           await_rel_timer :: maybe(reference()),
 
@@ -156,9 +144,6 @@
           %% Expired Timer
           expiry_timer :: maybe(reference()),
 
-          %% Enable Stats
-          enable_stats :: boolean(),
-
           %% Stats timer
           stats_timer :: maybe(reference()),
 
@@ -191,28 +176,24 @@ start_link(SessAttrs) ->
 info(SPid) when is_pid(SPid) ->
     gen_server:call(SPid, info, infinity);
 
-info(State = #state{conn_pid = ConnPid,
+info(State = #state{zone = Zone,
+                    conn_pid = ConnPid,
                     next_pkt_id = PktId,
-                    max_subscriptions = MaxSubscriptions,
                     subscriptions = Subscriptions,
-                    upgrade_qos = UpgradeQoS,
                     inflight = Inflight,
-                    retry_interval = RetryInterval,
                     mqueue = MQueue,
-                    awaiting_rel = AwaitingRel,
-                    max_awaiting_rel = MaxAwaitingRel,
-                    await_rel_timeout = AwaitRelTimeout}) ->
+                    awaiting_rel = AwaitingRel}) ->
     attrs(State) ++ [{conn_pid, ConnPid},
                      {next_pkt_id, PktId},
-                     {max_subscriptions, MaxSubscriptions},
+                     {max_subscriptions, get_env(Zone, max_subscriptions, 0)},
                      {subscriptions, Subscriptions},
-                     {upgrade_qos, UpgradeQoS},
+                     {upgrade_qos, get_env(Zone, upgrade_qos, false)},
                      {inflight, Inflight},
-                     {retry_interval, RetryInterval},
+                     {retry_interval, get_env(Zone, retry_interval, 0)},
                      {mqueue_len, emqx_mqueue:len(MQueue)},
                      {awaiting_rel, AwaitingRel},
-                     {max_awaiting_rel, MaxAwaitingRel},
-                     {await_rel_timeout, AwaitRelTimeout}].
+                     {max_awaiting_rel, get_env(Zone, max_awaiting_rel)},
+                     {await_rel_timeout, get_env(Zone, await_rel_timeout)}].
 
 %% @doc Get session attrs
 -spec(attrs(spid() | #state{}) -> list({atom(), term()})).
@@ -236,21 +217,20 @@ attrs(#state{clean_start = CleanStart,
 stats(SPid) when is_pid(SPid) ->
     gen_server:call(SPid, stats, infinity);
 
-stats(#state{max_subscriptions = MaxSubscriptions,
+stats(#state{zone = Zone,
              subscriptions = Subscriptions,
              inflight = Inflight,
              mqueue = MQueue,
-             max_awaiting_rel = MaxAwaitingRel,
              awaiting_rel = AwaitingRel}) ->
     lists:append(emqx_misc:proc_stats(),
-                 [{max_subscriptions, MaxSubscriptions},
+                 [{max_subscriptions, get_env(Zone, max_subscriptions, 0)},
                   {subscriptions_count, maps:size(Subscriptions)},
                   {max_inflight, emqx_inflight:max_size(Inflight)},
                   {inflight_len, emqx_inflight:size(Inflight)},
                   {max_mqueue, emqx_mqueue:max_len(MQueue)},
                   {mqueue_len, emqx_mqueue:len(MQueue)},
                   {mqueue_dropped, emqx_mqueue:dropped(MQueue)},
-                  {max_awaiting_rel, MaxAwaitingRel},
+                  {max_awaiting_rel, get_env(Zone, max_awaiting_rel)},
                   {awaiting_rel_len, maps:size(AwaitingRel)},
                   {deliver_msg, emqx_pd:get_counter(deliver_stats)},
                   {enqueue_msg, emqx_pd:get_counter(enqueue_stats)}]).
@@ -364,23 +344,18 @@ init([Parent, #{zone            := Zone,
     emqx_logger:set_metadata_client_id(ClientId),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
-    State = #state{idle_timeout      = IdleTimout,
+    State = #state{zone              = Zone,
+                   idle_timeout      = IdleTimout,
                    clean_start       = CleanStart,
                    deliver_fun       = deliver_fun(ConnPid),
                    client_id         = ClientId,
                    username          = Username,
                    conn_pid          = ConnPid,
                    subscriptions     = #{},
-                   max_subscriptions = get_env(Zone, max_subscriptions, 0),
-                   upgrade_qos       = get_env(Zone, upgrade_qos, false),
                    inflight          = emqx_inflight:new(MaxInflight),
                    mqueue            = init_mqueue(Zone),
-                   retry_interval    = get_env(Zone, retry_interval, 0),
                    awaiting_rel      = #{},
-                   await_rel_timeout = get_env(Zone, await_rel_timeout),
-                   max_awaiting_rel  = get_env(Zone, max_awaiting_rel),
                    expiry_interval   = ExpiryInterval,
-                   enable_stats      = get_env(Zone, enable_stats, true),
                    gc_state          = emqx_gc:init(GcPolicy),
                    created_at        = os:timestamp(),
                    will_msg          = WillMsg
@@ -433,9 +408,10 @@ handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_p
 %% PUBLISH: This is only to register packetId to session state.
 %% The actual message dispatching should be done by the caller (e.g. connection) process.
 handle_call({register_publish_packet_id, PacketId, Ts}, _From,
-            State = #state{awaiting_rel = AwaitingRel}) ->
+            State = #state{zone = Zone, awaiting_rel = AwaitingRel}) ->
+    MaxAwaitingRel = get_env(Zone, max_awaiting_rel),
     reply(
-      case is_awaiting_full(State) of
+      case is_awaiting_full(MaxAwaitingRel, AwaitingRel) of
           false ->
               case maps:is_key(PacketId, AwaitingRel) of
                   true ->
@@ -742,7 +718,8 @@ retry_delivery(_Force, [], _Now, State) ->
     ensure_retry_timer(State);
 
 retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
-               State = #state{inflight = Inflight, retry_interval = Interval}) ->
+               State = #state{zone = Zone, inflight = Inflight}) ->
+    Interval = get_env(Zone, retry_interval, 0),
     %% Microseconds -> MilliSeconds
     Age = timer:now_diff(Now, Ts) div 1000,
     if
@@ -789,7 +766,8 @@ expire_awaiting_rel([], _Now, State) ->
     State#state{await_rel_timer = undefined};
 
 expire_awaiting_rel([{PacketId, Ts} | More], Now,
-                    State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
+                    State = #state{zone = Zone, awaiting_rel = AwaitingRel}) ->
+    Timeout = get_env(Zone, await_rel_timeout),
     case (timer:now_diff(Now, Ts) div 1000) of
         Age when Age >= Timeout ->
             emqx_metrics:trans(inc, 'messages/qos2/expired'),
@@ -803,11 +781,10 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
 %% Check awaiting rel
 %%------------------------------------------------------------------------------
 
-is_awaiting_full(#state{max_awaiting_rel = 0}) ->
+is_awaiting_full(_MaxAwaitingRel = 0, _AwaitingRel) ->
     false;
-is_awaiting_full(#state{awaiting_rel = AwaitingRel,
-                        max_awaiting_rel = MaxLen}) ->
-    maps:size(AwaitingRel) >= MaxLen.
+is_awaiting_full(MaxAwaitingRel, AwaitingRel) ->
+    maps:size(AwaitingRel) >= MaxAwaitingRel.
 
 %%------------------------------------------------------------------------------
 %% Dispatch messages
@@ -900,10 +877,11 @@ process_subopts([{nl, 1}|_Opts], #message{from = ClientId}, #state{client_id = C
     ignore;
 process_subopts([{nl, _}|Opts], Msg, State) ->
     process_subopts(Opts, Msg, State);
-process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) ->
-    process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State);
-process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
-    process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
+process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{zone = Zone}) ->
+    case get_env(Zone, upgrade_qos, false) of
+        true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
+        false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
+    end;
 process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) ->
     process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State);
 process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) ->
@@ -1053,8 +1031,9 @@ drain_q(Cnt, Msgs, Q) ->
 %%------------------------------------------------------------------------------
 %% Ensure timers
 
-ensure_await_rel_timer(State = #state{await_rel_timer = undefined,
-                                      await_rel_timeout = Timeout}) ->
+ensure_await_rel_timer(State = #state{zone = Zone,
+                                      await_rel_timer = undefined}) ->
+    Timeout = get_env(Zone, await_rel_timeout),
     ensure_await_rel_timer(Timeout, State);
 ensure_await_rel_timer(State) ->
     State.
@@ -1064,8 +1043,8 @@ ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) ->
 ensure_await_rel_timer(_Timeout, State) ->
     State.
 
-ensure_retry_timer(State = #state{retry_timer = undefined,
-                                  retry_interval = Interval}) ->
+ensure_retry_timer(State = #state{zone = Zone, retry_timer = undefined}) ->
+    Interval = get_env(Zone, retry_interval, 0),
     ensure_retry_timer(Interval, State);
 ensure_retry_timer(State) ->
     State.
@@ -1087,10 +1066,13 @@ ensure_will_delay_timer(State = #state{will_msg = WillMsg}) ->
     send_willmsg(WillMsg),
     State#state{will_msg = undefined}.
 
-ensure_stats_timer(State = #state{enable_stats = true,
+ensure_stats_timer(State = #state{zone = Zone,
                                   stats_timer = undefined,
                                   idle_timeout = IdleTimeout}) ->
-    State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
+    case get_env(Zone, enable_stats, true) of
+        true -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
+        _Other -> State
+    end;
 ensure_stats_timer(State) ->
     State.
 

+ 4 - 5
src/emqx_zone.erl

@@ -62,7 +62,7 @@ get_env(Zone, Key) ->
 get_env(undefined, Key, Def) ->
     emqx_config:get_env(Key, Def);
 get_env(Zone, Key, Def) ->
-    try ets:lookup_element(?TAB, {Zone, Key}, 2)
+    try persistent_term:get({Zone, Key})
     catch error:badarg ->
         emqx_config:get_env(Key, Def)
     end.
@@ -84,7 +84,6 @@ stop() ->
 %%------------------------------------------------------------------------------
 
 init([]) ->
-    ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
     {ok, element(2, handle_info(reload, #{timer => undefined}))}.
 
 handle_call(force_reload, _From, State) ->
@@ -96,7 +95,7 @@ handle_call(Req, _From, State) ->
     {reply, ignored, State}.
 
 handle_cast({set_env, Zone, Key, Val}, State) ->
-    true = ets:insert(?TAB, {{Zone, Key}, Val}),
+    persistent_term:put({Zone, Key}, Val),
     {noreply, State};
 
 handle_cast(Msg, State) ->
@@ -122,11 +121,11 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 do_reload() ->
-    [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts])
+    [[persistent_term:put({Zone, Key}, Val)
+      || {Key, Val} <- Opts]
      || {Zone, Opts} <- emqx_config:get_env(zones, [])].
 
 ensure_reload_timer(State = #{timer := undefined}) ->
     State#{timer := erlang:send_after(timer:minutes(5), self(), reload)};
 ensure_reload_timer(State) ->
     State.
-

+ 7 - 16
test/emqx_protocol_SUITE.erl

@@ -55,7 +55,8 @@ groups() ->
 
 init_per_suite(Config) ->
     emqx_ct_helpers:start_apps([], fun set_special_configs/1),
-    emqx_zone:set_env(external, max_topic_alias, 20),
+    MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'),
+    emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
     Config.
 
 end_per_suite(_Config) ->
@@ -85,15 +86,6 @@ with_connection(DoFun, NumberOfConnections) ->
 with_connection(DoFun) ->
     with_connection(DoFun, 1).
 
-    % {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
-    %                                       [binary, {packet, raw},
-    %                                        {active, false}], 3000),
-    % try
-    %     DoFun(Sock)
-    % after
-    %     emqx_client_sock:close(Sock)
-    % end.
-
 handle_followed_packet(_Config) ->
     ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
     PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48,
@@ -191,16 +183,18 @@ connect_v5(_) ->
                             {ok, Data} = gen_tcp:recv(Sock, 0),
                             {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} =
                                 raw_recv_parse(Data, ?MQTT_PROTO_V5),
-                            ?assertNot(maps:is_key('Response-Information', Props)),
-                            ok
+                            ?assertNot(maps:is_key('Response-Information', Props))
                     end),
 
     % topic alias = 0
     with_connection(fun([Sock]) ->
+
+                            %% ct:log("emqx_protocol: ~p~n", [emqx_zone:get_zone(external, max_topic_alias)]),
                             emqx_client_sock:send(Sock,
                                                   raw_send_serialize(
                                                       ?CONNECT_PACKET(
                                                           #mqtt_packet_connect{
+                                                              client_id  = "hello",
                                                               proto_ver  = ?MQTT_PROTO_V5,
                                                               properties =
                                                                   #{'Topic-Alias-Maximum' => 10}}),
@@ -210,7 +204,6 @@ connect_v5(_) ->
                             {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
                                                  #{'Topic-Alias-Maximum' := 20}), _} =
                                 raw_recv_parse(Data, ?MQTT_PROTO_V5),
-
                             emqx_client_sock:send(Sock,
                                                   raw_send_serialize(
                                                       ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>),
@@ -383,9 +376,7 @@ connect_v5(_) ->
                             ),
 
                             {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
-                            {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
-
-                            emqx_client_sock:close(Sock2)
+                            {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5)
                     end),
 
     % duplicate client id

+ 2 - 2
test/emqx_session_SUITE.erl

@@ -31,7 +31,7 @@ end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
 ignore_loop(_Config) ->
-    application:set_env(emqx, mqtt_ignore_loop_deliver, true),
+    emqx_zone:set_env(external, ignore_loop_deliver, true),
     {ok, Client} = emqx_client:start_link(),
     {ok, _} = emqx_client:connect(Client),
     TestTopic = <<"Self">>,
@@ -41,7 +41,7 @@ ignore_loop(_Config) ->
     {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2),
     ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))),
     ok = emqx_client:disconnect(Client),
-    application:set_env(emqx, mqtt_ignore_loop_deliver, false).
+    emqx_zone:set_env(external, ignore_loop_deliver, false).
 
 t_session_all(_) ->
     emqx_zone:set_env(internal, idle_timeout, 1000),

+ 0 - 1
test/emqx_zone_SUITE.erl

@@ -25,7 +25,6 @@ all() -> [t_set_get_env].
 t_set_get_env(_) ->
     application:set_env(emqx, zones, [{china, [{language, chinese}]}]),
     {ok, _} = emqx_zone:start_link(),
-    ct:print("~p~n", [ets:tab2list(emqx_zone)]),
     chinese = emqx_zone:get_env(china, language),
     cn470 = emqx_zone:get_env(china, ism_band, cn470),
     undefined = emqx_zone:get_env(undefined, delay),