Преглед изворни кода

fix(limiter): fix test case error

firest пре 3 година
родитељ
комит
ce46cb9216

+ 4 - 1
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -102,7 +102,10 @@ fields(limiter) ->
                     ?R_REF(client_fields),
                     ?R_REF(client_fields),
                     #{
                     #{
                         desc => ?DESC(client),
                         desc => ?DESC(client),
-                        default => #{}
+                        default => maps:from_list([
+                            {erlang:atom_to_binary(Type), #{}}
+                         || Type <- types()
+                        ])
                     }
                     }
                 )}
                 )}
         ];
         ];

+ 16 - 13
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -124,19 +124,22 @@ connect(Id, Type, Cfg) ->
             #{
             #{
                 rate := BucketRate,
                 rate := BucketRate,
                 capacity := BucketSize
                 capacity := BucketSize
-            } = BucketCfg,
+            },
             #{rate := CliRate, capacity := CliSize} = ClientCfg
             #{rate := CliRate, capacity := CliSize} = ClientCfg
         } ->
         } ->
-            {ok,
-                if
-                    CliRate < BucketRate orelse CliSize < BucketSize ->
-                        emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, BucketCfg);
-                    true ->
-                        emqx_htb_limiter:make_ref_limiter(ClientCfg, BucketCfg)
-                end};
-        undefined ->
-            ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
-            {error, invalid_bucket}
+            case emqx_limiter_manager:find_bucket(Id, Type) of
+                {ok, Bucket} ->
+                    {ok,
+                        if
+                            CliRate < BucketRate orelse CliSize < BucketSize ->
+                                emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket);
+                            true ->
+                                emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)
+                        end};
+                undefined ->
+                    ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
+                    {error, invalid_bucket}
+            end
     end.
     end.
 
 
 -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
 -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
@@ -523,7 +526,7 @@ make_bucket(
     _ = put_to_counter(Counter, NewIndex, Initial),
     _ = put_to_counter(Counter, NewIndex, Initial),
     Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate),
     Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate),
     emqx_limiter_manager:insert_bucket(Id, Type, Ref),
     emqx_limiter_manager:insert_bucket(Id, Type, Ref),
-    State#{buckets := Buckets#{Id => Bucket}}.
+    State#{buckets := Buckets#{Id => Bucket}, index := NewIndex}.
 
 
 do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
 do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
     case maps:get(Id, Buckets, undefined) of
     case maps:get(Id, Buckets, undefined) of
@@ -568,7 +571,7 @@ find_limiter_cfg(Type, #{rate := _} = Cfg) ->
     {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))};
     {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))};
 find_limiter_cfg(Type, Cfg) ->
 find_limiter_cfg(Type, Cfg) ->
     {
     {
-        maps:get(Type, Cfg),
+        maps:get(Type, Cfg, undefined),
         find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined))
         find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined))
     }.
     }.
 
 

+ 2 - 2
apps/emqx/src/emqx_listeners.erl

@@ -538,13 +538,13 @@ zone(Opts) ->
 limiter(Opts) ->
 limiter(Opts) ->
     maps:get(limiter, Opts, #{}).
     maps:get(limiter, Opts, #{}).
 
 
-add_limiter_bucket(Id, #{limiter := Limiters}) ->
+add_limiter_bucket(Id, #{limiter := Limiter}) ->
     maps:fold(
     maps:fold(
         fun(Type, Cfg, _) ->
         fun(Type, Cfg, _) ->
             emqx_limiter_server:add_bucket(Id, Type, Cfg)
             emqx_limiter_server:add_bucket(Id, Type, Cfg)
         end,
         end,
         ok,
         ok,
-        Limiters
+        maps:without([client], Limiter)
     );
     );
 add_limiter_bucket(_Id, _Cfg) ->
 add_limiter_bucket(_Id, _Cfg) ->
     ok.
     ok.

+ 9 - 5
apps/emqx/test/emqx_channel_SUITE.erl

@@ -1205,9 +1205,7 @@ session(InitFields) when is_map(InitFields) ->
 quota() ->
 quota() ->
     emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).
     emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).
 
 
-limiter_cfg() -> #{message_routing => make_limiter_cfg()}.
-
-make_limiter_cfg() ->
+limiter_cfg() ->
     Client = #{
     Client = #{
         rate => 5,
         rate => 5,
         initial => 0,
         initial => 0,
@@ -1217,10 +1215,16 @@ make_limiter_cfg() ->
         max_retry_time => timer:seconds(5),
         max_retry_time => timer:seconds(5),
         failure_strategy => force
         failure_strategy => force
     },
     },
-    #{client => Client, rate => 10, initial => 0, capacity => 10}.
+    #{
+        message_routing => bucket_cfg(),
+        client => #{message_routing => Client}
+    }.
+
+bucket_cfg() ->
+    #{rate => 10, initial => 0, capacity => 10}.
 
 
 add_bucket() ->
 add_bucket() ->
-    emqx_limiter_server:add_bucket(?MODULE, message_routing, make_limiter_cfg()).
+    emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()).
 
 
 del_bucket() ->
 del_bucket() ->
     emqx_limiter_server:del_bucket(?MODULE, message_routing).
     emqx_limiter_server:del_bucket(?MODULE, message_routing).

+ 7 - 6
apps/emqx/test/emqx_connection_SUITE.erl

@@ -708,11 +708,8 @@ init_limiter() ->
     emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()).
     emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()).
 
 
 limiter_cfg() ->
 limiter_cfg() ->
-    Cfg = make_limiter_cfg(),
-    #{bytes_in => Cfg, message_in => Cfg}.
-
-make_limiter_cfg() ->
     Infinity = emqx_limiter_schema:infinity_value(),
     Infinity = emqx_limiter_schema:infinity_value(),
+    Cfg = bucket_cfg(),
     Client = #{
     Client = #{
         rate => Infinity,
         rate => Infinity,
         initial => 0,
         initial => 0,
@@ -722,10 +719,14 @@ make_limiter_cfg() ->
         max_retry_time => timer:seconds(5),
         max_retry_time => timer:seconds(5),
         failure_strategy => force
         failure_strategy => force
     },
     },
-    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+    #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
+
+bucket_cfg() ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    #{rate => Infinity, initial => 0, capacity => Infinity}.
 
 
 add_bucket() ->
 add_bucket() ->
-    Cfg = make_limiter_cfg(),
+    Cfg = bucket_cfg(),
     emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
     emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
     emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
     emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
 
 

+ 16 - 14
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -310,8 +310,8 @@ t_capacity(_) ->
 %% Test Cases Global Level
 %% Test Cases Global Level
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 t_collaborative_alloc(_) ->
 t_collaborative_alloc(_) ->
-    GlobalMod = fun(Cfg) ->
-        Cfg#{rate := ?RATE("600/1s")}
+    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+        Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
     end,
     end,
 
 
     Bucket1 = fun(#{client := Cli} = Bucket) ->
     Bucket1 = fun(#{client := Cli} = Bucket) ->
@@ -350,10 +350,12 @@ t_collaborative_alloc(_) ->
     ).
     ).
 
 
 t_burst(_) ->
 t_burst(_) ->
-    GlobalMod = fun(Cfg) ->
+    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
         Cfg#{
         Cfg#{
-            rate := ?RATE("200/1s"),
-            burst := ?RATE("400/1s")
+            message_routing := MR#{
+                rate := ?RATE("200/1s"),
+                burst := ?RATE("400/1s")
+            }
         }
         }
     end,
     end,
 
 
@@ -391,8 +393,8 @@ t_burst(_) ->
     ).
     ).
 
 
 t_limit_global_with_unlimit_other(_) ->
 t_limit_global_with_unlimit_other(_) ->
-    GlobalMod = fun(Cfg) ->
-        Cfg#{rate := ?RATE("600/1s")}
+    GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+        Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
     end,
     end,
 
 
     Bucket = fun(#{client := Cli} = Bucket) ->
     Bucket = fun(#{client := Cli} = Bucket) ->
@@ -433,11 +435,11 @@ t_check_container(_) ->
             capacity := 1000
             capacity := 1000
         }
         }
     end,
     end,
-    Case = fun(BucketCfg) ->
+    Case = fun(#{client := Client} = BucketCfg) ->
         C1 = emqx_limiter_container:get_limiter_by_types(
         C1 = emqx_limiter_container:get_limiter_by_types(
             ?MODULE,
             ?MODULE,
             [message_routing],
             [message_routing],
-            #{message_routing => BucketCfg}
+            #{message_routing => BucketCfg, client => #{message_routing => Client}}
         ),
         ),
         {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
         {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
         {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
         {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
@@ -455,8 +457,8 @@ t_check_container(_) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 t_bucket_no_client(_) ->
 t_bucket_no_client(_) ->
     Rate = ?RATE("1/s"),
     Rate = ?RATE("1/s"),
-    GlobalMod = fun(#{client := Client} = Cfg) ->
-        Cfg#{client := Client#{rate := Rate}}
+    GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
+        Cfg#{client := Client#{message_routing := MR#{rate := Rate}}}
     end,
     end,
     BucketMod = fun(Bucket) ->
     BucketMod = fun(Bucket) ->
         maps:remove(client, Bucket)
         maps:remove(client, Bucket)
@@ -470,8 +472,8 @@ t_bucket_no_client(_) ->
 t_bucket_client(_) ->
 t_bucket_client(_) ->
     GlobalRate = ?RATE("1/s"),
     GlobalRate = ?RATE("1/s"),
     BucketRate = ?RATE("10/s"),
     BucketRate = ?RATE("10/s"),
-    GlobalMod = fun(#{client := Client} = Cfg) ->
-        Cfg#{client := Client#{rate := GlobalRate}}
+    GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
+        Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}}
     end,
     end,
     BucketMod = fun(#{client := Client} = Bucket) ->
     BucketMod = fun(#{client := Client} = Bucket) ->
         Bucket#{client := Client#{rate := BucketRate}}
         Bucket#{client := Client#{rate := BucketRate}}
@@ -682,7 +684,7 @@ to_rate(Str) ->
     Rate.
     Rate.
 
 
 with_global(Modifier, Buckets, Case) ->
 with_global(Modifier, Buckets, Case) ->
-    with_config([limiter, message_routing], Modifier, Buckets, Case).
+    with_config([limiter], Modifier, Buckets, Case).
 
 
 with_bucket(Modifier, Case) ->
 with_bucket(Modifier, Case) ->
     Cfg = Modifier(make_limiter_cfg()),
     Cfg = Modifier(make_limiter_cfg()),

+ 17 - 12
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -509,8 +509,9 @@ t_handle_timeout_emit_stats(_) ->
 t_ensure_rate_limit(_) ->
 t_ensure_rate_limit(_) ->
     {ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
     {ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
     Limiter = init_limiter(#{
     Limiter = init_limiter(#{
-        bytes_in => make_limiter_cfg(Rate),
-        message_in => make_limiter_cfg()
+        bytes_in => bucket_cfg(),
+        message_in => bucket_cfg(),
+        client => #{bytes_in => client_cfg(Rate)}
     }),
     }),
     St = st(#{limiter => Limiter}),
     St = st(#{limiter => Limiter}),
 
 
@@ -698,28 +699,32 @@ init_limiter(LimiterCfg) ->
     emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg).
     emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg).
 
 
 limiter_cfg() ->
 limiter_cfg() ->
-    Cfg = make_limiter_cfg(),
-    #{bytes_in => Cfg, message_in => Cfg}.
+    Cfg = bucket_cfg(),
+    Client = client_cfg(),
+    #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
 
 
-make_limiter_cfg() ->
+client_cfg() ->
     Infinity = emqx_limiter_schema:infinity_value(),
     Infinity = emqx_limiter_schema:infinity_value(),
-    make_limiter_cfg(Infinity).
+    client_cfg(Infinity).
 
 
-make_limiter_cfg(ClientRate) ->
+client_cfg(Rate) ->
     Infinity = emqx_limiter_schema:infinity_value(),
     Infinity = emqx_limiter_schema:infinity_value(),
-    Client = #{
-        rate => ClientRate,
+    #{
+        rate => Rate,
         initial => 0,
         initial => 0,
         capacity => Infinity,
         capacity => Infinity,
         low_watermark => 1,
         low_watermark => 1,
         divisible => false,
         divisible => false,
         max_retry_time => timer:seconds(5),
         max_retry_time => timer:seconds(5),
         failure_strategy => force
         failure_strategy => force
-    },
-    #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
+    }.
+
+bucket_cfg() ->
+    Infinity = emqx_limiter_schema:infinity_value(),
+    #{rate => Infinity, initial => 0, capacity => Infinity}.
 
 
 add_bucket() ->
 add_bucket() ->
-    Cfg = make_limiter_cfg(),
+    Cfg = bucket_cfg(),
     emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
     emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
     emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
     emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).