emqx_bridge_sqlserver_connector.erl 19 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver_connector).
  5. -behaviour(emqx_resource).
  6. -include("emqx_bridge_sqlserver.hrl").
  7. -include_lib("kernel/include/file.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -include_lib("typerefl/include/types.hrl").
  11. -include_lib("hocon/include/hoconsc.hrl").
  12. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  13. %%====================================================================
  14. %% Exports
  15. %%====================================================================
  16. %% Hocon config schema exports
  17. -export([
  18. roots/0,
  19. fields/1,
  20. namespace/0
  21. ]).
  22. %% callbacks for behaviour emqx_resource
  23. -export([
  24. resource_type/0,
  25. callback_mode/0,
  26. on_start/2,
  27. on_stop/2,
  28. on_query/3,
  29. on_batch_query/3,
  30. on_get_status/2,
  31. on_add_channel/4,
  32. on_remove_channel/3,
  33. on_get_channels/1,
  34. on_get_channel_status/3,
  35. on_format_query_result/1
  36. ]).
  37. %% `ecpool_worker' API
  38. -export([
  39. connect/1,
  40. disconnect/1
  41. ]).
  42. %% Internal exports used to execute code with ecpool worker
  43. -export([do_get_status/1, worker_do_insert/3]).
  44. -import(emqx_utils_conv, [str/1]).
  45. -import(hoconsc, [mk/2, enum/1, ref/2]).
  46. -define(ACTION_SEND_MESSAGE, send_message).
  47. -define(SYNC_QUERY_MODE, handover).
  48. -define(SQLSERVER_HOST_OPTIONS, #{
  49. default_port => ?SQLSERVER_DEFAULT_PORT
  50. }).
  51. -define(REQUEST_TTL(RESOURCE_OPTS),
  52. maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
  53. ).
  54. -define(BATCH_INSERT_TEMP, batch_insert_temp).
  55. -define(BATCH_INSERT_PART, batch_insert_part).
  56. -define(BATCH_PARAMS_TOKENS, batch_insert_tks).
  57. -define(FILE_MODE_755, 33261).
  58. %% 32768 + 8#00400 + 8#00200 + 8#00100 + 8#00040 + 8#00010 + 8#00004 + 8#00001
  59. %% See also
  60. %% https://www.erlang.org/doc/man/file.html#read_file_info-2
  61. %% Copied from odbc reference page
  62. %% https://www.erlang.org/doc/man/odbc.html
  63. %% as returned by connect/2
  64. -type connection_reference() :: term().
  65. -type time_out() :: milliseconds() | infinity.
  66. -type sql() :: string() | binary().
  67. -type milliseconds() :: pos_integer().
  68. %% Tuple of column values e.g. one row of the result set.
  69. %% it's a variable size tuple of column values.
  70. -type row() :: tuple().
  71. %% Some kind of explanation of what went wrong
  72. -type common_reason() :: connection_closed | extended_error() | term().
  73. %% extended error type with ODBC
  74. %% and native database error codes, as well as the base reason that would have been
  75. %% returned had extended_errors not been enabled.
  76. -type extended_error() :: {string(), integer(), _Reason :: term()}.
  77. %% Name of column in the result set
  78. -type col_name() :: string().
  79. %% e.g. a list of the names of the selected columns in the result set.
  80. -type col_names() :: [col_name()].
  81. %% A list of rows from the result set.
  82. -type rows() :: list(row()).
  83. %% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
  84. -type updated_tuple() :: {updated, n_rows()}.
  85. -type selected_tuple() :: {selected, col_names(), rows()}.
  86. %% The number of affected rows for UPDATE,
  87. %% INSERT, or DELETE queries. For other query types the value
  88. %% is driver defined, and hence should be ignored.
  89. -type n_rows() :: integer().
  90. %% These types are not used in this module, but we may use them later
  91. %% -type odbc_data_type() ::
  92. %% sql_integer
  93. %% | sql_smallint
  94. %% | sql_tinyint
  95. %% | {sql_decimal, precision(), scale()}
  96. %% | {sql_numeric, precision(), scale()}
  97. %% | {sql_char, size()}
  98. %% | {sql_wchar, size()}
  99. %% | {sql_varchar, size()}
  100. %% | {sql_wvarchar, size()}
  101. %% | {sql_float, precision()}
  102. %% | {sql_wlongvarchar, size()}
  103. %% | {sql_float, precision()}
  104. %% | sql_real
  105. %% | sql_double
  106. %% | sql_bit
  107. %% | atom().
  108. %% -type precision() :: integer().
  109. %% -type scale() :: integer().
  110. %% -type size() :: integer().
  111. -type state() :: #{
  112. installed_channels := map(),
  113. pool_name := binary(),
  114. resource_opts := map()
  115. }.
  116. %%====================================================================
  117. %% Configuration and default values
  118. %%====================================================================
  %% Hocon config schema callback: returns the namespace atom `sqlserver'.
  namespace() ->
      sqlserver.
  %% Hocon config schema callback: a single root named `config', typed as a
  %% reference to this module's `config' field list (see fields/1).
  roots() ->
      ConfigType = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigType}}].
  %% Hocon config schema callback: the `config' struct is the `server' field
  %% followed by the shared relational-db fields, with the username field's
  %% default overridden (see add_default_username/1).
  fields(config) ->
      ServerField = {server, server()},
      RelationalFields = emqx_connector_schema_lib:relational_db_fields(),
      [ServerField | add_default_username(RelationalFields)].
  %% Walks the schema field list and wraps the `username' field's schema fun so
  %% that its `default' is <<"sa">>; every other field is returned untouched.
  add_default_username(Fields) ->
      [
          case Field of
              {username, OrigUsernameFn} ->
                  {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
              _ ->
                  Field
          end
       || Field <- Fields
      ].
  %% Returns a fun that answers `Default' when asked for `default' and
  %% delegates every other argument to `OrigFn'.
  add_default_fn(OrigFn, Default) ->
      fun(Field) ->
          case Field of
              default -> Default;
              _ -> OrigFn(Field)
          end
      end.
  %% Schema for the `server' field, built by emqx_schema:servers_sc/2 with the
  %% SQL Server host options (default port).
  server() ->
      emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?SQLSERVER_HOST_OPTIONS).
  145. %%====================================================================
  146. %% Callbacks defined in emqx_resource
  147. %%====================================================================
  %% `emqx_resource' callback: the resource type atom of this connector.
  resource_type() ->
      sqlserver.
  %% `emqx_resource' callback: this connector reports the `always_sync' mode.
  callback_mode() ->
      always_sync.
  %% `emqx_resource' callback: starts the connection pool for this connector.
  %%
  %% The instance id doubles as the pool name. Before the pool is started the
  %% `odbcserver' program shipped in the `odbc' application's priv dir is made
  %% executable (mode 755) on a best-effort basis.
  %%
  %% Returns `{ok, State}' with an empty `installed_channels' map on success,
  %% or `{error, Reason}' as reported by emqx_resource_pool:start/3.
  on_start(
      InstanceId = PoolName,
      #{
          server := Server,
          username := Username,
          driver := Driver,
          database := Database,
          pool_size := PoolSize,
          resource_opts := ResourceOpts
      } = Config
  ) ->
      ?SLOG(info, #{
          msg => "starting_sqlserver_connector",
          connector => InstanceId,
          config => emqx_utils:redact(Config)
      }),
      ok = ensure_odbcserver_executable(InstanceId),
      %% odbc connection string required
      ConnectOptions = [
          {server, to_bin(Server)},
          {username, Username},
          {password, maps:get(password, Config, emqx_secret:wrap(""))},
          {driver, Driver},
          {database, Database},
          {pool_size, PoolSize},
          {on_disconnect, {?MODULE, disconnect, []}}
      ],
      State = #{
          %% also InstanceId
          pool_name => PoolName,
          installed_channels => #{},
          resource_opts => ResourceOpts
      },
      case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
          ok ->
              {ok, State};
          {error, Reason} ->
              ?tp(
                  sqlserver_connector_start_failed,
                  #{error => Reason}
              ),
              {error, Reason}
      end.

  %% Ensures `<odbc priv dir>/bin/odbcserver' has file mode 755.
  %% Reading the file info must succeed (assertive match). Changing the mode is
  %% best effort: a failure is logged as a warning instead of being silently
  %% dropped, and start-up continues.
  ensure_odbcserver_executable(InstanceId) ->
      OdbcserverPath = filename:join(code:priv_dir(odbc), "bin/odbcserver"),
      {ok, Info = #file_info{mode = Mode}} = file:read_file_info(OdbcserverPath),
      case Mode of
          ?FILE_MODE_755 ->
              ok;
          _ ->
              case file:write_file_info(OdbcserverPath, Info#file_info{mode = ?FILE_MODE_755}) of
                  ok ->
                      ok;
                  {error, Reason} ->
                      ?SLOG(warning, #{
                          msg => "sqlserver_connector_chmod_odbcserver_failed",
                          connector => InstanceId,
                          path => OdbcserverPath,
                          reason => Reason
                      }),
                      ok
              end
      end.
  %% `emqx_resource' callback: builds the channel state from `ChannelConfig'
  %% and stores it under `ChannelId' in the connector's `installed_channels'.
  on_add_channel(
      _InstId,
      #{installed_channels := InstalledChannels} = OldState,
      ChannelId,
      ChannelConfig
  ) ->
      {ok, ChannelState} = create_channel_state(ChannelConfig),
      {ok, OldState#{installed_channels => InstalledChannels#{ChannelId => ChannelState}}}.
  %% Builds the per-channel state from the channel's `parameters': the parsed
  %% SQL templates plus the raw parameters themselves.
  create_channel_state(#{parameters := ChannelConf}) ->
      SQLTemplates = parse_sql_template(ChannelConf),
      {ok, #{sql_templates => SQLTemplates, channel_conf => ChannelConf}}.
  %% `emqx_resource' callback: drops `ChannelId' from `installed_channels'.
  %% Removing a channel that is not installed leaves the state unchanged.
  on_remove_channel(
      _InstId,
      #{installed_channels := InstalledChannels} = OldState,
      ChannelId
  ) ->
      RemainingChannels = maps:remove(ChannelId, InstalledChannels),
      {ok, OldState#{installed_channels => RemainingChannels}}.
  %% `emqx_resource' callback: an installed channel reports the connector's
  %% own status (on_get_status/2); an unknown channel is disconnected.
  on_get_channel_status(
      InstanceId,
      ChannelId,
      #{installed_channels := Channels} = State
  ) ->
      case maps:is_key(ChannelId, Channels) of
          true -> on_get_status(InstanceId, State);
          false -> ?status_disconnected
      end.
  %% `emqx_resource' callback: delegates to
  %% emqx_bridge_v2:get_channels_for_connector/1 for the given resource id.
  on_get_channels(ResId) ->
      emqx_bridge_v2:get_channels_for_connector(ResId).
  %% `emqx_resource' callback: emits a trace point and an info log, then stops
  %% the connection pool named after the instance id.
  on_stop(InstanceId, _State) ->
      ?tp(sqlserver_connector_on_stop, #{instance_id => InstanceId}),
      ?SLOG(info, #{
          msg => "stopping_sqlserver_connector",
          connector => InstanceId
      }),
      emqx_resource_pool:stop(InstanceId).
  %% `emqx_resource' callback for a single request: traces the request and
  %% hands it to do_query/4 using the synchronous ecpool apply mode.
  -spec on_query(
      resource_id(),
      Query :: {channel_id(), map()},
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_query(ResourceId, {_ChannelId, _Msg} = Query, State) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{requests => Query, connector => ResourceId, state => State}
      ),
      do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
  %% `emqx_resource' callback for a batch of requests: traces the batch and
  %% hands it to do_query/4 using the synchronous ecpool apply mode.
  -spec on_batch_query(
      resource_id(),
      [{channel_id(), map()}],
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_batch_query(ResourceId, BatchRequests, State) ->
      ?TRACE(
          "BATCH_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{requests => BatchRequests, connector => ResourceId, state => State}
      ),
      do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
  %% `emqx_resource' callback: turns `{ok, Rows}' into a map with `result' and
  %% `rows' keys; any other result is passed through unchanged.
  on_format_query_result(Result) ->
      case Result of
          {ok, Rows} -> #{result => ok, rows => Rows};
          _ -> Result
      end.
  %% `emqx_resource' callback: runs do_get_status/1 on the pool workers through
  %% emqx_resource_pool:health_check_workers/2 and maps the boolean outcome to
  %% a resource status.
  on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
      status_result(
          emqx_resource_pool:health_check_workers(PoolName, {?MODULE, do_get_status, []})
      ).
  %% Maps the health-check boolean to a resource status: `true' is connected,
  %% `false' is connecting.
  %% TODO: add a case for the disconnected status.
  status_result(true) ->
      ?status_connected;
  status_result(false) ->
      ?status_connecting.
  298. %%====================================================================
  299. %% ecpool callback fns
  300. %%====================================================================
  %% `ecpool_worker' callback: builds the ODBC connection string from the pool
  %% options (see conn_str/2) and opens the connection, passing the value of
  %% the `options' key (default `[]') as the odbc connect options.
  -spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
  connect(Options) ->
      OdbcOpts = proplists:get_value(options, Options, []),
      ConnectStr = lists:concat(conn_str(Options, [])),
      odbc:connect(ConnectStr, OdbcOpts).
  %% Closes an ODBC connection; registered as the pool's `on_disconnect'
  %% handler in on_start/2.
  -spec disconnect(connection_reference()) -> ok | {error, term()}.
  disconnect(ConnectionPid) ->
      odbc:disconnect(ConnectionPid).
  %% Health probe run inside a pool worker: executes `SELECT 1' and reports
  %% `true' only when the reply is exactly one unnamed column holding the
  %% single row `{1}'. Any other reply, including an error tuple, is `false'.
  -spec do_get_status(connection_reference()) -> Result :: boolean().
  do_get_status(Conn) ->
      Expected = {selected, [[]], [{1}]},
      Expected =:= execute(Conn, <<"SELECT 1">>).
  315. %%====================================================================
  316. %% Internal Functions
  317. %%====================================================================
  318. %% TODO && FIXME:
  319. %% About the connection string attribute `Encrypt`:
  320. %% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
  321. %% And encrypted connections always verify the server's certificate.
  322. %% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string
  323. %% when connecting to a server that has a self-signed certificate.
  324. %% See also:
  325. %% 'https://learn.microsoft.com/en-us/sql/connect/odbc/
  326. %% dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt'
  %% Translates the pool options into ODBC connection-string attributes.
  %% Attributes are consed onto `Acc', so they end up in reverse option order.
  %% Once the options are exhausted, `Encrypt=YES' and
  %% `TrustServerCertificate=YES' are put in front and everything is joined
  %% with ";". The result is a list of strings for the caller to concatenate.
  %% Two-tuples with any other key (e.g. `pool_size') are skipped.
  %% NOTE(review): values are concatenated without any escaping, so a value
  %% containing ";" would change the connection string - confirm that inputs
  %% are validated upstream.
  conn_str([], Acc) ->
      lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc]);
  conn_str([{driver, Driver} | Rest], Acc) ->
      conn_str(Rest, [lists:append("Driver=", str(Driver)) | Acc]);
  conn_str([{server, Server} | Rest], Acc) ->
      #{hostname := Host, port := Port} =
          emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
      ServerAttr = lists:append(["Server=", str(Host), ",", str(Port)]),
      conn_str(Rest, [ServerAttr | Acc]);
  conn_str([{database, Database} | Rest], Acc) ->
      conn_str(Rest, [lists:append("Database=", str(Database)) | Acc]);
  conn_str([{username, Username} | Rest], Acc) ->
      conn_str(Rest, [lists:append("UID=", str(Username)) | Acc]);
  conn_str([{password, Password} | Rest], Acc) ->
      %% The secret is only unwrapped here, while the string is being built.
      conn_str(Rest, [lists:append("PWD=", str(emqx_secret:unwrap(Password))) | Acc]);
  conn_str([{_, _} | Rest], Acc) ->
      conn_str(Rest, Acc).
  %% Query with a single or a batch sql statement.
  %%
  %% Looks up the channel's SQL templates, renders the insert statement for the
  %% request(s) and runs it in a pool worker via worker_do_insert/3.
  %% A request that cannot be rendered yields an `unrecoverable_error';
  %% an `ecpool_empty' failure is reported as a `recoverable_error'.
  %% Every outcome is emitted as a `sqlserver_connector_query_return' trace
  %% point, and errors are logged as well.
  -spec do_query(
      resource_id(),
      Query :: {channel_id(), map()} | [{channel_id(), map()}],
      ApplyMode :: handover,
      state()
  ) ->
      {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, {unrecoverable_error, term()}}
      | {error, term()}.
  do_query(
      ResourceId,
      Query,
      ApplyMode,
      #{pool_name := PoolName, installed_channels := Channels} = State
  ) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "sqlserver_connector_received",
          #{query => Query, connector => ResourceId, state => State}
      ),
      ChannelId = get_channel_id(Query),
      QueryTuple = get_query_tuple(Query),
      #{sql_templates := Templates} = ChannelState = maps:get(ChannelId, Channels),
      ChannelConf = maps:get(channel_conf, ChannelState, #{}),
      %% only insert sql statement for single query and batch query
      Result =
          case apply_template(QueryTuple, Templates, ChannelConf) of
              {?ACTION_SEND_MESSAGE, SQL} ->
                  emqx_trace:rendered_action_template(ChannelId, #{sql => SQL}),
                  ecpool:pick_and_do(
                      PoolName,
                      {?MODULE, worker_do_insert, [SQL, State]},
                      ApplyMode
                  );
              QueryTuple ->
                  %% apply_template/3 handed the request back unchanged.
                  {error, {unrecoverable_error, invalid_query}};
              _ ->
                  {error, {unrecoverable_error, failed_to_apply_sql_template}}
          end,
      case Result of
          {error, Reason} ->
              ?tp(sqlserver_connector_query_return, #{error => Reason}),
              ?SLOG(error, #{
                  msg => "sqlserver_connector_do_query_failed",
                  connector => ResourceId,
                  query => Query,
                  reason => Reason
              }),
              case Reason =:= ecpool_empty of
                  true -> {error, {recoverable_error, Reason}};
                  false -> Result
              end;
          _ ->
              ?tp(sqlserver_connector_query_return, #{result => Result}),
              Result
      end.
  %% Runs `SQL' on the ODBC connection `Conn' inside a pool worker.
  %% do_query/4 passes this function to ecpool:pick_and_do/3.
  %%
  %% The request timeout is `request_ttl' from the resource options, falling
  %% back to ?DEFAULT_REQUEST_TTL.
  %%
  %% Returns:
  %%   `{ok, Rows}' for a `{selected, ColNames, Rows}' reply (the rows are the
  %%       third element; the second holds the column names),
  %%   `ok' for an `{updated, N}' reply,
  %%   `{error, {unrecoverable_error, {invalid_request, Reason}}}' for an ODBC
  %%       error tuple or for any exception raised while executing.
  worker_do_insert(
      Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId}
  ) ->
      LogMeta = #{connector => ResourceId},
      try
          %% Any other reply shape (e.g. a list of result tuples) raises
          %% `case_clause', which the catch below turns into an error tuple.
          case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
              {selected, _ColNames, Rows} ->
                  {ok, Rows};
              {updated, _} ->
                  ok;
              {error, ErrStr} ->
                  ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
                  {error, {unrecoverable_error, {invalid_request, ErrStr}}}
          end
      catch
          _Type:Reason:St ->
              ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason, stacktrace => St}),
              {error, {unrecoverable_error, {invalid_request, Reason}}}
      end.
  %% Executes `SQL' on `Conn' with odbc:sql_query/2, after converting the
  %% statement with str/1.
  -spec execute(connection_reference(), sql()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement).
  %% Same as execute/2, with an explicit timeout handed to odbc:sql_query/3.
  -spec execute(connection_reference(), sql(), time_out()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL, Timeout) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement, Timeout).
  %% Extracts the channel id from a single request tuple, or from the first
  %% request of a non-empty batch.
  get_channel_id({ChannelId, _Req}) ->
      ChannelId;
  get_channel_id([{ChannelId, _Req} | _Rest]) ->
      ChannelId.
  %% Normalises a request into `{QueryType, Data}'.
  %%   `{ChannelId, {QueryType, Data}}' becomes `{QueryType, Data}';
  %%   `{ChannelId, Data}' becomes `{send_message, Data}';
  %%   a batch whose first request carries an explicit query type raises an
  %%       `unrecoverable_error';
  %%   any other non-empty batch is converted request by request.
  get_query_tuple({_ChannelId, {QueryType, Data}}) ->
      {QueryType, Data};
  get_query_tuple({_ChannelId, Data}) ->
      {send_message, Data};
  get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _Rest]) ->
      Reason = {invalid_request, <<"The only query type that supports batching is insert.">>},
      erlang:error({unrecoverable_error, Reason});
  get_query_tuple([_First | _Rest] = Reqs) ->
      lists:map(fun(Req) -> get_query_tuple(Req) end, Reqs).
  %% for bridge data to sql server
  %% Reads the optional `sql' template from the channel parameters and parses
  %% it as the `send_message' action template. A missing, `undefined' or empty
  %% template yields no templates at all.
  parse_sql_template(Config) ->
      RawSQLTemplates =
          case maps:find(sql, Config) of
              {ok, SQLTemplate} when SQLTemplate =/= undefined, SQLTemplate =/= <<>> ->
                  [{?ACTION_SEND_MESSAGE, SQLTemplate}];
              _ ->
                  []
          end,
      parse_sql_template(RawSQLTemplates, #{}).
  %% Walks the `{Key, SQL}' templates and collects, for every INSERT statement
  %% that parses, its insert part and the pre-processed parameter tokens under
  %% `Key'. SELECT statements are skipped silently; unsupported statement
  %% types and parse failures are logged and skipped. The collected map is
  %% returned under the ?BATCH_INSERT_TEMP key.
  parse_sql_template([], BatchInsertTks) ->
      #{?BATCH_INSERT_TEMP => BatchInsertTks};
  parse_sql_template([{Key, SQL} | Rest], BatchInsertTks) ->
      NextTks =
          case emqx_utils_sql:get_statement_type(SQL) of
              select ->
                  BatchInsertTks;
              insert ->
                  case emqx_utils_sql:parse_insert(SQL) of
                      {ok, {InsertSQL, Params}} ->
                          BatchInsertTks#{
                              Key => #{
                                  ?BATCH_INSERT_PART => InsertSQL,
                                  ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
                              }
                          };
                      {error, Reason} ->
                          ?SLOG(error, #{msg => "split_sql_failed", sql => SQL, reason => Reason}),
                          BatchInsertTks
                  end;
              Type when is_atom(Type) ->
                  ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}),
                  BatchInsertTks;
              {error, Reason} ->
                  ?SLOG(error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}),
                  BatchInsertTks
          end,
      parse_sql_template(Rest, NextTks).
  %% Renders the SQL statement for one request or a batch of requests.
  %%
  %% single insert: treated as a batch of one.
  %% TODO: fix emqx_placeholder:proc_tmpl/2
  %% it won't add single quotes for string
  apply_template({?ACTION_SEND_MESSAGE, _Msg} = Query, Templates, ChannelConf) ->
      apply_template([Query], Templates, ChannelConf);
  %% batch inserts: returns `{Key, SQL}' when a template exists for the action
  %% key, otherwise hands the requests back unchanged.
  apply_template(
      [{?ACTION_SEND_MESSAGE = Key, _Msg} | _Rest] = BatchReqs,
      #{?BATCH_INSERT_TEMP := BatchInsertsTks},
      ChannelConf
  ) ->
      case maps:get(Key, BatchInsertsTks, undefined) of
          #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
              {Key, proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks, ChannelConf)};
          undefined ->
              BatchReqs
      end;
  %% anything else cannot be rendered
  apply_template(Query, Templates, _ChannelConf) ->
      %% TODO: more detail information
      ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
      {error, failed_to_apply_sql_template}.
  %% Builds one multi-row INSERT: the insert part, the literal " values ", and
  %% the rendered value group of every request joined with commas.
  proc_batch_sql(BatchReqs, BatchInserts, Tokens, ChannelConf) ->
      RenderedRows = lists:map(
          fun({_, Msg}) -> proc_msg(Tokens, Msg, ChannelConf) end,
          BatchReqs
      ),
      Values = erlang:iolist_to_binary(lists:join($,, RenderedRows)),
      <<BatchInserts/binary, " values ", Values/binary>>.
  %% Renders the parameter tokens for one message. When the channel sets
  %% `undefined_vars_as_null' to `true' the `proc_sqlserver_param_str2' variant
  %% is used, otherwise `proc_sqlserver_param_str'.
  proc_msg(Tokens, Msg, ChannelConf) ->
      case ChannelConf of
          #{undefined_vars_as_null := true} ->
              emqx_placeholder:proc_sqlserver_param_str2(Tokens, Msg);
          _ ->
              emqx_placeholder:proc_sqlserver_param_str(Tokens, Msg)
      end.
  %% Normalises a server address to a binary.
  %% A binary is returned as is. Previously a binary had no matching clause
  %% and crashed with `function_clause'.
  %% A character list is converted with unicode:characters_to_binary/2, with
  %% `utf8' given as the input encoding.
  to_bin(Bin) when is_binary(Bin) ->
      Bin;
  to_bin(List) when is_list(List) ->
      unicode:characters_to_binary(List, utf8).