emqx_bridge_sqlserver_connector.erl 18 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver_connector).
  5. -behaviour(emqx_resource).
  6. -include("emqx_bridge_sqlserver.hrl").
  7. -include_lib("kernel/include/file.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -include_lib("typerefl/include/types.hrl").
  11. -include_lib("hocon/include/hoconsc.hrl").
  12. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  13. %%====================================================================
  14. %% Exports
  15. %%====================================================================
  16. %% Hocon config schema exports
  17. -export([
  18. roots/0,
  19. fields/1,
  20. namespace/0
  21. ]).
  22. %% callbacks for behaviour emqx_resource
  23. -export([
  24. callback_mode/0,
  25. on_start/2,
  26. on_stop/2,
  27. on_query/3,
  28. on_batch_query/3,
  29. on_get_status/2,
  30. on_add_channel/4,
  31. on_remove_channel/3,
  32. on_get_channels/1,
  33. on_get_channel_status/3
  34. ]).
  35. %% callbacks for ecpool
  36. -export([connect/1]).
  37. %% Internal exports used to execute code with ecpool worker
  38. -export([do_get_status/1, worker_do_insert/3]).
  39. -import(emqx_utils_conv, [str/1]).
  40. -import(hoconsc, [mk/2, enum/1, ref/2]).
  41. -define(ACTION_SEND_MESSAGE, send_message).
  42. -define(SYNC_QUERY_MODE, handover).
  43. -define(SQLSERVER_HOST_OPTIONS, #{
  44. default_port => ?SQLSERVER_DEFAULT_PORT
  45. }).
  46. -define(REQUEST_TTL(RESOURCE_OPTS),
  47. maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
  48. ).
  49. -define(BATCH_INSERT_TEMP, batch_insert_temp).
  50. -define(BATCH_INSERT_PART, batch_insert_part).
  51. -define(BATCH_PARAMS_TOKENS, batch_insert_tks).
  52. -define(FILE_MODE_755, 33261).
  53. %% 32768 + 8#00400 + 8#00200 + 8#00100 + 8#00040 + 8#00010 + 8#00004 + 8#00001
  54. %% See also
  55. %% https://www.erlang.org/doc/man/file.html#read_file_info-2
  56. %% Copied from odbc reference page
  57. %% https://www.erlang.org/doc/man/odbc.html
  58. %% as returned by connect/2
  59. -type connection_reference() :: pid().
  60. -type time_out() :: milliseconds() | infinity.
  61. -type sql() :: string() | binary().
  62. -type milliseconds() :: pos_integer().
  63. %% Tuple of column values e.g. one row of the result set.
  64. %% it's a variable size tuple of column values.
  65. -type row() :: tuple().
  66. %% Some kind of explanation of what went wrong
  67. -type common_reason() :: connection_closed | extended_error() | term().
  68. %% extended error type with ODBC
  69. %% and native database error codes, as well as the base reason that would have been
  70. %% returned had extended_errors not been enabled.
  71. -type extended_error() :: {string(), integer(), _Reason :: term()}.
  72. %% Name of column in the result set
  73. -type col_name() :: string().
  74. %% e.g. a list of the names of the selected columns in the result set.
  75. -type col_names() :: [col_name()].
  76. %% A list of rows from the result set.
  77. -type rows() :: list(row()).
  78. %% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
  79. -type updated_tuple() :: {updated, n_rows()}.
  80. -type selected_tuple() :: {selected, col_names(), rows()}.
  81. %% The number of affected rows for UPDATE,
  82. %% INSERT, or DELETE queries. For other query types the value
  83. %% is driver defined, and hence should be ignored.
  84. -type n_rows() :: integer().
  85. %% These types are not used in this module yet, but we may use them later
  86. %% -type odbc_data_type() ::
  87. %% sql_integer
  88. %% | sql_smallint
  89. %% | sql_tinyint
  90. %% | {sql_decimal, precision(), scale()}
  91. %% | {sql_numeric, precision(), scale()}
  92. %% | {sql_char, size()}
  93. %% | {sql_wchar, size()}
  94. %% | {sql_varchar, size()}
  95. %% | {sql_wvarchar, size()}
  96. %% | {sql_float, precision()}
  97. %% | {sql_wlongvarchar, size()}
  98. %% | {sql_float, precision()}
  99. %% | sql_real
  100. %% | sql_double
  101. %% | sql_bit
  102. %% | atom().
  103. %% -type precision() :: integer().
  104. %% -type scale() :: integer().
  105. %% -type size() :: integer().
  106. -type state() :: #{
  107. installed_channels := map(),
  108. pool_name := binary(),
  109. resource_opts := map()
  110. }.
  111. %%====================================================================
  112. %% Configuration and default values
  113. %%====================================================================
  %% Hocon schema namespace for this connector's configuration.
  namespace() ->
      sqlserver.
  %% Hocon schema roots: a single `config` root that refers to `fields(config)`
  %% of this module.
  roots() ->
      ConfigType = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigType}}].
  %% Schema fields of the `config` struct: the `server` field followed by the
  %% shared relational-database fields, with the username default overridden
  %% (see add_default_username/1).
  fields(config) ->
      ServerField = {server, server()},
      RelationalFields = add_default_username(
          emqx_connector_schema_lib:relational_db_fields()
      ),
      [ServerField | RelationalFields].
  %% Walk the schema field list and wrap the `username` field function so that
  %% its `default` is <<"sa">>. All other fields are returned unchanged.
  add_default_username(Fields) ->
      [with_default_username(Field) || Field <- Fields].

  %% Rewrite a single field entry; only `username` is affected.
  with_default_username({username, OrigUsernameFn}) ->
      {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
  with_default_username(OtherField) ->
      OtherField.
  %% Wrap a schema field function: the returned fun answers `default` with
  %% `Default` and delegates every other attribute lookup to `OrigFn`.
  add_default_fn(OrigFn, Default) ->
      fun(Attribute) ->
          case Attribute of
              default -> Default;
              _ -> OrigFn(Attribute)
          end
      end.
  %% Schema for the `server` field, built by emqx_schema:servers_sc/2 with the
  %% SQL Server host options (default port).
  server() ->
      emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?SQLSERVER_HOST_OPTIONS).
  140. %%====================================================================
  141. %% Callbacks defined in emqx_resource
  142. %%====================================================================
  %% emqx_resource callback: this connector reports `always_sync`.
  callback_mode() ->
      always_sync.
  %% emqx_resource callback: start the connector.
  %%
  %% Steps:
  %%   1. Log the (redacted) configuration.
  %%   2. Make sure the `odbcserver` binary shipped in the odbc application's
  %%      priv dir has mode ?FILE_MODE_755. This is best effort: the result of
  %%      file:write_file_info/2 is ignored.
  %%   3. Start the worker pool, named after the instance id, with the options
  %%      that connect/1 later turns into an ODBC connection string.
  %%
  %% Returns `{ok, State}` or `{error, Reason}` when the pool fails to start.
  on_start(
      InstanceId = PoolName,
      #{
          server := Server,
          username := Username,
          driver := Driver,
          database := Database,
          pool_size := PoolSize,
          resource_opts := ResourceOpts
      } = Config
  ) ->
      ?SLOG(info, #{
          msg => "starting_sqlserver_connector",
          connector => InstanceId,
          config => emqx_utils:redact(Config)
      }),
      OdbcServerBin = filename:join(code:priv_dir(odbc), "bin/odbcserver"),
      {ok, #file_info{mode = CurrentMode} = FileInfo} = file:read_file_info(OdbcServerBin),
      _ =
          CurrentMode =:= ?FILE_MODE_755 orelse
              file:write_file_info(OdbcServerBin, FileInfo#file_info{mode = ?FILE_MODE_755}),
      %% odbc connection string required
      Password = maps:get(password, Config, emqx_secret:wrap("")),
      ConnectOptions = [
          {server, to_bin(Server)},
          {username, Username},
          {password, Password},
          {driver, Driver},
          {database, Database},
          {pool_size, PoolSize}
      ],
      case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
          ok ->
              %% pool_name is also the InstanceId
              {ok, #{
                  pool_name => PoolName,
                  installed_channels => #{},
                  resource_opts => ResourceOpts
              }};
          {error, Reason} = Error ->
              ?tp(
                  sqlserver_connector_start_failed,
                  #{error => Reason}
              ),
              Error
      end.
  %% emqx_resource callback: register a channel (action) on this connector.
  %% The channel state (parsed SQL templates) is stored under `ChannelId` in
  %% `installed_channels`; an existing entry with the same id is replaced.
  on_add_channel(
      _InstId,
      #{installed_channels := Channels0} = State0,
      ChannelId,
      ChannelConfig
  ) ->
      {ok, ChannelState} = create_channel_state(ChannelConfig),
      Channels = Channels0#{ChannelId => ChannelState},
      {ok, State0#{installed_channels => Channels}}.
  %% Build the per-channel state from the channel's `parameters`: currently
  %% just the parsed SQL templates.
  create_channel_state(#{parameters := Parameters}) ->
      SQLTemplates = parse_sql_template(Parameters),
      {ok, #{sql_templates => SQLTemplates}}.
  %% emqx_resource callback: forget a channel. Removing an id that is not
  %% installed leaves the state unchanged.
  on_remove_channel(
      _InstId,
      #{installed_channels := Channels0} = State0,
      ChannelId
  ) ->
      Channels = maps:without([ChannelId], Channels0),
      {ok, State0#{installed_channels => Channels}}.
  %% emqx_resource callback: status of a single channel.
  %% An installed channel shares the connector's status; an unknown channel
  %% id is reported as disconnected.
  on_get_channel_status(
      InstanceId,
      ChannelId,
      #{installed_channels := Channels} = State
  ) ->
      case maps:is_key(ChannelId, Channels) of
          true -> on_get_status(InstanceId, State);
          false -> ?status_disconnected
      end.
  %% emqx_resource callback: list the channels configured for this connector,
  %% as reported by emqx_bridge_v2.
  on_get_channels(ConnectorResId) ->
      emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
  %% emqx_resource callback: stop the connector.
  %% Emits a trace point and an info log, then stops the worker pool that was
  %% started under the instance id.
  on_stop(ResourceId, _State) ->
      ?tp(
          sqlserver_connector_on_stop,
          #{instance_id => ResourceId}
      ),
      ?SLOG(info, #{
          msg => "stopping_sqlserver_connector",
          connector => ResourceId
      }),
      emqx_resource_pool:stop(ResourceId).
  %% emqx_resource callback: handle a single request synchronously.
  %% Traces the request, then delegates to do_query/4 in ?SYNC_QUERY_MODE.
  -spec on_query(
      resource_id(),
      Query :: {channel_id(), map()},
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_query(ResourceId, {_ChannelId, _Msg} = Request, ConnState) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{requests => Request, connector => ResourceId, state => ConnState}
      ),
      do_query(ResourceId, Request, ?SYNC_QUERY_MODE, ConnState).
  %% emqx_resource callback: handle a batch of requests synchronously.
  %% Traces the batch, then delegates to do_query/4 in ?SYNC_QUERY_MODE.
  -spec on_batch_query(
      resource_id(),
      [{channel_id(), map()}],
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_batch_query(ResourceId, Batch, ConnState) ->
      ?TRACE(
          "BATCH_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{requests => Batch, connector => ResourceId, state => ConnState}
      ),
      do_query(ResourceId, Batch, ?SYNC_QUERY_MODE, ConnState).
  %% emqx_resource callback: connector health.
  %% Runs do_get_status/1 on the pool workers via
  %% emqx_resource_pool:health_check_workers/2 and maps the boolean outcome to
  %% a resource status.
  on_get_status(_InstanceId, #{pool_name := PoolName}) ->
      HealthCheck = {?MODULE, do_get_status, []},
      IsHealthy = emqx_resource_pool:health_check_workers(PoolName, HealthCheck),
      status_result(IsHealthy).
  %% Translate the boolean health-check outcome into a resource status:
  %% healthy -> connected, unhealthy -> connecting.
  %% TODO: there is no case that reports `disconnected` yet.
  status_result(true) ->
      ?status_connected;
  status_result(false) ->
      ?status_connecting.
  287. %%====================================================================
  288. %% ecpool callback fns
  289. %%====================================================================
  %% ecpool callback: open one ODBC connection.
  %% The connection string is assembled from `Options` by conn_str/2; extra
  %% odbc options are taken from the `options` entry and default to [].
  -spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
  connect(Options) ->
      ConnStrParts = conn_str(Options, []),
      OdbcOpts = proplists:get_value(options, Options, []),
      odbc:connect(lists:concat(ConnStrParts), OdbcOpts).
  %% Health probe executed inside a pool worker: the connection is healthy
  %% only when `SELECT 1` returns exactly one unnamed column holding the
  %% single row {1}. Any other result, including `{error, _}`, is unhealthy.
  -spec do_get_status(connection_reference()) -> Result :: boolean().
  do_get_status(Conn) ->
      execute(Conn, <<"SELECT 1">>) =:= {selected, [[]], [{1}]}.
  301. %%====================================================================
  302. %% Internal Functions
  303. %%====================================================================
  %% Build the parts of the ODBC connection string from the connect options.
  %%
  %% Recognised options (driver, server, database, username, password) become
  %% `Key=Value` attributes; every other `{Key, Value}` option is ignored.
  %% Attributes are prepended to `Acc`, so they come out in reverse option
  %% order, and the result is the attribute list interleaved with ";"
  %% (connect/1 concatenates it).
  %%
  %% TODO && FIXME:
  %% About the connection string attribute `Encrypt`:
  %% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
  %% And encrypted connections always verify the server's certificate.
  %% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string
  %% when connecting to a server that has a self-signed certificate, i.e. for
  %% msodbcsql 18+ both attributes should be added to the accumulator before
  %% joining.
  %% See also:
  %% 'https://learn.microsoft.com/en-us/sql/connect/odbc/
  %% dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt'
  conn_str(Options, Acc0) ->
      Attributes = lists:foldl(fun prepend_conn_attr/2, Acc0, Options),
      lists:join(";", Attributes).

  %% Prepend the attribute for one option, or leave the accumulator untouched
  %% when the option has no connection string counterpart.
  prepend_conn_attr({Key, Value}, Acc) ->
      case conn_attr(Key, Value) of
          skip -> Acc;
          Attribute -> [Attribute | Acc]
      end.

  %% Render one connect option as a `Key=Value` string; `skip` for options
  %% that are not part of the connection string.
  conn_attr(driver, Driver) ->
      "Driver=" ++ str(Driver);
  conn_attr(server, Server) ->
      #{hostname := Host, port := Port} =
          emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
      "Server=" ++ str(Host) ++ "," ++ str(Port);
  conn_attr(database, Database) ->
      "Database=" ++ str(Database);
  conn_attr(username, Username) ->
      "UID=" ++ str(Username);
  conn_attr(password, Password) ->
      "PWD=" ++ str(emqx_secret:unwrap(Password));
  conn_attr(_Key, _Value) ->
      skip.
  %% Run a single request or a batch of requests as one SQL statement.
  %%
  %% The channel's SQL template is looked up via the channel id of the (first)
  %% request and rendered with apply_template/2. The rendered SQL is then
  %% executed by worker_do_insert/3 on a pool worker picked by ecpool.
  %%
  %% Returns:
  %%   ok | {ok, list()}                - whatever worker_do_insert/3 returned
  %%   {error, {recoverable_error, ecpool_empty}}
  %%                                    - the pool has no worker
  %%   {error, {unrecoverable_error, invalid_query}}
  %%                                    - apply_template/2 gave the query back
  %%                                      unrendered
  %%   {error, {unrecoverable_error, failed_to_apply_sql_template}}
  %%                                    - any other template failure
  %% Any other `{error, _}` produced downstream is passed through as is.
  %% Every outcome is reported through the `sqlserver_connector_query_return`
  %% trace point; errors are additionally logged.
  -spec do_query(
      resource_id(),
      Query :: {channel_id(), map()} | [{channel_id(), map()}],
      ApplyMode :: handover,
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, {unrecoverable_error, term()}}
      | {error, term()}.
  do_query(
      ResourceId,
      Query,
      ApplyMode,
      #{
          pool_name := PoolName,
          installed_channels := Channels
      } = State
  ) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "sqlserver_connector_received",
          #{query => Query, connector => ResourceId, state => State}
      ),
      ChannelId = get_channel_id(Query),
      QueryTuple = get_query_tuple(Query),
      #{sql_templates := Templates} = maps:get(ChannelId, Channels),
      %% only insert sql statement for single query and batch query
      Result =
          case apply_template(QueryTuple, Templates) of
              {?ACTION_SEND_MESSAGE, SQL} ->
                  emqx_trace:rendered_action_template(ChannelId, #{
                      sql => SQL
                  }),
                  ecpool:pick_and_do(
                      PoolName,
                      {?MODULE, worker_do_insert, [SQL, State]},
                      ApplyMode
                  );
              %% QueryTuple is already bound: this clause matches when the
              %% query comes back unchanged because no template was found.
              QueryTuple ->
                  {error, {unrecoverable_error, invalid_query}};
              _ ->
                  {error, {unrecoverable_error, failed_to_apply_sql_template}}
          end,
      case Result of
          {error, Reason} ->
              ?tp(
                  sqlserver_connector_query_return,
                  #{error => Reason}
              ),
              ?SLOG(error, #{
                  msg => "sqlserver_connector_do_query_failed",
                  connector => ResourceId,
                  query => Query,
                  reason => Reason
              }),
              case Reason of
                  %% An empty pool is reported as recoverable.
                  ecpool_empty ->
                      {error, {recoverable_error, Reason}};
                  _ ->
                      Result
              end;
          _ ->
              ?tp(
                  sqlserver_connector_query_return,
                  #{result => Result}
              ),
              Result
      end.
  %% Execute the rendered SQL on a pool worker's connection.
  %%
  %% Called through ecpool with the connection prepended to the argument list.
  %% The request timeout comes from `request_ttl` in the resource options
  %% (default ?DEFAULT_REQUEST_TTL).
  %%
  %% Returns:
  %%   ok           - the statement reported `{updated, _}`
  %%   {ok, Rows}   - the statement produced a result set; odbc returns
  %%                  `{selected, ColNames, Rows}`, so the rows are the THIRD
  %%                  element (the second is the list of column names)
  %%   {error, {unrecoverable_error, {invalid_request, Reason}}}
  %%                - odbc returned `{error, Reason}`, or the call raised, or
  %%                  the result had a shape not listed above
  worker_do_insert(
      Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
  ) ->
      LogMeta = #{connector => ResourceId, state => State},
      try
          case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
              {selected, _ColNames, Rows} ->
                  {ok, Rows};
              {updated, _} ->
                  ok;
              {error, ErrStr} ->
                  ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
                  {error, {unrecoverable_error, {invalid_request, ErrStr}}}
          end
      catch
          _Type:Reason ->
              ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}),
              {error, {unrecoverable_error, {invalid_request, Reason}}}
      end.
  %% Run a SQL statement on the connection without an explicit timeout
  %% (odbc:sql_query/2). The statement is converted with str/1 first.
  -spec execute(pid(), sql()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement).
  %% Run a SQL statement on the connection with an explicit timeout
  %% (odbc:sql_query/3). The statement is converted with str/1 first.
  -spec execute(pid(), sql(), time_out()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL, Timeout) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement, Timeout).
  %% Channel id of a request. For a batch, the id of the first request is
  %% used.
  get_channel_id({ChannelId, _Request}) ->
      ChannelId;
  get_channel_id([{ChannelId, _Request} | _Rest]) ->
      ChannelId.
  %% Normalise a request (or a batch of requests) into the shape expected by
  %% apply_template/2.
  %%
  %%   {ChannelId, {QueryType, Data}} -> {QueryType, Data}
  %%   {ChannelId, Data}              -> {send_message, Data}
  %%   [{ChannelId, Data}, ...]       -> [{send_message, Data}, ...]
  %%
  %% A batch is converted element by element so that EVERY request of the
  %% batch reaches the batch clause of apply_template/2; converting only the
  %% first request would render a single row and drop the rest.
  %%
  %% Raises `error({unrecoverable_error, {invalid_request, _}})` when a batch
  %% contains a request with an explicit query type, because only plain
  %% insert requests can be batched.
  get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
      {QueryType, Data};
  get_query_tuple({_ChannelId, Data} = _Query) ->
      {send_message, Data};
  get_query_tuple([_ | _] = BatchRequests) ->
      lists:map(fun get_batch_query_tuple/1, BatchRequests).

  %% Convert one element of a batch; typed queries are rejected.
  get_batch_query_tuple({_ChannelId, {_QueryType, _Data}}) ->
      error(
          {unrecoverable_error,
              {invalid_request, <<"The only query type that supports batching is insert.">>}}
      );
  get_batch_query_tuple({_ChannelId, Data}) ->
      {send_message, Data}.
  %% Parse the channel's SQL template (bridging data into SQL Server).
  %% Reads the optional `sql` parameter; when it is missing or empty there is
  %% nothing to parse and the result holds an empty template map. Otherwise
  %% the template is registered under ?ACTION_SEND_MESSAGE and handed to
  %% parse_sql_template/2.
  parse_sql_template(Config) ->
      RawTemplates = raw_sql_templates(maps:get(sql, Config, undefined)),
      parse_sql_template(RawTemplates, #{}).

  %% Key/template pairs to parse for the given `sql` parameter value.
  raw_sql_templates(undefined) ->
      [];
  raw_sql_templates(<<>>) ->
      [];
  raw_sql_templates(SQLTemplate) ->
      [{?ACTION_SEND_MESSAGE, SQLTemplate}].
  %% Parse each `{Key, SQL}` template and collect the usable ones.
  %% The result is `#{?BATCH_INSERT_TEMP => Parsed}` where `Parsed` maps each
  %% key with a valid INSERT template to its insert part and pre-processed
  %% parameter tokens. Templates that cannot be used are dropped.
  parse_sql_template([], Parsed) ->
      #{?BATCH_INSERT_TEMP => Parsed};
  parse_sql_template([{Key, SQL} | Rest], Parsed) ->
      StatementType = emqx_utils_sql:get_statement_type(SQL),
      parse_sql_template(Rest, add_sql_template(StatementType, Key, SQL, Parsed)).

  %% Add one template to the accumulator according to its statement type:
  %%   select      - skipped silently
  %%   insert      - split into insert part and parameter tokens; a split
  %%                 failure is logged and the template skipped
  %%   other atom  - unsupported statement type, logged and skipped
  %%   {error, _}  - type detection failed, logged and skipped
  add_sql_template(select, _Key, _SQL, Parsed) ->
      Parsed;
  add_sql_template(insert, Key, SQL, Parsed) ->
      case emqx_utils_sql:parse_insert(SQL) of
          {ok, {InsertSQL, Params}} ->
              Parsed#{
                  Key => #{
                      ?BATCH_INSERT_PART => InsertSQL,
                      ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
                  }
              };
          {error, Reason} ->
              ?SLOG(error, #{msg => "split_sql_failed", sql => SQL, reason => Reason}),
              Parsed
      end;
  add_sql_template(Type, _Key, SQL, Parsed) when is_atom(Type) ->
      ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}),
      Parsed;
  add_sql_template({error, Reason}, _Key, SQL, Parsed) ->
      ?SLOG(error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}),
      Parsed.
  %% Render the SQL statement for a query.
  %%
  %% Returns:
  %%   {Key, SQL}   - the template registered under `Key` was rendered
  %%   BatchReqs    - the request list itself, when no template is registered
  %%                  for the key
  %%   {error, failed_to_apply_sql_template}
  %%                - the query or the templates have an unexpected shape
  %%
  %% A single insert is handled as a batch of one.
  %% TODO: fix emqx_placeholder:proc_tmpl/2
  %% it won't add single quotes for string
  apply_template({?ACTION_SEND_MESSAGE, _Msg} = SingleReq, Templates) ->
      apply_template([SingleReq], Templates);
  %% batch inserts
  apply_template(
      [{?ACTION_SEND_MESSAGE = Key, _Msg} | _Rest] = BatchReqs,
      #{?BATCH_INSERT_TEMP := InsertTemplates}
  ) ->
      case maps:get(Key, InsertTemplates, undefined) of
          #{?BATCH_INSERT_PART := InsertPart, ?BATCH_PARAMS_TOKENS := ParamTokens} ->
              {Key, proc_batch_sql(BatchReqs, InsertPart, ParamTokens)};
          undefined ->
              BatchReqs
      end;
  apply_template(Query, Templates) ->
      %% TODO: more detail information
      ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
      {error, failed_to_apply_sql_template}.
  %% Build one multi-row INSERT statement: the insert part of the template,
  %% the literal " values ", and one rendered parameter group per request,
  %% separated by commas.
  proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
      RenderedRows = [
          emqx_placeholder:proc_sql_param_str(Tokens, Msg)
       || {_, Msg} <- BatchReqs
      ],
      Values = erlang:iolist_to_binary(lists:join($,, RenderedRows)),
      <<BatchInserts/binary, " values ", Values/binary>>.
  %% Convert a server address to a UTF-8 binary.
  %% A string (character list) is encoded with
  %% unicode:characters_to_binary/2; a value that is already a binary is
  %% returned unchanged, so a binary `server` setting does not crash on_start/2
  %% with function_clause.
  to_bin(Bin) when is_binary(Bin) ->
      Bin;
  to_bin(List) when is_list(List) ->
      unicode:characters_to_binary(List, utf8).