فهرست منبع

Merge pull request #12425 from emqx/clickhouse-bridge-v2

refactor: split clickhouse bridges to actions and connectors
Xinyu Liu 2 سال پیش
والد
کامیت
0fef19f86f

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -97,6 +97,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_rocketmq_action_info,
         emqx_bridge_influxdb_action_info,
         emqx_bridge_cassandra_action_info,
+        emqx_bridge_clickhouse_action_info,
         emqx_bridge_mysql_action_info,
         emqx_bridge_pgsql_action_info,
         emqx_bridge_syskeeper_action_info,

+ 0 - 5
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -32,8 +32,6 @@
     on_get_status/2
 ]).
 
--export([transform_bridge_v1_config_to_connector_config/1]).
-
 %% callbacks of ecpool
 -export([
     connect/1,
@@ -448,9 +446,6 @@ handle_result({error, Error}) ->
 handle_result(Res) ->
     Res.
 
-transform_bridge_v1_config_to_connector_config(_) ->
-    ok.
-
 %%--------------------------------------------------------------------
 %% utils
 

+ 2 - 2
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_clickhouse, [
     {description, "EMQX Enterprise ClickHouse Bridge"},
-    {vsn, "0.2.5"},
+    {vsn, "0.3.0"},
     {registered, []},
     {applications, [
         kernel,
@@ -8,7 +8,7 @@
         emqx_resource,
         clickhouse
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_clickhouse_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 105 - 16
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl

@@ -9,8 +9,11 @@
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
+%% Examples
 -export([
-    conn_bridge_examples/1
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
 ]).
 
 -export([
@@ -20,12 +23,10 @@
     desc/1
 ]).
 
--define(DEFAULT_SQL,
-    <<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">>
-).
-
+-define(DEFAULT_SQL, <<"INSERT INTO messages(data, arrived) VALUES ('${payload}', ${timestamp})">>).
 -define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>).
-
+-define(CONNECTOR_TYPE, clickhouse).
+-define(ACTION_TYPE, clickhouse).
 %% -------------------------------------------------------------------------------------------------
 %% Callback used by HTTP API
 %% -------------------------------------------------------------------------------------------------
@@ -40,6 +41,42 @@ conn_bridge_examples(Method) ->
         }
     ].
 
+bridge_v2_examples(Method) ->
+    ParamsExample = #{
+        parameters => #{
+            batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
+            sql => ?DEFAULT_SQL
+        }
+    },
+    [
+        #{
+            <<"clickhouse">> => #{
+                summary => <<"ClickHouse Action">>,
+                value => emqx_bridge_v2_schema:action_values(
+                    Method, clickhouse, clickhouse, ParamsExample
+                )
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"clickhouse">> => #{
+                summary => <<"ClickHouse Connector">>,
+                value => emqx_connector_schema:connector_values(
+                    Method, clickhouse, #{
+                        url => <<"http://localhost:8123">>,
+                        database => <<"mqtt">>,
+                        pool_size => 8,
+                        username => <<"default">>,
+                        password => <<"******">>
+                    }
+                )
+            }
+        }
+    ].
+
 values(_Method, Type) ->
     #{
         enable => true,
@@ -71,19 +108,49 @@ namespace() -> "bridge_clickhouse".
 
 roots() -> [].
 
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        emqx_bridge_clickhouse_connector:fields(config) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(action) ->
+    {clickhouse,
+        mk(
+            hoconsc:map(name, ref(?MODULE, clickhouse_action)),
+            #{desc => <<"ClickHouse Action Config">>, required => false}
+        )};
+fields(clickhouse_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(ref(?MODULE, action_parameters), #{
+            required => true, desc => ?DESC(action_parameters)
+        })
+    );
+fields(action_parameters) ->
+    [
+        sql_field(),
+        batch_value_separator_field()
+    ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    Fields =
+        emqx_bridge_clickhouse_connector:fields(config) ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(clickhouse_action));
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
-        {sql,
-            mk(
-                binary(),
-                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
-            )},
-        {batch_value_separator,
-            mk(
-                binary(),
-                #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
-            )},
+        sql_field(),
+        batch_value_separator_field(),
         {local_topic,
             mk(
                 binary(),
@@ -112,6 +179,28 @@ fields("get") ->
 fields("post", Type) ->
     [type_field(Type), name_field() | fields("config")].
 
+sql_field() ->
+    {sql,
+        mk(
+            binary(),
+            #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+        )}.
+
+batch_value_separator_field() ->
+    {batch_value_separator,
+        mk(
+            binary(),
+            #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
+        )}.
+
+desc(clickhouse_action) ->
+    ?DESC(clickhouse_action);
+desc(action_parameters) ->
+    ?DESC(action_parameters);
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->

+ 62 - 0
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl

@@ -0,0 +1,62 @@
+-module(emqx_bridge_clickhouse_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    connector_action_config_to_bridge_v1_config/2,
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(SCHEMA_MODULE, emqx_bridge_clickhouse).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(clickhouse_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ActionTopLevelKeys = schema_keys(clickhouse_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ConnectorTopLevelKeys = schema_keys("config_connector"),
+    ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
+    ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
+        ConnConfig0
+    ).
+
+connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
+    RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorRawConf, ActionRawConf
+    ),
+    maps:without([<<"clickhouse_type">>], RawConf).
+
+bridge_v1_type_name() -> clickhouse.
+
+action_type_name() -> clickhouse.
+
+connector_type_name() -> clickhouse.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
+
+schema_keys(Name) ->
+    [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

+ 52 - 30
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -32,6 +32,10 @@
     callback_mode/0,
     on_start/2,
     on_stop/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channel_status/3,
+    on_get_channels/1,
     on_query/3,
     on_batch_query/3,
     on_get_status/2
@@ -62,6 +66,7 @@
 
 -type state() ::
     #{
+        channels => #{binary() => templates()},
         templates := templates(),
         pool_name := binary(),
         connect_timeout := pos_integer()
@@ -155,10 +160,9 @@ on_start(
         {pool, InstanceID}
     ],
     try
-        Templates = prepare_sql_templates(Config),
         State = #{
+            channels => #{},
             pool_name => InstanceID,
-            templates => Templates,
             connect_timeout => ConnectTimeout
         },
         case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
@@ -195,10 +199,8 @@ prepare_sql_templates(#{
     sql := Template,
     batch_value_separator := Separator
 }) ->
-    InsertTemplate =
-        emqx_placeholder:preproc_tmpl(Template),
-    BulkExtendInsertTemplate =
-        prepare_sql_bulk_extend_template(Template, Separator),
+    InsertTemplate = emqx_placeholder:preproc_tmpl(Template),
+    BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator),
     #{
         send_message_template => InsertTemplate,
         extend_send_message_template => BulkExtendInsertTemplate
@@ -285,6 +287,27 @@ on_stop(InstanceID, _State) ->
     }),
     emqx_resource_pool:stop(InstanceID).
 
+%% -------------------------------------------------------------------
+%% channel related emqx_resouce callbacks
+%% -------------------------------------------------------------------
+on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
+    #{parameters := ParamConf} = ChannConf0,
+    NewChanns = Channs#{ChannId => #{templates => prepare_sql_templates(ParamConf)}},
+    {ok, OldState#{channels => NewChanns}}.
+
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
+    NewState = State#{channels => maps:remove(ChannId, Channels)},
+    {ok, NewState}.
+
+on_get_channel_status(InstanceId, _ChannId, State) ->
+    case on_get_status(InstanceId, State) of
+        {connected, _} -> connected;
+        {disconnected, _, _} -> disconnected
+    end.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
 %% -------------------------------------------------------------------
 %% on_get_status emqx_resouce callback and related functions
 %% -------------------------------------------------------------------
@@ -339,8 +362,8 @@ do_get_status(PoolName, Timeout) ->
 
 -spec on_query
     (resource_id(), Request, resource_state()) -> query_result() when
-        Request :: {RequestType, Data},
-        RequestType :: send_message,
+        Request :: {ChannId, Data},
+        ChannId :: binary(),
         Data :: map();
     (resource_id(), Request, resource_state()) -> query_result() when
         Request :: {RequestType, SQL},
@@ -361,12 +384,20 @@ on_query(
     }),
     %% Have we got a query or data to fit into an SQL template?
     SimplifiedRequestType = query_type(RequestType),
-    #{templates := Templates} = State,
+    Templates = get_templates(RequestType, State),
     SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
     ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL),
     transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
 
-get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
+get_templates(ChannId, State) ->
+    case maps:find(channels, State) of
+        {ok, Channels} ->
+            maps:get(templates, maps:get(ChannId, Channels, #{}), #{});
+        error ->
+            #{}
+    end.
+
+get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) ->
     emqx_placeholder:proc_tmpl(PreparedSQL, Data);
 get_sql(_, _, SQL) ->
     SQL.
@@ -376,24 +407,21 @@ query_type(sql) ->
 query_type(query) ->
     query;
 %% Data that goes to bridges use the prepared template
-query_type(send_message) ->
-    send_message.
+query_type(ChannId) when is_binary(ChannId) ->
+    channel_message.
 
 %% -------------------------------------------------------------------
 %% on_batch_query emqx_resouce callback and related functions
 %% -------------------------------------------------------------------
 
 -spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
-    BatchReq :: nonempty_list({'send_message', map()}).
+    BatchReq :: nonempty_list({binary(), map()}).
 
-on_batch_query(
-    ResourceID,
-    BatchReq,
-    #{pool_name := PoolName, templates := Templates} = _State
-) ->
-    %% Currently we only support batch requests with the send_message key
-    {Keys, ObjectsToInsert} = lists:unzip(BatchReq),
-    ensure_keys_are_of_type_send_message(Keys),
+on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) ->
+    %% Currently we only support batch requests with a binary ChannId
+    {[ChannId | _] = Keys, ObjectsToInsert} = lists:unzip(BatchReq),
+    ensure_channel_messages(Keys),
+    Templates = get_templates(ChannId, State),
     %% Create batch insert SQL statement
     SQL = objects_to_sql(ObjectsToInsert, Templates),
     %% Do the actual query in the database
@@ -401,22 +429,16 @@ on_batch_query(
     %% Transform the result to a better format
     transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL).
 
-ensure_keys_are_of_type_send_message(Keys) ->
-    case lists:all(fun is_send_message_atom/1, Keys) of
+ensure_channel_messages(Keys) ->
+    case lists:all(fun is_binary/1, Keys) of
         true ->
             ok;
         false ->
             erlang:error(
-                {unrecoverable_error,
-                    <<"Unexpected type for batch message (Expected send_message)">>}
+                {unrecoverable_error, <<"Unexpected type for batch message (Expected channel-id)">>}
             )
     end.
 
-is_send_message_atom(send_message) ->
-    true;
-is_send_message_atom(_) ->
-    false.
-
 objects_to_sql(
     [FirstObject | RemainingObjects] = _ObjectsToInsert,
     #{

+ 12 - 9
apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl

@@ -9,6 +9,7 @@
 
 -define(APP, emqx_bridge_clickhouse).
 -define(CLICKHOUSE_HOST, "clickhouse").
+-define(CLICKHOUSE_PORT, "8123").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 
 %% See comment in
@@ -20,9 +21,9 @@
 %%------------------------------------------------------------------------------
 
 init_per_suite(Config) ->
-    case
-        emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
-    of
+    Host = clickhouse_host(),
+    Port = list_to_integer(clickhouse_port()),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
             emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
             ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
@@ -114,13 +115,15 @@ sql_drop_table() ->
 sql_create_database() ->
     "CREATE DATABASE IF NOT EXISTS mqtt".
 
+clickhouse_host() ->
+    os:getenv("CLICKHOUSE_HOST", ?CLICKHOUSE_HOST).
+clickhouse_port() ->
+    os:getenv("CLICKHOUSE_PORT", ?CLICKHOUSE_PORT).
+
 clickhouse_url() ->
-    erlang:iolist_to_binary([
-        <<"http://">>,
-        ?CLICKHOUSE_HOST,
-        ":",
-        erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
-    ]).
+    Host = clickhouse_host(),
+    Port = clickhouse_port(),
+    erlang:iolist_to_binary(["http://", Host, ":", Port]).
 
 clickhouse_config(Config) ->
     SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),

+ 5 - 22
apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl

@@ -13,7 +13,6 @@
 -include_lib("common_test/include/ct.hrl").
 
 -define(APP, emqx_bridge_clickhouse).
--define(CLICKHOUSE_HOST, "clickhouse").
 -define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector).
 -define(CLICKHOUSE_PASSWORD, "public").
 
@@ -39,25 +38,17 @@ all() ->
 groups() ->
     [].
 
-clickhouse_url() ->
-    erlang:iolist_to_binary([
-        <<"http://">>,
-        ?CLICKHOUSE_HOST,
-        ":",
-        erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
-    ]).
-
 init_per_suite(Config) ->
-    case
-        emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
-    of
+    Host = emqx_bridge_clickhouse_SUITE:clickhouse_host(),
+    Port = list_to_integer(emqx_bridge_clickhouse_SUITE:clickhouse_port()),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
             ok = emqx_common_test_helpers:start_apps([emqx_conf]),
             ok = emqx_connector_test_helpers:start_apps([emqx_resource, ?APP]),
             %% Create the db table
             {ok, Conn} =
                 clickhouse:start_link([
-                    {url, clickhouse_url()},
+                    {url, emqx_bridge_clickhouse_SUITE:clickhouse_url()},
                     {user, <<"default">>},
                     {key, ?CLICKHOUSE_PASSWORD},
                     {pool, tmp_pool}
@@ -205,15 +196,7 @@ clickhouse_config(Overrides) ->
             username => <<"default">>,
             password => <<?CLICKHOUSE_PASSWORD>>,
             pool_size => 8,
-            url => iolist_to_binary(
-                io_lib:format(
-                    "http://~s:~b",
-                    [
-                        ?CLICKHOUSE_HOST,
-                        ?CLICKHOUSE_DEFAULT_PORT
-                    ]
-                )
-            ),
+            url => emqx_bridge_clickhouse_SUITE:clickhouse_url(),
             connect_timeout => <<"10s">>
         },
     #{<<"config">> => maps:merge(Config, Overrides)}.

+ 12 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -42,6 +42,8 @@ resource_type(influxdb) ->
     emqx_bridge_influxdb_connector;
 resource_type(cassandra) ->
     emqx_bridge_cassandra_connector;
+resource_type(clickhouse) ->
+    emqx_bridge_clickhouse_connector;
 resource_type(mysql) ->
     emqx_bridge_mysql_connector;
 resource_type(pgsql) ->
@@ -181,6 +183,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {clickhouse,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_clickhouse, "config_connector")),
+                #{
+                    desc => <<"ClickHouse Connector Config">>,
+                    required => false
+                }
+            )},
         {mysql,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
@@ -307,6 +317,7 @@ schema_modules() ->
         emqx_bridge_oracle,
         emqx_bridge_influxdb,
         emqx_bridge_cassandra,
+        emqx_bridge_clickhouse,
         emqx_bridge_mysql,
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy,
@@ -345,6 +356,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"),
         api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_clickhouse, <<"clickhouse">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),

+ 2 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -142,6 +142,8 @@ connector_type_to_bridge_types(influxdb) ->
     [influxdb, influxdb_api_v1, influxdb_api_v2];
 connector_type_to_bridge_types(cassandra) ->
     [cassandra];
+connector_type_to_bridge_types(clickhouse) ->
+    [clickhouse];
 connector_type_to_bridge_types(mysql) ->
     [mysql];
 connector_type_to_bridge_types(mqtt) ->

+ 1 - 0
changes/ee/feat-12425.en.md

@@ -0,0 +1 @@
+The bridges for ClickHouse have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.

+ 10 - 0
rel/i18n/emqx_bridge_clickhouse.hocon

@@ -6,6 +6,16 @@ batch_value_separator.desc:
 batch_value_separator.label:
 """Batch Value Separator"""
 
+action_parameters.desc:
+"""Action specific configs."""
+action_parameters.label:
+"""Action"""
+
+clickhouse_action.desc:
+"""Action configs."""
+clickhouse_action.label:
+"""Action"""
+
 config_enable.desc:
 """Enable or disable this bridge"""