emqx_delayed_api_SUITE.erl 7.6 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  20. -define(BASE_CONF, #{
  21. <<"dealyed">> => <<"true">>,
  22. <<"max_delayed_messages">> => <<"0">>
  23. }).
  24. -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
  25. all() ->
  26. emqx_common_test_helpers:all(?MODULE).
  27. init_per_suite(Config) ->
  28. ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
  29. ok = emqx_mgmt_api_test_util:init_suite(
  30. [emqx_conf, emqx_modules]
  31. ),
  32. emqx_delayed:load(),
  33. Config.
  34. end_per_suite(Config) ->
  35. ok = emqx_delayed:unload(),
  36. emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
  37. Config.
  38. init_per_testcase(_, Config) ->
  39. {ok, _} = emqx_cluster_rpc:start_link(),
  40. Config.
  41. %%------------------------------------------------------------------------------
  42. %% Test Cases
  43. %%------------------------------------------------------------------------------
  44. t_status(_Config) ->
  45. Path = uri(["mqtt", "delayed"]),
  46. {ok, 200, R1} = request(
  47. put,
  48. Path,
  49. #{enable => false, max_delayed_messages => 10}
  50. ),
  51. ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
  52. {ok, 200, R2} = request(
  53. put,
  54. Path,
  55. #{enable => true, max_delayed_messages => 12}
  56. ),
  57. ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
  58. ?assertMatch(
  59. {ok, 200, _},
  60. request(
  61. put,
  62. Path,
  63. #{enable => true}
  64. )
  65. ),
  66. ?assertMatch(
  67. {ok, 400, _},
  68. request(
  69. put,
  70. Path,
  71. #{enable => true, max_delayed_messages => -5}
  72. )
  73. ),
  74. {ok, 200, ConfJson} = request(get, Path),
  75. ReturnConf = decode_json(ConfJson),
  76. ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  77. t_messages(_) ->
  78. clear_all_record(),
  79. emqx_delayed:load(),
  80. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  81. {ok, _} = emqtt:connect(C1),
  82. timer:sleep(500),
  83. Each = fun(I) ->
  84. Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
  85. emqtt:publish(
  86. C1,
  87. Topic,
  88. <<"">>,
  89. [{qos, 0}, {retain, true}]
  90. )
  91. end,
  92. lists:foreach(Each, lists:seq(1, 5)),
  93. timer:sleep(1000),
  94. Msgs = get_messages(5),
  95. [First | _] = Msgs,
  96. ?assertMatch(
  97. #{
  98. delayed_interval := _,
  99. delayed_remaining := _,
  100. expected_at := _,
  101. from_clientid := _,
  102. from_username := _,
  103. msgid := _,
  104. node := _,
  105. publish_at := _,
  106. qos := _,
  107. topic := <<"msgs">>
  108. },
  109. First
  110. ),
  111. MsgId = maps:get(msgid, First),
  112. {ok, 200, LookupMsg} = request(
  113. get,
  114. uri(["mqtt", "delayed", "messages", node(), MsgId])
  115. ),
  116. ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
  117. ?assertMatch(
  118. {ok, 404, _},
  119. request(
  120. get,
  121. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  122. )
  123. ),
  124. ?assertMatch(
  125. {ok, 400, _},
  126. request(
  127. get,
  128. uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
  129. )
  130. ),
  131. ?assertMatch(
  132. {ok, 400, _},
  133. request(
  134. get,
  135. uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
  136. )
  137. ),
  138. ?assertMatch(
  139. {ok, 400, _},
  140. request(
  141. get,
  142. uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
  143. )
  144. ),
  145. ?assertMatch(
  146. {ok, 404, _},
  147. request(
  148. delete,
  149. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  150. )
  151. ),
  152. ?assertMatch(
  153. {ok, 204, _},
  154. request(
  155. delete,
  156. uri(["mqtt", "delayed", "messages", node(), MsgId])
  157. )
  158. ),
  159. _ = get_messages(4),
  160. ok = emqtt:disconnect(C1).
  161. t_delete_messages_via_topic(_) ->
  162. clear_all_record(),
  163. emqx_delayed:load(),
  164. OriginTopic = <<"t/a">>,
  165. Topic = <<"$delayed/123/", OriginTopic/binary>>,
  166. publish_a_delayed_message(Topic),
  167. publish_a_delayed_message(Topic),
  168. %% assert: delayed messages are saved
  169. ?assertMatch([_, _], get_messages(2)),
  170. %% delete these messages via topic
  171. TopicInUrl = uri_string:quote(OriginTopic),
  172. {ok, 204, _} = request(
  173. delete,
  174. uri(["mqtt", "delayed", "messages", TopicInUrl])
  175. ),
  176. %% assert: messages are deleted
  177. ?assertEqual([], get_messages(0)),
  178. %% assert: return 400 if the topic parameter is invalid
  179. TopicFilter = uri_string:quote(<<"t/#">>),
  180. ?assertMatch(
  181. {ok, 400, _},
  182. request(
  183. delete,
  184. uri(["mqtt", "delayed", "messages", TopicFilter])
  185. )
  186. ),
  187. %% assert: return 404 if no messages found for the topic
  188. ?assertMatch(
  189. {ok, 404, _},
  190. request(
  191. delete,
  192. uri(["mqtt", "delayed", "messages", TopicInUrl])
  193. )
  194. ),
  195. ok.
  196. t_large_payload(_) ->
  197. clear_all_record(),
  198. emqx_delayed:load(),
  199. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  200. {ok, _} = emqtt:connect(C1),
  201. timer:sleep(500),
  202. Topic = <<"$delayed/123/msgs">>,
  203. emqtt:publish(
  204. C1,
  205. Topic,
  206. iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
  207. [{qos, 0}, {retain, true}]
  208. ),
  209. timer:sleep(1000),
  210. [#{msgid := MsgId}] = get_messages(1),
  211. {ok, 200, Msg} = request(
  212. get,
  213. uri(["mqtt", "delayed", "messages", node(), MsgId])
  214. ),
  215. ?assertMatch(
  216. #{
  217. payload := <<"PAYLOAD_TOO_LARGE">>,
  218. topic := <<"msgs">>
  219. },
  220. decode_json(Msg)
  221. ).
  222. %%--------------------------------------------------------------------
  223. %% HTTP Request
  224. %%--------------------------------------------------------------------
  225. decode_json(Data) ->
  226. BinJson = emqx_utils_json:decode(Data, [return_maps]),
  227. emqx_utils_maps:unsafe_atom_key_map(BinJson).
  228. clear_all_record() ->
  229. ets:delete_all_objects(emqx_delayed).
  230. get_messages(Len) ->
  231. {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
  232. #{data := Msgs} = decode_json(MsgsJson),
  233. MsgLen = erlang:length(Msgs),
  234. ?assertEqual(
  235. Len,
  236. MsgLen,
  237. lists:flatten(
  238. io_lib:format("message length is:~p~nWhere:~p~nHooks:~p~n", [
  239. MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)
  240. ])
  241. )
  242. ),
  243. Msgs.
  244. publish_a_delayed_message(Topic) ->
  245. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  246. {ok, _} = emqtt:connect(C1),
  247. emqtt:publish(
  248. C1,
  249. Topic,
  250. <<"This is a delayed messages">>,
  251. [{qos, 1}]
  252. ),
  253. ok = emqtt:disconnect(C1).