|
|
@@ -56,12 +56,10 @@ client_msgs_testcases() ->
|
|
|
].
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- ok = snabbkaffe:start_trace(),
|
|
|
emqx_mgmt_api_test_util:init_suite(),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_) ->
|
|
|
- ok = snabbkaffe:stop(),
|
|
|
emqx_mgmt_api_test_util:end_suite().
|
|
|
|
|
|
init_per_group(persistent_sessions, Config) ->
|
|
|
@@ -95,10 +93,15 @@ end_per_group(persistent_sessions, Config) ->
|
|
|
end_per_group(_Group, _Config) ->
|
|
|
ok.
|
|
|
|
|
|
+init_per_testcase(_TC, Config) ->
|
|
|
+ ok = snabbkaffe:start_trace(),
|
|
|
+ Config.
|
|
|
+
|
|
|
end_per_testcase(TC, _Config) when
|
|
|
TC =:= t_inflight_messages;
|
|
|
TC =:= t_mqueue_messages
|
|
|
->
|
|
|
+ ok = snabbkaffe:stop(),
|
|
|
ClientId = atom_to_binary(TC),
|
|
|
lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
|
|
|
ok = emqx_common_test_helpers:wait_for(
|
|
|
@@ -108,7 +111,7 @@ end_per_testcase(TC, _Config) when
|
|
|
5000
|
|
|
);
|
|
|
end_per_testcase(_TC, _Config) ->
|
|
|
- ok.
|
|
|
+ ok = snabbkaffe:stop().
|
|
|
|
|
|
t_clients(_) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
@@ -313,8 +316,7 @@ t_persistent_sessions2(Config) ->
|
|
|
%% 2) Client connects to the same node and takes over, listed only once.
|
|
|
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
|
|
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
|
|
|
- ok = emqtt:stop(C2),
|
|
|
- ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
|
|
|
+ ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}),
|
|
|
?retry(
|
|
|
100,
|
|
|
20,
|
|
|
@@ -322,9 +324,7 @@ t_persistent_sessions2(Config) ->
|
|
|
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
|
|
|
list_request(APIPort)
|
|
|
)
|
|
|
- ),
|
|
|
-
|
|
|
- ok
|
|
|
+ )
|
|
|
end,
|
|
|
[]
|
|
|
),
|
|
|
@@ -360,10 +360,7 @@ t_persistent_sessions3(Config) ->
|
|
|
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
|
|
)
|
|
|
),
|
|
|
- ok = emqtt:stop(C2),
|
|
|
- ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
|
|
|
-
|
|
|
- ok
|
|
|
+ ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
|
|
|
end,
|
|
|
[]
|
|
|
),
|
|
|
@@ -403,10 +400,7 @@ t_persistent_sessions4(Config) ->
|
|
|
list_request(APIPort, "node=" ++ atom_to_list(N1))
|
|
|
)
|
|
|
),
|
|
|
- ok = emqtt:stop(C2),
|
|
|
- ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
|
|
|
-
|
|
|
- ok
|
|
|
+ ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
|
|
|
end,
|
|
|
[]
|
|
|
),
|
|
|
@@ -1076,18 +1070,19 @@ t_mqueue_messages(Config) ->
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
|
|
|
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
|
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
- test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
|
|
|
+ IsMqueue = true,
|
|
|
+ test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue),
|
|
|
|
|
|
?assertMatch(
|
|
|
{error, {_, 400, _}},
|
|
|
emqx_mgmt_api_test_util:request_api(
|
|
|
- get, Path, "limit=10&after=not-base64%23%21", AuthHeader
|
|
|
+ get, Path, "limit=10&position=not-valid", AuthHeader
|
|
|
)
|
|
|
),
|
|
|
?assertMatch(
|
|
|
{error, {_, 400, _}},
|
|
|
emqx_mgmt_api_test_util:request_api(
|
|
|
- get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
|
|
|
+ get, Path, "limit=-5&position=not-valid", AuthHeader
|
|
|
)
|
|
|
).
|
|
|
|
|
|
@@ -1099,18 +1094,21 @@ t_inflight_messages(Config) ->
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
|
|
|
InflightLimit = emqx:get_config([mqtt, max_inflight]),
|
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
- test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
|
|
|
+ IsMqueue = false,
|
|
|
+ test_messages(
|
|
|
+ Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue
|
|
|
+ ),
|
|
|
|
|
|
?assertMatch(
|
|
|
{error, {_, 400, _}},
|
|
|
emqx_mgmt_api_test_util:request_api(
|
|
|
- get, Path, "limit=10&after=not-int", AuthHeader
|
|
|
+ get, Path, "limit=10&position=not-int", AuthHeader
|
|
|
)
|
|
|
),
|
|
|
?assertMatch(
|
|
|
{error, {_, 400, _}},
|
|
|
emqx_mgmt_api_test_util:request_api(
|
|
|
- get, Path, "limit=-5&after=invalid-int", AuthHeader
|
|
|
+ get, Path, "limit=-5&position=invalid-int", AuthHeader
|
|
|
)
|
|
|
),
|
|
|
emqtt:stop(Client).
|
|
|
@@ -1148,19 +1146,16 @@ publish_msgs(Topic, Count) ->
|
|
|
lists:seq(1, Count)
|
|
|
).
|
|
|
|
|
|
-test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
|
|
|
+test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
|
|
|
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
|
|
|
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
|
|
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
|
|
+ #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
|
|
|
|
|
|
- ?assertMatch(
|
|
|
- #{
|
|
|
- <<"last">> := <<"end_of_data">>,
|
|
|
- <<"count">> := Count
|
|
|
- },
|
|
|
- Meta
|
|
|
- ),
|
|
|
+ ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
|
|
|
+ ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
|
|
|
?assertEqual(length(Msgs), Count),
|
|
|
+
|
|
|
lists:foreach(
|
|
|
fun({Seq, #{<<"payload">> := P} = M}) ->
|
|
|
?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
|
|
|
@@ -1171,10 +1166,12 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
|
|
|
<<"qos">> := _,
|
|
|
<<"publish_at">> := _,
|
|
|
<<"from_clientid">> := _,
|
|
|
- <<"from_username">> := _
|
|
|
+ <<"from_username">> := _,
|
|
|
+ <<"inserted_at">> := _
|
|
|
},
|
|
|
M
|
|
|
- )
|
|
|
+ ),
|
|
|
+ IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M)
|
|
|
end,
|
|
|
lists:zip(lists:seq(1, Count), Msgs)
|
|
|
),
|
|
|
@@ -1189,62 +1186,69 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
|
|
|
get, Path, QsPayloadLimit, AuthHeader
|
|
|
),
|
|
|
#{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
|
|
|
- ct:pal("~p", [FirstMsgOnly]),
|
|
|
?assertEqual(1, length(FirstMsgOnly)),
|
|
|
?assertEqual(
|
|
|
<<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
|
|
|
),
|
|
|
|
|
|
Limit = 19,
|
|
|
- LastCont = lists:foldl(
|
|
|
- fun(PageSeq, Cont) ->
|
|
|
- Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
|
|
|
- {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
|
|
|
+ LastPos = lists:foldl(
|
|
|
+ fun(PageSeq, ThisPos) ->
|
|
|
+ Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]),
|
|
|
+ {ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
|
|
|
#{
|
|
|
- <<"meta">> := #{<<"last">> := NextCont} = MetaP,
|
|
|
- <<"data">> := MsgsP
|
|
|
- } = emqx_utils_json:decode(MsgsRespP),
|
|
|
- ?assertMatch(#{<<"count">> := Count}, MetaP),
|
|
|
- ?assertNotEqual(<<"end_of_data">>, NextCont),
|
|
|
- ?assertEqual(length(MsgsP), Limit),
|
|
|
+ <<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart},
|
|
|
+ <<"data">> := MsgsPage
|
|
|
+ } = emqx_utils_json:decode(MsgsRespPage),
|
|
|
+
|
|
|
+ ?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)),
|
|
|
+ %% Start position is the same in every response and points to the first msg
|
|
|
+ ?assertEqual(StartPos, ThisStart),
|
|
|
+
|
|
|
+ ?assertEqual(length(MsgsPage), Limit),
|
|
|
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
|
|
|
ExpLastPayload = integer_to_binary(PageSeq * Limit),
|
|
|
?assertEqual(
|
|
|
- ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
|
|
|
+ ExpFirstPayload,
|
|
|
+ decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding)
|
|
|
),
|
|
|
?assertEqual(
|
|
|
ExpLastPayload,
|
|
|
- decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
|
|
|
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding)
|
|
|
),
|
|
|
- NextCont
|
|
|
+ NextPos
|
|
|
end,
|
|
|
none,
|
|
|
lists:seq(1, Count div 19)
|
|
|
),
|
|
|
LastPartialPage = Count div 19 + 1,
|
|
|
- LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
|
|
|
+ LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]),
|
|
|
{ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
|
|
|
- #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
|
|
|
+ #{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode(
|
|
|
MsgsRespLastP
|
|
|
),
|
|
|
- ?assertEqual(<<"end_of_data">>, EmptyCont),
|
|
|
- ?assertMatch(#{<<"count">> := Count}, MetaLastP),
|
|
|
+ %% The same as the position of all messages returned in one request
|
|
|
+ ?assertEqual(Pos, LastPartialPos),
|
|
|
|
|
|
?assertEqual(
|
|
|
integer_to_binary(LastPartialPage * Limit - Limit + 1),
|
|
|
- decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
|
|
|
+ decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding)
|
|
|
),
|
|
|
?assertEqual(
|
|
|
integer_to_binary(Count),
|
|
|
- decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
|
|
|
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding)
|
|
|
),
|
|
|
|
|
|
- ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
|
|
|
- PayloadEncoding, EmptyCont, Limit
|
|
|
+ ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [
|
|
|
+ PayloadEncoding, LastPartialPos, Limit
|
|
|
]),
|
|
|
+ {ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader),
|
|
|
?assertMatch(
|
|
|
- {error, {_, 400, _}},
|
|
|
- emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
|
|
|
+ #{
|
|
|
+ <<"data">> := [],
|
|
|
+ <<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos}
|
|
|
+ },
|
|
|
+ emqx_utils_json:decode(MsgsEmptyResp)
|
|
|
),
|
|
|
|
|
|
%% Invalid common page params
|
|
|
@@ -1275,6 +1279,11 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
|
|
|
emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
|
|
|
).
|
|
|
|
|
|
+msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) ->
|
|
|
+ <<TsBin/binary, "_", (emqx_utils_conv:bin(Prio))/binary>>;
|
|
|
+msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) ->
|
|
|
+ TsBin.
|
|
|
+
|
|
|
decode_payload(Payload, base64) ->
|
|
|
base64:decode(Payload);
|
|
|
decode_payload(Payload, _) ->
|