|
|
@@ -796,7 +796,9 @@ t_publish_connection_down(Config0) ->
|
|
|
ok.
|
|
|
|
|
|
t_wrong_server(Config) ->
|
|
|
+ TypeBin = ?BRIDGE_TYPE_BIN,
|
|
|
Name = ?config(kinesis_name, Config),
|
|
|
+ KinesisConfig0 = ?config(kinesis_config, Config),
|
|
|
ResourceId = ?config(resource_id, Config),
|
|
|
Overrides =
|
|
|
#{
|
|
|
@@ -806,12 +808,57 @@ t_wrong_server(Config) ->
|
|
|
<<"health_check_interval">> => <<"60s">>
|
|
|
}
|
|
|
},
|
|
|
+ % probe
|
|
|
+ KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, Overrides),
|
|
|
+ Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
|
|
|
+ ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
|
|
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
|
|
|
+ ),
|
|
|
+ % create
|
|
|
?wait_async_action(
|
|
|
create_bridge(Config, Overrides),
|
|
|
- #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok},
|
|
|
+ #{?snk_kind := start_pool_failed},
|
|
|
30_000
|
|
|
),
|
|
|
- ?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)),
|
|
|
- emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
|
|
|
- emqx_bridge_resource:remove(?BRIDGE_TYPE, Name),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, _, #{error := {start_pool_failed, ResourceId, _}}},
|
|
|
+ emqx_resource_manager:lookup_cached(ResourceId)
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_access_denied(Config) ->
|
|
|
+ TypeBin = ?BRIDGE_TYPE_BIN,
|
|
|
+ Name = ?config(kinesis_name, Config),
|
|
|
+ KinesisConfig = ?config(kinesis_config, Config),
|
|
|
+ ResourceId = ?config(resource_id, Config),
|
|
|
+ AccessError = {<<"AccessDeniedException">>, <<>>},
|
|
|
+ Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
|
|
|
+ ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
|
|
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
+ emqx_common_test_helpers:with_mock(
|
|
|
+ erlcloud_kinesis,
|
|
|
+ list_streams,
|
|
|
+ fun() -> {error, AccessError} end,
|
|
|
+ fun() ->
|
|
|
+ % probe
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {_, 400, _}},
|
|
|
+ emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
|
|
|
+ ),
|
|
|
+ % create
|
|
|
+ ?wait_async_action(
|
|
|
+ create_bridge(Config),
|
|
|
+ #{?snk_kind := kinesis_init_failed},
|
|
|
+ 30_000
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, _, #{error := {start_pool_failed, ResourceId, AccessError}}},
|
|
|
+ emqx_resource_manager:lookup_cached(ResourceId)
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end
|
|
|
+ ),
|
|
|
ok.
|