| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_delayed_api_SUITE).
- -compile(nowarn_export_all).
- -compile(export_all).
- -include_lib("common_test/include/ct.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -define(BASE_CONF, #{
- <<"dealyed">> => <<"true">>,
- <<"max_delayed_messages">> => <<"0">>
- }).
- -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
- all() ->
- emqx_common_test_helpers:all(?MODULE).
- init_per_suite(Config) ->
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
- ok = emqx_mgmt_api_test_util:init_suite(
- [emqx_conf, emqx_modules]
- ),
- emqx_delayed:load(),
- Config.
- end_per_suite(Config) ->
- ok = emqx_delayed:unload(),
- emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
- Config.
- init_per_testcase(_, Config) ->
- {ok, _} = emqx_cluster_rpc:start_link(),
- Config.
- %%------------------------------------------------------------------------------
- %% Test Cases
- %%------------------------------------------------------------------------------
- t_status(_Config) ->
- Path = uri(["mqtt", "delayed"]),
- {ok, 200, R1} = request(
- put,
- Path,
- #{enable => false, max_delayed_messages => 10}
- ),
- ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)),
- {ok, 200, R2} = request(
- put,
- Path,
- #{enable => true, max_delayed_messages => 12}
- ),
- ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)),
- ?assertMatch(
- {ok, 200, _},
- request(
- put,
- Path,
- #{enable => true}
- )
- ),
- ?assertMatch(
- {ok, 400, _},
- request(
- put,
- Path,
- #{enable => true, max_delayed_messages => -5}
- )
- ),
- {ok, 200, ConfJson} = request(get, Path),
- ReturnConf = decode_json(ConfJson),
- ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf).
- t_messages(_) ->
- clear_all_record(),
- emqx_delayed:load(),
- {ok, C1} = emqtt:start_link([{clean_start, true}]),
- {ok, _} = emqtt:connect(C1),
- timer:sleep(500),
- Each = fun(I) ->
- Topic = list_to_binary(io_lib:format("$delayed/~B/msgs", [I + 60])),
- emqtt:publish(
- C1,
- Topic,
- <<"">>,
- [{qos, 0}, {retain, true}]
- )
- end,
- lists:foreach(Each, lists:seq(1, 5)),
- timer:sleep(1000),
- Msgs = get_messages(5),
- [First | _] = Msgs,
- ?assertMatch(
- #{
- delayed_interval := _,
- delayed_remaining := _,
- expected_at := _,
- from_clientid := _,
- from_username := _,
- msgid := _,
- node := _,
- publish_at := _,
- qos := _,
- topic := <<"msgs">>
- },
- First
- ),
- MsgId = maps:get(msgid, First),
- {ok, 200, LookupMsg} = request(
- get,
- uri(["mqtt", "delayed", "messages", node(), MsgId])
- ),
- ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))),
- ?assertMatch(
- {ok, 404, _},
- request(
- get,
- uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
- )
- ),
- ?assertMatch(
- {ok, 400, _},
- request(
- get,
- uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"])
- )
- ),
- ?assertMatch(
- {ok, 400, _},
- request(
- get,
- uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId])
- )
- ),
- ?assertMatch(
- {ok, 400, _},
- request(
- get,
- uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId])
- )
- ),
- ?assertMatch(
- {ok, 404, _},
- request(
- delete,
- uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())])
- )
- ),
- ?assertMatch(
- {ok, 204, _},
- request(
- delete,
- uri(["mqtt", "delayed", "messages", node(), MsgId])
- )
- ),
- _ = get_messages(4),
- ok = emqtt:disconnect(C1).
- t_delete_messages_via_topic(_) ->
- clear_all_record(),
- emqx_delayed:load(),
- OriginTopic = <<"t/a">>,
- Topic = <<"$delayed/123/", OriginTopic/binary>>,
- publish_a_delayed_message(Topic),
- publish_a_delayed_message(Topic),
- %% assert: delayed messages are saved
- ?assertMatch([_, _], get_messages(2)),
- %% delete these messages via topic
- TopicInUrl = uri_string:quote(OriginTopic),
- {ok, 204, _} = request(
- delete,
- uri(["mqtt", "delayed", "messages", TopicInUrl])
- ),
- %% assert: messages are deleted
- ?assertEqual([], get_messages(0)),
- %% assert: return 400 if the topic parameter is invalid
- TopicFilter = uri_string:quote(<<"t/#">>),
- ?assertMatch(
- {ok, 400, _},
- request(
- delete,
- uri(["mqtt", "delayed", "messages", TopicFilter])
- )
- ),
- %% assert: return 404 if no messages found for the topic
- ?assertMatch(
- {ok, 404, _},
- request(
- delete,
- uri(["mqtt", "delayed", "messages", TopicInUrl])
- )
- ),
- ok.
- t_large_payload(_) ->
- clear_all_record(),
- emqx_delayed:load(),
- {ok, C1} = emqtt:start_link([{clean_start, true}]),
- {ok, _} = emqtt:connect(C1),
- timer:sleep(500),
- Topic = <<"$delayed/123/msgs">>,
- emqtt:publish(
- C1,
- Topic,
- iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]),
- [{qos, 0}, {retain, true}]
- ),
- timer:sleep(1000),
- [#{msgid := MsgId}] = get_messages(1),
- {ok, 200, Msg} = request(
- get,
- uri(["mqtt", "delayed", "messages", node(), MsgId])
- ),
- ?assertMatch(
- #{
- payload := <<"PAYLOAD_TOO_LARGE">>,
- topic := <<"msgs">>
- },
- decode_json(Msg)
- ).
- %%--------------------------------------------------------------------
- %% HTTP Request
- %%--------------------------------------------------------------------
- decode_json(Data) ->
- BinJson = emqx_utils_json:decode(Data, [return_maps]),
- emqx_utils_maps:unsafe_atom_key_map(BinJson).
- clear_all_record() ->
- ets:delete_all_objects(emqx_delayed).
- get_messages(Len) ->
- {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])),
- #{data := Msgs} = decode_json(MsgsJson),
- MsgLen = erlang:length(Msgs),
- ?assertEqual(
- Len,
- MsgLen,
- lists:flatten(
- io_lib:format("message length is:~p~nWhere:~p~nHooks:~p~n", [
- MsgLen, erlang:whereis(emqx_delayed), ets:tab2list(emqx_hooks)
- ])
- )
- ),
- Msgs.
- publish_a_delayed_message(Topic) ->
- {ok, C1} = emqtt:start_link([{clean_start, true}]),
- {ok, _} = emqtt:connect(C1),
- emqtt:publish(
- C1,
- Topic,
- <<"This is a delayed messages">>,
- [{qos, 1}]
- ),
- ok = emqtt:disconnect(C1).
|