Procházet zdrojové kódy

fix: project sub-fields in action/connector `resource_opts` for some bridges

Fixes https://emqx.atlassian.net/browse/EMQX-11589
Thales Macedo Garitezi před 2 roky
rodič
revize
a5d848515b

+ 1 - 5
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -216,12 +216,8 @@ create_bridge_api(Config, Overrides) ->
     BridgeName = ?config(bridge_name, Config),
     BridgeConfig0 = ?config(bridge_config, Config),
     BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
-    ConnectorName = ?config(connector_name, Config),
-    ConnectorType = ?config(connector_type, Config),
-    ConnectorConfig = ?config(connector_config, Config),
 
-    {ok, _Connector} =
-        emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig),
+    {ok, {{_, 201, _}, _, _}} = create_connector_api(Config),
 
     Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
     Path = emqx_mgmt_api_test_util:api_path(["actions"]),

+ 8 - 4
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl

@@ -27,10 +27,14 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ParamsKeys = producer_action_parameters_field_keys(),
     Config1 = maps:with(CommonActionKeys, BridgeV1Config),
     Params = maps:with(ParamsKeys, BridgeV1Config),
-    Config1#{
-        <<"connector">> => ConnectorName,
-        <<"parameters">> => Params
-    }.
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        Config1#{
+            <<"connector">> => ConnectorName,
+            <<"parameters">> => Params
+        }
+    ).
 
 %%------------------------------------------------------------------------------------------
 %% Internal helper fns

+ 4 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl

@@ -107,7 +107,10 @@ fields(Field) when
     Field == "put_connector";
     Field == "post_connector"
 ->
-    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields"));
+    Fields =
+        fields("connection_fields") ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
 fields("get_bridge_v2") ->
     emqx_bridge_schema:status_fields() ++
         fields("post_bridge_v2");

+ 6 - 5
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl

@@ -30,7 +30,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ActionParametersKeys = schema_keys(action_parameters),
     ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
     ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
-    ActionConfig#{<<"connector">> => ConnectorName}.
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
 
 bridge_v1_config_to_connector_config(BridgeV1Config) ->
     ActionTopLevelKeys = schema_keys(mongodb_action),
@@ -42,10 +46,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
     ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
     emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
-        fun(ResourceOpts) ->
-            CommonROSubfields = emqx_connector_schema:common_resource_opts_subfields_bin(),
-            maps:with(CommonROSubfields, ResourceOpts)
-        end,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
         ConnConfig0
     ).
 

+ 52 - 30
apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl

@@ -58,10 +58,10 @@ init_per_group(Type = rs, Config) ->
     MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
-            ok = start_apps(),
-            emqx_mgmt_api_test_util:init_suite(),
+            Apps = start_apps(Config),
             {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
             [
+                {apps, Apps},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
                 {mongo_config, MongoConfig},
@@ -77,10 +77,10 @@ init_per_group(Type = sharded, Config) ->
     MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
-            ok = start_apps(),
-            emqx_mgmt_api_test_util:init_suite(),
+            Apps = start_apps(Config),
             {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
             [
+                {apps, Apps},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
                 {mongo_config, MongoConfig},
@@ -96,8 +96,7 @@ init_per_group(Type = single, Config) ->
     MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
-            ok = start_apps(),
-            emqx_mgmt_api_test_util:init_suite(),
+            Apps = start_apps(Config),
             %% NOTE: `mongo-single` has auth enabled, see `credentials.env`.
             AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")),
             Username = bin(os:getenv("MONGO_USERNAME", "")),
@@ -113,6 +112,7 @@ init_per_group(Type = single, Config) ->
             ],
             {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, NConfig),
             [
+                {apps, Apps},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
                 {mongo_config, MongoConfig},
@@ -124,6 +124,14 @@ init_per_group(Type = single, Config) ->
             {skip, no_mongo}
     end.
 
+end_per_group(Type, Config) when
+    Type =:= rs;
+    Type =:= sharded;
+    Type =:= single
+->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok;
 end_per_group(_Type, _Config) ->
     ok.
 
@@ -131,18 +139,6 @@ init_per_suite(Config) ->
     Config.
 
 end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps(
-        [
-            emqx_management,
-            emqx_bridge_mongodb,
-            emqx_mongodb,
-            emqx_bridge,
-            emqx_connector,
-            emqx_rule_engine,
-            emqx_conf
-        ]
-    ),
     ok.
 
 init_per_testcase(_Testcase, Config) ->
@@ -162,23 +158,22 @@ end_per_testcase(_Testcase, Config) ->
 %% Helper fns
 %%------------------------------------------------------------------------------
 
-start_apps() ->
-    ensure_loaded(),
-    %% some configs in emqx_conf app are mandatory,
-    %% we want to make sure they are loaded before
-    %% ekka start in emqx_common_test_helpers:start_apps/1
-    emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
-    ok = emqx_common_test_helpers:start_apps(
+start_apps(Config) ->
+    Apps = emqx_cth_suite:start(
         [
+            emqx,
             emqx_conf,
-            emqx_rule_engine,
             emqx_connector,
             emqx_bridge,
-            emqx_mongodb,
             emqx_bridge_mongodb,
-            emqx_management
-        ]
-    ).
+            emqx_rule_engine,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _Api} = emqx_common_test_http:create_default_app(),
+    Apps.
 
 ensure_loaded() ->
     _ = application:load(emqtt),
@@ -221,6 +216,15 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
+            "\n     health_check_interval = 15s"
+            "\n     start_timeout = 5s"
+            "\n     start_after_created = true"
+            "\n     request_ttl = 45s"
+            "\n     inflight_window = 100"
+            "\n     max_buffer_bytes = 256MB"
+            "\n     buffer_mode = memory_only"
+            "\n     metrics_flush_interval = 5s"
+            "\n     resume_interval = 15s"
             "\n   }"
             "\n }",
             [
@@ -248,6 +252,15 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
+            "\n     health_check_interval = 15s"
+            "\n     start_timeout = 5s"
+            "\n     start_after_created = true"
+            "\n     request_ttl = 45s"
+            "\n     inflight_window = 100"
+            "\n     max_buffer_bytes = 256MB"
+            "\n     buffer_mode = memory_only"
+            "\n     metrics_flush_interval = 5s"
+            "\n     resume_interval = 15s"
             "\n   }"
             "\n }",
             [
@@ -278,6 +291,15 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
+            "\n     health_check_interval = 15s"
+            "\n     start_timeout = 5s"
+            "\n     start_after_created = true"
+            "\n     request_ttl = 45s"
+            "\n     inflight_window = 100"
+            "\n     max_buffer_bytes = 256MB"
+            "\n     buffer_mode = memory_only"
+            "\n     metrics_flush_interval = 5s"
+            "\n     resume_interval = 15s"
             "\n   }"
             "\n }",
             [

+ 21 - 3
apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl

@@ -144,7 +144,12 @@ connector_config(Name, Config) ->
             <<"srv_record">> => false,
             <<"username">> => Username,
             <<"password">> => iolist_to_binary(["file://", PassFile]),
-            <<"auth_source">> => AuthSource
+            <<"auth_source">> => AuthSource,
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_after_created">> => true,
+                <<"start_timeout">> => <<"5s">>
+            }
         },
     InnerConfigMap = serde_roundtrip(InnerConfigMap0),
     parse_and_check_connector_config(InnerConfigMap, Name).
@@ -166,8 +171,21 @@ bridge_config(Name, ConnectorId) ->
             <<"connector">> => ConnectorId,
             <<"parameters">> =>
                 #{},
-            <<"local_topic">> => <<"t/aeh">>
-            %%,
+            <<"local_topic">> => <<"t/mongo">>,
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"resume_interval">> => <<"15s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
         },
     InnerConfigMap = serde_roundtrip(InnerConfigMap0),
     parse_and_check_bridge_config(InnerConfigMap, Name).

+ 7 - 5
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl

@@ -44,15 +44,17 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ActionParametersKeys = schema_keys(action_parameters),
     ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
     ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
-    ActionConfig#{<<"connector">> => ConnectorName}.
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
 
 bridge_v1_config_to_connector_config(BridgeV1Config) ->
     ConnectorKeys = schema_keys("config_connector"),
-    ResourceOptsKeys = schema_keys(connector_resource_opts),
-    maps:update_with(
+    emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
-        fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end,
-        #{},
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
         maps:with(ConnectorKeys, BridgeV1Config)
     ).
 

+ 9 - 0
apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl

@@ -182,6 +182,15 @@ mysql_config(BridgeType, Config) ->
             "    batch_size = ~b\n"
             "    query_mode = ~s\n"
             "    worker_pool_size = ~b\n"
+            "    health_check_interval = 15s\n"
+            "    start_timeout = 5s\n"
+            "    inflight_window = 100\n"
+            "    max_buffer_bytes = 256MB\n"
+            "    buffer_mode = memory_only\n"
+            "    batch_time = 0\n"
+            "    metrics_flush_interval = 5s\n"
+            "    buffer_seg_bytes = 10MB\n"
+            "    start_after_created = true\n"
             "  }\n"
             "  ssl = {\n"
             "    enable = ~w\n"

+ 44 - 10
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -100,11 +100,18 @@ init_per_group(timescale, Config0) ->
 init_per_group(_Group, Config) ->
     Config.
 
-end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
-    connect_and_drop_table(Config),
+end_per_group(Group, Config) when
+    Group =:= with_batch;
+    Group =:= without_batch;
+    Group =:= matrix;
+    Group =:= timescale
+->
+    Apps = ?config(apps, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
+    connect_and_drop_table(Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok = emqx_cth_suite:stop(Apps),
     ok;
 end_per_group(_Group, _Config) ->
     ok.
@@ -113,8 +120,6 @@ init_per_suite(Config) ->
     Config.
 
 end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
     ok.
 
 init_per_testcase(_Testcase, Config) ->
@@ -147,14 +152,31 @@ common_init(Config0) ->
             ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
             % Ensure enterprise bridge module is loaded
-            ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
-            _ = emqx_bridge_enterprise:module_info(),
-            emqx_mgmt_api_test_util:init_suite(),
+            Apps = emqx_cth_suite:start(
+                [
+                    emqx,
+                    emqx_conf,
+                    emqx_connector,
+                    emqx_bridge,
+                    emqx_bridge_pgsql,
+                    emqx_rule_engine,
+                    emqx_management,
+                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config0)}
+            ),
+            {ok, _Api} = emqx_common_test_http:create_default_app(),
+
+            %% ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]),
+            %% _ = emqx_bridge_enterprise:module_info(),
+            %% emqx_mgmt_api_test_util:init_suite(),
+
             % Connect to pgsql directly and create the table
             connect_and_create_table(Config0),
             {Name, PGConf} = pgsql_config(BridgeType, Config0),
             Config =
                 [
+                    {apps, Apps},
                     {pgsql_config, PGConf},
                     {pgsql_bridge_type, BridgeType},
                     {pgsql_name, Name},
@@ -198,6 +220,16 @@ pgsql_config(BridgeType, Config) ->
             "\n     request_ttl = 500ms"
             "\n     batch_size = ~b"
             "\n     query_mode = ~s"
+            "\n     worker_pool_size = 1"
+            "\n     health_check_interval = 15s"
+            "\n     start_after_created = true"
+            "\n     start_timeout = 5s"
+            "\n     inflight_window = 100"
+            "\n     max_buffer_bytes = 256MB"
+            "\n     buffer_seg_bytes = 10MB"
+            "\n     buffer_mode = memory_only"
+            "\n     metrics_flush_interval = 5s"
+            "\n     resume_interval = 15s"
             "\n   }"
             "\n   ssl = {"
             "\n     enable = ~w"
@@ -218,6 +250,9 @@ pgsql_config(BridgeType, Config) ->
         ),
     {Name, parse_and_check(ConfigString, BridgeType, Name)}.
 
+default_sql() ->
+    ?SQL_BRIDGE.
+
 create_passfile(BridgeType, Config) ->
     Filename = binary_to_list(BridgeType) ++ ".passfile",
     Filepath = filename:join(?config(priv_dir, Config), Filename),
@@ -689,14 +724,13 @@ t_missing_table(Config) ->
 t_table_removed(Config) ->
     Name = ?config(pgsql_name, Config),
     BridgeType = ?config(pgsql_bridge_type, Config),
-    %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     ?check_trace(
         begin
             connect_and_create_table(Config),
             ?assertMatch({ok, _}, create_bridge(Config)),
             ?retry(
-                _Sleep = 1_000,
-                _Attempts = 20,
+                _Sleep = 100,
+                _Attempts = 200,
                 ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
             ),
             connect_and_drop_table(Config),

+ 233 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -0,0 +1,233 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_v2_pgsql_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(BRIDGE_TYPE, pgsql).
+-define(BRIDGE_TYPE_BIN, <<"pgsql">>).
+-define(CONNECTOR_TYPE, pgsql).
+-define(CONNECTOR_TYPE_BIN, <<"pgsql">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+-import(emqx_utils_conv, [bin/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
+    PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")),
+    case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of
+        true ->
+            Apps = emqx_cth_suite:start(
+                [
+                    emqx,
+                    emqx_conf,
+                    emqx_connector,
+                    emqx_bridge,
+                    emqx_bridge_pgsql,
+                    emqx_rule_engine,
+                    emqx_management,
+                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config)}
+            ),
+            {ok, Api} = emqx_common_test_http:create_default_app(),
+            NConfig = [
+                {apps, Apps},
+                {api, Api},
+                {pgsql_host, PostgresHost},
+                {pgsql_port, PostgresPort},
+                {enable_tls, false},
+                {postgres_host, PostgresHost},
+                {postgres_port, PostgresPort}
+                | Config
+            ],
+            emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig),
+            NConfig;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_postgres);
+                _ ->
+                    {skip, no_postgres}
+            end
+    end.
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config) ->
+    ct:timetrap(timer:seconds(60)),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    emqx_config:delete_override_conf_files(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
+    Username = <<"root">>,
+    Password = <<"public">>,
+    Passfile = filename:join(?config(priv_dir, Config), "passfile"),
+    ok = file:write_file(Passfile, Password),
+    NConfig = [
+        {postgres_username, Username},
+        {postgres_password, Password},
+        {postgres_passfile, Passfile}
+        | Config
+    ],
+    ConnectorConfig = connector_config(Name, NConfig),
+    BridgeConfig = bridge_config(Name, Name),
+    ok = snabbkaffe:start_trace(),
+    [
+        {connector_type, ?CONNECTOR_TYPE},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {bridge_type, ?BRIDGE_TYPE},
+        {bridge_name, Name},
+        {bridge_config, BridgeConfig}
+        | NConfig
+    ].
+
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config),
+            emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, Config) ->
+    PostgresHost = ?config(postgres_host, Config),
+    PostgresPort = ?config(postgres_port, Config),
+    Username = ?config(postgres_username, Config),
+    PassFile = ?config(postgres_passfile, Config),
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"database">> => <<"mqtt">>,
+            <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]),
+            <<"pool_size">> => 8,
+            <<"username">> => Username,
+            <<"password">> => iolist_to_binary(["file://", PassFile]),
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_after_created">> => true,
+                <<"start_timeout">> => <<"5s">>
+            }
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    parse_and_check_connector_config(InnerConfigMap, Name).
+
+parse_and_check_connector_config(InnerConfigMap, Name) ->
+    TypeBin = ?CONNECTOR_TYPE_BIN,
+    RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
+    #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
+        hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
+            required => false, atom_key => false
+        }),
+    ct:pal("parsed config: ~p", [Config]),
+    InnerConfigMap.
+
+bridge_config(Name, ConnectorId) ->
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> =>
+                #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()},
+            <<"local_topic">> => <<"t/postgres">>,
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"resume_interval">> => <<"15s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    parse_and_check_bridge_config(InnerConfigMap, Name).
+
+%% check it serializes correctly
+serde_roundtrip(InnerConfigMap0) ->
+    IOList = hocon_pp:do(InnerConfigMap0, #{}),
+    {ok, InnerConfigMap} = hocon:binary(IOList),
+    InnerConfigMap.
+
+parse_and_check_bridge_config(InnerConfigMap, Name) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
+    hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
+    InnerConfigMap.
+
+make_message() ->
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    #{
+        clientid => ClientId,
+        payload => Payload,
+        timestamp => 1668602148000
+    }.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_stop(Config) ->
+    emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped),
+    ok.
+
+t_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    ok.
+
+t_sync_query(Config) ->
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config,
+        fun make_message/0,
+        fun(Res) -> ?assertMatch({ok, _}, Res) end,
+        postgres_bridge_connector_on_query_return
+    ),
+    ok.

+ 3 - 4
apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl

@@ -44,12 +44,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),
     ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
     ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
-    ActionConfig = emqx_utils_maps:update_if_present(
+    emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
         fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
-        ActionConfig0
-    ),
-    ActionConfig#{<<"connector">> => ConnectorName}.
+        ActionConfig0#{<<"connector">> => ConnectorName}
+    ).
 
 bridge_v1_config_to_connector_config(BridgeV1Config) ->
     ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),

+ 16 - 2
apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl

@@ -599,7 +599,14 @@ toxiproxy_redis_bridge_config() ->
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"health_check_interval">> => <<"1s">>,
-            <<"start_timeout">> => <<"15s">>
+            <<"max_buffer_bytes">> => <<"256MB">>,
+            <<"buffer_seg_bytes">> => <<"10MB">>,
+            <<"request_ttl">> => <<"45s">>,
+            <<"inflight_window">> => <<"100">>,
+            <<"resume_interval">> => <<"1s">>,
+            <<"metrics_flush_interval">> => <<"1s">>,
+            <<"start_after_created">> => true,
+            <<"start_timeout">> => <<"5s">>
         }
     },
     maps:merge(Conf0, ?COMMON_REDIS_OPTS).
@@ -611,7 +618,14 @@ username_password_redis_bridge_config() ->
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"health_check_interval">> => <<"1s">>,
-            <<"start_timeout">> => <<"15s">>
+            <<"max_buffer_bytes">> => <<"256MB">>,
+            <<"buffer_seg_bytes">> => <<"10MB">>,
+            <<"request_ttl">> => <<"45s">>,
+            <<"inflight_window">> => <<"100">>,
+            <<"resume_interval">> => <<"15s">>,
+            <<"metrics_flush_interval">> => <<"1s">>,
+            <<"start_after_created">> => true,
+            <<"start_timeout">> => <<"5s">>
         }
     },
     Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS),

+ 5 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -290,7 +290,11 @@ transform_bridge_v1_config_to_action_config(
     TopMap = maps:with(TopKeys, ActionMap1),
     RestMap = maps:without(TopKeys, ActionMap1),
     %% Other parameters should be stuffed into `parameters'
-    emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}).
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap})
+    ).
 
 generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
     ConnectorNameList =

+ 4 - 1
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -159,7 +159,9 @@ on_stop(InstId, State) ->
         connector => InstId
     }),
     close_connections(State),
-    emqx_resource_pool:stop(InstId).
+    Res = emqx_resource_pool:stop(InstId),
+    ?tp(postgres_stopped, #{instance_id => InstId}),
+    Res.
 
 close_connections(#{pool_name := PoolName} = _State) ->
     WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
@@ -301,6 +303,7 @@ on_query(
     Type = pgsql_query_type(TypeOrKey),
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
     Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
+    ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
     handle_result(Res).
 
 pgsql_query_type(sql) ->