%% emqx_bridge_sqlserver.erl

  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver).
  5. -behaviour(emqx_connector_examples).
  6. -include_lib("typerefl/include/types.hrl").
  7. -include_lib("hocon/include/hoconsc.hrl").
  8. -include_lib("emqx_bridge/include/emqx_bridge.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -import(hoconsc, [mk/2, enum/1, ref/2]).
  11. -export([
  12. bridge_v2_examples/1,
  13. connector_examples/1,
  14. conn_bridge_examples/1
  15. ]).
  16. -export([
  17. namespace/0,
  18. roots/0,
  19. fields/1,
  20. desc/1
  21. ]).
  22. -define(CONNECTOR_TYPE, sqlserver).
  23. -define(ACTION_TYPE, ?CONNECTOR_TYPE).
  24. -define(DEFAULT_SQL, <<
  25. "insert into t_mqtt_msg(msgid, topic, qos, payload) "
  26. "values ( ${id}, ${topic}, ${qos}, ${payload} )"
  27. >>).
  28. -define(DEFAULT_DRIVER, <<"ms-sql">>).
  29. %% -------------------------------------------------------------------------------------------------
  30. %% api.
  31. conn_bridge_examples(Method) ->
  32. [
  33. #{
  34. <<"sqlserver">> => #{
  35. summary => <<"Microsoft SQL Server Bridge">>,
  36. value => values(Method)
  37. }
  38. }
  39. ].
  40. values(get) ->
  41. values(post);
  42. values(post) ->
  43. #{
  44. enable => true,
  45. type => sqlserver,
  46. name => <<"bar">>,
  47. server => <<"127.0.0.1:1433">>,
  48. database => <<"test">>,
  49. pool_size => 8,
  50. username => <<"sa">>,
  51. password => <<"******">>,
  52. sql => ?DEFAULT_SQL,
  53. driver => ?DEFAULT_DRIVER,
  54. local_topic => <<"local/topic/#">>,
  55. resource_opts => #{
  56. worker_pool_size => 1,
  57. health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
  58. batch_size => ?DEFAULT_BATCH_SIZE,
  59. batch_time => ?DEFAULT_BATCH_TIME,
  60. query_mode => async,
  61. max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
  62. }
  63. };
  64. values(put) ->
  65. values(post).
  66. %% ====================
  67. %% Bridge V2: Connector + Action
  68. connector_examples(Method) ->
  69. [
  70. #{
  71. <<"sqlserver">> =>
  72. #{
  73. summary => <<"Microsoft SQL Server Connector">>,
  74. value => emqx_connector_schema:connector_values(
  75. Method, ?CONNECTOR_TYPE, connector_values()
  76. )
  77. }
  78. }
  79. ].
  80. connector_values() ->
  81. #{
  82. server => <<"127.0.0.1:1433">>,
  83. database => <<"test">>,
  84. pool_size => 8,
  85. username => <<"sa">>,
  86. password => <<"******">>,
  87. driver => ?DEFAULT_DRIVER,
  88. resource_opts => #{health_check_interval => <<"20s">>}
  89. }.
  90. bridge_v2_examples(Method) ->
  91. [
  92. #{
  93. <<"sqlserver">> =>
  94. #{
  95. summary => <<"Microsoft SQL Server Action">>,
  96. value => emqx_bridge_v2_schema:action_values(
  97. Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
  98. )
  99. }
  100. }
  101. ].
  102. action_values() ->
  103. #{
  104. <<"parameters">> =>
  105. #{<<"sql">> => ?DEFAULT_SQL}
  106. }.
  107. %% -------------------------------------------------------------------------------------------------
  108. %% Hocon Schema Definitions
  109. namespace() -> "bridge_sqlserver".
  110. roots() -> [].
  111. fields(Field) when
  112. Field == "get_bridge_v2";
  113. Field == "post_bridge_v2";
  114. Field == "put_bridge_v2"
  115. ->
  116. emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(sqlserver_action));
  117. fields(Field) when
  118. Field == "get_connector";
  119. Field == "put_connector";
  120. Field == "post_connector"
  121. ->
  122. emqx_connector_schema:api_fields(
  123. Field,
  124. ?CONNECTOR_TYPE,
  125. fields("config_connector") -- emqx_connector_schema:common_fields()
  126. );
  127. fields("config_connector") ->
  128. driver_fields() ++
  129. emqx_connector_schema:common_fields() ++
  130. emqx_bridge_sqlserver_connector:fields(config) ++
  131. emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
  132. fields(connector_resource_opts) ->
  133. emqx_connector_schema:resource_opts_fields();
  134. fields("config") ->
  135. [
  136. {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
  137. {sql,
  138. mk(
  139. binary(),
  140. #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
  141. )},
  142. {local_topic,
  143. mk(
  144. binary(),
  145. #{desc => ?DESC("local_topic"), default => undefined}
  146. )},
  147. {resource_opts,
  148. mk(
  149. ref(?MODULE, "creation_opts"),
  150. #{
  151. required => false,
  152. default => #{},
  153. desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
  154. }
  155. )}
  156. ] ++ driver_fields() ++
  157. (emqx_bridge_sqlserver_connector:fields(config) --
  158. emqx_connector_schema_lib:prepare_statement_fields());
  159. fields(action) ->
  160. {?ACTION_TYPE,
  161. mk(
  162. hoconsc:map(name, ref(?MODULE, sqlserver_action)),
  163. #{desc => ?DESC("sqlserver_action"), required => false}
  164. )};
  165. fields(sqlserver_action) ->
  166. emqx_bridge_v2_schema:make_producer_action_schema(
  167. mk(
  168. ref(?MODULE, action_parameters),
  169. #{required => true, desc => ?DESC(action_parameters)}
  170. )
  171. );
  172. fields(action_parameters) ->
  173. [
  174. {sql,
  175. mk(
  176. emqx_schema:template(),
  177. #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
  178. )}
  179. ];
  180. fields("creation_opts") ->
  181. emqx_resource_schema:fields("creation_opts");
  182. fields("post") ->
  183. fields("post", sqlserver);
  184. fields("put") ->
  185. fields("config");
  186. fields("get") ->
  187. emqx_bridge_schema:status_fields() ++ fields("post").
  188. fields("post", Type) ->
  189. [type_field(Type), name_field() | fields("config")].
  190. driver_fields() ->
  191. [{driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})}].
  192. desc("config") ->
  193. ?DESC("desc_config");
  194. desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
  195. ["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."];
  196. desc("creation_opts" = Name) ->
  197. emqx_resource_schema:desc(Name);
  198. desc("config_connector") ->
  199. ?DESC("config_connector");
  200. desc(sqlserver_action) ->
  201. ?DESC("sqlserver_action");
  202. desc(action_parameters) ->
  203. ?DESC("action_parameters");
  204. desc(connector_resource_opts) ->
  205. ?DESC(emqx_resource_schema, "resource_opts");
  206. desc(_) ->
  207. undefined.
  208. %% -------------------------------------------------------------------------------------------------
  209. type_field(Type) ->
  210. {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
  211. name_field() ->
  212. {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.