|
|
@@ -112,12 +112,63 @@ base_conf() ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test Cases Bucket Level
|
|
|
%%--------------------------------------------------------------------
|
|
|
+t_consume(_) ->
|
|
|
+ Cfg = fun(Cfg) ->
|
|
|
+ Cfg#{rate := 100,
|
|
|
+ capacity := 100,
|
|
|
+ initial := 100,
|
|
|
+ max_retry_time := 1000,
|
|
|
+ failure_strategy := force}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ Client = connect(default),
|
|
|
+ {ok, L2} = emqx_htb_limiter:consume(50, Client),
|
|
|
+ {ok, _L3} = emqx_htb_limiter:consume(150, L2)
|
|
|
+ end,
|
|
|
+ with_per_client(default, Cfg, Case).
|
|
|
+
|
|
|
+t_retry(_) ->
|
|
|
+ Cfg = fun(Cfg) ->
|
|
|
+ Cfg#{rate := 50,
|
|
|
+ capacity := 200,
|
|
|
+ initial := 0,
|
|
|
+ max_retry_time := 1000,
|
|
|
+ failure_strategy := force}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ Client = connect(default),
|
|
|
+ {ok, Client} = emqx_htb_limiter:retry(Client),
|
|
|
+ {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
|
|
+ L3 = emqx_htb_limiter:set_retry(Retry, L2),
|
|
|
+ timer:sleep(500),
|
|
|
+ {ok, _L4} = emqx_htb_limiter:retry(L3)
|
|
|
+ end,
|
|
|
+ with_per_client(default, Cfg, Case).
|
|
|
+
|
|
|
+t_restore(_) ->
|
|
|
+ Cfg = fun(Cfg) ->
|
|
|
+ Cfg#{rate := 1,
|
|
|
+ capacity := 200,
|
|
|
+ initial := 50,
|
|
|
+ max_retry_time := 100,
|
|
|
+ failure_strategy := force}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ Client = connect(default),
|
|
|
+ {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
|
|
+ timer:sleep(200),
|
|
|
+ {ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
|
|
+ Avaiable = emqx_htb_limiter:available(L3),
|
|
|
+ ?assert(Avaiable >= 50)
|
|
|
+ end,
|
|
|
+ with_per_client(default, Cfg, Case).
|
|
|
+
|
|
|
t_max_retry_time(_) ->
|
|
|
Cfg = fun(Cfg) ->
|
|
|
- Cfg#{rate := 1,
|
|
|
- capacity := 1,
|
|
|
- max_retry_time := 500,
|
|
|
- failure_strategy := drop}
|
|
|
+ Cfg#{rate := 1,
|
|
|
+ capacity := 1,
|
|
|
+ max_retry_time := 500,
|
|
|
+ failure_strategy := drop}
|
|
|
end,
|
|
|
Case = fun() ->
|
|
|
Client = connect(default),
|
|
|
@@ -186,6 +237,26 @@ t_infinity_client(_) ->
|
|
|
end,
|
|
|
with_bucket(default, Fun, Case).
|
|
|
|
|
|
+t_try_restore_agg(_) ->
|
|
|
+ Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
|
|
|
+ Aggr2 = Aggr#{rate := 1,
|
|
|
+ capacity := 200,
|
|
|
+ initial := 50},
|
|
|
+ Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true,
|
|
|
+ max_retry_time := 100, failure_strategy := force},
|
|
|
+ Bucket#{aggregated := Aggr2,
|
|
|
+ per_client := Cli2}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ Client = connect(default),
|
|
|
+ {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
|
|
+ timer:sleep(200),
|
|
|
+ {ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
|
|
+ Avaiable = emqx_htb_limiter:available(L3),
|
|
|
+ ?assert(Avaiable >= 50)
|
|
|
+ end,
|
|
|
+ with_bucket(default, Fun, Case).
|
|
|
+
|
|
|
t_short_board(_) ->
|
|
|
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
|
|
|
Aggr2 = Aggr#{rate := ?RATE("100/1s"),
|
|
|
@@ -285,7 +356,7 @@ t_limit_zone_with_unlimit_bucket(_) ->
|
|
|
t_burst_and_fairness(_) ->
|
|
|
GlobalMod = fun(Cfg) ->
|
|
|
Cfg#{burst := ?RATE("60/1s")}
|
|
|
- end,
|
|
|
+ end,
|
|
|
|
|
|
ZoneMod = fun(Cfg) ->
|
|
|
Cfg#{rate := ?RATE("600/1s"),
|
|
|
@@ -319,9 +390,50 @@ t_burst_and_fairness(_) ->
|
|
|
[{b1, Bucket}, {b2, Bucket}],
|
|
|
Case).
|
|
|
|
|
|
+t_burst(_) ->
|
|
|
+ GlobalMod = fun(Cfg) ->
|
|
|
+ Cfg#{burst := ?RATE("60/1s")}
|
|
|
+ end,
|
|
|
+
|
|
|
+ ZoneMod = fun(Cfg) ->
|
|
|
+ Cfg#{rate := ?RATE("60/1s"),
|
|
|
+ burst := ?RATE("60/1s")}
|
|
|
+ end,
|
|
|
+
|
|
|
+ Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
|
|
|
+ Aggr2 = Aggr#{rate := ?RATE("500/1s"),
|
|
|
+ initial := 0,
|
|
|
+ capacity := 500},
|
|
|
+ Cli2 = Cli#{rate := ?RATE("600/1s"),
|
|
|
+ capacity := 600,
|
|
|
+ divisible := true},
|
|
|
+ Bucket#{aggregated := Aggr2,
|
|
|
+ per_client := Cli2}
|
|
|
+ end,
|
|
|
+
|
|
|
+ Case = fun() ->
|
|
|
+ C1 = counters:new(1, []),
|
|
|
+ C2 = counters:new(1, []),
|
|
|
+ C3 = counters:new(1, []),
|
|
|
+ start_client(b1, ?NOW + 2000, C1, 20),
|
|
|
+ start_client(b2, ?NOW + 2000, C2, 30),
|
|
|
+ start_client(b3, ?NOW + 2000, C3, 30),
|
|
|
+ timer:sleep(2100),
|
|
|
+
|
|
|
+ Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
|
|
|
+ in_range(Total / 2, 30)
|
|
|
+ end,
|
|
|
+
|
|
|
+ with_global(GlobalMod,
|
|
|
+ default,
|
|
|
+ ZoneMod,
|
|
|
+ [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}],
|
|
|
+ Case).
|
|
|
+
|
|
|
+
|
|
|
t_limit_global_with_unlimit_other(_) ->
|
|
|
GlobalMod = fun(Cfg) ->
|
|
|
- Cfg#{rate := ?RATE("600/1s")}
|
|
|
+ Cfg#{rate := ?RATE("600/1s")}
|
|
|
end,
|
|
|
|
|
|
ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end,
|
|
|
@@ -483,14 +595,85 @@ t_zone_hunger_and_fair(_) ->
|
|
|
[{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}],
|
|
|
Case).
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Test Cases container
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+t_new_container(_) ->
|
|
|
+ C1 = emqx_limiter_container:new(),
|
|
|
+ C2 = emqx_limiter_container:new([message_routing]),
|
|
|
+ C3 = emqx_limiter_container:update_by_name(message_routing, default, C1),
|
|
|
+ ?assertMatch(#{message_routing := _,
|
|
|
+ retry_ctx := undefined,
|
|
|
+ {retry, message_routing} := _}, C2),
|
|
|
+ ?assertMatch(#{message_routing := _,
|
|
|
+ retry_ctx := undefined,
|
|
|
+ {retry, message_routing} := _}, C3),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_check_container(_) ->
|
|
|
+ Cfg = fun(Cfg) ->
|
|
|
+ Cfg#{rate := ?RATE("1000/1s"),
|
|
|
+ initial := 1000,
|
|
|
+ capacity := 1000}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ C1 = emqx_limiter_container:new([message_routing]),
|
|
|
+ {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
|
|
|
+ {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
|
|
|
+ timer:sleep(Pause),
|
|
|
+ {ok, C4} = emqx_limiter_container:retry(message_routing, C3),
|
|
|
+ Context = test,
|
|
|
+ C5 = emqx_limiter_container:set_retry_context(Context, C4),
|
|
|
+ RetryData = emqx_limiter_container:get_retry_context(C5),
|
|
|
+ ?assertEqual(Context, RetryData)
|
|
|
+ end,
|
|
|
+ with_per_client(default, Cfg, Case).
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Test Cases misc
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+t_limiter_manager(_) ->
|
|
|
+ {error, _} = emqx_limiter_manager:start_server(message_routing),
|
|
|
+ ignore = gen_server:call(emqx_limiter_manager, unexpected_call),
|
|
|
+ ok = gen_server:cast(emqx_limiter_manager, unexpected_cast),
|
|
|
+ erlang:send(erlang:whereis(emqx_limiter_manager), unexpected_info),
|
|
|
+ ok = emqx_limiter_manager:format_status(normal, ok),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_limiter_app(_) ->
|
|
|
+ try
|
|
|
+ _ = emqx_limiter_app:start(undefined, undefined)
|
|
|
+ catch _:_ ->
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ ok = emqx_limiter_app:stop(undefined),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_limiter_server(_) ->
|
|
|
+ State = emqx_limiter_server:info(message_routing),
|
|
|
+ ?assertMatch(#{root := _,
|
|
|
+ counter := _,
|
|
|
+ index := _,
|
|
|
+ zones := _,
|
|
|
+ buckets := _,
|
|
|
+ nodes := _,
|
|
|
+ type := message_routing}, State),
|
|
|
+
|
|
|
+ Name = emqx_limiter_server:name(message_routing),
|
|
|
+ ignored = gen_server:call(Name, unexpected_call),
|
|
|
+ ok = gen_server:cast(Name, unexpected_cast),
|
|
|
+ erlang:send(erlang:whereis(Name), unexpected_info),
|
|
|
+ ok = emqx_limiter_server:format_status(normal, ok),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
start_client(Name, EndTime, Counter, Number) ->
|
|
|
lists:foreach(fun(_) ->
|
|
|
- spawn(fun() ->
|
|
|
- start_client(Name, EndTime, Counter)
|
|
|
- end)
|
|
|
+ spawn(fun() ->
|
|
|
+ start_client(Name, EndTime, Counter)
|
|
|
+ end)
|
|
|
end,
|
|
|
lists:seq(1, Number)).
|
|
|
|
|
|
@@ -612,16 +795,18 @@ with_config(Path, Modifier, Case) ->
|
|
|
emqx_config:put(Path, NewCfg),
|
|
|
emqx_limiter_manager:restart_server(message_routing),
|
|
|
timer:sleep(100),
|
|
|
- DelayReturn
|
|
|
- = try
|
|
|
- Return = Case(),
|
|
|
- fun() -> Return end
|
|
|
- catch Type:Reason:Trace ->
|
|
|
- fun() -> erlang:raise(Type, Reason, Trace) end
|
|
|
- end,
|
|
|
+ DelayReturn = delay_return(Case),
|
|
|
emqx_config:put(Path, Cfg),
|
|
|
DelayReturn().
|
|
|
|
|
|
+delay_return(Case) ->
|
|
|
+ try
|
|
|
+ Return = Case(),
|
|
|
+ fun() -> Return end
|
|
|
+ catch Type:Reason:Trace ->
|
|
|
+ fun() -> erlang:raise(Type, Reason, Trace) end
|
|
|
+ end.
|
|
|
+
|
|
|
connect(Name) ->
|
|
|
emqx_limiter_server:connect(message_routing, Name).
|
|
|
|
|
|
@@ -636,7 +821,7 @@ print_average_rate(Counter, Second) ->
|
|
|
PerSec = Cost / Second,
|
|
|
ct:pal("Cost:~p PerSec:~p ~n", [Cost, PerSec]).
|
|
|
|
|
|
-in_range(Val, Expected) when Val < Expected * 0.6 ->
|
|
|
+in_range(Val, Expected) when Val < Expected * 0.5 ->
|
|
|
ct:pal("Val:~p smaller than min bound", [Val]),
|
|
|
false;
|
|
|
in_range(Val, Expected) when Val > Expected * 1.8 ->
|
|
|
@@ -663,6 +848,6 @@ apply_modifier(Name, Modifier, #{default := Template} = Cfg) ->
|
|
|
|
|
|
apply_modifier(Pairs, #{default := Template}) ->
|
|
|
Fun = fun({N, M}, Acc) ->
|
|
|
- Acc#{N => M(Template)}
|
|
|
+ Acc#{N => M(Template)}
|
|
|
end,
|
|
|
lists:foldl(Fun, #{}, Pairs).
|