Ver código fonte

feat: implement Microsoft SQL Server bridge (e5.0)

JimMoen 2 anos atrás
pai
commit
5841969877

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -69,7 +69,8 @@
     T == tdengine;
     T == tdengine;
     T == dynamo;
     T == dynamo;
     T == rocketmq;
     T == rocketmq;
-    T == cassandra
+    T == cassandra;
+    T == sqlserver
 ).
 ).
 
 
 load() ->
 load() ->

+ 1 - 0
changes/ee/feat-10363.en.md

@@ -0,0 +1 @@
+Add MicroSoft SQL Server data Bridge support.

+ 19 - 4
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -34,7 +34,8 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_clickhouse, Method),
         ref(emqx_ee_bridge_clickhouse, Method),
         ref(emqx_ee_bridge_dynamo, Method),
         ref(emqx_ee_bridge_dynamo, Method),
         ref(emqx_ee_bridge_rocketmq, Method),
         ref(emqx_ee_bridge_rocketmq, Method),
-        ref(emqx_ee_bridge_cassa, Method)
+        ref(emqx_ee_bridge_cassa, Method),
+        ref(emqx_ee_bridge_sqlserver, Method)
     ].
     ].
 
 
 schema_modules() ->
 schema_modules() ->
@@ -53,7 +54,8 @@ schema_modules() ->
         emqx_ee_bridge_clickhouse,
         emqx_ee_bridge_clickhouse,
         emqx_ee_bridge_dynamo,
         emqx_ee_bridge_dynamo,
         emqx_ee_bridge_rocketmq,
         emqx_ee_bridge_rocketmq,
-        emqx_ee_bridge_cassa
+        emqx_ee_bridge_cassa,
+        emqx_ee_bridge_sqlserver
     ].
     ].
 
 
 examples(Method) ->
 examples(Method) ->
@@ -91,7 +93,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine;
 resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
 resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
 resource_type(dynamo) -> emqx_ee_connector_dynamo;
 resource_type(dynamo) -> emqx_ee_connector_dynamo;
 resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
 resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
-resource_type(cassandra) -> emqx_ee_connector_cassa.
+resource_type(cassandra) -> emqx_ee_connector_cassa;
+resource_type(sqlserver) -> emqx_ee_connector_sqlserver.
 
 
 fields(bridges) ->
 fields(bridges) ->
     [
     [
@@ -152,7 +155,7 @@ fields(bridges) ->
                 }
                 }
             )}
             )}
     ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
     ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
-        pgsql_structs() ++ clickhouse_structs().
+        pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().
 
 
 mongodb_structs() ->
 mongodb_structs() ->
     [
     [
@@ -249,3 +252,15 @@ clickhouse_structs() ->
                 }
                 }
             )}
             )}
     ].
     ].
+
+sqlserver_structs() ->
+    [
+        {sqlserver,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_sqlserver, "config")),
+                #{
+                    desc => <<"Microsoft SQL Server Bridge Config">>,
+                    required => false
+                }
+            )}
+    ].

+ 128 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl

@@ -0,0 +1,128 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_sqlserver).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(DEFAULT_SQL, <<
+    "insert into t_mqtt_msg(msgid, topic, qos, payload)"
+    "values (${id}, ${topic}, ${qos}, ${payload})"
+>>).
+
+-define(DEFAULT_DRIVER, <<"ms-sqlserver-18">>).
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"sqlserver">> => #{
+                summary => <<"Microsoft SQL Server Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(get) ->
+    values(post);
+values(post) ->
+    #{
+        enable => true,
+        type => sqlserver,
+        name => <<"bar">>,
+        server => <<"127.0.0.1:1433">>,
+        database => <<"test">>,
+        pool_size => 8,
+        username => <<"sa">>,
+        password => <<"******">>,
+        sql => ?DEFAULT_SQL,
+        driver => ?DEFAULT_DRIVER,
+        local_topic => <<"local/topic/#">>,
+        resource_opts => #{
+            worker_pool_size => 1,
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            query_mode => async,
+            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+        }
+    };
+values(put) ->
+    values(post).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_sqlserver".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {sql,
+            mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )},
+        {driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})},
+        {local_topic,
+            mk(
+                binary(),
+                #{desc => ?DESC("local_topic"), default => undefined}
+            )},
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ] ++
+        (emqx_ee_connector_sqlserver:fields(config) --
+            emqx_connector_schema_lib:prepare_statement_fields());
+fields("creation_opts") ->
+    emqx_resource_schema:fields("creation_opts");
+fields("post") ->
+    fields("post", sqlserver);
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+fields("post", Type) ->
+    [type_field(Type), name_field() | fields("config")].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+    emqx_resource_schema:desc(Name);
+desc(_) ->
+    undefined.
+
+%% -------------------------------------------------------------------------------------------------
+
+type_field(Type) ->
+    {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 3 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -5,13 +5,15 @@
     {applications, [
     {applications, [
         kernel,
         kernel,
         stdlib,
         stdlib,
+        ecpool,
         hstreamdb_erl,
         hstreamdb_erl,
         influxdb,
         influxdb,
         tdengine,
         tdengine,
         clickhouse,
         clickhouse,
         erlcloud,
         erlcloud,
         rocketmq,
         rocketmq,
-        ecql
+        ecql,
+        odbc
     ]},
     ]},
     {env, []},
     {env, []},
     {modules, []},
     {modules, []},

+ 531 - 0
lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl

@@ -0,0 +1,531 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_connector_sqlserver).
+
+-behaviour(emqx_resource).
+
+-include_lib("kernel/include/file.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx_ee_connector/include/emqx_ee_connector.hrl").
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%%====================================================================
+%% Exports
+%%====================================================================
+
+%% Hocon config schema exports
+-export([
+    roots/0,
+    fields/1
+]).
+
+%% callbacks for behaviour emqx_resource
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_query_async/4,
+    on_batch_query_async/4,
+    on_get_status/2
+]).
+
+%% callbacks for ecpool
+-export([connect/1]).
+
+%% Internal exports used to execute code with ecpool worker
+-export([do_get_status/2, worker_do_insert/3, do_async_reply/2]).
+
+-import(emqx_plugin_libs_rule, [str/1]).
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-define(ACTION_SEND_MESSAGE, send_message).
+
+-define(SINGLE_INSERT, single_insert).
+
+-define(SYNC_QUERY_MODE, handover).
+-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
+
+-define(SQLSERVER_HOST_OPTIONS, #{
+    default_port => 1433
+}).
+
+-define(REQUEST_TIMEOUT(RESOURCE_OPTS),
+    maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT)
+).
+
+-define(SINGLE_INSERT_TEMP, single_insert_temp).
+-define(BATCH_INSERT_TEMP, batch_insert_temp).
+
+-define(BATCH_INSERT_PART, batch_insert_part).
+-define(BATCH_PARAMS_TOKENS, batch_insert_tks).
+
+%% Copied from odbc reference page
+%% https://www.erlang.org/doc/man/odbc.html
+
+-type bridge_id() :: binary().
+%% as returned by connect/2
+-type connection_reference() :: pid().
+-type time_out() :: milliseconds() | infinity.
+-type sql() :: string() | binary().
+-type milliseconds() :: pos_integer().
+%% Tuple of column values e.g. one row of the result set.
+%% it's a variable size tuple of column values.
+-type row() :: tuple().
+%% Some kind of explanation of what went wrong
+-type common_reason() :: connection_closed | extended_error() | term().
+%% extended error type with ODBC
+%% and native database error codes, as well as the base reason that would have been
+%% returned had extended_errors not been enabled.
+-type extended_error() :: {string(), integer(), _Reason :: term()}.
+%% Name of column in the result set
+-type col_name() :: string().
+%% e.g. a list of the names of the selected columns in the result set.
+-type col_names() :: [col_name()].
+%% A list of rows from the result set.
+-type rows() :: list(row()).
+
+%% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
+-type updated_tuple() :: {updated, n_rows()}.
+-type selected_tuple() :: {selected, col_names(), rows()}.
+%% The number of affected rows for UPDATE,
+%% INSERT, or DELETE queries. For other query types the value
+%% is driver defined, and hence should be ignored.
+-type n_rows() :: integer().
+
+%% These type was not used in this module, but we may use it later
+%% -type odbc_data_type() ::
+%%     sql_integer
+%%     | sql_smallint
+%%     | sql_tinyint
+%%     | {sql_decimal, precision(), scale()}
+%%     | {sql_numeric, precision(), scale()}
+%%     | {sql_char, size()}
+%%     | {sql_wchar, size()}
+%%     | {sql_varchar, size()}
+%%     | {sql_wvarchar, size()}
+%%     | {sql_float, precision()}
+%%     | {sql_wlongvarchar, size()}
+%%     | {sql_float, precision()}
+%%     | sql_real
+%%     | sql_double
+%%     | sql_bit
+%%     | atom().
+%% -type precision() :: integer().
+%% -type scale() :: integer().
+%% -type size() :: integer().
+
+-type state() :: #{
+    poolname := binary(),
+    resource_opts := map(),
+    sql_templates := map()
+}.
+
+%%====================================================================
+%% Configuration and default values
+%%====================================================================
+
+roots() ->
+    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+    [
+        {server, server()}
+        | add_default_username(emqx_connector_schema_lib:relational_db_fields())
+    ].
+
+add_default_username(Fields) ->
+    lists:map(
+        fun
+            ({username, OrigUsernameFn}) ->
+                {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
+            (Field) ->
+                Field
+        end,
+        Fields
+    ).
+
+add_default_fn(OrigFn, Default) ->
+    fun
+        (default) -> Default;
+        (Field) -> OrigFn(Field)
+    end.
+
+server() ->
+    Meta = #{desc => ?DESC("server")},
+    emqx_schema:servers_sc(Meta, ?SQLSERVER_HOST_OPTIONS).
+
+%%====================================================================
+%% Callbacks defined in emqx_resource
+%%====================================================================
+
+callback_mode() -> async_if_possible.
+
+is_buffer_supported() -> false.
+
+on_start(
+    BridgeId = PoolName,
+    #{
+        server := Server,
+        username := Username,
+        password := Password,
+        driver := Driver,
+        database := Database,
+        pool_size := PoolSize,
+        resource_opts := ResourceOpts
+    } = Config
+) ->
+    ?SLOG(info, #{
+        msg => "starting_sqlserver_connector",
+        connector => BridgeId,
+        config => emqx_misc:redact(Config)
+    }),
+
+    ODBCDir = code:priv_dir(odbc),
+    OdbcserverDir = filename:join(ODBCDir, "bin/odbcserver"),
+    {ok, Info = #file_info{mode = Mode}} = file:read_file_info(OdbcserverDir),
+    case 33261 =:= Mode of
+        true ->
+            ok;
+        false ->
+            _ = file:write_file_info(OdbcserverDir, Info#file_info{mode = 33261}),
+            ok
+    end,
+
+    Options = [
+        {server, to_bin(Server)},
+        {username, Username},
+        {password, Password},
+        {driver, Driver},
+        {database, Database},
+        {pool_size, PoolSize},
+        {poolname, PoolName}
+    ],
+
+    State = #{
+        %% also BridgeId
+        poolname => PoolName,
+        sql_templates => parse_sql_template(Config),
+        resource_opts => ResourceOpts
+    },
+    case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
+        ok ->
+            {ok, State};
+        {error, Reason} ->
+            ?tp(
+                sqlserver_connector_start_failed,
+                #{error => Reason}
+            ),
+            {error, Reason}
+    end.
+
+on_stop(InstanceId, #{poolname := PoolName} = _State) ->
+    ?SLOG(info, #{
+        msg => "stopping_sqlserver_connector",
+        connector => InstanceId
+    }),
+    emqx_plugin_libs_pool:stop_pool(PoolName).
+
+-spec on_query(
+    bridge_id(),
+    {?ACTION_SEND_MESSAGE, map()},
+    state()
+) ->
+    ok
+    | {ok, list()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
+    ?TRACE(
+        "SINGLE_QUERY_SYNC",
+        "bridge_sqlserver_received",
+        #{requests => Query, connector => BridgeId, state => State}
+    ),
+    do_query(BridgeId, Query, ?SYNC_QUERY_MODE, State).
+
+-spec on_query_async(
+    bridge_id(),
+    {?ACTION_SEND_MESSAGE, map()},
+    {ReplyFun :: function(), Args :: list()},
+    state()
+) ->
+    {ok, any()}
+    | {error, term()}.
+on_query_async(
+    BridgeId,
+    {?ACTION_SEND_MESSAGE, _Msg} = Query,
+    ReplyFunAndArgs,
+    %% #{poolname := PoolName, sql_templates := Templates} = State
+    State
+) ->
+    ?TRACE(
+        "SINGLE_QUERY_ASYNC",
+        "bridge_sqlserver_received",
+        #{requests => Query, connector => BridgeId, state => State}
+    ),
+    do_query(BridgeId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
+
+-spec on_batch_query(
+    bridge_id(),
+    [{?ACTION_SEND_MESSAGE, map()}],
+    state()
+) ->
+    ok
+    | {ok, list()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_batch_query(BridgeId, BatchRequests, State) ->
+    ?TRACE(
+        "BATCH_QUERY_SYNC",
+        "bridge_sqlserver_received",
+        #{requests => BatchRequests, connector => BridgeId, state => State}
+    ),
+    do_query(BridgeId, BatchRequests, ?SYNC_QUERY_MODE, State).
+
+-spec on_batch_query_async(
+    bridge_id(),
+    [{?ACTION_SEND_MESSAGE, map()}],
+    {ReplyFun :: function(), Args :: list()},
+    state()
+) -> {ok, any()}.
+on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
+    ?TRACE(
+        "BATCH_QUERY_ASYNC",
+        "bridge_sqlserver_received",
+        #{requests => Requests, connector => BridgeId, state => State}
+    ),
+    do_query(BridgeId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
+
+on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) ->
+    RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts),
+    Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
+        Pool, {?MODULE, do_get_status, [RequestTimeout]}, RequestTimeout
+    ),
+    status_result(Health).
+
+status_result(_Status = true) -> connected;
+status_result(_Status = false) -> connecting.
+%% TODO:
+%% case for disconnected
+
+%%====================================================================
+%% ecpool callback fns
+%%====================================================================
+
+-spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
+connect(Options) ->
+    ConnectStr = lists:concat(conn_str(Options, [])),
+    Opts = proplists:get_value(options, Options, []),
+    odbc:connect(ConnectStr, Opts).
+
+-spec do_get_status(connection_reference(), time_out()) -> Result :: boolean().
+do_get_status(Conn, RequestTimeout) ->
+    case execute(Conn, <<"SELECT 1">>, RequestTimeout) of
+        {selected, [[]], [{1}]} -> true;
+        _ -> false
+    end.
+
+%%====================================================================
+%% Internal Helper fns
+%%====================================================================
+
+%% TODO && FIXME:
+%% About the connection string attribute `Encrypt`:
+%% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
+%% And encrypted connections always verify the server's certificate.
+%% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string when connecting to a server that has a self-signed certificate.
+%% See also:
+%% https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt
+conn_str([], Acc) ->
+    %% we should use this for msodbcsql 18+
+    %% lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc]);
+    lists:join(";", Acc);
+conn_str([{driver, Driver} | Opts], Acc) ->
+    conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]);
+conn_str([{server, Server} | Opts], Acc) ->
+    {Host, Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
+    conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
+conn_str([{database, Database} | Opts], Acc) ->
+    conn_str(Opts, ["Database=" ++ str(Database) | Acc]);
+conn_str([{username, Username} | Opts], Acc) ->
+    conn_str(Opts, ["UID=" ++ str(Username) | Acc]);
+conn_str([{password, Password} | Opts], Acc) ->
+    conn_str(Opts, ["PWD=" ++ str(Password) | Acc]);
+conn_str([{_, _} | Opts], Acc) ->
+    conn_str(Opts, Acc).
+
+%% Sync & Async query with singe & batch sql statement
+-spec do_query(
+    bridge_id(),
+    Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
+    ApplyMode ::
+        handover
+        | {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}},
+    state()
+) ->
+    {ok, list()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+do_query(
+    BridgeId,
+    Query,
+    ApplyMode,
+    #{poolname := PoolName, sql_templates := Templates} = State
+) ->
+    ?TRACE(
+        "SINGLE_QUERY_SYNC",
+        "sqlserver_connector_received",
+        #{query => Query, connector => BridgeId, state => State}
+    ),
+
+    %% only insert sql statement for single query and batch query
+    case apply_template(Query, Templates) of
+        {?ACTION_SEND_MESSAGE, SQL} ->
+            Result = ecpool:pick_and_do(
+                PoolName,
+                {?MODULE, worker_do_insert, [SQL, State]},
+                ApplyMode
+            );
+        Query ->
+            Result = {error, {unrecoverable_error, invalid_query}};
+        _ ->
+            Result = {error, {unrecoverable_error, failed_to_apply_sql_template}}
+    end,
+    case Result of
+        {error, Reason} ->
+            ?tp(
+                sqlserver_connector_query_return,
+                #{error => Reason}
+            ),
+            ?SLOG(error, #{
+                msg => "sqlserver_connector_do_query_failed",
+                connector => BridgeId,
+                query => Query,
+                reason => Reason
+            }),
+            Result;
+        _ ->
+            ?tp(
+                sqlserver_connector_query_return,
+                #{result => Result}
+            ),
+            Result
+    end.
+
+worker_do_insert(
+    Conn, SQL, #{resource_opts := ResourceOpts, poolname := BridgeId} = State
+) ->
+    LogMeta = #{connector => BridgeId, state => State},
+    try
+        case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
+            {selected, Rows, _} ->
+                {ok, Rows};
+            {updated, _} ->
+                ok;
+            {error, ErrStr} ->
+                ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
+                {error, {unrecoverable_error, {invalid_request, ErrStr}}}
+        end
+    catch
+        _Type:Reason ->
+            ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}),
+            {error, {unrecoverable_error, {invalid_request, Reason}}}
+    end.
+
+-spec execute(pid(), sql(), time_out()) ->
+    updated_tuple()
+    | selected_tuple()
+    | [updated_tuple()]
+    | [selected_tuple()]
+    | {error, common_reason()}.
+execute(Conn, SQL, Timeout) ->
+    odbc:sql_query(Conn, str(SQL), Timeout).
+
+to_bin(List) when is_list(List) ->
+    unicode:characters_to_binary(List, utf8).
+
+%% for bridge data to sql server
+parse_sql_template(Config) ->
+    RawSQLTemplates =
+        case maps:get(sql, Config, undefined) of
+            undefined -> #{};
+            <<>> -> #{};
+            SQLTemplate -> #{?ACTION_SEND_MESSAGE => SQLTemplate}
+        end,
+
+    SingleInsertTks = #{},
+    BatchInsertTks = #{},
+    parse_sql_template(maps:to_list(RawSQLTemplates), SingleInsertTks, BatchInsertTks).
+
+parse_sql_template([{Key, H} | T], SingleInsertTks, BatchInsertTks) ->
+    case emqx_plugin_libs_rule:detect_sql_type(H) of
+        {ok, select} ->
+            parse_sql_template(T, SingleInsertTks, BatchInsertTks);
+        {ok, insert} ->
+            case emqx_plugin_libs_rule:split_insert_sql(H) of
+                {ok, {InsertSQL, Params}} ->
+                    parse_sql_template(
+                        T,
+                        SingleInsertTks#{Key => emqx_plugin_libs_rule:preproc_tmpl(H)},
+                        BatchInsertTks#{
+                            Key =>
+                                #{
+                                    ?BATCH_INSERT_PART => InsertSQL,
+                                    ?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl(
+                                        Params
+                                    )
+                                }
+                        }
+                    );
+                {error, Reason} ->
+                    ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
+                    parse_sql_template(T, SingleInsertTks, BatchInsertTks)
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
+            parse_sql_template(T, SingleInsertTks, BatchInsertTks)
+    end;
+parse_sql_template([], SingleInsertTks, BatchInsertTks) ->
+    #{
+        ?SINGLE_INSERT_TEMP => SingleInsertTks,
+        ?BATCH_INSERT_TEMP => BatchInsertTks
+    }.
+
+%% single insert
+apply_template(
+    {?ACTION_SEND_MESSAGE = Key, Msg} = Query,
+    #{?SINGLE_INSERT_TEMP := SingleInsertTks} = _Templates
+) ->
+    case maps:get(Key, SingleInsertTks, undefined) of
+        undefined ->
+            Query;
+        Template ->
+            {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
+    end;
+%% batch inserts
+apply_template(
+    [{?ACTION_SEND_MESSAGE = Key, _Msg} | _T] = BatchReqs,
+    #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates
+) ->
+    case maps:get(Key, BatchInsertsTks, undefined) of
+        undefined ->
+            BatchReqs;
+        #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
+            SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
+            {Key, SQL}
+    end;
+apply_template(Query, Templates) ->
+    %% TODO: more detail infomatoin
+    ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}).
+
+do_async_reply(Result, {ReplyFun, Args}) ->
+    erlang:apply(ReplyFun, Args ++ Result).

+ 87 - 0
rel/i18n/emqx_ee_bridge_sqlserver.hocon

@@ -0,0 +1,87 @@
+emqx_ee_bridge_sqlserver {
+
+  local_topic {
+    desc {
+      en: """The MQTT topic filter to be forwarded to MicroSoft SQL Server. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded.
+"""
+      zh: """发送到 'local_topic' 的消息都会转发到 MicroSoft SQL Server。 </br>
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
+"""
+    }
+    label {
+      en: """Local Topic"""
+      zh: """本地 Topic"""
+    }
+  }
+
+  sql_template {
+    desc {
+      en: """SQL Template"""
+      zh: """SQL 模板"""
+    }
+    label {
+      en: """SQL Template"""
+      zh: """SQL 模板"""
+    }
+  }
+
+  driver {
+    desc {
+      en: """SQL Server Driver Name"""
+      zh: """SQL Server Driver 名称"""
+    }
+    desc {
+      en: """SQL Server Driver Name"""
+      zh: """SQL Server Driver 名称"""
+    }
+  }
+
+  config_enable {
+    desc {
+      en: """Enable or disable this bridge"""
+      zh: """启用/禁用桥接"""
+    }
+    label {
+      en: """Enable Or Disable Bridge"""
+      zh: """启用/禁用桥接"""
+    }
+  }
+
+  desc_config {
+    desc {
+      en: """Configuration for an MicroSoft SQL Server bridge."""
+      zh: """MicroSoft SQL Server 桥接配置"""
+    }
+    label: {
+      en: """MicroSoft SQL Server Bridge Configuration"""
+      zh: """MicroSoft SQL Server 桥接配置"""
+    }
+  }
+
+  desc_type {
+    desc {
+      en: """The Bridge Type"""
+      zh: """Bridge 类型"""
+    }
+    label {
+      en: """Bridge Type"""
+      zh: """桥接类型"""
+    }
+  }
+
+  desc_name {
+    desc {
+      en: """Bridge name."""
+      zh: """桥接名字"""
+    }
+    label {
+      en: """Bridge Name"""
+      zh: """桥接名字"""
+    }
+  }
+
+}

+ 22 - 0
rel/i18n/emqx_ee_connector_sqlserver.hocon

@@ -0,0 +1,22 @@
+emqx_ee_connector_sqlserver {
+
+  server {
+    desc {
+      en: """
+The IPv4 or IPv6 address or the hostname to connect to.<br/>
+A host entry has the following form: `Host[:Port]`.<br/>
+The SQL Server default port 1433 is used if `[:Port]` is not specified.
+"""
+      zh: """
+将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>
+主机名具有以下形式:`Host[:Port]`。<br/>
+如果未指定 `[:Port]`,则使用 SQL Server 默认端口 1433。
+"""
+    }
+    label: {
+      en: "Server Host"
+      zh: "服务器地址"
+    }
+  }
+
+}