Просмотр исходного кода

refactor: split cassandra bridges to actions and connectors

Shawn 2 лет назад
Родитель
Сommit
b32c0fb0d8

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_matrix_action_info,
         emqx_bridge_mongodb_action_info,
         emqx_bridge_influxdb_action_info,
+        emqx_bridge_cassandra_action_info,
         emqx_bridge_mysql_action_info,
         emqx_bridge_pgsql_action_info,
         emqx_bridge_syskeeper_action_info,

+ 2 - 2
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.2.0"},
     {registered, []},
     {applications, [
         kernel,
@@ -8,7 +8,7 @@
         emqx_resource,
         ecql
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_cassandra_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 100 - 8
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl

@@ -12,11 +12,17 @@
 
 %% schema examples
 -export([
-    conn_bridge_examples/1,
     values/2,
     fields/2
 ]).
 
+%% Examples
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
+]).
+
 %% schema
 -export([
     namespace/0,
@@ -26,10 +32,13 @@
 ]).
 
 -define(DEFAULT_CQL, <<
-    "insert into mqtt_msg(topic, msgid, sender, qos, payload, arrived, retain) "
-    "values (${topic}, ${id}, ${clientid}, ${qos}, ${payload}, ${timestamp}, ${flags.retain})"
+    "insert into mqtt_msg(msgid, topic, qos, payload, arrived) "
+    "values (${id}, ${topic},  ${qos}, ${payload}, ${timestamp})"
 >>).
 
+-define(CONNECTOR_TYPE, cassandra).
+-define(ACTION_TYPE, cassandra).
+
 %%--------------------------------------------------------------------
 %% schema examples
 
@@ -43,6 +52,41 @@ conn_bridge_examples(Method) ->
         }
     ].
 
+bridge_v2_examples(Method) ->
+    ParamsExample = #{
+        parameters => #{
+            cql => ?DEFAULT_CQL
+        }
+    },
+    [
+        #{
+            <<"cassandra">> => #{
+                summary => <<"Cassandra Action">>,
+                value => emqx_bridge_v2_schema:action_values(
+                    Method, cassandra, cassandra, ParamsExample
+                )
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"cassandra">> => #{
+                summary => <<"Cassandra Connector">>,
+                value => emqx_connector_schema:connector_values(
+                    Method, cassandra, #{
+                        servers => <<"127.0.0.1:9042">>,
+                        keyspace => <<"mqtt">>,
+                        username => <<"root">>,
+                        password => <<"******">>,
+                        pool_size => 8
+                    }
+                )
+            }
+        }
+    ].
+
 %% no difference in get/post/put method
 values(_Method, Type) ->
     #{
@@ -73,14 +117,47 @@ namespace() -> "bridge_cassa".
 
 roots() -> [].
 
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        emqx_bridge_cassandra_connector:fields("connector") ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(action) ->
+    {cassandra,
+        mk(
+            hoconsc:map(name, ref(?MODULE, cassandra_action)),
+            #{desc => <<"Cassandra Action Config">>, required => false}
+        )};
+fields(cassandra_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(ref(?MODULE, action_parameters), #{
+            required => true, desc => ?DESC(action_parameters)
+        })
+    );
+fields(action_parameters) ->
+    [
+        cql_field()
+    ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    Fields =
+        emqx_bridge_cassandra_connector:fields("connector") ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(cassandra_action));
 fields("config") ->
     [
+        cql_field(),
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
-        {cql,
-            mk(
-                binary(),
-                #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
-            )},
         {local_topic,
             mk(
                 binary(),
@@ -99,8 +176,23 @@ fields("get") ->
 fields("post", Type) ->
     [type_field(Type), name_field() | fields("config")].
 
+cql_field() ->
+    {cql,
+        mk(
+            binary(),
+            #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
+        )}.
+
 desc("config") ->
     ?DESC("desc_config");
+desc(cassandra_action) ->
+    ?DESC(cassandra_action);
+desc(action_parameters) ->
+    ?DESC(action_parameters);
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 62 - 0
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_action_info.erl

@@ -0,0 +1,62 @@
+-module(emqx_bridge_cassandra_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    connector_action_config_to_bridge_v1_config/2,
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(SCHEMA_MODULE, emqx_bridge_cassandra).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(cassandra_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ActionTopLevelKeys = schema_keys(cassandra_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ConnectorTopLevelKeys = schema_keys("config_connector"),
+    ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
+    ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
+        ConnConfig0
+    ).
+
+connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
+    RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorRawConf, ActionRawConf
+    ),
+    maps:without([<<"cassandra_type">>], RawConf).
+
+bridge_v1_type_name() -> cassandra.
+
+action_type_name() -> cassandra.
+
+connector_type_name() -> cassandra.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
+
+schema_keys(Name) ->
+    [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

+ 122 - 166
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -14,13 +14,17 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% schema
--export([roots/0, fields/1]).
+-export([roots/0, fields/1, desc/1]).
 
 %% callbacks of behaviour emqx_resource
 -export([
     callback_mode/0,
     on_start/2,
     on_stop/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channel_status/3,
+    on_get_channels/1,
     on_query/3,
     on_query_async/4,
     on_batch_query/3,
@@ -28,6 +32,8 @@
     on_get_status/2
 ]).
 
+-export([transform_bridge_v1_config_to_connector_config/1]).
+
 %% callbacks of ecpool
 -export([
     connect/1,
@@ -39,16 +45,10 @@
 
 -export([do_get_status/1]).
 
--type prepares() :: #{atom() => binary()}.
--type params_tokens() :: #{atom() => list()}.
-
 -type state() ::
     #{
         pool_name := binary(),
-        prepare_cql := prepares(),
-        params_tokens := params_tokens(),
-        %% returned by ecql:prepare/2
-        prepare_statement := binary()
+        channels := #{}
     }.
 
 -define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}).
@@ -62,7 +62,9 @@ roots() ->
 fields(config) ->
     cassandra_db_fields() ++
         emqx_connector_schema_lib:ssl_fields() ++
-        emqx_connector_schema_lib:prepare_statement_fields().
+        emqx_connector_schema_lib:prepare_statement_fields();
+fields("connector") ->
+    cassandra_db_fields() ++ emqx_connector_schema_lib:ssl_fields().
 
 cassandra_db_fields() ->
     [
@@ -83,6 +85,11 @@ keyspace(desc) -> ?DESC("keyspace");
 keyspace(required) -> true;
 keyspace(_) -> undefined.
 
+desc(config) ->
+    ?DESC("config");
+desc("connector") ->
+    ?DESC("connector").
+
 %%--------------------------------------------------------------------
 %% callbacks for emqx_resource
 
@@ -130,10 +137,9 @@ on_start(
             false ->
                 []
         end,
-    State = parse_prepare_cql(Config),
     case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
         ok ->
-            {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
+            {ok, #{pool_name => InstId, channels => #{}}};
         {error, Reason} ->
             ?tp(
                 cassandra_connector_start_failed,
@@ -149,23 +155,49 @@ on_stop(InstId, _State) ->
     }),
     emqx_resource_pool:stop(InstId).
 
+on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
+    #{parameters := #{cql := CQL}} = ChannConf0,
+    {PrepareCQL, ParamsTokens} = emqx_placeholder:preproc_sql(CQL, '?'),
+    ParsedCql = #{
+        prepare_key => short_prepare_key(ChannId),
+        prepare_cql => PrepareCQL,
+        params_tokens => ParamsTokens
+    },
+    NewChanns = Channs#{ChannId => #{parsed_cql => ParsedCql, prepare_result => not_prepared}},
+    {ok, OldState#{channels => NewChanns}}.
+
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
+    NewState = State#{channels => maps:remove(ChannId, Channels)},
+    {ok, NewState}.
+
+on_get_channel_status(InstanceId, ChannId, #{channels := Channels, pool_name := PoolName} = State) ->
+    case on_get_status(InstanceId, State) of
+        connected ->
+            #{parsed_cql := ParsedCql} = maps:get(ChannId, Channels),
+            case prepare_cql_to_cassandra(ParsedCql, PoolName) of
+                {ok, _} -> connected;
+                {error, Reason} -> {connecting, Reason}
+            end;
+        _ ->
+            connecting
+    end.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
 -type request() ::
     % emqx_bridge.erl
-    {send_message, Params :: map()}
+    {ChannId :: binary(), Params :: map()}
     % common query
-    | {query, SQL :: binary()}
-    | {query, SQL :: binary(), Params :: map()}.
+    | {query, CQL :: binary()}
+    | {query, CQL :: binary(), Params :: map()}.
 
 -spec on_query(
     emqx_resource:resource_id(),
     request(),
     state()
 ) -> ok | {ok, ecql:cql_result()} | {error, {recoverable_error | unrecoverable_error, term()}}.
-on_query(
-    InstId,
-    Request,
-    State
-) ->
+on_query(InstId, Request, State) ->
     do_single_query(InstId, Request, sync, State).
 
 -spec on_query_async(
@@ -174,21 +206,11 @@ on_query(
     {function(), list()},
     state()
 ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
-on_query_async(
-    InstId,
-    Request,
-    Callback,
-    State
-) ->
+on_query_async(InstId, Request, Callback, State) ->
     do_single_query(InstId, Request, {async, Callback}, State).
 
-do_single_query(
-    InstId,
-    Request,
-    Async,
-    #{pool_name := PoolName} = State
-) ->
-    {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
+do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) ->
+    {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
     ?tp(
         debug,
         cassandra_connector_received_cql_query,
@@ -196,12 +218,12 @@ do_single_query(
             connector => InstId,
             type => Type,
             params => Params,
-            prepared_key_or_cql => PreparedKeyOrSQL,
+            prepared_key_or_cql => PreparedKeyOrCQL,
             state => State
         }
     ),
-    {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State),
-    Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data),
+    {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State),
+    Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data),
     handle_result(Res).
 
 -spec on_batch_query(
@@ -209,11 +231,7 @@ do_single_query(
     [request()],
     state()
 ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
-on_batch_query(
-    InstId,
-    Requests,
-    State
-) ->
+on_batch_query(InstId, Requests, State) ->
     do_batch_query(InstId, Requests, sync, State).
 
 -spec on_batch_query_async(
@@ -222,25 +240,15 @@ on_batch_query(
     {function(), list()},
     state()
 ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
-on_batch_query_async(
-    InstId,
-    Requests,
-    Callback,
-    State
-) ->
+on_batch_query_async(InstId, Requests, Callback, State) ->
     do_batch_query(InstId, Requests, {async, Callback}, State).
 
-do_batch_query(
-    InstId,
-    Requests,
-    Async,
-    #{pool_name := PoolName} = State
-) ->
+do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
     CQLs =
         lists:map(
             fun(Request) ->
-                {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
-                proc_cql_params(Type, PreparedKeyOrSQL, Params, State)
+                {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
+                proc_cql_params(Type, PreparedKeyOrCQL, Params, State)
             end,
             Requests
         ),
@@ -256,26 +264,24 @@ do_batch_query(
     Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
     handle_result(Res).
 
-parse_request_to_cql({send_message, Params}) ->
-    {prepared_query, _Key = send_message, Params};
-parse_request_to_cql({query, SQL}) ->
-    parse_request_to_cql({query, SQL, #{}});
-parse_request_to_cql({query, SQL, Params}) ->
-    {query, SQL, Params}.
-
-proc_cql_params(
-    prepared_query,
-    PreparedKey0,
-    Params,
-    #{prepare_statement := Prepares, params_tokens := ParamsTokens}
-) ->
-    %% assert
-    _PreparedKey = maps:get(PreparedKey0, Prepares),
-    Tokens = maps:get(PreparedKey0, ParamsTokens),
-    {PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))};
-proc_cql_params(query, SQL, Params, _State) ->
-    {SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
-    {SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
+parse_request_to_cql({query, CQL}) ->
+    {query, CQL, #{}};
+parse_request_to_cql({query, CQL, Params}) ->
+    {query, CQL, Params};
+parse_request_to_cql({ChannId, Params}) ->
+    {prepared_query, ChannId, Params}.
+
+proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) ->
+    #{
+        parsed_cql := #{
+            prepare_key := PrepareKey,
+            params_tokens := ParamsTokens
+        }
+    } = maps:get(ChannId, Channs),
+    {PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))};
+proc_cql_params(query, CQL, Params, _State) ->
+    {CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'),
+    {CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
 
 exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
     Type == query; Type == prepared_query
@@ -314,38 +320,15 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
 exec(PoolName, Query) ->
     ecpool:pick_and_do(PoolName, Query, no_handover).
 
-on_get_status(_InstId, #{pool_name := PoolName} = State) ->
+on_get_status(_InstId, #{pool_name := PoolName}) ->
     case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
-        true ->
-            case do_check_prepares(State) of
-                ok ->
-                    connected;
-                {ok, NState} ->
-                    %% return new state with prepared statements
-                    {connected, NState};
-                false ->
-                    %% do not log error, it is logged in prepare_cql_to_conn
-                    connecting
-            end;
-        false ->
-            connecting
+        true -> connected;
+        false -> connecting
     end.
 
 do_get_status(Conn) ->
     ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")).
 
-do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
-    ok;
-do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) ->
-    %% retry to prepare
-    case prepare_cql(Prepares, PoolName) of
-        {ok, Sts} ->
-            %% remove the error
-            {ok, State#{prepare_cql => Prepares, prepare_statement := Sts}};
-        _Error ->
-            false
-    end.
-
 %%--------------------------------------------------------------------
 %% callbacks query
 
@@ -394,88 +377,50 @@ conn_opts([Opt | Opts], Acc) ->
 
 %%--------------------------------------------------------------------
 %% prepare
-
-%% XXX: hardcode
-%% note: the `cql` param is passed by emqx_bridge_cassandra
-parse_prepare_cql(#{cql := SQL}) ->
-    parse_prepare_cql([{send_message, SQL}], #{}, #{});
-parse_prepare_cql(_) ->
-    #{prepare_cql => #{}, params_tokens => #{}}.
-
-parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
-    {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
-    parse_prepare_cql(
-        T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
-    );
-parse_prepare_cql([], Prepares, Tokens) ->
-    #{
-        prepare_cql => Prepares,
-        params_tokens => Tokens
-    }.
-
-init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) ->
-    case maps:size(Prepares) of
-        0 ->
-            State;
-        _ ->
-            case prepare_cql(Prepares, PoolName) of
-                {ok, Sts} ->
-                    State#{prepare_statement := Sts};
-                Error ->
-                    ?tp(
-                        error,
-                        cassandra_prepare_cql_failed,
-                        #{prepares => Prepares, reason => Error}
-                    ),
-                    %% mark the prepare_cql as failed
-                    State#{prepare_cql => {error, Prepares}}
-            end
-    end.
-
-prepare_cql(Prepares, PoolName) when is_map(Prepares) ->
-    prepare_cql(maps:to_list(Prepares), PoolName);
-prepare_cql(Prepares, PoolName) ->
-    case do_prepare_cql(Prepares, PoolName) of
-        {ok, _Sts} = Ok ->
+prepare_cql_to_cassandra(ParsedCql, PoolName) ->
+    case prepare_cql_to_cassandra(ecpool:workers(PoolName), ParsedCql, #{}) of
+        {ok, Statement} ->
             %% prepare for reconnect
-            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}),
-            Ok;
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [ParsedCql]}),
+            {ok, Statement};
         Error ->
+            ?tp(
+                error,
+                cassandra_prepare_cql_failed,
+                #{parsed_cql => ParsedCql, reason => Error}
+            ),
             Error
     end.
 
-do_prepare_cql(Prepares, PoolName) ->
-    do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}).
-
-do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) ->
+prepare_cql_to_cassandra([{_Name, Worker} | T], ParsedCql, _LastSts) ->
     {ok, Conn} = ecpool_worker:client(Worker),
-    case prepare_cql_to_conn(Conn, Prepares) of
-        {ok, Sts} ->
-            do_prepare_cql(T, Prepares, Sts);
+    case prepare_cql_to_conn(Conn, ParsedCql) of
+        {ok, Statement} ->
+            prepare_cql_to_cassandra(T, ParsedCql, Statement);
         Error ->
             Error
     end;
-do_prepare_cql([], _Prepares, LastSts) ->
+prepare_cql_to_cassandra([], _ParsedCql, LastSts) ->
     {ok, LastSts}.
 
-prepare_cql_to_conn(Conn, Prepares) ->
-    prepare_cql_to_conn(Conn, Prepares, #{}).
-
-prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
-prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
-    ?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}),
-    case ecql:prepare(Conn, Key, SQL) of
+prepare_cql_to_conn(Conn, #{prepare_key := PrepareKey, prepare_cql := PrepareCQL}) when
+    is_pid(Conn)
+->
+    ?SLOG(info, #{
+        msg => "cassandra_prepare_cql", prepare_key => PrepareKey, prepare_cql => PrepareCQL
+    }),
+    case ecql:prepare(Conn, PrepareKey, PrepareCQL) of
         {ok, Statement} ->
-            prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
-        {error, Error} = Other ->
+            {ok, Statement};
+        {error, Reason} = Error ->
             ?SLOG(error, #{
                 msg => "cassandra_prepare_cql_failed",
                 worker_pid => Conn,
-                name => Key,
-                prepare_cql => SQL,
-                error => Error
+                name => PrepareKey,
+                prepare_cql => PrepareCQL,
+                reason => Reason
             }),
-            Other
+            Error
     end.
 
 handle_result({error, disconnected}) ->
@@ -487,6 +432,9 @@ handle_result({error, Error}) ->
 handle_result(Res) ->
     Res.
 
+transform_bridge_v1_config_to_connector_config(_) ->
+    ok.
+
 %%--------------------------------------------------------------------
 %% utils
 
@@ -513,3 +461,11 @@ maybe_assign_type(V) when is_integer(V) ->
 maybe_assign_type(V) when is_float(V) -> {double, V};
 maybe_assign_type(V) ->
     V.
+
+short_prepare_key(Str) when is_binary(Str) ->
+    true = size(Str) > 0,
+    Sha = crypto:hash(sha, Str),
+    %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
+    Hex = string:lowercase(binary:encode_hex(Sha)),
+    <<UniqueEnough:16/binary, _/binary>> = Hex,
+    binary_to_atom(<<"cassa_prepare_key:", UniqueEnough/binary>>).

+ 16 - 5
apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl

@@ -301,17 +301,28 @@ send_message(Config, Payload) ->
 query_resource(Config, Request) ->
     Name = ?config(cassa_name, Config),
     BridgeType = ?config(cassa_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
+    BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
+    ConnectorResId = emqx_connector_resource:resource_id(
+        cassandra, <<"connector_emqx_bridge_cassandra_SUITE">>
+    ),
+    emqx_resource:query(BridgeV2Id, Request, #{
+        timeout => 1_000, connector_resource_id => ConnectorResId
+    }).
 
 query_resource_async(Config, Request) ->
     Name = ?config(cassa_name, Config),
     BridgeType = ?config(cassa_bridge_type, Config),
     Ref = alias([reply]),
     AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    Return = emqx_resource:query(ResourceID, Request, #{
-        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
+    ConnectorResId = emqx_connector_resource:resource_id(
+        cassandra, <<"connector_emqx_bridge_cassandra_SUITE">>
+    ),
+    Return = emqx_resource:query(BridgeV2Id, Request, #{
+        timeout => 500,
+        async_reply_fun => {AsyncReplyFun, []},
+        connector_resource_id => ConnectorResId,
+        query_mode => async
     }),
     {Return, Ref}.
 

+ 0 - 4
apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl

@@ -22,10 +22,6 @@
 %%     ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
 %%     apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE
 
-%% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml`
-%% You can change it to `127.0.0.1`, if you run this SUITE locally
--define(CASSANDRA_HOST, "cassandra").
--define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth").
 -define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector).
 
 %% Cassandra default username & password once enable `authenticator: PasswordAuthenticator`

+ 12 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -36,6 +36,8 @@ resource_type(mongodb) ->
     emqx_bridge_mongodb_connector;
 resource_type(influxdb) ->
     emqx_bridge_influxdb_connector;
+resource_type(cassandra) ->
+    emqx_bridge_cassandra_connector;
 resource_type(mysql) ->
     emqx_bridge_mysql_connector;
 resource_type(pgsql) ->
@@ -130,6 +132,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {cassandra,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_cassandra, "config_connector")),
+                #{
+                    desc => <<"Cassandra Connector Config">>,
+                    required => false
+                }
+            )},
         {mysql,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
@@ -205,6 +215,7 @@ schema_modules() ->
         emqx_bridge_matrix,
         emqx_bridge_mongodb,
         emqx_bridge_influxdb,
+        emqx_bridge_cassandra,
         emqx_bridge_mysql,
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy,
@@ -234,6 +245,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),

+ 2 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -137,6 +137,8 @@ connector_type_to_bridge_types(mongodb) ->
     [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
 connector_type_to_bridge_types(influxdb) ->
     [influxdb, influxdb_api_v1, influxdb_api_v2];
+connector_type_to_bridge_types(cassandra) ->
+    [cassandra];
 connector_type_to_bridge_types(mysql) ->
     [mysql];
 connector_type_to_bridge_types(mqtt) ->

+ 10 - 0
rel/i18n/emqx_bridge_cassandra.hocon

@@ -1,5 +1,15 @@
 emqx_bridge_cassandra {
 
+action_parameters.desc:
+"""Action specific configs."""
+action_parameters.label:
+"""Action"""
+
+cassandra_action.desc:
+"""Action configs."""
+cassandra_action.label:
+"""Action"""
+
 config_enable.desc:
 """Enable or disable this bridge"""
 

+ 6 - 0
rel/i18n/emqx_bridge_cassandra_connector.hocon

@@ -1,5 +1,11 @@
 emqx_bridge_cassandra_connector {
 
+config.desc:
+"""Cassandra connection config"""
+
+config.label:
+"""Connection config"""
+
 keyspace.desc:
 """Keyspace name to connect to."""