|
|
@@ -13,6 +13,9 @@
|
|
|
|
|
|
% Bridge defaults
|
|
|
-define(TOPIC, "TopicTest").
|
|
|
+-define(DENY_TOPIC, "DENY_TOPIC").
|
|
|
+-define(ACCESS_KEY, "RocketMQ").
|
|
|
+-define(SECRET_KEY, "12345678").
|
|
|
-define(BATCH_SIZE, 10).
|
|
|
-define(PAYLOAD, <<"HELLO">>).
|
|
|
|
|
|
@@ -25,17 +28,19 @@
|
|
|
all() ->
|
|
|
[
|
|
|
{group, async},
|
|
|
- {group, sync}
|
|
|
+ {group, sync},
|
|
|
+ {group, acl}
|
|
|
].
|
|
|
|
|
|
groups() ->
|
|
|
- TCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
+ TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny],
|
|
|
BatchingGroups = [{group, with_batch}, {group, without_batch}],
|
|
|
[
|
|
|
{async, BatchingGroups},
|
|
|
{sync, BatchingGroups},
|
|
|
{with_batch, TCs},
|
|
|
- {without_batch, TCs}
|
|
|
+ {without_batch, TCs},
|
|
|
+ {acl, [t_acl_deny]}
|
|
|
].
|
|
|
|
|
|
init_per_group(async, Config) ->
|
|
|
@@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) ->
|
|
|
init_per_group(without_batch, Config0) ->
|
|
|
Config = [{batch_size, 1} | Config0],
|
|
|
common_init(Config);
|
|
|
+init_per_group(acl, Config0) ->
|
|
|
+ Config = [{batch_size, 1}, {query_mode, sync} | Config0],
|
|
|
+ common_init(Config);
|
|
|
init_per_group(_Group, Config) ->
|
|
|
Config.
|
|
|
|
|
|
@@ -137,6 +145,8 @@ rocketmq_config(BridgeType, Config) ->
|
|
|
"bridges.~s.~s {\n"
|
|
|
" enable = true\n"
|
|
|
" servers = ~p\n"
|
|
|
+ " access_key = ~p\n"
|
|
|
+ " secret_key = ~p\n"
|
|
|
" topic = ~p\n"
|
|
|
" resource_opts = {\n"
|
|
|
" request_timeout = 1500ms\n"
|
|
|
@@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) ->
|
|
|
BridgeType,
|
|
|
Name,
|
|
|
Server,
|
|
|
+ ?ACCESS_KEY,
|
|
|
+ ?SECRET_KEY,
|
|
|
?TOPIC,
|
|
|
BatchSize,
|
|
|
QueryMode
|
|
|
@@ -271,3 +283,29 @@ t_simple_query(Config) ->
|
|
|
Result = query_resource(Config, Request),
|
|
|
?assertEqual(ok, Result),
|
|
|
ok.
|
|
|
+
|
|
|
+t_acl_deny(Config0) ->
|
|
|
+ RocketCfg = ?GET_CONFIG(rocketmq_config, Config0),
|
|
|
+ RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC},
|
|
|
+ Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, _},
|
|
|
+ create_bridge(Config)
|
|
|
+ ),
|
|
|
+ SentData = #{payload => ?PAYLOAD},
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ ?wait_async_action(
|
|
|
+ ?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)),
|
|
|
+ #{?snk_kind := rocketmq_connector_query_return},
|
|
|
+ 10_000
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ fun(Trace0) ->
|
|
|
+ Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
|
|
|
+ ?assertMatch([#{error := #{<<"code">> := 1}}], Trace),
|
|
|
+ ok
|
|
|
+ end
|
|
|
+ ),
|
|
|
+ ok.
|