Просмотр исходного кода

Merge branch 'develop' into tune-node-config

Shawn 6 лет назад
Родитель
Сommit
4655ef94d2
4 измененных файлов с 51 добавлено и 28 удалено
  1. 6 5
      etc/emqx.conf
  2. 9 2
      priv/emqx.schema
  3. 30 9
      src/emqx_mqtt_caps.erl
  4. 6 12
      test/emqx_mqtt_caps_SUITE.erl

+ 6 - 5
etc/emqx.conf

@@ -330,7 +330,8 @@ rpc.tcp_client_port = 5369
 ## Number of outgoing RPC connections.
 ##
 ## Value: Interger [1-256]
-rpc.tcp_client_num = 32
+## Defaults to NumberOfCPUSchedulers / 2
+#rpc.tcp_client_num = 1
 
 ## RCP Client connect timeout.
 ##
@@ -605,7 +606,7 @@ zone.external.acl_deny_action = ignore
 ## messages | bytes passed through.
 ##
 ## Numbers delimited by `|'. Zero or negative is to disable.
-zone.external.force_gc_policy = 10000|10MB
+zone.external.force_gc_policy = 16000|16MB
 
 ## Max message queue length and total heap size to force shutdown
 ## connection/session process.
@@ -617,7 +618,7 @@ zone.external.force_gc_policy = 10000|10MB
 ## Default:
 ##   - 10000|32MB on ARCH_64 system
 ##   - 10000|16MB on ARCH_32 sytem
-## zone.external.force_shutdown_policy = 10000|32MB
+## zone.external.force_shutdown_policy = 32000|32MB
 
 ## Maximum MQTT packet size allowed.
 ##
@@ -793,7 +794,7 @@ zone.internal.enable_acl = off
 zone.internal.acl_deny_action = ignore
 
 ## See zone.$name.force_gc_policy
-## zone.internal.force_gc_policy = 100000|100MB
+## zone.internal.force_gc_policy = 128000|128MB
 
 ## See zone.$name.wildcard_subscription.
 ##
@@ -840,7 +841,7 @@ zone.internal.enable_flapping_detect = off
 ## Default:
 ##   - 10000|32MB on ARCH_64 system
 ##   - 10000|16MB on ARCH_32 sytem
-## zone.internal.force_shutdown_policy = 100000|64MB
+## zone.internal.force_shutdown_policy = 128000|128MB
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##

+ 9 - 2
priv/emqx.schema

@@ -362,11 +362,18 @@ end}.
 
 %% Default TCP port for outgoing connections
 {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
-  {default, 32},
+  {default, 0},
   {datatype, integer},
   {validators, ["range:gt_0_lt_256"]}
 ]}.
 
+{translation, "gen_rpc.tcp_client_num", fun(Conf) ->
+  case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of
+    0 -> max(1, erlang:system_info(schedulers) div 2);
+    V -> V
+  end
+end}.
+
 %% Client connect timeout
 {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [
   {default, "5s"},
@@ -428,7 +435,7 @@ end}.
 ]}.
 
 {validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
-  fun(X) -> X > 0 andalso X < 256 end
+  fun(X) -> X >= 0 andalso X < 256 end
 }.
 
 %%--------------------------------------------------------------------

+ 30 - 9
src/emqx_mqtt_caps.erl

@@ -29,6 +29,8 @@
         , get_caps/3
         ]).
 
+-export([default_caps/0]).
+
 -export([default/0]).
 
 -export_type([caps/0]).
@@ -116,19 +118,31 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
     {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
 do_check_sub(_Flags, _Caps) -> ok.
 
--spec(get_caps(emqx_zone:zone()) -> caps()).
-get_caps(Zone) ->
-    maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS).
+default_caps() ->
+    ?DEFAULT_CAPS.
+
+get_caps(Zone, Cap, Def) ->
+    emqx_zone:get_env(Zone, Cap, Def).
 
--spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
 get_caps(Zone, publish) ->
-    filter_caps(?PUBCAP_KEYS, get_caps(Zone));
+    with_env(Zone, '$mqtt_pub_caps',
+             fun() ->
+                 filter_caps(?PUBCAP_KEYS, get_caps(Zone))
+             end);
+
 get_caps(Zone, subscribe) ->
-    filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
+    with_env(Zone, '$mqtt_sub_caps',
+             fun() ->
+                 filter_caps(?SUBCAP_KEYS, get_caps(Zone))
+             end).
 
--spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
-get_caps(Zone, Cap, Def) ->
-    emqx_zone:get_env(Zone, Cap, Def).
+get_caps(Zone) ->
+    with_env(Zone, '$mqtt_caps',
+             fun() ->
+                maps:map(fun(Cap, Def) ->
+                    emqx_zone:get_env(Zone, Cap, Def)
+                end, ?DEFAULT_CAPS)
+             end).
 
 filter_caps(Keys, Caps) ->
     maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
@@ -136,3 +150,10 @@ filter_caps(Keys, Caps) ->
 -spec(default() -> caps()).
 default() -> ?DEFAULT_CAPS.
 
+with_env(Zone, Key, InitFun) ->
+    case emqx_zone:get_env(Zone, Key) of
+        undefined -> Caps = InitFun(),
+                     ok = emqx_zone:set_env(Zone, Key, Caps),
+                     Caps;
+        ZoneCaps  -> ZoneCaps
+    end.

+ 6 - 12
test/emqx_mqtt_caps_SUITE.erl

@@ -28,9 +28,8 @@ t_check_pub(_) ->
     PubCaps = #{max_qos_allowed => ?QOS_1,
                 retain_available => false
                },
-    lists:foreach(fun({Key, Val}) ->
-        ok = emqx_zone:set_env(zone, Key, Val)
-    end, maps:to_list(PubCaps)),
+    emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
+    timer:sleep(50),
     ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
                                           retain => false}),
     PubFlags1 = #{qos => ?QOS_2, retain => false},
@@ -39,9 +38,7 @@ t_check_pub(_) ->
     PubFlags2 = #{qos => ?QOS_1, retain => true},
     ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
                  emqx_mqtt_caps:check_pub(zone, PubFlags2)),
-    lists:foreach(fun({Key, _Val}) ->
-        true = emqx_zone:unset_env(zone, Key)
-    end, maps:to_list(PubCaps)).
+    emqx_zone:unset_env(zone, '$mqtt_pub_caps').
 
 t_check_sub(_) ->
     SubOpts = #{rh  => 0,
@@ -54,9 +51,8 @@ t_check_sub(_) ->
                 shared_subscription => false,
                 wildcard_subscription => false
                },
-    lists:foreach(fun({Key, Val}) ->
-        ok = emqx_zone:set_env(zone, Key, Val)
-    end, maps:to_list(SubCaps)),
+    emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
+    timer:sleep(50),
     ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
     ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
                  emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
@@ -64,6 +60,4 @@ t_check_sub(_) ->
                  emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
     ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
                  emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
-    lists:foreach(fun({Key, _Val}) ->
-        true = emqx_zone:unset_env(zone, Key)
-    end, maps:to_list(SubCaps)).
+    emqx_zone:unset_env(zone, '$mqtt_pub_caps').