|
|
@@ -43,6 +43,16 @@ init_per_suite(Config) ->
|
|
|
end_per_suite(_) ->
|
|
|
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
|
|
|
|
|
+init_per_testcase(t_load_case, Config) ->
|
|
|
+ Config;
|
|
|
+init_per_testcase(_Case, Config) ->
|
|
|
+ {atomic, ok} = mria:clear_table(emqx_delayed),
|
|
|
+ ok = emqx_delayed:enable(),
|
|
|
+ Config.
|
|
|
+
|
|
|
+end_per_testcase(_Case, _Config) ->
|
|
|
+ ok = emqx_delayed:disable().
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test cases
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -57,7 +67,6 @@ t_load_case(_) ->
|
|
|
ok.
|
|
|
|
|
|
t_delayed_message(_) ->
|
|
|
- ok = emqx_delayed:enable(),
|
|
|
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
|
|
|
?assertEqual(
|
|
|
{stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}},
|
|
|
@@ -67,11 +76,94 @@ t_delayed_message(_) ->
|
|
|
Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>),
|
|
|
?assertEqual({ok, Msg}, on_message_publish(Msg)),
|
|
|
|
|
|
- [Key] = mnesia:dirty_all_keys(emqx_delayed),
|
|
|
- [#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_delayed, Key}),
|
|
|
+ [#delayed_message{msg = #message{payload = Payload}}] = ets:tab2list(emqx_delayed),
|
|
|
?assertEqual(<<"delayed_m">>, Payload),
|
|
|
- timer:sleep(5000),
|
|
|
+ ct:sleep(2000),
|
|
|
|
|
|
EmptyKey = mnesia:dirty_all_keys(emqx_delayed),
|
|
|
- ?assertEqual([], EmptyKey),
|
|
|
- ok = emqx_delayed:disable().
|
|
|
+ ?assertEqual([], EmptyKey).
|
|
|
+
|
|
|
+t_delayed_message_abs_time(_) ->
|
|
|
+ Ts0 = integer_to_binary(erlang:system_time(second) + 1),
|
|
|
+ DelayedMsg0 = emqx_message:make(
|
|
|
+ ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">>
|
|
|
+ ),
|
|
|
+ _ = on_message_publish(DelayedMsg0),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ [#delayed_message{msg = #message{payload = <<"delayed_abs">>}}],
|
|
|
+ ets:tab2list(emqx_delayed)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ct:sleep(2000),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ [],
|
|
|
+ ets:tab2list(emqx_delayed)
|
|
|
+ ),
|
|
|
+
|
|
|
+ Ts1 = integer_to_binary(erlang:system_time(second) + 10000000),
|
|
|
+ DelayedMsg1 = emqx_message:make(
|
|
|
+ ?MODULE, 1, <<"$delayed/", Ts1/binary, "/publish">>, <<"delayed_abs">>
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertError(
|
|
|
+ invalid_delayed_timestamp,
|
|
|
+ on_message_publish(DelayedMsg1)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_list(_) ->
|
|
|
+ Ts0 = integer_to_binary(erlang:system_time(second) + 1),
|
|
|
+ DelayedMsg0 = emqx_message:make(
|
|
|
+ ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">>
|
|
|
+ ),
|
|
|
+ _ = on_message_publish(DelayedMsg0),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ #{data := [#{topic := <<"publish">>}]},
|
|
|
+ emqx_delayed:list(#{})
|
|
|
+ ).
|
|
|
+
|
|
|
+t_max(_) ->
|
|
|
+ emqx_delayed:set_max_delayed_messages(1),
|
|
|
+
|
|
|
+ DelayedMsg0 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t0">>, <<"delayed0">>),
|
|
|
+ DelayedMsg1 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t1">>, <<"delayed1">>),
|
|
|
+ _ = on_message_publish(DelayedMsg0),
|
|
|
+ _ = on_message_publish(DelayedMsg1),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ #{data := [#{topic := <<"t0">>}]},
|
|
|
+ emqx_delayed:list(#{})
|
|
|
+ ).
|
|
|
+
|
|
|
+t_cluster(_) ->
|
|
|
+ DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed">>),
|
|
|
+ Id = emqx_message:id(DelayedMsg),
|
|
|
+ _ = on_message_publish(DelayedMsg),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, _},
|
|
|
+ emqx_delayed_proto_v1:get_delayed_message(node(), Id)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertEqual(
|
|
|
+ emqx_delayed:get_delayed_message(Id),
|
|
|
+ emqx_delayed_proto_v1:get_delayed_message(node(), Id)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {error, _},
|
|
|
+ emqx_delayed:get_delayed_message(Id)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_unknown_messages(_) ->
|
|
|
+ OldPid = whereis(emqx_delayed),
|
|
|
+ OldPid ! unknown,
|
|
|
+ ok = gen_server:cast(OldPid, unknown),
|
|
|
+ ?assertEqual(
|
|
|
+ ignored,
|
|
|
+ gen_server:call(OldPid, unknown)
|
|
|
+ ).
|