فهرست منبع

feat(limiter): change zone to group and simplify config

lafirest 4 سال پیش
والد
کامیت
e7dec7835f

+ 22 - 24
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -5,51 +5,49 @@
 limiter {
 limiter {
   ## rate limiter for message publish
   ## rate limiter for message publish
   bytes_in {
   bytes_in {
-    bucket.default {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+    group.default {
+      bucket.default {
+        rate = infinity
+        capacity = infinity
+      }
     }
     }
   }
   }
 
 
   ## rate limiter for message publish
   ## rate limiter for message publish
   message_in {
   message_in {
-    bucket.default {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+    group.default {
+      bucket.default {
+        rate = infinity
+        capacity = infinity
+      }
     }
     }
   }
   }
 
 
   ## connection rate limiter
   ## connection rate limiter
   connection {
   connection {
-    bucket.default {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+    group.default {
+      bucket.default {
+        rate = infinity
+        capacity = infinity
+      }
     }
     }
   }
   }
 
 
   ## rate limiter for message deliver
   ## rate limiter for message deliver
   message_routing {
   message_routing {
-    bucket.default {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+    group.default {
+      bucket.default {
+        rate = infinity
+        capacity = infinity
+      }
     }
     }
   }
   }
 
 
   ## Some functions that don't need to use global and zone scope, them can shared use this type
   ## Some functions that don't need to use global and zone scope, them can shared use this type
   shared {
   shared {
     bucket.retainer {
     bucket.retainer {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
     }
   }
   }
 }
 }

+ 18 - 12
apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl

@@ -23,8 +23,8 @@
 
 
 %% API
 %% API
 -export([ start_link/0, start_server/1, find_bucket/1
 -export([ start_link/0, start_server/1, find_bucket/1
-        , find_bucket/3, insert_bucket/2, insert_bucket/4
-        , make_path/3, restart_server/1]).
+        , find_bucket/2, insert_bucket/2, insert_bucket/3
+        , make_path/2, make_path/3, restart_server/1]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -36,6 +36,7 @@
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
 -type zone_name() :: emqx_limiter_schema:zone_name().
 -type zone_name() :: emqx_limiter_schema:zone_name().
 -type bucket_name() :: emqx_limiter_schema:bucket_name().
 -type bucket_name() :: emqx_limiter_schema:bucket_name().
+-type bucket_path() :: emqx_limiter_schema:bucket_path().
 
 
 %% counter record in ets table
 %% counter record in ets table
 -record(bucket, { path :: path()
 -record(bucket, { path :: path()
@@ -57,10 +58,10 @@ start_server(Type) ->
 restart_server(Type) ->
 restart_server(Type) ->
     emqx_limiter_server_sup:restart(Type).
     emqx_limiter_server_sup:restart(Type).
 
 
--spec find_bucket(limiter_type(), zone_name(), bucket_name()) ->
+-spec find_bucket(limiter_type(), bucket_path()) ->
           {ok, bucket_ref()} | undefined.
           {ok, bucket_ref()} | undefined.
-find_bucket(Type, Zone, BucketId) ->
-    find_bucket(make_path(Type, Zone, BucketId)).
+find_bucket(Type, BucketPath) ->
+    find_bucket(make_path(Type, BucketPath)).
 
 
 -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
 -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
 find_bucket(Path) ->
 find_bucket(Path) ->
@@ -72,21 +73,26 @@ find_bucket(Path) ->
     end.
     end.
 
 
 -spec insert_bucket(limiter_type(),
 -spec insert_bucket(limiter_type(),
-                    zone_name(),
-                    bucket_name(),
+                    bucket_path(),
                     bucket_ref()) -> boolean().
                     bucket_ref()) -> boolean().
-insert_bucket(Type, Zone, BucketId, Bucket) ->
-    inner_insert_bucket(make_path(Type, Zone, BucketId),
-                        Bucket).
+insert_bucket(Type, BucketPath, Bucket) ->
+    inner_insert_bucket(make_path(Type, BucketPath), Bucket).
 
 
 
 
 -spec insert_bucket(path(), bucket_ref()) -> true.
 -spec insert_bucket(path(), bucket_ref()) -> true.
 insert_bucket(Path, Bucket) ->
 insert_bucket(Path, Bucket) ->
     inner_insert_bucket(Path, Bucket).
     inner_insert_bucket(Path, Bucket).
 
 
+-spec make_path(limiter_type(), bucket_path()) -> path().
+make_path(Type, BucketPath) ->
+    [Type | BucketPath].
+
 -spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
 -spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
-make_path(Type, Name, BucketId) ->
-    [Type, Name, BucketId].
+make_path(shared, _GroupName, BucketName) ->
+    [shared, BucketName];
+
+make_path(Type, GroupName, BucketName) ->
+    [Type, GroupName, BucketName].
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% @doc
 %% @doc

+ 70 - 34
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -20,7 +20,7 @@
 
 
 -export([ roots/0, fields/1, to_rate/1, to_capacity/1
 -export([ roots/0, fields/1, to_rate/1, to_capacity/1
         , minimum_period/0, to_burst_rate/1, to_initial/1
         , minimum_period/0, to_burst_rate/1, to_initial/1
-        , namespace/0]).
+        , namespace/0, to_bucket_path/1, default_group_name/0]).
 
 
 -define(KILOBYTE, 1024).
 -define(KILOBYTE, 1024).
 
 
@@ -36,6 +36,7 @@
 -type burst_rate() :: 0 | float().
 -type burst_rate() :: 0 | float().
 -type capacity() :: infinity | number().    %% the capacity of the token bucket
 -type capacity() :: infinity | number().    %% the capacity of the token bucket
 -type initial() :: non_neg_integer().       %% initial capacity of the token bucket
 -type initial() :: non_neg_integer().       %% initial capacity of the token bucket
+-type bucket_path() :: list(atom()).
 
 
 %% the processing strategy after the failure of the token request
 %% the processing strategy after the failure of the token request
 -type failure_strategy() :: force           %% Forced to pass
 -type failure_strategy() :: force           %% Forced to pass
@@ -46,17 +47,20 @@
 -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
 -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
 -typerefl_from_string({capacity/0, ?MODULE, to_capacity}).
 -typerefl_from_string({capacity/0, ?MODULE, to_capacity}).
 -typerefl_from_string({initial/0, ?MODULE, to_initial}).
 -typerefl_from_string({initial/0, ?MODULE, to_initial}).
+-typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}).
 
 
 -reflect_type([ rate/0
 -reflect_type([ rate/0
               , burst_rate/0
               , burst_rate/0
               , capacity/0
               , capacity/0
               , initial/0
               , initial/0
               , failure_strategy/0
               , failure_strategy/0
+              , bucket_path/0
               ]).
               ]).
 
 
 -export_type([limiter_type/0, bucket_name/0, zone_name/0]).
 -export_type([limiter_type/0, bucket_name/0, zone_name/0]).
 
 
 -import(emqx_schema, [sc/2, map/2]).
 -import(emqx_schema, [sc/2, map/2]).
+-define(UNIT_TIME_IN_MS, 1000).
 
 
 namespace() -> limiter.
 namespace() -> limiter.
 
 
@@ -67,43 +71,43 @@ fields(limiter) ->
     , {message_in, sc(ref(limiter_opts), #{})}
     , {message_in, sc(ref(limiter_opts), #{})}
     , {connection, sc(ref(limiter_opts), #{})}
     , {connection, sc(ref(limiter_opts), #{})}
     , {message_routing, sc(ref(limiter_opts), #{})}
     , {message_routing, sc(ref(limiter_opts), #{})}
-    , {shared, sc(ref(shared_limiter_opts),
+    , {shared, sc(ref(shared_opts),
                   #{description =>
                   #{description =>
                         <<"Some functions that do not need to use global and zone scope,"
                         <<"Some functions that do not need to use global and zone scope,"
                           "them can shared use this type">>})}
                           "them can shared use this type">>})}
     ];
     ];
 
 
 fields(limiter_opts) ->
 fields(limiter_opts) ->
-    [ {global, sc(ref(rate_burst), #{required => false})}
-    , {zone, sc(map("zone name", ref(rate_burst)), #{required => false})}
-    , {bucket, sc(map("bucket_id", ref(bucket)),
-                  #{desc => "Token bucket"})}
-    ];
+    fields(rate_burst) ++ %% the node global limit
+        [ {group, sc(map("group name", ref(group_opts)), #{})}
+        ];
 
 
-fields(shared_limiter_opts) ->
-    [{bucket, sc(map("bucket_id", ref(bucket)),
-                 #{desc => "Token bucket"})}
-    ];
+fields(group_opts) ->
+    fields(rate_burst) ++  %% the group limite
+        [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})}
+        ];
 
 
 fields(rate_burst) ->
 fields(rate_burst) ->
-    [ {rate, sc(rate(), #{})}
+    [ {rate, sc(rate(), #{default => "infinity"})}
     , {burst, sc(burst_rate(), #{default => "0/0s"})}
     , {burst, sc(burst_rate(), #{default => "0/0s"})}
     ];
     ];
 
 
-fields(bucket) ->
-    [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})}
-    , {aggregated, sc(ref(bucket_aggregated), #{})}
-    , {per_client, sc(ref(client_bucket), #{})}
-    ];
+fields(shared_opts) ->
+    [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}];
 
 
-fields(bucket_aggregated) ->
+fields(bucket_opts) ->
     [ {rate, sc(rate(), #{})}
     [ {rate, sc(rate(), #{})}
     , {initial, sc(initial(), #{default => "0"})}
     , {initial, sc(initial(), #{default => "0"})}
     , {capacity, sc(capacity(), #{})}
     , {capacity, sc(capacity(), #{})}
+    , {per_client, sc(ref(client_bucket),
+                      #{default => #{},
+                        desc => "The rate limit for each user of the bucket,"
+                        "this field is not required"
+                       })}
     ];
     ];
 
 
 fields(client_bucket) ->
 fields(client_bucket) ->
-    [ {rate, sc(rate(), #{})}
+    [ {rate, sc(rate(), #{default => "infinity"})}
     , {initial, sc(initial(), #{default => "0"})}
     , {initial, sc(initial(), #{default => "0"})}
       %% low_water_mark add for emqx_channel and emqx_session
       %% low_water_mark add for emqx_channel and emqx_session
       %% both modules consume first and then check
       %% both modules consume first and then check
@@ -113,13 +117,14 @@ fields(client_bucket) ->
                           #{desc => "If the remaining tokens are lower than this value,
                           #{desc => "If the remaining tokens are lower than this value,
 the check/consume will succeed, but it will be forced to wait for a short period of time.",
 the check/consume will succeed, but it will be forced to wait for a short period of time.",
                             default => "0"})}
                             default => "0"})}
-    , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket."})}
+    , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.",
+                                  default => "infinity"})}
     , {divisible, sc(boolean(),
     , {divisible, sc(boolean(),
                      #{desc => "Is it possible to split the number of requested tokens?",
                      #{desc => "Is it possible to split the number of requested tokens?",
                        default => false})}
                        default => false})}
     , {max_retry_time, sc(emqx_schema:duration(),
     , {max_retry_time, sc(emqx_schema:duration(),
                           #{ desc => "The maximum retry time when acquire failed."
                           #{ desc => "The maximum retry time when acquire failed."
-                           , default => "5s"})}
+                           , default => "10s"})}
     , {failure_strategy, sc(failure_strategy(),
     , {failure_strategy, sc(failure_strategy(),
                             #{ desc => "The strategy when all the retries failed."
                             #{ desc => "The strategy when all the retries failed."
                              , default => force})}
                              , default => force})}
@@ -132,6 +137,10 @@ minimum_period() ->
 to_rate(Str) ->
 to_rate(Str) ->
     to_rate(Str, true, false).
     to_rate(Str, true, false).
 
 
+%% default group name for shared type limiter
+default_group_name() ->
+    '_default'.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -145,22 +154,38 @@ to_rate(Str, CanInfinity, CanZero) ->
     case Tokens of
     case Tokens of
         ["infinity"] when CanInfinity ->
         ["infinity"] when CanInfinity ->
             {ok, infinity};
             {ok, infinity};
-        ["0", _] when CanZero ->
-            {ok, 0}; %% for burst
-        [Quota, Interval] ->
-            {ok, Val} = to_capacity(Quota),
-            case emqx_schema:to_duration_ms(Interval) of
-                {ok, Ms} when Ms > 0 ->
-                    {ok, Val * minimum_period() / Ms};
-                _ ->
-                    {error, Str}
-            end;
+        [QuotaStr] -> %% if time unit is 1s, it can be omitted
+            {ok, Val} = to_capacity(QuotaStr),
+            check_capacity(Str, Val, CanZero,
+                           fun(Quota) ->
+                                   Quota * minimum_period() / ?UNIT_TIME_IN_MS
+                           end);
+        [QuotaStr, Interval] ->
+            {ok, Val} = to_capacity(QuotaStr),
+            check_capacity(Str, Val, CanZero,
+                           fun(Quota) ->
+                                   case emqx_schema:to_duration_ms(Interval) of
+                                       {ok, Ms} when Ms > 0 ->
+                                           {ok, Quota * minimum_period() / Ms};
+                                       _ ->
+                                           {error, Str}
+                                   end
+                           end);
         _ ->
         _ ->
             {error, Str}
             {error, Str}
     end.
     end.
 
 
+check_capacity(_Str, 0, true, _Cont) ->
+    {ok, 0};
+
+check_capacity(Str, 0, false, _Cont) ->
+    {error, Str};
+
+check_capacity(_Str, Quota, _CanZero, Cont) ->
+    Cont(Quota).
+
 to_capacity(Str) ->
 to_capacity(Str) ->
-    Regex = "^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$",
+    Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$",
     to_quota(Str, Regex).
     to_quota(Str, Regex).
 
 
 to_initial(Str) ->
 to_initial(Str) ->
@@ -175,9 +200,9 @@ to_quota(Str, Regex) ->
             Val = erlang:list_to_integer(Quota),
             Val = erlang:list_to_integer(Quota),
             Unit2 = string:to_lower(Unit),
             Unit2 = string:to_lower(Unit),
             {ok, apply_unit(Unit2, Val)};
             {ok, apply_unit(Unit2, Val)};
-        {match, [Quota]} ->
+        {match, [Quota, ""]} ->
             {ok, erlang:list_to_integer(Quota)};
             {ok, erlang:list_to_integer(Quota)};
-        {match, []} ->
+        {match, ""} ->
             {ok, infinity};
             {ok, infinity};
         _ ->
         _ ->
             {error, Str}
             {error, Str}
@@ -188,3 +213,14 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE;
 apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
 apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
 apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
 apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
 apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
 apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
+
+to_bucket_path(Str) ->
+    Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")],
+    case Dirs of
+        [_Group, _Bucket] = Path ->
+            {ok, Path};
+        [_Bucket] = Path ->
+            {ok, Path};
+        _ ->
+            {error, Str}
+    end.

+ 119 - 83
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -1,4 +1,4 @@
-%%--------------------------------------------------------------------
+%--------------------------------------------------------------------
 %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
 %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% Licensed under the Apache License, Version 2.0 (the "License");
@@ -53,7 +53,9 @@
 
 
 -type bucket() :: #{ id := node_id()
 -type bucket() :: #{ id := node_id()
                    , name := bucket_name()
                    , name := bucket_name()
-                   , zone := zone_name()               %% pointer to zone node, use for burst
+                     %% pointer to zone node, use for burst
+                     %% it also can use nodeId, nodeId is more direct, but nodeName is clearer
+                   , zone := zone_name()
                    , rate := rate()
                    , rate := rate()
                    , obtained := non_neg_integer()
                    , obtained := non_neg_integer()
                    , correction := emqx_limiter_decimal:zero_or_float() %% token correction value
                    , correction := emqx_limiter_decimal:zero_or_float() %% token correction value
@@ -82,9 +84,11 @@
 -type capacity() :: decimal().
 -type capacity() :: decimal().
 -type decimal() :: emqx_limiter_decimal:decimal().
 -type decimal() :: emqx_limiter_decimal:decimal().
 -type index() :: pos_integer().
 -type index() :: pos_integer().
+-type bucket_path() :: emqx_limiter_schema:bucket_path().
 
 
 -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
 -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
 -define(OVERLOAD_MIN_ALLOC, 0.3).  %% minimum coefficient for overloaded limiter
 -define(OVERLOAD_MIN_ALLOC, 0.3).  %% minimum coefficient for overloaded limiter
+-define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end).
 
 
 -export_type([index/0]).
 -export_type([index/0]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@@ -95,17 +99,28 @@
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -spec connect(limiter_type(),
 -spec connect(limiter_type(),
-              bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter().
-connect(Type, BucketName) when is_atom(BucketName) ->
-    Path = [limiter, Type, bucket, BucketName],
-    case emqx:get_config(Path, undefined) of
-        undefined ->
-            ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}),
-            throw("bucket's config not found");
-        #{zone := Zone,
-          aggregated := #{rate := AggrRate, capacity := AggrSize},
-          per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
-            case emqx_limiter_manager:find_bucket(Type, Zone, BucketName) of
+              bucket_path() | #{limiter_type() => bucket_path() | undefined}) ->
+          emqx_htb_limiter:limiter().
+%% If no bucket path is set in config, there will be no limit
+connect(_Type, undefined) ->
+    emqx_htb_limiter:make_infinity_limiter(undefined);
+
+%% Shared type can use bucket name directly
+connect(shared, BucketName) when is_atom(BucketName) ->
+    connect(shared, [BucketName]);
+
+connect(Type, BucketPath) when is_list(BucketPath) ->
+    FullPath = get_bucket_full_cfg_path(Type, BucketPath),
+    case emqx:get_config(FullPath, undefined) of
+                       undefined ->
+                           io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]),
+                           io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]),
+                           ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}),
+                           throw("bucket's config not found");
+                       #{rate := AggrRate,
+                         capacity := AggrSize,
+                         per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
+                           case emqx_limiter_manager:find_bucket(Type, BucketPath) of
                 {ok, Bucket} ->
                 {ok, Bucket} ->
                     if CliRate < AggrRate orelse CliSize < AggrSize ->
                     if CliRate < AggrRate orelse CliSize < AggrSize ->
                             emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
                             emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
@@ -115,13 +130,14 @@ connect(Type, BucketName) when is_atom(BucketName) ->
                             emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                             emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                     end;
                     end;
                 undefined ->
                 undefined ->
-                    ?SLOG(error, #{msg => "bucket_not_found", path => Path}),
+                    io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]),
+                    ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}),
                     throw("invalid bucket")
                     throw("invalid bucket")
             end
             end
     end;
     end;
 
 
-connect(Type, Names) ->
-    connect(Type, maps:get(Type, Names, default)).
+connect(Type, Paths) ->
+    connect(Type, maps:get(Type, Paths, undefined)).
 
 
 -spec info(limiter_type()) -> state().
 -spec info(limiter_type()) -> state().
 info(Type) ->
 info(Type) ->
@@ -374,6 +390,7 @@ maybe_burst(#{buckets := Buckets,
               nodes := Nodes} = State) when Burst > 0 ->
               nodes := Nodes} = State) when Burst > 0 ->
     %% find empty buckets and group by zone name
     %% find empty buckets and group by zone name
     GroupFun = fun(Id, Groups) ->
     GroupFun = fun(Id, Groups) ->
+                       %% TODO filter undefined counter
                        #{counter := Counter,
                        #{counter := Counter,
                          index := Index,
                          index := Index,
                          zone := Zone} = maps:get(Id, Nodes),
                          zone := Zone} = maps:get(Id, Nodes),
@@ -426,7 +443,8 @@ dispatch_burst(GroupL,
                            0 -> NodeAcc;
                            0 -> NodeAcc;
                            ZoneFlow ->
                            ZoneFlow ->
                                EachFlow = ZoneFlow / erlang:length(Childs),
                                EachFlow = ZoneFlow / erlang:length(Childs),
-                               {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
+                               {Alloced, NodeAcc2} =
+                                   dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
                                Zone2 = Zone#{obtained := Obtained + Alloced},
                                Zone2 = Zone#{obtained := Obtained + Alloced},
                                NodeAcc2#{ZoneId := Zone2}
                                NodeAcc2#{ZoneId := Zone2}
                        end
                        end
@@ -434,7 +452,8 @@ dispatch_burst(GroupL,
     State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
     State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
 
 
 -spec dispatch_burst_to_buckets(list(node_id()),
 -spec dispatch_burst_to_buckets(list(node_id()),
-                                float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
+                                float(),
+                                non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
 dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
 dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
     #{counter := Counter,
     #{counter := Counter,
       index := Index,
       index := Index,
@@ -451,76 +470,91 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
 
 
 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 init_tree(Type, State) ->
 init_tree(Type, State) ->
-    case emqx:get_config([limiter, Type]) of
-        #{global := Global,
-          zone := Zone,
-          bucket := Bucket} -> ok;
-        #{bucket := Bucket} ->
-            Global = default_rate_burst_cfg(),
-            Zone = #{default => default_rate_burst_cfg()},
+    Cfg = emqx:get_config([limiter, Type]),
+    GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg),
+    case GlobalCfg of
+        #{group := Group} -> ok;
+        #{bucket := _} ->
+            Group = make_shared_default_group(GlobalCfg),
             ok
             ok
     end,
     end,
-    {Factor, Root} = make_root(Global, Zone),
-    State2 = State#{root := Root},
-    {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
-    State4 = State3#{counter := counters:new(maps:size(Bucket),
-                                             [write_concurrency])},
-    make_bucket(maps:to_list(Bucket), Global, Zone, Factor, NodeId, [], State4).
-
--spec make_root(hocons:confg(), hocon:config()) -> {number(), root()}.
-make_root(#{rate := Rate, burst := Burst}, Zone) ->
-    ZoneNum = maps:size(Zone),
-    Childs = lists:seq(1, ZoneNum),
+
+    {Factor, Root} = make_root(GlobalCfg),
+    {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type,
+                                             GlobalCfg, Factor, 1, #{}, #{}, #{}),
+
+    State2 = State#{root := Root#{childs := maps:values(Zones)},
+                    zones := Zones,
+                    nodes := Nodes,
+                    buckets := maps:keys(DelayBuckets),
+                    counter := counters:new(maps:size(DelayBuckets), [write_concurrency])
+                   },
+
+    lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)).
+
+-spec make_root(hocons:confg()) -> {number(), root()}.
+make_root(#{rate := Rate, burst := Burst}) ->
     MiniPeriod = emqx_limiter_schema:minimum_period(),
     MiniPeriod = emqx_limiter_schema:minimum_period(),
     if Rate >= 1 ->
     if Rate >= 1 ->
             {1, #{rate => Rate,
             {1, #{rate => Rate,
                   burst => Burst,
                   burst => Burst,
                   period => MiniPeriod,
                   period => MiniPeriod,
-                  childs => Childs,
+                  childs => [],
                   consumed => 0}};
                   consumed => 0}};
        true ->
        true ->
             Factor = 1 / Rate,
             Factor = 1 / Rate,
             {Factor, #{rate => 1,
             {Factor, #{rate => 1,
                        burst => Burst * Factor,
                        burst => Burst * Factor,
                        period => erlang:floor(Factor * MiniPeriod),
                        period => erlang:floor(Factor * MiniPeriod),
-                       childs => Childs,
+                       childs => [],
                        consumed => 0}}
                        consumed => 0}}
     end.
     end.
 
 
-make_zone([{Name, ZoneCfg} | T], Factor, NodeId, State) ->
-    #{rate := Rate, burst := Burst} = ZoneCfg,
-    #{zones := Zones, nodes := Nodes} = State,
+make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) ->
+    #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg,
+    BucketCfgs = maps:to_list(BucketMap),
+
+    FirstChildId = NodeId + 1,
+    Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}),
+    ChildNum = maps:size(Buckets),
+    NextZoneId = FirstChildId + ChildNum,
+
     Zone = #{id => NodeId,
     Zone = #{id => NodeId,
              name => Name,
              name => Name,
              rate => mul(Rate, Factor),
              rate => mul(Rate, Factor),
              burst => Burst,
              burst => Burst,
              obtained => 0,
              obtained => 0,
-             childs => []},
-    State2 = State#{zones := Zones#{Name => NodeId},
-                    nodes := Nodes#{NodeId => Zone}},
-    make_zone(T, Factor, NodeId + 1, State2);
+             childs => maps:keys(Buckets)
+            },
+
+    make_zone(T, Type, GlobalCfg, Factor, NextZoneId,
+              Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets)
+             );
 
 
-make_zone([], _, NodeId, State2) ->
-    {NodeId, State2}.
+make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) ->
+    {Zones, Nodes, DelayBuckets}.
 
 
-make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Type} = State) ->
-    #{zone := ZoneName,
-      aggregated := Aggregated} = Conf,
+make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) ->
     Path = emqx_limiter_manager:make_path(Type, ZoneName, Name),
     Path = emqx_limiter_manager:make_path(Type, ZoneName, Name),
-    case get_counter_rate(Conf, Zone, Global) of
-          infinity ->
-              State2 = State,
-              Rate = infinity,
-              Capacity = infinity,
-              Counter = undefined,
-              Index = undefined,
-              Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate),
-              emqx_limiter_manager:insert_bucket(Path, Ref);
+    case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of
+        infinity ->
+            Rate = infinity,
+            Capacity = infinity,
+            Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
+            emqx_limiter_manager:insert_bucket(Path, Ref),
+            InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) ->
+                              State#{nodes := Nodes#{NodeId => Node}}
+                      end;
         RawRate ->
         RawRate ->
-            #{capacity := Capacity} = Aggregated,
-            Initial = get_initial_val(Aggregated),
-            {Counter, Index, State2} = alloc_counter(Path, RawRate, Initial, State),
-            Rate = mul(RawRate, Factor)
+            #{capacity := Capacity} = Conf,
+            Initial = get_initial_val(Conf),
+            Rate = mul(RawRate, Factor),
+
+            InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) ->
+                              {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
+                              Node2 = Node#{counter := Counter, index := Idx},
+                              State2#{nodes := Nodes#{NodeId => Node2}}
+                      end
     end,
     end,
 
 
     Node = #{ id => Id
     Node = #{ id => Id
@@ -530,14 +564,16 @@ make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Typ
             , obtained => 0
             , obtained => 0
             , correction => 0
             , correction => 0
             , capacity => Capacity
             , capacity => Capacity
-            , counter => Counter
-            , index => Index},
+            , counter => undefined
+            , index => undefined},
+
+    DelayInit = ?CURRYING(Node, InitFun),
 
 
-    State3 = add_zone_child(Id, Node, ZoneName, State2),
-    make_bucket(T, Global, Zone, Factor, Id + 1, [Id | Buckets], State3);
+    make_bucket(T,
+                Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit});
 
 
-make_bucket([], _, _, _, _, Buckets, State) ->
-    State#{buckets := Buckets}.
+make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) ->
+    DelayBuckets.
 
 
 -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
 -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
           {counters:counters_ref(), pos_integer(), state()}.
           {counters:counters_ref(), pos_integer(), state()}.
@@ -558,20 +594,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
     emqx_limiter_manager:insert_bucket(Path, Ref),
     emqx_limiter_manager:insert_bucket(Path, Ref),
     {Counter, Index, State}.
     {Counter, Index, State}.
 
 
--spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state().
-add_zone_child(NodeId, Bucket, Name, #{zones := Zones, nodes := Nodes} = State) ->
-    ZoneId = maps:get(Name, Zones),
-    #{childs := Childs} = Zone = maps:get(ZoneId, Nodes),
-    Nodes2 = Nodes#{ZoneId => Zone#{childs := [NodeId | Childs]},
-                    NodeId => Bucket},
-    State#{nodes := Nodes2}.
-
 %% @doc find first limited node
 %% @doc find first limited node
-get_counter_rate(#{zone := ZoneName,
-                   aggregated := Cfg}, ZoneCfg, Global) ->
-    Zone = maps:get(ZoneName, ZoneCfg),
+get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) ->
     Search = lists:search(fun(E) -> is_limited(E) end,
     Search = lists:search(fun(E) -> is_limited(E) end,
-                          [Cfg, Zone, Global]),
+                          [BucketCfg, ZoneCfg, GlobalCfg]),
     case Search of
     case Search of
         {value, #{rate := Rate}} ->
         {value, #{rate := Rate}} ->
             Rate;
             Rate;
@@ -585,6 +611,7 @@ is_limited(#{rate := Rate, capacity := Capacity}) ->
 is_limited(#{rate := Rate}) ->
 is_limited(#{rate := Rate}) ->
     Rate =/= infinity.
     Rate =/= infinity.
 
 
+-spec get_initial_val(hocons:config()) -> decimal().
 get_initial_val(#{initial := Initial,
 get_initial_val(#{initial := Initial,
                   rate := Rate,
                   rate := Rate,
                   capacity := Capacity}) ->
                   capacity := Capacity}) ->
@@ -599,5 +626,14 @@ get_initial_val(#{initial := Initial,
             0
             0
     end.
     end.
 
 
-default_rate_burst_cfg() ->
-    #{rate => infinity, burst => 0}.
+-spec make_shared_default_group(hocons:config()) -> honcs:config().
+make_shared_default_group(Cfg) ->
+    GroupName = emqx_limiter_schema:default_group_name(),
+    #{GroupName => Cfg#{rate => infinity, burst => 0}}.
+
+-spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()).
+get_bucket_full_cfg_path(shared, [BucketName]) ->
+    [limiter, shared, bucket, BucketName];
+
+get_bucket_full_cfg_path(Type, [GroupName, BucketName]) ->
+    [limiter, Type, group, GroupName, bucket, BucketName].

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -1197,7 +1197,7 @@ base_listener() ->
           #{ default => 'default'
           #{ default => 'default'
            })}
            })}
     , {"limiter",
     , {"limiter",
-       sc(map("ratelimit bucket's name", atom()), #{default => #{}})}
+       sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})}
     ].
     ].
 
 
 %% utils
 %% utils

+ 2 - 0
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) ->
     #{type => string, example => <<"force">>};
     #{type => string, example => <<"force">>};
 typename_to_spec("initial()", _Mod) ->
 typename_to_spec("initial()", _Mod) ->
     #{type => string, example => <<"0M">>};
     #{type => string, example => <<"0M">>};
+typename_to_spec("bucket_path()", _Mod) ->
+    #{type => string, example => <<"groupName.bucketName">>};
 typename_to_spec(Name, Mod) ->
 typename_to_spec(Name, Mod) ->
     Spec = range(Name),
     Spec = range(Name),
     Spec1 = remote_module_type(Spec, Name, Mod),
     Spec1 = remote_module_type(Spec, Name, Mod),