emqx_delayed_api_SUITE.erl 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
  20. -define(BASE_CONF, #{
  21. <<"dealyed">> => <<"true">>,
  22. <<"max_delayed_messages">> => <<"0">>
  23. }).
  24. -import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]).
  25. all() ->
  26. emqx_common_test_helpers:all(?MODULE).
  27. init_per_suite(Config) ->
  28. ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
  29. ok = emqx_common_test_helpers:start_apps(
  30. [emqx_conf, emqx_modules, emqx_dashboard],
  31. fun set_special_configs/1
  32. ),
  33. emqx_delayed:enable(),
  34. Config.
  35. end_per_suite(Config) ->
  36. ok = emqx_delayed:disable(),
  37. emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]),
  38. Config.
  39. init_per_testcase(_, Config) ->
  40. {ok, _} = emqx_cluster_rpc:start_link(),
  41. Config.
  42. set_special_configs(emqx_dashboard) ->
  43. emqx_dashboard_api_test_helpers:set_default_config();
  44. set_special_configs(_App) ->
  45. ok.
  46. %%------------------------------------------------------------------------------
  47. %% Test Cases
  48. %%------------------------------------------------------------------------------
  49. t_status(_Config) ->
  50. Path = uri(["mqtt", "delayed"]),
  51. {ok, 200, R1} = request(
  52. put,
  53. Path,
  54. #{enable => false, max_delayed_messages => 10}
  55. ),
  56. ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
  57. {ok, 200, R2} = request(
  58. put,
  59. Path,
  60. #{enable => true, max_delayed_messages => 12}
  61. ),
  62. ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
  63. ?assertMatch(
  64. {ok, 200, _},
  65. request(
  66. put,
  67. Path,
  68. #{enable => true}
  69. )
  70. ),
  71. ?assertMatch(
  72. {ok, 400, _},
  73. request(
  74. put,
  75. Path,
  76. #{enable => true, max_delayed_messages => -5}
  77. )
  78. ),
  79. {ok, 200, ConfJson} = request(get, Path),
  80. ReturnConf = decode_json(ConfJson),
  81. ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  82. t_messages(_) ->
  83. clear_all_record(),
  84. emqx_delayed:enable(),
  85. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  86. {ok, _} = emqtt:connect(C1),
  87. timer:sleep(500),
  88. Each = fun(I) ->
  89. Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
  90. emqtt:publish(
  91. C1,
  92. Topic,
  93. <<"">>,
  94. [{qos, 0}, {retain, true}]
  95. )
  96. end,
  97. lists:foreach(Each, lists:seq(1, 5)),
  98. timer:sleep(1000),
  99. Msgs = get_messages(5),
  100. [First | _] = Msgs,
  101. ?assertMatch(
  102. #{
  103. delayed_interval := _,
  104. delayed_remaining := _,
  105. expected_at := _,
  106. from_clientid := _,
  107. from_username := _,
  108. msgid := _,
  109. node := _,
  110. publish_at := _,
  111. qos := _,
  112. topic := <<"msgs">>
  113. },
  114. First
  115. ),
  116. MsgId = maps:get(msgid, First),
  117. {ok, 200, LookupMsg} = request(
  118. get,
  119. uri(["mqtt", "delayed", "messages", node(), MsgId])
  120. ),
  121. ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
  122. ?assertMatch(
  123. {ok, 404, _},
  124. request(
  125. get,
  126. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  127. )
  128. ),
  129. ?assertMatch(
  130. {ok, 400, _},
  131. request(
  132. get,
  133. uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
  134. )
  135. ),
  136. ?assertMatch(
  137. {ok, 400, _},
  138. request(
  139. get,
  140. uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
  141. )
  142. ),
  143. ?assertMatch(
  144. {ok, 400, _},
  145. request(
  146. get,
  147. uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
  148. )
  149. ),
  150. ?assertMatch(
  151. {ok, 404, _},
  152. request(
  153. delete,
  154. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  155. )
  156. ),
  157. ?assertMatch(
  158. {ok, 204, _},
  159. request(
  160. delete,
  161. uri(["mqtt", "delayed", "messages", node(), MsgId])
  162. )
  163. ),
  164. _ = get_messages(4),
  165. ok = emqtt:disconnect(C1).
  166. t_large_payload(_) ->
  167. clear_all_record(),
  168. emqx_delayed:enable(),
  169. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  170. {ok, _} = emqtt:connect(C1),
  171. timer:sleep(500),
  172. Topic = <<"$delayed/123/msgs">>,
  173. emqtt:publish(
  174. C1,
  175. Topic,
  176. iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
  177. [{qos, 0}, {retain, true}]
  178. ),
  179. timer:sleep(1000),
  180. [#{msgid := MsgId}] = get_messages(1),
  181. {ok, 200, Msg} = request(
  182. get,
  183. uri(["mqtt", "delayed", "messages", node(), MsgId])
  184. ),
  185. ?assertMatch(
  186. #{
  187. payload := <<"PAYLOAD_TOO_LARGE">>,
  188. topic := <<"msgs">>
  189. },
  190. decode_json(Msg)
  191. ).
  192. %%--------------------------------------------------------------------
  193. %% HTTP Request
  194. %%--------------------------------------------------------------------
  195. decode_json(Data) ->
  196. BinJson = emqx_json:decode(Data, [return_maps]),
  197. emqx_map_lib:unsafe_atom_key_map(BinJson).
  198. clear_all_record() ->
  199. ets:delete_all_objects(emqx_delayed).
  200. get_messages(Len) ->
  201. {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
  202. #{data := Msgs} = decode_json(MsgsJson),
  203. MsgLen = erlang:length(Msgs),
  204. ?assertEqual(
  205. Len,
  206. MsgLen,
  207. lists:flatten(
  208. io_lib:format("message length is:~p~nWhere:~p~nHooks:~p~n", [
  209. MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)
  210. ])
  211. )
  212. ),
  213. Msgs.