|
|
@@ -19,6 +19,7 @@
|
|
|
-compile(nowarn_export_all).
|
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
+-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
|
|
-define(CLIENTID, <<"api_clientid">>).
|
|
|
-define(USERNAME, <<"api_username">>).
|
|
|
@@ -36,6 +37,16 @@ init_per_suite(Config) ->
|
|
|
end_per_suite(_) ->
|
|
|
emqx_mgmt_api_test_util:end_suite().
|
|
|
|
|
|
+init_per_testcase(Case, Config) ->
|
|
|
+ ?MODULE:Case({init, Config}).
|
|
|
+
|
|
|
+end_per_testcase(Case, Config) ->
|
|
|
+ ?MODULE:Case({'end', Config}).
|
|
|
+
|
|
|
+t_publish_api({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_api({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
t_publish_api(_) ->
|
|
|
{ok, Client} = emqtt:start_link(#{
|
|
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
|
|
@@ -48,11 +59,113 @@ t_publish_api(_) ->
|
|
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
Body = #{topic => ?TOPIC1, payload => Payload},
|
|
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
|
|
- ResponseMap = emqx_json:decode(Response, [return_maps]),
|
|
|
- ?assertEqual([<<"id">>], maps:keys(ResponseMap)),
|
|
|
+ ResponseMap = decode_json(Response),
|
|
|
+ ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
|
|
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
|
|
- emqtt:disconnect(Client).
|
|
|
+ emqtt:stop(Client).
|
|
|
+
|
|
|
+t_publish_no_subscriber({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_no_subscriber({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_no_subscriber(_) ->
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = #{topic => ?TOPIC1, payload => Payload},
|
|
|
+ {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
|
|
+ ResponseMap = decode_json(Response),
|
|
|
+ ?assertEqual([<<"message">>, <<"reason_code">>], lists:sort(maps:keys(ResponseMap))),
|
|
|
+ ?assertMatch(#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}, ResponseMap),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_publish_bad_topic({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_bad_topic({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_bad_topic(_) ->
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = #{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_publish_bad_base64({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_bad_base64({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_bad_base64(_) ->
|
|
|
+ %% not a valid base64
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = #{
|
|
|
+ topic => <<"not/a+/valid/topic">>, payload => Payload, payload_encoding => <<"base64">>
|
|
|
+ },
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_publish_too_large({init, Config}) ->
|
|
|
+ MaxPacketSize = 100,
|
|
|
+ meck:new(emqx_config, [no_link, passthrough, no_history]),
|
|
|
+ meck:expect(emqx_config, get, fun
|
|
|
+ ([mqtt, max_packet_size]) ->
|
|
|
+ MaxPacketSize;
|
|
|
+ (Other) ->
|
|
|
+ meck:passthrough(Other)
|
|
|
+ end),
|
|
|
+ [{max_packet_size, MaxPacketSize} | Config];
|
|
|
+t_publish_too_large({'end', _Config}) ->
|
|
|
+ meck:unload(emqx_config),
|
|
|
+ ok;
|
|
|
+t_publish_too_large(Config) ->
|
|
|
+ MaxPacketSize = proplists:get_value(max_packet_size, Config),
|
|
|
+ Payload = lists:duplicate(MaxPacketSize, $0),
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = #{topic => <<"random/topic">>, payload => Payload},
|
|
|
+ {error, {Summary, _Headers, ResponseBody}} =
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ post,
|
|
|
+ Path,
|
|
|
+ "",
|
|
|
+ Auth,
|
|
|
+ Body,
|
|
|
+ #{return_body => true}
|
|
|
+ ),
|
|
|
+ ?assertMatch({_, 400, _}, Summary),
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"reason_code">> := ?RC_QUOTA_EXCEEDED,
|
|
|
+ <<"message">> := <<"packet_too_large">>
|
|
|
+ },
|
|
|
+ decode_json(ResponseBody)
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_publish_bad_topic_bulk({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_bad_topic_bulk({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_bad_topic_bulk(_Config) ->
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = [
|
|
|
+ #{topic => <<"not/a+/valid/topic">>, payload => Payload},
|
|
|
+ #{topic => <<"good/topic">>, payload => Payload}
|
|
|
+ ],
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body)
|
|
|
+ ).
|
|
|
|
|
|
+t_publish_bulk_api({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_bulk_api({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
t_publish_bulk_api(_) ->
|
|
|
{ok, Client} = emqtt:start_link(#{
|
|
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
|
|
@@ -63,19 +176,135 @@ t_publish_bulk_api(_) ->
|
|
|
Payload = <<"hello">>,
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
|
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
- Body = [#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
|
|
|
+ Body = [
|
|
|
+ #{
|
|
|
+ topic => ?TOPIC1,
|
|
|
+ payload => Payload,
|
|
|
+ payload_encoding => plain
|
|
|
+ },
|
|
|
+ #{
|
|
|
+ topic => ?TOPIC2,
|
|
|
+ payload => base64:encode(Payload),
|
|
|
+ payload_encoding => base64
|
|
|
+ }
|
|
|
+ ],
|
|
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
|
|
- ResponseList = emqx_json:decode(Response, [return_maps]),
|
|
|
+ ResponseList = decode_json(Response),
|
|
|
?assertEqual(2, erlang:length(ResponseList)),
|
|
|
lists:foreach(
|
|
|
fun(ResponseMap) ->
|
|
|
- ?assertEqual([<<"id">>], maps:keys(ResponseMap))
|
|
|
+ ?assertMatch(
|
|
|
+ [<<"id">>], lists:sort(maps:keys(ResponseMap))
|
|
|
+ )
|
|
|
end,
|
|
|
ResponseList
|
|
|
),
|
|
|
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
|
|
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
|
|
- emqtt:disconnect(Client).
|
|
|
+ emqtt:stop(Client).
|
|
|
+
|
|
|
+t_publish_no_subscriber_bulk({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_no_subscriber_bulk({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_no_subscriber_bulk(_) ->
|
|
|
+ {ok, Client} = emqtt:start_link(#{
|
|
|
+ username => <<"api_username">>, clientid => <<"api_clientid">>
|
|
|
+ }),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
|
|
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = [
|
|
|
+ #{topic => ?TOPIC1, payload => Payload},
|
|
|
+ #{topic => ?TOPIC2, payload => Payload},
|
|
|
+ #{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
|
|
+ ],
|
|
|
+ {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
|
|
+ ResponseList = decode_json(Response),
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{<<"id">> := _},
|
|
|
+ #{<<"id">> := _},
|
|
|
+ #{<<"message">> := <<"no_matching_subscribers">>}
|
|
|
+ ],
|
|
|
+ ResponseList
|
|
|
+ ),
|
|
|
+ ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
|
|
+ ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
|
|
+ emqtt:stop(Client).
|
|
|
+
|
|
|
+t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
|
|
|
+ Config;
|
|
|
+t_publish_bulk_dispatch_one_message_invalid_topic({'end', _Config}) ->
|
|
|
+ ok;
|
|
|
+t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) ->
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = [
|
|
|
+ #{topic => ?TOPIC1, payload => Payload},
|
|
|
+ #{topic => ?TOPIC2, payload => Payload},
|
|
|
+ #{topic => <<"bad/#/topic">>, payload => Payload}
|
|
|
+ ],
|
|
|
+ {error, {Summary, _Headers, ResponseBody}} =
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ post,
|
|
|
+ Path,
|
|
|
+ "",
|
|
|
+ Auth,
|
|
|
+ Body,
|
|
|
+ #{return_body => true}
|
|
|
+ ),
|
|
|
+ ?assertMatch({_, 400, _}, Summary),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"reason_code">> := ?RC_TOPIC_NAME_INVALID},
|
|
|
+ decode_json(ResponseBody)
|
|
|
+ ).
|
|
|
+
|
|
|
+t_publish_bulk_dispatch_failure({init, Config}) ->
|
|
|
+ meck:new(emqx, [no_link, passthrough, no_history]),
|
|
|
+ meck:expect(emqx, is_running, fun() -> false end),
|
|
|
+ Config;
|
|
|
+t_publish_bulk_dispatch_failure({'end', _Config}) ->
|
|
|
+ meck:unload(emqx),
|
|
|
+ ok;
|
|
|
+t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
|
|
+ {ok, Client} = emqtt:start_link(#{
|
|
|
+ username => <<"api_username">>, clientid => <<"api_clientid">>
|
|
|
+ }),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
|
|
+ {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
|
|
+ Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ Body = [
|
|
|
+ #{topic => ?TOPIC1, payload => Payload},
|
|
|
+ #{topic => ?TOPIC2, payload => Payload},
|
|
|
+ #{topic => <<"no/subscrbier/topic">>, payload => Payload}
|
|
|
+ ],
|
|
|
+ {error, {Summary, _Headers, ResponseBody}} =
|
|
|
+ emqx_mgmt_api_test_util:request_api(
|
|
|
+ post,
|
|
|
+ Path,
|
|
|
+ "",
|
|
|
+ Auth,
|
|
|
+ Body,
|
|
|
+ #{return_body => true}
|
|
|
+ ),
|
|
|
+ ?assertMatch({_, 503, _}, Summary),
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
|
|
+ #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR},
|
|
|
+ #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
|
|
|
+ ],
|
|
|
+ decode_json(ResponseBody)
|
|
|
+ ),
|
|
|
+ emqtt:stop(Client).
|
|
|
|
|
|
receive_assert(Topic, Qos, Payload) ->
|
|
|
receive
|
|
|
@@ -90,3 +319,6 @@ receive_assert(Topic, Qos, Payload) ->
|
|
|
after 5000 ->
|
|
|
timeout
|
|
|
end.
|
|
|
+
|
|
|
+decode_json(In) ->
|
|
|
+ emqx_json:decode(In, [return_maps]).
|