| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_bridge_pgsql).
- -include_lib("emqx_connector/include/emqx_connector.hrl").
- -include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
- -include_lib("typerefl/include/types.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("hocon/include/hoconsc.hrl").
- -include_lib("epgsql/include/epgsql.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -include_lib("emqx_resource/include/emqx_resource.hrl").
- %% `hocon_schema' API
- -export([
- namespace/0,
- roots/0,
- fields/1,
- desc/1
- ]).
- %% for sharing with other actions
- -export([fields/3]).
- %% Examples
- -export([
- bridge_v2_examples/1,
- conn_bridge_examples/1
- ]).
- %% Exported for timescale and matrix bridges
- -export([
- values/1,
- values_conn_bridge_examples/2
- ]).
- -define(ACTION_TYPE, pgsql).
- %% Hocon Schema Definitions
- namespace() -> "bridge_pgsql".
- roots() ->
- [].
- fields("config_connector") ->
- emqx_postgresql_connector_schema:fields("config_connector");
- fields(config) ->
- fields("config_connector") ++
- fields(action);
- fields(action) ->
- {pgsql,
- hoconsc:mk(
- hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
- #{
- desc => <<"PostgreSQL Action Config">>,
- required => false
- }
- )};
- fields(action_parameters) ->
- [
- {sql,
- hoconsc:mk(
- emqx_schema:template(),
- #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
- )}
- ];
- fields(pgsql_action) ->
- emqx_bridge_v2_schema:make_producer_action_schema(
- hoconsc:mk(
- hoconsc:ref(?MODULE, action_parameters),
- #{
- required => true,
- desc => ?DESC("action_parameters")
- }
- )
- );
- fields("put_bridge_v2") ->
- fields(pgsql_action);
- fields("get_bridge_v2") ->
- fields(pgsql_action);
- fields("post_bridge_v2") ->
- fields("post", pgsql, pgsql_action);
- fields("config") ->
- %% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
- [
- {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
- {sql,
- hoconsc:mk(
- binary(),
- #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
- )},
- {local_topic,
- hoconsc:mk(
- binary(),
- #{desc => ?DESC("local_topic"), default => undefined}
- )}
- ] ++ emqx_resource_schema:fields("resource_opts") ++
- proplists:delete(
- disable_prepared_statements,
- emqx_postgresql:fields(config) --
- emqx_connector_schema_lib:prepare_statement_fields()
- );
- fields("post") ->
- fields("post", ?ACTION_TYPE, "config");
- fields("put") ->
- fields("config");
- fields("get") ->
- emqx_bridge_schema:status_fields() ++ fields("post").
- fields("post", Type, StructName) ->
- [type_field(Type), name_field() | fields(StructName)].
- type_field(Type) ->
- {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
- name_field() ->
- {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
- desc("config") ->
- ?DESC("desc_config");
- desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
- ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
- desc(pgsql_action) ->
- ?DESC("pgsql_action");
- desc(action_parameters) ->
- ?DESC("action_parameters");
- desc("config_connector") ->
- ?DESC(emqx_postgresql_connector_schema, "config_connector");
- desc(_) ->
- undefined.
- default_sql() ->
- <<
- "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
- "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
- >>.
- %% Examples
- bridge_v2_examples(Method) ->
- [
- #{
- <<"pgsql">> => #{
- summary => <<"PostgreSQL Action">>,
- value => values({Method, pgsql})
- }
- }
- ].
- conn_bridge_examples(Method) ->
- [
- #{
- <<"pgsql">> => #{
- summary => <<"PostgreSQL Bridge">>,
- value => values_conn_bridge_examples(Method, pgsql)
- }
- }
- ].
- values({get, PostgreSQLType}) ->
- maps:merge(
- #{
- status => <<"connected">>,
- node_status => [
- #{
- node => <<"emqx@localhost">>,
- status => <<"connected">>
- }
- ]
- },
- values({put, PostgreSQLType})
- );
- values({post, PostgreSQLType}) ->
- values({put, PostgreSQLType});
- values({put, PostgreSQLType}) ->
- maps:merge(
- #{
- name => <<"my_action">>,
- type => PostgreSQLType,
- enable => true,
- connector => <<"my_connector">>,
- resource_opts => #{
- batch_size => 1,
- batch_time => <<"50ms">>,
- inflight_window => 100,
- max_buffer_bytes => <<"256MB">>,
- request_ttl => <<"45s">>,
- worker_pool_size => 16
- }
- },
- values(parameters)
- );
- values(parameters) ->
- #{
- <<"parameters">> => #{
- <<"sql">> =>
- <<
- "INSERT INTO client_events(clientid, event, created_at)"
- "VALUES (\n"
- " ${clientid},\n"
- " ${event},\n"
- " TO_TIMESTAMP((${timestamp} :: bigint))\n"
- ")"
- >>
- }
- }.
- values_conn_bridge_examples(get, Type) ->
- maps:merge(
- #{
- status => <<"connected">>,
- node_status => [
- #{
- node => <<"emqx@localhost">>,
- status => <<"connected">>
- }
- ]
- },
- values_conn_bridge_examples(post, Type)
- );
- values_conn_bridge_examples(_Method, Type) ->
- #{
- enable => true,
- type => Type,
- name => <<"foo">>,
- server => <<"127.0.0.1:5432">>,
- database => <<"mqtt">>,
- pool_size => 8,
- username => <<"root">>,
- password => <<"******">>,
- sql => default_sql(),
- local_topic => <<"local/topic/#">>,
- resource_opts => #{
- worker_pool_size => 8,
- health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
- batch_size => ?DEFAULT_BATCH_SIZE,
- batch_time => ?DEFAULT_BATCH_TIME,
- query_mode => async,
- max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
- }
- }.
|