emqx_delayed_api_SUITE.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  20. -define(BASE_CONF, #{
  21. <<"dealyed">> => <<"true">>,
  22. <<"max_delayed_messages">> => <<"0">>
  23. }).
  24. -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
  25. all() ->
  26. emqx_common_test_helpers:all(?MODULE).
  27. init_per_suite(Config) ->
  28. ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{
  29. raw_with_default => true
  30. }),
  31. ok = emqx_mgmt_api_test_util:init_suite(
  32. [emqx_conf, emqx_modules]
  33. ),
  34. emqx_delayed:load(),
  35. Config.
  36. end_per_suite(Config) ->
  37. ok = emqx_delayed:unload(),
  38. emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
  39. Config.
  40. init_per_testcase(_, Config) ->
  41. {ok, _} = emqx_cluster_rpc:start_link(),
  42. Config.
  43. %%------------------------------------------------------------------------------
  44. %% Test Cases
  45. %%------------------------------------------------------------------------------
  46. t_status(_Config) ->
  47. Path = uri(["mqtt", "delayed"]),
  48. {ok, 200, R1} = request(
  49. put,
  50. Path,
  51. #{enable => false, max_delayed_messages => 10}
  52. ),
  53. ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
  54. {ok, 200, R2} = request(
  55. put,
  56. Path,
  57. #{enable => true, max_delayed_messages => 12}
  58. ),
  59. ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
  60. ?assertMatch(
  61. {ok, 200, _},
  62. request(
  63. put,
  64. Path,
  65. #{enable => true}
  66. )
  67. ),
  68. ?assertMatch(
  69. {ok, 400, _},
  70. request(
  71. put,
  72. Path,
  73. #{enable => true, max_delayed_messages => -5}
  74. )
  75. ),
  76. {ok, 200, ConfJson} = request(get, Path),
  77. ReturnConf = decode_json(ConfJson),
  78. ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  79. t_messages(_) ->
  80. clear_all_record(),
  81. emqx_delayed:load(),
  82. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  83. {ok, _} = emqtt:connect(C1),
  84. timer:sleep(500),
  85. Each = fun(I) ->
  86. Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
  87. emqtt:publish(
  88. C1,
  89. Topic,
  90. <<"">>,
  91. [{qos, 0}, {retain, true}]
  92. )
  93. end,
  94. lists:foreach(Each, lists:seq(1, 5)),
  95. timer:sleep(1000),
  96. Msgs = get_messages(5),
  97. [First | _] = Msgs,
  98. ?assertMatch(
  99. #{
  100. delayed_interval := _,
  101. delayed_remaining := _,
  102. expected_at := _,
  103. from_clientid := _,
  104. from_username := _,
  105. msgid := _,
  106. node := _,
  107. publish_at := _,
  108. qos := _,
  109. topic := <<"msgs">>
  110. },
  111. First
  112. ),
  113. MsgId = maps:get(msgid, First),
  114. {ok, 200, LookupMsg} = request(
  115. get,
  116. uri(["mqtt", "delayed", "messages", node(), MsgId])
  117. ),
  118. ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
  119. ?assertMatch(
  120. {ok, 404, _},
  121. request(
  122. get,
  123. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  124. )
  125. ),
  126. ?assertMatch(
  127. {ok, 400, _},
  128. request(
  129. get,
  130. uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
  131. )
  132. ),
  133. ?assertMatch(
  134. {ok, 400, _},
  135. request(
  136. get,
  137. uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
  138. )
  139. ),
  140. ?assertMatch(
  141. {ok, 400, _},
  142. request(
  143. get,
  144. uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
  145. )
  146. ),
  147. ?assertMatch(
  148. {ok, 404, _},
  149. request(
  150. delete,
  151. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  152. )
  153. ),
  154. ?assertMatch(
  155. {ok, 204, _},
  156. request(
  157. delete,
  158. uri(["mqtt", "delayed", "messages", node(), MsgId])
  159. )
  160. ),
  161. _ = get_messages(4),
  162. ok = emqtt:disconnect(C1).
  163. t_large_payload(_) ->
  164. clear_all_record(),
  165. emqx_delayed:load(),
  166. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  167. {ok, _} = emqtt:connect(C1),
  168. timer:sleep(500),
  169. Topic = <<"$delayed/123/msgs">>,
  170. emqtt:publish(
  171. C1,
  172. Topic,
  173. iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
  174. [{qos, 0}, {retain, true}]
  175. ),
  176. timer:sleep(1000),
  177. [#{msgid := MsgId}] = get_messages(1),
  178. {ok, 200, Msg} = request(
  179. get,
  180. uri(["mqtt", "delayed", "messages", node(), MsgId])
  181. ),
  182. ?assertMatch(
  183. #{
  184. payload := <<"PAYLOAD_TOO_LARGE">>,
  185. topic := <<"msgs">>
  186. },
  187. decode_json(Msg)
  188. ).
  189. %%--------------------------------------------------------------------
  190. %% HTTP Request
  191. %%--------------------------------------------------------------------
  192. decode_json(Data) ->
  193. BinJson = emqx_json:decode(Data, [return_maps]),
  194. emqx_map_lib:unsafe_atom_key_map(BinJson).
  195. clear_all_record() ->
  196. ets:delete_all_objects(emqx_delayed).
  197. get_messages(Len) ->
  198. {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
  199. #{data := Msgs} = decode_json(MsgsJson),
  200. MsgLen = erlang:length(Msgs),
  201. ?assertEqual(
  202. Len,
  203. MsgLen,
  204. lists:flatten(
  205. io_lib:format("message length is:~p~nWhere:~p~nHooks:~p~n", [
  206. MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)
  207. ])
  208. )
  209. ),
  210. Msgs.