Просмотр исходного кода

refactor(limiter): simplify limiter configuration

firest 2 лет назад
Родитель
Сommit
d914d1ee1d

+ 91 - 10
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -32,9 +32,13 @@
     get_bucket_cfg_path/2,
     desc/1,
     types/0,
+    short_paths/0,
     calc_capacity/1,
     extract_with_type/2,
-    default_client_config/0
+    default_client_config/0,
+    short_paths_fields/1,
+    get_listener_opts/1,
+    get_node_opts/1
 ]).
 
 -define(KILOBYTE, 1024).
@@ -104,15 +108,17 @@ roots() ->
     ].
 
 fields(limiter) ->
-    [
-        {Type,
-            ?HOCON(?R_REF(node_opts), #{
-                desc => ?DESC(Type),
-                importance => ?IMPORTANCE_HIDDEN,
-                aliases => alias_of_type(Type)
-            })}
-     || Type <- types()
-    ] ++
+    short_paths_fields(?MODULE) ++
+        [
+            {Type,
+                ?HOCON(?R_REF(node_opts), #{
+                    desc => ?DESC(Type),
+                    importance => ?IMPORTANCE_HIDDEN,
+                    required => {false, recursively},
+                    aliases => alias_of_type(Type)
+                })}
+         || Type <- types()
+        ] ++
         [
             %% This is an undocumented feature, and it won't be support anymore
             {client,
@@ -203,6 +209,14 @@ fields(listener_client_fields) ->
 fields(Type) ->
     simple_bucket_field(Type).
 
+short_paths_fields(DesModule) ->
+    [
+        {Name,
+            ?HOCON(rate(), #{desc => ?DESC(DesModule, Name), required => false, example => Example})}
+     || {Name, Example} <-
+            lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
+    ].
+
 desc(limiter) ->
     "Settings for the rate limiter.";
 desc(node_opts) ->
@@ -236,6 +250,9 @@ get_bucket_cfg_path(Type, BucketName) ->
 types() ->
     [bytes, messages, connection, message_routing, internal].
 
+short_paths() ->
+    [max_conn_rate, messages_rate, bytes_rate].
+
 calc_capacity(#{rate := infinity}) ->
     infinity;
 calc_capacity(#{rate := Rate, burst := Burst}) ->
@@ -266,6 +283,31 @@ default_client_config() ->
         failure_strategy => force
     }.
 
+default_bucket_config() ->
+    #{
+        rate => infinity,
+        burst => 0
+    }.
+
+get_listener_opts(Conf) ->
+    Limiter = maps:get(limiter, Conf, undefined),
+    ShortPaths = maps:with(short_paths(), Conf),
+    get_listener_opts(Limiter, ShortPaths).
+
+get_node_opts(Type) ->
+    Opts = emqx:get_config([limiter, Type], default_bucket_config()),
+    case type_to_short_path_name(Type) of
+        undefined ->
+            Opts;
+        Name ->
+            case emqx:get_config([limiter, Name], undefined) of
+                undefined ->
+                    Opts;
+                Rate ->
+                    Opts#{rate := Rate}
+            end
+    end.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -476,3 +518,42 @@ merge_client_bucket(Type, _, {ok, BucketVal}) ->
     #{Type => BucketVal};
 merge_client_bucket(_, _, _) ->
     undefined.
+
+short_path_name_to_type(max_conn_rate) ->
+    connection;
+short_path_name_to_type(messages_rate) ->
+    messages;
+short_path_name_to_type(bytes_rate) ->
+    bytes.
+
+type_to_short_path_name(connection) ->
+    max_conn_rate;
+type_to_short_path_name(messages) ->
+    messages_rate;
+type_to_short_path_name(bytes) ->
+    bytes_rate;
+type_to_short_path_name(_) ->
+    undefined.
+
+get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
+    Limiter;
+get_listener_opts(undefined, ShortPaths) ->
+    convert_listener_short_paths(ShortPaths);
+get_listener_opts(Limiter, ShortPaths) ->
+    Shorts = convert_listener_short_paths(ShortPaths),
+    emqx_utils_maps:deep_merge(Limiter, Shorts).
+
+convert_listener_short_paths(ShortPaths) ->
+    DefBucket = default_bucket_config(),
+    DefClient = default_client_config(),
+    Fun = fun(Name, Rate, Acc) ->
+        Type = short_path_name_to_type(Name),
+        case Name of
+            max_conn_rate ->
+                Acc#{Type => DefBucket#{rate => Rate}};
+            _ ->
+                Client = maps:get(client, Acc, #{}),
+                Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
+        end
+    end,
+    maps:fold(Fun, #{}, ShortPaths).

+ 3 - 6
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -481,7 +481,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
 
 -spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
 init_tree(Type) when is_atom(Type) ->
-    Cfg = emqx:get_config([limiter, Type]),
+    Cfg = emqx_limiter_schema:get_node_opts(Type),
     init_tree(Type, Cfg).
 
 init_tree(Type, #{rate := Rate} = Cfg) ->
@@ -625,13 +625,10 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
             {error, invalid_bucket}
     end;
 %% this is a node-level reference
-find_referenced_bucket(Id, Type, _) ->
-    case emqx:get_config([limiter, Type], undefined) of
+find_referenced_bucket(_Id, Type, _) ->
+    case emqx_limiter_schema:get_node_opts(Type) of
         #{rate := infinity} ->
             false;
-        undefined ->
-            ?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
-            {error, invalid_bucket};
         NodeCfg ->
             {ok, Bucket} = emqx_limiter_manager:find_root(Type),
             {ok, Bucket, NodeCfg}

+ 1 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl

@@ -86,7 +86,7 @@ init([]) ->
 %%  Internal functions
 %%--==================================================================
 make_child(Type) ->
-    Cfg = emqx:get_config([limiter, Type]),
+    Cfg = emqx_limiter_schema:get_node_opts(Type),
     make_child(Type, Cfg).
 
 make_child(Type, Cfg) ->

+ 1 - 1
apps/emqx/src/emqx_listeners.erl

@@ -639,7 +639,7 @@ zone(Opts) ->
     maps:get(zone, Opts, undefined).
 
 limiter(Opts) ->
-    maps:get(limiter, Opts, undefined).
+    emqx_limiter_schema:get_listener_opts(Opts).
 
 add_limiter_bucket(Id, #{limiter := Limiter}) ->
     maps:fold(

+ 3 - 2
apps/emqx/src/emqx_schema.erl

@@ -2000,7 +2000,8 @@ base_listener(Bind) ->
                     listener_fields
                 ),
                 #{
-                    desc => ?DESC(base_listener_limiter)
+                    desc => ?DESC(base_listener_limiter),
+                    importance => ?IMPORTANCE_HIDDEN
                 }
             )},
         {"enable_authn",
@@ -2011,7 +2012,7 @@ base_listener(Bind) ->
                     default => true
                 }
             )}
-    ].
+    ] ++ emqx_limiter_schema:short_paths_fields(?MODULE).
 
 desc("persistent_session_store") ->
     "Settings for message persistence.";

+ 84 - 12
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -47,7 +47,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF),
+    load_conf(),
     emqx_common_test_helpers:start_apps([?APP]),
     Config.
 
@@ -55,13 +55,15 @@ end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([?APP]).
 
 init_per_testcase(_TestCase, Config) ->
+    emqx_config:erase(limiter),
+    load_conf(),
     Config.
 
 end_per_testcase(_TestCase, Config) ->
     Config.
 
 load_conf() ->
-    emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
+    ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
 
 init_config() ->
     emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).
@@ -313,8 +315,8 @@ t_capacity(_) ->
 %% Test Cases Global Level
 %%--------------------------------------------------------------------
 t_collaborative_alloc(_) ->
-    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
-        Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
+    GlobalMod = fun(Cfg) ->
+        Cfg#{message_routing => #{rate => ?RATE("600/1s"), burst => 0}}
     end,
 
     Bucket1 = fun(#{client := Cli} = Bucket) ->
@@ -353,11 +355,11 @@ t_collaborative_alloc(_) ->
     ).
 
 t_burst(_) ->
-    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+    GlobalMod = fun(Cfg) ->
         Cfg#{
-            message_routing := MR#{
-                rate := ?RATE("200/1s"),
-                burst := ?RATE("400/1s")
+            message_routing => #{
+                rate => ?RATE("200/1s"),
+                burst => ?RATE("400/1s")
             }
         }
     end,
@@ -653,16 +655,16 @@ t_not_exists_instance(_) ->
     ),
 
     ?assertEqual(
-        {error, invalid_bucket},
+        {ok, infinity},
         emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
     ),
     ok.
 
 t_create_instance_with_node(_) ->
-    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+    GlobalMod = fun(Cfg) ->
         Cfg#{
-            message_routing := MR#{rate := ?RATE("200/1s")},
-            messages := MR#{rate := ?RATE("200/1s")}
+            message_routing => #{rate => ?RATE("200/1s"), burst => 0},
+            messages => #{rate => ?RATE("200/1s"), burst => 0}
         }
     end,
 
@@ -739,6 +741,68 @@ t_esockd_htb_consume(_) ->
     ?assertMatch({ok, _}, C2R),
     ok.
 
+%%--------------------------------------------------------------------
+%% Test Cases short paths
+%%--------------------------------------------------------------------
+t_node_short_paths(_) ->
+    CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
+    ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
+    Accessor = fun emqx_limiter_schema:get_node_opts/1,
+    ?assertMatch(#{rate := 100.0}, Accessor(connection)),
+    ?assertMatch(#{rate := 10.0}, Accessor(messages)),
+    ?assertMatch(#{rate := 1.0}, Accessor(bytes)),
+    ?assertMatch(#{rate := infinity}, Accessor(message_routing)),
+    ?assertEqual(undefined, emqx:get_config([limiter, connection], undefined)).
+
+t_compatibility_for_node_short_paths(_) ->
+    CfgStr =
+        <<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
+    ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
+    Accessor = fun emqx_limiter_schema:get_node_opts/1,
+    ?assertMatch(#{rate := 100.0}, Accessor(connection)),
+    ?assertMatch(#{rate := 20.0}, Accessor(bytes)).
+
+t_listener_short_paths(_) ->
+    CfgStr = <<
+        ""
+        "listeners.tcp.default {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}"
+        ""
+    >>,
+    ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+    ListenerOpt = emqx:get_config([listeners, tcp, default]),
+    ?assertMatch(
+        #{
+            client := #{
+                messages := #{rate := 10.0},
+                bytes := #{rate := 1.0}
+            },
+            connection := #{rate := 100.0}
+        },
+        emqx_limiter_schema:get_listener_opts(ListenerOpt)
+    ).
+
+t_compatibility_for_listener_short_paths(_) ->
+    CfgStr = <<
+        "" "listeners.tcp.default {max_conn_rate = \"1000\", limiter.connection.rate = \"500\"}" ""
+    >>,
+    ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+    ListenerOpt = emqx:get_config([listeners, tcp, default]),
+    ?assertMatch(
+        #{
+            connection := #{rate := 100.0}
+        },
+        emqx_limiter_schema:get_listener_opts(ListenerOpt)
+    ).
+
+t_no_limiter_for_listener(_) ->
+    CfgStr = <<>>,
+    ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+    ListenerOpt = emqx:get_config([listeners, tcp, default]),
+    ?assertEqual(
+        undefined,
+        emqx_limiter_schema:get_listener_opts(ListenerOpt)
+    ).
+
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
@@ -1043,3 +1107,11 @@ make_create_test_data_with_infinity_node(FakeInstnace) ->
         %% client = C bucket = B C > B
         {MkA(1000, 100), IsRefLimiter(FakeInstnace)}
     ].
+
+parse_schema(ConfigString) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(
+        emqx_limiter_schema,
+        RawConf,
+        #{required => false, atom_key => false}
+    ).

+ 42 - 3
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -28,7 +28,8 @@
     config_reset/3,
     configs/3,
     get_full_config/0,
-    global_zone_configs/3
+    global_zone_configs/3,
+    limiter/3
 ]).
 
 -define(PREFIX, "/configs/").
@@ -42,7 +43,6 @@
     <<"alarm">>,
     <<"sys_topics">>,
     <<"sysmon">>,
-    <<"limiter">>,
     <<"log">>,
     <<"persistent_session_store">>,
     <<"zones">>
@@ -57,7 +57,8 @@ paths() ->
     [
         "/configs",
         "/configs_reset/:rootname",
-        "/configs/global_zone"
+        "/configs/global_zone",
+        "/configs/limiter"
     ] ++
         lists:map(fun({Name, _Type}) -> ?PREFIX ++ binary_to_list(Name) end, config_list()).
 
@@ -147,6 +148,28 @@ schema("/configs/global_zone") ->
             }
         }
     };
+schema("/configs/limiter") ->
+    #{
+        'operationId' => limiter,
+        get => #{
+            tags => ?TAGS,
+            description => <<"Get the node-level limiter configs">>,
+            responses => #{
+                200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"config not found">>)
+            }
+        },
+        put => #{
+            tags => ?TAGS,
+            description => <<"Update the node-level limiter configs">>,
+            'requestBody' => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+            responses => #{
+                200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+                400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']),
+                403 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
+            }
+        }
+    };
 schema(Path) ->
     {RootKey, {_Root, Schema}} = find_schema(Path),
     #{
@@ -272,6 +295,22 @@ configs(get, Params, _Req) ->
             {200, Res}
     end.
 
+limiter(get, _Params, _Req) ->
+    {200, format_limiter_config(get_raw_config(limiter))};
+limiter(put, #{body := NewConf}, _Req) ->
+    case emqx_conf:update([limiter], NewConf, ?OPTS) of
+        {ok, #{raw_config := RawConf}} ->
+            {200, format_limiter_config(RawConf)};
+        {error, {permission_denied, Reason}} ->
+            {403, #{code => 'UPDATE_FAILED', message => Reason}};
+        {error, Reason} ->
+            {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
+    end.
+
+format_limiter_config(RawConf) ->
+    Shorts = lists:map(fun erlang:atom_to_binary/1, emqx_limiter_schema:short_paths()),
+    maps:with(Shorts, RawConf).
+
 conf_path_reset(Req) ->
     <<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
     string:lexemes(Path, "/ ").

+ 21 - 0
rel/i18n/emqx_limiter_schema.hocon

@@ -1,5 +1,26 @@
 emqx_limiter_schema {
 
+max_conn_rate.desc:
+"""Maximum connection rate.<br/>
+This is used to limit the connection rate for this node,
+once the limit is reached, new connections will be deferred or refused"""
+max_conn_rate.label:
+"""Maximum Connection Rate"""
+
+messages_rate.desc:
+"""Messages publish rate.<br/>
+This is used to limit the inbound message numbers for this node,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+messages_rate.label:
+"""Messages Publish Rate"""
+
+bytes_rate.desc:
+"""Data publish rate.<br/>
+This is used to limit the inbound bytes rate for this node,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+bytes_rate.label:
+"""Data Publish Rate"""
+
 bucket_cfg.desc:
 """Bucket Configs"""
 

+ 21 - 0
rel/i18n/emqx_schema.hocon

@@ -1033,6 +1033,27 @@ base_listener_limiter.desc:
 base_listener_limiter.label:
 """Type of the rate limit."""
 
+max_conn_rate.desc:
+"""Maximum connection rate.<br/>
+This is used to limit the connection rate for this listener,
+once the limit is reached, new connections will be deferred or refused"""
+max_conn_rate.label:
+"""Maximum Connection Rate"""
+
+messages_rate.desc:
+"""Messages publish rate.<br/>
+This is used to limit the inbound message numbers for each client connected to this listener,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+messages_rate.label:
+"""Messages Publish Rate"""
+
+bytes_rate.desc:
+"""Data publish rate.<br/>
+This is used to limit the inbound bytes rate for each client connected to this listener,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+bytes_rate.label:
+"""Data Publish Rate"""
+
 persistent_session_store_backend.desc:
 """Database management system used to store information about persistent sessions and messages.
 - `builtin`: Use the embedded database (mria)"""