|
|
@@ -249,6 +249,7 @@ t_create_disconnected(Config) ->
|
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
|
health_check_resource_down(Config)
|
|
|
end),
|
|
|
+ timer:sleep(10_000),
|
|
|
health_check_resource_ok(Config),
|
|
|
ok.
|
|
|
|
|
|
@@ -317,7 +318,8 @@ t_simple_query(Config) ->
|
|
|
{ok, _},
|
|
|
create_bridge(Config)
|
|
|
),
|
|
|
- {Requests, Vals} = gen_batch_req(BatchSize),
|
|
|
+
|
|
|
+ {Requests, Vals} = gen_batch_req(Config, BatchSize),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
?wait_async_action(
|
|
|
@@ -519,14 +521,15 @@ create_bridge_http(Params) ->
|
|
|
send_message(Config, Payload) ->
|
|
|
Name = ?config(sqlserver_name, Config),
|
|
|
BridgeType = ?config(sqlserver_bridge_type, Config),
|
|
|
- BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
|
|
- emqx_bridge:send_message(BridgeID, Payload).
|
|
|
+ ActionId = emqx_bridge_v2:id(BridgeType, Name),
|
|
|
+ emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}).
|
|
|
|
|
|
query_resource(Config, Request) ->
|
|
|
Name = ?config(sqlserver_name, Config),
|
|
|
BridgeType = ?config(sqlserver_bridge_type, Config),
|
|
|
- ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
|
- emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
|
|
+ ID = emqx_bridge_v2:id(BridgeType, Name),
|
|
|
+ ResID = emqx_connector_resource:resource_id(BridgeType, Name),
|
|
|
+ emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}).
|
|
|
|
|
|
query_resource_async(Config, Request) ->
|
|
|
Name = ?config(sqlserver_name, Config),
|
|
|
@@ -545,7 +548,10 @@ resource_id(Config) ->
|
|
|
_ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name).
|
|
|
|
|
|
health_check_resource_ok(Config) ->
|
|
|
- ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))).
|
|
|
+ BridgeType = ?config(sqlserver_bridge_type, Config),
|
|
|
+ Name = ?config(sqlserver_name, Config),
|
|
|
+ ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))),
|
|
|
+ ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)).
|
|
|
|
|
|
health_check_resource_down(Config) ->
|
|
|
case emqx_resource_manager:health_check(resource_id(Config)) of
|
|
|
@@ -666,13 +672,16 @@ sent_data(Payload) ->
|
|
|
qos => 0
|
|
|
}.
|
|
|
|
|
|
-gen_batch_req(Count) when
|
|
|
+gen_batch_req(Config, Count) when
|
|
|
is_integer(Count) andalso Count > 0
|
|
|
->
|
|
|
+ BridgeType = ?config(sqlserver_bridge_type, Config),
|
|
|
+ Name = ?config(sqlserver_name, Config),
|
|
|
+ ActionId = emqx_bridge_v2:id(BridgeType, Name),
|
|
|
Vals = [{str(erlang:unique_integer())} || _Seq <- lists:seq(1, Count)],
|
|
|
- Requests = [{send_message, sent_data(Payload)} || {Payload} <- Vals],
|
|
|
+ Requests = [{ActionId, sent_data(Payload)} || {Payload} <- Vals],
|
|
|
{Requests, Vals};
|
|
|
-gen_batch_req(Count) ->
|
|
|
+gen_batch_req(_Config, Count) ->
|
|
|
ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
|
|
|
|
|
|
str(List) when is_list(List) ->
|