|
|
@@ -267,7 +267,12 @@ parse_and_check(ConfigString, Name) ->
|
|
|
resource_id(Config) ->
|
|
|
Type = ?BRIDGE_TYPE_BIN,
|
|
|
Name = ?config(oracle_name, Config),
|
|
|
- emqx_bridge_resource:resource_id(Type, Name).
|
|
|
+ <<"connector:", Type/binary, ":", Name/binary>>.
|
|
|
+
|
|
|
+action_id(Config) ->
|
|
|
+ Type = ?BRIDGE_TYPE_BIN,
|
|
|
+ Name = ?config(oracle_name, Config),
|
|
|
+ emqx_bridge_v2:id(Type, Name).
|
|
|
|
|
|
bridge_id(Config) ->
|
|
|
Type = ?BRIDGE_TYPE_BIN,
|
|
|
@@ -378,6 +383,7 @@ create_rule_and_action_http(Config) ->
|
|
|
|
|
|
t_sync_query(Config) ->
|
|
|
ResourceId = resource_id(Config),
|
|
|
+ Name = ?config(oracle_name, Config),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
reset_table(Config),
|
|
|
@@ -387,6 +393,18 @@ t_sync_query(Config) ->
|
|
|
_Attempts = 20,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
),
|
|
|
+ ?retry(
|
|
|
+ _Sleep1 = 1_000,
|
|
|
+ _Attempts1 = 30,
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(
|
|
|
+ ?BRIDGE_TYPE_BIN,
|
|
|
+ Name
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ActionId = action_id(Config),
|
|
|
MsgId = erlang:unique_integer(),
|
|
|
Params = #{
|
|
|
topic => ?config(mqtt_topic, Config),
|
|
|
@@ -394,7 +412,7 @@ t_sync_query(Config) ->
|
|
|
payload => ?config(oracle_name, Config),
|
|
|
retain => true
|
|
|
},
|
|
|
- Message = {send_message, Params},
|
|
|
+ Message = {ActionId, Params},
|
|
|
?assertEqual(
|
|
|
{ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message)
|
|
|
),
|
|
|
@@ -409,7 +427,7 @@ t_batch_sync_query(Config) ->
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
ResourceId = resource_id(Config),
|
|
|
- BridgeId = bridge_id(Config),
|
|
|
+ Name = ?config(oracle_name, Config),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
reset_table(Config),
|
|
|
@@ -419,6 +437,17 @@ t_batch_sync_query(Config) ->
|
|
|
_Attempts = 30,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 30,
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(
|
|
|
+ ?BRIDGE_TYPE_BIN,
|
|
|
+ Name
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
MsgId = erlang:unique_integer(),
|
|
|
Params = #{
|
|
|
topic => ?config(mqtt_topic, Config),
|
|
|
@@ -431,9 +460,9 @@ t_batch_sync_query(Config) ->
|
|
|
% be sent async as callback_mode is set to async_if_possible.
|
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
ct:sleep(1000),
|
|
|
- emqx_bridge:send_message(BridgeId, Params),
|
|
|
- emqx_bridge:send_message(BridgeId, Params),
|
|
|
- emqx_bridge:send_message(BridgeId, Params),
|
|
|
+ emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
|
|
|
+ emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
|
|
|
+ emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
|
|
|
ok
|
|
|
end),
|
|
|
% Wait for reconnection.
|
|
|
@@ -442,6 +471,17 @@ t_batch_sync_query(Config) ->
|
|
|
_Attempts = 30,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 30,
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(
|
|
|
+ ?BRIDGE_TYPE_BIN,
|
|
|
+ Name
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 30,
|
|
|
@@ -506,6 +546,17 @@ t_start_stop(Config) ->
|
|
|
_Attempts = 20,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(
|
|
|
+ ?BRIDGE_TYPE_BIN,
|
|
|
+ OracleName
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
|
|
|
%% Check that the bridge probe API doesn't leak atoms.
|
|
|
ProbeRes0 = probe_bridge_api(
|
|
|
@@ -554,6 +605,7 @@ t_probe_with_nested_tokens(Config) ->
|
|
|
t_message_with_nested_tokens(Config) ->
|
|
|
BridgeId = bridge_id(Config),
|
|
|
ResourceId = resource_id(Config),
|
|
|
+ Name = ?config(oracle_name, Config),
|
|
|
reset_table(Config),
|
|
|
?assertMatch(
|
|
|
{ok, _},
|
|
|
@@ -568,6 +620,17 @@ t_message_with_nested_tokens(Config) ->
|
|
|
_Attempts = 20,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(
|
|
|
+ ?BRIDGE_TYPE_BIN,
|
|
|
+ Name
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
MsgId = erlang:unique_integer(),
|
|
|
Data = binary_to_list(?config(oracle_name, Config)),
|
|
|
Params = #{
|
|
|
@@ -600,6 +663,7 @@ t_on_get_status(Config) ->
|
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
+ Name = ?config(oracle_name, Config),
|
|
|
ResourceId = resource_id(Config),
|
|
|
reset_table(Config),
|
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
|
@@ -612,13 +676,23 @@ t_on_get_status(Config) ->
|
|
|
),
|
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
ct:sleep(500),
|
|
|
- ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
+ ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := disconnected},
|
|
|
+ emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name)
|
|
|
+ )
|
|
|
end),
|
|
|
%% Check that it recovers itself.
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 20,
|
|
|
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
+ begin
|
|
|
+ ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
|
|
+ ?assertMatch(
|
|
|
+ #{status := connected},
|
|
|
+ emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name)
|
|
|
+ )
|
|
|
+ end
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
@@ -664,6 +738,7 @@ t_missing_table(Config) ->
|
|
|
begin
|
|
|
drop_table_if_exists(Config),
|
|
|
?assertMatch({ok, _}, create_bridge_api(Config)),
|
|
|
+ ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)),
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 20,
|
|
|
@@ -679,7 +754,7 @@ t_missing_table(Config) ->
|
|
|
payload => ?config(oracle_name, Config),
|
|
|
retain => true
|
|
|
},
|
|
|
- Message = {send_message, Params},
|
|
|
+ Message = {ActionId, Params},
|
|
|
?assertMatch(
|
|
|
{error, {resource_error, #{reason := not_connected}}},
|
|
|
emqx_resource:simple_sync_query(ResourceId, Message)
|
|
|
@@ -698,6 +773,7 @@ t_table_removed(Config) ->
|
|
|
begin
|
|
|
reset_table(Config),
|
|
|
?assertMatch({ok, _}, create_bridge_api(Config)),
|
|
|
+ ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)),
|
|
|
?retry(
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 20,
|
|
|
@@ -711,7 +787,7 @@ t_table_removed(Config) ->
|
|
|
payload => ?config(oracle_name, Config),
|
|
|
retain => true
|
|
|
},
|
|
|
- Message = {send_message, Params},
|
|
|
+ Message = {ActionId, Params},
|
|
|
?assertEqual(
|
|
|
{error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}},
|
|
|
emqx_resource:simple_sync_query(ResourceId, Message)
|