|
|
@@ -23,16 +23,23 @@
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
-include_lib("emqx/include/asserts.hrl").
|
|
|
+-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
|
|
all() ->
|
|
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
[
|
|
|
- {group, persistent_sessions}
|
|
|
- | AllTCs -- persistent_session_testcases()
|
|
|
+ {group, persistent_sessions},
|
|
|
+ {group, msgs_base64_encoding},
|
|
|
+ {group, msgs_plain_encoding}
|
|
|
+ | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases())
|
|
|
].
|
|
|
|
|
|
groups() ->
|
|
|
- [{persistent_sessions, persistent_session_testcases()}].
|
|
|
+ [
|
|
|
+ {persistent_sessions, persistent_session_testcases()},
|
|
|
+ {msgs_base64_encoding, client_msgs_testcases()},
|
|
|
+ {msgs_plain_encoding, client_msgs_testcases()}
|
|
|
+ ].
|
|
|
|
|
|
persistent_session_testcases() ->
|
|
|
[
|
|
|
@@ -42,12 +49,19 @@ persistent_session_testcases() ->
|
|
|
t_persistent_sessions4,
|
|
|
t_persistent_sessions5
|
|
|
].
|
|
|
+client_msgs_testcases() ->
|
|
|
+ [
|
|
|
+ t_inflight_messages,
|
|
|
+ t_mqueue_messages
|
|
|
+ ].
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
+ ok = snabbkaffe:start_trace(),
|
|
|
emqx_mgmt_api_test_util:init_suite(),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_) ->
|
|
|
+ ok = snabbkaffe:stop(),
|
|
|
emqx_mgmt_api_test_util:end_suite().
|
|
|
|
|
|
init_per_group(persistent_sessions, Config) ->
|
|
|
@@ -67,6 +81,10 @@ init_per_group(persistent_sessions, Config) ->
|
|
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
|
),
|
|
|
[{nodes, Nodes} | Config];
|
|
|
+init_per_group(msgs_base64_encoding, Config) ->
|
|
|
+ [{payload_encoding, base64} | Config];
|
|
|
+init_per_group(msgs_plain_encoding, Config) ->
|
|
|
+ [{payload_encoding, plain} | Config];
|
|
|
init_per_group(_Group, Config) ->
|
|
|
Config.
|
|
|
|
|
|
@@ -77,6 +95,21 @@ end_per_group(persistent_sessions, Config) ->
|
|
|
end_per_group(_Group, _Config) ->
|
|
|
ok.
|
|
|
|
|
|
+end_per_testcase(TC, _Config) when
|
|
|
+ TC =:= t_inflight_messages;
|
|
|
+ TC =:= t_mqueue_messages
|
|
|
+->
|
|
|
+ ClientId = atom_to_binary(TC),
|
|
|
+ lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
|
|
|
+ ok = emqx_common_test_helpers:wait_for(
|
|
|
+ ?FUNCTION_NAME,
|
|
|
+ ?LINE,
|
|
|
+ fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end,
|
|
|
+ 5000
|
|
|
+ );
|
|
|
+end_per_testcase(_TC, _Config) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
t_clients(_) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
|
|
|
@@ -682,6 +715,238 @@ t_query_clients_with_time(_) ->
|
|
|
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
|
|
|
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path).
|
|
|
|
|
|
+t_query_multiple_clients(_) ->
|
|
|
+ process_flag(trap_exit, true),
|
|
|
+ ClientIdsUsers = [
|
|
|
+ {<<"multi_client1">>, <<"multi_user1">>},
|
|
|
+ {<<"multi_client1-1">>, <<"multi_user1">>},
|
|
|
+ {<<"multi_client2">>, <<"multi_user2">>},
|
|
|
+ {<<"multi_client2-1">>, <<"multi_user2">>},
|
|
|
+ {<<"multi_client3">>, <<"multi_user3">>},
|
|
|
+ {<<"multi_client3-1">>, <<"multi_user3">>},
|
|
|
+ {<<"multi_client4">>, <<"multi_user4">>},
|
|
|
+ {<<"multi_client4-1">>, <<"multi_user4">>}
|
|
|
+ ],
|
|
|
+ _Clients = lists:map(
|
|
|
+ fun({ClientId, Username}) ->
|
|
|
+ {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+ C
|
|
|
+ end,
|
|
|
+ ClientIdsUsers
|
|
|
+ ),
|
|
|
+ timer:sleep(100),
|
|
|
+
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+
|
|
|
+ %% Not found clients/users
|
|
|
+ ?assertEqual([], get_clients(Auth, "clientid=no_such_client")),
|
|
|
+ ?assertEqual([], get_clients(Auth, "clientid=no_such_client&clientid=no_such_client1")),
|
|
|
+ %% Duplicates must cause no issues
|
|
|
+ ?assertEqual([], get_clients(Auth, "clientid=no_such_client&clientid=no_such_client")),
|
|
|
+ ?assertEqual([], get_clients(Auth, "username=no_such_user&clientid=no_such_user1")),
|
|
|
+ ?assertEqual([], get_clients(Auth, "username=no_such_user&clientid=no_such_user")),
|
|
|
+ ?assertEqual(
|
|
|
+ [],
|
|
|
+ get_clients(
|
|
|
+ Auth,
|
|
|
+ "clientid=no_such_client&clientid=no_such_client"
|
|
|
+ "username=no_such_user&clientid=no_such_user1"
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% Requested ClientId / username values relate to different clients
|
|
|
+ ?assertEqual([], get_clients(Auth, "clientid=multi_client1&username=multi_user2")),
|
|
|
+ ?assertEqual(
|
|
|
+ [],
|
|
|
+ get_clients(
|
|
|
+ Auth,
|
|
|
+ "clientid=multi_client1&clientid=multi_client1-1"
|
|
|
+ "&username=multi_user2&username=multi_user3"
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertEqual([<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1")),
|
|
|
+ %% Duplicates must cause no issues
|
|
|
+ ?assertEqual(
|
|
|
+ [<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1&clientid=multi_client1")
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ [<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1&username=multi_user1")
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(get_clients(Auth, "username=multi_user1"))
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(get_clients(Auth, "clientid=multi_client1&clientid=multi_client1-1"))
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(
|
|
|
+ get_clients(
|
|
|
+ Auth,
|
|
|
+ "clientid=multi_client1&clientid=multi_client1-1"
|
|
|
+ "&username=multi_user1"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(
|
|
|
+ get_clients(
|
|
|
+ Auth,
|
|
|
+ "clientid=no-such-client&clientid=multi_client1&clientid=multi_client1-1"
|
|
|
+ "&username=multi_user1"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(
|
|
|
+ get_clients(
|
|
|
+ Auth,
|
|
|
+ "clientid=no-such-client&clientid=multi_client1&clientid=multi_client1-1"
|
|
|
+ "&username=multi_user1&username=no-such-user"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ AllQsFun = fun(QsKey, Pos) ->
|
|
|
+ QsParts = [
|
|
|
+ QsKey ++ "=" ++ binary_to_list(element(Pos, ClientUser))
|
|
|
+ || ClientUser <- ClientIdsUsers
|
|
|
+ ],
|
|
|
+ lists:flatten(lists:join("&", QsParts))
|
|
|
+ end,
|
|
|
+ AllClientsQs = AllQsFun("clientid", 1),
|
|
|
+ AllUsersQs = AllQsFun("username", 2),
|
|
|
+ AllClientIds = lists:sort([C || {C, _U} <- ClientIdsUsers]),
|
|
|
+
|
|
|
+ ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllClientsQs))),
|
|
|
+ ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllUsersQs))),
|
|
|
+ ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs))),
|
|
|
+
|
|
|
+ %% Test with other filter params
|
|
|
+ NodeQs = "&node=" ++ atom_to_list(node()),
|
|
|
+ NoNodeQs = "&node=nonode@nohost",
|
|
|
+ ?assertEqual(
|
|
|
+ AllClientIds, lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ NodeQs))
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, _}, get_clients_expect_error(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ NoNodeQs)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% fuzzy search (like_{key}) must be ignored if accurate filter ({key}) is present
|
|
|
+ ?assertEqual(
|
|
|
+ AllClientIds,
|
|
|
+ lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_clientid=multi"))
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ AllClientIds,
|
|
|
+ lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_username=multi"))
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ AllClientIds,
|
|
|
+ lists:sort(
|
|
|
+ get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_clientid=does-not-matter")
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ AllClientIds,
|
|
|
+ lists:sort(
|
|
|
+ get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_username=does-not-matter")
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% Combining multiple clientids with like_username and vice versa must narrow down search results
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(get_clients(Auth, AllClientsQs ++ "&like_username=user1"))
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
|
|
|
+ lists:sort(get_clients(Auth, AllUsersQs ++ "&like_clientid=client1"))
|
|
|
+ ),
|
|
|
+ ?assertEqual([], get_clients(Auth, AllClientsQs ++ "&like_username=nouser")),
|
|
|
+ ?assertEqual([], get_clients(Auth, AllUsersQs ++ "&like_clientid=nouser")).
|
|
|
+
|
|
|
+t_query_multiple_clients_urlencode(_) ->
|
|
|
+ process_flag(trap_exit, true),
|
|
|
+ ClientIdsUsers = [
|
|
|
+ {<<"multi_client=a?">>, <<"multi_user=a?">>},
|
|
|
+ {<<"mutli_client=b?">>, <<"multi_user=b?">>}
|
|
|
+ ],
|
|
|
+ _Clients = lists:map(
|
|
|
+ fun({ClientId, Username}) ->
|
|
|
+ {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+ C
|
|
|
+ end,
|
|
|
+ ClientIdsUsers
|
|
|
+ ),
|
|
|
+ timer:sleep(100),
|
|
|
+
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ ClientsQs = uri_string:compose_query([{<<"clientid">>, C} || {C, _} <- ClientIdsUsers]),
|
|
|
+ UsersQs = uri_string:compose_query([{<<"username">>, U} || {_, U} <- ClientIdsUsers]),
|
|
|
+ ExpectedClients = lists:sort([C || {C, _} <- ClientIdsUsers]),
|
|
|
+ ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, ClientsQs))),
|
|
|
+ ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, UsersQs))).
|
|
|
+
|
|
|
+t_query_clients_with_fields(_) ->
|
|
|
+ process_flag(trap_exit, true),
|
|
|
+ TCBin = atom_to_binary(?FUNCTION_NAME),
|
|
|
+ ClientId = <<TCBin/binary, "_client">>,
|
|
|
+ Username = <<TCBin/binary, "_user">>,
|
|
|
+ {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
|
|
|
+ {ok, _} = emqtt:connect(C),
|
|
|
+ timer:sleep(100),
|
|
|
+
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ ?assertEqual([#{<<"clientid">> => ClientId}], get_clients_all_fields(Auth, "fields=clientid")),
|
|
|
+ ?assertEqual(
|
|
|
+ [#{<<"clientid">> => ClientId, <<"username">> => Username}],
|
|
|
+ get_clients_all_fields(Auth, "fields=clientid,username")
|
|
|
+ ),
|
|
|
+
|
|
|
+ AllFields = get_clients_all_fields(Auth, "fields=all"),
|
|
|
+ DefaultFields = get_clients_all_fields(Auth, ""),
|
|
|
+
|
|
|
+ ?assertEqual(AllFields, DefaultFields),
|
|
|
+ ?assertMatch(
|
|
|
+ [#{<<"clientid">> := ClientId, <<"username">> := Username}],
|
|
|
+ AllFields
|
|
|
+ ),
|
|
|
+ ?assert(map_size(hd(AllFields)) > 2),
|
|
|
+ ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=bad_field_name")),
|
|
|
+ ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=all,bad_field_name")),
|
|
|
+ ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=all,username,clientid")).
|
|
|
+
|
|
|
+get_clients_all_fields(Auth, Qs) ->
|
|
|
+ get_clients(Auth, Qs, false, false).
|
|
|
+
|
|
|
+get_clients_expect_error(Auth, Qs) ->
|
|
|
+ get_clients(Auth, Qs, true, true).
|
|
|
+
|
|
|
+get_clients(Auth, Qs) ->
|
|
|
+ get_clients(Auth, Qs, false, true).
|
|
|
+
|
|
|
+get_clients(Auth, Qs, ExpectError, ClientIdOnly) ->
|
|
|
+ ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
|
|
|
+ Resp = emqx_mgmt_api_test_util:request_api(get, ClientsPath, Qs, Auth),
|
|
|
+ case ExpectError of
|
|
|
+ false ->
|
|
|
+ {ok, Body} = Resp,
|
|
|
+ #{<<"data">> := Clients} = emqx_utils_json:decode(Body),
|
|
|
+ case ClientIdOnly of
|
|
|
+ true -> [ClientId || #{<<"clientid">> := ClientId} <- Clients];
|
|
|
+ false -> Clients
|
|
|
+ end;
|
|
|
+ true ->
|
|
|
+ Resp
|
|
|
+ end.
|
|
|
+
|
|
|
t_keepalive(_Config) ->
|
|
|
Username = "user_keepalive",
|
|
|
ClientId = "client_keepalive",
|
|
|
@@ -759,8 +1024,262 @@ t_client_id_not_found(_Config) ->
|
|
|
?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)),
|
|
|
?assertMatch(
|
|
|
{error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
|
|
|
+ ),
|
|
|
+ %% Mqueue messages
|
|
|
+ ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))),
|
|
|
+ %% Inflight messages
|
|
|
+ ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).
|
|
|
+
|
|
|
+t_sessions_count(_Config) ->
|
|
|
+ ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
+ Topic = <<"t/test_sessions_count">>,
|
|
|
+ Conf0 = emqx_config:get([broker]),
|
|
|
+ Conf1 = hocon_maps:deep_merge(Conf0, #{session_history_retain => 5}),
|
|
|
+ %% from 1 seconds ago, which is for sure less than histry retain duration
|
|
|
+ %% hence force a call to the gen_server emqx_cm_registry_keeper
|
|
|
+ Since = erlang:system_time(seconds) - 1,
|
|
|
+ ok = emqx_config:put(#{broker => Conf1}),
|
|
|
+ {ok, Client} = emqtt:start_link([
|
|
|
+ {proto_ver, v5},
|
|
|
+ {clientid, ClientId},
|
|
|
+ {clean_start, true}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]),
|
|
|
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, "1"},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "since=" ++ integer_to_list(Since), AuthHeader
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ok = emqtt:disconnect(Client),
|
|
|
+ %% simulate the situation in which the process is not running
|
|
|
+ ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "since=" ++ integer_to_list(Since), AuthHeader
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ %% restore default value
|
|
|
+ ok = emqx_config:put(#{broker => Conf0}),
|
|
|
+ ok = emqx_cm_registry_keeper:purge(),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_mqueue_messages(Config) ->
|
|
|
+ ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
+ Topic = <<"t/test_mqueue_msgs">>,
|
|
|
+ Count = emqx_mgmt:default_row_limit(),
|
|
|
+ {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
|
|
|
+ ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
|
|
|
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "limit=10&after=not-base64%23%21", AuthHeader
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
|
|
|
+ )
|
|
|
+ ).
|
|
|
+
|
|
|
+t_inflight_messages(Config) ->
|
|
|
+ ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
+ Topic = <<"t/test_inflight_msgs">>,
|
|
|
+ PubCount = emqx_mgmt:default_row_limit(),
|
|
|
+ {ok, Client} = client_with_inflight(ClientId, Topic, PubCount),
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
|
|
|
+ InflightLimit = emqx:get_config([mqtt, max_inflight]),
|
|
|
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "limit=10&after=not-int", AuthHeader
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, "limit=-5&after=invalid-int", AuthHeader
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ emqtt:stop(Client).
|
|
|
+
|
|
|
+client_with_mqueue(ClientId, Topic, Count) ->
|
|
|
+ {ok, Client} = emqtt:start_link([
|
|
|
+ {proto_ver, v5},
|
|
|
+ {clientid, ClientId},
|
|
|
+ {clean_start, false},
|
|
|
+ {properties, #{'Session-Expiry-Interval' => 120}}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
|
|
+ ok = emqtt:disconnect(Client),
|
|
|
+ publish_msgs(Topic, Count),
|
|
|
+ {ok, Client}.
|
|
|
+
|
|
|
+client_with_inflight(ClientId, Topic, Count) ->
|
|
|
+ {ok, Client} = emqtt:start_link([
|
|
|
+ {proto_ver, v5},
|
|
|
+ {clientid, ClientId},
|
|
|
+ {clean_start, true},
|
|
|
+ {auto_ack, never}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
|
|
|
+ publish_msgs(Topic, Count),
|
|
|
+ {ok, Client}.
|
|
|
+
|
|
|
+publish_msgs(Topic, Count) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(Seq) ->
|
|
|
+ emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq)))
|
|
|
+ end,
|
|
|
+ lists:seq(1, Count)
|
|
|
+ ).
|
|
|
+
|
|
|
+test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
|
|
|
+ Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
|
|
|
+ {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
|
|
|
+ #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"last">> := <<"end_of_data">>,
|
|
|
+ <<"count">> := Count
|
|
|
+ },
|
|
|
+ Meta
|
|
|
+ ),
|
|
|
+ ?assertEqual(length(Msgs), Count),
|
|
|
+ lists:foreach(
|
|
|
+ fun({Seq, #{<<"payload">> := P} = M}) ->
|
|
|
+ ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"msgid">> := _,
|
|
|
+ <<"topic">> := Topic,
|
|
|
+ <<"qos">> := _,
|
|
|
+ <<"publish_at">> := _,
|
|
|
+ <<"from_clientid">> := _,
|
|
|
+ <<"from_username">> := _
|
|
|
+ },
|
|
|
+ M
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ lists:zip(lists:seq(1, Count), Msgs)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% The first message payload is <<"1">>,
|
|
|
+ %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>,
|
|
|
+ %% so we cover both cases:
|
|
|
+ %% - when total payload size exceeds the limit,
|
|
|
+ %% - when the first message payload already exceeds the limit but is still returned in the response.
|
|
|
+ QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]),
|
|
|
+ {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api(
|
|
|
+ get, Path, QsPayloadLimit, AuthHeader
|
|
|
+ ),
|
|
|
+ #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
|
|
|
+ ct:pal("~p", [FirstMsgOnly]),
|
|
|
+ ?assertEqual(1, length(FirstMsgOnly)),
|
|
|
+ ?assertEqual(
|
|
|
+ <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
|
|
|
+ ),
|
|
|
+
|
|
|
+ Limit = 19,
|
|
|
+ LastCont = lists:foldl(
|
|
|
+ fun(PageSeq, Cont) ->
|
|
|
+ Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
|
|
|
+ {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
|
|
|
+ #{
|
|
|
+ <<"meta">> := #{<<"last">> := NextCont} = MetaP,
|
|
|
+ <<"data">> := MsgsP
|
|
|
+ } = emqx_utils_json:decode(MsgsRespP),
|
|
|
+ ?assertMatch(#{<<"count">> := Count}, MetaP),
|
|
|
+ ?assertNotEqual(<<"end_of_data">>, NextCont),
|
|
|
+ ?assertEqual(length(MsgsP), Limit),
|
|
|
+ ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
|
|
|
+ ExpLastPayload = integer_to_binary(PageSeq * Limit),
|
|
|
+ ?assertEqual(
|
|
|
+ ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ ExpLastPayload,
|
|
|
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
|
|
|
+ ),
|
|
|
+ NextCont
|
|
|
+ end,
|
|
|
+ none,
|
|
|
+ lists:seq(1, Count div 19)
|
|
|
+ ),
|
|
|
+ LastPartialPage = Count div 19 + 1,
|
|
|
+ LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
|
|
|
+ {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
|
|
|
+ #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
|
|
|
+ MsgsRespLastP
|
|
|
+ ),
|
|
|
+ ?assertEqual(<<"end_of_data">>, EmptyCont),
|
|
|
+ ?assertMatch(#{<<"count">> := Count}, MetaLastP),
|
|
|
+
|
|
|
+ ?assertEqual(
|
|
|
+ integer_to_binary(LastPartialPage * Limit - Limit + 1),
|
|
|
+ decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
|
|
|
+ ),
|
|
|
+ ?assertEqual(
|
|
|
+ integer_to_binary(Count),
|
|
|
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
|
|
|
+ PayloadEncoding, EmptyCont, Limit
|
|
|
+ ]),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% Invalid common page params
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader)
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% Invalid max_paylod_bytes param
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0", AuthHeader)
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=-1", AuthHeader)
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=-1MB", AuthHeader)
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
|
|
|
).
|
|
|
|
|
|
+decode_payload(Payload, base64) ->
|
|
|
+ base64:decode(Payload);
|
|
|
+decode_payload(Payload, _) ->
|
|
|
+ Payload.
|
|
|
+
|
|
|
t_subscribe_shared_topic(_Config) ->
|
|
|
ClientId = <<"client_subscribe_shared">>,
|
|
|
|