Просмотр исходного кода

feat(bridges): integrate PostgreSQL into bridges

support both simple and batch query
firest 3 лет назад
Родитель
Сommit
f7b50c56da

+ 237 - 28
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -20,6 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("epgsql/include/epgsql.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([roots/0, fields/1]).
 
@@ -31,6 +32,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_batch_query/3,
     on_get_status/2
 ]).
 
@@ -38,7 +40,8 @@
 
 -export([
     query/3,
-    prepared_query/3
+    prepared_query/3,
+    execute_batch/3
 ]).
 
 -export([do_get_status/1]).
@@ -47,6 +50,18 @@
     default_port => ?PGSQL_DEFAULT_PORT
 }).
 
+-type prepares() :: #{atom() => binary()}.
+-type params_tokens() :: #{atom() => list()}.
+
+-type state() ::
+    #{
+        poolname := atom(),
+        auto_reconnect := boolean(),
+        prepare_sql := prepares(),
+        params_tokens := params_tokens(),
+        prepare_statement := epgsql:statement()
+    }.
+
 %%=====================================================================
 
 roots() ->
@@ -65,6 +80,7 @@ server() ->
 %% ===================================================================
 callback_mode() -> always_sync.
 
+-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
 on_start(
     InstId,
     #{
@@ -87,7 +103,7 @@ on_start(
         case maps:get(enable, SSL) of
             true ->
                 [
-                    {ssl, true},
+                    {ssl, required},
                     {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
                 ];
             false ->
@@ -100,13 +116,21 @@ on_start(
         {password, emqx_secret:wrap(Password)},
         {database, DB},
         {auto_reconnect, reconn_interval(AutoReconn)},
-        {pool_size, PoolSize},
-        {prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))}
+        {pool_size, PoolSize}
     ],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    Prepares = parse_prepare_sql(Config),
+    InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}},
+    State = maps:merge(InitState, Prepares),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
-        ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
-        {error, Reason} -> {error, Reason}
+        ok ->
+            {ok, init_prepare(State)};
+        {error, Reason} ->
+            ?tp(
+                pgsql_connector_start_failed,
+                #{error => Reason}
+            ),
+            {error, Reason}
     end.
 
 on_stop(InstId, #{poolname := PoolName}) ->
@@ -116,37 +140,145 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {Type, NameOrSQL, []}, State);
-on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) ->
+on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
+on_query(
+    InstId,
+    {TypeOrKey, NameOrSQL, Params},
+    #{poolname := PoolName} = State
+) ->
     ?SLOG(debug, #{
         msg => "postgresql connector received sql query",
         connector => InstId,
+        type => TypeOrKey,
         sql => NameOrSQL,
         state => State
     }),
-    case Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Params]}, no_handover) of
+    Type = pgsql_query_type(TypeOrKey),
+    {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
+    on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data).
+
+pgsql_query_type(sql) ->
+    query;
+pgsql_query_type(query) ->
+    query;
+pgsql_query_type(prepared_query) ->
+    prepared_query;
+%% for bridge
+pgsql_query_type(_) ->
+    pgsql_query_type(prepared_query).
+
+on_batch_query(
+    InstId,
+    BatchReq,
+    #{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
+) ->
+    case BatchReq of
+        [{Key, _} = Request | _] ->
+            BinKey = to_bin(Key),
+            case maps:get(BinKey, Tokens, undefined) of
+                undefined ->
+                    Log = #{
+                        connector => InstId,
+                        first_request => Request,
+                        state => State,
+                        msg => "batch prepare not implemented"
+                    },
+                    ?SLOG(error, Log),
+                    {error, batch_prepare_not_implemented};
+                TokenList ->
+                    {_, Datas} = lists:unzip(BatchReq),
+                    Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
+                    St = maps:get(BinKey, Sts),
+                    {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2),
+                    %% this local function only suits for the result of batch insert
+                    TransResult = fun
+                        Trans([{ok, Count} | T], Acc) ->
+                            Trans(T, Acc + Count);
+                        Trans([{error, _} = Error | _], _Acc) ->
+                            Error;
+                        Trans([], Acc) ->
+                            {ok, Acc}
+                    end,
+
+                    TransResult(Results, 0)
+            end;
+        _ ->
+            Log = #{
+                connector => InstId,
+                request => BatchReq,
+                state => State,
+                msg => "invalid request"
+            },
+            ?SLOG(error, Log),
+            {error, invalid_request}
+    end.
+
+proc_sql_params(query, SQLOrKey, Params, _State) ->
+    {SQLOrKey, Params};
+proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
+    {SQLOrKey, Params};
+proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
+    Key = to_bin(TypeOrKey),
+    case maps:get(Key, ParamsTokens, undefined) of
+        undefined ->
+            {SQLOrData, Params};
+        Tokens ->
+            {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
+    end.
+
+on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
+    Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover),
+    case Result of
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "postgresql connector do sql query failed",
                 connector => InstId,
+                type => Type,
                 sql => NameOrSQL,
                 reason => Reason
             });
         _ ->
+            ?tp(
+                pgsql_connector_query_return,
+                #{result => Result}
+            ),
             ok
     end,
     Result.
 
-on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
+on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
     case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
-        true -> connected;
-        false -> conn_status(AutoReconn)
+        true ->
+            case do_check_prepares(State) of
+                ok ->
+                    connected;
+                {ok, NState} ->
+                    %% return new state with prepared statements
+                    {connected, NState};
+                false ->
+                    %% do not log error, it is logged in prepare_sql_to_conn
+                    conn_status(AutoReconn)
+            end;
+        false ->
+            conn_status(AutoReconn)
     end.
 
 do_get_status(Conn) ->
     ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
 
+do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
+    ok;
+do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) ->
+    %% retry to prepare
+    case prepare_sql(Prepares, PoolName) of
+        {ok, Sts} ->
+            %% remove the error
+            {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
+        _Error ->
+            false
+    end.
+
 %% ===================================================================
 conn_status(_AutoReconn = true) -> connecting;
 conn_status(_AutoReconn = false) -> disconnected.
@@ -158,13 +290,9 @@ connect(Opts) ->
     Host = proplists:get_value(host, Opts),
     Username = proplists:get_value(username, Opts),
     Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
-    PrepareStatement = proplists:get_value(prepare_statement, Opts),
     case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
-        {ok, Conn} ->
-            case parse(Conn, PrepareStatement) of
-                ok -> {ok, Conn};
-                {error, Reason} -> {error, Reason}
-            end;
+        {ok, _Conn} = Ok ->
+            Ok;
         {error, Reason} ->
             {error, Reason}
     end.
@@ -175,15 +303,8 @@ query(Conn, SQL, Params) ->
 prepared_query(Conn, Name, Params) ->
     epgsql:prepared_query2(Conn, Name, Params).
 
-parse(_Conn, []) ->
-    ok;
-parse(Conn, [{Name, Query} | More]) ->
-    case epgsql:parse2(Conn, Name, Query, []) of
-        {ok, _Statement} ->
-            parse(Conn, More);
-        Other ->
-            Other
-    end.
+execute_batch(Conn, Statement, Params) ->
+    epgsql:execute_batch(Conn, Statement, Params).
 
 conn_opts(Opts) ->
     conn_opts(Opts, []).
@@ -206,3 +327,91 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
     conn_opts(Opts, [Opt | Acc]);
 conn_opts([_Opt | Opts], Acc) ->
     conn_opts(Opts, Acc).
+
+parse_prepare_sql(Config) ->
+    SQL =
+        case maps:get(prepare_statement, Config, undefined) of
+            undefined ->
+                case maps:get(sql, Config, undefined) of
+                    undefined -> #{};
+                    Template -> #{<<"send_message">> => Template}
+                end;
+            Any ->
+                Any
+        end,
+    parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
+
+parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
+    {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'),
+    parse_prepare_sql(
+        T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
+    );
+parse_prepare_sql([], Prepares, Tokens) ->
+    #{
+        prepare_sql => Prepares,
+        params_tokens => Tokens
+    }.
+
+init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) ->
+    case maps:size(Prepares) of
+        0 ->
+            State;
+        _ ->
+            case prepare_sql(Prepares, PoolName) of
+                {ok, Sts} ->
+                    State#{prepare_statement := Sts};
+                Error ->
+                    LogMeta = #{
+                        msg => <<"PostgreSQL init prepare statement failed">>, error => Error
+                    },
+                    ?SLOG(error, LogMeta),
+                    %% mark the prepare_sqlas failed
+                    State#{prepare_sql => {error, Prepares}}
+            end
+    end.
+
+prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName);
+prepare_sql(Prepares, PoolName) ->
+    case do_prepare_sql(Prepares, PoolName) of
+        {ok, _Sts} = Ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            Ok;
+        Error ->
+            Error
+    end.
+
+do_prepare_sql(Prepares, PoolName) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+    {ok, Conn} = ecpool_worker:client(Worker),
+    case prepare_sql_to_conn(Conn, Prepares) of
+        {ok, Sts} ->
+            do_prepare_sql(T, Prepares, PoolName, Sts);
+        Error ->
+            Error
+    end;
+do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    {ok, LastSts}.
+
+prepare_sql_to_conn(Conn, Prepares) ->
+    prepare_sql_to_conn(Conn, Prepares, #{}).
+
+prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+    LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, prepare_sql => SQL},
+    ?SLOG(info, LogMeta),
+    case epgsql:parse2(Conn, Key, SQL, []) of
+        {ok, Statement} ->
+            prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
+        {error, Error} = Other ->
+            ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
+            Other
+    end.
+
+to_bin(Bin) when is_binary(Bin) ->
+    Bin;
+to_bin(Atom) when is_atom(Atom) ->
+    erlang:atom_to_binary(Atom).

+ 74 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_pgsql.conf

@@ -0,0 +1,74 @@
+emqx_ee_bridge_pgsql {
+
+    local_topic {
+        desc {
+            en: """The MQTT topic filter to be forwarded to PostgreSQL. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded.
+"""
+            zh: """发送到 'local_topic' 的消息都会转发到 PostgreSQL。 </br>
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
+"""
+        }
+        label {
+                en: "Local Topic"
+                zh: "本地 Topic"
+            }
+    }
+
+    sql_template {
+        desc {
+            en: """SQL Template"""
+            zh: """SQL 模板"""
+            }
+        label {
+            en: "SQL Template"
+            zh: "SQL 模板"
+        }
+    }
+    config_enable {
+        desc {
+            en: """Enable or disable this bridge"""
+            zh: """启用/禁用桥接"""
+        }
+        label {
+            en: "Enable Or Disable Bridge"
+            zh: "启用/禁用桥接"
+        }
+        }
+
+    desc_config {
+        desc {
+            en: """Configuration for an PostgreSQL bridge."""
+            zh: """PostgreSQL 桥接配置"""
+        }
+        label: {
+            en: "PostgreSQL Bridge Configuration"
+            zh: "PostgreSQL 桥接配置"
+        }
+    }
+
+    desc_type {
+        desc {
+            en: """The Bridge Type"""
+            zh: """Bridge 类型"""
+        }
+        label {
+            en: "Bridge Type"
+            zh: "桥接类型"
+        }
+    }
+
+    desc_name {
+        desc {
+            en: """Bridge name."""
+            zh: """桥接名字"""
+        }
+        label {
+            en: "Bridge Name"
+            zh: "桥接名字"
+        }
+    }
+}

+ 13 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -17,6 +17,7 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_gcp_pubsub, Method),
         ref(emqx_ee_bridge_kafka, Method),
         ref(emqx_ee_bridge_mysql, Method),
+        ref(emqx_ee_bridge_pgsql, Method),
         ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
         ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
         ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
@@ -36,7 +37,8 @@ schema_modules() ->
         emqx_ee_bridge_influxdb,
         emqx_ee_bridge_mongodb,
         emqx_ee_bridge_mysql,
-        emqx_ee_bridge_redis
+        emqx_ee_bridge_redis,
+        emqx_ee_bridge_pgsql
     ].
 
 examples(Method) ->
@@ -63,7 +65,8 @@ resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
 resource_type(redis_single) -> emqx_ee_connector_redis;
 resource_type(redis_sentinel) -> emqx_ee_connector_redis;
-resource_type(redis_cluster) -> emqx_ee_connector_redis.
+resource_type(redis_cluster) -> emqx_ee_connector_redis;
+resource_type(pgsql) -> emqx_connector_pgsql.
 
 fields(bridges) ->
     [
@@ -98,6 +101,14 @@ fields(bridges) ->
                     desc => <<"MySQL Bridge Config">>,
                     required => false
                 }
+            )},
+        {pgsql,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_pgsql, "config")),
+                #{
+                    desc => <<"PostgreSQL Bridge Config">>,
+                    required => false
+                }
             )}
     ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs().
 

+ 130 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl

@@ -0,0 +1,130 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_pgsql).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(DEFAULT_SQL, <<
+    "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
+    "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
+>>).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"pgsql">> => #{
+                summary => <<"PostgreSQL Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(get) ->
+    maps:merge(values(post), ?METRICS_EXAMPLE);
+values(post) ->
+    #{
+        enable => true,
+        type => pgsql,
+        name => <<"foo">>,
+        server => <<"127.0.0.1:5432">>,
+        database => <<"mqtt">>,
+        pool_size => 8,
+        username => <<"root">>,
+        password => <<"public">>,
+        auto_reconnect => true,
+        sql => ?DEFAULT_SQL,
+        local_topic => <<"local/topic/#">>,
+        resource_opts => #{
+            worker_pool_size => 8,
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            query_mode => async,
+            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+        }
+    };
+values(put) ->
+    values(post).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_pgsql".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {sql,
+            mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )},
+        {local_topic,
+            mk(
+                binary(),
+                #{desc => ?DESC("local_topic"), default => undefined}
+            )},
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ] ++
+        emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields();
+fields("creation_opts") ->
+    Opts = emqx_resource_schema:fields("creation_opts"),
+    [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
+fields("post") ->
+    [type_field(), name_field() | fields("config")];
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+    emqx_resource_schema:desc(Name);
+desc(_) ->
+    undefined.
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+is_hidden_opts(Field) ->
+    lists:member(Field, [
+        async_inflight_window
+    ]).
+
+type_field() ->
+    {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.