%% File: emqx_bridge_pgsql.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_pgsql).
  5. -include_lib("typerefl/include/types.hrl").
  6. -include_lib("hocon/include/hoconsc.hrl").
  7. -include_lib("emqx_resource/include/emqx_resource.hrl").
  8. -import(hoconsc, [mk/2, enum/1, ref/2]).
  9. -export([
  10. conn_bridge_examples/1,
  11. values/2,
  12. fields/2
  13. ]).
  14. -export([
  15. namespace/0,
  16. roots/0,
  17. fields/1,
  18. desc/1
  19. ]).
  20. -define(DEFAULT_SQL, <<
  21. "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
  22. "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
  23. >>).
  24. %% -------------------------------------------------------------------------------------------------
  25. %% api
  26. conn_bridge_examples(Method) ->
  27. [
  28. #{
  29. <<"pgsql">> => #{
  30. summary => <<"PostgreSQL Bridge">>,
  31. value => values(Method, pgsql)
  32. }
  33. }
  34. ].
  35. values(_Method, Type) ->
  36. #{
  37. enable => true,
  38. type => Type,
  39. name => <<"foo">>,
  40. server => <<"127.0.0.1:5432">>,
  41. database => <<"mqtt">>,
  42. pool_size => 8,
  43. username => <<"root">>,
  44. password => <<"******">>,
  45. sql => ?DEFAULT_SQL,
  46. local_topic => <<"local/topic/#">>,
  47. resource_opts => #{
  48. worker_pool_size => 8,
  49. health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
  50. batch_size => ?DEFAULT_BATCH_SIZE,
  51. batch_time => ?DEFAULT_BATCH_TIME,
  52. query_mode => async,
  53. max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
  54. }
  55. }.
  56. %% -------------------------------------------------------------------------------------------------
  57. %% Hocon Schema Definitions
  58. namespace() -> "bridge_pgsql".
  59. roots() -> [].
  60. fields("config") ->
  61. [
  62. {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
  63. {sql,
  64. mk(
  65. binary(),
  66. #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
  67. )},
  68. {local_topic,
  69. mk(
  70. binary(),
  71. #{desc => ?DESC("local_topic"), default => undefined}
  72. )}
  73. ] ++ emqx_resource_schema:fields("resource_opts") ++
  74. (emqx_connector_pgsql:fields(config) --
  75. emqx_connector_schema_lib:prepare_statement_fields());
  76. fields("post") ->
  77. fields("post", pgsql);
  78. fields("put") ->
  79. fields("config");
  80. fields("get") ->
  81. emqx_bridge_schema:status_fields() ++ fields("post").
  82. fields("post", Type) ->
  83. [type_field(Type), name_field() | fields("config")].
  84. desc("config") ->
  85. ?DESC("desc_config");
  86. desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
  87. ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
  88. desc(_) ->
  89. undefined.
  90. %% -------------------------------------------------------------------------------------------------
  91. type_field(Type) ->
  92. {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
  93. name_field() ->
  94. {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.