Просмотр исходного кода

feat(bridge): support cassandra bridge

JianBo He 2 лет назад
Родитель
Сommit
c0a216a740

+ 4 - 0
.ci/docker-compose-file/cassandra/Dockerfile

@@ -0,0 +1,4 @@
+ARG CASSANDRA_TAG=3.11.6
+FROM cassandra:${CASSANDRA_TAG}
+COPY cassandra.yaml /etc/cassandra/cassandra.yaml
+CMD ["cassandra", "-f"]

+ 4 - 0
.ci/docker-compose-file/cassandra/Dockerfile-tls

@@ -0,0 +1,4 @@
+ARG CASSANDRA_TAG=3.11.6
+FROM cassandra:${CASSANDRA_TAG}
+COPY cassandra-tls.yaml /etc/cassandra/cassandra.yaml
+CMD ["cassandra", "-f"]

Разница между файлами не показана из-за своего большого размера
+ 1236 - 0
.ci/docker-compose-file/cassandra/cassandra-tls.yaml


Разница между файлами не показана из-за своего большого размера
+ 1237 - 0
.ci/docker-compose-file/cassandra/cassandra.yaml


+ 27 - 0
.ci/docker-compose-file/docker-compose-cassandra-tcp.yaml

@@ -0,0 +1,27 @@
+version: '3.9'
+
+services:
+  cassandra_server:
+    container_name: cassa_tcp
+    build:
+      context: ./cassandra
+      args:
+        CASSANDRA_TAG: ${CASSANDRA_TAG}
+    image: emqx-cassandra
+    restart: always
+    environment:
+      CASSANDRA_BROADCAST_ADDRESS: "1.2.3.4"
+      CASSANDRA_RPC_ADDRESS: "0.0.0.0"
+    ports:
+      - "9042:9042"
+    command:
+      - /bin/bash
+      - -c
+      - |
+        /opt/cassandra/bin/cassandra -f -R > /cassandra.log &
+        /opt/cassandra/bin/cqlsh -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"
+        while [[ $$? -ne 0 ]];do sleep 5; /opt/cassandra/bin/cqlsh -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"; done
+        /opt/cassandra/bin/cqlsh -e "describe keyspaces;"
+        tail -f /cassandra.log
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -19,6 +19,7 @@ services:
       - 15433:5433
       - 16041:6041
       - 18000:8000
+      - 19042:9042
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -53,5 +53,17 @@
     "listen": "0.0.0.0:8000",
     "upstream": "dynamo:8000",
     "enabled": true
+  },
+  {
+    "name": "cassa_tcp",
+    "listen": "0.0.0.0:9042",
+    "upstream": "cassa_tcp:9042",
+    "enabled": true
+  },
+  {
+    "name": "cassa_tls",
+    "listen": "0.0.0.0:9043",
+    "upstream": "cassa_tls:9043",
+    "enabled": false
   }
 ]

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -63,7 +63,8 @@
     T == timescale;
     T == matrix;
     T == tdengine;
-    T == dynamo
+    T == dynamo;
+    T == cassandra
 ).
 
 load() ->

+ 1 - 0
lib-ee/emqx_ee_bridge/docker-ct

@@ -10,3 +10,4 @@ pgsql
 tdengine
 clickhouse
 dynamo
+cassandra

+ 72 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_cassa.conf

@@ -0,0 +1,72 @@
+emqx_ee_bridge_cassa {
+
+    local_topic {
+        desc {
+            en: """The MQTT topic filter to be forwarded to Cassandra. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded."""
+            zh: """发送到 'local_topic' 的消息都会转发到 Cassandra。 </br>
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。"""
+        }
+        label {
+                en: "Local Topic"
+                zh: "本地 Topic"
+            }
+    }
+
+    sql_template {
+        desc {
+            en: """SQL Template"""
+            zh: """SQL 模板"""
+            }
+        label {
+            en: "SQL Template"
+            zh: "SQL 模板"
+        }
+    }
+    config_enable {
+        desc {
+            en: """Enable or disable this bridge"""
+            zh: """启用/禁用桥接"""
+        }
+        label {
+            en: "Enable Or Disable Bridge"
+            zh: "启用/禁用桥接"
+        }
+        }
+
+    desc_config {
+        desc {
+            en: """Configuration for an Cassandra bridge."""
+            zh: """Cassandra 桥接配置"""
+        }
+        label: {
+            en: "Cassandra Bridge Configuration"
+            zh: "Cassandra 桥接配置"
+        }
+    }
+
+    desc_type {
+        desc {
+            en: """The Bridge Type"""
+            zh: """Bridge 类型"""
+        }
+        label {
+            en: "Bridge Type"
+            zh: "桥接类型"
+        }
+    }
+
+    desc_name {
+        desc {
+            en: """Bridge name."""
+            zh: """桥接名字"""
+        }
+        label {
+            en: "Bridge Name"
+            zh: "桥接名字"
+        }
+    }
+}

+ 1 - 0
lib-ee/emqx_ee_bridge/rebar.config

@@ -3,6 +3,7 @@
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}}
+       , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.4.2"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 14 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -31,7 +31,8 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_matrix, Method),
         ref(emqx_ee_bridge_tdengine, Method),
         ref(emqx_ee_bridge_clickhouse, Method),
-        ref(emqx_ee_bridge_dynamo, Method)
+        ref(emqx_ee_bridge_dynamo, Method),
+        ref(emqx_ee_bridge_cassa, Method)
     ].
 
 schema_modules() ->
@@ -48,7 +49,8 @@ schema_modules() ->
         emqx_ee_bridge_matrix,
         emqx_ee_bridge_tdengine,
         emqx_ee_bridge_clickhouse,
-        emqx_ee_bridge_dynamo
+        emqx_ee_bridge_dynamo,
+        emqx_ee_bridge_cassa
     ].
 
 examples(Method) ->
@@ -81,7 +83,8 @@ resource_type(timescale) -> emqx_connector_pgsql;
 resource_type(matrix) -> emqx_connector_pgsql;
 resource_type(tdengine) -> emqx_ee_connector_tdengine;
 resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
-resource_type(dynamo) -> emqx_ee_connector_dynamo.
+resource_type(dynamo) -> emqx_ee_connector_dynamo;
+resource_type(cassandra) -> emqx_ee_connector_cassa.
 
 fields(bridges) ->
     [
@@ -132,6 +135,14 @@ fields(bridges) ->
                     desc => <<"Dynamo Bridge Config">>,
                     required => false
                 }
+            )},
+        {cassandra,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_cassa, "config")),
+                #{
+                    desc => <<"Cassandra Bridge Config">>,
+                    required => false
+                }
             )}
     ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
         clickhouse_structs().

+ 133 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl

@@ -0,0 +1,133 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_cassa).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+%% schema examples
+-export([
+    conn_bridge_examples/1,
+    values/2,
+    fields/2
+]).
+
+%% schema
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(DEFAULT_SQL, <<
+    "insert into mqtt_msg(topic, msgid, sender, qos, payload, arrived, retain) "
+    "values (${topic}, ${id}, ${clientid}, ${qos}, ${payload}, ${timestamp}, ${flags.retain})"
+>>).
+
+%%--------------------------------------------------------------------
+%% schema examples
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"cassa">> => #{
+                summary => <<"Cassandra Bridge">>,
+                value => values(Method, cassandra)
+            }
+        }
+    ].
+
+values(get, Type) ->
+    maps:merge(values(post, Type), ?METRICS_EXAMPLE);
+values(post, Type) ->
+    #{
+        enable => true,
+        type => Type,
+        name => <<"foo">>,
+        servers => <<"127.0.0.1:9042">>,
+        keyspace => <<"mqtt">>,
+        pool_size => 8,
+        username => <<"root">>,
+        password => <<"public">>,
+        sql => ?DEFAULT_SQL,
+        local_topic => <<"local/topic/#">>,
+        resource_opts => #{
+            worker_pool_size => 8,
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            query_mode => sync,
+            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+        }
+    };
+values(put, Type) ->
+    values(post, Type).
+
+%%--------------------------------------------------------------------
+%% schema
+
+namespace() -> "bridge_cassa".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {sql,
+            mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )},
+        {local_topic,
+            mk(
+                binary(),
+                #{desc => ?DESC("local_topic"), default => undefined}
+            )},
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ] ++
+        (emqx_ee_connector_cassa:fields(config) --
+            emqx_connector_schema_lib:prepare_statement_fields());
+fields("creation_opts") ->
+    emqx_resource_schema:fields("creation_opts_sync_only");
+fields("post") ->
+    fields("post", cassa);
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+fields("post", Type) ->
+    [type_field(Type), name_field() | fields("config")].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+    emqx_resource_schema:desc(Name);
+desc(_) ->
+    undefined.
+
+%%--------------------------------------------------------------------
+%% utils
+
+type_field(Type) ->
+    {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 540 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl

@@ -0,0 +1,540 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_cassa_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+% SQL definitions
+-define(SQL_BRIDGE,
+    "insert into mqtt_msg_test(topic, payload, arrived) "
+    "values (${topic}, ${payload}, ${timestamp})"
+).
+-define(SQL_CREATE_TABLE,
+    ""
+    "\n"
+    "CREATE TABLE mqtt.mqtt_msg_test (\n"
+    "    topic text,\n"
+    "    payload text,\n"
+    "    arrived timestamp,\n"
+    "    PRIMARY KEY (topic)\n"
+    ");\n"
+    ""
+).
+-define(SQL_DROP_TABLE, "DROP TABLE mqtt_msg_test").
+-define(SQL_DELETE, "TRUNCATE mqtt_msg_test").
+-define(SQL_SELECT, "SELECT payload FROM mqtt_msg_test").
+
+% DB defaults
+-define(CASSA_KEYSPACE, "mqtt").
+-define(CASSA_USERNAME, "root").
+-define(CASSA_PASSWORD, "public").
+-define(BATCH_SIZE, 10).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, tcp},
+        {group, tls}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    NonBatchCases = [t_write_timeout],
+    [
+        {tcp, [
+            %{group, with_batch},
+            {group, without_batch}
+        ]},
+        {tls, [
+            %{group, with_batch},
+            {group, without_batch}
+        ]},
+        {with_batch, TCs -- NonBatchCases},
+        {without_batch, TCs}
+    ].
+
+init_per_group(tcp, Config) ->
+    Host = os:getenv("CASSA_TCP_HOST", "toxiproxy"),
+    Port = list_to_integer(os:getenv("CASSA_TCP_PORT", "9042")),
+    [
+        {cassa_host, Host},
+        {cassa_port, Port},
+        {enable_tls, false},
+        {query_mode, sync},
+        {proxy_name, "cassa_tcp"}
+        | Config
+    ];
+init_per_group(tls, Config) ->
+    Host = os:getenv("CASSA_TLS_HOST", "toxiproxy"),
+    Port = list_to_integer(os:getenv("CASSA_TLS_PORT", "9043")),
+    [
+        {cassa_host, Host},
+        {cassa_port, Port},
+        {enable_tls, true},
+        {query_mode, sync},
+        {proxy_name, "cassa_tls"}
+        | Config
+    ];
+init_per_group(with_batch, Config0) ->
+    Config = [{enable_batch, true} | Config0],
+    common_init(Config);
+init_per_group(without_batch, Config0) ->
+    Config = [{enable_batch, false} | Config0],
+    common_init(Config);
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when
+    Group == without_batch; Group == without_batch
+->
+    connect_and_drop_table(Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    connect_and_clear_table(Config),
+    delete_bridge(Config),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    connect_and_clear_table(Config),
+    ok = snabbkaffe:stop(),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+common_init(Config0) ->
+    BridgeType = proplists:get_value(bridge_type, Config0, <<"cassandra">>),
+    Host = ?config(cassa_host, Config0),
+    Port = ?config(cassa_port, Config0),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            % Setup toxiproxy
+            ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+            ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            % Ensure EE bridge module is loaded
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            % Connect to cassnadra directly and create the table
+            connect_and_create_table(Config0),
+            {Name, CassaConf} = cassa_config(BridgeType, Config0),
+            Config =
+                [
+                    {cassa_config, CassaConf},
+                    {cassa_bridge_type, BridgeType},
+                    {cassa_name, Name},
+                    {proxy_host, ProxyHost},
+                    {proxy_port, ProxyPort}
+                    | Config0
+                ],
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_cassandra);
+                _ ->
+                    {skip, no_cassandra}
+            end
+    end.
+
+cassa_config(BridgeType, Config) ->
+    Port = integer_to_list(?config(cassa_port, Config)),
+    Server = ?config(cassa_host, Config) ++ ":" ++ Port,
+    Name = atom_to_binary(?MODULE),
+    BatchSize =
+        case ?config(enable_batch, Config) of
+            true -> ?BATCH_SIZE;
+            false -> 1
+        end,
+    QueryMode = ?config(query_mode, Config),
+    TlsEnabled = ?config(enable_tls, Config),
+    ConfigString =
+        io_lib:format(
+            "bridges.~s.~s {\n"
+            "  enable = true\n"
+            "  servers = ~p\n"
+            "  keyspace = ~p\n"
+            "  username = ~p\n"
+            "  password = ~p\n"
+            "  sql = ~p\n"
+            "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
+            "    batch_size = ~b\n"
+            "    query_mode = ~s\n"
+            "  }\n"
+            "  ssl = {\n"
+            "    enable = ~w\n"
+            "  }\n"
+            "}",
+            [
+                BridgeType,
+                Name,
+                Server,
+                ?CASSA_KEYSPACE,
+                ?CASSA_USERNAME,
+                ?CASSA_PASSWORD,
+                ?SQL_BRIDGE,
+                BatchSize,
+                QueryMode,
+                TlsEnabled
+            ]
+        ),
+    {Name, parse_and_check(ConfigString, BridgeType, Name)}.
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
+    Config.
+
+create_bridge(Config) ->
+    BridgeType = ?config(cassa_bridge_type, Config),
+    Name = ?config(cassa_name, Config),
+    PGConfig = ?config(cassa_config, Config),
+    emqx_bridge:create(BridgeType, Name, PGConfig).
+
+delete_bridge(Config) ->
+    BridgeType = ?config(cassa_bridge_type, Config),
+    Name = ?config(cassa_name, Config),
+    emqx_bridge:remove(BridgeType, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+send_message(Config, Payload) ->
+    Name = ?config(cassa_name, Config),
+    BridgeType = ?config(cassa_bridge_type, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+query_resource(Config, Request) ->
+    Name = ?config(cassa_name, Config),
+    BridgeType = ?config(cassa_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
+
+connect_direct_cassa(Config) ->
+    Opts = #{
+        host => ?config(cassa_host, Config),
+        port => ?config(cassa_port, Config),
+        username => ?CASSA_USERNAME,
+        password => ?CASSA_PASSWORD,
+        keyspace => ?CASSA_KEYSPACE
+    },
+
+    SslOpts =
+        case ?config(enable_tls, Config) of
+            true ->
+                Opts#{
+                    ssl => true,
+                    ssl_opts => emqx_tls_lib:to_client_opts(#{enable => true})
+                };
+            false ->
+                Opts
+        end,
+    {ok, Con} = ecql:connect(maps:to_list(SslOpts)),
+    Con.
+
+% These funs connect and then stop the cassandra connection
+connect_and_create_table(Config) ->
+    Con = connect_direct_cassa(Config),
+    {ok, _} = ecql:query(Con, ?SQL_CREATE_TABLE),
+    ok = ecql:close(Con).
+
+connect_and_drop_table(Config) ->
+    Con = connect_direct_cassa(Config),
+    {ok, _} = ecql:query(Con, ?SQL_DROP_TABLE),
+    ok = ecql:close(Con).
+
+connect_and_clear_table(Config) ->
+    Con = connect_direct_cassa(Config),
+    ok = ecql:query(Con, ?SQL_DELETE),
+    ok = ecql:close(Con).
+
+connect_and_get_payload(Config) ->
+    Con = connect_direct_cassa(Config),
+    {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Con, ?SQL_SELECT),
+    ok = ecql:close(Con),
+    Result.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Val = integer_to_binary(erlang:unique_integer()),
+    SentData = #{
+        topic => atom_to_binary(?FUNCTION_NAME),
+        payload => Val,
+        timestamp => 1668602148000
+    },
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := cassandra_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                Val,
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(cassandra_connector_query_return, Trace0),
+            case ?config(enable_batch, Config) of
+                true ->
+                    ?assertMatch([#{result := {_, [ok]}}], Trace);
+                false ->
+                    ?assertMatch([#{result := ok}], Trace)
+            end,
+            ok
+        end
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    BridgeType = ?config(cassa_bridge_type, Config),
+    Name = ?config(cassa_name, Config),
+    PgsqlConfig0 = ?config(cassa_config, Config),
+    PgsqlConfig = PgsqlConfig0#{
+        <<"name">> => Name,
+        <<"type">> => BridgeType
+    },
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(PgsqlConfig)
+    ),
+    Val = integer_to_binary(erlang:unique_integer()),
+    SentData = #{
+        topic => atom_to_binary(?FUNCTION_NAME),
+        payload => Val,
+        timestamp => 1668602148000
+    },
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := cassandra_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                Val,
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(cassandra_connector_query_return, Trace0),
+            case ?config(enable_batch, Config) of
+                true ->
+                    ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
+                false ->
+                    ?assertMatch([#{result := ok}], Trace)
+            end,
+            ok
+        end
+    ),
+    ok.
+
+t_get_status(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+
+    Name = ?config(cassa_name, Config),
+    BridgeType = ?config(cassa_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ?assertMatch(
+            {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
+            emqx_resource_manager:health_check(ResourceID)
+        )
+    end),
+    ok.
+
+t_create_disconnected(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            ?assertMatch({ok, _}, create_bridge(Config))
+        end),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := {start_pool_failed, _, _}}],
+                ?of_kind(cassandra_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_write_failure(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
+    {ok, _} = create_bridge(Config),
+    Val = integer_to_binary(erlang:unique_integer()),
+    SentData = #{
+        topic => atom_to_binary(?FUNCTION_NAME),
+        payload => Val,
+        timestamp => 1668602148000
+    },
+    ?check_trace(
+        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    case QueryMode of
+                        sync ->
+                            ?assertMatch({error, _}, send_message(Config, SentData));
+                        async ->
+                            send_message(Config, SentData)
+                    end,
+                    #{?snk_kind := buffer_worker_flush_nack},
+                    1_000
+                )
+        end),
+        fun(Trace0) ->
+            ct:pal("trace: ~p", [Trace0]),
+            Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
+            ?assertMatch([#{result := {error, _}} | _], Trace),
+            [#{result := {error, Error}} | _] = Trace,
+            case Error of
+                {resource_error, _} ->
+                    ok;
+                {recoverable_error, disconnected} ->
+                    ok;
+                _ ->
+                    ct:fail("unexpected error: ~p", [Error])
+            end
+        end
+    ),
+    ok.
+
+%% This test doesn't work with batch enabled since it is not possible
+%% to set the timeout directly for batch queries
+%%
+%% XXX: parameter with request timeout is not supported yet.
+%%
+%t_write_timeout(Config) ->
+%    ProxyName = ?config(proxy_name, Config),
+%    ProxyPort = ?config(proxy_port, Config),
+%    ProxyHost = ?config(proxy_host, Config),
+%    {ok, _} = create_bridge(Config),
+%    Val = integer_to_binary(erlang:unique_integer()),
+%    SentData = #{payload => Val, timestamp => 1668602148000},
+%    Timeout = 1000,
+%    emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+%        ?assertMatch(
+%            {error, {resource_error, #{reason := timeout}}},
+%            query_resource(Config, {send_message, SentData, [], Timeout})
+%        )
+%    end),
+%    ok.
+
+t_simple_sql_query(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Request = {query, <<"SELECT count(1) AS T FROM system.local">>},
+    Result = query_resource(Config, Request),
+    case ?config(enable_batch, Config) of
+        true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
+        false -> ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
+    end,
+    ok.
+
+t_missing_data(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    %% emqx_ee_connector_cassa will send missed data as a `null` atom
+    %% to ecql driver
+    Result = send_message(Config, #{}),
+    ?assertMatch(
+        %% TODO: match error msgs
+        {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}},
+        Result
+    ),
+    ok.
+
+t_bad_sql_parameter(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Request = {query, <<"">>, [bad_parameter]},
+    Result = query_resource(Config, Request),
+    case ?config(enable_batch, Config) of
+        true ->
+            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+        false ->
+            ?assertMatch(
+                {error, {unrecoverable_error, _}}, Result
+            )
+    end,
+    ok.
+
+t_nasty_sql_string(Config) ->
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    Payload = list_to_binary(lists:seq(1, 127)),
+    Message = #{
+        topic => atom_to_binary(?FUNCTION_NAME),
+        payload => Payload,
+        timestamp => erlang:system_time(millisecond)
+    },
+    %% XXX: why ok instead of {ok, AffectedLines}?
+    ?assertEqual(ok, send_message(Config, Message)),
+    ?assertEqual(Payload, connect_and_get_payload(Config)).

+ 28 - 0
lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_cassa.conf

@@ -0,0 +1,28 @@
+emqx_ee_connector_cassa {
+
+    servers {
+        desc {
+          en: """The IPv4 or IPv6 address or the hostname to connect to.<br/>
+A host entry has the following form: `Host[:Port][,Host2:Port]`.<br/>
+The Cassandra default port 9042 is used if `[:Port]` is not specified."""
+          zh: """将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>
+主机名具有以下形式:`Host[:Port][,Host2:Port]`。<br/>
+如果未指定 `[:Port]`,则使用 Cassandra 默认端口 9042。"""
+        }
+        label: {
+              en: "Servers"
+              zh: "Servers"
+            }
+    }
+
+    keyspace {
+        desc {
+          en: """Keyspace name to connect to."""
+          zh: """要连接到的 Keyspace 名称。"""
+        }
+        label: {
+              en: "Keyspace"
+              zh: "Keyspace"
+            }
+    }
+}

+ 1 - 0
lib-ee/emqx_ee_connector/include/emqx_ee_connector.hrl

@@ -3,3 +3,4 @@
 %%-------------------------------------------------------------------
 
 -define(INFLUXDB_DEFAULT_PORT, 8086).
+-define(CASSANDRA_DEFAULT_PORT, 9042).

+ 2 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -11,7 +11,8 @@
         wolff,
         brod,
         clickhouse,
-        erlcloud
+        erlcloud,
+        ecql
     ]},
     {env, []},
     {modules, []},

+ 415 - 0
lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl

@@ -0,0 +1,415 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ee_connector_cassa).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_ee_connector/include/emqx_ee_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% schema
+-export([roots/0, fields/1]).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    %% TODO: now_supported_now
+    %%on_batch_query/3,
+    on_get_status/2
+]).
+
+%% callbacks of ecpool
+-export([
+    connect/1,
+    prepare_sql_to_conn/2
+]).
+
+%% callbacks for query executing
+-export([query/3, prepared_query/3]).
+
+-export([do_get_status/1]).
+
+-type prepares() :: #{atom() => binary()}.
+-type params_tokens() :: #{atom() => list()}.
+
+-type state() ::
+    #{
+        poolname := atom(),
+        prepare_sql := prepares(),
+        params_tokens := params_tokens(),
+        %% returned by ecql:prepare/2
+        prepare_statement := binary()
+    }.
+
+-define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}).
+
+%%--------------------------------------------------------------------
+%% schema
+
+roots() ->
+    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+    cassandra_db_fields() ++
+        emqx_connector_schema_lib:ssl_fields() ++
+        emqx_connector_schema_lib:prepare_statement_fields().
+
+cassandra_db_fields() ->
+    [
+        {servers, servers()},
+        {keyspace, fun keyspace/1},
+        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
+        {username, fun emqx_connector_schema_lib:username/1},
+        {password, fun emqx_connector_schema_lib:password/1},
+        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
+    ].
+
+servers() ->
+    Meta = #{desc => ?DESC("servers")},
+    emqx_schema:servers_sc(Meta, ?DEFAULT_SERVER_OPTION).
+
+keyspace(type) -> binary();
+keyspace(desc) -> ?DESC("keyspace");
+keyspace(required) -> true;
+keyspace(_) -> undefined.
+
+%%--------------------------------------------------------------------
+%% callbacks for emqx_resource
+
+callback_mode() -> always_sync.
+
+-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
+on_start(
+    InstId,
+    #{
+        servers := Servers,
+        keyspace := Keyspace,
+        username := Username,
+        pool_size := PoolSize,
+        ssl := SSL
+    } = Config
+) ->
+    {ok, _} = application:ensure_all_started(ecpool),
+    {ok, _} = application:ensure_all_started(ecql),
+
+    ?SLOG(info, #{
+        msg => "starting_cassandra_connector",
+        connector => InstId,
+        config => emqx_misc:redact(Config)
+    }),
+
+    Options = [
+        {nodes, emqx_schema:parse_servers(Servers, ?DEFAULT_SERVER_OPTION)},
+        {username, Username},
+        {password, emqx_secret:wrap(maps:get(password, Config, ""))},
+        {keyspace, Keyspace},
+        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
+        {pool_size, PoolSize}
+    ],
+
+    %% FIXME: how to set tls options
+    SslOpts =
+        case maps:get(enable, SSL) of
+            true ->
+                [
+                    %% note: type defined at ecql:option/0
+                    {ssl, emqx_tls_lib:to_client_opts(SSL)}
+                ];
+            false ->
+                []
+        end,
+
+    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    Prepares = parse_prepare_sql(Config),
+    InitState = #{poolname => PoolName, prepare_statement => #{}},
+    State = maps:merge(InitState, Prepares),
+    case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
+        ok ->
+            {ok, init_prepare(State)};
+        {error, Reason} ->
+            ?tp(
+                cassandra_connector_start_failed,
+                #{error => Reason}
+            ),
+            {error, Reason}
+    end.
+
+on_stop(InstId, #{poolname := PoolName}) ->
+    ?SLOG(info, #{
+        msg => "stopping_cassandra_connector",
+        connector => InstId
+    }),
+    emqx_plugin_libs_pool:stop_pool(PoolName).
+
+-type request() ::
+    % emqx_bridge.erl
+    {send_message, Params :: map()}
+    % common query
+    | {query, SQL :: binary()}
+    | {query, SQL :: binary(), Params :: map()}.
+
+-spec on_query(
+    emqx_resource:resource_id(),
+    request(),
+    state()
+) -> emqx_resource:query_result().
+on_query(
+    InstId,
+    Request,
+    #{poolname := PoolName} = State
+) ->
+    {Type, PreparedKeyOrSQL, Params} = parse_request_to_sql(Request),
+    ?tp(
+        debug,
+        cassandra_connector_received_sql_query,
+        #{
+            connector => InstId,
+            type => Type,
+            params => Params,
+            prepared_key_or_sql => PreparedKeyOrSQL,
+            state => State
+        }
+    ),
+    {PreparedKeyOrSQL1, Data} = proc_sql_params(Type, PreparedKeyOrSQL, Params, State),
+    Res = exec_sql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data),
+    handle_result(Res).
+
+parse_request_to_sql({send_message, Params}) ->
+    {prepared_query, _Key = send_message, Params};
+parse_request_to_sql({query, SQL}) ->
+    parse_request_to_sql({query, SQL, #{}});
+parse_request_to_sql({query, SQL, Params}) ->
+    {query, SQL, Params}.
+
+proc_sql_params(
+    prepared_query,
+    PreparedKey0,
+    Params,
+    #{prepare_statement := Prepares, params_tokens := ParamsTokens}
+) ->
+    PreparedKey = maps:get(PreparedKey0, Prepares),
+    Tokens = maps:get(PreparedKey0, ParamsTokens),
+    {PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
+proc_sql_params(query, SQL, Params, _State) ->
+    {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
+    {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
+
+exec_sql_query(InstId, PoolName, Type, PreparedKey, Data) when
+    Type == query; Type == prepared_query
+->
+    case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of
+        {error, Reason} = Result ->
+            ?tp(
+                error,
+                cassandra_connector_query_return,
+                #{connector => InstId, error => Reason}
+            ),
+            Result;
+        Result ->
+            ?tp(debug, cassandra_connector_query_return, #{result => Result}),
+            Result
+    end.
+
+on_get_status(_InstId, #{poolname := Pool} = State) ->
+    case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
+        true ->
+            case do_check_prepares(State) of
+                ok ->
+                    connected;
+                {ok, NState} ->
+                    %% return new state with prepared statements
+                    {connected, NState};
+                false ->
+                    %% do not log error, it is logged in prepare_sql_to_conn
+                    connecting
+            end;
+        false ->
+            connecting
+    end.
+
+do_get_status(Conn) ->
+    ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")).
+
+do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
+    ok;
+do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) ->
+    %% retry to prepare
+    case prepare_sql(Prepares, PoolName) of
+        {ok, Sts} ->
+            %% remove the error
+            {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
+        _Error ->
+            false
+    end.
+
+%%--------------------------------------------------------------------
+%% callbacks query
+
+query(Conn, SQL, Params) ->
+    ecql:query(Conn, SQL, Params).
+
+prepared_query(Conn, PreparedKey, Params) ->
+    ecql:execute(Conn, PreparedKey, Params).
+
+%%--------------------------------------------------------------------
+%% callbacks for ecpool
+
+connect(Opts) ->
+    case ecql:connect(conn_opts(Opts)) of
+        {ok, _Conn} = Ok ->
+            Ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+conn_opts(Opts) ->
+    conn_opts(Opts, []).
+
+conn_opts([], Acc) ->
+    Acc;
+conn_opts([{password, Password} | Opts], Acc) ->
+    conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]);
+conn_opts([Opt | Opts], Acc) ->
+    conn_opts(Opts, [Opt | Acc]).
+
+%%--------------------------------------------------------------------
+%% prepare
+
+%% XXX: hardcode
+%% note: the `sql` param is passed by emqx_ee_bridge_cassa
+parse_prepare_sql(#{sql := SQL}) ->
+    parse_prepare_sql([{send_message, SQL}], #{}, #{});
+parse_prepare_sql(_) ->
+    #{prepare_sql => #{}, params_tokens => #{}}.
+
+parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
+    {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'),
+    parse_prepare_sql(
+        T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
+    );
+parse_prepare_sql([], Prepares, Tokens) ->
+    #{
+        prepare_sql => Prepares,
+        params_tokens => Tokens
+    }.
+
+init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) ->
+    case maps:size(Prepares) of
+        0 ->
+            State;
+        _ ->
+            case prepare_sql(Prepares, PoolName) of
+                {ok, Sts} ->
+                    State#{prepare_statement := Sts};
+                Error ->
+                    ?tp(
+                        error,
+                        cassandra_prepare_sql_failed,
+                        #{prepares => Prepares, reason => Error}
+                    ),
+                    %% mark the prepare_sqlas failed
+                    State#{prepare_sql => {error, Prepares}}
+            end
+    end.
+
+prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName);
+prepare_sql(Prepares, PoolName) ->
+    case do_prepare_sql(Prepares, PoolName) of
+        {ok, _Sts} = Ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            Ok;
+        Error ->
+            Error
+    end.
+
+do_prepare_sql(Prepares, PoolName) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+    {ok, Conn} = ecpool_worker:client(Worker),
+    case prepare_sql_to_conn(Conn, Prepares) of
+        {ok, Sts} ->
+            do_prepare_sql(T, Prepares, PoolName, Sts);
+        Error ->
+            Error
+    end;
+do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    {ok, LastSts}.
+
+prepare_sql_to_conn(Conn, Prepares) ->
+    prepare_sql_to_conn(Conn, Prepares, #{}).
+
+prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+    ?SLOG(info, #{msg => "cassandra_prepare_sql", name => Key, prepare_sql => SQL}),
+    case ecql:prepare(Conn, Key, SQL) of
+        {ok, Statement} ->
+            prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
+        {error, Error} = Other ->
+            ?SLOG(error, #{
+                msg => "cassandra_prepare_sql_failed",
+                worker_pid => Conn,
+                name => Key,
+                prepare_sql => SQL,
+                error => Error
+            }),
+            Other
+    end.
+
+handle_result({error, disconnected}) ->
+    {error, {recoverable_error, disconnected}};
+handle_result({error, Error}) ->
+    {error, {unrecoverable_error, Error}};
+handle_result(Res) ->
+    Res.
+
+%%--------------------------------------------------------------------
+%% utils
+
+%% see ecql driver requirements
+assign_type_for_params(Params) ->
+    assign_type_for_params(Params, []).
+
+assign_type_for_params([], Acc) ->
+    lists:reverse(Acc);
+assign_type_for_params([Param | More], Acc) ->
+    assign_type_for_params(More, [may_assign_type(Param) | Acc]).
+
+may_assign_type(V) when is_boolean(V) ->
+    {int,
+        if
+            V -> 1;
+            true -> 0
+        end};
+may_assign_type(V) when is_binary(V); is_list(V); is_atom(V) -> V;
+may_assign_type(V) when is_integer(V) ->
+    %% The max value of signed int(4) is 2147483647
+    case V > 2147483647 orelse V < -2147483647 of
+        true -> {bigint, V};
+        false -> {int, V}
+    end;
+may_assign_type(V) when is_float(V) -> {double, V};
+may_assign_type(V) ->
+    V.

+ 192 - 0
lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl

@@ -0,0 +1,192 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_connector_cassa_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx_connector.hrl").
+-include("emqx_ee_connector.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(CASSANDRA_HOST, "127.0.0.1").
+-define(CASSANDRA_RESOURCE_MOD, emqx_ee_connector_cassa).
+
+%% This test SUITE requires a running cassandra instance. If you don't want to
+%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
+%% you can create a cassandra instance with the following command (execute it
+%% from root of the EMQX directory.). You also need to set ?CASSANDRA_HOST and
+%% ?CASSANDRA_PORT to appropriate values.
+%%
+%% sudo docker run --rm -d --name cassandra --network host cassandra:3.11.14
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+cassandra_servers() ->
+    emqx_schema:parse_servers(
+        iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
+        #{default_port => ?CASSANDRA_DEFAULT_PORT}
+    ).
+
+init_per_suite(Config) ->
+    case
+        emqx_common_test_helpers:is_tcp_server_available(?CASSANDRA_HOST, ?CASSANDRA_DEFAULT_PORT)
+    of
+        true ->
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            {ok, _} = application:ensure_all_started(emqx_ee_connector),
+            %% keyspace `mqtt` must be created in advance
+            {ok, Conn} =
+                ecql:connect([
+                    {nodes, cassandra_servers()},
+                    {username, <<"admin">>},
+                    {password, <<"public">>},
+                    {keyspace, "mqtt"}
+                ]),
+            ecql:close(Conn),
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_cassandra);
+                _ ->
+                    {skip, no_cassandra}
+            end
+    end.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
+    _ = application:stop(emqx_connector).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_lifecycle(_Config) ->
+    perform_lifecycle_check(
+        <<"emqx_connector_cassandra_SUITE">>,
+        cassandra_config()
+    ).
+
+show(X) ->
+    erlang:display(X),
+    X.
+
+show(Label, What) ->
+    erlang:display({Label, What}),
+    What.
+
+perform_lifecycle_check(PoolName, InitialConfig) ->
+    {ok, #{config := CheckedConfig}} =
+        emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig),
+    {ok, #{
+        state := #{poolname := ReturnedPoolName} = State,
+        status := InitialStatus
+    }} =
+        emqx_resource:create_local(
+            PoolName,
+            ?CONNECTOR_RESOURCE_GROUP,
+            ?CASSANDRA_RESOURCE_MOD,
+            CheckedConfig,
+            #{}
+        ),
+    ?assertEqual(InitialStatus, connected),
+    % Instance should match the state and status of the just started resource
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := InitialStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    % % Perform query as further check that the resource is working as expected
+    (fun() ->
+        erlang:display({pool_name, PoolName}),
+        QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()),
+        ?assertMatch({ok, _}, QueryNoParamsResWrapper)
+    end)(),
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Resource will be listed still, but state will be changed and healthcheck will fail
+    % as the worker no longer exists.
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := StoppedStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual(stopped, StoppedStatus),
+    ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
+    % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Can call stop/1 again on an already stopped instance
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Make sure it can be restarted and the healthchecks and queries work properly
+    ?assertEqual(ok, emqx_resource:restart(PoolName)),
+    % async restart, need to wait resource
+    timer:sleep(500),
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    (fun() ->
+        QueryNoParamsResWrapper =
+            emqx_resource:query(PoolName, test_query_no_params()),
+        ?assertMatch({ok, _}, QueryNoParamsResWrapper)
+    end)(),
+    % Stop and remove the resource in one go.
+    ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Should not even be able to get the resource data out of ets now unlike just stopping.
+    ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
+
+%%--------------------------------------------------------------------
+%% utils
+%%--------------------------------------------------------------------
+
+cassandra_config() ->
+    Config =
+        #{
+            auto_reconnect => true,
+            keyspace => <<"mqtt">>,
+            username => <<"default">>,
+            password => <<"public">>,
+            pool_size => 8,
+            servers => iolist_to_binary(
+                io_lib:format(
+                    "~s:~b",
+                    [
+                        ?CASSANDRA_HOST,
+                        ?CASSANDRA_DEFAULT_PORT
+                    ]
+                )
+            )
+        },
+    #{<<"config">> => Config}.
+
+test_query_no_params() ->
+    {query, <<"SELECT count(1) AS T FROM system.local">>}.

+ 3 - 0
scripts/ct/run.sh

@@ -168,6 +168,9 @@ for dep in ${CT_DEPS}; do
         dynamo)
             FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' )
             ;;
+        cassandra)
+            FILES+=( '.ci/docker-compose-file/docker-compose-cassandra-tcp.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1