emqx_bridge_pgsql.erl 6.8 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql).
  5. -include_lib("emqx_connector/include/emqx_connector.hrl").
  6. -include_lib("emqx_postgresql/include/emqx_postgresql.hrl").
  7. -include_lib("typerefl/include/types.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("hocon/include/hoconsc.hrl").
  10. -include_lib("epgsql/include/epgsql.hrl").
  11. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  12. -include_lib("emqx_resource/include/emqx_resource.hrl").
  13. %% `hocon_schema' API
  14. -export([
  15. namespace/0,
  16. roots/0,
  17. fields/1,
  18. desc/1
  19. ]).
  20. %% for sharing with other actions
  21. -export([fields/3]).
  22. %% Examples
  23. -export([
  24. bridge_v2_examples/1,
  25. conn_bridge_examples/1
  26. ]).
  27. %% Exported for timescale and matrix bridges
  28. -export([
  29. values/1,
  30. values_conn_bridge_examples/2
  31. ]).
  32. -define(ACTION_TYPE, pgsql).
  33. %% Hocon Schema Definitions
  %% `hocon_schema' callback: the namespace string for the structs defined in
  %% this module.
  namespace() ->
      "bridge_pgsql".
  %% `hocon_schema' callback: this module declares no root fields of its own.
  roots() -> [].
  %% `hocon_schema' callback: returns the field definitions of the struct named
  %% by the argument.
  %%
  %% Atom-named structs (`config', `action', `action_parameters', `pgsql_action')
  %% and the "*_bridge_v2" structs belong to the action-based layout; the
  %% string-named "config" / "post" / "put" / "get" structs belong to the bridge
  %% v1 layout.
  %%
  %% Every clause returns a list of `{Name, Schema}' pairs, except `action', which
  %% returns one `{Name, Schema}' pair.
  fields("config_connector") ->
      emqx_postgresql_connector_schema:fields("config_connector");
  fields(config) ->
      %% `fields(action)' is a single `{Name, Schema}' tuple, not a list, so it is
      %% wrapped before appending. `List ++ Tuple' would silently build an
      %% improper list.
      fields("config_connector") ++ [fields(action)];
  fields(action) ->
      {pgsql,
          hoconsc:mk(
              hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
              #{
                  desc => <<"PostgreSQL Action Config">>,
                  required => false
              }
          )};
  fields(action_parameters) ->
      [
          {sql,
              hoconsc:mk(
                  emqx_schema:template(),
                  #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
              )}
      ];
  fields(pgsql_action) ->
      emqx_bridge_v2_schema:make_producer_action_schema(
          hoconsc:mk(
              hoconsc:ref(?MODULE, action_parameters),
              #{
                  required => true,
                  desc => ?DESC("action_parameters")
              }
          )
      );
  fields("put_bridge_v2") ->
      fields(pgsql_action);
  fields("get_bridge_v2") ->
      fields(pgsql_action);
  fields("post_bridge_v2") ->
      fields("post", pgsql, pgsql_action);
  fields("config") ->
      %% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
      [
          {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
          {sql,
              hoconsc:mk(
                  binary(),
                  #{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
              )},
          {local_topic,
              hoconsc:mk(
                  binary(),
                  #{desc => ?DESC("local_topic"), default => undefined}
              )}
      ] ++ emqx_resource_schema:fields("resource_opts") ++
          %% Connector fields minus the prepared-statement fields and the
          %% `disable_prepared_statements' entry.
          proplists:delete(
              disable_prepared_statements,
              emqx_postgresql:fields(config) --
                  emqx_connector_schema_lib:prepare_statement_fields()
          );
  fields("post") ->
      fields("post", ?ACTION_TYPE, "config");
  fields("put") ->
      fields("config");
  fields("get") ->
      emqx_bridge_schema:status_fields() ++ fields("post").
  %% Fields of a "post" struct: the `type' and `name' fields, followed by every
  %% field of the struct `StructName'. Exported so other actions can reuse it with
  %% their own `Type'.
  fields("post", Type, StructName) ->
      Header = [type_field(Type), name_field()],
      Header ++ fields(StructName).
  %% Required `type' field whose only accepted value is `Type'.
  type_field(Type) ->
      Schema = hoconsc:mk(
          hoconsc:enum([Type]),
          #{required => true, desc => ?DESC("desc_type")}
      ),
      {type, Schema}.
  %% Required `name' field, typed as a binary.
  name_field() ->
      Schema = hoconsc:mk(
          binary(),
          #{required => true, desc => ?DESC("desc_name")}
      ),
      {name, Schema}.
  %% `hocon_schema' callback: description of the struct named by the argument.
  %% Returns `undefined' for any struct name not listed here.
  desc("config") ->
      ?DESC("desc_config");
  desc("config_connector") ->
      ?DESC(emqx_postgresql_connector_schema, "config_connector");
  desc(pgsql_action) ->
      ?DESC("pgsql_action");
  desc(action_parameters) ->
      ?DESC("action_parameters");
  desc(Method) when
      Method =:= "get" orelse Method =:= "put" orelse Method =:= "post"
  ->
      %% The HTTP-method structs get a generated description instead of a
      %% `?DESC' reference.
      Verb = string:to_upper(Method),
      ["Configuration for PostgreSQL using `", Verb, "` method."];
  desc(_) ->
      undefined.
  %% Default SQL template used by the `sql' schema fields and the v1 example.
  %% `${timestamp}' is divided by 1000 before being handed to TO_TIMESTAMP.
  default_sql() ->
      <<
          "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) values ("
          "${id}, ${topic}, ${qos}, ${payload}, "
          "TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
      >>.
  124. %% Examples
  %% Example payloads for the PostgreSQL action, for the given `Method'.
  %% Returns a one-element list holding a map keyed by <<"pgsql">>.
  bridge_v2_examples(Method) ->
      Example = #{
          summary => <<"PostgreSQL Action">>,
          value => values({Method, pgsql})
      },
      [#{<<"pgsql">> => Example}].
  %% Example payloads for the PostgreSQL bridge (v1 layout), for the given
  %% `Method'. Returns a one-element list holding a map keyed by <<"pgsql">>.
  conn_bridge_examples(Method) ->
      Example = #{
          summary => <<"PostgreSQL Bridge">>,
          value => values_conn_bridge_examples(Method, pgsql)
      },
      [#{<<"pgsql">> => Example}].
  %% Example values for a PostgreSQL-style action.
  %%
  %% Accepted arguments:
  %%   `{get, Type}'  - the `put' example plus `status' / `node_status' entries;
  %%   `{post, Type}' - identical to the `put' example;
  %%   `{put, Type}'  - name, type, connector and resource options merged with
  %%                    the `parameters' example;
  %%   `parameters'   - the <<"parameters">> map holding the example SQL template.
  %%
  %% `Type' is placed verbatim under the `type' key, which lets other
  %% postgres-based bridges reuse these examples with their own type.
  values({get, PostgreSQLType}) ->
      maps:merge(
          #{
              status => <<"connected">>,
              node_status => [
                  #{
                      node => <<"emqx@localhost">>,
                      status => <<"connected">>
                  }
              ]
          },
          values({put, PostgreSQLType})
      );
  values({post, PostgreSQLType}) ->
      values({put, PostgreSQLType});
  values({put, PostgreSQLType}) ->
      maps:merge(
          #{
              name => <<"my_action">>,
              type => PostgreSQLType,
              enable => true,
              connector => <<"my_connector">>,
              resource_opts => #{
                  batch_size => 1,
                  batch_time => <<"50ms">>,
                  inflight_window => 100,
                  max_buffer_bytes => <<"256MB">>,
                  request_ttl => <<"45s">>,
                  worker_pool_size => 16
              }
          },
          values(parameters)
      );
  values(parameters) ->
      #{
          <<"parameters">> => #{
              <<"sql">> =>
                  <<
                      %% The trailing space keeps the column list and VALUES apart
                      %% once the adjacent literals are concatenated.
                      "INSERT INTO client_events(clientid, event, created_at) "
                      "VALUES (\n"
                      " ${clientid},\n"
                      " ${event},\n"
                      %% Divided by 1000 to match default_sql/0, which treats
                      %% ${timestamp} the same way.
                      %% NOTE(review): assumes ${timestamp} is epoch milliseconds - confirm.
                      " TO_TIMESTAMP((${timestamp} :: bigint)/1000)\n"
                      ")"
                  >>
          }
      }.
  %% Example values for a postgres-based bridge in the v1 layout.
  %%
  %% For `get', the example is the `post' example extended with `status' and
  %% `node_status'. Every other method yields the same base map, with `Type'
  %% placed under the `type' key.
  values_conn_bridge_examples(get, Type) ->
      StatusPart = #{
          status => <<"connected">>,
          node_status => [
              #{
                  node => <<"emqx@localhost">>,
                  status => <<"connected">>
              }
          ]
      },
      maps:merge(StatusPart, values_conn_bridge_examples(post, Type));
  values_conn_bridge_examples(_Method, Type) ->
      ResourceOpts = #{
          worker_pool_size => 8,
          health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
          batch_size => ?DEFAULT_BATCH_SIZE,
          batch_time => ?DEFAULT_BATCH_TIME,
          query_mode => async,
          max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
      },
      #{
          enable => true,
          type => Type,
          name => <<"foo">>,
          server => <<"127.0.0.1:5432">>,
          database => <<"mqtt">>,
          pool_size => 8,
          username => <<"root">>,
          password => <<"******">>,
          sql => default_sql(),
          local_topic => <<"local/topic/#">>,
          resource_opts => ResourceOpts
      }.