Просмотр исходного кода

feat(limiter): remove the group(zone) level

firest 4 лет назад
Родитель
Сommit
32030c8369

+ 14 - 22
apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf

@@ -5,46 +5,38 @@
 limiter {
   ## rate limiter for message publish
   bytes_in {
-    group.default {
-      bucket.default {
-        rate = infinity
-        capacity = infinity
-      }
+    bucket.default {
+      rate = infinity
+      capacity = infinity
     }
   }
 
   ## rate limiter for message publish
   message_in {
-    group.default {
-      bucket.default {
-        rate = infinity
-        capacity = infinity
-      }
+    bucket.default {
+      rate = infinity
+      capacity = infinity
     }
   }
 
   ## connection rate limiter
   connection {
-    group.default {
-      bucket.default {
-        rate = infinity
-        capacity = infinity
-      }
+    bucket.default {
+      rate = infinity
+      capacity = infinity
     }
   }
 
   ## rate limiter for message deliver
   message_routing {
-    group.default {
-      bucket.default {
-        rate = infinity
-        capacity = infinity
-      }
+    bucket.default {
+      rate = infinity
+      capacity = infinity
     }
   }
 
-  ## Some functions that don't need to use global and zone scope, them can shared use this type
-  shared {
+  ## rate limiter for internal batch operation
+  batch {
     bucket.retainer {
       rate = infinity
       capacity = infinity

+ 9 - 4
apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl

@@ -28,18 +28,23 @@
 -export_type([token_bucket_limiter/0]).
 
 %% a token bucket limiter with a limiter server's bucket reference
--type token_bucket_limiter() :: #{ tokens := non_neg_integer()  %% the number of tokens currently available
+-type token_bucket_limiter() :: #{ %% the number of tokens currently available
+                                   tokens := non_neg_integer()
                                  , rate := decimal()
                                  , capacity := decimal()
                                  , lasttime := millisecond()
-                                 , max_retry_time := non_neg_integer()      %% @see emqx_limiter_schema
-                                 , failure_strategy := failure_strategy()   %% @see emqx_limiter_schema
+                                   %% @see emqx_limiter_schema
+                                 , max_retry_time := non_neg_integer()
+                                   %% @see emqx_limiter_schema
+                                 , failure_strategy := failure_strategy()
                                  , divisible := boolean()               %% @see emqx_limiter_schema
                                  , low_water_mark := non_neg_integer()  %% @see emqx_limiter_schema
                                  , bucket := bucket() %% the limiter server's bucket
 
                                    %% retry contenxt
-                                 , retry_ctx => undefined                %% undefined meaning there is no retry context or no need to retry
+                                   %% undefined meaning no retry context or no need to retry
+
+                                 , retry_ctx => undefined
                                  | retry_context(token_bucket_limiter()) %% the retry context
                                  , atom => any() %% allow to add other keys
                                  }.

+ 6 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl

@@ -30,7 +30,10 @@
 -export_type([container/0, check_result/0]).
 
 -type container() :: #{ limiter_type() => undefined | limiter()
-                      , retry_key() => undefined | retry_context() | future()  %% the retry context of the limiter
+                        %% the retry context of the limiter
+                      , retry_key() => undefined
+                      | retry_context()
+                      | future()
                       , retry_ctx := undefined | any()  %% the retry context of the container
                       }.
 
@@ -62,7 +65,8 @@ new(Types) ->
 %% @doc generate a container
 %% according to the type of limiter and the bucket name configuration of the limiter
 %% @end
--spec get_limiter_by_names(list(limiter_type()), #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
+-spec get_limiter_by_names(list(limiter_type()),
+                           #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
 get_limiter_by_names(Types, BucketNames) ->
     Init = fun(Type, Acc) ->
                    Limiter = emqx_limiter_server:connect(Type, BucketNames),

+ 11 - 19
apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl

@@ -24,7 +24,8 @@
 %% API
 -export([ start_link/0, start_server/1, find_bucket/1
         , find_bucket/2, insert_bucket/2, insert_bucket/3
-        , make_path/2, make_path/3, restart_server/1]).
+        , make_path/2, restart_server/1
+        ]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -34,9 +35,7 @@
 
 -type path() :: list(atom()).
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
--type zone_name() :: emqx_limiter_schema:zone_name().
 -type bucket_name() :: emqx_limiter_schema:bucket_name().
--type bucket_path() :: emqx_limiter_schema:bucket_path().
 
 %% counter record in ets table
 -record(bucket, { path :: path()
@@ -58,10 +57,10 @@ start_server(Type) ->
 restart_server(Type) ->
     emqx_limiter_server_sup:restart(Type).
 
--spec find_bucket(limiter_type(), bucket_path()) ->
+-spec find_bucket(limiter_type(), bucket_name()) ->
           {ok, bucket_ref()} | undefined.
-find_bucket(Type, BucketPath) ->
-    find_bucket(make_path(Type, BucketPath)).
+find_bucket(Type, BucketName) ->
+    find_bucket(make_path(Type, BucketName)).
 
 -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
 find_bucket(Path) ->
@@ -73,26 +72,19 @@ find_bucket(Path) ->
     end.
 
 -spec insert_bucket(limiter_type(),
-                    bucket_path(),
+                    bucket_name(),
                     bucket_ref()) -> boolean().
-insert_bucket(Type, BucketPath, Bucket) ->
-    inner_insert_bucket(make_path(Type, BucketPath), Bucket).
+insert_bucket(Type, BucketName, Bucket) ->
+    inner_insert_bucket(make_path(Type, BucketName), Bucket).
 
 
 -spec insert_bucket(path(), bucket_ref()) -> true.
 insert_bucket(Path, Bucket) ->
     inner_insert_bucket(Path, Bucket).
 
--spec make_path(limiter_type(), bucket_path()) -> path().
-make_path(Type, BucketPath) ->
-    [Type | BucketPath].
-
--spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
-make_path(shared, _GroupName, BucketName) ->
-    [shared, BucketName];
-
-make_path(Type, GroupName, BucketName) ->
-    [Type, GroupName, BucketName].
+-spec make_path(limiter_type(), bucket_name()) -> path().
+make_path(Type, BucketName) ->
+    [Type | BucketName].
 
 %%--------------------------------------------------------------------
 %% @doc

+ 14 - 41
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -20,7 +20,8 @@
 
 -export([ roots/0, fields/1, to_rate/1, to_capacity/1
         , minimum_period/0, to_burst_rate/1, to_initial/1
-        , namespace/0, to_bucket_path/1, default_group_name/0]).
+        , namespace/0, get_bucket_cfg_path/2
+        ]).
 
 -define(KILOBYTE, 1024).
 
@@ -31,7 +32,6 @@
                       | shared.
 
 -type bucket_name() :: atom().
--type zone_name() :: atom().
 -type rate() :: infinity | float().
 -type burst_rate() :: 0 | float().
 -type capacity() :: infinity | number().    %% the capacity of the token bucket
@@ -47,17 +47,15 @@
 -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
 -typerefl_from_string({capacity/0, ?MODULE, to_capacity}).
 -typerefl_from_string({initial/0, ?MODULE, to_initial}).
--typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}).
 
 -reflect_type([ rate/0
               , burst_rate/0
               , capacity/0
               , initial/0
               , failure_strategy/0
-              , bucket_path/0
               ]).
 
--export_type([limiter_type/0, bucket_name/0, zone_name/0]).
+-export_type([limiter_type/0, bucket_name/0, bucket_path/0]).
 
 -import(emqx_schema, [sc/2, map/2]).
 -define(UNIT_TIME_IN_MS, 1000).
@@ -67,38 +65,24 @@ namespace() -> limiter.
 roots() -> [limiter].
 
 fields(limiter) ->
-    [ {bytes_in, sc(ref(limiter_opts), #{})}
-    , {message_in, sc(ref(limiter_opts), #{})}
-    , {connection, sc(ref(limiter_opts), #{})}
-    , {message_routing, sc(ref(limiter_opts), #{})}
-    , {shared, sc(ref(shared_opts),
-                  #{description =>
-                        <<"Some functions that do not need to use global and zone scope,"
-                          "them can shared use this type">>})}
+    [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})}
+    , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})}
+    , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})}
+    , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})}
+    , {batch, sc(ref(limiter_opts),
+                 #{description => <<"Internal batch operation limiter">>})}
     ];
 
 fields(limiter_opts) ->
-    fields(rate_burst) ++ %% the node global limit
-        [ {group, sc(map("group name", ref(group_opts)), #{})}
-        ];
-
-fields(group_opts) ->
-    fields(rate_burst) ++  %% the group limite
-        [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})}
-        ];
-
-fields(rate_burst) ->
     [ {rate, sc(rate(), #{default => "infinity"})}
     , {burst, sc(burst_rate(), #{default => "0/0s"})}
+    , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})}
     ];
 
-fields(shared_opts) ->
-    [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}];
-
 fields(bucket_opts) ->
     [ {rate, sc(rate(), #{})}
-    , {initial, sc(initial(), #{default => "0"})}
     , {capacity, sc(capacity(), #{})}
+    , {initial, sc(initial(), #{default => "0"})}
     , {per_client, sc(ref(client_bucket),
                       #{default => #{},
                         desc => "The rate limit for each user of the bucket,"
@@ -137,9 +121,9 @@ minimum_period() ->
 to_rate(Str) ->
     to_rate(Str, true, false).
 
-%% default group name for shared type limiter
-default_group_name() ->
-    '_default'.
+-spec get_bucket_cfg_path(limiter_type(), bucket_name()) -> bucket_path().
+get_bucket_cfg_path(Type, BucketName) ->
+    [limiter, Type, bucket, BucketName].
 
 %%--------------------------------------------------------------------
 %% Internal functions
@@ -213,14 +197,3 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE;
 apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
 apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
 apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
-
-to_bucket_path(Str) ->
-    Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")],
-    case Dirs of
-        [_Group, _Bucket] = Path ->
-            {ok, Path};
-        [_Bucket] = Path ->
-            {ok, Path};
-        _ ->
-            {error, Str}
-    end.

+ 130 - 272
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -34,28 +34,16 @@
          terminate/2, code_change/3, format_status/2]).
 
 -export([ start_link/1, connect/2, info/1
-        , name/1, get_initial_val/1]).
+        , name/1, get_initial_val/1
+        ]).
 
 -type root() :: #{ rate := rate()             %% number of tokens generated per period
                  , burst := rate()
                  , period := pos_integer()    %% token generation interval(second)
-                 , childs := list(node_id())  %% node children
                  , consumed := non_neg_integer()
                  }.
 
--type zone() :: #{ id := node_id()
-                 , name := zone_name()
-                 , rate := rate()
-                 , burst := rate()
-                 , obtained := non_neg_integer()       %% number of tokens obtained
-                 , childs := list(node_id())
-                 }.
-
--type bucket() :: #{ id := node_id()
-                   , name := bucket_name()
-                     %% pointer to zone node, use for burst
-                     %% it also can use nodeId, nodeId is more direct, but nodeName is clearer
-                   , zone := zone_name()
+-type bucket() :: #{ name := bucket_name()
                    , rate := rate()
                    , obtained := non_neg_integer()
                    , correction := emqx_limiter_decimal:zero_or_float() %% token correction value
@@ -64,19 +52,14 @@
                    , index := undefined | index()
                    }.
 
--type state() :: #{ root := undefined | root()
+-type state() :: #{ type := limiter_type()
+                  , root := undefined | root()
+                  , buckets := buckets()
                   , counter := undefined | counters:counters_ref() %% current counter to alloc
                   , index := index()
-                  , zones := #{zone_name() => node_id()}
-                  , buckets := list(node_id())
-                  , nodes := nodes()
-                  , type := limiter_type()
                   }.
 
--type node_id() :: pos_integer().
--type node_data() :: zone() | bucket().
--type nodes() :: #{node_id() => node_data()}.
--type zone_name() :: emqx_limiter_schema:zone_name().
+-type buckets() :: #{bucket_name() => bucket()}.
 -type limiter_type() :: emqx_limiter_schema:limiter_type().
 -type bucket_name() :: emqx_limiter_schema:bucket_name().
 -type rate() :: decimal().
@@ -84,11 +67,10 @@
 -type capacity() :: decimal().
 -type decimal() :: emqx_limiter_decimal:decimal().
 -type index() :: pos_integer().
--type bucket_path() :: emqx_limiter_schema:bucket_path().
 
 -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
 -define(OVERLOAD_MIN_ALLOC, 0.3).  %% minimum coefficient for overloaded limiter
--define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end).
+-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
 
 -export_type([index/0]).
 -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@@ -99,28 +81,22 @@
 %% API
 %%--------------------------------------------------------------------
 -spec connect(limiter_type(),
-              bucket_path() | #{limiter_type() => bucket_path() | undefined}) ->
+              bucket_name() | #{limiter_type() => bucket_name() | undefined}) ->
           emqx_htb_limiter:limiter().
 %% If no bucket path is set in config, there will be no limit
 connect(_Type, undefined) ->
     emqx_htb_limiter:make_infinity_limiter(undefined);
 
-%% Shared type can use bucket name directly
-connect(shared, BucketName) when is_atom(BucketName) ->
-    connect(shared, [BucketName]);
-
-connect(Type, BucketPath) when is_list(BucketPath) ->
-    FullPath = get_bucket_full_cfg_path(Type, BucketPath),
-    case emqx:get_config(FullPath, undefined) of
-                       undefined ->
-                           io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]),
-                           io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]),
-                           ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}),
-                           throw("bucket's config not found");
-                       #{rate := AggrRate,
-                         capacity := AggrSize,
-                         per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
-                           case emqx_limiter_manager:find_bucket(Type, BucketPath) of
+connect(Type, BucketName) when is_atom(BucketName) ->
+    CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
+    case emqx:get_config(CfgPath, undefined) of
+        undefined ->
+            ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}),
+            throw("bucket's config not found");
+        #{rate := AggrRate,
+          capacity := AggrSize,
+          per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
+            case emqx_limiter_manager:find_bucket(Type, BucketName) of
                 {ok, Bucket} ->
                     if CliRate < AggrRate orelse CliSize < AggrSize ->
                             emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
@@ -130,8 +106,7 @@ connect(Type, BucketPath) when is_list(BucketPath) ->
                             emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                     end;
                 undefined ->
-                    io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]),
-                    ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}),
+                    ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}),
                     throw("invalid bucket")
             end
     end;
@@ -172,13 +147,12 @@ start_link(Type) ->
           {stop, Reason :: term()} |
           ignore.
 init([Type]) ->
-    State = #{root => undefined,
-              counter => undefined,
-              index => 1,
-              zones => #{},
-              nodes => #{},
-              buckets => [],
-              type => Type},
+    State = #{ type => Type
+             , root => undefined
+             , counter => undefined
+             , index => 1
+             , buckets => #{}
+             },
     State2 = init_tree(Type, State),
     #{root := #{period := Perido}} = State2,
     oscillate(Perido),
@@ -289,59 +263,38 @@ oscillate(Interval) ->
 -spec oscillation(state()) -> state().
 oscillation(#{root := #{rate := Flow,
                         period := Interval,
-                        childs := ChildIds,
                         consumed := Consumed} = Root,
-              nodes := Nodes} = State) ->
+              buckets := Buckets} = State) ->
     oscillate(Interval),
-    Childs = get_ordered_childs(ChildIds, Nodes),
-    {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
-    maybe_burst(State#{nodes := Nodes2,
+    Ordereds = get_ordered_buckets(Buckets),
+    {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
+    maybe_burst(State#{buckets := Buckets2,
                        root := Root#{consumed := Consumed + Alloced}}).
 
 %% @doc horizontal spread
--spec transverse(list(node_data()),
+-spec transverse(list(bucket()),
                  flow(),
                  non_neg_integer(),
-                 nodes()) -> {non_neg_integer(), nodes()}.
-transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 ->
-    {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes),
-    InFlow2 = sub(InFlow, NodeAlloced),
-    Alloced2 = Alloced + NodeAlloced,
-    transverse(T, InFlow2, Alloced2, Nodes2);
+                 buckets()) -> {non_neg_integer(), buckets()}.
+transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
+    {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
+    InFlow2 = sub(InFlow, BucketAlloced),
+    Alloced2 = Alloced + BucketAlloced,
+    transverse(T, InFlow2, Alloced2, Buckets2);
 
-transverse(_, _, Alloced, Nodes) ->
-    {Alloced, Nodes}.
+transverse(_, _, Alloced, Buckets) ->
+    {Alloced, Buckets}.
 
 %% @doc vertical spread
--spec longitudinal(node_data(), flow(), nodes()) ->
-          {non_neg_integer(), nodes()}.
-longitudinal(#{id := Id,
-               rate := Rate,
-               obtained := Obtained,
-               childs := ChildIds} = Node, InFlow, Nodes) ->
-    Flow = erlang:min(InFlow, Rate),
-
-    if Flow > 0 ->
-            Childs = get_ordered_childs(ChildIds, Nodes),
-            {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
-            if Alloced > 0 ->
-                    {Alloced,
-                     Nodes2#{Id => Node#{obtained := Obtained + Alloced}}};
-               true ->
-                    %% childs are empty or all counter childs are full
-                    {0, Nodes2}
-            end;
-       true ->
-            {0, Nodes}
-    end;
-
-longitudinal(#{id := Id,
+-spec longitudinal(bucket(), flow(), buckets()) ->
+          {non_neg_integer(), buckets()}.
+longitudinal(#{name := Name,
                rate := Rate,
                capacity := Capacity,
                counter := Counter,
                index := Index,
-               obtained := Obtained} = Node,
-             InFlow, Nodes) when Counter =/= undefined ->
+               obtained := Obtained} = Bucket,
+             InFlow, Buckets) when Counter =/= undefined ->
     Flow = erlang:min(InFlow, Rate),
 
     ShouldAlloc =
@@ -361,224 +314,149 @@ longitudinal(#{id := Id,
             %% XXX if capacity is infinity, and flow always > 0, the value in
             %% counter will be overflow at some point in the future, do we need
             %% to deal with this situation???
-            {Inc, Node2} = emqx_limiter_correction:add(Available, Node),
+            {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
             counters:add(Counter, Index, Inc),
 
             {Inc,
-             Nodes#{Id := Node2#{obtained := Obtained + Inc}}};
+             Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
         _ ->
-            {0, Nodes}
+            {0, Buckets}
     end;
 
-longitudinal(_, _, Nodes) ->
-    {0, Nodes}.
+longitudinal(_, _, Buckets) ->
+    {0, Buckets}.
 
--spec get_ordered_childs(list(node_id()), nodes()) -> list(node_data()).
-get_ordered_childs(Ids, Nodes) ->
-    Childs = [maps:get(Id, Nodes) || Id <- Ids],
+-spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()).
+get_ordered_buckets(Buckets) when is_map(Buckets) ->
+    BucketList = maps:values(Buckets),
+    get_ordered_buckets(BucketList);
 
+get_ordered_buckets(Buckets) ->
     %% sort by obtained, avoid node goes hungry
     lists:sort(fun(#{obtained := A}, #{obtained := B}) ->
                        A < B
                end,
-               Childs).
+               Buckets).
 
 -spec maybe_burst(state()) -> state().
 maybe_burst(#{buckets := Buckets,
-              zones := Zones,
-              root := #{burst := Burst},
-              nodes := Nodes} = State) when Burst > 0 ->
-    %% find empty buckets and group by zone name
-    GroupFun = fun(Id, Groups) ->
-                       %% TODO filter undefined counter
-                       #{counter := Counter,
-                         index := Index,
-                         zone := Zone} = maps:get(Id, Nodes),
-                       case counters:get(Counter, Index) of
-                           Any when Any =< 0 ->
-                               Group = maps:get(Zone, Groups, []),
-                               maps:put(Zone, [Id | Group], Groups);
-                           _ ->
-                               Groups
-                       end
-               end,
+              root := #{burst := Burst}} = State) when Burst > 0 ->
+    Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined ->
+                   case counters:get(Cnt, Idx) > 0 of
+                       true ->
+                           Acc;
+                       false ->
+                           [Bucket | Acc]
+                   end;
+              (_Name, _Bucket, Acc) ->
+                   Acc
+           end,
 
-    case lists:foldl(GroupFun, #{}, Buckets) of
-        Groups when map_size(Groups) > 0 ->
-            %% remove the zone which don't support burst
-            Filter = fun({Name, Childs}, Acc) ->
-                             ZoneId = maps:get(Name, Zones),
-                             #{burst := ZoneBurst} = Zone = maps:get(ZoneId, Nodes),
-                             case ZoneBurst > 0 of
-                                 true ->
-                                     [{Zone, Childs} | Acc];
-                                 _ ->
-                                     Acc
-                             end
-                     end,
-
-            FilterL = lists:foldl(Filter, [], maps:to_list(Groups)),
-            dispatch_burst(FilterL, State);
-        _ ->
-            State
-    end;
+    Empties = maps:fold(Fold, [], Buckets),
+    dispatch_burst(Empties, Burst, State);
 
 maybe_burst(State) ->
     State.
 
--spec dispatch_burst(list({zone(), list(node_id())}), state()) -> state().
-dispatch_burst([], State) ->
+-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state().
+dispatch_burst([], _, State) ->
     State;
 
-dispatch_burst(GroupL,
-               #{root := #{burst := Burst},
-                 nodes := Nodes} = State) ->
-    InFlow = Burst / erlang:length(GroupL),
-    Dispatch = fun({Zone, Childs}, NodeAcc) ->
-                       #{id := ZoneId,
-                         burst := ZoneBurst,
-                         obtained := Obtained} = Zone,
-
-                       case erlang:min(InFlow, ZoneBurst) of
-                           0 -> NodeAcc;
-                           ZoneFlow ->
-                               EachFlow = ZoneFlow / erlang:length(Childs),
-                               {Alloced, NodeAcc2} =
-                                   dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
-                               Zone2 = Zone#{obtained := Obtained + Alloced},
-                               NodeAcc2#{ZoneId := Zone2}
-                       end
-               end,
-    State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
+dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) ->
+    EachFlow = InFlow / erlang:length(Empties),
+    {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
+    State#{consumed := Consumed + Alloced, buckets := Buckets2}.
 
--spec dispatch_burst_to_buckets(list(node_id()),
+-spec dispatch_burst_to_buckets(list(bucket()),
                                 float(),
-                                non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
-dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
-    #{counter := Counter,
+                                non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}.
+dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
+    #{name := Name,
+      counter := Counter,
       index := Index,
-      obtained := Obtained} = Bucket = maps:get(ChildId, Nodes),
+      obtained := Obtained} = Bucket,
     {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
 
     counters:add(Counter, Index, Inc),
 
-    Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}},
-    dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2);
+    Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}},
+    dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2);
 
-dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
-    {Alloced, Nodes}.
+dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
+    {Alloced, Buckets}.
 
 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 init_tree(Type, State) ->
-    Cfg = emqx:get_config([limiter, Type]),
-    GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg),
-    case GlobalCfg of
-        #{group := Group} -> ok;
-        #{bucket := _} ->
-            Group = make_shared_default_group(GlobalCfg),
-            ok
-    end,
-
-    {Factor, Root} = make_root(GlobalCfg),
-    {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type,
-                                             GlobalCfg, Factor, 1, #{}, #{}, #{}),
+    #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
+    {Factor, Root} = make_root(Cfg),
+    {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
 
-    State2 = State#{root := Root#{childs := maps:values(Zones)},
-                    zones := Zones,
-                    nodes := Nodes,
-                    buckets := maps:keys(DelayBuckets),
-                    counter := counters:new(maps:size(DelayBuckets), [write_concurrency])
+    State2 = State#{root := Root,
+                    counter := counters:new(CounterNum, [write_concurrency])
                    },
 
-    lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)).
+    lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
 
 -spec make_root(hocons:confg()) -> {number(), root()}.
+make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
+    {1, #{rate => Rate,
+          burst => Burst,
+          period => emqx_limiter_schema:minimum_period(),
+          consumed => 0}};
+
 make_root(#{rate := Rate, burst := Burst}) ->
     MiniPeriod = emqx_limiter_schema:minimum_period(),
-    if Rate >= 1 ->
-            {1, #{rate => Rate,
-                  burst => Burst,
-                  period => MiniPeriod,
-                  childs => [],
-                  consumed => 0}};
-       true ->
-            Factor = 1 / Rate,
-            {Factor, #{rate => 1,
-                       burst => Burst * Factor,
-                       period => erlang:floor(Factor * MiniPeriod),
-                       childs => [],
-                       consumed => 0}}
-    end.
-
-make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) ->
-    #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg,
-    BucketCfgs = maps:to_list(BucketMap),
-
-    FirstChildId = NodeId + 1,
-    Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}),
-    ChildNum = maps:size(Buckets),
-    NextZoneId = FirstChildId + ChildNum,
-
-    Zone = #{id => NodeId,
-             name => Name,
-             rate => mul(Rate, Factor),
-             burst => Burst,
-             obtained => 0,
-             childs => maps:keys(Buckets)
-            },
-
-    make_zone(T, Type, GlobalCfg, Factor, NextZoneId,
-              Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets)
-             );
-
-make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) ->
-    {Zones, Nodes, DelayBuckets}.
-
-make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) ->
-    Path = emqx_limiter_manager:make_path(Type, ZoneName, Name),
-    case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of
+    Factor = 1 / Rate,
+    {Factor, #{rate => 1,
+               burst => Burst * Factor,
+               period => erlang:floor(Factor * MiniPeriod),
+               consumed => 0}}.
+
+make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
+    Path = emqx_limiter_manager:make_path(Type, Name),
+    case get_counter_rate(Conf, GlobalCfg) of
         infinity ->
             Rate = infinity,
             Capacity = infinity,
             Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
             emqx_limiter_manager:insert_bucket(Path, Ref),
-            InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) ->
-                              State#{nodes := Nodes#{NodeId => Node}}
+            CounterNum2 = CounterNum,
+            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
+                              State#{buckets := Buckets#{BucketName => Bucket}}
                       end;
         RawRate ->
             #{capacity := Capacity} = Conf,
             Initial = get_initial_val(Conf),
             Rate = mul(RawRate, Factor),
-
-            InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) ->
+            CounterNum2 = CounterNum + 1,
+            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
                               {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
-                              Node2 = Node#{counter := Counter, index := Idx},
-                              State2#{nodes := Nodes#{NodeId => Node2}}
+                              Bucket2 = Bucket#{counter := Counter, index := Idx},
+                              State2#{buckets := Buckets#{BucketName => Bucket2}}
                       end
     end,
 
-    Node = #{ id => Id
-            , name => Name
-            , zone => ZoneName
-            , rate => Rate
-            , obtained => 0
-            , correction => 0
-            , capacity => Capacity
-            , counter => undefined
-            , index => undefined},
+    Bucket = #{ name => Name
+              , rate => Rate
+              , obtained => 0
+              , correction => 0
+              , capacity => Capacity
+              , counter => undefined
+              , index => undefined},
 
-    DelayInit = ?CURRYING(Node, InitFun),
+    DelayInit = ?CURRYING(Bucket, InitFun),
 
     make_bucket(T,
-                Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit});
+                Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]);
 
-make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) ->
-    DelayBuckets.
+make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
+    {CounterNum, DelayBuckets}.
 
 -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
           {counters:counters_ref(), pos_integer(), state()}.
 alloc_counter(Path, Rate, Initial,
               #{counter := Counter, index := Index} = State) ->
+
     case emqx_limiter_manager:find_bucket(Path) of
         {ok, #{counter := ECounter,
                index := EIndex}} when ECounter =/= undefined ->
@@ -594,22 +472,14 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
     emqx_limiter_manager:insert_bucket(Path, Ref),
     {Counter, Index, State}.
 
-%% @doc find first limited node
-get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) ->
-    Search = lists:search(fun(E) -> is_limited(E) end,
-                          [BucketCfg, ZoneCfg, GlobalCfg]),
-    case Search of
-        {value, #{rate := Rate}} ->
-            Rate;
-        false ->
-            infinity
-    end.
 
-is_limited(#{rate := Rate, capacity := Capacity}) ->
-    Rate =/= infinity orelse Capacity =/= infinity;
+%% @doc find first limited node
+get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg)
+  when Rate =/= infinity orelse Capacity =/= infinity ->  %% TODO maybe no need to check capacity
+    Rate;
 
-is_limited(#{rate := Rate}) ->
-    Rate =/= infinity.
+get_counter_rate(_Cfg, #{rate := Rate})  ->
+    Rate.
 
 -spec get_initial_val(hocons:config()) -> decimal().
 get_initial_val(#{initial := Initial,
@@ -625,15 +495,3 @@ get_initial_val(#{initial := Initial,
        true ->
             0
     end.
-
--spec make_shared_default_group(hocons:config()) -> honcs:config().
-make_shared_default_group(Cfg) ->
-    GroupName = emqx_limiter_schema:default_group_name(),
-    #{GroupName => Cfg#{rate => infinity, burst => 0}}.
-
--spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()).
-get_bucket_full_cfg_path(shared, [BucketName]) ->
-    [limiter, shared, bucket, BucketName];
-
-get_bucket_full_cfg_path(Type, [GroupName, BucketName]) ->
-    [limiter, Type, group, GroupName, bucket, BucketName].

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -1197,7 +1197,7 @@ base_listener() ->
           #{ default => 'default'
            })}
     , {"limiter",
-       sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})}
+       sc(map("ratelimit's type", atom()), #{default => #{}})}
     ].
 
 %% utils

+ 1 - 3
apps/emqx_retainer/etc/emqx_retainer.conf

@@ -58,9 +58,7 @@ retainer {
     batch_deliver_number = 0
 
     ## deliver limiter bucket
-    ##
-    ## Default: 0s
-    limiter_bucket_name = retainer
+    limiter.batch = retainer
   }
 
   ## Maximum retained message size.

+ 4 - 4
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -85,8 +85,8 @@ start_link(Pool, Id) ->
 init([Pool, Id]) ->
     erlang:process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
-    Limiter = emqx_limiter_server:connect(shared, Bucket),
+    LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
+    Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
     {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
 
 %%--------------------------------------------------------------------
@@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
     {noreply, State#{limiter := Limiter2}};
 
 handle_cast(refresh_limiter, State) ->
-    Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
-    Limiter = emqx_limiter_server:connect(shared, Bucket),
+    LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
+    Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
     {noreply, State#{limiter := Limiter}};
 
 handle_cast(Msg, State) ->

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -29,7 +29,7 @@ fields(mnesia_config) ->
 fields(flow_control) ->
     [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
     , {batch_deliver_number, sc(range(0, 1000), 0)}
-    , {limiter_bucket_name, sc(atom(), retainer)}
+    , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})}
     ].
 
 %%--------------------------------------------------------------------