فهرست منبع

test: MS SQL Server data bridge

JimMoen 2 سال پیش
والد
کامیت
c366267b0f

+ 3 - 0
.ci/docker-compose-file/.env

@@ -8,4 +8,7 @@ TDENGINE_TAG=3.0.2.4
 DYNAMO_TAG=1.21.0
 CASSANDRA_TAG=3.11.6
 
+MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
+SQLSERVER_TAG=2019-CU11-ubuntu-20.04
+
 TARGET=emqx/emqx

+ 19 - 0
.ci/docker-compose-file/docker-compose-sqlserver.yaml

@@ -0,0 +1,19 @@
+version: '3.9'
+
+services:
+  sql_server:
+    container_name: sqlserver
+    # See also:
+    # https://mcr.microsoft.com/en-us/product/mssql/server/about
+    # https://hub.docker.com/_/microsoft-mssql-server
+    image: ${MS_IMAGE_ADDR}:${SQLSERVER_TAG}
+    environment:
+      # See also:
+      # https://learn.microsoft.com/en-us/sql/linux/sql-server-linux-configure-environment-variables
+      ACCEPT_EULA: "Y"
+      SA_PASSWORD: "mqtt_public"
+    restart: always
+    # ports:
+    #   - "1433:1433"
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -16,6 +16,7 @@ services:
       - 8474:8474
       - 8086:8086
       - 8087:8087
+      - 11433:1433
       - 13306:3306
       - 13307:3307
       - 15432:5432

+ 1 - 0
.ci/docker-compose-file/docker-compose.yaml

@@ -24,6 +24,7 @@ services:
       - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
       - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
       - ./kerberos/krb5.conf:/etc/krb5.conf
+      - ./odbc/odbcinst.ini:/etc/odbcinst.ini
     working_dir: /emqx
     tty: true
     user: "${DOCKER_USER:-root}"

+ 10 - 0
.ci/docker-compose-file/odbc/odbcinst.ini

@@ -0,0 +1,10 @@
+[ms-sql]
+Description=Microsoft ODBC Driver 17 for SQL Server
+Driver=/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.2.1
+UsageCount=1
+
+[ODBC Driver 17 for SQL Server]
+Description=Microsoft ODBC Driver 17 for SQL Server
+Driver=/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.2.1
+UsageCount=1
+

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -95,5 +95,11 @@
     "listen": "0.0.0.0:9142",
     "upstream": "cassandra:9142",
     "enabled": true
+  },
+  {
+    "name": "sqlserver",
+    "listen": "0.0.0.0:1433",
+    "upstream": "sqlserver:1433",
+    "enabled": true
   }
 ]

+ 1 - 0
lib-ee/emqx_ee_bridge/docker-ct

@@ -11,3 +11,4 @@ clickhouse
 dynamo
 rocketmq
 cassandra
+sqlserver

+ 654 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_sqlserver_SUITE.erl

@@ -0,0 +1,654 @@
+%%--------------------------------------------------------------------
+% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_sqlserver_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+% SQL definitions
+-define(SQL_BRIDGE,
+    "insert into t_mqtt_msg(msgid, topic, qos, payload) values ( ${id}, ${topic}, ${qos}, ${payload})"
+).
+-define(SQL_SERVER_DRIVER, "ms-sql").
+
+-define(SQL_CREATE_DATABASE_IF_NOT_EXISTS,
+    " IF NOT EXISTS(SELECT name FROM sys.databases WHERE name = 'mqtt')"
+    " BEGIN"
+    " CREATE DATABASE mqtt;"
+    " END"
+).
+
+-define(SQL_CREATE_TABLE_IN_DB_MQTT,
+    " CREATE TABLE mqtt.dbo.t_mqtt_msg"
+    " (id int PRIMARY KEY IDENTITY(1000000001,1) NOT NULL,"
+    " msgid   VARCHAR(64) NULL,"
+    " topic   VARCHAR(100) NULL,"
+    " qos     tinyint NOT NULL DEFAULT 0,"
+    " payload NVARCHAR(100) NULL,"
+    " arrived DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP)"
+).
+
+-define(SQL_DROP_DB_MQTT, "DROP DATABASE mqtt").
+-define(SQL_DROP_TABLE, "DROP TABLE mqtt.dbo.t_mqtt_msg").
+-define(SQL_DELETE, "DELETE from mqtt.dbo.t_mqtt_msg").
+-define(SQL_SELECT, "SELECT payload FROM mqtt.dbo.t_mqtt_msg").
+-define(SQL_SELECT_COUNT, "SELECT COUNT(*) FROM mqtt.dbo.t_mqtt_msg").
+% DB defaults
+-define(SQL_SERVER_DATABASE, "mqtt").
+-define(SQL_SERVER_USERNAME, "sa").
+-define(SQL_SERVER_PASSWORD, "mqtt_public").
+-define(BATCH_SIZE, 10).
+-define(REQUEST_TIMEOUT_MS, 500).
+
+-define(WORKER_POOL_SIZE, 4).
+
+-define(WITH_CON(Process),
+    Con = connect_direct_sqlserver(Config),
+    Process,
+    ok = disconnect(Con)
+).
+
+%% How to run it locally:
+%%   A: run ct on host
+%%     1. Start all deps services
+%%       sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \
+%%                           -f .ci/docker-compose-file/docker-compose-sqlserver.yaml \
+%%                           -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
+%%                           up --build
+%%
+%%     2. Run use cases with special environment variables
+%%       11433 is toxiproxy exported port
+%%       Local:
+%%       ```
+%%       SQLSERVER_HOST=toxiproxy SQLSERVER_PORT=11433 \
+%%           PROXY_HOST=toxiproxy PROXY_PORT=1433 \
+%%           ./rebar3 as test ct -c -v --readable true --name ct@127.0.0.1 --suite lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_sqlserver_SUITE.erl
+%%       ```
+%%
+%%   B: run ct in docker container
+%%     run script:
+%%     ./scripts/ct/run.sh --ci --app lib-ee/emqx_ee_bridge/ \
+%%                        -- --name 'test@127.0.0.1' -c -v --readable true --suite lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_sqlserver_SUITE.erl
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, async},
+        {group, sync}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    NonBatchCases = [t_write_timeout],
+    BatchingGroups = [{group, with_batch}, {group, without_batch}],
+    [
+        {async, BatchingGroups},
+        {sync, BatchingGroups},
+        {with_batch, TCs -- NonBatchCases},
+        {without_batch, TCs}
+    ].
+
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
+init_per_group(with_batch, Config0) ->
+    Config = [{enable_batch, true} | Config0],
+    common_init(Config);
+init_per_group(without_batch, Config0) ->
+    Config = [{enable_batch, false} | Config0],
+    common_init(Config);
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
+    connect_and_drop_table(Config),
+    connect_and_drop_db(Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    %% drop database and table
+    %% connect_and_clear_table(Config),
+    %% create a new one
+    %% TODO: create a new database for each test case
+    delete_bridge(Config),
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    connect_and_clear_table(Config),
+    ok = snabbkaffe:stop(),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Val = str(erlang:unique_integer()),
+    SentData = sent_data(Val),
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := sqlserver_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                [{Val}],
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(sqlserver_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    Name = ?config(sqlserver_name, Config),
+    SQLServerConfig0 = ?config(sqlserver_config, Config),
+    SQLServerConfig = SQLServerConfig0#{
+        <<"name">> => Name,
+        <<"type">> => BridgeType
+    },
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(SQLServerConfig)
+    ),
+    Val = str(erlang:unique_integer()),
+    SentData = sent_data(Val),
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := sqlserver_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                [{Val}],
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(sqlserver_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+
+    Name = ?config(sqlserver_name, Config),
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        case emqx_resource_manager:health_check(ResourceID) of
+            {ok, Status} when Status =:= disconnected orelse Status =:= connecting ->
+                ok;
+            {error, timeout} ->
+                ok;
+            Other ->
+                ?assert(
+                    false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other]))
+                )
+        end
+    end),
+    ok.
+
+t_create_disconnected(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch({ok, _}, create_bridge(Config))
+        end),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := {start_pool_failed, _, _}}],
+                ?of_kind(sqlserver_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
+    Val = str(erlang:unique_integer()),
+    SentData = sent_data(Val),
+    {{ok, _}, {ok, _}} =
+        ?wait_async_action(
+            create_bridge(Config),
+            #{?snk_kind := resource_connected_enter},
+            20_000
+        ),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        case QueryMode of
+            sync ->
+                ?assertMatch(
+                    {error, {resource_error, #{reason := timeout}}},
+                    send_message(Config, SentData)
+                );
+            async ->
+                ?assertMatch(
+                    ok, send_message(Config, SentData)
+                )
+        end
+    end),
+    ok.
+
+t_write_timeout(_Config) ->
+    %% msodbc driver handled all connection exceptions
+    %% the case is same as t_write_failure/1
+    ok.
+
+t_simple_query(Config) ->
+    BatchSize = batch_size(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    {Requests, Vals} = gen_batch_req(BatchSize),
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                begin
+                    [?assertEqual(ok, query_resource(Config, Request)) || Request <- Requests]
+                end,
+                #{?snk_kind := sqlserver_connector_query_return},
+                10_000
+            ),
+            %% just assert the data count is correct
+            ?assertMatch(
+                BatchSize,
+                connect_and_get_count(Config)
+            ),
+            %% assert the data order is correct
+            ?assertMatch(
+                Vals,
+                connect_and_get_payload(Config)
+            )
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(sqlserver_connector_query_return, Trace0),
+            case BatchSize of
+                1 ->
+                    ?assertMatch([#{result := ok}], Trace);
+                _ ->
+                    [?assertMatch(#{result := ok}, Trace1) || Trace1 <- Trace]
+            end,
+            ok
+        end
+    ),
+    ok.
+
+-define(MISSING_TINYINT_ERROR,
+    "[Microsoft][ODBC Driver 17 for SQL Server][SQL Server]"
+    "Conversion failed when converting the varchar value 'undefined' to data type tinyint. SQLSTATE IS: 22018"
+).
+
+t_missing_data(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Result = send_message(Config, #{}),
+    case QueryMode of
+        sync ->
+            ?assertMatch(
+                {error, {unrecoverable_error, {invalid_request, ?MISSING_TINYINT_ERROR}}},
+                Result
+            );
+        async ->
+            ?assertMatch(
+                ok, send_message(Config, #{})
+            )
+    end,
+    ok.
+
+t_bad_parameter(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Result = send_message(Config, #{}),
+    case QueryMode of
+        sync ->
+            ?assertMatch(
+                {error, {unrecoverable_error, {invalid_request, ?MISSING_TINYINT_ERROR}}},
+                Result
+            );
+        async ->
+            ?assertMatch(
+                ok, send_message(Config, #{})
+            )
+    end,
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+common_init(ConfigT) ->
+    Host = os:getenv("SQLSERVER_HOST", "toxiproxy"),
+    Port = list_to_integer(os:getenv("SQLSERVER_PORT", "1433")),
+
+    Config0 = [
+        {sqlserver_host, Host},
+        {sqlserver_port, Port},
+        %% see also for `proxy_name` : $PROJ_ROOT/.ci/docker-compose-file/toxiproxy.json
+        {proxy_name, "sqlserver"},
+        {batch_size, ?BATCH_SIZE}
+        | ConfigT
+    ],
+
+    BridgeType = proplists:get_value(bridge_type, Config0, <<"sqlserver">>),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            % Setup toxiproxy
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            % Ensure EE bridge module is loaded
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            % Connect to sqlserver directly
+            % drop old db and table, and then create new ones
+            connect_and_create_db_and_table(Config0),
+            {Name, SQLServerConf} = sqlserver_config(BridgeType, Config0),
+            Config =
+                [
+                    {sqlserver_config, SQLServerConf},
+                    {sqlserver_bridge_type, BridgeType},
+                    {sqlserver_name, Name},
+                    {proxy_host, ProxyHost},
+                    {proxy_port, ProxyPort}
+                    | Config0
+                ],
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_sqlserver);
+                _ ->
+                    {skip, no_sqlserver}
+            end
+    end.
+
+sqlserver_config(BridgeType, Config) ->
+    Port = integer_to_list(?config(sqlserver_port, Config)),
+    Server = ?config(sqlserver_host, Config) ++ ":" ++ Port,
+    Name = atom_to_binary(?MODULE),
+    BatchSize = batch_size(Config),
+    QueryMode = ?config(query_mode, Config),
+    ConfigString =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  enable = true\n"
+            "  server = ~p\n"
+            "  database = ~p\n"
+            "  username = ~p\n"
+            "  password = ~p\n"
+            "  sql = ~p\n"
+            "  driver = ~p\n"
+            "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
+            "    batch_size = ~b\n"
+            "    query_mode = ~s\n"
+            "    worker_pool_size = ~b\n"
+            "  }\n"
+            "}",
+            [
+                BridgeType,
+                Name,
+                Server,
+                ?SQL_SERVER_DATABASE,
+                ?SQL_SERVER_USERNAME,
+                ?SQL_SERVER_PASSWORD,
+                ?SQL_BRIDGE,
+                ?SQL_SERVER_DRIVER,
+                BatchSize,
+                QueryMode,
+                ?WORKER_POOL_SIZE
+            ]
+        ),
+    {Name, parse_and_check(ConfigString, BridgeType, Name)}.
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
+    Config.
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    Name = ?config(sqlserver_name, Config),
+    SSConfig0 = ?config(sqlserver_config, Config),
+    SSConfig = emqx_map_lib:deep_merge(SSConfig0, Overrides),
+    emqx_bridge:create(BridgeType, Name, SSConfig).
+
+delete_bridge(Config) ->
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    Name = ?config(sqlserver_name, Config),
+    emqx_bridge:remove(BridgeType, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+send_message(Config, Payload) ->
+    Name = ?config(sqlserver_name, Config),
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+query_resource(Config, Request) ->
+    Name = ?config(sqlserver_name, Config),
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
+
+query_resource_async(Config, Request) ->
+    Name = ?config(sqlserver_name, Config),
+    BridgeType = ?config(sqlserver_bridge_type, Config),
+    Ref = alias([reply]),
+    AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Return = emqx_resource:query(ResourceID, Request, #{
+        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    }),
+    {Return, Ref}.
+
+receive_result(Ref, Timeout) ->
+    receive
+        {result, Ref, Result} ->
+            {ok, Result};
+        {Ref, Result} ->
+            {ok, Result}
+    after Timeout ->
+        timeout
+    end.
+
+connect_direct_sqlserver(Config) ->
+    Opts = [
+        {host, ?config(sqlserver_host, Config)},
+        {port, ?config(sqlserver_port, Config)},
+        {username, ?SQL_SERVER_USERNAME},
+        {password, ?SQL_SERVER_PASSWORD},
+        {driver, ?SQL_SERVER_DRIVER},
+        {pool_size, 8}
+    ],
+    {ok, Con} = connect(Opts),
+    Con.
+
+connect(Options) ->
+    ConnectStr = lists:concat(conn_str(Options, [])),
+    Opts = proplists:get_value(options, Options, []),
+    odbc:connect(ConnectStr, Opts).
+
+disconnect(Ref) ->
+    odbc:disconnect(Ref).
+
+% These funs connect and then stop the sqlserver connection
+connect_and_create_db_and_table(Config) ->
+    ?WITH_CON(begin
+        {updated, undefined} = directly_query(Con, ?SQL_CREATE_DATABASE_IF_NOT_EXISTS),
+        {updated, undefined} = directly_query(Con, ?SQL_CREATE_TABLE_IN_DB_MQTT)
+    end).
+
+connect_and_drop_db(Config) ->
+    ?WITH_CON({updated, undefined} = directly_query(Con, ?SQL_DROP_DB_MQTT)).
+
+connect_and_drop_table(Config) ->
+    ?WITH_CON({updated, undefined} = directly_query(Con, ?SQL_DROP_TABLE)).
+
+connect_and_clear_table(Config) ->
+    ?WITH_CON({updated, _} = directly_query(Con, ?SQL_DELETE)).
+
+connect_and_get_payload(Config) ->
+    ?WITH_CON(
+        {selected, ["payload"], Rows} = directly_query(Con, ?SQL_SELECT)
+    ),
+    Rows.
+
+connect_and_get_count(Config) ->
+    ?WITH_CON(
+        {selected, [[]], [{Count}]} = directly_query(Con, ?SQL_SELECT_COUNT)
+    ),
+    Count.
+
+directly_query(Con, Query) ->
+    directly_query(Con, Query, ?REQUEST_TIMEOUT_MS).
+
+directly_query(Con, Query, Timeout) ->
+    odbc:sql_query(Con, Query, Timeout).
+
+%%--------------------------------------------------------------------
+%% help functions
+%%--------------------------------------------------------------------
+
+batch_size(Config) ->
+    case ?config(enable_batch, Config) of
+        true -> ?BATCH_SIZE;
+        false -> 1
+    end.
+
+conn_str([], Acc) ->
+    %% TODO: for msodbc 18+, we need to add "Encrypt=YES;TrustServerCertificate=YES"
+    %% but havn't tested now
+    %% we should use this for msodbcsql 18+
+    %% lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc]);
+    lists:join(";", Acc);
+conn_str([{driver, Driver} | Opts], Acc) ->
+    conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]);
+conn_str([{host, Host} | Opts], Acc) ->
+    Port = proplists:get_value(port, Opts, "1433"),
+    NOpts = proplists:delete(port, Opts),
+    conn_str(NOpts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
+conn_str([{port, Port} | Opts], Acc) ->
+    Host = proplists:get_value(host, Opts, "localhost"),
+    NOpts = proplists:delete(host, Opts),
+    conn_str(NOpts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
+conn_str([{database, Database} | Opts], Acc) ->
+    conn_str(Opts, ["Database=" ++ str(Database) | Acc]);
+conn_str([{username, Username} | Opts], Acc) ->
+    conn_str(Opts, ["UID=" ++ str(Username) | Acc]);
+conn_str([{password, Password} | Opts], Acc) ->
+    conn_str(Opts, ["PWD=" ++ str(Password) | Acc]);
+conn_str([{_, _} | Opts], Acc) ->
+    conn_str(Opts, Acc).
+
+sent_data(Payload) ->
+    #{
+        payload => to_bin(Payload),
+        id => <<"0005F8F84FFFAFB9F44200000D810002">>,
+        topic => <<"test/topic">>,
+        qos => 0
+    }.
+
+gen_batch_req(Count) when
+    is_integer(Count) andalso Count > 0
+->
+    Vals = [{str(erlang:unique_integer())} || _Seq <- lists:seq(1, Count)],
+    Requests = [{send_message, sent_data(Payload)} || {Payload} <- Vals],
+    {Requests, Vals};
+gen_batch_req(Count) ->
+    ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
+
+str(List) when is_list(List) ->
+    unicode:characters_to_list(List, utf8);
+str(Bin) when is_binary(Bin) ->
+    unicode:characters_to_list(Bin, utf8);
+str(Num) when is_number(Num) ->
+    number_to_list(Num).
+
+number_to_list(Int) when is_integer(Int) ->
+    integer_to_list(Int);
+number_to_list(Float) when is_float(Float) ->
+    float_to_list(Float, [{decimals, 10}, compact]).
+
+to_bin(List) when is_list(List) ->
+    unicode:characters_to_binary(List, utf8);
+to_bin(Bin) when is_binary(Bin) ->
+    Bin.

+ 1 - 0
lib-ee/emqx_ee_connector/docker-ct

@@ -2,3 +2,4 @@ toxiproxy
 influxdb
 clickhouse
 cassandra
+sqlserver

+ 3 - 0
scripts/ct/run.sh

@@ -180,6 +180,9 @@ for dep in ${CT_DEPS}; do
         cassandra)
             FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )
             ;;
+        sqlserver)
+            FILES+=( '.ci/docker-compose-file/docker-compose-sqlserver.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1