Просмотр исходного кода

Merge pull request #8167 from lafirest/fix/limiter_period

fix(limiter): fix precision issue
lafirest 3 лет назад
Родитель
Сommit
1963441472

+ 2 - 2
apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl

@@ -374,7 +374,7 @@ return_pause(infinity, PauseType, Fun, Diff, Limiter) ->
     %% workaround when emqx_limiter_server's rate is infinity
     {PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter};
 return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
-    Val = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate),
+    Val = erlang:round(Diff * emqx_limiter_schema:default_period() / Rate),
     Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
     {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
 
@@ -408,5 +408,5 @@ may_return_or_pause(_, Limiter) ->
 
 %% @doc apply the elapsed time to the limiter
 apply_elapsed_time(Rate, Elapsed, Tokens, Capacity) ->
-    Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:minimum_period()),
+    Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:default_period()),
     erlang:min(add(Tokens, Inc), Capacity).

+ 5 - 5
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -24,7 +24,7 @@
     fields/1,
     to_rate/1,
     to_capacity/1,
-    minimum_period/0,
+    default_period/0,
     to_burst_rate/1,
     to_initial/1,
     namespace/0,
@@ -191,8 +191,8 @@ desc(client_bucket) ->
 desc(_) ->
     undefined.
 
-%% minimum period is 100ms
-minimum_period() ->
+%% default period is 100ms
+default_period() ->
     100.
 
 to_rate(Str) ->
@@ -235,7 +235,7 @@ to_rate(Str, CanInfinity, CanZero) ->
         %% if time unit is 1s, it can be omitted
         {match, [QuotaStr]} ->
             Fun = fun(Quota) ->
-                {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
+                {ok, Quota * default_period() / ?UNIT_TIME_IN_MS}
             end,
             to_capacity(QuotaStr, Str, CanZero, Fun);
         {match, [QuotaStr, TimeVal, TimeUnit]} ->
@@ -250,7 +250,7 @@ to_rate(Str, CanInfinity, CanZero) ->
                 try
                     case emqx_schema:to_duration_ms(Interval) of
                         {ok, Ms} when Ms > 0 ->
-                            {ok, Quota * minimum_period() / Ms};
+                            {ok, Quota * default_period() / Ms};
                         {ok, 0} when CanZero ->
                             {ok, 0};
                         _ ->

+ 24 - 35
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -57,13 +57,13 @@
     burst := rate(),
     %% token generation interval(second)
     period := pos_integer(),
-    consumed := non_neg_integer()
+    produced := float()
 }.
 
 -type bucket() :: #{
     name := bucket_name(),
     rate := rate(),
-    obtained := non_neg_integer(),
+    obtained := float(),
     %% token correction value
     correction := emqx_limiter_decimal:zero_or_float(),
     capacity := capacity(),
@@ -314,26 +314,26 @@ oscillation(
         root := #{
             rate := Flow,
             period := Interval,
-            consumed := Consumed
+            produced := Produced
         } = Root,
         buckets := Buckets
     } = State
 ) ->
     oscillate(Interval),
     Ordereds = get_ordered_buckets(Buckets),
-    {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
+    {Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets),
     maybe_burst(State#{
         buckets := Buckets2,
-        root := Root#{consumed := Consumed + Alloced}
+        root := Root#{produced := Produced + Alloced}
     }).
 
 %% @doc horizontal spread
 -spec transverse(
     list(bucket()),
     flow(),
-    non_neg_integer(),
+    float(),
     buckets()
-) -> {non_neg_integer(), buckets()}.
+) -> {float(), buckets()}.
 transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
     {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
     InFlow2 = sub(InFlow, BucketAlloced),
@@ -344,7 +344,7 @@ transverse(_, _, Alloced, Buckets) ->
 
 %% @doc vertical spread
 -spec longitudinal(bucket(), flow(), buckets()) ->
-    {non_neg_integer(), buckets()}.
+    {float(), buckets()}.
 longitudinal(
     #{
         name := Name,
@@ -381,7 +381,7 @@ longitudinal(
             {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
             counters:add(Counter, Index, Inc),
 
-            {Inc, Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
+            {Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}};
         _ ->
             {0, Buckets}
     end;
@@ -431,11 +431,11 @@ dispatch_burst([], _, State) ->
 dispatch_burst(
     Empties,
     InFlow,
-    #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State
+    #{root := #{produced := Produced} = Root, buckets := Buckets} = State
 ) ->
     EachFlow = InFlow / erlang:length(Empties),
     {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
-    State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
+    State#{root := Root#{produced := Produced + Alloced}, buckets := Buckets2}.
 
 -spec dispatch_burst_to_buckets(
     list(bucket()),
@@ -473,8 +473,8 @@ init_tree(Type, #{bucket := Buckets} = Cfg) ->
         buckets => #{}
     },
 
-    {Factor, Root} = make_root(Cfg),
-    {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
+    Root = make_root(Cfg),
+    {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []),
 
     State2 = State#{
         root := Root,
@@ -483,25 +483,16 @@ init_tree(Type, #{bucket := Buckets} = Cfg) ->
 
     lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
 
--spec make_root(hocons:confg()) -> {number(), root()}.
-make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
-    {1, #{
+-spec make_root(hocons:confg()) -> root().
+make_root(#{rate := Rate, burst := Burst}) ->
+    #{
         rate => Rate,
         burst => Burst,
-        period => emqx_limiter_schema:minimum_period(),
-        consumed => 0
-    }};
-make_root(#{rate := Rate, burst := Burst}) ->
-    MiniPeriod = emqx_limiter_schema:minimum_period(),
-    Factor = 1 / Rate,
-    {Factor, #{
-        rate => 1,
-        burst => Burst * Factor,
-        period => erlang:floor(Factor * MiniPeriod),
-        consumed => 0
-    }}.
-
-make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
+        period => emqx_limiter_schema:default_period(),
+        produced => 0.0
+    }.
+
+make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
     Path = emqx_limiter_manager:make_path(Type, Name),
     case get_counter_rate(Conf, GlobalCfg) of
         infinity ->
@@ -514,13 +505,12 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket
             InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
                 State#{buckets := Buckets#{BucketName => Bucket}}
             end;
-        RawRate ->
+        Rate ->
             #{capacity := Capacity} = Conf,
             Initial = get_initial_val(Conf),
-            Rate = mul(RawRate, Factor),
             CounterNum2 = CounterNum + 1,
             InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
-                {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
+                {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
                 Bucket2 = Bucket#{counter := Counter, index := Idx},
                 State2#{buckets := Buckets#{BucketName => Bucket2}}
             end
@@ -542,11 +532,10 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket
         T,
         Type,
         GlobalCfg,
-        Factor,
         CounterNum2,
         [DelayInit | DelayBuckets]
     );
-make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
+make_bucket([], _Type, _Global, CounterNum, DelayBuckets) ->
     {CounterNum, DelayBuckets}.
 
 -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->

+ 1 - 1
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -646,7 +646,7 @@ client_loop(
     } = State
 ) ->
     Now = ?NOW,
-    Period = emqx_limiter_schema:minimum_period(),
+    Period = emqx_limiter_schema:default_period(),
     MinPeriod = erlang:ceil(0.25 * Period),
     if
         Now >= EndTime ->