emqx_delayed_api_SUITE.erl 7.7 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  20. -define(BASE_CONF, #{
  21. <<"dealyed">> => <<"true">>,
  22. <<"max_delayed_messages">> => <<"0">>
  23. }).
  24. -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
  25. all() ->
  26. emqx_common_test_helpers:all(?MODULE).
  27. init_per_suite(Config) ->
  28. Apps = emqx_cth_suite:start(
  29. [
  30. emqx_conf,
  31. {emqx_modules, #{config => ?BASE_CONF}},
  32. emqx_management,
  33. emqx_mgmt_api_test_util:emqx_dashboard()
  34. ],
  35. #{work_dir => emqx_cth_suite:work_dir(Config)}
  36. ),
  37. [{apps, Apps} | Config].
  38. end_per_suite(Config) ->
  39. Apps = ?config(apps, Config),
  40. emqx_cth_suite:stop(Apps),
  41. ok.
  42. init_per_testcase(_, Config) ->
  43. {ok, _} = emqx_cluster_rpc:start_link(),
  44. Config.
  45. %%------------------------------------------------------------------------------
  46. %% Test Cases
  47. %%------------------------------------------------------------------------------
  48. t_status(_Config) ->
  49. Path = uri(["mqtt", "delayed"]),
  50. {ok, 200, R1} = request(
  51. put,
  52. Path,
  53. #{enable => false, max_delayed_messages => 10}
  54. ),
  55. ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
  56. {ok, 200, R2} = request(
  57. put,
  58. Path,
  59. #{enable => true, max_delayed_messages => 12}
  60. ),
  61. ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
  62. ?assertMatch(
  63. {ok, 200, _},
  64. request(
  65. put,
  66. Path,
  67. #{enable => true}
  68. )
  69. ),
  70. ?assertMatch(
  71. {ok, 400, _},
  72. request(
  73. put,
  74. Path,
  75. #{enable => true, max_delayed_messages => -5}
  76. )
  77. ),
  78. {ok, 200, ConfJson} = request(get, Path),
  79. ReturnConf = decode_json(ConfJson),
  80. ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  81. t_messages(_) ->
  82. clear_all_record(),
  83. emqx_delayed:load(),
  84. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  85. {ok, _} = emqtt:connect(C1),
  86. timer:sleep(500),
  87. Each = fun(I) ->
  88. Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
  89. emqtt:publish(
  90. C1,
  91. Topic,
  92. <<"">>,
  93. [{qos, 0}, {retain, true}]
  94. )
  95. end,
  96. lists:foreach(Each, lists:seq(1, 5)),
  97. timer:sleep(1000),
  98. Msgs = get_messages(5),
  99. [First | _] = Msgs,
  100. ?assertMatch(
  101. #{
  102. delayed_interval := _,
  103. delayed_remaining := _,
  104. expected_at := _,
  105. from_clientid := _,
  106. from_username := _,
  107. msgid := _,
  108. node := _,
  109. publish_at := _,
  110. qos := _,
  111. topic := <<"msgs">>
  112. },
  113. First
  114. ),
  115. MsgId = maps:get(msgid, First),
  116. {ok, 200, LookupMsg} = request(
  117. get,
  118. uri(["mqtt", "delayed", "messages", node(), MsgId])
  119. ),
  120. ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
  121. ?assertMatch(
  122. {ok, 404, _},
  123. request(
  124. get,
  125. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  126. )
  127. ),
  128. ?assertMatch(
  129. {ok, 400, _},
  130. request(
  131. get,
  132. uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
  133. )
  134. ),
  135. ?assertMatch(
  136. {ok, 400, _},
  137. request(
  138. get,
  139. uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
  140. )
  141. ),
  142. ?assertMatch(
  143. {ok, 400, _},
  144. request(
  145. get,
  146. uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
  147. )
  148. ),
  149. ?assertMatch(
  150. {ok, 404, _},
  151. request(
  152. delete,
  153. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  154. )
  155. ),
  156. ?assertMatch(
  157. {ok, 204, _},
  158. request(
  159. delete,
  160. uri(["mqtt", "delayed", "messages", node(), MsgId])
  161. )
  162. ),
  163. _ = get_messages(4),
  164. ok = emqtt:disconnect(C1).
  165. t_delete_messages_via_topic(_) ->
  166. clear_all_record(),
  167. emqx_delayed:load(),
  168. OriginTopic = <<"t/a">>,
  169. Topic = <<"$delayed/123/", OriginTopic/binary>>,
  170. publish_a_delayed_message(Topic),
  171. publish_a_delayed_message(Topic),
  172. %% assert: delayed messages are saved
  173. ?assertMatch([_, _], get_messages(2)),
  174. %% delete these messages via topic
  175. TopicInUrl = uri_string:quote(OriginTopic),
  176. {ok, 204, _} = request(
  177. delete,
  178. uri(["mqtt", "delayed", "messages", TopicInUrl])
  179. ),
  180. %% assert: messages are deleted
  181. ?assertEqual([], get_messages(0)),
  182. %% assert: return 400 if the topic parameter is invalid
  183. TopicFilter = uri_string:quote(<<"t/#">>),
  184. ?assertMatch(
  185. {ok, 400, _},
  186. request(
  187. delete,
  188. uri(["mqtt", "delayed", "messages", TopicFilter])
  189. )
  190. ),
  191. %% assert: return 404 if no messages found for the topic
  192. ?assertMatch(
  193. {ok, 404, _},
  194. request(
  195. delete,
  196. uri(["mqtt", "delayed", "messages", TopicInUrl])
  197. )
  198. ),
  199. ok.
  200. t_large_payload(_) ->
  201. clear_all_record(),
  202. emqx_delayed:load(),
  203. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  204. {ok, _} = emqtt:connect(C1),
  205. timer:sleep(500),
  206. Topic = <<"$delayed/123/msgs">>,
  207. emqtt:publish(
  208. C1,
  209. Topic,
  210. iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
  211. [{qos, 0}, {retain, true}]
  212. ),
  213. timer:sleep(1000),
  214. [#{msgid := MsgId}] = get_messages(1),
  215. {ok, 200, Msg} = request(
  216. get,
  217. uri(["mqtt", "delayed", "messages", node(), MsgId])
  218. ),
  219. ?assertMatch(
  220. #{
  221. payload := <<"PAYLOAD_TOO_LARGE">>,
  222. topic := <<"msgs">>
  223. },
  224. decode_json(Msg)
  225. ).
  226. %%--------------------------------------------------------------------
  227. %% HTTP Request
  228. %%--------------------------------------------------------------------
  229. decode_json(Data) ->
  230. BinJson = emqx_utils_json:decode(Data, [return_maps]),
  231. emqx_utils_maps:unsafe_atom_key_map(BinJson).
  232. clear_all_record() ->
  233. ets:delete_all_objects(emqx_delayed).
  234. get_messages(Len) ->
  235. {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
  236. #{data := Msgs} = decode_json(MsgsJson),
  237. MsgLen = erlang:length(Msgs),
  238. ?assertEqual(
  239. Len,
  240. MsgLen,
  241. lists:flatten(
  242. io_lib:format("message length is:~p~nWhere:~p~nHooks:~p~n", [
  243. MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)
  244. ])
  245. )
  246. ),
  247. Msgs.
  248. publish_a_delayed_message(Topic) ->
  249. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  250. {ok, _} = emqtt:connect(C1),
  251. emqtt:publish(
  252. C1,
  253. Topic,
  254. <<"This is a delayed messages">>,
  255. [{qos, 1}]
  256. ),
  257. ok = emqtt:disconnect(C1).