Просмотр исходного кода

refactor(limiter): refactor the user interface

firest 3 лет назад
Родитель
Сommit
d3f965dfe7

+ 8 - 14
apps/emqx/i18n/emqx_limiter_i18n.conf

@@ -124,20 +124,6 @@ the check/consume will succeed, but it will be forced to wait for a short period
     }
   }
 
-  batch {
-    desc {
-      en: """The batch limiter.
-This is used for EMQX internal batch operation
-e.g. limit the retainer's deliver rate"""
-      zh: """批量操作速率控制器。
-这是给 EMQX 内部的批量操作使用的,比如用来控制保留消息的派发速率"""
-    }
-    label: {
-      en: """Batch"""
-      zh: """批量操作"""
-    }
-  }
-
   message_routing {
     desc {
       en: """The message routing limiter.
@@ -193,4 +179,12 @@ Once the limit is reached, the restricted client will be slow down even be hung
       zh: """流入字节率"""
     }
   }
+
+  internal {
+    desc {
+      en: """Limiter for EMQX internal app."""
+      zh: """EMQX 内部功能所用限制器。"""
+
+    }
+  }
 }

+ 5 - 5
apps/emqx/src/emqx_channel.erl

@@ -252,11 +252,12 @@ init(
             <<>> -> undefined;
             MP -> MP
         end,
+    ListenerId = emqx_listeners:listener_id(Type, Listener),
     ClientInfo = set_peercert_infos(
         Peercert,
         #{
             zone => Zone,
-            listener => emqx_listeners:listener_id(Type, Listener),
+            listener => ListenerId,
             protocol => Protocol,
             peerhost => PeerHost,
             sockport => SockPort,
@@ -278,7 +279,9 @@ init(
             outbound => #{}
         },
         auth_cache = #{},
-        quota = emqx_limiter_container:get_limiter_by_names([?LIMITER_ROUTING], LimiterCfg),
+        quota = emqx_limiter_container:get_limiter_by_types(
+            ListenerId, [?LIMITER_ROUTING], LimiterCfg
+        ),
         timers = #{},
         conn_state = idle,
         takeover = false,
@@ -1199,9 +1202,6 @@ handle_call(
     disconnect_and_shutdown(takenover, AllPendings, Channel);
 handle_call(list_authz_cache, Channel) ->
     {reply, emqx_authz_cache:list_authz_cache(), Channel};
-handle_call({quota, Bucket}, #channel{quota = Quota} = Channel) ->
-    Quota2 = emqx_limiter_container:update_by_name(message_routing, Bucket, Quota),
-    reply(ok, Channel#channel{quota = Quota2});
 handle_call(
     {keepalive, Interval},
     Channel = #channel{

+ 1 - 7
apps/emqx/src/emqx_connection.erl

@@ -321,7 +321,7 @@ init_state(
     },
 
     LimiterTypes = [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
-    Limiter = emqx_limiter_container:get_limiter_by_names(LimiterTypes, LimiterCfg),
+    Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg),
 
     FrameOpts = #{
         strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
@@ -672,12 +672,6 @@ handle_call(_From, info, State) ->
     {reply, info(State), State};
 handle_call(_From, stats, State) ->
     {reply, stats(State), State};
-handle_call(_From, {ratelimit, Changes}, State = #state{limiter = Limiter}) ->
-    Fun = fun({Type, Bucket}, Acc) ->
-        emqx_limiter_container:update_by_name(Type, Bucket, Acc)
-    end,
-    Limiter2 = lists:foldl(Fun, Limiter, Changes),
-    {reply, ok, State#state{limiter = Limiter2}};
 handle_call(_From, Req, State = #state{channel = Channel}) ->
     case emqx_channel:handle_call(Req, Channel) of
         {reply, Reply, NChannel} ->

+ 9 - 7
apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl

@@ -19,12 +19,13 @@
 -behaviour(esockd_generic_limiter).
 
 %% API
--export([new_create_options/2, create/1, delete/1, consume/2]).
+-export([new_create_options/3, create/1, delete/1, consume/2]).
 
 -type create_options() :: #{
     module := ?MODULE,
+    id := emqx_limiter_schema:limiter_id(),
     type := emqx_limiter_schema:limiter_type(),
-    bucket := emqx_limiter_schema:bucket_name()
+    bucket := hocons:config()
 }.
 
 %%--------------------------------------------------------------------
@@ -32,15 +33,16 @@
 %%--------------------------------------------------------------------
 
 -spec new_create_options(
+    emqx_limiter_schema:limiter_id(),
     emqx_limiter_schema:limiter_type(),
-    emqx_limiter_schema:bucket_name()
+    hocons:config()
 ) -> create_options().
-new_create_options(Type, BucketName) ->
-    #{module => ?MODULE, type => Type, bucket => BucketName}.
+new_create_options(Id, Type, BucketCfg) ->
+    #{module => ?MODULE, id => Id, type => Type, bucket => BucketCfg}.
 
 -spec create(create_options()) -> esockd_generic_limiter:limiter().
-create(#{module := ?MODULE, type := Type, bucket := BucketName}) ->
-    {ok, Limiter} = emqx_limiter_server:connect(Type, BucketName),
+create(#{module := ?MODULE, id := Id, type := Type, bucket := BucketCfg}) ->
+    {ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfg),
     #{module => ?MODULE, name => Type, limiter => Limiter}.
 
 delete(_GLimiter) ->

+ 10 - 34
apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl

@@ -22,10 +22,8 @@
 
 %% API
 -export([
-    new/0, new/1, new/2,
-    get_limiter_by_names/2,
+    get_limiter_by_types/3,
     add_new/3,
-    update_by_name/3,
     set_retry_context/2,
     check/3,
     retry/2,
@@ -48,10 +46,10 @@
 }.
 
 -type future() :: pos_integer().
+-type limiter_id() :: emqx_limiter_schema:limiter_id().
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
 -type limiter() :: emqx_htb_limiter:limiter().
 -type retry_context() :: emqx_htb_limiter:retry_context().
--type bucket_name() :: emqx_limiter_schema:bucket_name().
 -type millisecond() :: non_neg_integer().
 -type check_result() ::
     {ok, container()}
@@ -64,46 +62,24 @@
 %%--------------------------------------------------------------------
 %%  API
 %%--------------------------------------------------------------------
--spec new() -> container().
-new() ->
-    new([]).
-
-%% @doc generate default data according to the type of limiter
--spec new(list(limiter_type())) -> container().
-new(Types) ->
-    new(Types, #{}).
-
--spec new(
-    list(limiter_type()),
-    #{limiter_type() => emqx_limiter_schema:bucket_name()}
-) -> container().
-new(Types, Names) ->
-    get_limiter_by_names(Types, Names).
-
 %% @doc generate a container
 %% according to the type of limiter and the bucket name configuration of the limiter
 %% @end
--spec get_limiter_by_names(
+-spec get_limiter_by_types(
+    limiter_id() | {atom(), atom()},
     list(limiter_type()),
-    #{limiter_type() => emqx_limiter_schema:bucket_name()}
+    #{limiter_type() => hocons:config()}
 ) -> container().
-get_limiter_by_names(Types, BucketNames) ->
+get_limiter_by_types({Type, Listener}, Types, BucketCfgs) ->
+    Id = emqx_listeners:listener_id(Type, Listener),
+    get_limiter_by_types(Id, Types, BucketCfgs);
+get_limiter_by_types(Id, Types, BucketCfgs) ->
     Init = fun(Type, Acc) ->
-        {ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames),
+        {ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfgs),
         add_new(Type, Limiter, Acc)
     end,
     lists:foldl(Init, #{retry_ctx => undefined}, Types).
 
-%% @doc add the specified type of limiter to the container
--spec update_by_name(
-    limiter_type(),
-    bucket_name() | #{limiter_type() => bucket_name()},
-    container()
-) -> container().
-update_by_name(Type, Buckets, Container) ->
-    {ok, Limiter} = emqx_limiter_server:connect(Type, Buckets),
-    add_new(Type, Limiter, Container).
-
 -spec add_new(limiter_type(), limiter(), container()) -> container().
 add_new(Type, Limiter, Container) ->
     Container#{

+ 18 - 32
apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl

@@ -24,11 +24,9 @@
 %% API
 -export([
     start_link/0,
-    find_bucket/1,
     find_bucket/2,
-    insert_bucket/2,
     insert_bucket/3,
-    make_path/2,
+    delete_bucket/2,
     post_config_update/5
 ]).
 
@@ -50,20 +48,19 @@
     format_status/2
 ]).
 
--export_type([path/0]).
-
--type path() :: list(atom()).
+-type limiter_id() :: emqx_limiter_schema:limiter_id().
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
--type bucket_name() :: emqx_limiter_schema:bucket_name().
+-type uid() :: {limiter_id(), limiter_type()}.
 
 %% counter record in ets table
 -record(bucket, {
-    path :: path(),
+    uid :: uid(),
     bucket :: bucket_ref()
 }).
 
 -type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref().
 
+-define(UID(Id, Type), {Id, Type}).
 -define(TAB, emqx_limiter_counters).
 
 %%--------------------------------------------------------------------
@@ -85,14 +82,10 @@ restart_server(Type) ->
 stop_server(Type) ->
     emqx_limiter_server_sup:stop(Type).
 
--spec find_bucket(limiter_type(), bucket_name()) ->
+-spec find_bucket(limiter_id(), limiter_type()) ->
     {ok, bucket_ref()} | undefined.
-find_bucket(Type, BucketName) ->
-    find_bucket(make_path(Type, BucketName)).
-
--spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
-find_bucket(Path) ->
-    case ets:lookup(?TAB, Path) of
+find_bucket(Id, Type) ->
+    case ets:lookup(?TAB, ?UID(Id, Type)) of
         [#bucket{bucket = Bucket}] ->
             {ok, Bucket};
         _ ->
@@ -100,20 +93,19 @@ find_bucket(Path) ->
     end.
 
 -spec insert_bucket(
+    limiter_id(),
     limiter_type(),
-    bucket_name(),
     bucket_ref()
 ) -> boolean().
-insert_bucket(Type, BucketName, Bucket) ->
-    inner_insert_bucket(make_path(Type, BucketName), Bucket).
-
--spec insert_bucket(path(), bucket_ref()) -> true.
-insert_bucket(Path, Bucket) ->
-    inner_insert_bucket(Path, Bucket).
+insert_bucket(Id, Type, Bucket) ->
+    ets:insert(
+        ?TAB,
+        #bucket{uid = ?UID(Id, Type), bucket = Bucket}
+    ).
 
--spec make_path(limiter_type(), bucket_name()) -> path().
-make_path(Type, BucketName) ->
-    [Type | BucketName].
+-spec delete_bucket(limiter_id(), limiter_type()) -> true.
+delete_bucket(Type, Id) ->
+    ets:delete(?TAB, ?UID(Id, Type)).
 
 post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
     Config = maps:get(Type, NewConf),
@@ -159,7 +151,7 @@ init([]) ->
         set,
         public,
         named_table,
-        {keypos, #bucket.path},
+        {keypos, #bucket.uid},
         {write_concurrency, true},
         {read_concurrency, true},
         {heir, erlang:whereis(emqx_limiter_sup), none}
@@ -266,9 +258,3 @@ format_status(_Opt, Status) ->
 %%--------------------------------------------------------------------
 %%  Internal functions
 %%--------------------------------------------------------------------
--spec inner_insert_bucket(path(), bucket_ref()) -> true.
-inner_insert_bucket(Path, Bucket) ->
-    ets:insert(
-        ?TAB,
-        #bucket{path = Path, bucket = Bucket}
-    ).

+ 38 - 45
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -31,7 +31,9 @@
     get_bucket_cfg_path/2,
     desc/1,
     types/0,
-    infinity_value/0
+    infinity_value/0,
+    bucket_opts/0,
+    bucket_opts_meta/0
 ]).
 
 -define(KILOBYTE, 1024).
@@ -41,8 +43,10 @@
     | message_in
     | connection
     | message_routing
-    | batch.
+    %% internal limiter for unclassified resources
+    | internal.
 
+-type limiter_id() :: atom().
 -type bucket_name() :: atom().
 -type rate() :: infinity | float().
 -type burst_rate() :: 0 | float().
@@ -76,7 +80,7 @@
     bucket_name/0
 ]).
 
--export_type([limiter_type/0, bucket_path/0]).
+-export_type([limiter_id/0, limiter_type/0, bucket_path/0]).
 
 -define(UNIT_TIME_IN_MS, 1000).
 
@@ -87,13 +91,13 @@ roots() -> [limiter].
 fields(limiter) ->
     [
         {Type,
-            ?HOCON(?R_REF(limiter_opts), #{
+            ?HOCON(?R_REF(node_opts), #{
                 desc => ?DESC(Type),
                 default => make_limiter_default(Type)
             })}
      || Type <- types()
     ];
-fields(limiter_opts) ->
+fields(node_opts) ->
     [
         {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
         {burst,
@@ -101,38 +105,16 @@ fields(limiter_opts) ->
                 desc => ?DESC(burst),
                 default => 0
             })},
-        {bucket,
-            ?HOCON(
-                ?MAP("bucket_name", ?R_REF(bucket_opts)),
-                #{
-                    desc => ?DESC(bucket_cfg),
-                    default => #{<<"default">> => #{}},
-                    example => #{
-                        <<"mybucket-name">> => #{
-                            <<"rate">> => <<"infinity">>,
-                            <<"capcity">> => <<"infinity">>,
-                            <<"initial">> => <<"100">>,
-                            <<"per_client">> => #{<<"rate">> => <<"infinity">>}
-                        }
-                    }
-                }
-            )}
+        {client, ?HOCON(?R_REF(client_opts), #{default => #{}})}
     ];
 fields(bucket_opts) ->
     [
         {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
         {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
         {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})},
-        {per_client,
-            ?HOCON(
-                ?R_REF(client_bucket),
-                #{
-                    default => #{},
-                    desc => ?DESC(per_client)
-                }
-            )}
+        {client, ?HOCON(?R_REF(client_opts), #{required => false})}
     ];
-fields(client_bucket) ->
+fields(client_opts) ->
     [
         {rate, ?HOCON(rate(), #{default => "infinity", desc => ?DESC(rate)})},
         {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})},
@@ -181,15 +163,35 @@ fields(client_bucket) ->
 
 desc(limiter) ->
     "Settings for the rate limiter.";
-desc(limiter_opts) ->
-    "Settings for the limiter.";
+desc(node_opts) ->
+    "Settings for the limiter of the node level.";
+desc(node_client_opts) ->
+    "Settings for the client in the node level.";
 desc(bucket_opts) ->
     "Settings for the bucket.";
-desc(client_bucket) ->
-    "Settings for the client bucket.";
+desc(client_opts) ->
+    "Settings for the client in bucket level.";
 desc(_) ->
     undefined.
 
+bucket_opts() ->
+    ?HOCON(
+        ?MAP("bucket_name", ?R_REF(bucket_opts)),
+        bucket_opts_meta()
+    ).
+
+bucket_opts_meta() ->
+    #{
+        default => #{},
+        example =>
+            #{
+                <<"rate">> => <<"infinity">>,
+                <<"capcity">> => <<"infinity">>,
+                <<"initial">> => <<"100">>,
+                <<"client">> => #{<<"rate">> => <<"infinity">>}
+            }
+    }.
+
 %% default period is 100ms
 default_period() ->
     100.
@@ -202,7 +204,7 @@ get_bucket_cfg_path(Type, BucketName) ->
     [limiter, Type, bucket, BucketName].
 
 types() ->
-    [bytes_in, message_in, connection, message_routing, batch].
+    [bytes_in, message_in, connection, message_routing, internal].
 
 %%--------------------------------------------------------------------
 %% Internal functions
@@ -323,15 +325,6 @@ apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
 apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
 
 make_limiter_default(connection) ->
-    #{
-        <<"rate">> => <<"1000/s">>,
-        <<"bucket">> => #{
-            <<"default">> =>
-                #{
-                    <<"rate">> => <<"1000/s">>,
-                    <<"capacity">> => 1000
-                }
-        }
-    };
+    #{<<"rate">> => <<"1000/s">>};
 make_limiter_default(_) ->
     #{}.

+ 97 - 107
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -42,11 +42,13 @@
 
 -export([
     start_link/2,
-    connect/2,
+    connect/3,
+    add_bucket/3,
+    del_bucket/2,
+    get_initial_val/1,
     whereis/1,
     info/1,
     name/1,
-    get_initial_val/1,
     restart/1,
     update_config/2
 ]).
@@ -83,6 +85,7 @@
 -type buckets() :: #{bucket_name() => bucket()}.
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
 -type bucket_name() :: emqx_limiter_schema:bucket_name().
+-type limiter_id() :: emqx_limiter_schema:limiter_id().
 -type rate() :: decimal().
 -type flow() :: decimal().
 -type capacity() :: decimal().
@@ -94,7 +97,7 @@
 
 %% minimum coefficient for overloaded limiter
 -define(OVERLOAD_MIN_ALLOC, 0.3).
--define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
+-define(COUNTER_SIZE, 8).
 
 -export_type([index/0]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@@ -105,39 +108,53 @@
 %% API
 %%--------------------------------------------------------------------
 -spec connect(
+    limiter_id(),
     limiter_type(),
     bucket_name() | #{limiter_type() => bucket_name() | undefined}
 ) ->
     {ok, emqx_htb_limiter:limiter()} | {error, _}.
 %% If no bucket path is set in config, there will be no limit
-connect(_Type, undefined) ->
+connect(_Id, _Type, undefined) ->
     {ok, emqx_htb_limiter:make_infinity_limiter()};
-connect(Type, BucketName) when is_atom(BucketName) ->
-    case get_bucket_cfg(Type, BucketName) of
-        undefined ->
-            ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
-            {error, config_not_found};
-        #{
-            rate := BucketRate,
-            capacity := BucketSize,
-            per_client := #{rate := CliRate, capacity := CliSize} = Cfg
-        } ->
-            case emqx_limiter_manager:find_bucket(Type, BucketName) of
-                {ok, Bucket} ->
+connect(
+    Id,
+    Type,
+    #{
+        rate := BucketRate,
+        capacity := BucketSize
+    } = BucketCfg
+) ->
+    case emqx_limiter_manager:find_bucket(Id, Type) of
+        {ok, Bucket} ->
+            case find_client_cfg(Type, BucketCfg) of
+                #{rate := CliRate, capacity := CliSize} = ClientCfg ->
                     {ok,
                         if
                             CliRate < BucketRate orelse CliSize < BucketSize ->
-                                emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
+                                emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket);
                             true ->
-                                emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
+                                emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)
                         end};
-                undefined ->
-                    ?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}),
-                    {error, invalid_bucket}
-            end
+                {error, invalid_node_cfg} = Error ->
+                    ?SLOG(error, #{msg => "invalid_node_cfg", type => Type, id => Id}),
+                    Error
+            end;
+        undefined ->
+            ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
+            {error, invalid_bucket}
     end;
-connect(Type, Paths) ->
-    connect(Type, maps:get(Type, Paths, undefined)).
+connect(Id, Type, Paths) ->
+    connect(Id, Type, maps:get(Type, Paths, undefined)).
+
+-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
+add_bucket(_Id, _Type, undefine) ->
+    ok;
+add_bucket(Id, Type, Cfg) ->
+    ?CALL(Type, {add_bucket, Id, Cfg}).
+
+-spec del_bucket(limiter_id(), limiter_type()) -> ok.
+del_bucket(Id, Type) ->
+    ?CALL(Type, {del_bucket, Id}).
 
 -spec info(limiter_type()) -> state() | {error, _}.
 info(Type) ->
@@ -213,6 +230,12 @@ handle_call(restart, _From, #{type := Type}) ->
 handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
     NewState = init_tree(Type, Config),
     {reply, ok, NewState};
+handle_call({add_bucket, Id, Cfg}, _From, State) ->
+    NewState = do_add_bucket(Id, Cfg, State),
+    {reply, ok, NewState};
+handle_call({del_bucket, Id}, _From, State) ->
+    NewState = do_del_bucket(Id, State),
+    {reply, ok, NewState};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
@@ -456,24 +479,14 @@ init_tree(Type) when is_atom(Type) ->
     Cfg = emqx:get_config([limiter, Type]),
     init_tree(Type, Cfg).
 
-init_tree(Type, #{bucket := Buckets} = Cfg) ->
-    State = #{
+init_tree(Type, Cfg) ->
+    #{
         type => Type,
-        root => undefined,
-        counter => undefined,
-        index => 1,
+        root => make_root(Cfg),
+        counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
+        index => 0,
         buckets => #{}
-    },
-
-    Root = make_root(Cfg),
-    {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []),
-
-    State2 = State#{
-        root := Root,
-        counter := counters:new(CounterNum, [write_concurrency])
-    },
-
-    lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
+    }.
 
 -spec make_root(hocons:confg()) -> root().
 make_root(#{rate := Rate, burst := Burst}) ->
@@ -484,79 +497,50 @@ make_root(#{rate := Rate, burst := Burst}) ->
         produced => 0.0
     }.
 
-make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
-    Path = emqx_limiter_manager:make_path(Type, Name),
-    Rate = get_counter_rate(Conf, GlobalCfg),
-    #{capacity := Capacity} = Conf,
-    Initial = get_initial_val(Conf),
-    CounterNum2 = CounterNum + 1,
-    InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-        {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
-        Bucket2 = Bucket#{counter := Counter, index := Idx},
-        State2#{buckets := Buckets#{BucketName => Bucket2}}
-    end,
+do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) ->
+    case maps:get(Id, Buckets, undefined) of
+        undefined ->
+            make_bucket(Id, Cfg, State);
+        Bucket ->
+            Bucket2 = Bucket#{rate := Rate, capacity := Capacity},
+            State#{buckets := Buckets#{Id := Bucket2}}
+    end.
 
+make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) ->
+    add_bucket(Id, Cfg, State#{
+        counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
+        index => 0
+    });
+make_bucket(
+    Id,
+    #{rate := Rate, capacity := Capacity} = Cfg,
+    #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
+) ->
+    NewIndex = Index + 1,
+    Initial = get_initial_val(Cfg),
     Bucket = #{
-        name => Name,
+        name => Id,
         rate => Rate,
         obtained => Initial,
         correction => 0,
         capacity => Capacity,
-        counter => undefined,
-        index => undefined
+        counter => Counter,
+        index => NewIndex
     },
+    _ = put_to_counter(Counter, NewIndex, Initial),
+    Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate),
+    emqx_limiter_manager:insert_bucket(Id, Type, Ref),
+    State#{buckets := Buckets#{Id => Bucket}}.
 
-    DelayInit = ?CURRYING(Bucket, InitFun),
-
-    make_bucket(
-        T,
-        Type,
-        GlobalCfg,
-        CounterNum2,
-        [DelayInit | DelayBuckets]
-    );
-make_bucket([], _Type, _Global, CounterNum, DelayBuckets) ->
-    {CounterNum, DelayBuckets}.
-
--spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
-    {counters:counters_ref(), pos_integer(), state()}.
-alloc_counter(
-    Path,
-    Rate,
-    Initial,
-    #{counter := Counter, index := Index} = State
-) ->
-    case emqx_limiter_manager:find_bucket(Path) of
-        {ok, #{
-            counter := ECounter,
-            index := EIndex
-        }} when ECounter =/= undefined ->
-            init_counter(Path, ECounter, EIndex, Rate, Initial, State);
+do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
+    case maps:get(Id, Buckets, undefined) of
+        undefined ->
+            State;
         _ ->
-            init_counter(
-                Path,
-                Counter,
-                Index,
-                Rate,
-                Initial,
-                State#{index := Index + 1}
-            )
+            emqx_limiter_manager:delete_bucket(Id, Type),
+            State#{buckets := maps:remove(Id, Buckets)}
     end.
 
-init_counter(Path, Counter, Index, Rate, Initial, State) ->
-    _ = put_to_counter(Counter, Index, Initial),
-    Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate),
-    emqx_limiter_manager:insert_bucket(Path, Ref),
-    {Counter, Index, State}.
-
-%% @doc find first limited node
-get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity ->
-    Rate;
-get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity ->
-    Rate;
-get_counter_rate(_Cfg, _GlobalCfg) ->
-    emqx_limiter_schema:infinity_value().
-
 -spec get_initial_val(hocons:config()) -> decimal().
 get_initial_val(
     #{
@@ -587,8 +571,14 @@ call(Type, Msg) ->
             gen_server:call(Pid, Msg)
     end.
 
--spec get_bucket_cfg(limiter_type(), bucket_name()) ->
-    undefined | limiter_not_started | hocons:config().
-get_bucket_cfg(Type, Bucket) ->
-    Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
-    emqx:get_config(Path, undefined).
+find_client_cfg(Type, Cfg) ->
+    NodeCfg = emqx:get_config([limiter, Type, client], undefined),
+    BucketCfg = maps:get(client, Cfg, undefined),
+    merge_client_cfg(NodeCfg, BucketCfg).
+
+merge_client_cfg(undefined, BucketCfg) ->
+    BucketCfg;
+merge_client_cfg(NodeCfg, undefined) ->
+    NodeCfg;
+merge_client_cfg(NodeCfg, BucketCfg) ->
+    maps:merge(NodeCfg, BucketCfg).

+ 47 - 12
apps/emqx/src/emqx_listeners.erl

@@ -279,12 +279,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
     end.
 
 -spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
-do_stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
-    esockd:close(listener_id(Type, ListenerName), ListenOn);
-do_stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss ->
-    cowboy:stop_listener(listener_id(Type, ListenerName));
-do_stop_listener(quic, ListenerName, _Conf) ->
-    quicer:stop_listener(listener_id(quic, ListenerName)).
+
+do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl ->
+    Id = listener_id(Type, ListenerName),
+    del_limiter_bucket(Id, Conf),
+    esockd:close(Id, ListenOn);
+do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss ->
+    Id = listener_id(Type, ListenerName),
+    del_limiter_bucket(Id, Conf),
+    cowboy:stop_listener(Id);
+do_stop_listener(quic, ListenerName, Conf) ->
+    Id = listener_id(quic, ListenerName),
+    del_limiter_bucket(Id, Conf),
+    quicer:stop_listener(Id).
 
 -ifndef(TEST).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
@@ -300,10 +307,12 @@ do_start_listener(_Type, _ListenerName, #{enabled := false}) ->
 do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
     Type == tcp; Type == ssl
 ->
+    Id = listener_id(Type, ListenerName),
+    add_limiter_bucket(Id, Opts),
     esockd:open(
-        listener_id(Type, ListenerName),
+        Id,
         ListenOn,
-        merge_default(esockd_opts(Type, Opts)),
+        merge_default(esockd_opts(Id, Type, Opts)),
         {emqx_connection, start_link, [
             #{
                 listener => {Type, ListenerName},
@@ -318,6 +327,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
     Type == ws; Type == wss
 ->
     Id = listener_id(Type, ListenerName),
+    add_limiter_bucket(Id, Opts),
     RanchOpts = ranch_opts(Type, ListenOn, Opts),
     WsOpts = ws_opts(Type, ListenerName, Opts),
     case Type of
@@ -352,8 +362,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
                 limiter => limiter(Opts)
             },
             StreamOpts = [{stream_callback, emqx_quic_stream}],
+            Id = listener_id(quic, ListenerName),
+            add_limiter_bucket(Id, Opts),
             quicer:start_listener(
-                listener_id(quic, ListenerName),
+                Id,
                 port(ListenOn),
                 {ListenOpts, ConnectionOpts, StreamOpts}
             );
@@ -410,16 +422,18 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo
 post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 
-esockd_opts(Type, Opts0) ->
+esockd_opts(ListenerId, Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
     Opts2 =
         case maps:get(connection, Limiter, undefined) of
             undefined ->
                 Opts1;
-            BucketName ->
+            BucketCfg ->
                 Opts1#{
-                    limiter => emqx_esockd_htb_limiter:new_create_options(connection, BucketName)
+                    limiter => emqx_esockd_htb_limiter:new_create_options(
+                        ListenerId, connection, BucketCfg
+                    )
                 }
         end,
     Opts3 = Opts2#{
@@ -524,6 +538,27 @@ zone(Opts) ->
 limiter(Opts) ->
     maps:get(limiter, Opts, #{}).
 
+add_limiter_bucket(Id, #{limiter := Limiters}) ->
+    maps:fold(
+        fun(Type, Cfg, _) ->
+            emqx_limiter_server:add_bucket(Id, Type, Cfg)
+        end,
+        ok,
+        Limiters
+    );
+add_limiter_bucket(_Id, _cfg) ->
+    ok.
+
+del_limiter_bucket(Id, #{limiter := Limiters}) ->
+    lists:foreach(
+        fun(Type) ->
+            emqx_limiter_server:del_bucket(Id, Type)
+        end,
+        maps:keys(Limiters)
+    );
+del_limiter_bucket(_Id, _cfg) ->
+    ok.
+
 enable_authn(Opts) ->
     maps:get(enable_authn, Opts, true).
 

+ 3 - 3
apps/emqx/src/emqx_schema.erl

@@ -1619,10 +1619,10 @@ base_listener(Bind) ->
             )},
         {"limiter",
             sc(
-                map("ratelimit_name", emqx_limiter_schema:bucket_name()),
+                map("ratelimit_name", ?R_REF(emqx_limiter_schema, bucket_opts)),
                 #{
-                    desc => ?DESC(base_listener_limiter),
-                    default => #{<<"connection">> => <<"default">>}
+                    desc => ?DESC(base_listener_limiter)
+                    %% TODO                    default => #{<<"connection">> => <<"default">>}
                 }
             )},
         {"enable_authn",

+ 5 - 6
apps/emqx/src/emqx_ws_connection.erl

@@ -273,7 +273,7 @@ check_origin_header(Req, #{listener := {Type, Listener}} = Opts) ->
     end.
 
 websocket_init([Req, Opts]) ->
-    #{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener}} = Opts,
+    #{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener} = ListenerCfg} = Opts,
     case check_max_connection(Type, Listener) of
         allow ->
             {Peername, PeerCert} = get_peer_info(Type, Listener, Req, Opts),
@@ -287,8 +287,10 @@ websocket_init([Req, Opts]) ->
                 ws_cookie => WsCookie,
                 conn_mod => ?MODULE
             },
-            Limiter = emqx_limiter_container:get_limiter_by_names(
-                [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN], LimiterCfg
+            Limiter = emqx_limiter_container:get_limiter_by_types(
+                ListenerCfg,
+                [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
+                LimiterCfg
             ),
             MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback),
             FrameOpts = #{
@@ -487,9 +489,6 @@ handle_call(From, info, State) ->
 handle_call(From, stats, State) ->
     gen_server:reply(From, stats(State)),
     return(State);
-handle_call(_From, {ratelimit, Type, Bucket}, State = #state{limiter = Limiter}) ->
-    Limiter2 = emqx_limiter_container:update_by_name(Type, Bucket, Limiter),
-    {reply, ok, State#state{limiter = Limiter2}};
 handle_call(From, Req, State = #state{channel = Channel}) ->
     case emqx_channel:handle_call(Req, Channel) of
         {reply, Reply, NChannel} ->

+ 32 - 81
apps/emqx/test/emqx_channel_SUITE.erl

@@ -33,18 +33,6 @@ force_gc_conf() ->
 force_shutdown_conf() ->
     #{enable => true, max_heap_size => 4194304, max_message_queue_len => 1000}.
 
-rate_limit_conf() ->
-    #{
-        conn_bytes_in => ["100KB", "10s"],
-        conn_messages_in => ["100", "10s"],
-        max_conn_rate => 1000,
-        quota =>
-            #{
-                conn_messages_routing => infinity,
-                overall_messages_routing => infinity
-            }
-    }.
-
 rpc_conf() ->
     #{
         async_batch_size => 256,
@@ -173,27 +161,9 @@ listeners_conf() ->
 limiter_conf() ->
     Make = fun() ->
         #{
-            bucket =>
-                #{
-                    default =>
-                        #{
-                            capacity => infinity,
-                            initial => 0,
-                            rate => infinity,
-                            per_client =>
-                                #{
-                                    capacity => infinity,
-                                    divisible => false,
-                                    failure_strategy => force,
-                                    initial => 0,
-                                    low_watermark => 0,
-                                    max_retry_time => 5000,
-                                    rate => infinity
-                                }
-                        }
-                },
             burst => 0,
-            rate => infinity
+            rate => infinity,
+            capacity => infinity
         }
     end,
 
@@ -202,7 +172,7 @@ limiter_conf() ->
             Acc#{Name => Make()}
         end,
         #{},
-        [bytes_in, message_in, message_routing, connection, batch]
+        [bytes_in, message_in, message_routing, connection, internal]
     ).
 
 stats_conf() ->
@@ -213,7 +183,6 @@ zone_conf() ->
 
 basic_conf() ->
     #{
-        rate_limit => rate_limit_conf(),
         force_gc => force_gc_conf(),
         force_shutdown => force_shutdown_conf(),
         mqtt => mqtt_conf(),
@@ -274,10 +243,9 @@ end_per_suite(_Config) ->
         emqx_banned
     ]).
 
-init_per_testcase(TestCase, Config) ->
+init_per_testcase(_TestCase, Config) ->
     OldConf = set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
-    check_modify_limiter(TestCase),
     [{config, OldConf} | Config].
 
 end_per_testcase(_TestCase, Config) ->
@@ -285,41 +253,6 @@ end_per_testcase(_TestCase, Config) ->
     emqx_common_test_helpers:stop_apps([]),
     Config.
 
-check_modify_limiter(TestCase) ->
-    Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2],
-    case lists:member(TestCase, Checks) of
-        true ->
-            modify_limiter();
-        _ ->
-            ok
-    end.
-
-%% per_client 5/1s,5
-%% aggregated 10/1s,10
-modify_limiter() ->
-    Limiter = emqx_config:get([limiter]),
-    #{message_routing := #{bucket := Bucket} = Routing} = Limiter,
-    #{default := #{per_client := Client} = Default} = Bucket,
-    Client2 = Client#{
-        rate := 5,
-        initial := 0,
-        capacity := 5,
-        low_watermark := 1
-    },
-    Default2 = Default#{
-        per_client := Client2,
-        rate => 10,
-        initial => 0,
-        capacity => 10
-    },
-    Bucket2 = Bucket#{default := Default2},
-    Routing2 = Routing#{bucket := Bucket2},
-
-    emqx_config:put([limiter], Limiter#{message_routing := Routing2}),
-    emqx_limiter_manager:restart_server(message_routing),
-    timer:sleep(100),
-    ok.
-
 %%--------------------------------------------------------------------
 %% Test cases for channel info/stats/caps
 %%--------------------------------------------------------------------
@@ -729,6 +662,7 @@ t_process_unsubscribe(_) ->
 
 t_quota_qos0(_) ->
     esockd_limiter:start_link(),
+    add_bucket(),
     Cnter = counters:new(1, []),
     ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
     ok = meck:expect(
@@ -755,10 +689,12 @@ t_quota_qos0(_) ->
 
     ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
+    del_bucket(),
     esockd_limiter:stop().
 
 t_quota_qos1(_) ->
     esockd_limiter:start_link(),
+    add_bucket(),
     ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
     Chann = channel(#{conn_state => connected, quota => quota()}),
     Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
@@ -769,10 +705,12 @@ t_quota_qos1(_) ->
     {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
     %% Quota in overall
     {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
+    del_bucket(),
     esockd_limiter:stop().
 
 t_quota_qos2(_) ->
     esockd_limiter:start_link(),
+    add_bucket(),
     ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
     Chann = channel(#{conn_state => connected, quota => quota()}),
     Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
@@ -786,6 +724,7 @@ t_quota_qos2(_) ->
     {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
     %% Quota in overall
     {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
+    del_bucket(),
     esockd_limiter:stop().
 
 %%--------------------------------------------------------------------
@@ -952,12 +891,6 @@ t_handle_call_takeover_end(_) ->
     {shutdown, takenover, [], _, _Chan} =
         emqx_channel:handle_call({takeover, 'end'}, channel()).
 
-t_handle_call_quota(_) ->
-    {reply, ok, _Chan} = emqx_channel:handle_call(
-        {quota, default},
-        channel()
-    ).
-
 t_handle_call_unexpected(_) ->
     {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
 
@@ -1176,7 +1109,7 @@ t_ws_cookie_init(_) ->
         ConnInfo,
         #{
             zone => default,
-            limiter => limiter_cfg(),
+            limiter => undefined,
             listener => {tcp, default}
         }
     ),
@@ -1210,7 +1143,7 @@ channel(InitFields) ->
             ConnInfo,
             #{
                 zone => default,
-                limiter => limiter_cfg(),
+                limiter => undefined,
                 listener => {tcp, default}
             }
         ),
@@ -1270,9 +1203,27 @@ session(InitFields) when is_map(InitFields) ->
 
 %% conn: 5/s; overall: 10/s
 quota() ->
-    emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()).
+    emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).
+
+limiter_cfg() -> #{message_routing => make_limiter_cfg()}.
+
+make_limiter_cfg() ->
+    Client = #{
+        rate => 5,
+        initial => 0,
+        capacity => 5,
+        low_watermark => 1,
+        divisible => false,
+        max_retry_time => timer:seconds(5),
+        failure_strategy => force
+    },
+    #{client => Client, rate => 10, initial => 0, capacity => 10}.
+
+add_bucket() ->
+    emqx_limiter_server:add_bucket(?MODULE, message_routing, make_limiter_cfg()).
 
-limiter_cfg() -> #{message_routing => default}.
+del_bucket() ->
+    emqx_limiter_server:del_bucket(?MODULE, message_routing).
 
 v4(Channel) ->
     ConnInfo = emqx_channel:info(conninfo, Channel),

+ 31 - 7
apps/emqx/test/emqx_connection_SUITE.erl

@@ -78,6 +78,7 @@ end_per_suite(_Config) ->
 init_per_testcase(TestCase, Config) when
     TestCase =/= t_ws_pingreq_before_connected
 ->
+    add_bucket(),
     ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
     ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
     ok = meck:expect(
@@ -104,9 +105,11 @@ init_per_testcase(TestCase, Config) when
         _ -> Config
     end;
 init_per_testcase(_, Config) ->
+    add_bucket(),
     Config.
 
 end_per_testcase(TestCase, Config) ->
+    del_bucket(),
     case erlang:function_exported(?MODULE, TestCase, 2) of
         true -> ?MODULE:TestCase('end', Config);
         false -> ok
@@ -291,11 +294,6 @@ t_handle_call(_) ->
     ?assertMatch({ok, _St}, handle_msg({event, undefined}, St)),
     ?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)),
     ?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)),
-    ?assertMatch({reply, ok, _NSt}, handle_call(self(), {ratelimit, []}, St)),
-    ?assertMatch(
-        {reply, ok, _NSt},
-        handle_call(self(), {ratelimit, [{bytes_in, default}]}, St)
-    ),
     ?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)),
     ?assertMatch(
         {stop, {shutdown, kicked}, ok, _NSt},
@@ -704,7 +702,33 @@ handle_msg(Msg, St) -> emqx_connection:handle_msg(Msg, St).
 
 handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
 
-limiter_cfg() -> #{}.
+-define(LIMITER_ID, 'tcp:default').
 
 init_limiter() ->
-    emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).
+    emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()).
+
+limiter_cfg() ->
+    Cfg = make_limiter_cfg(),
+    #{bytes_in => Cfg, message_in => Cfg}.
+
+make_limiter_cfg() ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    Client = #{
+        rate => Infinity,
+        initial => 0,
+        capacity => Infinity,
+        low_watermark => 1,
+        divisible => false,
+        max_retry_time => timer:seconds(5),
+        failure_strategy => force
+    },
+    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+
+add_bucket() ->
+    Cfg = make_limiter_cfg(),
+    emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
+    emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
+
+del_bucket() ->
+    emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
+    emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).

+ 173 - 158
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -24,48 +24,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(BASE_CONF, <<
-    ""
-    "\n"
-    "limiter {\n"
-    "  bytes_in {\n"
-    "    bucket.default {\n"
-    "      rate = infinity\n"
-    "      capacity = infinity\n"
-    "    }\n"
-    "  }\n"
-    "\n"
-    "  message_in {\n"
-    "    bucket.default {\n"
-    "      rate = infinity\n"
-    "      capacity = infinity\n"
-    "    }\n"
-    "  }\n"
-    "\n"
-    "  connection {\n"
-    "    bucket.default {\n"
-    "      rate = infinity\n"
-    "      capacity = infinity\n"
-    "    }\n"
-    "  }\n"
-    "\n"
-    "  message_routing {\n"
-    "    bucket.default {\n"
-    "      rate = infinity\n"
-    "      capacity = infinity\n"
-    "    }\n"
-    "  }\n"
-    "\n"
-    "  batch {\n"
-    "    bucket.retainer {\n"
-    "      rate = infinity\n"
-    "      capacity = infinity\n"
-    "    }\n"
-    "  }\n"
-    "}\n"
-    "\n"
-    ""
->>).
+-define(BASE_CONF, <<"">>).
 
 -record(client, {
     counter :: counters:counter_ref(),
@@ -97,6 +56,9 @@ end_per_suite(_Config) ->
 init_per_testcase(_TestCase, Config) ->
     Config.
 
+end_per_testcase(_TestCase, Config) ->
+    Config.
+
 load_conf() ->
     emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
 
@@ -116,12 +78,12 @@ t_consume(_) ->
             failure_strategy := force
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
         {ok, L2} = emqx_htb_limiter:consume(50, Client),
         {ok, _L3} = emqx_htb_limiter:consume(150, L2)
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_retry(_) ->
     Cfg = fun(Cfg) ->
@@ -133,15 +95,15 @@ t_retry(_) ->
             failure_strategy := force
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
-        {ok, Client} = emqx_htb_limiter:retry(Client),
-        {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
+        {ok, Client2} = emqx_htb_limiter:retry(Client),
+        {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client2),
         L3 = emqx_htb_limiter:set_retry(Retry, L2),
         timer:sleep(500),
         {ok, _L4} = emqx_htb_limiter:retry(L3)
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_restore(_) ->
     Cfg = fun(Cfg) ->
@@ -153,15 +115,15 @@ t_restore(_) ->
             failure_strategy := force
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
         {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
         timer:sleep(200),
         {ok, L3} = emqx_htb_limiter:check(Retry, L2),
         Avaiable = emqx_htb_limiter:available(L3),
         ?assert(Avaiable >= 50)
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_max_retry_time(_) ->
     Cfg = fun(Cfg) ->
@@ -172,15 +134,15 @@ t_max_retry_time(_) ->
             failure_strategy := drop
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
         Begin = ?NOW,
         Result = emqx_htb_limiter:consume(101, Client),
         ?assertMatch({drop, _}, Result),
         Time = ?NOW - Begin,
         ?assert(Time >= 500 andalso Time < 550)
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_divisible(_) ->
     Cfg = fun(Cfg) ->
@@ -191,8 +153,8 @@ t_divisible(_) ->
             capacity := 600
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
         Result = emqx_htb_limiter:check(1000, Client),
         ?assertMatch(
             {partial, 400,
@@ -206,7 +168,7 @@ t_divisible(_) ->
             Result
         )
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_low_watermark(_) ->
     Cfg = fun(Cfg) ->
@@ -217,8 +179,8 @@ t_low_watermark(_) ->
             capacity := 1000
         }
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(BucketCfg) ->
+        Client = connect(BucketCfg),
         Result = emqx_htb_limiter:check(500, Client),
         ?assertMatch({ok, _}, Result),
         {_, Client2} = Result,
@@ -233,28 +195,21 @@ t_low_watermark(_) ->
             Result2
         )
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
 
 t_infinity_client(_) ->
-    Fun = fun(#{per_client := Cli} = Bucket) ->
-        Bucket2 = Bucket#{
-            rate := infinity,
-            capacity := infinity
-        },
-        Cli2 = Cli#{rate := infinity, capacity := infinity},
-        Bucket2#{per_client := Cli2}
-    end,
-    Case = fun() ->
-        Client = connect(default),
+    Fun = fun(Cfg) -> Cfg end,
+    Case = fun(Cfg) ->
+        Client = connect(Cfg),
         InfVal = emqx_limiter_schema:infinity_value(),
         ?assertMatch(#{bucket := #{rate := InfVal}}, Client),
         Result = emqx_htb_limiter:check(100000, Client),
         ?assertEqual({ok, Client}, Result)
     end,
-    with_bucket(default, Fun, Case).
+    with_per_client(Fun, Case).
 
 t_try_restore_agg(_) ->
-    Fun = fun(#{per_client := Cli} = Bucket) ->
+    Fun = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := 1,
             capacity := 200,
@@ -267,20 +222,20 @@ t_try_restore_agg(_) ->
             max_retry_time := 100,
             failure_strategy := force
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(Cfg) ->
+        Client = connect(Cfg),
         {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
         timer:sleep(200),
         {ok, L3} = emqx_htb_limiter:check(Retry, L2),
         Avaiable = emqx_htb_limiter:available(L3),
         ?assert(Avaiable >= 50)
     end,
-    with_bucket(default, Fun, Case).
+    with_bucket(Fun, Case).
 
 t_short_board(_) ->
-    Fun = fun(#{per_client := Cli} = Bucket) ->
+    Fun = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := ?RATE("100/1s"),
             initial := 0,
@@ -291,18 +246,18 @@ t_short_board(_) ->
             capacity := 600,
             initial := 600
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
-    Case = fun() ->
+    Case = fun(Cfg) ->
         Counter = counters:new(1, []),
-        start_client(default, ?NOW + 2000, Counter, 20),
+        start_client(Cfg, ?NOW + 2000, Counter, 20),
         timer:sleep(2100),
         check_average_rate(Counter, 2, 100)
     end,
-    with_bucket(default, Fun, Case).
+    with_bucket(Fun, Case).
 
 t_rate(_) ->
-    Fun = fun(#{per_client := Cli} = Bucket) ->
+    Fun = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := ?RATE("100/100ms"),
             initial := 0,
@@ -313,10 +268,10 @@ t_rate(_) ->
             capacity := infinity,
             initial := 0
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(Cfg) ->
+        Client = connect(Cfg),
         Ts1 = erlang:system_time(millisecond),
         C1 = emqx_htb_limiter:available(Client),
         timer:sleep(1000),
@@ -326,11 +281,11 @@ t_rate(_) ->
         Inc = C2 - C1,
         ?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
     end,
-    with_bucket(default, Fun, Case).
+    with_bucket(Fun, Case).
 
 t_capacity(_) ->
     Capacity = 600,
-    Fun = fun(#{per_client := Cli} = Bucket) ->
+    Fun = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := ?RATE("100/100ms"),
             initial := 0,
@@ -341,15 +296,15 @@ t_capacity(_) ->
             capacity := infinity,
             initial := 0
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
-    Case = fun() ->
-        Client = connect(default),
+    Case = fun(Cfg) ->
+        Client = connect(Cfg),
         timer:sleep(1000),
         C1 = emqx_htb_limiter:available(Client),
         ?assertEqual(Capacity, C1, "test bucket capacity")
     end,
-    with_bucket(default, Fun, Case).
+    with_bucket(Fun, Case).
 
 %%--------------------------------------------------------------------
 %% Test Cases Global Level
@@ -359,7 +314,7 @@ t_collaborative_alloc(_) ->
         Cfg#{rate := ?RATE("600/1s")}
     end,
 
-    Bucket1 = fun(#{per_client := Cli} = Bucket) ->
+    Bucket1 = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := ?RATE("400/1s"),
             initial := 0,
@@ -370,7 +325,7 @@ t_collaborative_alloc(_) ->
             capacity := 100,
             initial := 100
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
 
     Bucket2 = fun(Bucket) ->
@@ -381,8 +336,8 @@ t_collaborative_alloc(_) ->
     Case = fun() ->
         C1 = counters:new(1, []),
         C2 = counters:new(1, []),
-        start_client(b1, ?NOW + 2000, C1, 20),
-        start_client(b2, ?NOW + 2000, C2, 30),
+        start_client({b1, Bucket1}, ?NOW + 2000, C1, 20),
+        start_client({b2, Bucket2}, ?NOW + 2000, C2, 30),
         timer:sleep(2100),
         check_average_rate(C1, 2, 300),
         check_average_rate(C2, 2, 300)
@@ -402,7 +357,7 @@ t_burst(_) ->
         }
     end,
 
-    Bucket = fun(#{per_client := Cli} = Bucket) ->
+    Bucket = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := ?RATE("200/1s"),
             initial := 0,
@@ -413,16 +368,16 @@ t_burst(_) ->
             capacity := 200,
             divisible := true
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
 
     Case = fun() ->
         C1 = counters:new(1, []),
         C2 = counters:new(1, []),
         C3 = counters:new(1, []),
-        start_client(b1, ?NOW + 2000, C1, 20),
-        start_client(b2, ?NOW + 2000, C2, 30),
-        start_client(b3, ?NOW + 2000, C3, 30),
+        start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
+        start_client({b2, Bucket}, ?NOW + 2000, C2, 30),
+        start_client({b3, Bucket}, ?NOW + 2000, C3, 30),
         timer:sleep(2100),
 
         Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
@@ -440,7 +395,7 @@ t_limit_global_with_unlimit_other(_) ->
         Cfg#{rate := ?RATE("600/1s")}
     end,
 
-    Bucket = fun(#{per_client := Cli} = Bucket) ->
+    Bucket = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
             rate := infinity,
             initial := 0,
@@ -451,12 +406,12 @@ t_limit_global_with_unlimit_other(_) ->
             capacity := infinity,
             initial := 0
         },
-        Bucket2#{per_client := Cli2}
+        Bucket2#{client := Cli2}
     end,
 
     Case = fun() ->
         C1 = counters:new(1, []),
-        start_client(b1, ?NOW + 2000, C1, 20),
+        start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
         timer:sleep(2100),
         check_average_rate(C1, 2, 600)
     end,
@@ -470,28 +425,6 @@ t_limit_global_with_unlimit_other(_) ->
 %%--------------------------------------------------------------------
 %% Test Cases container
 %%--------------------------------------------------------------------
-t_new_container(_) ->
-    C1 = emqx_limiter_container:new(),
-    C2 = emqx_limiter_container:new([message_routing]),
-    C3 = emqx_limiter_container:update_by_name(message_routing, default, C1),
-    ?assertMatch(
-        #{
-            message_routing := _,
-            retry_ctx := undefined,
-            {retry, message_routing} := _
-        },
-        C2
-    ),
-    ?assertMatch(
-        #{
-            message_routing := _,
-            retry_ctx := undefined,
-            {retry, message_routing} := _
-        },
-        C3
-    ),
-    ok.
-
 t_check_container(_) ->
     Cfg = fun(Cfg) ->
         Cfg#{
@@ -500,10 +433,11 @@ t_check_container(_) ->
             capacity := 1000
         }
     end,
-    Case = fun() ->
-        C1 = emqx_limiter_container:new(
+    Case = fun(BucketCfg) ->
+        C1 = emqx_limiter_container:get_limiter_by_types(
+            ?MODULE,
             [message_routing],
-            #{message_routing => default}
+            #{message_routing => BucketCfg}
         ),
         {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
         {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
@@ -514,7 +448,39 @@ t_check_container(_) ->
         RetryData = emqx_limiter_container:get_retry_context(C5),
         ?assertEqual(Context, RetryData)
     end,
-    with_per_client(default, Cfg, Case).
+    with_per_client(Cfg, Case).
+
+%%--------------------------------------------------------------------
+%% Test Override
+%%--------------------------------------------------------------------
+t_bucket_no_client(_) ->
+    Rate = ?RATE("1/s"),
+    GlobalMod = fun(#{client := Client} = Cfg) ->
+        Cfg#{client := Client#{rate := Rate}}
+    end,
+    BucketMod = fun(Bucket) ->
+        maps:remove(client, Bucket)
+    end,
+    Case = fun() ->
+        Limiter = connect(BucketMod(make_limiter_cfg())),
+        ?assertMatch(#{rate := Rate}, Limiter)
+    end,
+    with_global(GlobalMod, [BucketMod], Case).
+
+t_bucket_client(_) ->
+    GlobalRate = ?RATE("1/s"),
+    BucketRate = ?RATE("10/s"),
+    GlobalMod = fun(#{client := Client} = Cfg) ->
+        Cfg#{client := Client#{rate := GlobalRate}}
+    end,
+    BucketMod = fun(#{client := Client} = Bucket) ->
+        Bucket#{client := Client#{rate := BucketRate}}
+    end,
+    Case = fun() ->
+        Limiter = connect(BucketMod(make_limiter_cfg())),
+        ?assertMatch(#{rate := BucketRate}, Limiter)
+    end,
+    with_global(GlobalMod, [BucketMod], Case).
 
 %%--------------------------------------------------------------------
 %% Test Cases misc
@@ -607,19 +573,23 @@ t_schema_unit(_) ->
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-start_client(Name, EndTime, Counter, Number) ->
+start_client(Cfg, EndTime, Counter, Number) ->
     lists:foreach(
         fun(_) ->
             spawn(fun() ->
-                start_client(Name, EndTime, Counter)
+                do_start_client(Cfg, EndTime, Counter)
             end)
         end,
         lists:seq(1, Number)
     ).
 
-start_client(Name, EndTime, Counter) ->
-    #{per_client := PerClient} =
-        emqx_config:get([limiter, message_routing, bucket, Name]),
+do_start_client({Name, CfgFun}, EndTime, Counter) ->
+    do_start_client(Name, CfgFun(make_limiter_cfg()), EndTime, Counter);
+do_start_client(Cfg, EndTime, Counter) ->
+    do_start_client(?MODULE, Cfg, EndTime, Counter).
+
+do_start_client(Name, Cfg, EndTime, Counter) ->
+    #{client := PerClient} = Cfg,
     #{rate := Rate} = PerClient,
     Client = #client{
         start = ?NOW,
@@ -627,7 +597,7 @@ start_client(Name, EndTime, Counter) ->
         counter = Counter,
         obtained = 0,
         rate = Rate,
-        client = connect(Name)
+        client = connect(Name, Cfg)
     },
     client_loop(Client).
 
@@ -711,35 +681,50 @@ to_rate(Str) ->
     {ok, Rate} = emqx_limiter_schema:to_rate(Str),
     Rate.
 
-with_global(Modifier, BuckeTemps, Case) ->
-    Fun = fun(Cfg) ->
-        #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg),
-        Fun = fun({Name, BMod}, Acc) ->
-            Acc#{Name => BMod(BucketCfg)}
-        end,
-        Buckets = lists:foldl(Fun, #{}, BuckeTemps),
-        Cfg2#{bucket := Buckets}
-    end,
-
-    with_config([limiter, message_routing], Fun, Case).
+with_global(Modifier, Buckets, Case) ->
+    with_config([limiter, message_routing], Modifier, Buckets, Case).
 
-with_bucket(Bucket, Modifier, Case) ->
-    Path = [limiter, message_routing, bucket, Bucket],
-    with_config(Path, Modifier, Case).
+with_bucket(Modifier, Case) ->
+    Cfg = Modifier(make_limiter_cfg()),
+    add_bucket(Cfg),
+    Case(Cfg),
+    del_bucket().
 
-with_per_client(Bucket, Modifier, Case) ->
-    Path = [limiter, message_routing, bucket, Bucket, per_client],
-    with_config(Path, Modifier, Case).
+with_per_client(Modifier, Case) ->
+    #{client := Client} = Cfg = make_limiter_cfg(),
+    Cfg2 = Cfg#{client := Modifier(Client)},
+    add_bucket(Cfg2),
+    Case(Cfg2),
+    del_bucket().
 
-with_config(Path, Modifier, Case) ->
+with_config(Path, Modifier, Buckets, Case) ->
     Cfg = emqx_config:get(Path),
     NewCfg = Modifier(Cfg),
-    ct:pal("test with config:~p~n", [NewCfg]),
     emqx_config:put(Path, NewCfg),
     emqx_limiter_server:restart(message_routing),
     timer:sleep(500),
+    BucketCfg = make_limiter_cfg(),
+    lists:foreach(
+        fun
+            ({Name, BucketFun}) ->
+                add_bucket(Name, BucketFun(BucketCfg));
+            (BucketFun) ->
+                add_bucket(BucketFun(BucketCfg))
+        end,
+        Buckets
+    ),
     DelayReturn = delay_return(Case),
+    lists:foreach(
+        fun
+            ({Name, _Cfg}) ->
+                del_bucket(Name);
+            (_Cfg) ->
+                del_bucket()
+        end,
+        Buckets
+    ),
     emqx_config:put(Path, Cfg),
+    emqx_limiter_server:restart(message_routing),
     DelayReturn().
 
 delay_return(Case) ->
@@ -751,10 +736,40 @@ delay_return(Case) ->
             fun() -> erlang:raise(Type, Reason, Trace) end
     end.
 
-connect(Name) ->
-    {ok, Limiter} = emqx_limiter_server:connect(message_routing, Name),
+connect({Name, CfgFun}) ->
+    connect(Name, CfgFun(make_limiter_cfg()));
+connect(Cfg) ->
+    connect(?MODULE, Cfg).
+
+connect(Name, Cfg) ->
+    {ok, Limiter} = emqx_limiter_server:connect(Name, message_routing, Cfg),
     Limiter.
 
+make_limiter_cfg() ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    Client = #{
+        rate => Infinity,
+        initial => 0,
+        capacity => Infinity,
+        low_watermark => 0,
+        divisible => false,
+        max_retry_time => timer:seconds(5),
+        failure_strategy => force
+    },
+    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+
+add_bucket(Cfg) ->
+    add_bucket(?MODULE, Cfg).
+
+add_bucket(Name, Cfg) ->
+    emqx_limiter_server:add_bucket(Name, message_routing, Cfg).
+
+del_bucket() ->
+    del_bucket(?MODULE).
+
+del_bucket(Name) ->
+    emqx_limiter_server:del_bucket(Name, message_routing).
+
 check_average_rate(Counter, Second, Rate) ->
     Cost = counters:get(Counter, 1),
     PerSec = Cost / Second,

+ 46 - 15
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -59,6 +59,7 @@ init_per_testcase(TestCase, Config) when
     TestCase =/= t_ws_pingreq_before_connected,
     TestCase =/= t_ws_non_check_origin
 ->
+    add_bucket(),
     %% Meck Cm
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
@@ -96,6 +97,7 @@ init_per_testcase(TestCase, Config) when
         | Config
     ];
 init_per_testcase(t_ws_non_check_origin, Config) ->
+    add_bucket(),
     ok = emqx_common_test_helpers:start_apps([]),
     PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
     emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false),
@@ -105,6 +107,7 @@ init_per_testcase(t_ws_non_check_origin, Config) ->
         | Config
     ];
 init_per_testcase(_, Config) ->
+    add_bucket(),
     PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
     ok = emqx_common_test_helpers:start_apps([]),
     [
@@ -119,6 +122,7 @@ end_per_testcase(TestCase, _Config) when
     TestCase =/= t_ws_non_check_origin,
     TestCase =/= t_ws_pingreq_before_connected
 ->
+    del_bucket(),
     lists:foreach(
         fun meck:unload/1,
         [
@@ -131,11 +135,13 @@ end_per_testcase(TestCase, _Config) when
         ]
     );
 end_per_testcase(t_ws_non_check_origin, Config) ->
+    del_bucket(),
     PrevConfig = ?config(prev_config, Config),
     emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
     emqx_common_test_helpers:stop_apps([]),
     ok;
 end_per_testcase(_, Config) ->
+    del_bucket(),
     PrevConfig = ?config(prev_config, Config),
     emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
     emqx_common_test_helpers:stop_apps([]),
@@ -501,15 +507,11 @@ t_handle_timeout_emit_stats(_) ->
     ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
 
 t_ensure_rate_limit(_) ->
-    %% XXX In the future, limiter should provide API for config update
-    Path = [limiter, bytes_in, bucket, default, per_client],
-    PerClient = emqx_config:get(Path),
     {ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
-    emqx_config:put(Path, PerClient#{rate := Rate}),
-    emqx_limiter_server:restart(bytes_in),
-    timer:sleep(100),
-
-    Limiter = init_limiter(),
+    Limiter = init_limiter(#{
+        bytes_in => make_limiter_cfg(Rate),
+        message_in => make_limiter_cfg()
+    }),
     St = st(#{limiter => Limiter}),
 
     %% must bigger than value in emqx_ratelimit_SUITE
@@ -522,11 +524,7 @@ t_ensure_rate_limit(_) ->
         St
     ),
     ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
-    ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
-
-    emqx_config:put(Path, PerClient),
-    emqx_limiter_server:restart(bytes_in),
-    timer:sleep(100).
+    ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)).
 
 t_parse_incoming(_) ->
     {Packets, St} = ?ws_conn:parse_incoming(<<48, 3>>, [], st()),
@@ -691,7 +689,40 @@ ws_client(State) ->
         ct:fail(ws_timeout)
     end.
 
-limiter_cfg() -> #{bytes_in => default, message_in => default}.
+-define(LIMITER_ID, 'ws:default').
 
 init_limiter() ->
-    emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).
+    init_limiter(limiter_cfg()).
+
+init_limiter(LimiterCfg) ->
+    emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg).
+
+limiter_cfg() ->
+    Cfg = make_limiter_cfg(),
+    #{bytes_in => Cfg, message_in => Cfg}.
+
+make_limiter_cfg() ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    make_limiter_cfg(Infinity).
+
+make_limiter_cfg(ClientRate) ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    Client = #{
+        rate => ClientRate,
+        initial => 0,
+        capacity => Infinity,
+        low_watermark => 1,
+        divisible => false,
+        max_retry_time => timer:seconds(5),
+        failure_strategy => force
+    },
+    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+
+add_bucket() ->
+    Cfg = make_limiter_cfg(),
+    emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
+    emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
+
+del_bucket() ->
+    emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
+    emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).

+ 6 - 1
apps/emqx_retainer/src/emqx_retainer.erl

@@ -348,12 +348,16 @@ enable_retainer(
     #{context_id := ContextId} = State,
     #{
         msg_clear_interval := ClearInterval,
-        backend := BackendCfg
+        backend := BackendCfg,
+        flow_control := FlowControl
     }
 ) ->
     NewContextId = ContextId + 1,
     Context = create_resource(new_context(NewContextId), BackendCfg),
     load(Context),
+    emqx_limiter_server:add_bucket(
+        ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
+    ),
     State#{
         enable := true,
         context_id := NewContextId,
@@ -369,6 +373,7 @@ disable_retainer(
     } = State
 ) ->
     unload(),
+    emqx_limiter_server:del_bucket(?APP, internal),
     ok = close_resource(Context),
     State#{
         enable := false,

+ 4 - 4
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -115,8 +115,8 @@ start_link(Pool, Id) ->
 init([Pool, Id]) ->
     erlang:process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
-    {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
+    BucketCfg = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
+    {ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
     {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
 
 %%--------------------------------------------------------------------
@@ -155,8 +155,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
     {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
     {noreply, State#{limiter := Limiter2}};
 handle_cast({refresh_limiter, Conf}, State) ->
-    BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
-    {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
+    BucketCfg = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
+    {ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
     {noreply, State#{limiter := Limiter}};
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -86,7 +86,7 @@ fields(flow_control) ->
             )},
         {batch_deliver_limiter,
             sc(
-                emqx_limiter_schema:bucket_name(),
+                ?R_REF(emqx_limiter_schema, bucket_opts),
                 batch_deliver_limiter,
                 undefined
             )}

+ 43 - 23
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -368,27 +368,16 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).
 
 t_flow_control(_) ->
-    #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
-    RetainerCfg2 = RetainerCfg#{
-        per_client :=
-            PerClient#{
-                rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
-                capacity := 1
-            }
-    },
-    emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
-    emqx_limiter_manager:restart_server(batch),
-    timer:sleep(500),
-
-    emqx_retainer_dispatcher:refresh_limiter(),
-    timer:sleep(500),
-
+    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
+    LimiterCfg = make_limiter_cfg(Rate),
+    JsonCfg = make_limiter_json(<<"1/1s">>),
+    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
     emqx_retainer:update_config(#{
         <<"flow_control">> =>
             #{
                 <<"batch_read_number">> => 1,
                 <<"batch_deliver_number">> => 1,
-                <<"batch_deliver_limiter">> => retainer
+                <<"batch_deliver_limiter">> => JsonCfg
             }
     }),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@@ -424,13 +413,14 @@ t_flow_control(_) ->
 
     ok = emqtt:disconnect(C1),
 
-    %% recover the limiter
-    emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
-    emqx_limiter_manager:restart_server(batch),
-    timer:sleep(500),
-
-    emqx_retainer_dispatcher:refresh_limiter(),
-    timer:sleep(500),
+    emqx_limiter_server:del_bucket(emqx_retainer, internal),
+    emqx_retainer:update_config(#{
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1
+            }
+    }),
     ok.
 
 t_clear_expired(_) ->
@@ -684,3 +674,33 @@ with_conf(ConfMod, Case) ->
             emqx_retainer:update_config(Conf),
             erlang:raise(Type, Error, Strace)
     end.
+
+make_limiter_cfg(Rate) ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    Client = #{
+        rate => Rate,
+        initial => 0,
+        capacity => Infinity,
+        low_watermark => 1,
+        divisible => false,
+        max_retry_time => timer:seconds(5),
+        failure_strategy => force
+    },
+    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+
+make_limiter_json(Rate) ->
+    Client = #{
+        <<"rate">> => Rate,
+        <<"initial">> => 0,
+        <<"capacity">> => <<"infinity">>,
+        <<"low_watermark">> => 0,
+        <<"divisible">> => <<"false">>,
+        <<"max_retry_time">> => <<"5s">>,
+        <<"failure_strategy">> => <<"force">>
+    },
+    #{
+        <<"client">> => Client,
+        <<"rate">> => <<"infinity">>,
+        <<"initial">> => 0,
+        <<"capacity">> => <<"infinity">>
+    }.