|
|
@@ -1193,7 +1193,7 @@ t_mqueue_messages(Config) ->
|
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
Topic = <<"t/test_mqueue_msgs">>,
|
|
|
Count = emqx_mgmt:default_row_limit(),
|
|
|
- {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
|
|
|
+ ok = client_with_mqueue(ClientId, Topic, Count),
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
|
|
|
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
|
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
@@ -1244,14 +1244,16 @@ client_with_mqueue(ClientId, Topic, Count) ->
|
|
|
{ok, Client} = emqtt:start_link([
|
|
|
{proto_ver, v5},
|
|
|
{clientid, ClientId},
|
|
|
- {clean_start, false},
|
|
|
+ {clean_start, true},
|
|
|
{properties, #{'Session-Expiry-Interval' => 120}}
|
|
|
]),
|
|
|
{ok, _} = emqtt:connect(Client),
|
|
|
{ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
|
|
+ ct:sleep(300),
|
|
|
ok = emqtt:disconnect(Client),
|
|
|
+ ct:sleep(100),
|
|
|
publish_msgs(Topic, Count),
|
|
|
- {ok, Client}.
|
|
|
+ ok.
|
|
|
|
|
|
client_with_inflight(ClientId, Topic, Count) ->
|
|
|
{ok, Client} = emqtt:start_link([
|
|
|
@@ -1275,13 +1277,18 @@ publish_msgs(Topic, Count) ->
|
|
|
|
|
|
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
|
|
|
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
|
|
|
- {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
|
|
- #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
|
|
- #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
|
|
|
|
|
|
- ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
|
|
|
- ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
|
|
|
- ?assertEqual(length(Msgs), Count),
|
|
|
+ {Msgs, StartPos, Pos} = ?retry(500, 10, begin
|
|
|
+ {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
|
|
+ #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
|
|
+ #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
|
|
|
+
|
|
|
+ ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
|
|
|
+ ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
|
|
|
+ ?assertEqual(length(Msgs), Count),
|
|
|
+
|
|
|
+ {Msgs, StartPos, Pos}
|
|
|
+ end),
|
|
|
|
|
|
lists:foreach(
|
|
|
fun({Seq, #{<<"payload">> := P} = M}) ->
|