|
|
@@ -26,13 +26,15 @@
|
|
|
|
|
|
-define(NOW, erlang:system_time(millisecond)).
|
|
|
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
|
|
+-define(LANTENCY, 101).
|
|
|
|
|
|
-define(BASE_CONF, <<
|
|
|
""
|
|
|
"\n"
|
|
|
"slow_subs {\n"
|
|
|
" enable = true\n"
|
|
|
- " top_k_num = 5,\n"
|
|
|
+ " top_k_num = 5\n"
|
|
|
+ " threshold = 100ms\n"
|
|
|
" expire_interval = 5m\n"
|
|
|
" stats_type = whole\n"
|
|
|
" }"
|
|
|
@@ -64,10 +66,10 @@ end_per_suite(_Config) ->
|
|
|
|
|
|
init_per_testcase(t_expire, Config) ->
|
|
|
{ok, _} = emqx_cluster_rpc:start_link(),
|
|
|
- Cfg = emqx_config:get([slow_subs]),
|
|
|
- emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}),
|
|
|
+ update_config(<<"expire_interval">>, <<"1500ms">>),
|
|
|
Config;
|
|
|
init_per_testcase(_, Config) ->
|
|
|
+ {ok, _} = emqx_cluster_rpc:start_link(),
|
|
|
Config.
|
|
|
|
|
|
end_per_testcase(_, _) ->
|
|
|
@@ -84,9 +86,49 @@ end_per_testcase(_, _) ->
|
|
|
%% Test Cases
|
|
|
%%--------------------------------------------------------------------
|
|
|
t_pub(_) ->
|
|
|
+ _ = [stats_with_type(Type) || Type <- [whole, internal, response]],
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_expire(_) ->
|
|
|
+ Now = ?NOW,
|
|
|
+ Each = fun(I) ->
|
|
|
+ ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
|
|
|
+ ets:insert(?TOPK_TAB, #top_k{
|
|
|
+ index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
|
|
|
+ last_update_time = Now - timer:minutes(5)
|
|
|
+ })
|
|
|
+ end,
|
|
|
+
|
|
|
+ lists:foreach(Each, lists:seq(1, 5)),
|
|
|
+
|
|
|
+ timer:sleep(3000),
|
|
|
+ Size = ets:info(?TOPK_TAB, size),
|
|
|
+ ?assertEqual(0, Size),
|
|
|
+ ok.
|
|
|
+
|
|
|
+start_client(Type, Subs) ->
|
|
|
+ [spawn(fun() -> client(I, Type, Subs) end) || I <- lists:seq(1, 10)].
|
|
|
+
|
|
|
+client(I, Type, Subs) ->
|
|
|
+ ConnOptions = make_conn_options(Type, I),
|
|
|
+ {ok, C} = emqtt:start_link(ConnOptions),
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+
|
|
|
+ Len = erlang:length(Subs),
|
|
|
+ Sub = lists:nth(I rem Len + 1, Subs),
|
|
|
+ _ = emqtt:subscribe(C, Sub),
|
|
|
+
|
|
|
+ receive
|
|
|
+ stop ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
+stats_with_type(Type) ->
|
|
|
+ emqx_slow_subs:clear_history(),
|
|
|
+ update_stats_type(Type),
|
|
|
%% Sub topic first
|
|
|
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
|
|
|
- Clients = start_client(Subs),
|
|
|
+ Clients = start_client(Type, Subs),
|
|
|
timer:sleep(1000),
|
|
|
Now = ?NOW,
|
|
|
%% publish
|
|
|
@@ -95,7 +137,7 @@ t_pub(_) ->
|
|
|
fun(I) ->
|
|
|
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
|
|
|
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
|
|
|
- emqx:publish(Msg#message{timestamp = Now - 500}),
|
|
|
+ emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}),
|
|
|
timer:sleep(100)
|
|
|
end,
|
|
|
lists:seq(1, 10)
|
|
|
@@ -105,7 +147,7 @@ t_pub(_) ->
|
|
|
fun(I) ->
|
|
|
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
|
|
|
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
|
|
|
- emqx:publish(Msg#message{timestamp = Now - 500}),
|
|
|
+ emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}),
|
|
|
timer:sleep(100)
|
|
|
end,
|
|
|
lists:seq(1, 10)
|
|
|
@@ -113,45 +155,48 @@ t_pub(_) ->
|
|
|
|
|
|
timer:sleep(1000),
|
|
|
Size = ets:info(?TOPK_TAB, size),
|
|
|
- ?assert(Size =< 10 andalso Size >= 3, io_lib:format("the size is :~p~n", [Size])),
|
|
|
-
|
|
|
- [Client ! stop || Client <- Clients],
|
|
|
- ok.
|
|
|
-
|
|
|
-t_expire(_) ->
|
|
|
- Now = ?NOW,
|
|
|
- Each = fun(I) ->
|
|
|
- ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
|
|
|
- ets:insert(?TOPK_TAB, #top_k{
|
|
|
- index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
|
|
|
- last_update_time = Now - timer:minutes(5)
|
|
|
- })
|
|
|
- end,
|
|
|
+ ?assert(
|
|
|
+ Size =< 10 andalso Size >= 3,
|
|
|
+ lists:flatten(io_lib:format("with_type:~p, the size is :~p~n", [Type, Size]))
|
|
|
+ ),
|
|
|
|
|
|
- lists:foreach(Each, lists:seq(1, 5)),
|
|
|
+ ?assert(
|
|
|
+ lists:all(
|
|
|
+ fun(#{timespan := Ts}) ->
|
|
|
+ Ts >= 101 andalso Ts < ?NOW - Now
|
|
|
+ end,
|
|
|
+ emqx_slow_subs_api:get_history()
|
|
|
+ )
|
|
|
+ ),
|
|
|
|
|
|
- timer:sleep(3000),
|
|
|
- Size = ets:info(?TOPK_TAB, size),
|
|
|
- ?assertEqual(0, Size),
|
|
|
+ [Client ! stop || Client <- Clients],
|
|
|
ok.
|
|
|
|
|
|
-start_client(Subs) ->
|
|
|
- [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
|
|
|
-
|
|
|
-client(I, Subs) ->
|
|
|
- {ok, C} = emqtt:start_link([
|
|
|
+update_stats_type(Type) ->
|
|
|
+ update_config(<<"stats_type">>, erlang:atom_to_binary(Type)).
|
|
|
+
|
|
|
+update_config(Key, Value) ->
|
|
|
+ Raw = #{
|
|
|
+ <<"enable">> => true,
|
|
|
+ <<"expire_interval">> => <<"5m">>,
|
|
|
+ <<"stats_type">> => <<"whole">>,
|
|
|
+ <<"threshold">> => <<"100ms">>,
|
|
|
+ <<"top_k_num">> => 5
|
|
|
+ },
|
|
|
+ emqx_slow_subs:update_settings(Raw#{Key => Value}).
|
|
|
+
|
|
|
+make_conn_options(response, I) ->
|
|
|
+ [
|
|
|
+ {msg_handler, #{
|
|
|
+ publish => fun(_) -> timer:sleep(?LANTENCY) end,
|
|
|
+ disconnected => fun(_) -> ok end
|
|
|
+ }}
|
|
|
+ | make_conn_options(whole, I)
|
|
|
+ ];
|
|
|
+make_conn_options(_, I) ->
|
|
|
+ [
|
|
|
{host, "localhost"},
|
|
|
{clientid, io_lib:format("slow_subs_~p", [I])},
|
|
|
{username, <<"plain">>},
|
|
|
{password, <<"plain">>}
|
|
|
- ]),
|
|
|
- {ok, _} = emqtt:connect(C),
|
|
|
-
|
|
|
- Len = erlang:length(Subs),
|
|
|
- Sub = lists:nth(I rem Len + 1, Subs),
|
|
|
- _ = emqtt:subscribe(C, Sub),
|
|
|
-
|
|
|
- receive
|
|
|
- stop ->
|
|
|
- ok
|
|
|
- end.
|
|
|
+ ].
|