瀏覽代碼

Add 'active_n' option for WebSocket listener

Feng Lee 6 年之前
父節點
當前提交
4c9dda105f
共有 4 個文件被更改,包括 27 次插入160 次删除
  1. 12 2
      etc/emqx.conf
  2. 15 4
      priv/emqx.schema
  3. 0 120
      src/emqx_zone_options.erl
  4. 0 34
      test/emqx_time_SUITE.erl

+ 12 - 2
etc/emqx.conf

@@ -1349,9 +1349,14 @@ listener.ws.external.max_connections = 102400
 ## Value: Number
 listener.ws.external.max_conn_rate = 1000
 
+## Simulate the {active, N} option for the MQTT/WebSocket connections.
+##
+## Value: Number
+listener.ws.external.active_n = 100
+
 ## Rate limit for the MQTT/WebSocket connections.
 ##
-## Value: limit,duration
+## Value: Limit,Duration
 ## Default: 100KB incoming per 10 seconds.
 ## listener.ws.external.rate_limit = 100KB,10s
 
@@ -1557,9 +1562,14 @@ listener.wss.external.max_connections = 16
 ## Value: Number
 listener.wss.external.max_conn_rate = 1000
 
+## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections.
+##
+## Value: Number
+listener.wss.external.active_n = 100
+
 ## Rate limit for the MQTT/WebSocket/SSL connections.
 ##
-## Value: limit,duration
+## Value: Limit,Duration
 ## Default: 100KB incoming per 10 seconds.
 ## listener.wss.external.rate_limit = 100KB,10s
 

+ 15 - 4
priv/emqx.schema

@@ -959,17 +959,18 @@ end}.
                     {force_gc_policy, GcPolicy};
                ("force_shutdown_policy", "default") ->
                     {DefaultLen, DefaultSize} =
-                        case erlang:system_info(wordsize) of
+                        case WordSize = erlang:system_info(wordsize) of
                             8 -> % arch_64
                               {8000, cuttlefish_bytesize:parse("800MB")};
                             4 -> % arch_32
                               {1000, cuttlefish_bytesize:parse("100MB")}
                         end,
                     {force_shutdown_policy, #{message_queue_len => DefaultLen,
-                                              max_heap_size => DefaultSize}};
+                                              max_heap_size => DefaultSize div WordSize
+                                             }};
                ("force_shutdown_policy", Val) ->
                     [Len, Siz] = string:tokens(Val, "| "),
-                    MaxSiz = case erlang:system_info(wordsize) of
+                    MaxSiz = case WordSize = erlang:system_info(wordsize) of
                                 8 -> % arch_64
                                   (1 bsl 59) - 1;
                                 4 -> % arch_32
@@ -983,7 +984,7 @@ end}.
                               cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
                           Siz1 ->
                               #{message_queue_len => list_to_integer(Len),
-                                max_heap_size => Siz1}
+                                max_heap_size => Siz1 div WordSize}
                       end,
                     {force_shutdown_policy, ShutdownPolicy};
                ("mqueue_priorities", Val) ->
@@ -1289,6 +1290,11 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ws.$name.active_n", "emqx.listeners", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
 {mapping, "listener.ws.$name.zone", "emqx.listeners", [
   {datatype, string}
 ]}.
@@ -1442,6 +1448,11 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.wss.$name.active_n", "emqx.listeners", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
 {mapping, "listener.wss.$name.zone", "emqx.listeners", [
   {datatype, string}
 ]}.

+ 0 - 120
src/emqx_zone_options.erl

@@ -1,120 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_zone_options).
-
--compile(inline).
-
--include("types.hrl").
--include("emqx_mqtt.hrl").
-
--export([ idle_timeout/1
-        , publish_limit/1
-        , mqtt_frame_options/1
-        , mqtt_strict_mode/1
-        , max_packet_size/1
-        , mountpoint/1
-        , use_username_as_clientid/1
-        , enable_stats/1
-        , enable_acl/1
-        , enable_ban/1
-        , enable_flapping_detect/1
-        , ignore_loop_deliver/1
-        , server_keepalive/1
-        , keepalive_backoff/1
-        , max_inflight/1
-        , session_expiry_interval/1
-        , force_gc_policy/1
-        , force_shutdown_policy/1
-        ]).
-
--import(emqx_zone, [get_env/2, get_env/3]).
-
--define(DEFAULT_IDLE_TIMEOUT, 30000).
-
--spec(idle_timeout(emqx_zone:zone()) -> pos_integer()).
-idle_timeout(Zone) ->
-    get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT).
-
--spec(publish_limit(emqx_zone:zone()) -> maybe(esockd_rate_limit:bucket())).
-publish_limit(Zone) ->
-    get_env(Zone, publish_limit).
-
--spec(mqtt_frame_options(emqx_zone:zone()) -> emqx_frame:options()).
-mqtt_frame_options(Zone) ->
-    #{strict_mode => mqtt_strict_mode(Zone),
-      max_size    => max_packet_size(Zone)
-     }.
-
--spec(mqtt_strict_mode(emqx_zone:zone()) -> boolean()).
-mqtt_strict_mode(Zone) ->
-    get_env(Zone, mqtt_strict_mode, false).
-
--spec(max_packet_size(emqx_zone:zone()) -> integer()).
-max_packet_size(Zone) ->
-    get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE).
-
--spec(mountpoint(emqx_zone:zone()) -> maybe(emqx_mountpoint:mountpoint())).
-mountpoint(Zone) -> get_env(Zone, mountpoint).
-
--spec(use_username_as_clientid(emqx_zone:zone()) -> boolean()).
-use_username_as_clientid(Zone) ->
-    get_env(Zone, use_username_as_clientid, false).
-
--spec(enable_stats(emqx_zone:zone()) -> boolean()).
-enable_stats(Zone) ->
-    get_env(Zone, enable_stats, true).
-
--spec(enable_acl(emqx_zone:zone()) -> boolean()).
-enable_acl(Zone) ->
-    get_env(Zone, enable_acl, true).
-
--spec(enable_ban(emqx_zone:zone()) -> boolean()).
-enable_ban(Zone) ->
-    get_env(Zone, enable_ban, false).
-
--spec(enable_flapping_detect(emqx_zone:zone()) -> boolean()).
-enable_flapping_detect(Zone) ->
-    get_env(Zone, enable_flapping_detect, false).
-
--spec(ignore_loop_deliver(emqx_zone:zone()) -> boolean()).
-ignore_loop_deliver(Zone) ->
-    get_env(Zone, ignore_loop_deliver, false).
-
--spec(server_keepalive(emqx_zone:zone()) -> pos_integer()).
-server_keepalive(Zone) ->
-    get_env(Zone, server_keepalive).
-
--spec(keepalive_backoff(emqx_zone:zone()) -> float()).
-keepalive_backoff(Zone) ->
-    get_env(Zone, keepalive_backoff, 0.75).
-
--spec(max_inflight(emqx_zone:zone()) -> 0..65535).
-max_inflight(Zone) ->
-    get_env(Zone, max_inflight, 65535).
-
--spec(session_expiry_interval(emqx_zone:zone()) -> non_neg_integer()).
-session_expiry_interval(Zone) ->
-    get_env(Zone, session_expiry_interval, 0).
-
--spec(force_gc_policy(emqx_zone:zone()) -> maybe(emqx_gc:opts())).
-force_gc_policy(Zone) ->
-    get_env(Zone, force_gc_policy).
-
--spec(force_shutdown_policy(emqx_zone:zone()) -> maybe(emqx_oom:opts())).
-force_shutdown_policy(Zone) ->
-    get_env(Zone, force_shutdown_policy).
-

+ 0 - 34
test/emqx_time_SUITE.erl

@@ -1,34 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_time_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("eunit/include/eunit.hrl").
-
-all() -> emqx_ct:all(?MODULE).
-
-t_seed(_) ->
-    ?assert(is_tuple(emqx_time:seed())).
-
-t_now_secs(_) ->
-    ?assert(emqx_time:now_secs() =< emqx_time:now_secs(os:timestamp())).
-
-t_now_ms(_) ->
-    ?assert(emqx_time:now_ms() =< emqx_time:now_ms(os:timestamp())).
-