emqx_delayed_SUITE.erl 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_delayed_SUITE).
  17. -import(emqx_delayed, [on_message_publish/1]).
  18. -compile(export_all).
  19. -compile(nowarn_export_all).
  20. -record(delayed_message, {key, delayed, msg}).
  21. -include_lib("common_test/include/ct.hrl").
  22. -include_lib("eunit/include/eunit.hrl").
  23. -include_lib("emqx/include/emqx.hrl").
  24. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  25. %%--------------------------------------------------------------------
  26. %% Setups
  27. %%--------------------------------------------------------------------
  28. -define(BASE_CONF, #{
  29. <<"dealyed">> => <<"true">>,
  30. <<"max_delayed_messages">> => <<"0">>
  31. }).
  32. all() ->
  33. emqx_common_test_helpers:all(?MODULE).
  34. init_per_suite(Config) ->
  35. ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
  36. emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
  37. Config.
  38. end_per_suite(_) ->
  39. emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
  40. init_per_testcase(t_load_case, Config) ->
  41. Config;
  42. init_per_testcase(_Case, Config) ->
  43. {atomic, ok} = mria:clear_table(emqx_delayed),
  44. ok = emqx_delayed:load(),
  45. Config.
  46. end_per_testcase(_Case, _Config) ->
  47. {atomic, ok} = mria:clear_table(emqx_delayed),
  48. ok = emqx_delayed:unload().
  49. %%--------------------------------------------------------------------
  50. %% Test cases
  51. %%--------------------------------------------------------------------
  52. t_enable_disable_case(_) ->
  53. emqx_delayed:unload(),
  54. timer:sleep(100),
  55. Hooks = emqx_hooks:lookup('message.publish'),
  56. MFA = {emqx_delayed, on_message_publish, []},
  57. ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
  58. ok = emqx_delayed:load(),
  59. Hooks1 = emqx_hooks:lookup('message.publish'),
  60. ?assertNotEqual(false, lists:keyfind(MFA, 2, Hooks1)),
  61. Ts0 = integer_to_binary(erlang:system_time(second) + 10),
  62. DelayedMsg0 = emqx_message:make(
  63. ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">>
  64. ),
  65. _ = on_message_publish(DelayedMsg0),
  66. ?assertMatch(#{data := Datas} when Datas =/= [], emqx_delayed:list(#{})),
  67. emqx_delayed:unload(),
  68. timer:sleep(100),
  69. ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
  70. ?assertMatch(#{data := []}, emqx_delayed:list(#{})),
  71. ok.
  72. t_delayed_message(_) ->
  73. DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
  74. ?assertEqual(
  75. {stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}},
  76. on_message_publish(DelayedMsg)
  77. ),
  78. Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>),
  79. ?assertEqual({ok, Msg}, on_message_publish(Msg)),
  80. [#delayed_message{msg = #message{payload = Payload}}] = ets:tab2list(emqx_delayed),
  81. ?assertEqual(<<"delayed_m">>, Payload),
  82. ct:sleep(2500),
  83. EmptyKey = mnesia:dirty_all_keys(emqx_delayed),
  84. ?assertEqual([], EmptyKey).
  85. t_delayed_message_abs_time(_) ->
  86. Ts0 = integer_to_binary(erlang:system_time(second) + 1),
  87. DelayedMsg0 = emqx_message:make(
  88. ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">>
  89. ),
  90. _ = on_message_publish(DelayedMsg0),
  91. ?assertMatch(
  92. [#delayed_message{msg = #message{payload = <<"delayed_abs">>}}],
  93. ets:tab2list(emqx_delayed)
  94. ),
  95. ct:sleep(2000),
  96. ?assertMatch(
  97. [],
  98. ets:tab2list(emqx_delayed)
  99. ),
  100. Ts1 = integer_to_binary(erlang:system_time(second) + 10000000),
  101. DelayedMsg1 = emqx_message:make(
  102. ?MODULE, 1, <<"$delayed/", Ts1/binary, "/publish">>, <<"delayed_abs">>
  103. ),
  104. ?assertError(
  105. invalid_delayed_timestamp,
  106. on_message_publish(DelayedMsg1)
  107. ).
  108. t_list(_) ->
  109. Ts0 = integer_to_binary(erlang:system_time(second) + 1),
  110. DelayedMsg0 = emqx_message:make(
  111. ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">>
  112. ),
  113. _ = on_message_publish(DelayedMsg0),
  114. ?assertMatch(
  115. #{data := [#{topic := <<"publish">>}]},
  116. emqx_delayed:list(#{})
  117. ).
  118. t_max(_) ->
  119. emqx:update_config([delayed, max_delayed_messages], 1),
  120. DelayedMsg0 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t0">>, <<"delayed0">>),
  121. DelayedMsg1 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t1">>, <<"delayed1">>),
  122. _ = on_message_publish(DelayedMsg0),
  123. _ = on_message_publish(DelayedMsg1),
  124. ?assertMatch(
  125. #{data := [#{topic := <<"t0">>}]},
  126. emqx_delayed:list(#{})
  127. ).
  128. t_cluster(_) ->
  129. DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed">>),
  130. Id = emqx_message:id(DelayedMsg),
  131. _ = on_message_publish(DelayedMsg),
  132. ?assertMatch(
  133. {ok, _},
  134. emqx_delayed_proto_v1:get_delayed_message(node(), Id)
  135. ),
  136. %% The 'local' and the 'fake-remote' values should be the same,
  137. %% however there is a race condition, so we are just assert that they are both 'ok' tuples
  138. ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)),
  139. ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)),
  140. ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),
  141. ?assertMatch(
  142. {error, _},
  143. emqx_delayed:get_delayed_message(Id)
  144. ).
  145. t_unknown_messages(_) ->
  146. OldPid = whereis(emqx_delayed),
  147. OldPid ! unknown,
  148. ok = gen_server:cast(OldPid, unknown),
  149. ?assertEqual(
  150. ignored,
  151. gen_server:call(OldPid, unknown)
  152. ).
  153. t_get_basic_usage_info(_Config) ->
  154. emqx:update_config([delayed, max_delayed_messages], 10000),
  155. ?assertEqual(#{delayed_message_count => 0}, emqx_delayed:get_basic_usage_info()),
  156. lists:foreach(
  157. fun(N) ->
  158. Num = integer_to_binary(N),
  159. Message = emqx_message:make(<<"$delayed/", Num/binary, "/delayed">>, <<"payload">>),
  160. {stop, _} = emqx_delayed:on_message_publish(Message)
  161. end,
  162. lists:seq(1, 4)
  163. ),
  164. ?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()),
  165. ok.
  166. t_delayed_precision(_) ->
  167. MaxSpan = 1250,
  168. FutureDiff = subscribe_proc(),
  169. DelayedMsg0 = emqx_message:make(
  170. ?MODULE, 1, <<"$delayed/1/delayed/test">>, <<"delayed/test">>
  171. ),
  172. _ = on_message_publish(DelayedMsg0),
  173. ?assert(FutureDiff() =< MaxSpan).
  174. t_banned_delayed(_) ->
  175. emqx:update_config([delayed, max_delayed_messages], 10000),
  176. ClientId1 = <<"bc1">>,
  177. ClientId2 = <<"bc2">>,
  178. Now = erlang:system_time(second),
  179. Who = {clientid, ClientId2},
  180. emqx_banned:create(#{
  181. who => Who,
  182. by => <<"test">>,
  183. reason => <<"test">>,
  184. at => Now,
  185. until => Now + 120
  186. }),
  187. snabbkaffe:start_trace(),
  188. {ok, SubRef} =
  189. snabbkaffe:subscribe(
  190. ?match_event(#{?snk_kind := ignore_delayed_message_publish}),
  191. _NEvents = 2,
  192. _Timeout = 10000,
  193. 0
  194. ),
  195. lists:foreach(
  196. fun(ClientId) ->
  197. Msg = emqx_message:make(ClientId, <<"$delayed/1/bc">>, <<"payload">>),
  198. emqx_delayed:on_message_publish(Msg)
  199. end,
  200. [ClientId1, ClientId1, ClientId1, ClientId2, ClientId2]
  201. ),
  202. {ok, Trace} = snabbkaffe:receive_events(SubRef),
  203. snabbkaffe:stop(),
  204. emqx_banned:delete(Who),
  205. mnesia:clear_table(emqx_delayed),
  206. ?assertEqual(2, length(?of_kind(ignore_delayed_message_publish, Trace))).
  207. subscribe_proc() ->
  208. Self = self(),
  209. Ref = erlang:make_ref(),
  210. erlang:spawn(fun() ->
  211. Topic = <<"delayed/+">>,
  212. emqx_broker:subscribe(Topic),
  213. Self !
  214. {Ref,
  215. receive
  216. {deliver, Topic, Msg} ->
  217. erlang:system_time(milli_seconds) - Msg#message.timestamp
  218. after 2000 ->
  219. 2000
  220. end},
  221. emqx_broker:unsubscribe(Topic)
  222. end),
  223. fun() ->
  224. receive
  225. {Ref, Diff} ->
  226. Diff
  227. after 2000 ->
  228. 2000
  229. end
  230. end.
  231. t_delayed_load_unload(_Config) ->
  232. Conf = emqx:get_raw_config([delayed]),
  233. Conf1 = Conf#{<<"max_delayed_messages">> => 1234},
  234. ?assertMatch({ok, _}, emqx:update_config([delayed], Conf1#{<<"enable">> := true})),
  235. ?assert(is_hooks_exist()),
  236. ?assertEqual(1234, emqx:get_config([delayed, max_delayed_messages])),
  237. ?assertMatch({ok, _}, emqx:update_config([delayed], Conf1#{<<"enable">> := false})),
  238. ?assertNot(is_hooks_exist()),
  239. ?assertMatch({ok, _}, emqx:update_config([delayed], Conf)),
  240. ok.
  241. is_hooks_exist() ->
  242. Hooks = emqx_hooks:lookup('message.publish'),
  243. false =/= lists:keyfind({emqx_delayed, on_message_publish, []}, 2, Hooks).