Просмотр исходного кода

Merge pull request #12132 from sstrigler/EMQX-11154-bridge-v-2-my-sql-support

feat(emqx_bridge_mysql): port to shared connectors
Stefan Strigler 2 лет назад
Родитель
Сommit
5d7ae4b980

+ 1 - 1
apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_mysql, [
     {description, "EMQX MySQL Authentication and Authorization"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_auth_mysql_app, []}},
     {applications, [

+ 1 - 2
apps/emqx_auth_mysql/src/emqx_authn_mysql_schema.erl

@@ -55,8 +55,7 @@ fields(mysql) ->
         {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1},
         {query, fun query/1},
         {query_timeout, fun query_timeout/1}
-    ] ++ emqx_authn_schema:common_fields() ++
-        proplists:delete(prepare_statement, emqx_mysql:fields(config)).
+    ] ++ emqx_authn_schema:common_fields() ++ emqx_mysql:fields(config).
 
 desc(mysql) ->
     ?DESC(mysql);

+ 1 - 0
apps/emqx_auth_mysql/src/emqx_authz_mysql_schema.erl

@@ -37,6 +37,7 @@ type() -> ?AUTHZ_TYPE.
 fields(mysql) ->
     emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
         emqx_mysql:fields(config) ++
+        emqx_connector_schema_lib:prepare_statement_fields() ++
         [{query, query()}].
 
 desc(mysql) ->

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -79,6 +79,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_kafka_action_info,
         emqx_bridge_matrix_action_info,
         emqx_bridge_mongodb_action_info,
+        emqx_bridge_mysql_action_info,
         emqx_bridge_pgsql_action_info,
         emqx_bridge_syskeeper_action_info,
         emqx_bridge_timescale_action_info,

+ 50 - 1
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -31,7 +31,8 @@
     get_response/0,
     put_request/0,
     post_request/0,
-    examples/1
+    examples/1,
+    action_values/4
 ]).
 
 %% Exported for mocking
@@ -103,6 +104,54 @@ bridge_api_union(Refs) ->
             end
     end.
 
+-type http_method() :: get | post | put.
+-type schema_example_map() :: #{atom() => term()}.
+
+-spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map().
+action_values(Method, ActionType, ConnectorType, ActionValues) ->
+    ActionTypeBin = atom_to_binary(ActionType),
+    ConnectorTypeBin = atom_to_binary(ConnectorType),
+    lists:foldl(
+        fun(M1, M2) ->
+            maps:merge(M1, M2)
+        end,
+        #{
+            enable => true,
+            description => <<"My example ", ActionTypeBin/binary, " action">>,
+            connector => <<ConnectorTypeBin/binary, "_connector">>,
+            resource_opts => #{
+                health_check_interval => "30s"
+            }
+        },
+        [
+            ActionValues,
+            method_values(Method, ActionType)
+        ]
+    ).
+
+-spec method_values(http_method(), atom()) -> schema_example_map().
+method_values(post, Type) ->
+    TypeBin = atom_to_binary(Type),
+    #{
+        name => <<TypeBin/binary, "_action">>,
+        type => TypeBin
+    };
+method_values(get, Type) ->
+    maps:merge(
+        method_values(post, Type),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+method_values(put, _Type) ->
+    #{}.
+
 %%======================================================================================
 %% HOCON Schema Callbacks
 %%======================================================================================

+ 1 - 3
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl

@@ -36,9 +36,7 @@
 namespace() ->
     "bridge_mongodb".
 
-roots() ->
-    %% ???
-    [].
+roots() -> [].
 
 fields("config") ->
     [

+ 3 - 16
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl

@@ -31,9 +31,9 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
         maps:merge(
             maps:without(
                 [<<"connector">>],
-                map_unindent(<<"parameters">>, ActionConfig)
+                emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
             ),
-            map_unindent(<<"parameters">>, ConnectorConfig)
+            emqx_utils_maps:unindent(<<"parameters">>, ConnectorConfig)
         )
     ).
 
@@ -66,7 +66,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
 
 make_config_map(PickKeys, IndentKeys, Config) ->
     Conf0 = maps:with(PickKeys, Config),
-    map_indent(<<"parameters">>, IndentKeys, Conf0).
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
 
 bridge_v1_type_name() ->
     {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
@@ -86,18 +86,5 @@ v1_type(<<"rs">>) -> mongodb_rs;
 v1_type(<<"sharded">>) -> mongodb_sharded;
 v1_type(<<"single">>) -> mongodb_single.
 
-map_unindent(Key, Map) ->
-    maps:merge(
-        maps:get(Key, Map),
-        maps:remove(Key, Map)
-    ).
-
-map_indent(IndentKey, PickKeys, Map) ->
-    maps:put(
-        IndentKey,
-        maps:with(PickKeys, Map),
-        maps:without(PickKeys, Map)
-    ).
-
 schema_keys(Name) ->
     [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src

@@ -9,7 +9,7 @@
         emqx_resource,
         emqx_mysql
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_mysql_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 101 - 4
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl

@@ -10,7 +10,9 @@
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
 -export([
-    conn_bridge_examples/1
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
 ]).
 
 -export([
@@ -20,6 +22,9 @@
     desc/1
 ]).
 
+-define(CONNECTOR_TYPE, mysql).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
 -define(DEFAULT_SQL, <<
     "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
     "values (${id}, ${topic}, ${qos}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))"
@@ -28,6 +33,22 @@
 %% -------------------------------------------------------------------------------------------------
 %% api
 
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"mysql">> =>
+                #{
+                    summary => <<"MySQL Action">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
+
+action_values() ->
+    #{parameters => #{sql => ?DEFAULT_SQL}}.
+
 conn_bridge_examples(Method) ->
     [
         #{
@@ -38,6 +59,29 @@ conn_bridge_examples(Method) ->
         }
     ].
 
+connector_examples(Method) ->
+    [
+        #{
+            <<"mysql">> =>
+                #{
+                    summary => <<"MySQL Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_values()
+                    )
+                }
+        }
+    ].
+
+connector_values() ->
+    #{
+        server => <<"127.0.0.1:3306">>,
+        database => <<"test">>,
+        pool_size => 8,
+        username => <<"root">>,
+        password => <<"******">>,
+        resource_opts => #{health_check_interval => <<"20s">>}
+    }.
+
 values(_Method) ->
     #{
         enable => true,
@@ -80,17 +124,70 @@ fields("config") ->
                 #{desc => ?DESC("local_topic"), default => undefined}
             )}
     ] ++ emqx_resource_schema:fields("resource_opts") ++
-        (emqx_mysql:fields(config) --
-            emqx_connector_schema_lib:prepare_statement_fields());
+        emqx_mysql:fields(config);
+fields(action) ->
+    {mysql,
+        mk(
+            hoconsc:map(name, ref(?MODULE, mysql_action)),
+            #{desc => <<"MySQL Action Config">>, required => false}
+        )};
+fields(mysql_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(
+            ref(?MODULE, action_parameters),
+            #{
+                required => true, desc => ?DESC(action_parameters)
+            }
+        )
+    );
+fields(action_parameters) ->
+    [
+        {sql,
+            mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )}
+    ];
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        emqx_mysql:fields(config) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields("post") ->
     [type_field(), name_field() | fields("config")];
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post");
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
+fields("post_bridge_v2") ->
+    [type_field(), name_field() | fields(mysql_action)];
+fields("put_bridge_v2") ->
+    fields(mysql_action);
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    emqx_connector_schema:api_fields(
+        Field,
+        ?CONNECTOR_TYPE,
+        emqx_mysql:fields(config) ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts)
+    ).
 
 desc("config") ->
     ?DESC("desc_config");
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
+desc(action_parameters) ->
+    ?DESC(action_parameters);
+desc(mysql_action) ->
+    ?DESC(mysql_action);
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for MySQL using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 64 - 0
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl

@@ -0,0 +1,64 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_mysql_action_info).
+
+-behaviour(emqx_action_info).
+
+%% behaviour callbacks
+-export([
+    action_type_name/0,
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_type_name/0,
+    connector_action_config_to_bridge_v1_config/2,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(MYSQL_TYPE, mysql).
+-define(SCHEMA_MODULE, emqx_bridge_mysql).
+
+action_type_name() -> ?MYSQL_TYPE.
+bridge_v1_type_name() -> ?MYSQL_TYPE.
+connector_type_name() -> ?MYSQL_TYPE.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    MergedConfig =
+        emqx_utils_maps:deep_merge(
+            maps:without(
+                [<<"connector">>],
+                emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
+            ),
+            ConnectorConfig
+        ),
+    BridgeV1Keys = schema_keys("config"),
+    maps:with(BridgeV1Keys, MergedConfig).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(mysql_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    ActionConfig#{<<"connector">> => ConnectorName}.
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ConnectorKeys = schema_keys("config_connector"),
+    ResourceOptsKeys = schema_keys(connector_resource_opts),
+    maps:update_with(
+        <<"resource_opts">>,
+        fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end,
+        #{},
+        maps:with(ConnectorKeys, BridgeV1Config)
+    ).
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
+
+schema_keys(Name) ->
+    [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

+ 150 - 0
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -0,0 +1,150 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_mysql_connector).
+
+-behaviour(emqx_resource).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `emqx_resource' API
+-export([
+    on_remove_channel/3,
+    callback_mode/0,
+    on_add_channel/4,
+    on_batch_query/3,
+    on_get_channel_status/3,
+    on_get_channels/1,
+    on_get_status/2,
+    on_query/3,
+    on_start/2,
+    on_stop/2
+]).
+
+%%========================================================================================
+%% `emqx_resource' API
+%%========================================================================================
+
+callback_mode() -> emqx_mysql:callback_mode().
+
+on_add_channel(
+    _InstanceId,
+    #{channels := Channels, connector_state := ConnectorState} = State0,
+    ChannelId,
+    ChannelConfig0
+) ->
+    ChannelConfig1 = emqx_utils_maps:unindent(parameters, ChannelConfig0),
+    QueryTemplates = emqx_mysql:parse_prepare_sql(ChannelId, ChannelConfig1),
+    ChannelConfig2 = maps:merge(ChannelConfig1, QueryTemplates),
+    ChannelConfig = set_prepares(ChannelConfig2, ConnectorState),
+    State = State0#{
+        channels => maps:put(ChannelId, ChannelConfig, Channels),
+        connector_state => ConnectorState
+    },
+    {ok, State}.
+
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+    case maps:get(ChannelId, Channels) of
+        #{prepares := ok} ->
+            connected;
+        #{prepares := {error, _}} ->
+            connecting
+    end.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_status(InstanceId, #{channels := Channels0, connector_state := ConnectorState} = State0) ->
+    case emqx_mysql:on_get_status(InstanceId, ConnectorState) of
+        WithState when is_tuple(WithState) ->
+            NewConnectorState = element(2, WithState),
+            State = State0#{connector_state => NewConnectorState},
+            setelement(2, WithState, State);
+        connected ->
+            Channels =
+                maps:map(
+                    fun
+                        (_ChannelId, #{prepares := ok} = ChannelConfig) ->
+                            ChannelConfig;
+                        (_ChannelId, #{prepares := {error, _}} = ChannelConfig) ->
+                            set_prepares(ChannelConfig, ConnectorState)
+                    end,
+                    Channels0
+                ),
+            State = State0#{channels => Channels},
+            {connected, State};
+        Other ->
+            Other
+    end.
+
+on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
+on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
+on_query(
+    InstanceId,
+    {Channel, _Message, _Params, _Timeout} = Request,
+    #{channels := Channels, connector_state := ConnectorState}
+) when is_binary(Channel) ->
+    ChannelConfig = maps:get(Channel, Channels),
+    Result = emqx_mysql:on_query(
+        InstanceId,
+        Request,
+        maps:merge(ConnectorState, ChannelConfig)
+    ),
+    ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}),
+    Result;
+on_query(InstanceId, Request, _State = #{channels := _Channels, connector_state := ConnectorState}) ->
+    emqx_mysql:on_query(InstanceId, Request, ConnectorState).
+
+on_batch_query(
+    InstanceId,
+    [Req | _] = BatchRequest,
+    #{channels := Channels, connector_state := ConnectorState}
+) when is_binary(element(1, Req)) ->
+    Channel = element(1, Req),
+    ChannelConfig = maps:get(Channel, Channels),
+    Result = emqx_mysql:on_batch_query(
+        InstanceId,
+        BatchRequest,
+        maps:merge(ConnectorState, ChannelConfig)
+    ),
+    ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}),
+    Result;
+on_batch_query(InstanceId, BatchRequest, _State = #{connector_state := ConnectorState}) ->
+    emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState).
+
+on_remove_channel(
+    _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
+) ->
+    ChannelConfig = maps:get(ChannelId, Channels),
+    emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)),
+    NewState = State#{channels => maps:remove(ChannelId, Channels)},
+    {ok, NewState}.
+
+-spec on_start(binary(), hocon:config()) ->
+    {ok, #{connector_state := emqx_mysql:state(), channels := map()}} | {error, _}.
+on_start(InstanceId, Config) ->
+    case emqx_mysql:on_start(InstanceId, Config) of
+        {ok, ConnectorState} ->
+            State = #{
+                connector_state => ConnectorState,
+                channels => #{}
+            },
+            {ok, State};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
+    ok = emqx_mysql:on_stop(InstanceId, ConnectorState),
+    ?tp(mysql_connector_stopped, #{instance_id => InstanceId}),
+    ok.
+
+%%========================================================================================
+%% Helper fns
+%%========================================================================================
+set_prepares(ChannelConfig, ConnectorState) ->
+    #{prepares := Prepares} =
+        emqx_mysql:init_prepare(maps:merge(ConnectorState, ChannelConfig)),
+    ChannelConfig#{prepares => Prepares}.

+ 65 - 21
apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl

@@ -242,13 +242,12 @@ send_message(Config, Payload) ->
 query_resource(Config, Request) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    emqx_resource:query(ResourceID, Request, #{timeout => 500}).
+    emqx_bridge_v2:query(BridgeType, Name, Request, #{timeout => 500}).
 
 sync_query_resource(Config, Request) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ResourceID = emqx_bridge_v2:id(BridgeType, Name),
     emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
 
 query_resource_async(Config, Request) ->
@@ -256,8 +255,7 @@ query_resource_async(Config, Request) ->
     BridgeType = ?config(mysql_bridge_type, Config),
     Ref = alias([reply]),
     AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    Return = emqx_resource:query(ResourceID, Request, #{
+    Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
         timeout => 500, async_reply_fun => {AsyncReplyFun, []}
     }),
     {Return, Ref}.
@@ -274,7 +272,9 @@ unprepare(Config, Key) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    {ok, _, #{state := #{pool_name := PoolName}}} = emqx_resource:get_instance(ResourceID),
+    {ok, _, #{state := #{connector_state := #{pool_name := PoolName}}}} = emqx_resource:get_instance(
+        ResourceID
+    ),
     [
         begin
             {ok, Conn} = ecpool_worker:client(Worker),
@@ -343,6 +343,17 @@ create_rule_and_action_http(Config) ->
             Error
     end.
 
+request_api_status(BridgeId) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of
+        {ok, Res0} ->
+            #{<<"status">> := Status} = _Res = emqx_utils_json:decode(Res0, [return_maps]),
+            {ok, binary_to_existing_atom(Status)};
+        Error ->
+            Error
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -519,14 +530,18 @@ t_write_timeout(Config) ->
         2 * Timeout
     ),
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+        Name = ?config(mysql_name, Config),
+        BridgeType = ?config(mysql_bridge_type, Config),
+        ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
         case QueryMode of
             sync ->
                 ?assertMatch(
                     {error, {resource_error, #{reason := timeout}}},
-                    query_resource(Config, {send_message, SentData, [], Timeout})
+                    query_resource(Config, {ResourceID, SentData, [], Timeout})
                 );
             async ->
-                query_resource(Config, {send_message, SentData, [], Timeout}),
+                query_resource(Config, {ResourceID, SentData, [], Timeout}),
                 ok
         end,
         ok
@@ -703,7 +718,10 @@ t_uninitialized_prepared_statement(Config) ->
     ),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
-    unprepare(Config, send_message),
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_v2:id(BridgeType, Name),
+    unprepare(Config, ResourceID),
     ?check_trace(
         begin
             {Res, {ok, _}} =
@@ -721,7 +739,7 @@ t_uninitialized_prepared_statement(Config) ->
                     #{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
                     #{
                         ?snk_kind := mysql_connector_on_query_prepared_sql,
-                        type_or_key := send_message
+                        type_or_key := ResourceID
                     },
                     Trace
                 )
@@ -736,33 +754,58 @@ t_uninitialized_prepared_statement(Config) ->
     ok.
 
 t_missing_table(Config) ->
+    QueryMode = ?config(query_mode, Config),
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
 
     ?check_trace(
         begin
             connect_and_drop_table(Config),
             ?assertMatch({ok, _}, create_bridge(Config)),
+            BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertMatch(
                     {ok, Status} when Status == connecting orelse Status == disconnected,
-                    emqx_resource_manager:health_check(ResourceID)
+                    request_api_status(BridgeID)
                 )
             ),
             Val = integer_to_binary(erlang:unique_integer()),
             SentData = #{payload => Val, timestamp => 1668602148000},
-            Timeout = 1000,
-            ?assertMatch(
-                {error, {resource_error, #{reason := unhealthy_target}}},
-                query_resource(Config, {send_message, SentData, [], Timeout})
-            ),
+            %Timeout = 1000,
+            ResourceID = emqx_bridge_v2:id(BridgeType, Name),
+            Request = {ResourceID, SentData},
+            Result =
+                case QueryMode of
+                    sync ->
+                        query_resource(Config, Request);
+                    async ->
+                        {_, Ref} = query_resource_async(Config, Request),
+                        {ok, Res} = receive_result(Ref, 2_000),
+                        Res
+                end,
+
+            BatchSize = ?config(batch_size, Config),
+            IsBatch = BatchSize > 1,
+            case IsBatch of
+                true ->
+                    ?assertMatch(
+                        {error,
+                            {unrecoverable_error,
+                                {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
+                        Result
+                    );
+                false ->
+                    ?assertMatch(
+                        {error, undefined_table},
+                        Result
+                    )
+            end,
             ok
         end,
         fun(Trace) ->
-            ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)),
+            ?assertMatch([_ | _], ?of_kind(mysql_undefined_table, Trace)),
             ok
         end
     ).
@@ -770,9 +813,9 @@ t_missing_table(Config) ->
 t_table_removed(Config) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     connect_and_create_table(Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     ?retry(
         _Sleep = 1_000,
         _Attempts = 20,
@@ -782,17 +825,17 @@ t_table_removed(Config) ->
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     Timeout = 1000,
+    ActionID = emqx_bridge_v2:id(BridgeType, Name),
     ?assertMatch(
         {error,
             {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
-        sync_query_resource(Config, {send_message, SentData, [], Timeout})
+        sync_query_resource(Config, {ActionID, SentData, [], Timeout})
     ),
     ok.
 
 t_nested_payload_template(Config) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     Value = integer_to_binary(erlang:unique_integer()),
     {ok, _} = create_bridge(
         Config,
@@ -803,6 +846,7 @@ t_nested_payload_template(Config) ->
         }
     ),
     {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     ?retry(
         _Sleep = 1_000,
         _Attempts = 20,

+ 30 - 18
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -34,6 +34,8 @@ resource_type(matrix) ->
     emqx_postgresql;
 resource_type(mongodb) ->
     emqx_bridge_mongodb_connector;
+resource_type(mysql) ->
+    emqx_bridge_mysql_connector;
 resource_type(pgsql) ->
     emqx_postgresql;
 resource_type(syskeeper_forwarder) ->
@@ -94,27 +96,27 @@ connector_structs() ->
                     required => false
                 }
             )},
-        {mongodb,
+        {matrix,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
+                hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
                 #{
-                    desc => <<"MongoDB Connector Config">>,
+                    desc => <<"Matrix Connector Config">>,
                     required => false
                 }
             )},
-        {syskeeper_forwarder,
+        {mongodb,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
+                hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
                 #{
-                    desc => <<"Syskeeper Connector Config">>,
+                    desc => <<"MongoDB Connector Config">>,
                     required => false
                 }
             )},
-        {syskeeper_proxy,
+        {mysql,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, config)),
+                hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
                 #{
-                    desc => <<"Syskeeper Proxy Connector Config">>,
+                    desc => <<"MySQL Connector Config">>,
                     required => false
                 }
             )},
@@ -126,27 +128,35 @@ connector_structs() ->
                     required => false
                 }
             )},
-        {timescale,
+        {redis,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
+                hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
                 #{
-                    desc => <<"Timescale Connector Config">>,
+                    desc => <<"Redis Connector Config">>,
                     required => false
                 }
             )},
-        {matrix,
+        {syskeeper_forwarder,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
+                hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
                 #{
-                    desc => <<"Matrix Connector Config">>,
+                    desc => <<"Syskeeper Connector Config">>,
                     required => false
                 }
             )},
-        {redis,
+        {syskeeper_proxy,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
+                hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, config)),
                 #{
-                    desc => <<"Redis Connector Config">>,
+                    desc => <<"Syskeeper Proxy Connector Config">>,
+                    required => false
+                }
+            )},
+        {timescale,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
+                #{
+                    desc => <<"Timescale Connector Config">>,
                     required => false
                 }
             )}
@@ -160,6 +170,7 @@ schema_modules() ->
         emqx_bridge_kafka,
         emqx_bridge_matrix,
         emqx_bridge_mongodb,
+        emqx_bridge_mysql,
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy,
         emqx_bridge_timescale,
@@ -185,6 +196,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
         api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
         api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),

+ 49 - 3
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -36,9 +36,11 @@
 -export([get_response/0, put_request/0, post_request/0]).
 
 -export([connector_type_to_bridge_types/1]).
+
 -export([
     api_fields/3,
     common_fields/0,
+    connector_values/3,
     status_and_actions_fields/0,
     type_and_name_fields/1
 ]).
@@ -128,16 +130,18 @@ connector_type_to_bridge_types(matrix) ->
     [matrix];
 connector_type_to_bridge_types(mongodb) ->
     [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
+connector_type_to_bridge_types(mysql) ->
+    [mysql];
 connector_type_to_bridge_types(pgsql) ->
     [pgsql];
+connector_type_to_bridge_types(redis) ->
+    [redis, redis_single, redis_sentinel, redis_cluster];
 connector_type_to_bridge_types(syskeeper_forwarder) ->
     [syskeeper_forwarder];
 connector_type_to_bridge_types(syskeeper_proxy) ->
     [];
 connector_type_to_bridge_types(timescale) ->
-    [timescale];
-connector_type_to_bridge_types(redis) ->
-    [redis, redis_single, redis_sentinel, redis_cluster].
+    [timescale].
 
 actions_config_name() -> <<"actions">>.
 
@@ -549,6 +553,48 @@ resource_opts_fields(Overrides) ->
         emqx_resource_schema:create_opts(Overrides)
     ).
 
+-type http_method() :: get | post | put.
+-type schema_example_map() :: #{atom() => term()}.
+
+-spec connector_values(http_method(), atom(), schema_example_map()) -> schema_example_map().
+connector_values(Method, Type, ConnectorValues) ->
+    TypeBin = atom_to_binary(Type),
+    lists:foldl(
+        fun(M1, M2) ->
+            maps:merge(M1, M2)
+        end,
+        #{
+            description => <<"My example ", TypeBin/binary, " connector">>
+        },
+        [
+            ConnectorValues,
+            method_values(Method, Type)
+        ]
+    ).
+
+method_values(post, Type) ->
+    TypeBin = atom_to_binary(Type),
+    #{
+        name => <<TypeBin/binary, "_connector">>,
+        type => TypeBin
+    };
+method_values(get, Type) ->
+    maps:merge(
+        method_values(post, Type),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ],
+            actions => [<<"my_action">>]
+        }
+    );
+method_values(put, _Type) ->
+    #{}.
+
 %%======================================================================================
 %% Helper Functions
 %%======================================================================================

+ 1 - 1
apps/emqx_mysql/rebar.config

@@ -3,7 +3,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
         %% NOTE: mind ecpool version when updating eredis_cluster version
-        {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4"}}},
+        {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4.1"}}},
         {emqx_connector, {path, "../../apps/emqx_connector"}},
         {emqx_resource, {path, "../../apps/emqx_resource"}}
 ]}.

+ 44 - 17
apps/emqx_mysql/src/emqx_mysql.erl

@@ -36,7 +36,13 @@
 %% ecpool connect & reconnect
 -export([connect/1, prepare_sql_to_conn/2]).
 
--export([prepare_sql/2]).
+-export([
+    init_prepare/1,
+    prepare_sql/2,
+    parse_prepare_sql/1,
+    parse_prepare_sql/2,
+    unprepare_sql/1
+]).
 
 -export([roots/0, fields/1]).
 
@@ -51,9 +57,10 @@
     #{
         pool_name := binary(),
         prepares := ok | {error, _},
-        templates := #{{atom(), batch | prepstmt} => template()}
+        templates := #{{atom(), batch | prepstmt} => template()},
+        query_templates := map()
     }.
-
+-export_type([state/0]).
 %%=====================================================================
 %% Hocon schema
 roots() ->
@@ -62,8 +69,7 @@ roots() ->
 fields(config) ->
     [{server, server()}] ++
         add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
-        emqx_connector_schema_lib:ssl_fields() ++
-        emqx_connector_schema_lib:prepare_statement_fields().
+        emqx_connector_schema_lib:ssl_fields().
 
 add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
     Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
@@ -267,7 +273,7 @@ do_check_prepares(
     );
 do_check_prepares(#{prepares := ok}) ->
     ok;
-do_check_prepares(#{prepares := {error, _}} = State) ->
+do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) ->
     %% retry to prepare
     case prepare_sql(State) of
         ok ->
@@ -275,7 +281,9 @@ do_check_prepares(#{prepares := {error, _}} = State) ->
             {ok, State#{prepares => ok}};
         {error, Reason} ->
             {error, Reason}
-    end.
+    end;
+do_check_prepares(_NoTemplates) ->
+    ok.
 
 %% ===================================================================
 
@@ -323,16 +331,18 @@ prepare_sql(Templates, PoolName) ->
     end.
 
 do_prepare_sql(Templates, PoolName) ->
-    Conns =
-        [
-            begin
-                {ok, Conn} = ecpool_worker:client(Worker),
-                Conn
-            end
-         || {_Name, Worker} <- ecpool:workers(PoolName)
-        ],
+    Conns = get_connections_from_pool(PoolName),
     prepare_sql_to_conn_list(Conns, Templates).
 
+get_connections_from_pool(PoolName) ->
+    [
+        begin
+            {ok, Conn} = ecpool_worker:client(Worker),
+            Conn
+        end
+     || {_Name, Worker} <- ecpool:workers(PoolName)
+    ].
+
 prepare_sql_to_conn_list([], _Templates) ->
     ok;
 prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
@@ -369,6 +379,18 @@ prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
 prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
     prepare_sql_to_conn(Conn, Rest).
 
+unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
+    ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
+    lists:foreach(
+        fun(Conn) ->
+            lists:foreach(
+                fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
+                maps:to_list(Templates)
+            )
+        end,
+        get_connections_from_pool(PoolName)
+    ).
+
 unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
     mysql:unprepare(Conn, Key);
 unprepare_sql_to_conn(Conn, Key) when is_atom(Key) ->
@@ -377,12 +399,15 @@ unprepare_sql_to_conn(_Conn, _) ->
     ok.
 
 parse_prepare_sql(Config) ->
+    parse_prepare_sql(send_message, Config).
+
+parse_prepare_sql(Key, Config) ->
     Queries =
         case Config of
             #{prepare_statement := Qs} ->
                 Qs;
             #{sql := Query} ->
-                #{send_message => Query};
+                #{Key => Query};
             _ ->
                 #{}
         end,
@@ -436,7 +461,9 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -
                 {emqx_jsonish, SQLOrData}
             ),
             {TypeOrKey, Row}
-    end.
+    end;
+proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) ->
+    {SQLOrData, Params}.
 
 on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
     Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],

+ 30 - 13
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -16,27 +16,29 @@
 -module(emqx_utils_maps).
 
 -export([
+    best_effort_recursive_sum/3,
+    binary_key_map/1,
+    binary_string/1,
+    deep_convert/3,
+    deep_find/2,
+    deep_force_put/3,
     deep_get/2,
     deep_get/3,
-    deep_find/2,
+    deep_merge/2,
     deep_put/3,
-    deep_force_put/3,
     deep_remove/2,
-    deep_merge/2,
-    binary_key_map/1,
-    safe_atom_key_map/1,
-    unsafe_atom_key_map/1,
-    jsonable_map/1,
-    jsonable_map/2,
-    binary_string/1,
-    deep_convert/3,
     diff_maps/2,
-    best_effort_recursive_sum/3,
     if_only_to_toggle_enable/2,
-    update_if_present/3,
+    indent/3,
+    jsonable_map/1,
+    jsonable_map/2,
+    key_comparer/1,
     put_if/4,
     rename/3,
-    key_comparer/1
+    safe_atom_key_map/1,
+    unindent/2,
+    unsafe_atom_key_map/1,
+    update_if_present/3
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -332,3 +334,18 @@ key_comparer(K) ->
         (M1, M2) ->
             M1 < M2
     end.
+
+-spec indent(term(), [term()], map()) -> map().
+indent(IndentKey, PickKeys, Map) ->
+    maps:put(
+        IndentKey,
+        maps:with(PickKeys, Map),
+        maps:without(PickKeys, Map)
+    ).
+
+-spec unindent(term(), map()) -> map().
+unindent(Key, Map) ->
+    maps:merge(
+        maps:remove(Key, Map),
+        maps:get(Key, Map, #{})
+    ).

+ 43 - 0
apps/emqx_utils/test/emqx_utils_maps_tests.erl

@@ -17,6 +17,8 @@
 -module(emqx_utils_maps_tests).
 -include_lib("eunit/include/eunit.hrl").
 
+-import(emqx_utils_maps, [indent/3, unindent/2]).
+
 best_effort_recursive_sum_test_() ->
     DummyLogger = fun(_) -> ok end,
     [
@@ -129,3 +131,44 @@ key_comparer_test() ->
             #{}
         ])
     ).
+
+map_indent_unindent_test_() ->
+    M = #{a => 1, b => 2},
+    [
+        ?_assertEqual(
+            #{a => 1, c => #{b => 2}},
+            indent(c, [b], M)
+        ),
+        ?_assertEqual(
+            M,
+            unindent(c, indent(c, [b], M))
+        ),
+        ?_assertEqual(
+            #{a => 1, b => #{b => 2}},
+            indent(b, [b], M)
+        ),
+        ?_assertEqual(
+            M,
+            unindent(b, #{a => 1, b => #{b => 2}})
+        ),
+        ?_assertEqual(
+            #{a => 2},
+            unindent(b, #{a => 1, b => #{a => 2}})
+        ),
+        ?_assertEqual(
+            #{c => #{a => 1, b => 2}},
+            indent(c, [a, b], M)
+        ),
+        ?_assertEqual(
+            #{a => 1, b => 2, c => #{}},
+            indent(c, [], M)
+        ),
+        ?_assertEqual(
+            #{a => 1, b => 2, c => #{}},
+            indent(c, [d, e, f], M)
+        ),
+        ?_assertEqual(
+            #{a => 1, b => 2},
+            unindent(c, M)
+        )
+    ].

+ 1 - 1
mix.exs

@@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true},
-      {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
+      {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer

+ 1 - 1
rebar.config

@@ -75,7 +75,7 @@
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}}
-    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
+    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}}

+ 10 - 0
rel/i18n/emqx_bridge_mysql.hocon

@@ -40,4 +40,14 @@ sql_template.desc:
 sql_template.label:
 """SQL Template"""
 
+action_parameters.label:
+"""Action Parameters"""
+action_parameters.desc:
+"""Additional parameters specific to this action type"""
+
+mysql_action.label:
+"""MySQL Action"""
+mysql_action.desc:
+"""Action to interact with a MySQL connector"""
+
 }