Просмотр исходного кода

perf(limiter): simplify the memory represent of limiter configuration

firest 2 лет назад
Родитель
Сommit
4f47e65b7b

+ 14 - 15
apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl

@@ -22,7 +22,7 @@
 
 
 %% API
 %% API
 -export([
 -export([
-    make_token_bucket_limiter/2,
+    make_local_limiter/2,
     make_ref_limiter/2,
     make_ref_limiter/2,
     check/2,
     check/2,
     consume/2,
     consume/2,
@@ -32,12 +32,11 @@
     make_future/1,
     make_future/1,
     available/1
     available/1
 ]).
 ]).
--export_type([token_bucket_limiter/0]).
+-export_type([local_limiter/0]).
 
 
-%% a token bucket limiter with a limiter server's bucket reference
-
-%% the number of tokens currently available
--type token_bucket_limiter() :: #{
+%% a token bucket limiter which may or not contains a reference to another limiter,
+%% and can be used in a client alone
+-type local_limiter() :: #{
     tokens := non_neg_integer(),
     tokens := non_neg_integer(),
     rate := decimal(),
     rate := decimal(),
     capacity := decimal(),
     capacity := decimal(),
@@ -58,12 +57,12 @@
     retry_ctx =>
     retry_ctx =>
         undefined
         undefined
         %% the retry context
         %% the retry context
-        | retry_context(token_bucket_limiter()),
+        | retry_context(local_limiter()),
     %% allow to add other keys
     %% allow to add other keys
     atom => any()
     atom => any()
 }.
 }.
 
 
-%% a limiter server's bucket reference
+%% a limiter instance which only contains a reference to another limiter(bucket)
 -type ref_limiter() :: #{
 -type ref_limiter() :: #{
     max_retry_time := non_neg_integer(),
     max_retry_time := non_neg_integer(),
     failure_strategy := failure_strategy(),
     failure_strategy := failure_strategy(),
@@ -88,7 +87,7 @@
 }.
 }.
 
 
 -type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
 -type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
--type limiter() :: token_bucket_limiter() | ref_limiter() | infinity.
+-type limiter() :: local_limiter() | ref_limiter() | infinity.
 -type millisecond() :: non_neg_integer().
 -type millisecond() :: non_neg_integer().
 
 
 -type pause_type() :: pause | partial.
 -type pause_type() :: pause | partial.
@@ -116,7 +115,7 @@
     rate := decimal(),
     rate := decimal(),
     initial := non_neg_integer(),
     initial := non_neg_integer(),
     low_watermark := non_neg_integer(),
     low_watermark := non_neg_integer(),
-    capacity := decimal(),
+    burst := decimal(),
     divisible := boolean(),
     divisible := boolean(),
     max_retry_time := non_neg_integer(),
     max_retry_time := non_neg_integer(),
     failure_strategy := failure_strategy()
     failure_strategy := failure_strategy()
@@ -134,8 +133,8 @@
 %%  API
 %%  API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %%@doc create a limiter
 %%@doc create a limiter
--spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _.
-make_token_bucket_limiter(Cfg, Bucket) ->
+-spec make_local_limiter(limiter_bucket_cfg(), bucket()) -> _.
+make_local_limiter(Cfg, Bucket) ->
     Cfg#{
     Cfg#{
         tokens => emqx_limiter_server:get_initial_val(Cfg),
         tokens => emqx_limiter_server:get_initial_val(Cfg),
         lasttime => ?NOW,
         lasttime => ?NOW,
@@ -312,8 +311,8 @@ on_failure(throw, Limiter) ->
     Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
     Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
     erlang:throw({rate_check_fail, Message}).
     erlang:throw({rate_check_fail, Message}).
 
 
--spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
-    inner_check_result(token_bucket_limiter()).
+-spec do_check_with_parent_limiter(pos_integer(), local_limiter()) ->
+    inner_check_result(local_limiter()).
 do_check_with_parent_limiter(
 do_check_with_parent_limiter(
     Need,
     Need,
     #{
     #{
@@ -336,7 +335,7 @@ do_check_with_parent_limiter(
             )
             )
     end.
     end.
 
 
--spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
+-spec do_reset(pos_integer(), local_limiter()) -> inner_check_result(local_limiter()).
 do_reset(
 do_reset(
     Need,
     Need,
     #{
     #{

+ 24 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl

@@ -30,6 +30,12 @@
     post_config_update/5
     post_config_update/5
 ]).
 ]).
 
 
+-export([
+    find_root/1,
+    insert_root/2,
+    delete_root/1
+]).
+
 -export([
 -export([
     start_server/1,
     start_server/1,
     start_server/2,
     start_server/2,
@@ -62,6 +68,7 @@
 
 
 -define(UID(Id, Type), {Id, Type}).
 -define(UID(Id, Type), {Id, Type}).
 -define(TAB, emqx_limiter_counters).
 -define(TAB, emqx_limiter_counters).
+-define(ROOT_ID, root).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %%  API
 %%  API
@@ -104,9 +111,25 @@ insert_bucket(Id, Type, Bucket) ->
     ).
     ).
 
 
 -spec delete_bucket(limiter_id(), limiter_type()) -> true.
 -spec delete_bucket(limiter_id(), limiter_type()) -> true.
-delete_bucket(Type, Id) ->
+delete_bucket(Id, Type) ->
     ets:delete(?TAB, ?UID(Id, Type)).
     ets:delete(?TAB, ?UID(Id, Type)).
 
 
+-spec find_root(limiter_type()) ->
+    {ok, bucket_ref()} | undefined.
+find_root(Type) ->
+    find_bucket(?ROOT_ID, Type).
+
+-spec insert_root(
+    limiter_type(),
+    bucket_ref()
+) -> boolean().
+insert_root(Type, Bucket) ->
+    insert_bucket(?ROOT_ID, Type, Bucket).
+
+-spec delete_root(limiter_type()) -> true.
+delete_root(Type) ->
+    delete_bucket(?ROOT_ID, Type).
+
 post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
 post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
     Types = lists:delete(client, maps:keys(NewConf)),
     Types = lists:delete(client, maps:keys(NewConf)),
     _ = [on_post_config_update(Type, NewConf) || Type <- Types],
     _ = [on_post_config_update(Type, NewConf) || Type <- Types],

+ 54 - 14
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -32,7 +32,9 @@
     get_bucket_cfg_path/2,
     get_bucket_cfg_path/2,
     desc/1,
     desc/1,
     types/0,
     types/0,
-    calc_capacity/1
+    calc_capacity/1,
+    extract_with_type/2,
+    default_client_config/0
 ]).
 ]).
 
 
 -define(KILOBYTE, 1024).
 -define(KILOBYTE, 1024).
@@ -94,30 +96,33 @@
 namespace() -> limiter.
 namespace() -> limiter.
 
 
 roots() ->
 roots() ->
-    [{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
+    [
+        {limiter,
+            hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{
+                importance => ?IMPORTANCE_HIDDEN
+            })}
+    ].
 
 
 fields(limiter) ->
 fields(limiter) ->
     [
     [
         {Type,
         {Type,
             ?HOCON(?R_REF(node_opts), #{
             ?HOCON(?R_REF(node_opts), #{
                 desc => ?DESC(Type),
                 desc => ?DESC(Type),
-                default => #{},
                 importance => ?IMPORTANCE_HIDDEN,
                 importance => ?IMPORTANCE_HIDDEN,
                 aliases => alias_of_type(Type)
                 aliases => alias_of_type(Type)
             })}
             })}
      || Type <- types()
      || Type <- types()
     ] ++
     ] ++
         [
         [
+            %% This is an undocumented feature, and it won't be support anymore
             {client,
             {client,
                 ?HOCON(
                 ?HOCON(
                     ?R_REF(client_fields),
                     ?R_REF(client_fields),
                     #{
                     #{
                         desc => ?DESC(client),
                         desc => ?DESC(client),
                         importance => ?IMPORTANCE_HIDDEN,
                         importance => ?IMPORTANCE_HIDDEN,
-                        default => maps:from_list([
-                            {erlang:atom_to_binary(Type), #{}}
-                         || Type <- types()
-                        ])
+                        required => {false, recursively},
+                        deprecated => {since, "5.0.24"}
                     }
                     }
                 )}
                 )}
         ];
         ];
@@ -131,7 +136,7 @@ fields(node_opts) ->
             })}
             })}
     ];
     ];
 fields(client_fields) ->
 fields(client_fields) ->
-    client_fields(types(), #{default => #{}});
+    client_fields(types());
 fields(bucket_opts) ->
 fields(bucket_opts) ->
     fields_of_bucket(<<"infinity">>);
     fields_of_bucket(<<"infinity">>);
 fields(client_opts) ->
 fields(client_opts) ->
@@ -194,7 +199,7 @@ fields(client_opts) ->
 fields(listener_fields) ->
 fields(listener_fields) ->
     composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields);
     composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields);
 fields(listener_client_fields) ->
 fields(listener_client_fields) ->
-    client_fields(?LISTENER_BUCKET_KEYS, #{required => false});
+    client_fields(?LISTENER_BUCKET_KEYS);
 fields(Type) ->
 fields(Type) ->
     simple_bucket_field(Type).
     simple_bucket_field(Type).
 
 
@@ -236,6 +241,31 @@ calc_capacity(#{rate := infinity}) ->
 calc_capacity(#{rate := Rate, burst := Burst}) ->
 calc_capacity(#{rate := Rate, burst := Burst}) ->
     erlang:floor(1000 * Rate / default_period()) + Burst.
     erlang:floor(1000 * Rate / default_period()) + Burst.
 
 
+extract_with_type(_Type, undefined) ->
+    undefined;
+extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
+    BucketVal = maps:find(Type, BucketCfg),
+    ClientVal = maps:find(Type, ClientCfg),
+    merge_client_bucket(Type, ClientVal, BucketVal);
+extract_with_type(Type, BucketCfg) ->
+    BucketVal = maps:find(Type, BucketCfg),
+    merge_client_bucket(Type, undefined, BucketVal).
+
+%% Since the client configuration can be absent and be a undefined value,
+%% but we must need some basic settings to control the behaviour of the limiter,
+%% so here add this helper function to generate a default setting.
+%% This is a temporary workaround until we found a better way to simplify.
+default_client_config() ->
+    #{
+        rate => infinity,
+        initial => 0,
+        low_watermark => 0,
+        burst => 0,
+        divisible => false,
+        max_retry_time => timer:seconds(10),
+        failure_strategy => force
+    }.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -362,7 +392,7 @@ simple_bucket_field(Type) when is_atom(Type) ->
                     ?R_REF(?MODULE, client_opts),
                     ?R_REF(?MODULE, client_opts),
                     #{
                     #{
                         desc => ?DESC(client),
                         desc => ?DESC(client),
-                        required => false,
+                        required => {false, recursively},
                         importance => importance_of_type(Type),
                         importance => importance_of_type(Type),
                         aliases => alias_of_type(Type)
                         aliases => alias_of_type(Type)
                     }
                     }
@@ -375,7 +405,7 @@ composite_bucket_fields(Types, ClientRef) ->
         {Type,
         {Type,
             ?HOCON(?R_REF(?MODULE, bucket_opts), #{
             ?HOCON(?R_REF(?MODULE, bucket_opts), #{
                 desc => ?DESC(?MODULE, Type),
                 desc => ?DESC(?MODULE, Type),
-                required => false,
+                required => {false, recursively},
                 importance => importance_of_type(Type),
                 importance => importance_of_type(Type),
                 aliases => alias_of_type(Type)
                 aliases => alias_of_type(Type)
             })}
             })}
@@ -387,7 +417,7 @@ composite_bucket_fields(Types, ClientRef) ->
                     ?R_REF(?MODULE, ClientRef),
                     ?R_REF(?MODULE, ClientRef),
                     #{
                     #{
                         desc => ?DESC(client),
                         desc => ?DESC(client),
-                        required => false
+                        required => {false, recursively}
                     }
                     }
                 )}
                 )}
         ].
         ].
@@ -410,11 +440,12 @@ fields_of_bucket(Default) ->
             })}
             })}
     ].
     ].
 
 
-client_fields(Types, Meta) ->
+client_fields(Types) ->
     [
     [
         {Type,
         {Type,
-            ?HOCON(?R_REF(client_opts), Meta#{
+            ?HOCON(?R_REF(client_opts), #{
                 desc => ?DESC(Type),
                 desc => ?DESC(Type),
+                required => false,
                 importance => importance_of_type(Type),
                 importance => importance_of_type(Type),
                 aliases => alias_of_type(Type)
                 aliases => alias_of_type(Type)
             })}
             })}
@@ -436,3 +467,12 @@ alias_of_type(bytes) ->
     [bytes_in];
     [bytes_in];
 alias_of_type(_) ->
 alias_of_type(_) ->
     [].
     [].
+
+merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
+    #{Type => BucketVal, client => #{Type => ClientVal}};
+merge_client_bucket(Type, {ok, ClientVal}, _) ->
+    #{client => #{Type => ClientVal}};
+merge_client_bucket(Type, _, {ok, BucketVal}) ->
+    #{Type => BucketVal};
+merge_client_bucket(_, _, _) ->
+    undefined.

+ 109 - 66
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -59,7 +59,8 @@
     burst := rate(),
     burst := rate(),
     %% token generation interval(second)
     %% token generation interval(second)
     period := pos_integer(),
     period := pos_integer(),
-    produced := float()
+    produced := float(),
+    correction := emqx_limiter_decimal:zero_or_float()
 }.
 }.
 
 
 -type bucket() :: #{
 -type bucket() :: #{
@@ -98,6 +99,7 @@
 %% minimum coefficient for overloaded limiter
 %% minimum coefficient for overloaded limiter
 -define(OVERLOAD_MIN_ALLOC, 0.3).
 -define(OVERLOAD_MIN_ALLOC, 0.3).
 -define(COUNTER_SIZE, 8).
 -define(COUNTER_SIZE, 8).
+-define(ROOT_COUNTER_IDX, 1).
 
 
 -export_type([index/0]).
 -export_type([index/0]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@@ -110,47 +112,24 @@
 -spec connect(
 -spec connect(
     limiter_id(),
     limiter_id(),
     limiter_type(),
     limiter_type(),
-    bucket_name() | #{limiter_type() => bucket_name() | undefined}
+    hocons:config() | undefined
 ) ->
 ) ->
     {ok, emqx_htb_limiter:limiter()} | {error, _}.
     {ok, emqx_htb_limiter:limiter()} | {error, _}.
-%% If no bucket path is set in config, there will be no limit
-connect(_Id, _Type, undefined) ->
-    {ok, emqx_htb_limiter:make_infinity_limiter()};
+%% undefined is the default situation, no limiter setting by default
+connect(Id, Type, undefined) ->
+    create_limiter(Id, Type, undefined, undefined);
+connect(Id, Type, #{rate := _} = Cfg) ->
+    create_limiter(Id, Type, maps:get(client, Cfg, undefined), Cfg);
 connect(Id, Type, Cfg) ->
 connect(Id, Type, Cfg) ->
-    case find_limiter_cfg(Type, Cfg) of
-        {_ClientCfg, undefined, _NodeCfg} ->
-            {ok, emqx_htb_limiter:make_infinity_limiter()};
-        {#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} ->
-            {ok, emqx_htb_limiter:make_infinity_limiter()};
-        {ClientCfg, #{rate := infinity}, #{rate := infinity}} ->
-            {ok,
-                emqx_htb_limiter:make_token_bucket_limiter(
-                    ClientCfg, emqx_limiter_bucket_ref:infinity_bucket()
-                )};
-        {
-            #{rate := CliRate} = ClientCfg,
-            #{rate := BucketRate} = BucketCfg,
-            _
-        } ->
-            case emqx_limiter_manager:find_bucket(Id, Type) of
-                {ok, Bucket} ->
-                    BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg),
-                    CliSize = emqx_limiter_schema:calc_capacity(ClientCfg),
-                    {ok,
-                        if
-                            CliRate < BucketRate orelse CliSize < BucketSize ->
-                                emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket);
-                            true ->
-                                emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)
-                        end};
-                undefined ->
-                    ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
-                    {error, invalid_bucket}
-            end
-    end.
+    create_limiter(
+        Id,
+        Type,
+        emqx_utils_maps:deep_get([client, Type], Cfg, undefined),
+        maps:get(Type, Cfg, undefined)
+    ).
 
 
 -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
 -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
-add_bucket(_Id, _Type, undefine) ->
+add_bucket(_Id, _Type, undefined) ->
     ok;
     ok;
 add_bucket(Id, Type, Cfg) ->
 add_bucket(Id, Type, Cfg) ->
     ?CALL(Type, {add_bucket, Id, Cfg}).
     ?CALL(Type, {add_bucket, Id, Cfg}).
@@ -288,7 +267,8 @@ handle_info(Info, State) ->
     Reason :: normal | shutdown | {shutdown, term()} | term(),
     Reason :: normal | shutdown | {shutdown, term()} | term(),
     State :: term()
     State :: term()
 ) -> any().
 ) -> any().
-terminate(_Reason, _State) ->
+terminate(_Reason, #{type := Type}) ->
+    emqx_limiter_manager:delete_root(Type),
     ok.
     ok.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -343,10 +323,14 @@ oscillation(
     oscillate(Interval),
     oscillate(Interval),
     Ordereds = get_ordered_buckets(Buckets),
     Ordereds = get_ordered_buckets(Buckets),
     {Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets),
     {Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets),
-    maybe_burst(State#{
-        buckets := Buckets2,
-        root := Root#{produced := Produced + Alloced}
-    }).
+    State2 = maybe_adjust_root_tokens(
+        State#{
+            buckets := Buckets2,
+            root := Root#{produced := Produced + Alloced}
+        },
+        Alloced
+    ),
+    maybe_burst(State2).
 
 
 %% @doc horizontal spread
 %% @doc horizontal spread
 -spec transverse(
 -spec transverse(
@@ -419,6 +403,24 @@ get_ordered_buckets(Buckets) ->
         Buckets
         Buckets
     ).
     ).
 
 
+-spec maybe_adjust_root_tokens(state(), float()) -> state().
+maybe_adjust_root_tokens(#{root := #{rate := infinity}} = State, _Alloced) ->
+    State;
+maybe_adjust_root_tokens(#{root := #{rate := Rate}} = State, Alloced) when Alloced >= Rate ->
+    State;
+maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} = State, Alloced) ->
+    InFlow = Rate - Alloced,
+    Token = counters:get(Counter, ?ROOT_COUNTER_IDX),
+    case Token >= Rate of
+        true ->
+            State;
+        _ ->
+            Available = erlang:min(Rate - Token, InFlow),
+            {Inc, Root2} = emqx_limiter_correction:add(Available, Root),
+            counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
+            State#{root := Root2}
+    end.
+
 -spec maybe_burst(state()) -> state().
 -spec maybe_burst(state()) -> state().
 maybe_burst(
 maybe_burst(
     #{
     #{
@@ -482,12 +484,16 @@ init_tree(Type) when is_atom(Type) ->
     Cfg = emqx:get_config([limiter, Type]),
     Cfg = emqx:get_config([limiter, Type]),
     init_tree(Type, Cfg).
     init_tree(Type, Cfg).
 
 
-init_tree(Type, Cfg) ->
+init_tree(Type, #{rate := Rate} = Cfg) ->
+    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
+    RootBucket = emqx_limiter_bucket_ref:new(Counter, ?ROOT_COUNTER_IDX, Rate),
+    emqx_limiter_manager:insert_root(Type, RootBucket),
     #{
     #{
         type => Type,
         type => Type,
         root => make_root(Cfg),
         root => make_root(Cfg),
-        counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
-        index => 0,
+        counter => Counter,
+        %% The first slot is reserved for the root
+        index => ?ROOT_COUNTER_IDX,
         buckets => #{}
         buckets => #{}
     }.
     }.
 
 
@@ -497,7 +503,8 @@ make_root(#{rate := Rate, burst := Burst}) ->
         rate => Rate,
         rate => Rate,
         burst => Burst,
         burst => Burst,
         period => emqx_limiter_schema:default_period(),
         period => emqx_limiter_schema:default_period(),
-        produced => 0.0
+        produced => 0.0,
+        correction => 0
     }.
     }.
 
 
 do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
 do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
@@ -571,25 +578,61 @@ call(Type, Msg) ->
             gen_server:call(Pid, Msg)
             gen_server:call(Pid, Msg)
     end.
     end.
 
 
-find_limiter_cfg(Type, #{rate := _} = Cfg) ->
-    {find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)};
-find_limiter_cfg(Type, Cfg) ->
-    {
-        find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)),
-        maps:get(Type, Cfg, undefined),
-        find_node_cfg(Type)
-    }.
-
-find_client_cfg(Type, BucketCfg) ->
-    NodeCfg = emqx:get_config([limiter, client, Type], undefined),
-    merge_client_cfg(NodeCfg, BucketCfg).
+create_limiter(Id, Type, #{rate := Rate} = ClientCfg, BucketCfg) when Rate =/= infinity ->
+    create_limiter_with_client(Id, Type, ClientCfg, BucketCfg);
+create_limiter(Id, Type, _, BucketCfg) ->
+    create_limiter_without_client(Id, Type, BucketCfg).
+
+%% create a limiter with the client-level configuration
+create_limiter_with_client(Id, Type, ClientCfg, BucketCfg) ->
+    case find_referenced_bucket(Id, Type, BucketCfg) of
+        false ->
+            {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, infinity)};
+        {ok, Bucket, RefCfg} ->
+            create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
+        Error ->
+            Error
+    end.
 
 
-merge_client_cfg(undefined, BucketCfg) ->
-    BucketCfg;
-merge_client_cfg(NodeCfg, undefined) ->
-    NodeCfg;
-merge_client_cfg(NodeCfg, BucketCfg) ->
-    maps:merge(NodeCfg, BucketCfg).
+%% create a limiter only with the referenced configuration
+create_limiter_without_client(Id, Type, BucketCfg) ->
+    case find_referenced_bucket(Id, Type, BucketCfg) of
+        false ->
+            {ok, emqx_htb_limiter:make_infinity_limiter()};
+        {ok, Bucket, RefCfg} ->
+            ClientCfg = emqx_limiter_schema:default_client_config(),
+            create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
+        Error ->
+            Error
+    end.
 
 
-find_node_cfg(Type) ->
-    emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}).
+create_limiter_with_ref(
+    Bucket,
+    #{rate := CliRate} = ClientCfg,
+    #{rate := RefRate}
+) when CliRate < RefRate ->
+    {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, Bucket)};
+create_limiter_with_ref(Bucket, ClientCfg, _) ->
+    {ok, emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)}.
+
+%% this is a listener(server)-level reference
+find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity ->
+    case emqx_limiter_manager:find_bucket(Id, Type) of
+        {ok, Bucket} ->
+            {ok, Bucket, Cfg};
+        _ ->
+            ?SLOG(error, #{msg => "bucket not found", type => Type, id => Id}),
+            {error, invalid_bucket}
+    end;
+%% this is a node-level reference
+find_referenced_bucket(Id, Type, _) ->
+    case emqx:get_config([limiter, Type], undefined) of
+        #{rate := infinity} ->
+            false;
+        undefined ->
+            ?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
+            {error, invalid_bucket};
+        NodeCfg ->
+            {ok, Bucket} = emqx_limiter_manager:find_root(Type),
+            {ok, Bucket, NodeCfg}
+    end.

+ 2 - 2
apps/emqx/src/emqx_listeners.erl

@@ -494,7 +494,7 @@ esockd_opts(ListenerId, Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
     Limiter = limiter(Opts0),
     Opts2 =
     Opts2 =
-        case maps:get(connection, Limiter, undefined) of
+        case emqx_limiter_schema:extract_with_type(connection, Limiter) of
             undefined ->
             undefined ->
                 Opts1;
                 Opts1;
             BucketCfg ->
             BucketCfg ->
@@ -639,7 +639,7 @@ zone(Opts) ->
     maps:get(zone, Opts, undefined).
     maps:get(zone, Opts, undefined).
 
 
 limiter(Opts) ->
 limiter(Opts) ->
-    maps:get(limiter, Opts, #{}).
+    maps:get(limiter, Opts, undefined).
 
 
 add_limiter_bucket(Id, #{limiter := Limiter}) ->
 add_limiter_bucket(Id, #{limiter := Limiter}) ->
     maps:fold(
     maps:fold(

+ 236 - 70
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -38,6 +38,7 @@
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
 -define(RATE(Rate), to_rate(Rate)).
 -define(RATE(Rate), to_rate(Rate)).
 -define(NOW, erlang:system_time(millisecond)).
 -define(NOW, erlang:system_time(millisecond)).
+-define(ROOT_COUNTER_IDX, 1).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Setups
 %% Setups
@@ -211,11 +212,11 @@ t_infinity_client(_) ->
     end,
     end,
     with_per_client(Fun, Case).
     with_per_client(Fun, Case).
 
 
-t_try_restore_agg(_) ->
+t_try_restore_with_bucket(_) ->
     Fun = fun(#{client := Cli} = Bucket) ->
     Fun = fun(#{client := Cli} = Bucket) ->
         Bucket2 = Bucket#{
         Bucket2 = Bucket#{
-            rate := 1,
-            burst := 199,
+            rate := 100,
+            burst := 100,
             initial := 50
             initial := 50
         },
         },
         Cli2 = Cli#{
         Cli2 = Cli#{
@@ -394,38 +395,6 @@ t_burst(_) ->
         Case
         Case
     ).
     ).
 
 
-t_limit_global_with_unlimit_other(_) ->
-    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
-        Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
-    end,
-
-    Bucket = fun(#{client := Cli} = Bucket) ->
-        Bucket2 = Bucket#{
-            rate := infinity,
-            initial := 0,
-            burst := 0
-        },
-        Cli2 = Cli#{
-            rate := infinity,
-            burst := 0,
-            initial := 0
-        },
-        Bucket2#{client := Cli2}
-    end,
-
-    Case = fun() ->
-        C1 = counters:new(1, []),
-        start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
-        timer:sleep(2200),
-        check_average_rate(C1, 2, 600)
-    end,
-
-    with_global(
-        GlobalMod,
-        [{b1, Bucket}],
-        Case
-    ).
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Test Cases container
 %% Test Cases container
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -454,38 +423,6 @@ t_check_container(_) ->
     end,
     end,
     with_per_client(Cfg, Case).
     with_per_client(Cfg, Case).
 
 
-%%--------------------------------------------------------------------
-%% Test Override
-%%--------------------------------------------------------------------
-t_bucket_no_client(_) ->
-    Rate = ?RATE("1/s"),
-    GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
-        Cfg#{client := Client#{message_routing := MR#{rate := Rate}}}
-    end,
-    BucketMod = fun(Bucket) ->
-        maps:remove(client, Bucket)
-    end,
-    Case = fun() ->
-        Limiter = connect(BucketMod(make_limiter_cfg())),
-        ?assertMatch(#{rate := Rate}, Limiter)
-    end,
-    with_global(GlobalMod, [BucketMod], Case).
-
-t_bucket_client(_) ->
-    GlobalRate = ?RATE("1/s"),
-    BucketRate = ?RATE("10/s"),
-    GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
-        Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}}
-    end,
-    BucketMod = fun(#{client := Client} = Bucket) ->
-        Bucket#{client := Client#{rate := BucketRate}}
-    end,
-    Case = fun() ->
-        Limiter = connect(BucketMod(make_limiter_cfg())),
-        ?assertMatch(#{rate := BucketRate}, Limiter)
-    end,
-    with_global(GlobalMod, [BucketMod], Case).
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Test Cases misc
 %% Test Cases misc
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -574,7 +511,7 @@ t_schema_unit(_) ->
     ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
     ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
     ok.
     ok.
 
 
-compatibility_for_capacity(_) ->
+t_compatibility_for_capacity(_) ->
     CfgStr = <<
     CfgStr = <<
         ""
         ""
         "\n"
         "\n"
@@ -594,7 +531,7 @@ compatibility_for_capacity(_) ->
         parse_and_check(CfgStr)
         parse_and_check(CfgStr)
     ).
     ).
 
 
-compatibility_for_message_in(_) ->
+t_compatibility_for_message_in(_) ->
     CfgStr = <<
     CfgStr = <<
         ""
         ""
         "\n"
         "\n"
@@ -614,7 +551,7 @@ compatibility_for_message_in(_) ->
         parse_and_check(CfgStr)
         parse_and_check(CfgStr)
     ).
     ).
 
 
-compatibility_for_bytes_in(_) ->
+t_compatibility_for_bytes_in(_) ->
     CfgStr = <<
     CfgStr = <<
         ""
         ""
         "\n"
         "\n"
@@ -634,6 +571,174 @@ compatibility_for_bytes_in(_) ->
         parse_and_check(CfgStr)
         parse_and_check(CfgStr)
     ).
     ).
 
 
+t_extract_with_type(_) ->
+    IsOnly = fun
+        (_Key, Cfg) when map_size(Cfg) =/= 1 ->
+            false;
+        (Key, Cfg) ->
+            maps:is_key(Key, Cfg)
+    end,
+    Checker = fun
+        (Type, #{client := Client} = Cfg) ->
+            Cfg2 = maps:remove(client, Cfg),
+            IsOnly(Type, Client) andalso
+                (IsOnly(Type, Cfg2) orelse
+                    map_size(Cfg2) =:= 0);
+        (Type, Cfg) ->
+            IsOnly(Type, Cfg)
+    end,
+    ?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)),
+    ?assert(
+        Checker(
+            messages,
+            emqx_limiter_schema:extract_with_type(messages, #{
+                messages => #{rate => 1}, bytes => #{rate => 1}
+            })
+        )
+    ),
+    ?assert(
+        Checker(
+            messages,
+            emqx_limiter_schema:extract_with_type(messages, #{
+                messages => #{rate => 1},
+                bytes => #{rate => 1},
+                client => #{messages => #{rate => 2}}
+            })
+        )
+    ),
+    ?assert(
+        Checker(
+            messages,
+            emqx_limiter_schema:extract_with_type(messages, #{
+                client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
+            })
+        )
+    ).
+
+%%--------------------------------------------------------------------
+%% Test Cases  Create Instance
+%%--------------------------------------------------------------------
+t_create_instance_with_infinity_node(_) ->
+    emqx_limiter_manager:insert_bucket(?FUNCTION_NAME, bytes, ?FUNCTION_NAME),
+    Cases = make_create_test_data_with_infinity_node(?FUNCTION_NAME),
+    lists:foreach(
+        fun({Cfg, Expected}) ->
+            {ok, Result} = emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg),
+            IsMatched =
+                case is_atom(Expected) of
+                    true ->
+                        Result =:= Expected;
+                    _ ->
+                        Expected(Result)
+                end,
+            ?assert(
+                IsMatched,
+                lists:flatten(
+                    io_lib:format("Got unexpected:~p~n, Cfg:~p~n", [
+                        Result, Cfg
+                    ])
+                )
+            )
+        end,
+        Cases
+    ),
+    emqx_limiter_manager:delete_bucket(?FUNCTION_NAME, bytes),
+    ok.
+
+t_not_exists_instance(_) ->
+    Cfg = #{bytes => #{rate => 100, burst => 0, initial => 0}},
+    ?assertEqual(
+        {error, invalid_bucket},
+        emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg)
+    ),
+
+    ?assertEqual(
+        {error, invalid_bucket},
+        emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
+    ),
+    ok.
+
+t_create_instance_with_node(_) ->
+    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+        Cfg#{
+            message_routing := MR#{rate := ?RATE("200/1s")},
+            messages := MR#{rate := ?RATE("200/1s")}
+        }
+    end,
+
+    B1 = fun(Bucket) ->
+        Bucket#{rate := ?RATE("400/1s")}
+    end,
+
+    B2 = fun(Bucket) ->
+        Bucket#{rate := infinity}
+    end,
+
+    IsRefLimiter = fun
+        ({ok, #{tokens := _}}, _IsRoot) ->
+            false;
+        ({ok, #{bucket := #{index := ?ROOT_COUNTER_IDX}}}, true) ->
+            true;
+        ({ok, #{bucket := #{index := Index}}}, false) when Index =/= ?ROOT_COUNTER_IDX ->
+            true;
+        (Result, _IsRoot) ->
+            ct:pal("The result is:~p~n", [Result]),
+            false
+    end,
+
+    Case = fun() ->
+        BucketCfg = make_limiter_cfg(),
+
+        ?assert(
+            IsRefLimiter(emqx_limiter_server:connect(b1, message_routing, B1(BucketCfg)), false)
+        ),
+        ?assert(
+            IsRefLimiter(emqx_limiter_server:connect(b2, message_routing, B2(BucketCfg)), true)
+        ),
+        ?assert(IsRefLimiter(emqx_limiter_server:connect(x, messages, undefined), true)),
+        ?assertNot(IsRefLimiter(emqx_limiter_server:connect(x, bytes, undefined), false))
+    end,
+
+    with_global(
+        GlobalMod,
+        [{b1, B1}, {b2, B2}],
+        Case
+    ),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Test Cases emqx_esockd_htb_limiter
+%%--------------------------------------------------------------------
+t_create_esockd_htb_limiter(_) ->
+    Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, undefined),
+    ?assertMatch(
+        #{module := _, id := ?FUNCTION_NAME, type := bytes, bucket := undefined},
+        Opts
+    ),
+
+    Limiter = emqx_esockd_htb_limiter:create(Opts),
+    ?assertMatch(
+        #{module := _, name := bytes, limiter := infinity},
+        Limiter
+    ),
+
+    ?assertEqual(ok, emqx_esockd_htb_limiter:delete(Limiter)),
+    ok.
+
+t_esockd_htb_consume(_) ->
+    ClientCfg = emqx_limiter_schema:default_client_config(),
+    Cfg = #{client => #{bytes => ClientCfg#{rate := 50, max_retry_time := 0}}},
+    Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, Cfg),
+    Limiter = emqx_esockd_htb_limiter:create(Opts),
+
+    C1R = emqx_esockd_htb_limiter:consume(51, Limiter),
+    ?assertMatch({pause, _Ms, _Limiter2}, C1R),
+
+    timer:sleep(300),
+    C2R = emqx_esockd_htb_limiter:consume(50, Limiter),
+    ?assertMatch({ok, _}, C2R),
+    ok.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -877,3 +982,64 @@ apply_modifier(Pairs, #{default := Template}) ->
 parse_and_check(ConfigString) ->
 parse_and_check(ConfigString) ->
     ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString),
     ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString),
     emqx:get_config([listeners, tcp, default, limiter]).
     emqx:get_config([listeners, tcp, default, limiter]).
+
+make_create_test_data_with_infinity_node(FakeInstnace) ->
+    Infinity = emqx_htb_limiter:make_infinity_limiter(),
+    ClientCfg = emqx_limiter_schema:default_client_config(),
+    InfinityRef = emqx_limiter_bucket_ref:infinity_bucket(),
+    MkC = fun(Rate) ->
+        #{client => #{bytes => ClientCfg#{rate := Rate}}}
+    end,
+    MkB = fun(Rate) ->
+        #{bytes => #{rate => Rate, burst => 0, initial => 0}}
+    end,
+
+    MkA = fun(Client, Bucket) ->
+        maps:merge(MkC(Client), MkB(Bucket))
+    end,
+    IsRefLimiter = fun(Expected) ->
+        fun
+            (#{tokens := _}) -> false;
+            (#{bucket := Bucket}) -> Bucket =:= Expected;
+            (_) -> false
+        end
+    end,
+
+    IsTokenLimiter = fun(Expected) ->
+        fun
+            (#{tokens := _, bucket := Bucket}) -> Bucket =:= Expected;
+            (_) -> false
+        end
+    end,
+
+    [
+        %% default situation, no limiter setting
+        {undefined, Infinity},
+
+        %% client = undefined bucket = undefined
+        {#{}, Infinity},
+        %% client = undefined bucket = infinity
+        {MkB(infinity), Infinity},
+        %% client = undefined bucket = other
+        {MkB(100), IsRefLimiter(FakeInstnace)},
+
+        %% client = infinity bucket = undefined
+        {MkC(infinity), Infinity},
+        %% client = infinity bucket = infinity
+        {MkA(infinity, infinity), Infinity},
+
+        %% client = infinity bucket = other
+        {MkA(infinity, 100), IsRefLimiter(FakeInstnace)},
+
+        %% client = other bucket = undefined
+        {MkC(100), IsTokenLimiter(InfinityRef)},
+
+        %% client = other bucket = infinity
+        {MkC(100), IsTokenLimiter(InfinityRef)},
+
+        %% client = C bucket = B C < B
+        {MkA(100, 1000), IsTokenLimiter(FakeInstnace)},
+
+        %% client = C bucket = B C > B
+        {MkA(1000, 100), IsRefLimiter(FakeInstnace)}
+    ].