Sfoglia il codice sorgente

fix(limiter): improve test case and fix some bugs

firest 4 anni fa
parent
commit
d28b34f0d1

+ 18 - 15
apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl

@@ -22,7 +22,7 @@
 
 %% API
 -export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2
-        , consume/2, set_retry/2, retry/1, make_infinity_limiter/1
+        , consume/2, set_retry/2, retry/1, make_infinity_limiter/0
         , make_future/1, available/1
         ]).
 -export_type([token_bucket_limiter/0]).
@@ -108,6 +108,8 @@
 
 -import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]).
 
+-elvis([{elvis_style, no_if_expression, disable}]).
+
 %%--------------------------------------------------------------------
 %%  API
 %%--------------------------------------------------------------------
@@ -124,8 +126,8 @@ make_token_bucket_limiter(Cfg, Bucket) ->
 make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity ->
     Cfg#{bucket => Bucket}.
 
--spec make_infinity_limiter(limiter_bucket_cfg()) -> infinity.
-make_infinity_limiter(_) ->
+-spec make_infinity_limiter() -> infinity.
+make_infinity_limiter() ->
     infinity.
 
 %% @doc request some tokens
@@ -252,12 +254,11 @@ try_consume(_, _, Limiter) ->
 
 -spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter)
               when Limiter :: limiter().
-do_check(Need, #{tokens := Tokens} = Limiter) ->
-    if Need =< Tokens ->
-            do_check_with_parent_limiter(Need, Limiter);
-       true ->
-            do_reset(Need, Limiter)
-    end;
+do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens ->
+    do_check_with_parent_limiter(Need, Limiter);
+
+do_check(Need, #{tokens := _} = Limiter) ->
+    do_reset(Need, Limiter);
 
 do_check(Need, #{divisible := Divisible,
                  bucket := Bucket} = Ref) ->
@@ -280,7 +281,8 @@ on_failure(throw, Limiter) ->
     Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
     erlang:throw({rate_check_fail, Message}).
 
--spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
+-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
+          inner_check_result(token_bucket_limiter()).
 do_check_with_parent_limiter(Need,
                              #{tokens := Tokens,
                                divisible := Divisible,
@@ -306,15 +308,16 @@ do_reset(Need,
            capacity := Capacity} = Limiter) ->
     Now = ?NOW,
     Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
-    if Tokens2 >= Need ->
+    Available = erlang:floor(Tokens2),
+    if Available >= Need ->
             Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now},
             do_check_with_parent_limiter(Need, Limiter2);
-       Divisible andalso Tokens2 > 0 ->
+       Divisible andalso Available > 0 ->
             %% must be allocated here, because may be Need > Capacity
             return_pause(Rate,
                          partial,
                          fun do_reset/2,
-                         Need - Tokens2,
+                         Need - Available,
                          Limiter#{tokens := 0, lasttime := Now});
        true ->
             return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
@@ -331,8 +334,8 @@ return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
     Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
     {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
 
--spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> retry_context(Limiter)
-              when Limiter :: limiter().
+-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) ->
+          retry_context(Limiter) when Limiter :: limiter().
 make_retry_context(Fun, Diff) ->
     #{continuation => Fun, diff => Diff}.
 

+ 1 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl

@@ -39,8 +39,7 @@
           {ok, Pid :: pid(), State :: term()} |
           {error, Reason :: term()}.
 start(_StartType, _StartArgs) ->
-    {ok, _} = Result = emqx_limiter_sup:start_link(),
-    Result.
+    {ok, _} = emqx_limiter_sup:start_link().
 
 %%--------------------------------------------------------------------
 %% @private

+ 7 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl

@@ -21,7 +21,7 @@
 %% @end
 
 %% API
--export([ new/0, new/1, get_limiter_by_names/2
+-export([ new/0, new/1, new/2, get_limiter_by_names/2
         , add_new/3, update_by_name/3, set_retry_context/2
         , check/3, retry/2, get_retry_context/1
         , check_list/2, retry_list/2
@@ -60,7 +60,12 @@ new() ->
 %% @doc generate default data according to the type of limiter
 -spec new(list(limiter_type())) -> container().
 new(Types) ->
-    get_limiter_by_names(Types, #{}).
+    new(Types, #{}).
+
+-spec new(list(limiter_type()),
+          #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
+new(Types, Names) ->
+    get_limiter_by_names(Types, Names).
 
 %% @doc generate a container
 %% according to the type of limiter and the bucket name configuration of the limiter

+ 2 - 2
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -29,7 +29,7 @@
                       | message_in
                       | connection
                       | message_routing
-                      | shared.
+                      | batch.
 
 -type bucket_name() :: atom().
 -type rate() :: infinity | float().
@@ -142,7 +142,7 @@ to_rate(Str, CanInfinity, CanZero) ->
             {ok, Val} = to_capacity(QuotaStr),
             check_capacity(Str, Val, CanZero,
                            fun(Quota) ->
-                                   Quota * minimum_period() / ?UNIT_TIME_IN_MS
+                                   {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
                            end);
         [QuotaStr, Interval] ->
             {ok, Val} = to_capacity(QuotaStr),

+ 27 - 16
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -34,7 +34,7 @@
          terminate/2, code_change/3, format_status/2]).
 
 -export([ start_link/1, connect/2, info/1
-        , name/1, get_initial_val/1
+        , name/1, get_initial_val/1, update_config/1
         ]).
 
 -type root() :: #{ rate := rate()             %% number of tokens generated per period
@@ -85,7 +85,7 @@
           emqx_htb_limiter:limiter().
 %% If no bucket path is set in config, there will be no limit
 connect(_Type, undefined) ->
-    emqx_htb_limiter:make_infinity_limiter(undefined);
+    emqx_htb_limiter:make_infinity_limiter();
 
 connect(Type, BucketName) when is_atom(BucketName) ->
     CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
@@ -101,7 +101,7 @@ connect(Type, BucketName) when is_atom(BucketName) ->
                     if CliRate < AggrRate orelse CliSize < AggrSize ->
                             emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
                        Bucket =:= infinity ->
-                            emqx_htb_limiter:make_infinity_limiter(Cfg);
+                            emqx_htb_limiter:make_infinity_limiter();
                        true ->
                             emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                     end;
@@ -122,6 +122,10 @@ info(Type) ->
 name(Type) ->
     erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
 
+-spec update_config(limiter_type()) -> ok.
+update_config(Type) ->
+    ?CALL(Type).
+
 %%--------------------------------------------------------------------
 %% @doc
 %% Starts the server
@@ -131,6 +135,7 @@ name(Type) ->
 start_link(Type) ->
     gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
 
+
 %%--------------------------------------------------------------------
 %%% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -147,16 +152,10 @@ start_link(Type) ->
           {stop, Reason :: term()} |
           ignore.
 init([Type]) ->
-    State = #{ type => Type
-             , root => undefined
-             , counter => undefined
-             , index => 1
-             , buckets => #{}
-             },
-    State2 = init_tree(Type, State),
-    #{root := #{period := Perido}} = State2,
+    State = init_tree(Type),
+    #{root := #{period := Perido}} = State,
     oscillate(Perido),
-    {ok, State2}.
+    {ok, State}.
 
 %%--------------------------------------------------------------------
 %% @private
@@ -176,6 +175,10 @@ init([Type]) ->
 handle_call(info, _From, State) ->
     {reply, State, State};
 
+handle_call(update_config, _From, #{type := Type}) ->
+    NewState = init_tree(Type),
+    {reply, ok, NewState};
+
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
@@ -362,10 +365,11 @@ maybe_burst(State) ->
 dispatch_burst([], _, State) ->
     State;
 
-dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) ->
+dispatch_burst(Empties, InFlow,
+               #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) ->
     EachFlow = InFlow / erlang:length(Empties),
     {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
-    State#{consumed := Consumed + Alloced, buckets := Buckets2}.
+    State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
 
 -spec dispatch_burst_to_buckets(list(bucket()),
                                 float(),
@@ -385,8 +389,15 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
 dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
     {Alloced, Buckets}.
 
--spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
-init_tree(Type, State) ->
+-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
+init_tree(Type) ->
+    State = #{ type => Type
+             , root => undefined
+             , counter => undefined
+             , index => 1
+             , buckets => #{}
+             },
+
     #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
     {Factor, Root} = make_root(Cfg),
     {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),

+ 1 - 0
apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl

@@ -46,6 +46,7 @@ start(Type) ->
     Spec = make_child(Type),
     supervisor:start_child(?MODULE, Spec).
 
+%% XXX This is maybe a workaround, not so good
 -spec restart(emqx_limiter_schema:limiter_type()) -> _.
 restart(Type) ->
     Id = emqx_limiter_server:name(Type),

+ 35 - 60
apps/emqx/test/emqx_channel_SUITE.erl

@@ -113,57 +113,32 @@ listener_mqtt_ws_conf() ->
 listeners_conf() ->
     #{tcp => #{default => listener_mqtt_tcp_conf()},
       ws => #{default => listener_mqtt_ws_conf()}
-    }.
+     }.
 
 limiter_conf() ->
-    #{bytes_in =>
-        #{bucket =>
-                #{default =>
-                    #{aggregated =>
-                            #{capacity => infinity,initial => 0,rate => infinity},
-                        per_client =>
-                            #{capacity => infinity,divisible => false,
-                            failure_strategy => force,initial => 0,low_water_mark => 0,
-                            max_retry_time => 5000,rate => infinity},
-                        zone => default}},
-            global => #{burst => 0,rate => infinity},
-            zone => #{default => #{burst => 0,rate => infinity}}},
-      connection =>
-        #{bucket =>
-                #{default =>
-                    #{aggregated =>
-                            #{capacity => infinity,initial => 0,rate => infinity},
-                        per_client =>
-                            #{capacity => infinity,divisible => false,
-                            failure_strategy => force,initial => 0,low_water_mark => 0,
-                            max_retry_time => 5000,rate => infinity},
-                        zone => default}},
-            global => #{burst => 0,rate => infinity},
-            zone => #{default => #{burst => 0,rate => infinity}}},
-      message_in =>
-        #{bucket =>
-                #{default =>
-                    #{aggregated =>
-                            #{capacity => infinity,initial => 0,rate => infinity},
-                        per_client =>
-                            #{capacity => infinity,divisible => false,
-                            failure_strategy => force,initial => 0,low_water_mark => 0,
-                            max_retry_time => 5000,rate => infinity},
-                        zone => default}},
-            global => #{burst => 0,rate => infinity},
-            zone => #{default => #{burst => 0,rate => infinity}}},
-      message_routing =>
-        #{bucket =>
-                #{default =>
-                    #{aggregated =>
-                            #{capacity => infinity,initial => 0,rate => infinity},
-                        per_client =>
-                            #{capacity => infinity,divisible => false,
-                            failure_strategy => force,initial => 0,low_water_mark => 0,
-                            max_retry_time => 5000,rate => infinity},
-                        zone => default}},
-            global => #{burst => 0,rate => infinity},
-            zone => #{default => #{burst => 0,rate => infinity}}}}.
+    Make = fun() ->
+                   #{bucket =>
+                         #{default =>
+                               #{capacity => infinity,
+                                 initial => 0,
+                                 rate => infinity,
+                                 per_client =>
+                                     #{capacity => infinity,divisible => false,
+                                       failure_strategy => force,initial => 0,low_water_mark => 0,
+                                       max_retry_time => 5000,rate => infinity
+                                      }
+                                }
+                          },
+                     burst => 0,
+                     rate => infinity
+                    }
+           end,
+
+    lists:foldl(fun(Name, Acc) ->
+                        Acc#{Name => Make()}
+                end,
+                #{},
+                [bytes_in, message_in, message_routing, connection, batch]).
 
 stats_conf() ->
     #{enable => true}.
@@ -232,7 +207,7 @@ end_per_suite(_Config) ->
 init_per_testcase(TestCase, Config) ->
     OldConf = set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
-    modify_limiter(TestCase, OldConf),
+    check_modify_limiter(TestCase),
     [{config, OldConf}|Config].
 
 end_per_testcase(_TestCase, Config) ->
@@ -240,18 +215,19 @@ end_per_testcase(_TestCase, Config) ->
     emqx_common_test_helpers:stop_apps([]),
     Config.
 
-modify_limiter(TestCase, NewConf) ->
+check_modify_limiter(TestCase) ->
     Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2],
     case lists:member(TestCase, Checks) of
         true ->
-            modify_limiter(NewConf);
+            modify_limiter();
         _ ->
             ok
     end.
 
 %% per_client 5/1s,5
 %% aggregated 10/1s,10
-modify_limiter(#{limiter := Limiter} = NewConf) ->
+modify_limiter() ->
+    Limiter = emqx_config:get([limiter]),
     #{message_routing := #{bucket := Bucket} = Routing} = Limiter,
     #{default := #{per_client := Client} = Default} = Bucket,
     Client2 = Client#{rate := 5,
@@ -259,16 +235,15 @@ modify_limiter(#{limiter := Limiter} = NewConf) ->
                       capacity := 5,
                       low_water_mark := 1},
     Default2 = Default#{per_client := Client2,
-                        aggregated := #{rate => 10,
-                                        initial => 0,
-                                        capacity => 10
-                                       }},
+                        rate => 10,
+                        initial => 0,
+                        capacity => 10},
     Bucket2 = Bucket#{default := Default2},
     Routing2 = Routing#{bucket := Bucket2},
 
-    NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}},
-    emqx_config:put(NewConf2),
+    emqx_config:put([limiter], Limiter#{message_routing := Routing2}),
     emqx_limiter_manager:restart_server(message_routing),
+    timer:sleep(100),
     ok.
 
 %%--------------------------------------------------------------------
@@ -1078,4 +1053,4 @@ session(InitFields) when is_map(InitFields) ->
 quota() ->
     emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()).
 
-limiter_cfg() -> #{}.
+limiter_cfg() -> #{message_routing => default}.

+ 120 - 309
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -27,59 +27,37 @@
 -define(BASE_CONF, <<"""
 limiter {
   bytes_in {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = \"100MB/1s\"
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
   }
 
   message_in {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
   }
 
   connection {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
   }
 
   message_routing {
-    global.rate = infinity
-    zone.default.rate = infinity
     bucket.default {
-      zone = default
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
   }
 
-  shared {
+  batch {
     bucket.retainer {
-      aggregated.rate = infinity
-      aggregated.capacity = infinity
-      per_client.rate = infinity
-      per_client.capacity = infinity
+      rate = infinity
+      capacity = infinity
     }
   }
 }
@@ -97,6 +75,7 @@ limiter {
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
 -define(RATE(Rate), to_rate(Rate)).
 -define(NOW, erlang:system_time(millisecond)).
+-define(CONST(X), fun(_) -> X end).
 
 %%--------------------------------------------------------------------
 %% Setups
@@ -231,12 +210,11 @@ t_low_water_mark(_) ->
     with_per_client(default, Cfg, Case).
 
 t_infinity_client(_) ->
-    Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                  Aggr2 = Aggr#{rate := infinity,
-                                capacity := infinity},
+    Fun = fun(#{per_client := Cli} = Bucket) ->
+                  Bucket2 = Bucket#{rate := infinity,
+                                    capacity := infinity},
                   Cli2 = Cli#{rate := infinity, capacity := infinity},
-                  Bucket#{aggregated := Aggr2,
-                          per_client := Cli2}
+                  Bucket2#{per_client := Cli2}
           end,
     Case = fun() ->
                    Client = connect(default),
@@ -247,14 +225,13 @@ t_infinity_client(_) ->
     with_bucket(default, Fun, Case).
 
 t_try_restore_agg(_) ->
-    Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                  Aggr2 = Aggr#{rate := 1,
-                                capacity := 200,
-                                initial := 50},
+    Fun = fun(#{per_client := Cli} = Bucket) ->
+                  Bucket2 = Bucket#{rate := 1,
+                                    capacity := 200,
+                                    initial := 50},
                   Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true,
                               max_retry_time := 100, failure_strategy := force},
-                  Bucket#{aggregated := Aggr2,
-                          per_client := Cli2}
+                  Bucket2#{per_client := Cli2}
           end,
     Case = fun() ->
                    Client = connect(default),
@@ -267,15 +244,14 @@ t_try_restore_agg(_) ->
     with_bucket(default, Fun, Case).
 
 t_short_board(_) ->
-    Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                  Aggr2 = Aggr#{rate := ?RATE("100/1s"),
-                                initial := 0,
-                                capacity := 100},
+    Fun = fun(#{per_client := Cli} = Bucket) ->
+                  Bucket2 = Bucket#{rate := ?RATE("100/1s"),
+                                    initial := 0,
+                                    capacity := 100},
                   Cli2 = Cli#{rate := ?RATE("600/1s"),
                               capacity := 600,
                               initial := 600},
-                  Bucket#{aggregated := Aggr2,
-                          per_client := Cli2}
+                  Bucket2#{per_client := Cli2}
           end,
     Case = fun() ->
                    Counter = counters:new(1, []),
@@ -286,15 +262,14 @@ t_short_board(_) ->
     with_bucket(default, Fun, Case).
 
 t_rate(_) ->
-    Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-              Aggr2 = Aggr#{rate := ?RATE("100/100ms"),
-                            initial := 0,
-                            capacity := infinity},
+    Fun = fun(#{per_client := Cli} = Bucket) ->
+              Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
+                                initial := 0,
+                                capacity := infinity},
               Cli2 = Cli#{rate := infinity,
                           capacity := infinity,
                           initial := 0},
-              Bucket#{aggregated := Aggr2,
-                      per_client := Cli2}
+              Bucket2#{per_client := Cli2}
       end,
     Case = fun() ->
                Client = connect(default),
@@ -311,113 +286,74 @@ t_rate(_) ->
 
 t_capacity(_) ->
     Capacity = 600,
-    Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-              Aggr2 = Aggr#{rate := ?RATE("100/100ms"),
-                            initial := 0,
-                            capacity := 600},
-              Cli2 = Cli#{rate := infinity,
-                          capacity := infinity,
-                          initial := 0},
-              Bucket#{aggregated := Aggr2,
-                      per_client := Cli2}
+    Fun = fun(#{per_client := Cli} = Bucket) ->
+              Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
+                                initial := 0,
+                                capacity := 600},
+                  Cli2 = Cli#{rate := infinity,
+                              capacity := infinity,
+                              initial := 0},
+                  Bucket2#{per_client := Cli2}
           end,
     Case = fun() ->
-               Client = connect(default),
-               timer:sleep(1000),
+                   Client = connect(default),
+                   timer:sleep(1000),
                    C1 = emqx_htb_limiter:available(Client),
                    ?assertEqual(Capacity, C1, "test bucket capacity")
            end,
     with_bucket(default, Fun, Case).
 
-%%--------------------------------------------------------------------
-%% Test Cases Zone Level
-%%--------------------------------------------------------------------
-t_limit_zone_with_unlimit_bucket(_) ->
-    ZoneMod = fun(Cfg) ->
-                  Cfg#{rate := ?RATE("600/1s"),
-                       burst := ?RATE("60/1s")}
-              end,
-
-    Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                 Aggr2 = Aggr#{rate := infinity,
-                               initial := 0,
-                               capacity := infinity},
-                 Cli2 = Cli#{rate := infinity,
-                             initial := 0,
-                             capacity := infinity,
-                             divisible := true},
-                 Bucket#{aggregated := Aggr2, per_client := Cli2}
-             end,
-
-    Case = fun() ->
-               C1 = counters:new(1, []),
-               start_client(b1, ?NOW + 2000, C1, 20),
-               timer:sleep(2100),
-               check_average_rate(C1, 2, 600)
-           end,
-
-    with_zone(default, ZoneMod, [{b1, Bucket}], Case).
-
-
 %%--------------------------------------------------------------------
 %% Test Cases Global Level
 %%--------------------------------------------------------------------
-t_burst_and_fairness(_) ->
+t_collaborative_alloc(_) ->
     GlobalMod = fun(Cfg) ->
-                    Cfg#{burst := ?RATE("60/1s")}
+                        Cfg#{rate := ?RATE("600/1s")}
                 end,
 
-    ZoneMod = fun(Cfg) ->
-                  Cfg#{rate := ?RATE("600/1s"),
-                       burst := ?RATE("60/1s")}
-              end,
-
-    Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                     Aggr2 = Aggr#{rate := ?RATE("500/1s"),
-                                   initial := 0,
-                                   capacity := 500},
-                     Cli2 = Cli#{rate := ?RATE("600/1s"),
-                                 capacity := 600,
-                                 initial := 600},
-                     Bucket#{aggregated := Aggr2,
-                             per_client := Cli2}
+    Bucket1 = fun(#{per_client := Cli} = Bucket) ->
+                      Bucket2 = Bucket#{rate := ?RATE("400/1s"),
+                                        initial := 0,
+                                        capacity := 600},
+                      Cli2 = Cli#{rate := ?RATE("50"),
+                                  capacity := 100,
+                                  initial := 100},
+                      Bucket2#{per_client := Cli2}
              end,
 
+    Bucket2 = fun(Bucket) ->
+                      Bucket2 = Bucket1(Bucket),
+                      Bucket2#{rate := ?RATE("200/1s")}
+              end,
+
     Case = fun() ->
                    C1 = counters:new(1, []),
                    C2 = counters:new(1, []),
                    start_client(b1, ?NOW + 2000, C1, 20),
                    start_client(b2, ?NOW + 2000, C2, 30),
                    timer:sleep(2100),
-                   check_average_rate(C1, 2, 330),
-                   check_average_rate(C2, 2, 330)
+                   check_average_rate(C1, 2, 300),
+                   check_average_rate(C2, 2, 300)
            end,
 
     with_global(GlobalMod,
-                default,
-                ZoneMod,
-                [{b1, Bucket}, {b2, Bucket}],
+                [{b1, Bucket1}, {b2, Bucket2}],
                 Case).
 
 t_burst(_) ->
     GlobalMod = fun(Cfg) ->
-                        Cfg#{burst := ?RATE("60/1s")}
+                        Cfg#{rate := ?RATE("200/1s"),
+                             burst := ?RATE("400/1s")}
                 end,
 
-    ZoneMod = fun(Cfg) ->
-                      Cfg#{rate := ?RATE("60/1s"),
-                           burst := ?RATE("60/1s")}
-              end,
-
-    Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                     Aggr2 = Aggr#{rate := ?RATE("500/1s"),
-                                   initial := 0,
-                                   capacity := 500},
-                     Cli2 = Cli#{rate := ?RATE("600/1s"),
-                                 capacity := 600,
+    Bucket = fun(#{per_client := Cli} = Bucket) ->
+                     Bucket2 = Bucket#{rate := ?RATE("200/1s"),
+                                       initial := 0,
+                                       capacity := 200},
+                     Cli2 = Cli#{rate := ?RATE("50/1s"),
+                                 capacity := 200,
                                  divisible := true},
-                     Bucket#{aggregated := Aggr2,
-                             per_client := Cli2}
+                     Bucket2#{per_client := Cli2}
              end,
 
     Case = fun() ->
@@ -430,178 +366,37 @@ t_burst(_) ->
                    timer:sleep(2100),
 
                    Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
-                   in_range(Total / 2, 30)
+                   in_range(Total / 2, 300)
            end,
 
     with_global(GlobalMod,
-                default,
-                ZoneMod,
                 [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}],
                 Case).
 
-
 t_limit_global_with_unlimit_other(_) ->
     GlobalMod = fun(Cfg) ->
                         Cfg#{rate := ?RATE("600/1s")}
                 end,
 
-    ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end,
-
-    Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                 Aggr2 = Aggr#{rate := infinity,
-                               initial := 0,
-                               capacity := infinity},
-                 Cli2 = Cli#{rate := infinity,
-                             capacity := infinity,
-                             initial := 0},
-                 Bucket#{aggregated := Aggr2,
-                         per_client := Cli2}
-             end,
-
-    Case = fun() ->
-               C1 = counters:new(1, []),
-               start_client(b1, ?NOW + 2000, C1, 20),
-               timer:sleep(2100),
-               check_average_rate(C1, 2, 600)
-           end,
-
-    with_global(GlobalMod,
-                default,
-                ZoneMod,
-                [{b1, Bucket}],
-                Case).
-
-t_multi_zones(_) ->
-    GlobalMod = fun(Cfg) ->
-                    Cfg#{rate := ?RATE("600/1s")}
-                end,
-
-    Zone1 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("400/1s")}
-            end,
-
-    Zone2 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("500/1s")}
-            end,
-
-    Bucket = fun(Zone, Rate) ->
-                 fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                     Aggr2 = Aggr#{rate := infinity,
-                                   initial := 0,
-                                   capacity := infinity},
-                     Cli2 = Cli#{rate := Rate,
-                                 capacity := infinity,
-                                 initial := 0},
-                     Bucket#{aggregated := Aggr2,
-                             per_client := Cli2,
-                             zone := Zone}
-                 end
-             end,
-
-    Case = fun() ->
-               C1 = counters:new(1, []),
-               C2 = counters:new(1, []),
-               start_client(b1, ?NOW + 2000, C1, 25),
-               start_client(b2, ?NOW + 2000, C2, 20),
-               timer:sleep(2100),
-               check_average_rate(C1, 2, 300),
-               check_average_rate(C2, 2, 300)
-           end,
-
-    with_global(GlobalMod,
-                [z1, z2],
-                [Zone1, Zone2],
-                [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
-                Case).
-
-%% because the simulated client will try to reach the maximum rate
-%% when divisiable = true, a large number of divided tokens will be generated
-%% so this is not an accurate test
-t_multi_zones_with_divisible(_) ->
-    GlobalMod = fun(Cfg) ->
-                    Cfg#{rate := ?RATE("600/1s")}
-                end,
-
-    Zone1 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("400/1s")}
-            end,
-
-    Zone2 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("500/1s")}
-            end,
-
-    Bucket = fun(Zone, Rate) ->
-                 fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                     Aggr2 = Aggr#{rate := Rate,
-                                   initial := 0,
-                                   capacity := infinity},
-                     Cli2 = Cli#{rate := Rate,
-                                 divisible := true,
-                                 capacity := infinity,
-                                 initial := 0},
-                     Bucket#{aggregated := Aggr2,
-                             per_client := Cli2,
-                             zone := Zone}
-                 end
-             end,
-
-    Case = fun() ->
-               C1 = counters:new(1, []),
-               C2 = counters:new(1, []),
-               start_client(b1, ?NOW + 2000, C1, 25),
-               start_client(b2, ?NOW + 2000, C2, 20),
-               timer:sleep(2100),
-               check_average_rate(C1, 2, 300),
-               check_average_rate(C2, 2, 300)
-           end,
-
-    with_global(GlobalMod,
-                [z1, z2],
-                [Zone1, Zone2],
-                [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
-                Case).
-
-t_zone_hunger_and_fair(_) ->
-    GlobalMod = fun(Cfg) ->
-                    Cfg#{rate := ?RATE("600/1s")}
-                end,
-
-    Zone1 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("600/1s")}
-            end,
-
-    Zone2 = fun(Cfg) ->
-                Cfg#{rate := ?RATE("50/1s")}
-            end,
-
-    Bucket = fun(Zone, Rate) ->
-                 fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
-                     Aggr2 = Aggr#{rate := infinity,
-                                   initial := 0,
-                                   capacity := infinity},
-                     Cli2 = Cli#{rate := Rate,
+    Bucket = fun(#{per_client := Cli} = Bucket) ->
+                     Bucket2 = Bucket#{rate := infinity,
+                                       initial := 0,
+                                       capacity := infinity},
+                     Cli2 = Cli#{rate := infinity,
                                  capacity := infinity,
                                  initial := 0},
-                     Bucket#{aggregated := Aggr2,
-                             per_client := Cli2,
-                             zone := Zone}
-                 end
+                     Bucket2#{per_client := Cli2}
              end,
 
     Case = fun() ->
-               C1 = counters:new(1, []),
-               C2 = counters:new(1, []),
-               start_client(b1, ?NOW + 2000, C1, 20),
-               start_client(b2, ?NOW + 2000, C2, 20),
-               timer:sleep(2100),
-               check_average_rate(C1, 2, 550),
-               check_average_rate(C2, 2, 50)
+                   C1 = counters:new(1, []),
+                   start_client(b1, ?NOW + 2000, C1, 20),
+                   timer:sleep(2100),
+                   check_average_rate(C1, 2, 600)
            end,
 
     with_global(GlobalMod,
-                [z1, z2],
-                [Zone1, Zone2],
-                [{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}],
+                [{b1, Bucket}],
                 Case).
 
 %%--------------------------------------------------------------------
@@ -626,7 +421,8 @@ t_check_container(_) ->
                        capacity := 1000}
           end,
     Case = fun() ->
-                   C1 = emqx_limiter_container:new([message_routing]),
+                   C1 = emqx_limiter_container:new([message_routing],
+                                                   #{message_routing => default}),
                    {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
                    {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
                    timer:sleep(Pause),
@@ -663,9 +459,7 @@ t_limiter_server(_) ->
     ?assertMatch(#{root := _,
                    counter := _,
                    index := _,
-                   zones := _,
                    buckets := _,
-                   nodes := _,
                    type := message_routing}, State),
 
     Name = emqx_limiter_server:name(message_routing),
@@ -675,6 +469,32 @@ t_limiter_server(_) ->
     ok = emqx_limiter_server:format_status(normal, ok),
     ok.
 
+t_decimal(_) ->
+    ?assertEqual(infinity, emqx_limiter_decimal:add(infinity, 3)),
+    ?assertEqual(5, emqx_limiter_decimal:add(2, 3)),
+    ?assertEqual(infinity, emqx_limiter_decimal:sub(infinity, 3)),
+    ?assertEqual(-1, emqx_limiter_decimal:sub(2, 3)),
+    ?assertEqual(infinity, emqx_limiter_decimal:mul(infinity, 3)),
+    ?assertEqual(6, emqx_limiter_decimal:mul(2, 3)),
+    ?assertEqual(infinity, emqx_limiter_decimal:floor_div(infinity, 3)),
+    ?assertEqual(2, emqx_limiter_decimal:floor_div(7, 3)),
+    ok.
+
+t_schema_unit(_) ->
+    M = emqx_limiter_schema,
+    ?assertEqual(limiter, M:namespace()),
+    ?assertEqual({ok, infinity}, M:to_rate(" infinity ")),
+    ?assertMatch({ok, _}, M:to_rate("100")),
+    ?assertMatch({error, _}, M:to_rate("0")),
+    ?assertMatch({ok, _}, M:to_rate("100/10s")),
+    ?assertMatch({error, _}, M:to_rate("100/10x")),
+    ?assertEqual({ok, infinity}, M:to_capacity("infinity")),
+    ?assertEqual({ok, 100}, M:to_capacity("100")),
+    ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
+    ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
+    ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
+    ok.
+
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
@@ -752,7 +572,6 @@ client_try_check(Need, #client{counter = Counter,
             end
     end.
 
-
 %% XXX not a god test, because client's rate maybe bigger than global rate
 %% so if client' rate = infinity
 %% client's divisible should be true or capacity must be bigger than number of each consume
@@ -769,25 +588,17 @@ to_rate(Str) ->
     {ok, Rate} = emqx_limiter_schema:to_rate(Str),
     Rate.
 
-with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) ->
-    Path = [limiter, message_routing],
-    #{global := Global} = Cfg = emqx_config:get(Path),
-    Cfg2 = Cfg#{global := Modifier(Global)},
-    with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case).
-
-with_zone(Name, Modifier, Buckets, Case) ->
-    Path = [limiter, message_routing],
-    Cfg = emqx_config:get(Path),
-    with_zone(Cfg, Name, Modifier, Buckets, Case).
+with_global(Modifier, BuckeTemps, Case) ->
+    Fun = fun(Cfg) ->
+                  #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg),
+                  Fun = fun({Name, BMod}, Acc) ->
+                                Acc#{Name => BMod(BucketCfg)}
+                        end,
+                  Buckets = lists:foldl(Fun, #{}, BuckeTemps),
+                  Cfg2#{bucket := Buckets}
+          end,
 
-with_zone(Cfg, Name, Modifier, Buckets, Case) ->
-    Path = [limiter, message_routing],
-    #{zone := ZoneCfgs,
-      bucket := BucketCfgs} = Cfg,
-    ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs),
-    BucketCfgs2 = apply_modifier(Buckets, BucketCfgs),
-    Cfg2 = Cfg#{zone := ZoneCfgs2, bucket := BucketCfgs2},
-    with_config(Path, fun(_) -> Cfg2 end, Case).
+    with_config([limiter, message_routing], Fun, Case).
 
 with_bucket(Bucket, Modifier, Case) ->
     Path = [limiter, message_routing, bucket, Bucket],
@@ -802,8 +613,8 @@ with_config(Path, Modifier, Case) ->
     NewCfg = Modifier(Cfg),
     ct:pal("test with config:~p~n", [NewCfg]),
     emqx_config:put(Path, NewCfg),
-    emqx_limiter_manager:restart_server(message_routing),
-    timer:sleep(100),
+    emqx_limiter_server:update_config(message_routing),
+    timer:sleep(500),
     DelayReturn = delay_return(Case),
     emqx_config:put(Path, Cfg),
     DelayReturn().

+ 17 - 3
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -405,16 +405,30 @@ t_handle_timeout_emit_stats(_) ->
     ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
 
 t_ensure_rate_limit(_) ->
+    %% XXX In the future, limiter should provide API for config update
+    Path = [limiter, bytes_in, bucket, default, per_client],
+    PerClient = emqx_config:get(Path),
+    {ok, Rate}= emqx_limiter_schema:to_rate("50MB"),
+    emqx_config:put(Path, PerClient#{rate := Rate}),
+    emqx_limiter_server:update_config(bytes_in),
+    timer:sleep(100),
+
     Limiter = init_limiter(),
     St = st(#{limiter => Limiter}),
-    {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), %% must bigger than value in emqx_ratelimit_SUITE
+
+    %% must bigger than value in emqx_ratelimit_SUITE
+    {ok, Need} = emqx_limiter_schema:to_capacity("1GB"),
     St1 = ?ws_conn:check_limiter([{Need, bytes_in}],
                                  [],
                                  fun(_, _, S) -> S end,
                                  [],
                                  St),
     ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
-    ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)).
+    ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
+
+    emqx_config:put(Path, PerClient),
+    emqx_limiter_server:update_config(bytes_in),
+    timer:sleep(100).
 
 t_parse_incoming(_) ->
     {Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()),
@@ -558,7 +572,7 @@ ws_client(State) ->
         ct:fail(ws_timeout)
     end.
 
-limiter_cfg() -> #{}.
+limiter_cfg() -> #{bytes_in => default, message_in => default}.
 
 init_limiter() ->
     emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).

+ 0 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -493,8 +493,6 @@ typename_to_spec("failure_strategy()", _Mod) ->
     #{type => string, example => <<"force">>};
 typename_to_spec("initial()", _Mod) ->
     #{type => string, example => <<"0M">>};
-typename_to_spec("bucket_path()", _Mod) ->
-    #{type => string, example => <<"groupName.bucketName">>};
 typename_to_spec(Name, Mod) ->
     Spec = range(Name),
     Spec1 = remote_module_type(Spec, Name, Mod),

+ 4 - 3
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -198,14 +198,15 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) ->
     Mod = emqx_retainer:get_backend_module(),
     case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
         false ->
-            {ok, Result} = Mod:read_message(Context, Topic),
+            {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
             deliver(Result, Context, Pid, Topic, undefined, Limiter);
         true  ->
-            {ok, Result, NewCursor} =  Mod:match_messages(Context, Topic, Cursor),
+            {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
             deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
     end.
 
--spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
+-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
+          {ok, limiter()}.
 deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
     {ok, Limiter};
 

+ 8 - 9
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -36,7 +36,7 @@ retainer {
     flow_control {
         batch_read_number = 0
         batch_deliver_number = 0
-        limiter_bucket_name = retainer
+        limiter.batch = retainer
      }
    backend {
         type = built_in_database
@@ -281,12 +281,11 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).
 
 t_flow_control(_) ->
-    #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]),
-    RetainerCfg2 = RetainerCfg#{
-                     per_client := PerClient#{
-                                     rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
-                                     capacity := 1}},
-    emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2),
+    #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
+    RetainerCfg2 = RetainerCfg#{per_client :=
+                                    PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
+                                               capacity := 1}},
+    emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
     emqx_limiter_manager:restart_server(shared),
     timer:sleep(500),
 
@@ -296,7 +295,7 @@ t_flow_control(_) ->
     emqx_retainer:update_config(#{<<"flow_control">> =>
                                       #{<<"batch_read_number">> => 1,
                                         <<"batch_deliver_number">> => 1,
-                                        <<"limiter_bucket_name">> => retainer}}),
+                                        <<"limiter">> => #{<<"batch">> => retainer}}}),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(
@@ -326,7 +325,7 @@ t_flow_control(_) ->
     ok = emqtt:disconnect(C1),
 
     %% recover the limiter
-    emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg),
+    emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
     emqx_limiter_manager:restart_server(shared),
     timer:sleep(500),