%% File: emqx_bridge_pgsql.erl (6.6 KB)
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql).
  5. -include_lib("emqx_connector/include/emqx_connector.hrl").
  6. -include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
  7. -include_lib("typerefl/include/types.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("hocon/include/hoconsc.hrl").
  10. -include_lib("epgsql/include/epgsql.hrl").
  11. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  12. -include_lib("emqx_resource/include/emqx_resource.hrl").
  13. -export([
  14. namespace/0,
  15. roots/0,
  16. fields/1,
  17. desc/1,
  18. fields/2
  19. ]).
  20. %% Examples
  21. -export([
  22. bridge_v2_examples/1,
  23. conn_bridge_examples/1
  24. ]).
  25. %% Exported for timescale and matrix bridges
  26. -export([
  27. values/1,
  28. values_conn_bridge_examples/2
  29. ]).
  30. -define(PGSQL_HOST_OPTIONS, #{
  31. default_port => ?PGSQL_DEFAULT_PORT
  32. }).
  33. %% Hocon Schema Definitions
  %% Returns the HOCON namespace string used by this schema module.
  -spec namespace() -> string().
  namespace() ->
      "bridge_pgsql".
  %% This module declares no root-level schema entries; it always returns
  %% the empty list.
  -spec roots() -> [].
  roots() -> [].
  %% Returns the HOCON field definitions for the struct identified by the
  %% argument.
  %%
  %% Clauses, in order:
  %%   "config_connector"  - delegated to emqx_postgresql_connector_schema.
  %%   config              - connector fields followed by the action entry.
  %%   action              - a single {pgsql, Schema} pair (NOT a list).
  %%   action_parameters   - the `sql' template plus the prepare-statement fields.
  %%   pgsql_action        - producer action schema wrapping action_parameters.
  %%   "put_bridge_v2" / "get_bridge_v2" / "post_bridge_v2"
  %%                       - all identical to pgsql_action.
  %%   "config"            - `enable', `sql', `local_topic', the resource
  %%                         options, and the emqx_postgresql config fields
  %%                         minus the prepare-statement fields.
  %%   "post" / "put" / "get"
  %%                       - variants derived from "config".
  %%
  %% Any other argument fails with function_clause.
  fields("config_connector") ->
      emqx_postgresql_connector_schema:fields("config_connector");
  fields(config) ->
      %% fields(action) yields one {Name, Schema} tuple rather than a list, so
      %% it has to be wrapped: `List ++ Tuple' would build an improper list.
      fields("config_connector") ++ [fields(action)];
  fields(action) ->
      {pgsql,
          hoconsc:mk(
              hoconsc:map(name, hoconsc:ref(?MODULE, pgsql_action)),
              #{
                  desc => <<"PostgreSQL Action Config">>,
                  required => false
              }
          )};
  fields(action_parameters) ->
      [sql_field() | emqx_connector_schema_lib:prepare_statement_fields()];
  fields(pgsql_action) ->
      emqx_bridge_v2_schema:make_producer_action_schema(
          hoconsc:mk(
              hoconsc:ref(?MODULE, action_parameters),
              #{
                  required => true,
                  desc => ?DESC("action_parameters")
              }
          )
      );
  fields("put_bridge_v2") ->
      fields(pgsql_action);
  fields("get_bridge_v2") ->
      fields(pgsql_action);
  fields("post_bridge_v2") ->
      fields(pgsql_action);
  fields("config") ->
      [
          {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
          sql_field(),
          {local_topic,
              hoconsc:mk(
                  binary(),
                  #{desc => ?DESC("local_topic"), default => undefined}
              )}
      ] ++ emqx_resource_schema:fields("resource_opts") ++
          (emqx_postgresql:fields(config) --
              emqx_connector_schema_lib:prepare_statement_fields());
  fields("post") ->
      fields("post", pgsql);
  fields("put") ->
      fields("config");
  fields("get") ->
      emqx_bridge_schema:status_fields() ++ fields("post").

  %% The `sql' template field shared by the action_parameters and "config"
  %% structs; it defaults to default_sql().
  sql_field() ->
      {sql,
          hoconsc:mk(
              binary(),
              #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
          )}.
  %% Fields for a "post" request: the `type' field (restricted to Type) and the
  %% `name' field, followed by everything in fields("config").
  fields("post", Type) ->
      Header = [type_field(Type), name_field()],
      Header ++ fields("config").
  %% Builds the required `type' field: an enum whose only member is Type.
  type_field(Type) ->
      Schema = hoconsc:enum([Type]),
      Meta = #{required => true, desc => ?DESC("desc_type")},
      {type, hoconsc:mk(Schema, Meta)}.
  %% Builds the required `name' field, typed as a binary.
  name_field() ->
      Meta = #{required => true, desc => ?DESC("desc_name")},
      {name, hoconsc:mk(binary(), Meta)}.
  %% Returns the description for a struct name.
  %%
  %% "get", "put" and "post" produce an iolist that embeds the upper-cased
  %% method name; the other known names resolve through ?DESC; anything else
  %% yields `undefined'.
  desc("config") ->
      ?DESC("desc_config");
  desc(Method) when Method =:= "get" orelse Method =:= "put" orelse Method =:= "post" ->
      UpperMethod = string:to_upper(Method),
      ["Configuration for PostgreSQL using `", UpperMethod, "` method."];
  desc(pgsql_action) ->
      ?DESC("pgsql_action");
  desc(action_parameters) ->
      ?DESC("action_parameters");
  desc("config_connector") ->
      ?DESC(emqx_postgresql_connector_schema, "config_connector");
  desc(_Other) ->
      undefined.
  %% Default SQL template: inserts one row into t_mqtt_msg. The `arrived'
  %% column is derived from ${timestamp} divided by 1000 and passed to
  %% TO_TIMESTAMP.
  default_sql() ->
      InsertClause = "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) ",
      ValuesClause =
          "values (${id}, ${topic}, ${qos}, ${payload}, "
          "TO_TIMESTAMP((${timestamp} :: bigint)/1000))",
      iolist_to_binary([InsertClause, ValuesClause]).
  121. %% Examples
  %% Example payload list for the PostgreSQL action, keyed by <<"pgsql">>.
  %% The example value comes from values({Method, pgsql}).
  bridge_v2_examples(Method) ->
      Example = #{
          summary => <<"PostgreSQL Action">>,
          value => values({Method, pgsql})
      },
      [#{<<"pgsql">> => Example}].
  %% Example payload list for the PostgreSQL bridge, keyed by <<"pgsql">>.
  %% The example value comes from values_conn_bridge_examples(Method, pgsql).
  conn_bridge_examples(Method) ->
      Example = #{
          summary => <<"PostgreSQL Bridge">>,
          value => values_conn_bridge_examples(Method, pgsql)
      },
      [#{<<"pgsql">> => Example}].
  %% Example values for the action API.
  %%
  %%   values({get, Type})  - the `put' example merged over a status map
  %%                          (`status' and `node_status').
  %%   values({post, Type}) - identical to the `put' example.
  %%   values({put, Type})  - name/type/enable/connector/resource_opts merged
  %%                          with values(parameters).
  %%   values(parameters)   - a map holding the example `sql' template under
  %%                          the <<"parameters">> key.
  %%
  %% In maps:merge/2 the second argument wins on duplicate keys.
  values({get, PostgreSQLType}) ->
      StatusInfo = #{
          status => <<"connected">>,
          node_status => [
              #{
                  node => <<"emqx@localhost">>,
                  status => <<"connected">>
              }
          ]
      },
      maps:merge(StatusInfo, values({put, PostgreSQLType}));
  values({post, PostgreSQLType}) ->
      values({put, PostgreSQLType});
  values({put, PostgreSQLType}) ->
      ActionInfo = #{
          name => <<"my_action">>,
          type => PostgreSQLType,
          enable => true,
          connector => <<"my_connector">>,
          resource_opts => #{
              batch_size => 1,
              batch_time => <<"50ms">>,
              inflight_window => 100,
              max_buffer_bytes => <<"256MB">>,
              request_ttl => <<"45s">>,
              worker_pool_size => 16
          }
      },
      maps:merge(ActionInfo, values(parameters));
  values(parameters) ->
      #{
          <<"parameters">> => #{
              <<"sql">> =>
                  <<
                      %% Trailing space keeps the column list and VALUES apart.
                      "INSERT INTO client_events(clientid, event, created_at) "
                      "VALUES (\n"
                      " ${clientid},\n"
                      " ${event},\n"
                      %% Divide by 1000 to match default_sql/0, which applies the
                      %% same conversion to ${timestamp} before TO_TIMESTAMP.
                      " TO_TIMESTAMP((${timestamp} :: bigint)/1000)\n"
                      ")"
                  >>
          }
      }.
  %% Example values for the bridge API.
  %%
  %% For `get' the result is the generic example merged over a status map
  %% (`status' and `node_status'). Every other method returns the generic
  %% example alone, with `type' set to Type.
  values_conn_bridge_examples(get, Type) ->
      NodeStatus = #{
          node => <<"emqx@localhost">>,
          status => <<"connected">>
      },
      StatusInfo = #{
          status => <<"connected">>,
          node_status => [NodeStatus]
      },
      maps:merge(StatusInfo, values_conn_bridge_examples(post, Type));
  values_conn_bridge_examples(_Method, Type) ->
      ResourceOpts = #{
          worker_pool_size => 8,
          health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
          batch_size => ?DEFAULT_BATCH_SIZE,
          batch_time => ?DEFAULT_BATCH_TIME,
          query_mode => async,
          max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
      },
      #{
          enable => true,
          type => Type,
          name => <<"foo">>,
          server => <<"127.0.0.1:5432">>,
          database => <<"mqtt">>,
          pool_size => 8,
          username => <<"root">>,
          password => <<"******">>,
          sql => default_sql(),
          local_topic => <<"local/topic/#">>,
          resource_opts => ResourceOpts
      }.