emqx_bridge_sqlserver_connector.erl 18 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver_connector).
  5. -behaviour(emqx_resource).
  6. -include("emqx_bridge_sqlserver.hrl").
  7. -include_lib("kernel/include/file.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -include_lib("typerefl/include/types.hrl").
  11. -include_lib("hocon/include/hoconsc.hrl").
  12. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  13. %%====================================================================
  14. %% Exports
  15. %%====================================================================
  16. %% Hocon config schema exports
  17. -export([
  18. roots/0,
  19. fields/1,
  20. namespace/0
  21. ]).
  22. %% callbacks for behaviour emqx_resource
  23. -export([
  24. resource_type/0,
  25. callback_mode/0,
  26. on_start/2,
  27. on_stop/2,
  28. on_query/3,
  29. on_batch_query/3,
  30. on_get_status/2,
  31. on_add_channel/4,
  32. on_remove_channel/3,
  33. on_get_channels/1,
  34. on_get_channel_status/3,
  35. on_format_query_result/1
  36. ]).
  37. %% callbacks for ecpool
  38. -export([connect/1]).
  39. %% Internal exports used to execute code with ecpool worker
  40. -export([do_get_status/1, worker_do_insert/3]).
  41. -import(emqx_utils_conv, [str/1]).
  42. -import(hoconsc, [mk/2, enum/1, ref/2]).
  43. -define(ACTION_SEND_MESSAGE, send_message).
  44. -define(SYNC_QUERY_MODE, handover).
  45. -define(SQLSERVER_HOST_OPTIONS, #{
  46. default_port => ?SQLSERVER_DEFAULT_PORT
  47. }).
  48. -define(REQUEST_TTL(RESOURCE_OPTS),
  49. maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
  50. ).
  51. -define(BATCH_INSERT_TEMP, batch_insert_temp).
  52. -define(BATCH_INSERT_PART, batch_insert_part).
  53. -define(BATCH_PARAMS_TOKENS, batch_insert_tks).
  54. -define(FILE_MODE_755, 33261).
  55. %% 32768 + 8#00400 + 8#00200 + 8#00100 + 8#00040 + 8#00010 + 8#00004 + 8#00001
  56. %% See also
  57. %% https://www.erlang.org/doc/man/file.html#read_file_info-2
  58. %% Copied from odbc reference page
  59. %% https://www.erlang.org/doc/man/odbc.html
  60. %% as returned by connect/2
  61. -type connection_reference() :: term().
  62. -type time_out() :: milliseconds() | infinity.
  63. -type sql() :: string() | binary().
  64. -type milliseconds() :: pos_integer().
  65. %% Tuple of column values e.g. one row of the result set.
  66. %% it's a variable size tuple of column values.
  67. -type row() :: tuple().
  68. %% Some kind of explanation of what went wrong
  69. -type common_reason() :: connection_closed | extended_error() | term().
  70. %% extended error type with ODBC
  71. %% and native database error codes, as well as the base reason that would have been
  72. %% returned had extended_errors not been enabled.
  73. -type extended_error() :: {string(), integer(), _Reason :: term()}.
  74. %% Name of column in the result set
  75. -type col_name() :: string().
  76. %% e.g. a list of the names of the selected columns in the result set.
  77. -type col_names() :: [col_name()].
  78. %% A list of rows from the result set.
  79. -type rows() :: list(row()).
  80. %% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
  81. -type updated_tuple() :: {updated, n_rows()}.
  82. -type selected_tuple() :: {selected, col_names(), rows()}.
  83. %% The number of affected rows for UPDATE,
  84. %% INSERT, or DELETE queries. For other query types the value
  85. %% is driver defined, and hence should be ignored.
  86. -type n_rows() :: integer().
  87. %% These type was not used in this module, but we may use it later
  88. %% -type odbc_data_type() ::
  89. %% sql_integer
  90. %% | sql_smallint
  91. %% | sql_tinyint
  92. %% | {sql_decimal, precision(), scale()}
  93. %% | {sql_numeric, precision(), scale()}
  94. %% | {sql_char, size()}
  95. %% | {sql_wchar, size()}
  96. %% | {sql_varchar, size()}
  97. %% | {sql_wvarchar, size()}
  98. %% | {sql_float, precision()}
  99. %% | {sql_wlongvarchar, size()}
  100. %% | {sql_float, precision()}
  101. %% | sql_real
  102. %% | sql_double
  103. %% | sql_bit
  104. %% | atom().
  105. %% -type precision() :: integer().
  106. %% -type scale() :: integer().
  107. %% -type size() :: integer().
  108. -type state() :: #{
  109. installed_channels := map(),
  110. pool_name := binary(),
  111. resource_opts := map()
  112. }.
  113. %%====================================================================
  114. %% Configuration and default values
  115. %%====================================================================
  116. namespace() -> sqlserver.
  117. roots() ->
  118. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  119. fields(config) ->
  120. [
  121. {server, server()}
  122. | add_default_username(emqx_connector_schema_lib:relational_db_fields())
  123. ].
  124. add_default_username(Fields) ->
  125. lists:map(
  126. fun
  127. ({username, OrigUsernameFn}) ->
  128. {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
  129. (Field) ->
  130. Field
  131. end,
  132. Fields
  133. ).
  134. add_default_fn(OrigFn, Default) ->
  135. fun
  136. (default) -> Default;
  137. (Field) -> OrigFn(Field)
  138. end.
  139. server() ->
  140. Meta = #{desc => ?DESC("server")},
  141. emqx_schema:servers_sc(Meta, ?SQLSERVER_HOST_OPTIONS).
  142. %%====================================================================
  143. %% Callbacks defined in emqx_resource
  144. %%====================================================================
  145. resource_type() -> sqlserver.
  146. callback_mode() -> always_sync.
  147. on_start(
  148. InstanceId = PoolName,
  149. #{
  150. server := Server,
  151. username := Username,
  152. driver := Driver,
  153. database := Database,
  154. pool_size := PoolSize,
  155. resource_opts := ResourceOpts
  156. } = Config
  157. ) ->
  158. ?SLOG(info, #{
  159. msg => "starting_sqlserver_connector",
  160. connector => InstanceId,
  161. config => emqx_utils:redact(Config)
  162. }),
  163. ODBCDir = code:priv_dir(odbc),
  164. OdbcserverDir = filename:join(ODBCDir, "bin/odbcserver"),
  165. {ok, Info = #file_info{mode = Mode}} = file:read_file_info(OdbcserverDir),
  166. case ?FILE_MODE_755 =:= Mode of
  167. true ->
  168. ok;
  169. false ->
  170. _ = file:write_file_info(OdbcserverDir, Info#file_info{mode = ?FILE_MODE_755}),
  171. ok
  172. end,
  173. %% odbc connection string required
  174. ConnectOptions = [
  175. {server, to_bin(Server)},
  176. {username, Username},
  177. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  178. {driver, Driver},
  179. {database, Database},
  180. {pool_size, PoolSize}
  181. ],
  182. State = #{
  183. %% also InstanceId
  184. pool_name => PoolName,
  185. installed_channels => #{},
  186. resource_opts => ResourceOpts
  187. },
  188. case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
  189. ok ->
  190. {ok, State};
  191. {error, Reason} ->
  192. ?tp(
  193. sqlserver_connector_start_failed,
  194. #{error => Reason}
  195. ),
  196. {error, Reason}
  197. end.
  198. on_add_channel(
  199. _InstId,
  200. #{
  201. installed_channels := InstalledChannels
  202. } = OldState,
  203. ChannelId,
  204. ChannelConfig
  205. ) ->
  206. {ok, ChannelState} = create_channel_state(ChannelConfig),
  207. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  208. %% Update state
  209. NewState = OldState#{installed_channels => NewInstalledChannels},
  210. {ok, NewState}.
  211. create_channel_state(
  212. #{parameters := Conf} = _ChannelConfig
  213. ) ->
  214. State = #{sql_templates => parse_sql_template(Conf)},
  215. {ok, State}.
  216. on_remove_channel(
  217. _InstId,
  218. #{
  219. installed_channels := InstalledChannels
  220. } = OldState,
  221. ChannelId
  222. ) ->
  223. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  224. %% Update state
  225. NewState = OldState#{installed_channels => NewInstalledChannels},
  226. {ok, NewState}.
  227. on_get_channel_status(
  228. InstanceId,
  229. ChannelId,
  230. #{installed_channels := Channels} = State
  231. ) ->
  232. case maps:find(ChannelId, Channels) of
  233. {ok, _} -> on_get_status(InstanceId, State);
  234. error -> ?status_disconnected
  235. end.
  236. on_get_channels(ResId) ->
  237. emqx_bridge_v2:get_channels_for_connector(ResId).
  238. on_stop(InstanceId, _State) ->
  239. ?tp(
  240. sqlserver_connector_on_stop,
  241. #{instance_id => InstanceId}
  242. ),
  243. ?SLOG(info, #{
  244. msg => "stopping_sqlserver_connector",
  245. connector => InstanceId
  246. }),
  247. emqx_resource_pool:stop(InstanceId).
  248. -spec on_query(
  249. resource_id(),
  250. Query :: {channel_id(), map()},
  251. state()
  252. ) ->
  253. ok
  254. | {ok, list()}
  255. | {error, {recoverable_error, term()}}
  256. | {error, term()}.
  257. on_query(ResourceId, {_ChannelId, _Msg} = Query, State) ->
  258. ?TRACE(
  259. "SINGLE_QUERY_SYNC",
  260. "bridge_sqlserver_received",
  261. #{requests => Query, connector => ResourceId, state => State}
  262. ),
  263. do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
  264. -spec on_batch_query(
  265. resource_id(),
  266. [{channel_id(), map()}],
  267. state()
  268. ) ->
  269. ok
  270. | {ok, list()}
  271. | {error, {recoverable_error, term()}}
  272. | {error, term()}.
  273. on_batch_query(ResourceId, BatchRequests, State) ->
  274. ?TRACE(
  275. "BATCH_QUERY_SYNC",
  276. "bridge_sqlserver_received",
  277. #{requests => BatchRequests, connector => ResourceId, state => State}
  278. ),
  279. do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
  280. on_format_query_result({ok, Rows}) ->
  281. #{result => ok, rows => Rows};
  282. on_format_query_result(Result) ->
  283. Result.
  284. on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
  285. Health = emqx_resource_pool:health_check_workers(
  286. PoolName,
  287. {?MODULE, do_get_status, []}
  288. ),
  289. status_result(Health).
  290. status_result(_Status = true) -> ?status_connected;
  291. status_result(_Status = false) -> ?status_connecting.
  292. %% TODO:
  293. %% case for disconnected
  294. %%====================================================================
  295. %% ecpool callback fns
  296. %%====================================================================
  297. -spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
  298. connect(Options) ->
  299. ConnectStr = lists:concat(conn_str(Options, [])),
  300. Opts = proplists:get_value(options, Options, []),
  301. odbc:connect(ConnectStr, Opts).
  302. -spec do_get_status(connection_reference()) -> Result :: boolean().
  303. do_get_status(Conn) ->
  304. case execute(Conn, <<"SELECT 1">>) of
  305. {selected, [[]], [{1}]} -> true;
  306. _ -> false
  307. end.
  308. %%====================================================================
  309. %% Internal Functions
  310. %%====================================================================
  311. %% TODO && FIXME:
  312. %% About the connection string attribute `Encrypt`:
  313. %% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
  314. %% And encrypted connections always verify the server's certificate.
  315. %% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string
  316. %% when connecting to a server that has a self-signed certificate.
  317. %% See also:
  318. %% 'https://learn.microsoft.com/en-us/sql/connect/odbc/
  319. %% dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt'
  320. conn_str([], Acc) ->
  321. %% we should use this for msodbcsql 18+
  322. %% lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc]);
  323. lists:join(";", Acc);
  324. conn_str([{driver, Driver} | Opts], Acc) ->
  325. conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]);
  326. conn_str([{server, Server} | Opts], Acc) ->
  327. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
  328. conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
  329. conn_str([{database, Database} | Opts], Acc) ->
  330. conn_str(Opts, ["Database=" ++ str(Database) | Acc]);
  331. conn_str([{username, Username} | Opts], Acc) ->
  332. conn_str(Opts, ["UID=" ++ str(Username) | Acc]);
  333. conn_str([{password, Password} | Opts], Acc) ->
  334. conn_str(Opts, ["PWD=" ++ str(emqx_secret:unwrap(Password)) | Acc]);
  335. conn_str([{_, _} | Opts], Acc) ->
  336. conn_str(Opts, Acc).
  337. %% Query with singe & batch sql statement
  338. -spec do_query(
  339. resource_id(),
  340. Query :: {channel_id(), map()} | [{channel_id(), map()}],
  341. ApplyMode :: handover,
  342. state()
  343. ) ->
  344. {ok, list()}
  345. | {error, {recoverable_error, term()}}
  346. | {error, {unrecoverable_error, term()}}
  347. | {error, term()}.
  348. do_query(
  349. ResourceId,
  350. Query,
  351. ApplyMode,
  352. #{
  353. pool_name := PoolName,
  354. installed_channels := Channels
  355. } = State
  356. ) ->
  357. ?TRACE(
  358. "SINGLE_QUERY_SYNC",
  359. "sqlserver_connector_received",
  360. #{query => Query, connector => ResourceId, state => State}
  361. ),
  362. ChannelId = get_channel_id(Query),
  363. QueryTuple = get_query_tuple(Query),
  364. #{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels),
  365. %% only insert sql statement for single query and batch query
  366. case apply_template(QueryTuple, Templates) of
  367. {?ACTION_SEND_MESSAGE, SQL} ->
  368. emqx_trace:rendered_action_template(ChannelId, #{
  369. sql => SQL
  370. }),
  371. Result = ecpool:pick_and_do(
  372. PoolName,
  373. {?MODULE, worker_do_insert, [SQL, State]},
  374. ApplyMode
  375. );
  376. QueryTuple ->
  377. Result = {error, {unrecoverable_error, invalid_query}};
  378. _ ->
  379. Result = {error, {unrecoverable_error, failed_to_apply_sql_template}}
  380. end,
  381. case Result of
  382. {error, Reason} ->
  383. ?tp(
  384. sqlserver_connector_query_return,
  385. #{error => Reason}
  386. ),
  387. ?SLOG(error, #{
  388. msg => "sqlserver_connector_do_query_failed",
  389. connector => ResourceId,
  390. query => Query,
  391. reason => Reason
  392. }),
  393. case Reason of
  394. ecpool_empty ->
  395. {error, {recoverable_error, Reason}};
  396. _ ->
  397. Result
  398. end;
  399. _ ->
  400. ?tp(
  401. sqlserver_connector_query_return,
  402. #{result => Result}
  403. ),
  404. Result
  405. end.
  406. worker_do_insert(
  407. Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
  408. ) ->
  409. LogMeta = #{connector => ResourceId, state => State},
  410. try
  411. case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
  412. {selected, Rows, _} ->
  413. {ok, Rows};
  414. {updated, _} ->
  415. ok;
  416. {error, ErrStr} ->
  417. ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
  418. {error, {unrecoverable_error, {invalid_request, ErrStr}}}
  419. end
  420. catch
  421. _Type:Reason ->
  422. ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}),
  423. {error, {unrecoverable_error, {invalid_request, Reason}}}
  424. end.
  425. -spec execute(connection_reference(), sql()) ->
  426. updated_tuple()
  427. | selected_tuple()
  428. | [updated_tuple()]
  429. | [selected_tuple()]
  430. | {error, common_reason()}.
  431. execute(Conn, SQL) ->
  432. odbc:sql_query(Conn, str(SQL)).
  433. -spec execute(connection_reference(), sql(), time_out()) ->
  434. updated_tuple()
  435. | selected_tuple()
  436. | [updated_tuple()]
  437. | [selected_tuple()]
  438. | {error, common_reason()}.
  439. execute(Conn, SQL, Timeout) ->
  440. odbc:sql_query(Conn, str(SQL), Timeout).
  441. get_channel_id([{ChannelId, _Req} | _]) ->
  442. ChannelId;
  443. get_channel_id({ChannelId, _Req}) ->
  444. ChannelId.
  445. get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
  446. {QueryType, Data};
  447. get_query_tuple({_ChannelId, Data} = _Query) ->
  448. {send_message, Data};
  449. get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
  450. error(
  451. {unrecoverable_error,
  452. {invalid_request, <<"The only query type that supports batching is insert.">>}}
  453. );
  454. get_query_tuple([InsertQuery | _]) ->
  455. get_query_tuple(InsertQuery).
  456. %% for bridge data to sql server
  457. parse_sql_template(Config) ->
  458. RawSQLTemplates =
  459. case maps:get(sql, Config, undefined) of
  460. undefined -> #{};
  461. <<>> -> #{};
  462. SQLTemplate -> #{?ACTION_SEND_MESSAGE => SQLTemplate}
  463. end,
  464. BatchInsertTks = #{},
  465. parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks).
  466. parse_sql_template([{Key, H} | T], BatchInsertTks) ->
  467. case emqx_utils_sql:get_statement_type(H) of
  468. select ->
  469. parse_sql_template(T, BatchInsertTks);
  470. insert ->
  471. case emqx_utils_sql:parse_insert(H) of
  472. {ok, {InsertSQL, Params}} ->
  473. parse_sql_template(
  474. T,
  475. BatchInsertTks#{
  476. Key =>
  477. #{
  478. ?BATCH_INSERT_PART => InsertSQL,
  479. ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
  480. }
  481. }
  482. );
  483. {error, Reason} ->
  484. ?SLOG(error, #{msg => "split_sql_failed", sql => H, reason => Reason}),
  485. parse_sql_template(T, BatchInsertTks)
  486. end;
  487. Type when is_atom(Type) ->
  488. ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}),
  489. parse_sql_template(T, BatchInsertTks);
  490. {error, Reason} ->
  491. ?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}),
  492. parse_sql_template(T, BatchInsertTks)
  493. end;
  494. parse_sql_template([], BatchInsertTks) ->
  495. #{
  496. ?BATCH_INSERT_TEMP => BatchInsertTks
  497. }.
  498. %% single insert
  499. apply_template(
  500. {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates
  501. ) ->
  502. %% TODO: fix emqx_placeholder:proc_tmpl/2
  503. %% it won't add single quotes for string
  504. apply_template([Query], Templates);
  505. %% batch inserts
  506. apply_template(
  507. [{?ACTION_SEND_MESSAGE = Key, _Msg} | _T] = BatchReqs,
  508. #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates
  509. ) ->
  510. case maps:get(Key, BatchInsertsTks, undefined) of
  511. undefined ->
  512. BatchReqs;
  513. #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
  514. SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
  515. {Key, SQL}
  516. end;
  517. apply_template(Query, Templates) ->
  518. %% TODO: more detail information
  519. ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
  520. {error, failed_to_apply_sql_template}.
  521. proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
  522. Values = erlang:iolist_to_binary(
  523. lists:join($,, [
  524. emqx_placeholder:proc_sql_param_str(Tokens, Msg)
  525. || {_, Msg} <- BatchReqs
  526. ])
  527. ),
  528. <<BatchInserts/binary, " values ", Values/binary>>.
  529. to_bin(List) when is_list(List) ->
  530. unicode:characters_to_binary(List, utf8).