|
|
@@ -38,6 +38,7 @@
|
|
|
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
|
|
|
-define(RATE(Rate), to_rate(Rate)).
|
|
|
-define(NOW, erlang:system_time(millisecond)).
|
|
|
+-define(ROOT_COUNTER_IDX, 1).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Setups
|
|
|
@@ -72,7 +73,7 @@ t_consume(_) ->
|
|
|
Cfg = fun(Cfg) ->
|
|
|
Cfg#{
|
|
|
rate := 100,
|
|
|
- capacity := 100,
|
|
|
+ burst := 0,
|
|
|
initial := 100,
|
|
|
max_retry_time := 1000,
|
|
|
failure_strategy := force
|
|
|
@@ -89,7 +90,7 @@ t_retry(_) ->
|
|
|
Cfg = fun(Cfg) ->
|
|
|
Cfg#{
|
|
|
rate := 50,
|
|
|
- capacity := 200,
|
|
|
+ burst := 150,
|
|
|
initial := 0,
|
|
|
max_retry_time := 1000,
|
|
|
failure_strategy := force
|
|
|
@@ -109,7 +110,7 @@ t_restore(_) ->
|
|
|
Cfg = fun(Cfg) ->
|
|
|
Cfg#{
|
|
|
rate := 1,
|
|
|
- capacity := 200,
|
|
|
+ burst := 199,
|
|
|
initial := 50,
|
|
|
max_retry_time := 100,
|
|
|
failure_strategy := force
|
|
|
@@ -129,7 +130,7 @@ t_max_retry_time(_) ->
|
|
|
Cfg = fun(Cfg) ->
|
|
|
Cfg#{
|
|
|
rate := 1,
|
|
|
- capacity := 1,
|
|
|
+ burst := 0,
|
|
|
max_retry_time := 500,
|
|
|
failure_strategy := drop
|
|
|
}
|
|
|
@@ -139,8 +140,12 @@ t_max_retry_time(_) ->
|
|
|
Begin = ?NOW,
|
|
|
Result = emqx_htb_limiter:consume(101, Client),
|
|
|
?assertMatch({drop, _}, Result),
|
|
|
- Time = ?NOW - Begin,
|
|
|
- ?assert(Time >= 500 andalso Time < 550)
|
|
|
+ End = ?NOW,
|
|
|
+ Time = End - Begin,
|
|
|
+ ?assert(
|
|
|
+ Time >= 500 andalso Time < 550,
|
|
|
+ lists:flatten(io_lib:format("Begin:~p, End:~p, Time:~p~n", [Begin, End, Time]))
|
|
|
+ )
|
|
|
end,
|
|
|
with_per_client(Cfg, Case).
|
|
|
|
|
|
@@ -150,7 +155,7 @@ t_divisible(_) ->
|
|
|
divisible := true,
|
|
|
rate := ?RATE("1000/1s"),
|
|
|
initial := 600,
|
|
|
- capacity := 600
|
|
|
+ burst := 0
|
|
|
}
|
|
|
end,
|
|
|
Case = fun(BucketCfg) ->
|
|
|
@@ -176,7 +181,7 @@ t_low_watermark(_) ->
|
|
|
low_watermark := 400,
|
|
|
rate := ?RATE("1000/1s"),
|
|
|
initial := 1000,
|
|
|
- capacity := 1000
|
|
|
+ burst := 0
|
|
|
}
|
|
|
end,
|
|
|
Case = fun(BucketCfg) ->
|
|
|
@@ -201,23 +206,22 @@ t_infinity_client(_) ->
|
|
|
Fun = fun(Cfg) -> Cfg end,
|
|
|
Case = fun(Cfg) ->
|
|
|
Client = connect(Cfg),
|
|
|
- InfVal = emqx_limiter_schema:infinity_value(),
|
|
|
- ?assertMatch(#{bucket := #{rate := InfVal}}, Client),
|
|
|
+ ?assertMatch(infinity, Client),
|
|
|
Result = emqx_htb_limiter:check(100000, Client),
|
|
|
?assertEqual({ok, Client}, Result)
|
|
|
end,
|
|
|
with_per_client(Fun, Case).
|
|
|
|
|
|
-t_try_restore_agg(_) ->
|
|
|
+t_try_restore_with_bucket(_) ->
|
|
|
Fun = fun(#{client := Cli} = Bucket) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
- rate := 1,
|
|
|
- capacity := 200,
|
|
|
+ rate := 100,
|
|
|
+ burst := 100,
|
|
|
initial := 50
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := infinity,
|
|
|
- capacity := infinity,
|
|
|
+ burst := 0,
|
|
|
divisible := true,
|
|
|
max_retry_time := 100,
|
|
|
failure_strategy := force
|
|
|
@@ -239,11 +243,11 @@ t_short_board(_) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
rate := ?RATE("100/1s"),
|
|
|
initial := 0,
|
|
|
- capacity := 100
|
|
|
+ burst := 0
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := ?RATE("600/1s"),
|
|
|
- capacity := 600,
|
|
|
+ burst := 0,
|
|
|
initial := 600
|
|
|
},
|
|
|
Bucket2#{client := Cli2}
|
|
|
@@ -261,46 +265,45 @@ t_rate(_) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
rate := ?RATE("100/100ms"),
|
|
|
initial := 0,
|
|
|
- capacity := infinity
|
|
|
+ burst := 0
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := infinity,
|
|
|
- capacity := infinity,
|
|
|
+ burst := 0,
|
|
|
initial := 0
|
|
|
},
|
|
|
Bucket2#{client := Cli2}
|
|
|
end,
|
|
|
Case = fun(Cfg) ->
|
|
|
+ Time = 1000,
|
|
|
Client = connect(Cfg),
|
|
|
- Ts1 = erlang:system_time(millisecond),
|
|
|
C1 = emqx_htb_limiter:available(Client),
|
|
|
- timer:sleep(1000),
|
|
|
- Ts2 = erlang:system_time(millisecond),
|
|
|
+ timer:sleep(1100),
|
|
|
C2 = emqx_htb_limiter:available(Client),
|
|
|
- ShouldInc = floor((Ts2 - Ts1) / 100) * 100,
|
|
|
+ ShouldInc = floor(Time / 100) * 100,
|
|
|
Inc = C2 - C1,
|
|
|
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
|
|
|
end,
|
|
|
with_bucket(Fun, Case).
|
|
|
|
|
|
t_capacity(_) ->
|
|
|
- Capacity = 600,
|
|
|
+ Capacity = 1200,
|
|
|
Fun = fun(#{client := Cli} = Bucket) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
rate := ?RATE("100/100ms"),
|
|
|
initial := 0,
|
|
|
- capacity := 600
|
|
|
+ burst := 200
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := infinity,
|
|
|
- capacity := infinity,
|
|
|
+ burst := 0,
|
|
|
initial := 0
|
|
|
},
|
|
|
Bucket2#{client := Cli2}
|
|
|
end,
|
|
|
Case = fun(Cfg) ->
|
|
|
Client = connect(Cfg),
|
|
|
- timer:sleep(1000),
|
|
|
+ timer:sleep(1500),
|
|
|
C1 = emqx_htb_limiter:available(Client),
|
|
|
?assertEqual(Capacity, C1, "test bucket capacity")
|
|
|
end,
|
|
|
@@ -318,11 +321,11 @@ t_collaborative_alloc(_) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
rate := ?RATE("400/1s"),
|
|
|
initial := 0,
|
|
|
- capacity := 600
|
|
|
+ burst := 200
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := ?RATE("50"),
|
|
|
- capacity := 100,
|
|
|
+ burst := 50,
|
|
|
initial := 100
|
|
|
},
|
|
|
Bucket2#{client := Cli2}
|
|
|
@@ -363,11 +366,11 @@ t_burst(_) ->
|
|
|
Bucket2 = Bucket#{
|
|
|
rate := ?RATE("200/1s"),
|
|
|
initial := 0,
|
|
|
- capacity := 200
|
|
|
+ burst := 0
|
|
|
},
|
|
|
Cli2 = Cli#{
|
|
|
rate := ?RATE("50/1s"),
|
|
|
- capacity := 200,
|
|
|
+ burst := 150,
|
|
|
divisible := true
|
|
|
},
|
|
|
Bucket2#{client := Cli2}
|
|
|
@@ -392,38 +395,6 @@ t_burst(_) ->
|
|
|
Case
|
|
|
).
|
|
|
|
|
|
-t_limit_global_with_unlimit_other(_) ->
|
|
|
- GlobalMod = fun(#{message_routing := MR} = Cfg) ->
|
|
|
- Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
|
|
|
- end,
|
|
|
-
|
|
|
- Bucket = fun(#{client := Cli} = Bucket) ->
|
|
|
- Bucket2 = Bucket#{
|
|
|
- rate := infinity,
|
|
|
- initial := 0,
|
|
|
- capacity := infinity
|
|
|
- },
|
|
|
- Cli2 = Cli#{
|
|
|
- rate := infinity,
|
|
|
- capacity := infinity,
|
|
|
- initial := 0
|
|
|
- },
|
|
|
- Bucket2#{client := Cli2}
|
|
|
- end,
|
|
|
-
|
|
|
- Case = fun() ->
|
|
|
- C1 = counters:new(1, []),
|
|
|
- start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
|
|
|
- timer:sleep(2100),
|
|
|
- check_average_rate(C1, 2, 600)
|
|
|
- end,
|
|
|
-
|
|
|
- with_global(
|
|
|
- GlobalMod,
|
|
|
- [{b1, Bucket}],
|
|
|
- Case
|
|
|
- ).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test Cases container
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -432,7 +403,7 @@ t_check_container(_) ->
|
|
|
Cfg#{
|
|
|
rate := ?RATE("1000/1s"),
|
|
|
initial := 1000,
|
|
|
- capacity := 1000
|
|
|
+ burst := 0
|
|
|
}
|
|
|
end,
|
|
|
Case = fun(#{client := Client} = BucketCfg) ->
|
|
|
@@ -452,38 +423,6 @@ t_check_container(_) ->
|
|
|
end,
|
|
|
with_per_client(Cfg, Case).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Test Override
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-t_bucket_no_client(_) ->
|
|
|
- Rate = ?RATE("1/s"),
|
|
|
- GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
|
|
|
- Cfg#{client := Client#{message_routing := MR#{rate := Rate}}}
|
|
|
- end,
|
|
|
- BucketMod = fun(Bucket) ->
|
|
|
- maps:remove(client, Bucket)
|
|
|
- end,
|
|
|
- Case = fun() ->
|
|
|
- Limiter = connect(BucketMod(make_limiter_cfg())),
|
|
|
- ?assertMatch(#{rate := Rate}, Limiter)
|
|
|
- end,
|
|
|
- with_global(GlobalMod, [BucketMod], Case).
|
|
|
-
|
|
|
-t_bucket_client(_) ->
|
|
|
- GlobalRate = ?RATE("1/s"),
|
|
|
- BucketRate = ?RATE("10/s"),
|
|
|
- GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
|
|
|
- Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}}
|
|
|
- end,
|
|
|
- BucketMod = fun(#{client := Client} = Bucket) ->
|
|
|
- Bucket#{client := Client#{rate := BucketRate}}
|
|
|
- end,
|
|
|
- Case = fun() ->
|
|
|
- Limiter = connect(BucketMod(make_limiter_cfg())),
|
|
|
- ?assertMatch(#{rate := BucketRate}, Limiter)
|
|
|
- end,
|
|
|
- with_global(GlobalMod, [BucketMod], Case).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test Cases misc
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -565,13 +504,241 @@ t_schema_unit(_) ->
|
|
|
?assertMatch({error, _}, M:to_rate("100MB/1")),
|
|
|
?assertMatch({error, _}, M:to_rate("100/10x")),
|
|
|
|
|
|
- ?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")),
|
|
|
+ ?assertEqual({ok, infinity}, M:to_capacity("infinity")),
|
|
|
?assertEqual({ok, 100}, M:to_capacity("100")),
|
|
|
?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
|
|
|
?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
|
|
|
?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
|
|
|
ok.
|
|
|
|
|
|
+t_compatibility_for_capacity(_) ->
|
|
|
+ CfgStr = <<
|
|
|
+ ""
|
|
|
+ "\n"
|
|
|
+ "listeners.tcp.default {\n"
|
|
|
+ " bind = \"0.0.0.0:1883\"\n"
|
|
|
+ " max_connections = 1024000\n"
|
|
|
+ " limiter.messages.capacity = infinity\n"
|
|
|
+ " limiter.client.messages.capacity = infinity\n"
|
|
|
+ "}\n"
|
|
|
+ ""
|
|
|
+ >>,
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ messages := #{burst := 0},
|
|
|
+ client := #{messages := #{burst := 0}}
|
|
|
+ },
|
|
|
+ parse_and_check(CfgStr)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_compatibility_for_message_in(_) ->
|
|
|
+ CfgStr = <<
|
|
|
+ ""
|
|
|
+ "\n"
|
|
|
+ "listeners.tcp.default {\n"
|
|
|
+ " bind = \"0.0.0.0:1883\"\n"
|
|
|
+ " max_connections = 1024000\n"
|
|
|
+ " limiter.message_in.rate = infinity\n"
|
|
|
+ " limiter.client.message_in.rate = infinity\n"
|
|
|
+ "}\n"
|
|
|
+ ""
|
|
|
+ >>,
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ messages := #{rate := infinity},
|
|
|
+ client := #{messages := #{rate := infinity}}
|
|
|
+ },
|
|
|
+ parse_and_check(CfgStr)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_compatibility_for_bytes_in(_) ->
|
|
|
+ CfgStr = <<
|
|
|
+ ""
|
|
|
+ "\n"
|
|
|
+ "listeners.tcp.default {\n"
|
|
|
+ " bind = \"0.0.0.0:1883\"\n"
|
|
|
+ " max_connections = 1024000\n"
|
|
|
+ " limiter.bytes_in.rate = infinity\n"
|
|
|
+ " limiter.client.bytes_in.rate = infinity\n"
|
|
|
+ "}\n"
|
|
|
+ ""
|
|
|
+ >>,
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ bytes := #{rate := infinity},
|
|
|
+ client := #{bytes := #{rate := infinity}}
|
|
|
+ },
|
|
|
+ parse_and_check(CfgStr)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_extract_with_type(_) ->
|
|
|
+ IsOnly = fun
|
|
|
+ (_Key, Cfg) when map_size(Cfg) =/= 1 ->
|
|
|
+ false;
|
|
|
+ (Key, Cfg) ->
|
|
|
+ maps:is_key(Key, Cfg)
|
|
|
+ end,
|
|
|
+ Checker = fun
|
|
|
+ (Type, #{client := Client} = Cfg) ->
|
|
|
+ Cfg2 = maps:remove(client, Cfg),
|
|
|
+ IsOnly(Type, Client) andalso
|
|
|
+ (IsOnly(Type, Cfg2) orelse
|
|
|
+ map_size(Cfg2) =:= 0);
|
|
|
+ (Type, Cfg) ->
|
|
|
+ IsOnly(Type, Cfg)
|
|
|
+ end,
|
|
|
+ ?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)),
|
|
|
+ ?assert(
|
|
|
+ Checker(
|
|
|
+ messages,
|
|
|
+ emqx_limiter_schema:extract_with_type(messages, #{
|
|
|
+ messages => #{rate => 1}, bytes => #{rate => 1}
|
|
|
+ })
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assert(
|
|
|
+ Checker(
|
|
|
+ messages,
|
|
|
+ emqx_limiter_schema:extract_with_type(messages, #{
|
|
|
+ messages => #{rate => 1},
|
|
|
+ bytes => #{rate => 1},
|
|
|
+ client => #{messages => #{rate => 2}}
|
|
|
+ })
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assert(
|
|
|
+ Checker(
|
|
|
+ messages,
|
|
|
+ emqx_limiter_schema:extract_with_type(messages, #{
|
|
|
+ client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
|
|
|
+ })
|
|
|
+ )
|
|
|
+ ).
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Test Cases Create Instance
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+t_create_instance_with_infinity_node(_) ->
|
|
|
+ emqx_limiter_manager:insert_bucket(?FUNCTION_NAME, bytes, ?FUNCTION_NAME),
|
|
|
+ Cases = make_create_test_data_with_infinity_node(?FUNCTION_NAME),
|
|
|
+ lists:foreach(
|
|
|
+ fun({Cfg, Expected}) ->
|
|
|
+ {ok, Result} = emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg),
|
|
|
+ IsMatched =
|
|
|
+ case is_atom(Expected) of
|
|
|
+ true ->
|
|
|
+ Result =:= Expected;
|
|
|
+ _ ->
|
|
|
+ Expected(Result)
|
|
|
+ end,
|
|
|
+ ?assert(
|
|
|
+ IsMatched,
|
|
|
+ lists:flatten(
|
|
|
+ io_lib:format("Got unexpected:~p~n, Cfg:~p~n", [
|
|
|
+ Result, Cfg
|
|
|
+ ])
|
|
|
+ )
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ Cases
|
|
|
+ ),
|
|
|
+ emqx_limiter_manager:delete_bucket(?FUNCTION_NAME, bytes),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_not_exists_instance(_) ->
|
|
|
+ Cfg = #{bytes => #{rate => 100, burst => 0, initial => 0}},
|
|
|
+ ?assertEqual(
|
|
|
+ {error, invalid_bucket},
|
|
|
+ emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertEqual(
|
|
|
+ {error, invalid_bucket},
|
|
|
+ emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_create_instance_with_node(_) ->
|
|
|
+ GlobalMod = fun(#{message_routing := MR} = Cfg) ->
|
|
|
+ Cfg#{
|
|
|
+ message_routing := MR#{rate := ?RATE("200/1s")},
|
|
|
+ messages := MR#{rate := ?RATE("200/1s")}
|
|
|
+ }
|
|
|
+ end,
|
|
|
+
|
|
|
+ B1 = fun(Bucket) ->
|
|
|
+ Bucket#{rate := ?RATE("400/1s")}
|
|
|
+ end,
|
|
|
+
|
|
|
+ B2 = fun(Bucket) ->
|
|
|
+ Bucket#{rate := infinity}
|
|
|
+ end,
|
|
|
+
|
|
|
+ IsRefLimiter = fun
|
|
|
+ ({ok, #{tokens := _}}, _IsRoot) ->
|
|
|
+ false;
|
|
|
+ ({ok, #{bucket := #{index := ?ROOT_COUNTER_IDX}}}, true) ->
|
|
|
+ true;
|
|
|
+ ({ok, #{bucket := #{index := Index}}}, false) when Index =/= ?ROOT_COUNTER_IDX ->
|
|
|
+ true;
|
|
|
+ (Result, _IsRoot) ->
|
|
|
+ ct:pal("The result is:~p~n", [Result]),
|
|
|
+ false
|
|
|
+ end,
|
|
|
+
|
|
|
+ Case = fun() ->
|
|
|
+ BucketCfg = make_limiter_cfg(),
|
|
|
+
|
|
|
+ ?assert(
|
|
|
+ IsRefLimiter(emqx_limiter_server:connect(b1, message_routing, B1(BucketCfg)), false)
|
|
|
+ ),
|
|
|
+ ?assert(
|
|
|
+ IsRefLimiter(emqx_limiter_server:connect(b2, message_routing, B2(BucketCfg)), true)
|
|
|
+ ),
|
|
|
+ ?assert(IsRefLimiter(emqx_limiter_server:connect(x, messages, undefined), true)),
|
|
|
+ ?assertNot(IsRefLimiter(emqx_limiter_server:connect(x, bytes, undefined), false))
|
|
|
+ end,
|
|
|
+
|
|
|
+ with_global(
|
|
|
+ GlobalMod,
|
|
|
+ [{b1, B1}, {b2, B2}],
|
|
|
+ Case
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Test Cases emqx_esockd_htb_limiter
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+t_create_esockd_htb_limiter(_) ->
|
|
|
+ Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, undefined),
|
|
|
+ ?assertMatch(
|
|
|
+ #{module := _, id := ?FUNCTION_NAME, type := bytes, bucket := undefined},
|
|
|
+ Opts
|
|
|
+ ),
|
|
|
+
|
|
|
+ Limiter = emqx_esockd_htb_limiter:create(Opts),
|
|
|
+ ?assertMatch(
|
|
|
+ #{module := _, name := bytes, limiter := infinity},
|
|
|
+ Limiter
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertEqual(ok, emqx_esockd_htb_limiter:delete(Limiter)),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_esockd_htb_consume(_) ->
|
|
|
+ ClientCfg = emqx_limiter_schema:default_client_config(),
|
|
|
+ Cfg = #{client => #{bytes => ClientCfg#{rate := 50, max_retry_time := 0}}},
|
|
|
+ Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, Cfg),
|
|
|
+ Limiter = emqx_esockd_htb_limiter:create(Opts),
|
|
|
+
|
|
|
+ C1R = emqx_esockd_htb_limiter:consume(51, Limiter),
|
|
|
+ ?assertMatch({pause, _Ms, _Limiter2}, C1R),
|
|
|
+
|
|
|
+ timer:sleep(300),
|
|
|
+ C2R = emqx_esockd_htb_limiter:consume(50, Limiter),
|
|
|
+ ?assertMatch({ok, _}, C2R),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -748,17 +915,16 @@ connect(Name, Cfg) ->
|
|
|
Limiter.
|
|
|
|
|
|
make_limiter_cfg() ->
|
|
|
- Infinity = emqx_limiter_schema:infinity_value(),
|
|
|
Client = #{
|
|
|
- rate => Infinity,
|
|
|
+ rate => infinity,
|
|
|
initial => 0,
|
|
|
- capacity => Infinity,
|
|
|
+ burst => 0,
|
|
|
low_watermark => 0,
|
|
|
divisible => false,
|
|
|
max_retry_time => timer:seconds(5),
|
|
|
failure_strategy => force
|
|
|
},
|
|
|
- #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
|
|
|
+ #{client => Client, rate => infinity, initial => 0, burst => 0}.
|
|
|
|
|
|
add_bucket(Cfg) ->
|
|
|
add_bucket(?MODULE, Cfg).
|
|
|
@@ -812,3 +978,68 @@ apply_modifier(Pairs, #{default := Template}) ->
|
|
|
Acc#{N => M(Template)}
|
|
|
end,
|
|
|
lists:foldl(Fun, #{}, Pairs).
|
|
|
+
|
|
|
+parse_and_check(ConfigString) ->
|
|
|
+ ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString),
|
|
|
+ emqx:get_config([listeners, tcp, default, limiter]).
|
|
|
+
|
|
|
+make_create_test_data_with_infinity_node(FakeInstnace) ->
|
|
|
+ Infinity = emqx_htb_limiter:make_infinity_limiter(),
|
|
|
+ ClientCfg = emqx_limiter_schema:default_client_config(),
|
|
|
+ InfinityRef = emqx_limiter_bucket_ref:infinity_bucket(),
|
|
|
+ MkC = fun(Rate) ->
|
|
|
+ #{client => #{bytes => ClientCfg#{rate := Rate}}}
|
|
|
+ end,
|
|
|
+ MkB = fun(Rate) ->
|
|
|
+ #{bytes => #{rate => Rate, burst => 0, initial => 0}}
|
|
|
+ end,
|
|
|
+
|
|
|
+ MkA = fun(Client, Bucket) ->
|
|
|
+ maps:merge(MkC(Client), MkB(Bucket))
|
|
|
+ end,
|
|
|
+ IsRefLimiter = fun(Expected) ->
|
|
|
+ fun
|
|
|
+ (#{tokens := _}) -> false;
|
|
|
+ (#{bucket := Bucket}) -> Bucket =:= Expected;
|
|
|
+ (_) -> false
|
|
|
+ end
|
|
|
+ end,
|
|
|
+
|
|
|
+ IsTokenLimiter = fun(Expected) ->
|
|
|
+ fun
|
|
|
+ (#{tokens := _, bucket := Bucket}) -> Bucket =:= Expected;
|
|
|
+ (_) -> false
|
|
|
+ end
|
|
|
+ end,
|
|
|
+
|
|
|
+ [
|
|
|
+ %% default situation, no limiter setting
|
|
|
+ {undefined, Infinity},
|
|
|
+
|
|
|
+ %% client = undefined bucket = undefined
|
|
|
+ {#{}, Infinity},
|
|
|
+ %% client = undefined bucket = infinity
|
|
|
+ {MkB(infinity), Infinity},
|
|
|
+ %% client = undefined bucket = other
|
|
|
+ {MkB(100), IsRefLimiter(FakeInstnace)},
|
|
|
+
|
|
|
+ %% client = infinity bucket = undefined
|
|
|
+ {MkC(infinity), Infinity},
|
|
|
+ %% client = infinity bucket = infinity
|
|
|
+ {MkA(infinity, infinity), Infinity},
|
|
|
+
|
|
|
+ %% client = infinity bucket = other
|
|
|
+ {MkA(infinity, 100), IsRefLimiter(FakeInstnace)},
|
|
|
+
|
|
|
+ %% client = other bucket = undefined
|
|
|
+ {MkC(100), IsTokenLimiter(InfinityRef)},
|
|
|
+
|
|
|
+ %% client = other bucket = infinity
|
|
|
+ {MkC(100), IsTokenLimiter(InfinityRef)},
|
|
|
+
|
|
|
+ %% client = C bucket = B C < B
|
|
|
+ {MkA(100, 1000), IsTokenLimiter(FakeInstnace)},
|
|
|
+
|
|
|
+ %% client = C bucket = B C > B
|
|
|
+ {MkA(1000, 100), IsRefLimiter(FakeInstnace)}
|
|
|
+ ].
|