emqx_delayed_api_SUITE.erl 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  %% Raw configuration map handed to emqx_common_test_helpers:load_config/3
  %% (together with emqx_modules_schema) in init_per_suite/1.
  %% NOTE(review): the key <<"dealyed">> looks like a misspelling of "delayed".
  %% It is runtime data, so it is left untouched here; confirm against
  %% emqx_modules_schema whether this map has any effect before correcting it.
  20. -define(BASE_CONF, #{
  21. <<"dealyed">> => <<"true">>,
  22. <<"max_delayed_messages">> => <<"0">>
  23. }).
  24. -import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]).
  %% Common Test callback listing the test cases of this suite.
  %% Delegates to emqx_common_test_helpers:all/1 for the current module
  %% (presumably it enumerates the exported t_* functions; see the helper).
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Suite-level setup.
  %% Loads ?BASE_CONF through emqx_modules_schema, starts the applications
  %% under test (using set_special_configs/1 as the per-application hook) and
  %% calls emqx_delayed:load/0. Returns Config unchanged.
  init_per_suite(Config) ->
      LoadOpts = #{raw_with_default => true},
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, LoadOpts),
      Apps = [emqx_conf, emqx_modules, emqx_dashboard],
      ok = emqx_common_test_helpers:start_apps(Apps, fun set_special_configs/1),
      %% The result of load/0 is intentionally not matched here.
      emqx_delayed:load(),
      Config.
  %% Suite-level teardown.
  %% Unloads emqx_delayed (must return ok) and stops the applications that
  %% init_per_suite/1 started. Returns Config unchanged.
  end_per_suite(Config) ->
      ok = emqx_delayed:unload(),
      Apps = [emqx_conf, emqx_dashboard, emqx_modules],
      emqx_common_test_helpers:stop_apps(Apps),
      Config.
  %% Per-testcase setup: starts emqx_cluster_rpc via start_link/0 and requires
  %% it to succeed. The test case name is ignored; Config is returned as is.
  init_per_testcase(_TestCase, Config) ->
      {ok, _Pid} = emqx_cluster_rpc:start_link(),
      Config.
  %% Per-application configuration hook passed to start_apps/2.
  %% Only emqx_dashboard gets special treatment (the dashboard test helper's
  %% default config); every other application yields ok.
  set_special_configs(App) ->
      case App of
          emqx_dashboard ->
              emqx_dashboard_api_test_helpers:set_default_config();
          _Other ->
              ok
      end.
  48. %%------------------------------------------------------------------------------
  49. %% Test Cases
  50. %%------------------------------------------------------------------------------
  %% Exercises PUT/GET on the "mqtt/delayed" endpoint:
  %%   * a full update is echoed back in the response body,
  %%   * an update carrying only `enable` is accepted (200),
  %%   * a negative max_delayed_messages is rejected (400),
  %%   * a final GET still reports the last accepted full update.
  t_status(_Config) ->
      Path = uri(["mqtt", "delayed"]),

      %% Disable with a limit of 10 and check the echoed configuration.
      DisableBody = #{enable => false, max_delayed_messages => 10},
      {ok, 200, R1} = request(put, Path, DisableBody),
      ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),

      %% Re-enable with a limit of 12 and check the echoed configuration.
      EnableBody = #{enable => true, max_delayed_messages => 12},
      {ok, 200, R2} = request(put, Path, EnableBody),
      ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),

      %% A body without max_delayed_messages is accepted.
      ?assertMatch({ok, 200, _}, request(put, Path, #{enable => true})),

      %% A negative limit is a bad request.
      ?assertMatch(
          {ok, 400, _},
          request(put, Path, #{enable => true, max_delayed_messages => -5})
      ),

      %% The stored configuration is still the one from the second update.
      {ok, 200, ConfJson} = request(get, Path),
      ReturnConf = decode_json(ConfJson),
      ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  %% Publishes five delayed messages and walks through the
  %% "mqtt/delayed/messages" API: list, lookup by node and message id, the
  %% error responses for unknown or invalid ids and nodes, and deletion.
  t_messages(_) ->
      clear_all_record(),
      emqx_delayed:load(),

      {ok, Client} = emqtt:start_link([{clean_start, true}]),
      {ok, _} = emqtt:connect(Client),
      timer:sleep(500),

      %% Topics are "$delayed/61/msgs" .. "$delayed/65/msgs"; the number is
      %% presumably the delay in seconds.
      PubOpts = [{qos, 0}, {retain, true}],
      lists:foreach(
          fun(I) ->
              Topic = iolist_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
              emqtt:publish(Client, Topic, <<"">>, PubOpts)
          end,
          lists:seq(1, 5)
      ),
      %% Presumably gives the broker time to store the messages.
      timer:sleep(1000),

      %% All five must be listed; inspect the shape of the first entry.
      [First | _] = get_messages(5),
      ?assertMatch(
          #{
              delayed_interval := _,
              delayed_remaining := _,
              expected_at := _,
              from_clientid := _,
              from_username := _,
              msgid := _,
              node := _,
              publish_at := _,
              qos := _,
              topic := <<"msgs">>
          },
          First
      ),
      MsgId = maps:get(msgid, First),

      %% Lookup of an existing message returns the same id.
      {ok, 200, LookupMsg} = request(get, uri(["mqtt", "delayed", "messages", node(), MsgId])),
      ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),

      %% Well-formed but unknown message id -> 404.
      ?assertMatch(
          {ok, 404, _},
          request(
              get,
              uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
          )
      ),
      %% Malformed message id -> 400.
      ?assertMatch(
          {ok, 400, _},
          request(get, uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"]))
      ),
      %% Unknown node -> 400. The atom literal is kept on purpose: it makes the
      %% node name exist as an atom, presumably to cover a different path than
      %% the next case, whose name is not expected to exist as an atom.
      ?assertMatch(
          {ok, 400, _},
          request(
              get,
              uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
          )
      ),
      ?assertMatch(
          {ok, 400, _},
          request(get, uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId]))
      ),

      %% Deleting an unknown id -> 404; deleting the real one -> 204.
      ?assertMatch(
          {ok, 404, _},
          request(
              delete,
              uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
          )
      ),
      ?assertMatch(
          {ok, 204, _},
          request(delete, uri(["mqtt", "delayed", "messages", node(), MsgId]))
      ),

      %% One message fewer must remain.
      _ = get_messages(4),
      ok = emqtt:disconnect(Client).
  %% Publishes one delayed message with a 5000-byte payload and checks that the
  %% lookup API reports the placeholder <<"PAYLOAD_TOO_LARGE">> in the
  %% `payload` field instead of the payload itself.
  %%
  %% The client is disconnected at the end, matching t_messages/1, so the
  %% connection does not linger until the test process exits.
  t_large_payload(_) ->
      clear_all_record(),
      emqx_delayed:load(),
      {ok, C1} = emqtt:start_link([{clean_start, true}]),
      {ok, _} = emqtt:connect(C1),
      timer:sleep(500),
      Topic = <<"$delayed/123/msgs">>,
      %% 5000 repetitions of "x"; binary:copy/2 builds it without an
      %% intermediate 5000-element list.
      Payload = binary:copy(<<"x">>, 5000),
      emqtt:publish(C1, Topic, Payload, [{qos, 0}, {retain, true}]),
      %% Presumably gives the broker time to store the message.
      timer:sleep(1000),
      %% Exactly one message must be listed.
      [#{msgid := MsgId}] = get_messages(1),
      {ok, 200, Msg} = request(
          get,
          uri(["mqtt", "delayed", "messages", node(), MsgId])
      ),
      ?assertMatch(
          #{
              payload := <<"PAYLOAD_TOO_LARGE">>,
              topic := <<"msgs">>
          },
          decode_json(Msg)
      ),
      ok = emqtt:disconnect(C1).
  194. %%--------------------------------------------------------------------
  195. %% HTTP Request
  196. %%--------------------------------------------------------------------
  %% Decodes a JSON response body into a map (emqx_json:decode/2 with
  %% return_maps) and passes it through emqx_map_lib:unsafe_atom_key_map/1 so
  %% the tests can match on atom keys.
  %% NOTE(review): the helper name suggests atoms are created from input;
  %% acceptable in tests only.
  decode_json(Data) ->
      emqx_map_lib:unsafe_atom_key_map(emqx_json:decode(Data, [return_maps])).
  %% Removes every object from the emqx_delayed ETS table so that a test case
  %% starts without previously stored delayed messages.
  %% Returns the result of ets:delete_all_objects/1.
  clear_all_record() ->
      Table = emqx_delayed,
      ets:delete_all_objects(Table).
  %% Fetches the delayed-message list over HTTP, asserts that it contains
  %% exactly Len entries and returns the decoded entries.
  %% On a mismatch the assertion comment carries the actual length, the pid
  %% registered as emqx_delayed and the contents of the emqx_hooks table.
  get_messages(Len) ->
      ListUri = uri(["mqtt", "delayed", "messages"]),
      {ok, 200, MsgsJson} = request(get, ListUri),
      #{data := Msgs} = decode_json(MsgsJson),
      MsgLen = length(Msgs),
      %% Wrapped in a fun and invoked inside the macro so the diagnostics are
      %% only computed when the assertion actually fails.
      Diagnostics = fun() ->
          lists:flatten(
              io_lib:format(
                  "message length is:~p~nWhere:~p~nHooks:~p~n",
                  [MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)]
              )
          )
      end,
      ?assertEqual(Len, MsgLen, Diagnostics()),
      Msgs.