Procházet zdrojové kódy

fix(limiter): set maximum value for `infinity` rate and capacity

There are now two types of limiters, `infinity` and `limited`.
When `infinity` is updated to `limited` by config, the changes only take effect for new users.
When `limited` is updated to `infinity`, old users will never get tokens, because the `countes`
they hold are no longer updated.
Setting the maximum value for `infinity` rate and capacity can unify these two limiters and slove this problem
firest před 3 roky
rodič
revize
e5d223000e

+ 1 - 7
apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl

@@ -50,13 +50,7 @@
 %%--------------------------------------------------------------------
 %%  API
 %%--------------------------------------------------------------------
--spec new(
-    undefined | counters:countres_ref(),
-    undefined | index(),
-    rate()
-) -> bucket_ref().
-new(undefined, _, _) ->
-    infinity;
+-spec new(counters:countres_ref(), index(), rate()) -> bucket_ref().
 new(Counter, Index, Rate) ->
     #{
         counter => Counter,

+ 16 - 3
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -30,7 +30,8 @@
     namespace/0,
     get_bucket_cfg_path/2,
     desc/1,
-    types/0
+    types/0,
+    infinity_value/0,
 ]).
 
 -define(KILOBYTE, 1024).
@@ -46,7 +47,7 @@
 -type rate() :: infinity | float().
 -type burst_rate() :: 0 | float().
 %% the capacity of the token bucket
--type capacity() :: infinity | number().
+-type capacity() :: non_neg_integer().
 %% initial capacity of the token bucket
 -type initial() :: non_neg_integer().
 -type bucket_path() :: list(atom()).
@@ -207,6 +208,18 @@ types() ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+%% `infinity` to `infinity_value` rules:
+%% 1. all infinity capacity will change to infinity_value
+%% 2. if the rate of global and bucket  both are `infinity`,
+%%    use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2`
+infinity_value() ->
+    %% 1 TB
+    1099511627776.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
 to_burst_rate(Str) ->
     to_rate(Str, false, true).
 
@@ -294,7 +307,7 @@ to_quota(Str, Regex) ->
             {match, [Quota, ""]} ->
                 {ok, erlang:list_to_integer(Quota)};
             {match, ""} ->
-                {ok, infinity};
+                {ok, infinity_value()};
             _ ->
                 {error, Str}
         end

+ 17 - 31
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -118,18 +118,16 @@ connect(Type, BucketName) when is_atom(BucketName) ->
             ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
             {error, config_not_found};
         #{
-            rate := AggrRate,
-            capacity := AggrSize,
+            rate := BucketRate,
+            capacity := BucketSize,
             per_client := #{rate := CliRate, capacity := CliSize} = Cfg
         } ->
             case emqx_limiter_manager:find_bucket(Type, BucketName) of
                 {ok, Bucket} ->
                     {ok,
                         if
-                            CliRate < AggrRate orelse CliSize < AggrSize ->
+                            CliRate < BucketRate orelse CliSize < BucketSize ->
                                 emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
-                            Bucket =:= infinity ->
-                                emqx_htb_limiter:make_infinity_limiter();
                             true ->
                                 emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
                         end};
@@ -372,9 +370,6 @@ longitudinal(
 
     case lists:min([ShouldAlloc, Flow, Capacity]) of
         Available when Available > 0 ->
-            %% XXX if capacity is infinity, and flow always > 0, the value in
-            %% counter will be overflow at some point in the future, do we need
-            %% to deal with this situation???
             {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
             counters:add(Counter, Index, Inc),
 
@@ -491,26 +486,14 @@ make_root(#{rate := Rate, burst := Burst}) ->
 
 make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
     Path = emqx_limiter_manager:make_path(Type, Name),
-    case get_counter_rate(Conf, GlobalCfg) of
-        infinity ->
-            Rate = infinity,
-            Capacity = infinity,
-            Initial = 0,
-            Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
-            emqx_limiter_manager:insert_bucket(Path, Ref),
-            CounterNum2 = CounterNum,
-            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-                State#{buckets := Buckets#{BucketName => Bucket}}
-            end;
-        Rate ->
-            #{capacity := Capacity} = Conf,
-            Initial = get_initial_val(Conf),
-            CounterNum2 = CounterNum + 1,
-            InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-                {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
-                Bucket2 = Bucket#{counter := Counter, index := Idx},
-                State2#{buckets := Buckets#{BucketName => Bucket2}}
-            end
+    Rate = get_counter_rate(Conf, GlobalCfg),
+    #{capacity := Capacity} = Conf,
+    Initial = get_initial_val(Conf),
+    CounterNum2 = CounterNum + 1,
+    InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
+        {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
+        Bucket2 = Bucket#{counter := Counter, index := Idx},
+        State2#{buckets := Buckets#{BucketName => Bucket2}}
     end,
 
     Bucket = #{
@@ -569,8 +552,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
 %% @doc find first limited node
 get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity ->
     Rate;
-get_counter_rate(_Cfg, #{rate := Rate}) ->
-    Rate.
+get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity ->
+    Rate;
+get_counter_rate(_Cfg, _GlobalCfg) ->
+    emqx_limiter_schema:infinity_value().
 
 -spec get_initial_val(hocons:config()) -> decimal().
 get_initial_val(#{
@@ -579,12 +564,13 @@ get_initial_val(#{
     capacity := Capacity
 }) ->
     %% initial will nevner be infinity(see the emqx_limiter_schema)
+    InfVal = emqx_limiter_schema:infinity_value(),
     if
         Initial > 0 ->
             Initial;
         Rate =/= infinity ->
             erlang:min(Rate, Capacity);
-        Capacity =/= infinity ->
+        Capacity =/= InfVal ->
             Capacity;
         true ->
             0