Procházet zdrojové kódy

fix(limiter): Simplified limiter configuration

1. Simplified limiter configuration
2. add switch for limiter server
firest před 3 roky
rodič
revize
74f715ca08

+ 11 - 0
apps/emqx/i18n/emqx_limiter_i18n.conf

@@ -1,5 +1,16 @@
 emqx_limiter_schema {
 
+  enable {
+    desc {
+      en: """Enable"""
+      zh: """是否开启"""
+    }
+    label: {
+      en: """Enable"""
+      zh: """是否开启"""
+    }
+  }
+
   failure_strategy {
     desc {
       en: """The strategy when all the retries failed."""

+ 0 - 39
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -3,43 +3,4 @@
 ##--------------------------------------------------------------------
 
 limiter {
-  ## rate limiter for message publish
-  bytes_in {
-    bucket.default {
-      rate = infinity
-      capacity = infinity
-    }
-  }
-
-  ## rate limiter for message publish
-  message_in {
-    bucket.default {
-      rate = infinity
-      capacity = infinity
-    }
-  }
-
-  ## connection rate limiter
-  connection {
-    bucket.default {
-      rate = infinity
-      capacity = infinity
-    }
-  }
-
-  ## rate limiter for message deliver
-  message_routing {
-    bucket.default {
-      rate = infinity
-      capacity = infinity
-    }
-  }
-
-  ## rate limiter for internal batch operation
-  batch {
-    bucket.retainer {
-      rate = infinity
-      capacity = infinity
-    }
-  }
 }

+ 1 - 1
apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl

@@ -40,7 +40,7 @@ new_create_options(Type, BucketName) ->
 
 -spec create(create_options()) -> esockd_generic_limiter:limiter().
 create(#{module := ?MODULE, type := Type, bucket := BucketName}) ->
-    Limiter = emqx_limiter_server:connect(Type, BucketName),
+    {ok, Limiter} = emqx_limiter_server:connect(Type, BucketName),
     #{module => ?MODULE, name => Type, limiter => Limiter}.
 
 delete(_GLimiter) ->

+ 2 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl

@@ -89,7 +89,7 @@ new(Types, Names) ->
 ) -> container().
 get_limiter_by_names(Types, BucketNames) ->
     Init = fun(Type, Acc) ->
-        Limiter = emqx_limiter_server:connect(Type, BucketNames),
+        {ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames),
         add_new(Type, Limiter, Acc)
     end,
     lists:foldl(Init, #{retry_ctx => undefined}, Types).
@@ -101,7 +101,7 @@ get_limiter_by_names(Types, BucketNames) ->
     container()
 ) -> container().
 update_by_name(Type, Buckets, Container) ->
-    Limiter = emqx_limiter_server:connect(Type, Buckets),
+    {ok, Limiter} = emqx_limiter_server:connect(Type, Buckets),
     add_new(Type, Limiter, Container).
 
 -spec add_new(limiter_type(), limiter(), container()) -> container().

+ 33 - 4
apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl

@@ -24,15 +24,21 @@
 %% API
 -export([
     start_link/0,
-    start_server/1,
     find_bucket/1,
     find_bucket/2,
-    insert_bucket/2, insert_bucket/3,
+    insert_bucket/2,
+    insert_bucket/3,
     make_path/2,
-    restart_server/1,
     post_config_update/5
 ]).
 
+-export([
+    start_server/1,
+    start_server/2,
+    restart_server/1,
+    stop_server/1
+]).
+
 %% gen_server callbacks
 -export([
     init/1,
@@ -67,10 +73,18 @@
 start_server(Type) ->
     emqx_limiter_server_sup:start(Type).
 
+-spec start_server(limiter_type(), hocons:config()) -> _.
+start_server(Type, Cfg) ->
+    emqx_limiter_server_sup:start(Type, Cfg).
+
 -spec restart_server(limiter_type()) -> _.
 restart_server(Type) ->
     emqx_limiter_server:restart(Type).
 
+-spec stop_server(limiter_type()) -> _.
+stop_server(Type) ->
+    emqx_limiter_server_sup:stop(Type).
+
 -spec find_bucket(limiter_type(), bucket_name()) ->
     {ok, bucket_ref()} | undefined.
 find_bucket(Type, BucketName) ->
@@ -103,7 +117,22 @@ make_path(Type, BucketName) ->
 
 post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
     Config = maps:get(Type, NewConf),
-    emqx_limiter_server:update_config(Type, Config).
+    case emqx_limiter_server:whereis(Type) of
+        undefined ->
+            case Config of
+                #{enable := false} ->
+                    ok;
+                _ ->
+                    start_server(Type)
+            end;
+        _ ->
+            case Config of
+                #{enable := false} ->
+                    stop_server(Type);
+                _ ->
+                    emqx_limiter_server:update_config(Type, Config)
+            end
+    end.
 
 %%--------------------------------------------------------------------
 %% @doc

+ 19 - 12
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -30,7 +30,8 @@
     namespace/0,
     get_bucket_cfg_path/2,
     desc/1,
-    types/0
+    types/0,
+    is_enable/1
 ]).
 
 -define(KILOBYTE, 1024).
@@ -86,29 +87,31 @@ roots() -> [limiter].
 
 fields(limiter) ->
     [
-        {bytes_in, sc(ref(limiter_opts), #{desc => ?DESC(bytes_in)})},
-        {message_in, sc(ref(limiter_opts), #{desc => ?DESC(message_in)})},
-        {connection, sc(ref(limiter_opts), #{desc => ?DESC(connection)})},
-        {message_routing, sc(ref(limiter_opts), #{desc => ?DESC(message_routing)})},
-        {batch, sc(ref(limiter_opts), #{desc => ?DESC(batch)})}
+        {Type, sc(ref(limiter_opts), #{desc => ?DESC(Type), default => #{<<"enable">> => false}})}
+     || Type <- types()
     ];
 fields(limiter_opts) ->
     [
-        {rate, sc(rate(), #{default => "infinity", desc => ?DESC(rate)})},
+        {enable, sc(boolean(), #{desc => ?DESC(enable), default => true})},
+        {rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})},
         {burst,
             sc(
                 burst_rate(),
                 #{
-                    default => "0/0s",
-                    desc => ?DESC(burst)
+                    desc => ?DESC(burst),
+                    default => 0
                 }
             )},
-        {bucket, sc(map("bucket_name", ref(bucket_opts)), #{desc => ?DESC(bucket_cfg)})}
+        {bucket,
+            sc(
+                map("bucket_name", ref(bucket_opts)),
+                #{desc => ?DESC(bucket_cfg), default => #{<<"default">> => #{}}}
+            )}
     ];
 fields(bucket_opts) ->
     [
-        {rate, sc(rate(), #{desc => ?DESC(rate)})},
-        {capacity, sc(capacity(), #{desc => ?DESC(capacity)})},
+        {rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})},
+        {capacity, sc(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
         {initial, sc(initial(), #{default => "0", desc => ?DESC(initial)})},
         {per_client,
             sc(
@@ -188,6 +191,10 @@ to_rate(Str) ->
 get_bucket_cfg_path(Type, BucketName) ->
     [limiter, Type, bucket, BucketName].
 
+-spec is_enable(limiter_type()) -> boolean().
+is_enable(Type) ->
+    emqx:get_config([limiter, Type, enable], false).
+
 types() ->
     [bytes_in, message_in, connection, message_routing, batch].
 

+ 54 - 26
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -41,8 +41,9 @@
 ]).
 
 -export([
-    start_link/1,
+    start_link/2,
     connect/2,
+    whereis/1,
     info/1,
     name/1,
     get_initial_val/1,
@@ -88,7 +89,7 @@
 -type decimal() :: emqx_limiter_decimal:decimal().
 -type index() :: pos_integer().
 
--define(CALL(Type, Msg), gen_server:call(name(Type), Msg)).
+-define(CALL(Type, Msg), call(Type, Msg)).
 -define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)).
 
 %% minimum coefficient for overloaded limiter
@@ -107,16 +108,18 @@
     limiter_type(),
     bucket_name() | #{limiter_type() => bucket_name() | undefined}
 ) ->
-    emqx_htb_limiter:limiter().
+    {ok, emqx_htb_limiter:limiter()} | {error, _}.
 %% If no bucket path is set in config, there will be no limit
 connect(_Type, undefined) ->
-    emqx_htb_limiter:make_infinity_limiter();
+    {ok, emqx_htb_limiter:make_infinity_limiter()};
 connect(Type, BucketName) when is_atom(BucketName) ->
-    CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
-    case emqx:get_config(CfgPath, undefined) of
+    case check_enable_and_get_bucket_cfg(Type, BucketName) of
         undefined ->
-            ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}),
-            throw("bucket's config not found");
+            ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
+            {error, config_not_found};
+        limiter_not_started ->
+            ?SLOG(error, #{msg => "limiter_not_started", type => Type, bucket => BucketName}),
+            {error, limiter_not_started};
         #{
             rate := AggrRate,
             capacity := AggrSize,
@@ -124,23 +127,24 @@ connect(Type, BucketName) when is_atom(BucketName) ->
         } ->
             case emqx_limiter_manager:find_bucket(Type, BucketName) of
                 {ok, Bucket} ->
-                    if
-                        CliRate < AggrRate orelse CliSize < AggrSize ->
-                            emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
-                        Bucket =:= infinity ->
-                            emqx_htb_limiter:make_infinity_limiter();
-                        true ->
-                            emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
-                    end;
+                    {ok,
+                        if
+                            CliRate < AggrRate orelse CliSize < AggrSize ->
+                                emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
+                            Bucket =:= infinity ->
+                                emqx_htb_limiter:make_infinity_limiter();
+                            true ->
+                                emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
+                        end};
                 undefined ->
-                    ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}),
-                    throw("invalid bucket")
+                    ?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}),
+                    {error, invalid_bucket}
             end
     end;
 connect(Type, Paths) ->
     connect(Type, maps:get(Type, Paths, undefined)).
 
--spec info(limiter_type()) -> state().
+-spec info(limiter_type()) -> state() | {error, _}.
 info(Type) ->
     ?CALL(Type).
 
@@ -148,22 +152,26 @@ info(Type) ->
 name(Type) ->
     erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
 
--spec restart(limiter_type()) -> ok.
+-spec restart(limiter_type()) -> ok | {error, _}.
 restart(Type) ->
     ?CALL(Type).
 
--spec update_config(limiter_type(), hocons:config()) -> ok.
+-spec update_config(limiter_type(), hocons:config()) -> ok | {error, _}.
 update_config(Type, Config) ->
     ?CALL(Type, {update_config, Type, Config}).
 
+-spec whereis(limiter_type()) -> pid() | undefined.
+whereis(Type) ->
+    erlang:whereis(name(Type)).
+
 %%--------------------------------------------------------------------
 %% @doc
 %% Starts the server
 %% @end
 %%--------------------------------------------------------------------
--spec start_link(limiter_type()) -> _.
-start_link(Type) ->
-    gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
+-spec start_link(limiter_type(), hocons:config()) -> _.
+start_link(Type, Cfg) ->
+    gen_server:start_link({local, name(Type)}, ?MODULE, [Type, Cfg], []).
 
 %%--------------------------------------------------------------------
 %%% gen_server callbacks
@@ -181,8 +189,8 @@ start_link(Type) ->
     | {ok, State :: term(), hibernate}
     | {stop, Reason :: term()}
     | ignore.
-init([Type]) ->
-    State = init_tree(Type),
+init([Type, Cfg]) ->
+    State = init_tree(Type, Cfg),
     #{root := #{period := Perido}} = State,
     oscillate(Perido),
     {ok, State}.
@@ -597,3 +605,23 @@ get_initial_val(#{
         true ->
             0
     end.
+
+-spec call(limiter_type(), any()) -> {error, _} | _.
+call(Type, Msg) ->
+    case ?MODULE:whereis(Type) of
+        undefined ->
+            {error, limiter_not_started};
+        Pid ->
+            gen_server:call(Pid, Msg)
+    end.
+
+-spec check_enable_and_get_bucket_cfg(limiter_type(), bucket_name()) ->
+    undefined | limiter_not_started | hocons:config().
+check_enable_and_get_bucket_cfg(Type, Bucket) ->
+    case emqx_limiter_schema:is_enable(Type) of
+        false ->
+            limiter_not_started;
+        _ ->
+            Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
+            emqx:get_config(Path, undefined)
+    end.

+ 25 - 4
apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl

@@ -19,7 +19,7 @@
 -behaviour(supervisor).
 
 %% API
--export([start_link/0, start/1]).
+-export([start_link/0, start/1, start/2, stop/1]).
 
 %% Supervisor callbacks
 -export([init/1]).
@@ -47,6 +47,15 @@ start(Type) ->
     Spec = make_child(Type),
     supervisor:start_child(?MODULE, Spec).
 
+-spec start(emqx_limiter_schema:limiter_type(), hocons:config()) -> _.
+start(Type, Cfg) ->
+    Spec = make_child(Type, Cfg),
+    supervisor:start_child(?MODULE, Spec).
+
+stop(Type) ->
+    Id = emqx_limiter_server:name(Type),
+    supervisor:terminate_child(?MODULE, Id).
+
 %%--------------------------------------------------------------------
 %%  Supervisor callbacks
 %%--------------------------------------------------------------------
@@ -76,10 +85,14 @@ init([]) ->
 %%  Internal functions
 %%--==================================================================
 make_child(Type) ->
+    Cfg = emqx:get_config([limiter, Type]),
+    make_child(Type, Cfg).
+
+make_child(Type, Cfg) ->
     Id = emqx_limiter_server:name(Type),
     #{
         id => Id,
-        start => {emqx_limiter_server, start_link, [Type]},
+        start => {emqx_limiter_server, start_link, [Type, Cfg]},
         restart => transient,
         shutdown => 5000,
         type => worker,
@@ -88,5 +101,13 @@ make_child(Type) ->
 
 childs() ->
     Conf = emqx:get_config([limiter]),
-    Types = maps:keys(Conf),
-    [make_child(Type) || Type <- Types].
+    lists:foldl(
+        fun
+            ({Type, #{enable := true}}, Acc) ->
+                [make_child(Type) | Acc];
+            (_, Acc) ->
+                Acc
+        end,
+        [],
+        maps:to_list(Conf)
+    ).

+ 2 - 1
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -749,7 +749,8 @@ delay_return(Case) ->
     end.
 
 connect(Name) ->
-    emqx_limiter_server:connect(message_routing, Name).
+    {ok, Limiter} = emqx_limiter_server:connect(message_routing, Name),
+    Limiter.
 
 check_average_rate(Counter, Second, Rate) ->
     Cost = counters:get(Counter, 1),

+ 0 - 10
apps/emqx_retainer/etc/emqx_retainer.conf

@@ -56,16 +56,6 @@ retainer {
     ##
     ## Default: 0
     batch_deliver_number = 0
-
-    ## The rate limiter name for retained messages delivery.
-    ## In order to avoid delivering too many messages to the client at once,  which may cause the client
-    ## to block or crash, or message dropped due to exceeding the size of the message queue. We need
-    ## to specify a rate limiter for the retained messages delivery to the client.
-    ##
-    ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
-    ## You can remove this field if you don't want any limit
-    ## Default: retainer
-    batch_deliver_limiter = retainer
   }
 
   ## Maximum retained message size.

+ 3 - 3
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -111,8 +111,8 @@ start_link(Pool, Id) ->
 init([Pool, Id]) ->
     erlang:process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
-    Limiter = emqx_limiter_server:connect(batch, BucketName),
+    BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
+    {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
     {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
 
 %%--------------------------------------------------------------------
@@ -152,7 +152,7 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
     {noreply, State#{limiter := Limiter2}};
 handle_cast(refresh_limiter, State) ->
     BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
-    Limiter = emqx_limiter_server:connect(batch, BucketName),
+    {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
     {noreply, State#{limiter := Limiter}};
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),