Przeglądaj źródła

Merge remote-tracking branch 'origin/develop'

zhanghongtong 6 lat temu
rodzic
commit
a2cb77781a

+ 1 - 1
README.md

@@ -2,7 +2,7 @@
 
 [![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen)](https://github.com/emqx/emqx/releases)
 [![Build Status](https://travis-ci.org/emqx/emqx.svg)](https://travis-ci.org/emqx/emqx)
-[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg)](https://coveralls.io/github/emqx/emqx)
+[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg?branch=master)](https://coveralls.io/github/emqx/emqx?branch=master)
 [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx)
 [![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
 [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)

+ 30 - 14
etc/emqx.conf

@@ -211,7 +211,7 @@ node.data_dir = {{ platform_data_dir }}
 ## Value: 0-1024
 ##
 ## vm.args: +A Number
-node.async_threads = 32
+## node.async_threads = 4
 
 ## Sets the maximum number of simultaneously existing processes for this
 ## system if a Number is passed as value.
@@ -221,7 +221,7 @@ node.async_threads = 32
 ## Value: Number [1024-134217727]
 ##
 ## vm.args: +P Number
-node.process_limit = 2048000
+## node.process_limit = 2097152
 
 ## Sets the maximum number of simultaneously existing ports for this system.
 ##
@@ -230,16 +230,16 @@ node.process_limit = 2048000
 ## Value: Number [1024-134217727]
 ##
 ## vm.args: +Q Number
-node.max_ports = 1024000
+## node.max_ports = 1048576
 
-## Set the distribution buffer busy limit (dist_buf_busy_limit).
+## Sets the distribution buffer busy limit (dist_buf_busy_limit).
 ##
 ## See: http://erlang.org/doc/man/erl.html
 ##
 ## Value: Number [1KB-2GB]
 ##
 ## vm.args: +zdbbl size
-node.dist_buffer_size = 8MB
+## node.dist_buffer_size = 8MB
 
 ## Sets the maximum number of ETS tables. Note that mnesia and SSL will
 ## create temporary ETS tables.
@@ -247,14 +247,26 @@ node.dist_buffer_size = 8MB
 ## Value: Number
 ##
 ## vm.args: +e Number
-node.max_ets_tables = 256000
+## node.max_ets_tables = 262144
+
+## Global GC Interval.
+##
+## Value: Duration
+##
+## Examples:
+##  - 2h:  2 hours
+##  - 30m: 30 minutes
+##  - 20s: 20 seconds
+##
+## Defaut: 15 minutes
+node.global_gc_interval = 15m
 
 ## Tweak GC to run more often.
 ##
 ## Value: Number [0-65535]
 ##
 ## vm.args: -env ERL_FULLSWEEP_AFTER Number
-node.fullsweep_after = 1000
+## node.fullsweep_after = 1000
 
 ## Crash dump log file.
 ##
@@ -277,7 +289,7 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump
 ## Value: Number
 ##
 ## vm.args: -kernel net_ticktime Number
-node.dist_net_ticktime = 60
+## node.dist_net_ticktime = 120
 
 ## Sets the port range for the listener socket of a distributed Erlang node.
 ## Note that if there are firewalls between clustered nodes, this port segment
@@ -315,10 +327,11 @@ rpc.tcp_server_port = 5369
 ## Value: Port [1024-65535]
 rpc.tcp_client_port = 5369
 
-## Number of utgoing RPC connections.
+## Number of outgoing RPC connections.
 ##
 ## Value: Interger [1-256]
-rpc.tcp_client_num = 32
+## Defaults to NumberOfCPUSchedulers / 2
+#rpc.tcp_client_num = 1
 
 ## RCP Client connect timeout.
 ##
@@ -589,11 +602,11 @@ zone.external.enable_stats = on
 ## Default: ignore
 zone.external.acl_deny_action = ignore
 
-## Force MQTT connection/session process GC after this number of
+## Force the MQTT connection process GC after this number of
 ## messages | bytes passed through.
 ##
 ## Numbers delimited by `|'. Zero or negative is to disable.
-zone.external.force_gc_policy = 1000|1MB
+zone.external.force_gc_policy = 16000|16MB
 
 ## Max message queue length and total heap size to force shutdown
 ## connection/session process.
@@ -605,7 +618,7 @@ zone.external.force_gc_policy = 1000|1MB
 ## Default:
 ##   - 10000|32MB on ARCH_64 system
 ##   - 10000|16MB on ARCH_32 sytem
-## zone.external.force_shutdown_policy = 10000|32MB
+## zone.external.force_shutdown_policy = 32000|32MB
 
 ## Maximum MQTT packet size allowed.
 ##
@@ -780,6 +793,9 @@ zone.internal.enable_acl = off
 ## Default: ignore
 zone.internal.acl_deny_action = ignore
 
+## See zone.$name.force_gc_policy
+## zone.internal.force_gc_policy = 128000|128MB
+
 ## See zone.$name.wildcard_subscription.
 ##
 ## Value: boolean
@@ -825,7 +841,7 @@ zone.internal.enable_flapping_detect = off
 ## Default:
 ##   - 10000|32MB on ARCH_64 system
 ##   - 10000|16MB on ARCH_32 sytem
-zone.internal.force_shutdown_policy = 100000|64MB
+## zone.internal.force_shutdown_policy = 128000|128MB
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##

+ 32 - 12
etc/vm.args

@@ -1,6 +1,6 @@
-##############################
-# Erlang VM Args
-##############################
+######################################################################
+## Erlang VM Args for EMQ X Broker
+######################################################################
 
 ## NOTE:
 ##
@@ -10,13 +10,13 @@
 ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`.
 
 ## Sets the maximum number of simultaneously existing processes for this system.
-#+P 2048000
++P 2097152
 
 ## Sets the maximum number of simultaneously existing ports for this system.
-#+Q 1024000
++Q 1048576
 
 ## Sets the maximum number of ETS tables
-#+e 256000
++e 262144
 
 ## Sets the maximum number of atoms the virtual machine can handle.
 #+t 1048576
@@ -26,7 +26,7 @@
 
 ## Set how many times generational garbages collections can be done without
 ## forcing a fullsweep collection.
-#-env ERL_FULLSWEEP_AFTER 1000
+-env ERL_FULLSWEEP_AFTER 1000
 
 ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
 ## (Disabled by default..use with caution!)
@@ -43,7 +43,7 @@
 ## Specifies the net_kernel tick time in seconds.
 ## This is the approximate time a connected node may be unresponsive until
 ## it is considered down and thereby disconnected.
-#-kernel net_ticktime 60
+-kernel net_ticktime 120
 
 ## Sets the distribution buffer busy limit (dist_buf_busy_limit).
 #+zdbbl 8192
@@ -52,7 +52,8 @@
 +spp true
 
 ## Sets the number of threads in async thread pool. Valid range is 0-1024.
-#+A 8
+## Increase the parameter if there are many simultaneous file I/O operations.
++A 4
 
 ## Sets the default heap size of processes to the size Size.
 #+hms 233
@@ -60,11 +61,20 @@
 ## Sets the default binary virtual heap size of processes to the size Size.
 #+hmbs 46422
 
+## Sets the default maximum heap size of processes to the size Size.
+## Defaults to 0, which means that no maximum heap size is used.
+##For more information, see process_flag(max_heap_size, MaxHeapSize).
+#+hmax 0
+
+## Sets the default value for process flag message_queue_data. Defaults to on_heap.
+#+hmqd on_heap | off_heap
+
 ## Sets the number of IO pollsets to use when polling for I/O.
 #+IOp 1
 
 ## Sets the number of IO poll threads to use when polling for I/O.
-#+IOt 1
+## Increase this for the busy systems with many concurrent connection.
++IOt 4
 
 ## Sets the number of scheduler threads to create and scheduler threads to set online.
 #+S 8:8
@@ -73,7 +83,7 @@
 #+SDcpu 8:8
 
 ## Sets the number of dirty I/O scheduler threads to create.
-#+SDio 10
++SDio 8
 
 ## Suggested stack size, in kilowords, for scheduler threads.
 #+sss 32
@@ -92,4 +102,14 @@
 #+sct L0-3c0-3p0N0:L4-7c0-3p1N1
 
 ## Sets the mapping of warning messages for error_logger
-#+W w
+#+W w
+
+## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
+#+C no_time_warp
+
+## Prevents loading information about source filenames and line numbers.
+#+L
+
+## Specifies how long time (in milliseconds) to spend shutting down the system.
+## See: http://erlang.org/doc/man/erl.html
+#-shutdown_time 15000

+ 24 - 8
etc/vm.args.edge

@@ -1,6 +1,6 @@
-##############################
-# Erlang VM Args
-##############################
+######################################################################
+## Erlang VM Args for EMQ X Edge
+######################################################################
 
 ## NOTE:
 ##
@@ -10,8 +10,7 @@
 ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`.
 
 ## Sets the maximum number of simultaneously existing processes for this system.
-+P 20480
-
++P 16384
 ## Sets the maximum number of simultaneously existing ports for this system.
 +Q 4096
 
@@ -19,7 +18,7 @@
 +e 512
 
 ## Sets the maximum number of atoms the virtual machine can handle.
-+t 65536
++t 262144
 
 ## Set the location of crash dumps
 -env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump
@@ -30,7 +29,7 @@
 
 ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
 ## (Disabled by default..use with caution!)
-#-heart
+-heart
 
 ## Specify the erlang distributed protocol.
 ## Can be one of: inet_tcp, inet6_tcp, inet_tls
@@ -52,6 +51,7 @@
 +spp false
 
 ## Sets the number of threads in async thread pool. Valid range is 0-1024.
+## Increase the parameter if there are many simultaneous file I/O operations.
 +A 1
 
 ## Sets the default heap size of processes to the size Size.
@@ -60,6 +60,14 @@
 ## Sets the default binary virtual heap size of processes to the size Size.
 #+hmbs 46422
 
+## Sets the default maximum heap size of processes to the size Size.
+## Defaults to 0, which means that no maximum heap size is used.
+##For more information, see process_flag(max_heap_size, MaxHeapSize).
+#+hmax 0
+
+## Sets the default value for process flag message_queue_data. Defaults to on_heap.
+#+hmqd on_heap | off_heap
+
 ## Sets the number of IO pollsets to use when polling for I/O.
 +IOp 1
 
@@ -94,5 +102,13 @@
 ## Sets the mapping of warning messages for error_logger
 #+W w
 
-#Prevents loading information about source filenames and line numbers.
+## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp
+#+C no_time_warp
+
+## Prevents loading information about source filenames and line numbers.
 +L
+
+## Specifies how long time (in milliseconds) to spend shutting down the system.
+## See: http://erlang.org/doc/man/erl.html
+-shutdown_time 10000
+

+ 20 - 29
priv/emqx.schema

@@ -205,8 +205,6 @@ end}.
   {default, "emqx@127.0.0.1"}
 ]}.
 
-
-
 %% @doc Specify SSL Options in the file if using SSL for erlang distribution
 {mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [
   {datatype, string},
@@ -237,7 +235,6 @@ end}.
 
 %% @doc More information at: http://erlang.org/doc/man/erl.html
 {mapping, "node.async_threads", "vm_args.+A", [
-  {default, 64},
   {datatype, integer},
   {validators, ["range:0-1024"]}
 ]}.
@@ -245,16 +242,12 @@ end}.
 %% @doc Erlang Process Limit
 {mapping, "node.process_limit", "vm_args.+P", [
   {datatype, integer},
-  {default, 256000},
   hidden
 ]}.
 
-%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
-%% @doc The number of concurrent ports/sockets
+%% @doc The maximum number of concurrent ports/sockets.
 %% Valid range is 1024-134217727
-{mapping, "node.max_ports",
-  cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
-  {default, 262144},
+{mapping, "node.max_ports", "vm_args.+Q", [
   {datatype, integer},
   {validators, ["range4ports"]}
 ]}.
@@ -287,6 +280,11 @@ end}.
  end
 }.
 
+%% @doc Global GC Interval
+{mapping, "node.global_gc_interval", "emqx.global_gc_interval", [
+  {datatype, {duration, s}}
+]}.
+
 %% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
 {mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
   {default, 1000},
@@ -317,7 +315,6 @@ end}.
 
 %% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
 {mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
-  {commented, 60},
   {datatype, integer},
   hidden
 ]}.
@@ -365,11 +362,18 @@ end}.
 
 %% Default TCP port for outgoing connections
 {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
-  {default, 32},
+  {default, 0},
   {datatype, integer},
   {validators, ["range:gt_0_lt_256"]}
 ]}.
 
+{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
+  case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
+    0 -> max(1, erlang:system_info(schedulers) div 2);
+    V -> V
+  end
+end}.
+
 %% Client connect timeout
 {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [
   {default, "5s"},
@@ -431,7 +435,7 @@ end}.
 ]}.
 
 {validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
-  fun(X) -> X > 0 andalso X < 256 end
+  fun(X) -> X >= 0 andalso X < 256 end
 }.
 
 %%--------------------------------------------------------------------
@@ -640,15 +644,15 @@ end}.
 {translation, "emqx.flapping_detect_policy", fun(Conf) ->
     Policy = cuttlefish:conf_get("flapping_detect_policy", Conf),
     [Threshold, Duration, Interval] = string:tokens(Policy, ", "),
-    ParseDuration = fun(S) ->
-                        case cuttlefish_duration:parse(S, ms) of
+    ParseDuration = fun(S, Dur) ->
+                        case cuttlefish_duration:parse(S, Dur) of
                             I when is_integer(I) -> I;
                             {error, Reason} -> error(Reason)
                         end
                     end,
     #{threshold => list_to_integer(Threshold),
-      duration  => ParseDuration(Duration),
-      banned_interval => ParseDuration(Interval)
+      duration  => ParseDuration(Duration, ms),
+      banned_interval => ParseDuration(Interval, s)
      }
 end}.
 
@@ -913,7 +917,6 @@ end}.
 %% messages | bytes passed through.
 %% Numbers delimited by `|'. Zero or negative is to disable.
 {mapping, "zone.$name.force_gc_policy", "emqx.zones", [
-   {default, "0 | 0MB"},
    {datatype, string}
  ]}.
 
@@ -923,7 +926,6 @@ end}.
 %% of queued MQTT messages of QoS 1 and 2.
 %% Zero or negative is to disable.
 {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
-  {default, "default"},
   {datatype, string}
 ]}.
 
@@ -963,17 +965,6 @@ end}.
                                          count => list_to_integer(Count)}
                                end,
                     {force_gc_policy, GcPolicy};
-               ("force_shutdown_policy", "default") ->
-                    {DefaultLen, DefaultSize} =
-                        case WordSize = erlang:system_info(wordsize) of
-                            8 -> % arch_64
-                              {10000, cuttlefish_bytesize:parse("32MB")};
-                            4 -> % arch_32
-                              {10000, cuttlefish_bytesize:parse("16MB")}
-                        end,
-                    {force_shutdown_policy, #{message_queue_len => DefaultLen,
-                                              max_heap_size => DefaultSize div WordSize
-                                             }};
                ("force_shutdown_policy", Val) ->
                     [Len, Siz] = string:tokens(Val, "| "),
                     MaxSiz = case WordSize = erlang:system_info(wordsize) of

+ 1 - 1
src/emqx_banned.erl

@@ -85,7 +85,7 @@ do_check(Who) when is_tuple(Who) ->
     case mnesia:dirty_read(?BANNED_TAB, Who) of
         [] -> false;
         [#banned{until = Until}] ->
-            Until > erlang:system_time(millisecond)
+            Until > erlang:system_time(second)
     end.
 
 -spec(create(emqx_types:banned()) -> ok).

+ 1 - 1
src/emqx_flapping.erl

@@ -124,7 +124,7 @@ handle_cast({detected, #flapping{clientid   = ClientId,
         true -> %% Flapping happened:(
             ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
                  [ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]),
-            Now = erlang:system_time(millisecond),
+            Now = erlang:system_time(second),
             Banned = #banned{who    = {clientid, ClientId},
                              by     = <<"flapping detector">>,
                              reason = <<"flapping is detected">>,

+ 97 - 0
src/emqx_global_gc.erl

@@ -0,0 +1,97 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_global_gc).
+
+-behaviour(gen_server).
+
+-include("types.hrl").
+
+-export([start_link/0, stop/0]).
+
+-export([run/0]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+%% 5 minutes
+%% -define(DEFAULT_INTERVAL, 300000).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+-spec(start_link() -> startlink_ret()).
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec(run() -> {ok, timer:time()}).
+run() -> gen_server:call(?MODULE, run, infinity).
+
+-spec(stop() -> ok).
+stop() -> gen_server:stop(?MODULE).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    {ok, ensure_timer(#{timer => undefined})}.
+
+handle_call(run, _From, State) ->
+    {Time, _} = timer:tc(fun run_gc/0),
+    {reply, {ok, Time div 1000}, State, hibernate};
+
+handle_call(_Req, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({timeout, TRef, run}, State = #{timer := TRef}) ->
+    run_gc(),
+    {noreply, ensure_timer(State), hibernate};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internel function
+%%--------------------------------------------------------------------
+
+ensure_timer(State) ->
+    case emqx:get_env(global_gc_interval) of
+        undefined -> State;
+        Interval  -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run),
+                     State#{timer := TRef}
+    end.
+
+run_gc() ->
+    [garbage_collect(P) || P <- processes(),
+                           {status, waiting} == process_info(P, status)].
+

+ 6 - 4
src/emqx_kernel_sup.erl

@@ -27,7 +27,8 @@ start_link() ->
 
 init([]) ->
     {ok, {{one_for_one, 10, 100},
-          [child_spec(emqx_pool_sup, supervisor),
+          [child_spec(emqx_global_gc, worker),
+           child_spec(emqx_pool_sup, supervisor),
            child_spec(emqx_hooks, worker),
            child_spec(emqx_stats, worker),
            child_spec(emqx_metrics, worker),
@@ -40,7 +41,8 @@ child_spec(M, worker) ->
       restart  => permanent,
       shutdown => 5000,
       type     => worker,
-      modules  => [M]};
+      modules  => [M]
+     };
 
 child_spec(M, supervisor) ->
     #{id       => M,
@@ -48,6 +50,6 @@ child_spec(M, supervisor) ->
       restart  => permanent,
       shutdown => infinity,
       type     => supervisor,
-      modules  => [M]}.
-
+      modules  => [M]
+     }.
 

+ 11 - 7
src/emqx_listeners.erl

@@ -82,7 +82,7 @@ start_mqtt_listener(Name, ListenOn, Options) ->
                 {emqx_connection, start_link, [Options -- SockOpts]}).
 
 start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) ->
-    Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts).
+    Start(ws_name(Name, ListenOn), with_port(ListenOn, RanchOpts), ProtoOpts).
 
 mqtt_path(Options) ->
     proplists:get_value(mqtt_path, Options, "/mqtt").
@@ -125,10 +125,10 @@ restart_listener(tcp, ListenOn, _Options) ->
 restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
     esockd:reopen('mqtt:ssl', ListenOn);
 restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
-    cowboy:stop_listener('mqtt:ws'),
+    cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)),
     start_listener(Proto, ListenOn, Options);
 restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
-    cowboy:stop_listener('mqtt:wss'),
+    cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)),
     start_listener(Proto, ListenOn, Options);
 restart_listener(Proto, ListenOn, _Opts) ->
     esockd:reopen(Proto, ListenOn).
@@ -156,10 +156,10 @@ stop_listener(tcp, ListenOn, _Opts) ->
     esockd:close('mqtt:tcp', ListenOn);
 stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls ->
     esockd:close('mqtt:ssl', ListenOn);
-stop_listener(Proto, _ListenOn, _Opts) when Proto == http; Proto == ws ->
-    cowboy:stop_listener('mqtt:ws');
-stop_listener(Proto, _ListenOn, _Opts) when Proto == https; Proto == wss ->
-    cowboy:stop_listener('mqtt:wss');
+stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws ->
+    cowboy:stop_listener(ws_name('mqtt:ws', ListenOn));
+stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss ->
+    cowboy:stop_listener(ws_name('mqtt:wss', ListenOn));
 stop_listener(Proto, ListenOn, _Opts) ->
     esockd:close(Proto, ListenOn).
 
@@ -178,3 +178,7 @@ format({Addr, Port}) when is_list(Addr) ->
 format({Addr, Port}) when is_tuple(Addr) ->
     io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
 
+ws_name(Name, {_Addr, Port}) ->
+    ws_name(Name, Port);
+ws_name(Name, Port) ->
+    list_to_atom(lists:concat([Name, ":", Port])).

+ 12 - 2
src/emqx_misc.erl

@@ -23,6 +23,8 @@
 
 -export([ merge_opts/2
         , maybe_apply/2
+        , compose/1
+        , compose/2
         , run_fold/3
         , pipeline/3
         , start_timer/2
@@ -56,11 +58,19 @@ merge_opts(Defaults, Options) ->
 %% @doc Apply a function to a maybe argument.
 -spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A))
       -> maybe(A) when A :: any()).
-maybe_apply(_Fun, undefined) ->
-    undefined;
+maybe_apply(_Fun, undefined) -> undefined;
 maybe_apply(Fun, Arg) when is_function(Fun) ->
     erlang:apply(Fun, [Arg]).
 
+-spec(compose(list(F)) -> G when F :: fun((any()) -> any()),
+                                 G :: fun((any()) -> any())).
+compose([F|More]) -> compose(F, More).
+
+-spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)).
+compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
+compose(F, [G]) -> compose(F, G);
+compose(F, [G|More]) -> compose(compose(F, G), More).
+
 %% @doc RunFold
 run_fold([], Acc, _State) ->
     Acc;

+ 30 - 9
src/emqx_mqtt_caps.erl

@@ -29,6 +29,8 @@
         , get_caps/3
         ]).
 
+-export([default_caps/0]).
+
 -export([default/0]).
 
 -export_type([caps/0]).
@@ -116,19 +118,31 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
     {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
 do_check_sub(_Flags, _Caps) -> ok.
 
--spec(get_caps(emqx_zone:zone()) -> caps()).
-get_caps(Zone) ->
-    maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS).
+default_caps() ->
+    ?DEFAULT_CAPS.
+
+get_caps(Zone, Cap, Def) ->
+    emqx_zone:get_env(Zone, Cap, Def).
 
--spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
 get_caps(Zone, publish) ->
-    filter_caps(?PUBCAP_KEYS, get_caps(Zone));
+    with_env(Zone, '$mqtt_pub_caps',
+             fun() ->
+                 filter_caps(?PUBCAP_KEYS, get_caps(Zone))
+             end);
+
 get_caps(Zone, subscribe) ->
-    filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
+    with_env(Zone, '$mqtt_sub_caps',
+             fun() ->
+                 filter_caps(?SUBCAP_KEYS, get_caps(Zone))
+             end).
 
--spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
-get_caps(Zone, Cap, Def) ->
-    emqx_zone:get_env(Zone, Cap, Def).
+get_caps(Zone) ->
+    with_env(Zone, '$mqtt_caps',
+             fun() ->
+                maps:map(fun(Cap, Def) ->
+                    emqx_zone:get_env(Zone, Cap, Def)
+                end, ?DEFAULT_CAPS)
+             end).
 
 filter_caps(Keys, Caps) ->
     maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
@@ -136,3 +150,10 @@ filter_caps(Keys, Caps) ->
 -spec(default() -> caps()).
 default() -> ?DEFAULT_CAPS.
 
+with_env(Zone, Key, InitFun) ->
+    case emqx_zone:get_env(Zone, Key) of
+        undefined -> Caps = InitFun(),
+                     ok = emqx_zone:set_env(Zone, Key, Caps),
+                     Caps;
+        ZoneCaps  -> ZoneCaps
+    end.

+ 4 - 4
src/emqx_session.erl

@@ -516,12 +516,12 @@ enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
 enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
                Session = #session{upgrade_qos = false}) ->
     enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
-enrich_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
-    enrich_subopts(Opts, emqx_message:set_flag(retain, true, Msg), Session);
-enrich_subopts([{rap, 0}|Opts], Msg, Session) ->
-    enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
 enrich_subopts([{rap, 1}|Opts], Msg, Session) ->
     enrich_subopts(Opts, Msg, Session);
+enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
+    enrich_subopts(Opts, Msg, Session);
+enrich_subopts([{rap, 0}|Opts], Msg, Session) ->
+    enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
 enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
     Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
     enrich_subopts(Opts, Msg1, Session).

+ 2 - 2
src/emqx_ws_connection.erl

@@ -429,9 +429,9 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
 
 run_gc(Stats, State = #state{gc_state = GcSt}) ->
     case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+        false -> State;
         {_IsGC, GcSt1} ->
-            State#state{gc_state = GcSt1};
-        false -> State
+            State#state{gc_state = GcSt1}
     end.
 
 check_oom(State = #state{channel = Channel}) ->

+ 3 - 1
src/emqx_zone.erl

@@ -45,6 +45,8 @@
           , session_expiry_interval/1
           , force_gc_policy/1
           , force_shutdown_policy/1
+          , get_env/2
+          , get_env/3
           ]}).
 
 %% APIs
@@ -114,7 +116,7 @@ start_link() ->
 stop() ->
     gen_server:stop(?SERVER).
 
--spec(init_gc_state(zone()) -> emqx_gc:gc_state()).
+-spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())).
 init_gc_state(Zone) ->
     maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)).
 

+ 2 - 2
test/emqx_flapping_SUITE.erl

@@ -31,7 +31,7 @@ set_special_configs(emqx) ->
     application:set_env(emqx, flapping_detect_policy,
                         #{threshold => 3,
                           duration => 100,
-                          banned_interval => 200
+                          banned_interval => 2
                          });
 set_special_configs(_App) -> ok.
 
@@ -52,7 +52,7 @@ t_detect_check(_) ->
     true = emqx_flapping:detect(ClientInfo),
     timer:sleep(100),
     true = emqx_banned:check(ClientInfo),
-    timer:sleep(200),
+    timer:sleep(3000),
     false = emqx_banned:check(ClientInfo),
     Childrens = supervisor:which_children(emqx_cm_sup),
     {flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens),

+ 33 - 0
test/emqx_global_gc_SUITE.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_global_gc_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+t_run_gc(_) ->
+    ok = application:set_env(emqx, global_gc_interval, 1),
+    {ok, _} = emqx_global_gc:start_link(),
+    ok = timer:sleep(1500),
+    {ok, MilliSecs} = emqx_global_gc:run(),
+    ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
+    emqx_global_gc:stop().
+

+ 6 - 12
test/emqx_mqtt_caps_SUITE.erl

@@ -28,9 +28,8 @@ t_check_pub(_) ->
     PubCaps = #{max_qos_allowed => ?QOS_1,
                 retain_available => false
                },
-    lists:foreach(fun({Key, Val}) ->
-        ok = emqx_zone:set_env(zone, Key, Val)
-    end, maps:to_list(PubCaps)),
+    emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
+    timer:sleep(50),
     ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
                                           retain => false}),
     PubFlags1 = #{qos => ?QOS_2, retain => false},
@@ -39,9 +38,7 @@ t_check_pub(_) ->
     PubFlags2 = #{qos => ?QOS_1, retain => true},
     ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
                  emqx_mqtt_caps:check_pub(zone, PubFlags2)),
-    lists:foreach(fun({Key, _Val}) ->
-        true = emqx_zone:unset_env(zone, Key)
-    end, maps:to_list(PubCaps)).
+    emqx_zone:unset_env(zone, '$mqtt_pub_caps').
 
 t_check_sub(_) ->
     SubOpts = #{rh  => 0,
@@ -54,9 +51,8 @@ t_check_sub(_) ->
                 shared_subscription => false,
                 wildcard_subscription => false
                },
-    lists:foreach(fun({Key, Val}) ->
-        ok = emqx_zone:set_env(zone, Key, Val)
-    end, maps:to_list(SubCaps)),
+    emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
+    timer:sleep(50),
     ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
     ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
                  emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
@@ -64,6 +60,4 @@ t_check_sub(_) ->
                  emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
     ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
                  emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
-    lists:foreach(fun({Key, _Val}) ->
-        true = emqx_zone:unset_env(zone, Key)
-    end, maps:to_list(SubCaps)).
+    emqx_zone:unset_env(zone, '$mqtt_pub_caps').

+ 11 - 6
test/mqtt_protocol_v5_SUITE.erl

@@ -147,8 +147,8 @@ t_connect_keepalive_timeout(_) ->
         Msg -> 
             ReasonCode = 141,
             ?assertMatch({disconnected, ReasonCode, _Channel}, Msg)
-    after 
-        round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout")
+    after round(timer:seconds(Keepalive) * 2 * 1.5 ) ->
+        error("keepalive timeout")
     end.
 
 %%--------------------------------------------------------------------
@@ -160,7 +160,7 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
     application:set_env(emqx, shared_dispatch_ack_enabled, true),
 
     Topic = nth(1, ?TOPICS),
-    Shared_topic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
+    SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
 
     CRef = counters:new(1, [atomics]),
     meck:expect(emqtt, connected, 
@@ -174,18 +174,23 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
                                     {clientid, <<"sub_client_1">>},
                                     {keepalive, 5}]),
     {ok, _} = emqtt:connect(Sub1),
-    {ok, _, [2]} = emqtt:subscribe(Sub1, Shared_topic, qos2),
+    {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2),
+
     {ok, Sub2} = emqtt:start_link([{proto_ver, v5},
                                     {clientid, <<"sub_client_2">>},
                                     {keepalive, 5}]),
     {ok, _} = emqtt:connect(Sub2),
-    {ok, _, [2]} = emqtt:subscribe(Sub2, Shared_topic, qos2),
+    {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2),
 
     {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
     {ok, _} = emqtt:connect(Pub),
     {ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
 
     receive
-        {disconnected,shutdown,for_testiong} -> ok
+        {'EXIT', _,{shutdown, for_testiong}} ->
+            ok
+    after 1000 ->
+            error("disconnected timeout")
     end,
+
     ?assertEqual(1, counters:get(CRef, 1)).