|
|
@@ -35,14 +35,16 @@ init_per_suite(Config) ->
|
|
|
ok = ekka:start(),
|
|
|
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
|
|
emqx_retainer_SUITE:load_base_conf(),
|
|
|
- emqx_mgmt_api_test_util:init_suite([emqx_retainer]),
|
|
|
+ emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
|
|
|
+ %% make sure no "$SYS/#" topics
|
|
|
+ emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(Config) ->
|
|
|
ekka:stop(),
|
|
|
mria:stop(),
|
|
|
mria_mnesia:delete_schema(),
|
|
|
- emqx_mgmt_api_test_util:end_suite([emqx_retainer]),
|
|
|
+ emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
|
|
|
Config.
|
|
|
|
|
|
init_per_testcase(_, Config) ->
|
|
|
@@ -110,11 +112,10 @@ t_messages(_) ->
|
|
|
),
|
|
|
|
|
|
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
|
|
- Msgs = decode_json(MsgsJson),
|
|
|
+ #{data := Msgs, meta := _} = decode_json(MsgsJson),
|
|
|
MsgLen = erlang:length(Msgs),
|
|
|
?assert(
|
|
|
- MsgLen >= 5,
|
|
|
- %% maybe has $SYS messages
|
|
|
+ MsgLen =:= 5,
|
|
|
io_lib:format("message length is:~p~n", [MsgLen])
|
|
|
),
|
|
|
|
|
|
@@ -133,6 +134,59 @@ t_messages(_) ->
|
|
|
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
+t_messages_page(_) ->
|
|
|
+ {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
+ {ok, _} = emqtt:connect(C1),
|
|
|
+ emqx_retainer:clean(),
|
|
|
+
|
|
|
+ Each = fun(I) ->
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"retained/", (I + 60)>>,
|
|
|
+ <<"retained">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ )
|
|
|
+ end,
|
|
|
+
|
|
|
+ ?check_trace(
|
|
|
+ ?wait_async_action(
|
|
|
+ lists:foreach(Each, lists:seq(1, 5)),
|
|
|
+ #{?snk_kind := message_retained, topic := <<"retained/A">>},
|
|
|
+ 500
|
|
|
+ ),
|
|
|
+ []
|
|
|
+ ),
|
|
|
+ Page = 4,
|
|
|
+
|
|
|
+ {ok, MsgsJson} = request_api(
|
|
|
+ get,
|
|
|
+ api_path([
|
|
|
+ "mqtt", "retainer", "messages?page=" ++ erlang:integer_to_list(Page) ++ "&limit=1"
|
|
|
+ ])
|
|
|
+ ),
|
|
|
+ #{data := Msgs, meta := #{page := Page, limit := 1}} = decode_json(MsgsJson),
|
|
|
+ MsgLen = erlang:length(Msgs),
|
|
|
+ ?assert(
|
|
|
+ MsgLen =:= 1,
|
|
|
+ io_lib:format("message length is:~p~n", [MsgLen])
|
|
|
+ ),
|
|
|
+
|
|
|
+ [OnlyOne] = Msgs,
|
|
|
+ Topic = <<"retained/", (Page + 60)>>,
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ msgid := _,
|
|
|
+ topic := Topic,
|
|
|
+ qos := _,
|
|
|
+ publish_at := _,
|
|
|
+ from_clientid := _,
|
|
|
+ from_username := _
|
|
|
+ },
|
|
|
+ OnlyOne
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok = emqtt:disconnect(C1).
|
|
|
+
|
|
|
t_lookup_and_delete(_) ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
@@ -171,3 +225,19 @@ t_lookup_and_delete(_) ->
|
|
|
decode_json(Data) ->
|
|
|
BinJson = emqx_json:decode(Data, [return_maps]),
|
|
|
emqx_map_lib:unsafe_atom_key_map(BinJson).
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Internal funcs
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+raw_systopic_conf() ->
|
|
|
+ #{
|
|
|
+ <<"sys_event_messages">> =>
|
|
|
+ #{
|
|
|
+ <<"client_connected">> => false,
|
|
|
+ <<"client_disconnected">> => false,
|
|
|
+ <<"client_subscribed">> => false,
|
|
|
+ <<"client_unsubscribed">> => false
|
|
|
+ },
|
|
|
+ <<"sys_heartbeat_interval">> => <<"1440m">>,
|
|
|
+ <<"sys_msg_interval">> => <<"1440m">>
|
|
|
+ }.
|