|
|
@@ -83,9 +83,10 @@ end_per_suite(_Config) ->
|
|
|
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
|
|
|
ok.
|
|
|
|
|
|
-init_per_testcase(_Testcase, Config) ->
|
|
|
+init_per_testcase(TestCase, Config) ->
|
|
|
create_table(Config),
|
|
|
- Config.
|
|
|
+ ok = snabbkaffe:start_trace(),
|
|
|
+ [{dynamo_name, atom_to_binary(TestCase)} | Config].
|
|
|
|
|
|
end_per_testcase(_Testcase, Config) ->
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
@@ -93,7 +94,7 @@ end_per_testcase(_Testcase, Config) ->
|
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
|
ok = snabbkaffe:stop(),
|
|
|
delete_table(Config),
|
|
|
- delete_bridge(Config),
|
|
|
+ delete_all_bridges(),
|
|
|
ok.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -186,15 +187,22 @@ parse_and_check(ConfigString, BridgeType, Name) ->
|
|
|
Config.
|
|
|
|
|
|
create_bridge(Config) ->
|
|
|
- BridgeType = ?config(dynamo_bridge_type, Config),
|
|
|
- Name = ?config(dynamo_name, Config),
|
|
|
- TDConfig = ?config(dynamo_config, Config),
|
|
|
- emqx_bridge:create(BridgeType, Name, TDConfig).
|
|
|
+ create_bridge(Config, _Overrides = #{}).
|
|
|
|
|
|
-delete_bridge(Config) ->
|
|
|
+create_bridge(Config, Overrides) ->
|
|
|
BridgeType = ?config(dynamo_bridge_type, Config),
|
|
|
Name = ?config(dynamo_name, Config),
|
|
|
- emqx_bridge:remove(BridgeType, Name).
|
|
|
+ DynamoConfig0 = ?config(dynamo_config, Config),
|
|
|
+ DynamoConfig = emqx_map_lib:deep_merge(DynamoConfig0, Overrides),
|
|
|
+ emqx_bridge:create(BridgeType, Name, DynamoConfig).
|
|
|
+
|
|
|
+delete_all_bridges() ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(#{name := Name, type := Type}) ->
|
|
|
+ emqx_bridge:remove(Type, Name)
|
|
|
+ end,
|
|
|
+ emqx_bridge:list()
|
|
|
+ ).
|
|
|
|
|
|
create_bridge_http(Params) ->
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
|
|
@@ -327,10 +335,12 @@ t_setup_via_http_api_and_publish(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_get_status(Config) ->
|
|
|
- ?assertMatch(
|
|
|
- {ok, _},
|
|
|
- create_bridge(Config)
|
|
|
- ),
|
|
|
+ {{ok, _}, {ok, _}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ create_bridge(Config),
|
|
|
+ #{?snk_kind := resource_connected_enter},
|
|
|
+ 20_000
|
|
|
+ ),
|
|
|
|
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
@@ -359,7 +369,12 @@ t_write_failure(Config) ->
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
- {ok, _} = create_bridge(Config),
|
|
|
+ {{ok, _}, {ok, _}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ create_bridge(Config),
|
|
|
+ #{?snk_kind := resource_connected_enter},
|
|
|
+ 20_000
|
|
|
+ ),
|
|
|
SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD},
|
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
?assertMatch(
|
|
|
@@ -372,7 +387,12 @@ t_write_timeout(Config) ->
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
- {ok, _} = create_bridge(Config),
|
|
|
+ {{ok, _}, {ok, _}} =
|
|
|
+ ?wait_async_action(
|
|
|
+ create_bridge(Config),
|
|
|
+ #{?snk_kind := resource_connected_enter},
|
|
|
+ 20_000
|
|
|
+ ),
|
|
|
SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD},
|
|
|
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
?assertMatch(
|