|
|
@@ -19,8 +19,6 @@
|
|
|
-compile(export_all).
|
|
|
-compile(nowarn_export_all).
|
|
|
|
|
|
--include_lib("emqx/include/emqx.hrl").
|
|
|
--include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("lc/include/lc.hrl").
|
|
|
@@ -62,61 +60,41 @@ t_disable_enable(_Config) ->
|
|
|
ok = emqx_olp:disable(),
|
|
|
?assert(not is_process_alive(Old)),
|
|
|
{ok, Pid} = emqx_olp:enable(),
|
|
|
- timer:sleep(1000),
|
|
|
?assert(is_process_alive(Pid)).
|
|
|
|
|
|
%% Test that overload detection works
|
|
|
t_is_overloaded(_Config) ->
|
|
|
- P = burst_runq(),
|
|
|
- timer:sleep(3000),
|
|
|
+ meck:new(load_ctl, [passthrough]),
|
|
|
+ meck:expect(load_ctl, is_overloaded, fun() -> true end),
|
|
|
?assert(emqx_olp:is_overloaded()),
|
|
|
- exit(P, kill),
|
|
|
- timer:sleep(3000),
|
|
|
- ?assert(not emqx_olp:is_overloaded()).
|
|
|
+ meck:expect(load_ctl, is_overloaded, fun() -> false end),
|
|
|
+ ?assert(not emqx_olp:is_overloaded()),
|
|
|
+ meck:unload(load_ctl).
|
|
|
|
|
|
%% Test that new conn is rejected when olp is enabled
|
|
|
t_overloaded_conn(_Config) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
?assert(erlang:is_process_alive(load_ctl:whereis_runq_flagman())),
|
|
|
emqx_config:put([overload_protection, enable], true),
|
|
|
- P = burst_runq(),
|
|
|
- timer:sleep(1000),
|
|
|
+ meck:new(load_ctl, [passthrough]),
|
|
|
+ meck:expect(load_ctl, is_overloaded, fun() -> true end),
|
|
|
?assert(emqx_olp:is_overloaded()),
|
|
|
true = emqx:is_running(node()),
|
|
|
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
|
|
|
?assertNotMatch({ok, _Pid}, emqtt:connect(C)),
|
|
|
- exit(P, kill).
|
|
|
+ meck:unload(load_ctl).
|
|
|
|
|
|
%% Test that new conn is rejected when olp is enabled
|
|
|
t_overload_cooldown_conn(Config) ->
|
|
|
t_overloaded_conn(Config),
|
|
|
- timer:sleep(1000),
|
|
|
+ meck:new(load_ctl, [passthrough]),
|
|
|
+ meck:expect(load_ctl, is_overloaded, fun() -> false end),
|
|
|
?assert(not emqx_olp:is_overloaded()),
|
|
|
true = emqx:is_running(node()),
|
|
|
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
|
|
|
?assertMatch({ok, _Pid}, emqtt:connect(C)),
|
|
|
- emqtt:stop(C).
|
|
|
-
|
|
|
--spec burst_runq() -> ParentToKill :: pid().
|
|
|
-burst_runq() ->
|
|
|
- NProc = erlang:system_info(schedulers_online),
|
|
|
- spawn(?MODULE, worker_parent, [NProc * 1000, {?MODULE, busy_loop, []}]).
|
|
|
-
|
|
|
-%% internal helpers
|
|
|
-worker_parent(N, {M, F, A}) ->
|
|
|
- lists:foreach(
|
|
|
- fun(_) ->
|
|
|
- proc_lib:spawn_link(fun() -> apply(M, F, A) end)
|
|
|
- end,
|
|
|
- lists:seq(1, N)
|
|
|
- ),
|
|
|
- receive
|
|
|
- stop -> ok
|
|
|
- end.
|
|
|
-
|
|
|
-busy_loop() ->
|
|
|
- erlang:yield(),
|
|
|
- busy_loop().
|
|
|
+ emqtt:stop(C),
|
|
|
+ meck:unload(load_ctl).
|
|
|
|
|
|
wait_for(_Fun, 0) ->
|
|
|
false;
|