Explorar el Código

feat(bridge): add mysql sink

firest hace 3 años
padre
commit
88fd7e14dc

+ 75 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mysql.conf

@@ -0,0 +1,75 @@
+emqx_ee_bridge_mysql {
+    sql {
+        desc {
+            en: """SQL Template"""
+            zh: """SQL 模板"""
+            }
+        label {
+            en: "SQL Template"
+            zh: "SQL 模板"
+        }
+    }
+    config_enable {
+        desc {
+            en: """Enable or disable this bridge"""
+            zh: """启用/禁用桥接"""
+        }
+        label {
+            en: "Enable Or Disable Bridge"
+            zh: "启用/禁用桥接"
+        }
+        }
+    config_direction {
+        desc {
+            en: """The direction of this bridge, MUST be 'egress'"""
+            zh: """桥接的方向, 必须是 egress"""
+        }
+        label {
+            en: "Bridge Direction"
+            zh: "桥接方向"
+        }
+    }
+
+    desc_config {
+        desc {
+            en: """Configuration for an HStreamDB bridge."""
+            zh: """HStreamDB 桥接配置"""
+        }
+        label: {
+            en: "HStreamDB Bridge Configuration"
+            zh: "HStreamDB 桥接配置"
+        }
+    }
+
+    desc_type {
+        desc {
+            en: """The Bridge Type"""
+            zh: """Bridge 类型"""
+        }
+        label {
+            en: "Bridge Type"
+            zh: "桥接类型"
+        }
+    }
+
+    desc_name {
+        desc {
+            en: """Bridge name, used as a human-readable description of the bridge."""
+            zh: """桥接名字,可读描述"""
+        }
+        label {
+            en: "Bridge Name"
+            zh: "桥接名字"
+        }
+    }
+    desc_connector {
+        desc {
+                en: """Generic configuration for the connector."""
+                zh: """连接器的通用配置。"""
+        }
+        label: {
+                en: "Connector Generic Configuration"
+                zh: "连接器通用配置。"
+        }
+    }
+}

+ 9 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -15,6 +15,7 @@
 api_schemas(Method) ->
     [
         ref(emqx_ee_bridge_hstream, Method),
+        ref(emqx_ee_bridge_mysql, Method),
         ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2")
@@ -23,7 +24,8 @@ api_schemas(Method) ->
 schema_modules() ->
     [
         emqx_ee_bridge_hstream,
-        emqx_ee_bridge_influxdb
+        emqx_ee_bridge_influxdb,
+        emqx_ee_bridge_mysql
     ].
 
 conn_bridge_examples(Method) ->
@@ -40,6 +42,7 @@ conn_bridge_examples(Method) ->
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
 resource_type(hstreamdb) -> emqx_ee_connector_hstream;
+resource_type(mysql) -> emqx_ee_connector_mysql;
 resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb.
@@ -50,6 +53,11 @@ fields(bridges) ->
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")),
                 #{desc => <<"EMQX Enterprise Config">>}
+            )},
+        {mysql,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
+                #{desc => <<"EMQX Enterprise Config">>}
             )}
     ] ++ fields(influxdb);
 fields(influxdb) ->

+ 104 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -0,0 +1,104 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_mysql).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_ee_bridge.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(DEFAULT_SQL, <<
+    "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
+    "values (${id}, ${topic}, ${qos}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))"
+>>).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"mysql">> => #{
+                summary => <<"MySQL Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(get) ->
+    maps:merge(values(post), ?METRICS_EXAMPLE);
+values(post) ->
+    #{
+        type => mysql,
+        name => <<"mysql">>,
+        connector => #{
+            server => <<"127.0.0.1:3306">>,
+            database => <<"test">>,
+            pool_size => 8,
+            username => <<"root">>,
+            password => <<"public">>,
+            auto_reconnect => true
+        },
+        enable => true,
+        direction => egress,
+        sql => ?DEFAULT_SQL
+    };
+values(put) ->
+    values(post).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
+        {sql, mk(binary(), #{default => ?DEFAULT_SQL, desc => ?DESC("sql")})},
+        {connector, field(connector)}
+    ];
+fields("post") ->
+    [type_field(), name_field() | fields("config")];
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+
+field(connector) ->
+    mk(
+        ref(emqx_ee_connector_mysql, config),
+        #{
+            required => true,
+            desc => ?DESC("desc_connector")
+        }
+    ).
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for MySQL using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+type_field() ->
+    {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 23 - 0
lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_mysql.conf

@@ -0,0 +1,23 @@
+emqx_ee_connector_mysql {
+    type {
+        desc {
+                en: "The Connector Type."
+                zh: "连接器类型。"
+        }
+        label: {
+                en: "Connector Type"
+                zh: "连接器类型"
+            }
+    }
+
+    name {
+        desc {
+                en: "Connector name, used as a human-readable description of the connector."
+                zh: "连接器名称,人类可读的连接器描述。"
+        }
+        label: {
+                en: "Connector Name"
+                zh: "连接器名称"
+            }
+    }
+}

+ 66 - 0
lib-ee/emqx_ee_connector/src/emqx_ee_connector_mysql.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_connector_mysql).
+
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-import(hoconsc, [mk/2, enum/1]).
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    on_start/2,
+    on_stop/2,
+    on_query/4,
+    on_get_status/2
+]).
+
+-export([
+    roots/0,
+    fields/1
+]).
+
+-define(SEND_MSG_KEY, send_message).
+
+%% -------------------------------------------------------------------------------------------------
+%% resource callback
+
+on_start(InstId, #{egress := #{sql := SQL}} = Config) ->
+    {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
+    {ok, State} = emqx_connector_mysql:on_start(InstId, Config#{
+        prepare_statement => #{?SEND_MSG_KEY => PrepareSQL}
+    }),
+    {ok, State#{'ParamsTokens' => ParamsTokens}}.
+
+on_stop(InstId, State) ->
+    emqx_connector_mysql:on_stop(InstId, State).
+
+on_query(
+    InstId,
+    {?SEND_MSG_KEY, Msg},
+    AfterQuery,
+    #{'ParamsTokens' := ParamsTokens} = State
+) ->
+    Data = emqx_plugin_libs_rule:proc_sql(ParamsTokens, Msg),
+    emqx_connector_mysql:on_query(
+        InstId, {prepared_query, ?SEND_MSG_KEY, Data}, AfterQuery, State
+    ).
+
+on_get_status(InstId, State) ->
+    emqx_connector_mysql:on_get_status(InstId, State).
+
+%% -------------------------------------------------------------------------------------------------
+%% schema
+
+roots() ->
+    fields(config).
+
+fields(config) ->
+    emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields().
+
+%% -------------------------------------------------------------------------------------------------
+%% internal functions