Kaynağa Gözat

Merge pull request #10448 from lafirest/fix/limiter_compatibility

fix(limiter): fix compatibility problem of configuration
lafirest 2 yıl önce
ebeveyn
işleme
8ccfbe9e16

+ 38 - 38
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -24,6 +24,7 @@
     fields/1,
     to_rate/1,
     to_capacity/1,
+    to_burst/1,
     default_period/0,
     to_burst_rate/1,
     to_initial/1,
@@ -54,8 +55,10 @@
 -type bucket_name() :: atom().
 -type rate() :: infinity | float().
 -type burst_rate() :: 0 | float().
+%% this is a compatible type for the deprecated field and type `capacity`.
+-type burst() :: burst_rate().
 %% the capacity of the token bucket
--type capacity() :: non_neg_integer().
+%%-type capacity() :: non_neg_integer().
 %% initial capacity of the token bucket
 -type initial() :: non_neg_integer().
 -type bucket_path() :: list(atom()).
@@ -72,13 +75,13 @@
 
 -typerefl_from_string({rate/0, ?MODULE, to_rate}).
 -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
--typerefl_from_string({capacity/0, ?MODULE, to_capacity}).
+-typerefl_from_string({burst/0, ?MODULE, to_burst}).
 -typerefl_from_string({initial/0, ?MODULE, to_initial}).
 
 -reflect_type([
     rate/0,
     burst_rate/0,
-    capacity/0,
+    burst/0,
     initial/0,
     failure_strategy/0,
     bucket_name/0
@@ -130,39 +133,9 @@ fields(node_opts) ->
 fields(client_fields) ->
     client_fields(types(), #{default => #{}});
 fields(bucket_infinity) ->
-    [
-        {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})},
-        {burst,
-            ?HOCON(capacity(), #{
-                desc => ?DESC(capacity),
-                default => <<"0">>,
-                importance => ?IMPORTANCE_HIDDEN,
-                aliases => [capacity]
-            })},
-        {initial,
-            ?HOCON(initial(), #{
-                default => <<"0">>,
-                desc => ?DESC(initial),
-                importance => ?IMPORTANCE_HIDDEN
-            })}
-    ];
+    fields_of_bucket(<<"infinity">>);
 fields(bucket_limit) ->
-    [
-        {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})},
-        {burst,
-            ?HOCON(capacity(), #{
-                desc => ?DESC(burst),
-                default => <<"0">>,
-                importance => ?IMPORTANCE_HIDDEN,
-                aliases => [capacity]
-            })},
-        {initial,
-            ?HOCON(initial(), #{
-                default => <<"0">>,
-                desc => ?DESC(initial),
-                importance => ?IMPORTANCE_HIDDEN
-            })}
-    ];
+    fields_of_bucket(<<"1000/s">>);
 fields(client_opts) ->
     [
         {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})},
@@ -186,7 +159,7 @@ fields(client_opts) ->
                 }
             )},
         {burst,
-            ?HOCON(capacity(), #{
+            ?HOCON(burst(), #{
                 desc => ?DESC(burst),
                 default => <<"0">>,
                 importance => ?IMPORTANCE_HIDDEN,
@@ -265,8 +238,6 @@ types() ->
 
 calc_capacity(#{rate := infinity}) ->
     infinity;
-calc_capacity(#{burst := infinity}) ->
-    infinity;
 calc_capacity(#{rate := Rate, burst := Burst}) ->
     erlang:floor(1000 * Rate / default_period()) + Burst.
 
@@ -277,6 +248,17 @@ calc_capacity(#{rate := Rate, burst := Burst}) ->
 to_burst_rate(Str) ->
     to_rate(Str, false, true).
 
+%% The default value of `capacity` is `infinity`,
+%% but we have changed `capacity` to `burst` which should not be `infinity`
+%% and its default value is 0, so we should convert `infinity` to 0
+to_burst(Str) ->
+    case to_rate(Str, true, true) of
+        {ok, infinity} ->
+            {ok, 0};
+        Any ->
+            Any
+    end.
+
 %% rate can be: 10 10MB 10MB/s 10MB/2s infinity
 %% e.g. the bytes_in regex tree is:
 %%
@@ -415,6 +397,24 @@ composite_bucket_fields(Types, ClientRef) ->
                 )}
         ].
 
+fields_of_bucket(Default) ->
+    [
+        {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => Default})},
+        {burst,
+            ?HOCON(burst(), #{
+                desc => ?DESC(burst),
+                default => <<"0">>,
+                importance => ?IMPORTANCE_HIDDEN,
+                aliases => [capacity]
+            })},
+        {initial,
+            ?HOCON(initial(), #{
+                default => <<"0">>,
+                desc => ?DESC(initial),
+                importance => ?IMPORTANCE_HIDDEN
+            })}
+    ].
+
 client_fields(Types, Meta) ->
     [
         {Type,

+ 72 - 8
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -220,7 +220,7 @@ t_try_restore_agg(_) ->
         },
         Cli2 = Cli#{
             rate := infinity,
-            burst := infinity,
+            burst := 0,
             divisible := true,
             max_retry_time := 100,
             failure_strategy := force
@@ -264,11 +264,11 @@ t_rate(_) ->
         Bucket2 = Bucket#{
             rate := ?RATE("100/100ms"),
             initial := 0,
-            burst := infinity
+            burst := 0
         },
         Cli2 = Cli#{
             rate := infinity,
-            burst := infinity,
+            burst := 0,
             initial := 0
         },
         Bucket2#{client := Cli2}
@@ -295,7 +295,7 @@ t_capacity(_) ->
         },
         Cli2 = Cli#{
             rate := infinity,
-            burst := infinity,
+            burst := 0,
             initial := 0
         },
         Bucket2#{client := Cli2}
@@ -403,11 +403,11 @@ t_limit_global_with_unlimit_other(_) ->
         Bucket2 = Bucket#{
             rate := infinity,
             initial := 0,
-            burst := infinity
+            burst := 0
         },
         Cli2 = Cli#{
             rate := infinity,
-            burst := infinity,
+            burst := 0,
             initial := 0
         },
         Bucket2#{client := Cli2}
@@ -574,6 +574,66 @@ t_schema_unit(_) ->
     ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
     ok.
 
+compatibility_for_capacity(_) ->
+    CfgStr = <<
+        ""
+        "\n"
+        "listeners.tcp.default {\n"
+        "  bind = \"0.0.0.0:1883\"\n"
+        "  max_connections = 1024000\n"
+        "  limiter.messages.capacity = infinity\n"
+        "  limiter.client.messages.capacity = infinity\n"
+        "}\n"
+        ""
+    >>,
+    ?assertMatch(
+        #{
+            messages := #{burst := 0},
+            client := #{messages := #{burst := 0}}
+        },
+        parse_and_check(CfgStr)
+    ).
+
+compatibility_for_message_in(_) ->
+    CfgStr = <<
+        ""
+        "\n"
+        "listeners.tcp.default {\n"
+        "  bind = \"0.0.0.0:1883\"\n"
+        "  max_connections = 1024000\n"
+        "  limiter.message_in.rate = infinity\n"
+        "  limiter.client.message_in.rate = infinity\n"
+        "}\n"
+        ""
+    >>,
+    ?assertMatch(
+        #{
+            messages := #{rate := infinity},
+            client := #{messages := #{rate := infinity}}
+        },
+        parse_and_check(CfgStr)
+    ).
+
+compatibility_for_bytes_in(_) ->
+    CfgStr = <<
+        ""
+        "\n"
+        "listeners.tcp.default {\n"
+        "  bind = \"0.0.0.0:1883\"\n"
+        "  max_connections = 1024000\n"
+        "  limiter.bytes_in.rate = infinity\n"
+        "  limiter.client.bytes_in.rate = infinity\n"
+        "}\n"
+        ""
+    >>,
+    ?assertMatch(
+        #{
+            bytes := #{rate := infinity},
+            client := #{bytes := #{rate := infinity}}
+        },
+        parse_and_check(CfgStr)
+    ).
+
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
@@ -753,13 +813,13 @@ make_limiter_cfg() ->
     Client = #{
         rate => infinity,
         initial => 0,
-        burst => infinity,
+        burst => 0,
         low_watermark => 0,
         divisible => false,
         max_retry_time => timer:seconds(5),
         failure_strategy => force
     },
-    #{client => Client, rate => infinity, initial => 0, burst => infinity}.
+    #{client => Client, rate => infinity, initial => 0, burst => 0}.
 
 add_bucket(Cfg) ->
     add_bucket(?MODULE, Cfg).
@@ -813,3 +873,7 @@ apply_modifier(Pairs, #{default := Template}) ->
         Acc#{N => M(Template)}
     end,
     lists:foldl(Fun, #{}, Pairs).
+
+parse_and_check(ConfigString) ->
+    ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString),
+    emqx:get_config([listeners, tcp, default, limiter]).

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.0.18"},
+    {vsn, "5.0.19"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl]},

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -768,7 +768,7 @@ typename_to_spec("log_level()", _Mod) ->
     };
 typename_to_spec("rate()", _Mod) ->
     #{type => string, example => <<"10MB">>};
-typename_to_spec("capacity()", _Mod) ->
+typename_to_spec("burst()", _Mod) ->
     #{type => string, example => <<"100MB">>};
 typename_to_spec("burst_rate()", _Mod) ->
     %% 0/0s = no burst

+ 3 - 0
changes/ce/fix-10448.en.md

@@ -0,0 +1,3 @@
+Fix a compatibility issue of limiter configuration introduced by v5.0.23 which broke the upgrade from previous versions if the `capacity` is `infinity`. 
+
+In v5.0.23 we have replaced `capacity` with `burst`. After this fix, a `capacity = infinity` config will be automatically converted to equivalent `burst = 0`.