emqx_delayed_api_SUITE.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_delayed_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("eunit/include/eunit.hrl").
%% Base configuration handed to emqx_common_test_helpers:load_config/2 by
%% init_per_suite/1.
%% NOTE(review): the first key is spelled "dealyed", which looks like a typo
%% for "delayed". Confirm against emqx_modules_schema before renaming it; the
%% value shape may need to change as well.
-define(BASE_CONF, #{
    <<"dealyed">> => <<"true">>,
    <<"max_delayed_messages">> => <<"0">>
}).
  24. -import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]).
  25. all() ->
  26. emqx_common_test_helpers:all(?MODULE).
  27. init_per_suite(Config) ->
  28. ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
  29. ok = emqx_common_test_helpers:start_apps(
  30. [emqx_conf, emqx_modules, emqx_dashboard],
  31. fun set_special_configs/1
  32. ),
  33. emqx_delayed:enable(),
  34. Config.
  35. end_per_suite(Config) ->
  36. ok = emqx_delayed:disable(),
  37. emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]),
  38. Config.
  39. init_per_testcase(_, Config) ->
  40. {ok, _} = emqx_cluster_rpc:start_link(),
  41. Config.
  42. set_special_configs(emqx_dashboard) ->
  43. emqx_dashboard_api_test_helpers:set_default_config();
  44. set_special_configs(_App) ->
  45. ok.
  46. %%------------------------------------------------------------------------------
  47. %% Test Cases
  48. %%------------------------------------------------------------------------------
  49. t_status(_Config) ->
  50. Path = uri(["mqtt", "delayed"]),
  51. {ok, 200, R1} = request(
  52. put,
  53. Path,
  54. #{enable => false, max_delayed_messages => 10}
  55. ),
  56. ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
  57. {ok, 200, R2} = request(
  58. put,
  59. Path,
  60. #{enable => true, max_delayed_messages => 12}
  61. ),
  62. ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
  63. ?assertMatch(
  64. {ok, 200, _},
  65. request(
  66. put,
  67. Path,
  68. #{enable => true}
  69. )
  70. ),
  71. ?assertMatch(
  72. {ok, 400, _},
  73. request(
  74. put,
  75. Path,
  76. #{enable => true, max_delayed_messages => -5}
  77. )
  78. ),
  79. {ok, 200, ConfJson} = request(get, Path),
  80. ReturnConf = decode_json(ConfJson),
  81. ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
  82. t_messages(_) ->
  83. clear_all_record(),
  84. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  85. {ok, _} = emqtt:connect(C1),
  86. timer:sleep(500),
  87. Each = fun(I) ->
  88. Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
  89. emqtt:publish(
  90. C1,
  91. Topic,
  92. <<"">>,
  93. [{qos, 0}, {retain, true}]
  94. )
  95. end,
  96. lists:foreach(Each, lists:seq(1, 5)),
  97. timer:sleep(500),
  98. Msgs = get_messages(5),
  99. [First | _] = Msgs,
  100. ?assertMatch(
  101. #{
  102. delayed_interval := _,
  103. delayed_remaining := _,
  104. expected_at := _,
  105. from_clientid := _,
  106. from_username := _,
  107. msgid := _,
  108. node := _,
  109. publish_at := _,
  110. qos := _,
  111. topic := <<"msgs">>
  112. },
  113. First
  114. ),
  115. MsgId = maps:get(msgid, First),
  116. {ok, 200, LookupMsg} = request(
  117. get,
  118. uri(["mqtt", "delayed", "messages", node(), MsgId])
  119. ),
  120. ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
  121. ?assertMatch(
  122. {ok, 404, _},
  123. request(
  124. get,
  125. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  126. )
  127. ),
  128. ?assertMatch(
  129. {ok, 400, _},
  130. request(
  131. get,
  132. uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
  133. )
  134. ),
  135. ?assertMatch(
  136. {ok, 400, _},
  137. request(
  138. get,
  139. uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
  140. )
  141. ),
  142. ?assertMatch(
  143. {ok, 400, _},
  144. request(
  145. get,
  146. uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
  147. )
  148. ),
  149. ?assertMatch(
  150. {ok, 404, _},
  151. request(
  152. delete,
  153. uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
  154. )
  155. ),
  156. ?assertMatch(
  157. {ok, 204, _},
  158. request(
  159. delete,
  160. uri(["mqtt", "delayed", "messages", node(), MsgId])
  161. )
  162. ),
  163. _ = get_messages(4),
  164. ok = emqtt:disconnect(C1).
  165. t_large_payload(_) ->
  166. clear_all_record(),
  167. {ok, C1} = emqtt:start_link([{clean_start, true}]),
  168. {ok, _} = emqtt:connect(C1),
  169. timer:sleep(500),
  170. Topic = <<"$delayed/123/msgs">>,
  171. emqtt:publish(
  172. C1,
  173. Topic,
  174. iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
  175. [{qos, 0}, {retain, true}]
  176. ),
  177. timer:sleep(500),
  178. [#{msgid := MsgId}] = get_messages(1),
  179. {ok, 200, Msg} = request(
  180. get,
  181. uri(["mqtt", "delayed", "messages", node(), MsgId])
  182. ),
  183. ?assertMatch(
  184. #{
  185. payload := <<"PAYLOAD_TOO_LARGE">>,
  186. topic := <<"msgs">>
  187. },
  188. decode_json(Msg)
  189. ).
  190. %%--------------------------------------------------------------------
  191. %% HTTP Request
  192. %%--------------------------------------------------------------------
  193. decode_json(Data) ->
  194. BinJson = emqx_json:decode(Data, [return_maps]),
  195. emqx_map_lib:unsafe_atom_key_map(BinJson).
  196. clear_all_record() ->
  197. ets:delete_all_objects(emqx_delayed).
  198. get_messages(Len) ->
  199. {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
  200. #{data := Msgs} = decode_json(MsgsJson),
  201. MsgLen = erlang:length(Msgs),
  202. ?assert(
  203. MsgLen =:= Len,
  204. lists:flatten(io_lib:format("message length is:~p~n", [MsgLen]))
  205. ),
  206. Msgs.