emqx_bridge_sqlserver_connector.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver_connector).
  5. -behaviour(emqx_resource).
  6. -include("emqx_bridge_sqlserver.hrl").
  7. -include_lib("kernel/include/file.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -include_lib("typerefl/include/types.hrl").
  11. -include_lib("hocon/include/hoconsc.hrl").
  12. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  13. %%====================================================================
  14. %% Exports
  15. %%====================================================================
  16. %% Hocon config schema exports
  17. -export([
  18. roots/0,
  19. fields/1,
  20. namespace/0
  21. ]).
  22. %% callbacks for behaviour emqx_resource
  23. -export([
  24. callback_mode/0,
  25. on_start/2,
  26. on_stop/2,
  27. on_query/3,
  28. on_batch_query/3,
  29. on_get_status/2,
  30. on_add_channel/4,
  31. on_remove_channel/3,
  32. on_get_channels/1,
  33. on_get_channel_status/3,
  34. on_format_query_result/1
  35. ]).
  36. %% callbacks for ecpool
  37. -export([connect/1]).
  38. %% Internal exports used to execute code with ecpool worker
  39. -export([do_get_status/1, worker_do_insert/3]).
  40. -import(emqx_utils_conv, [str/1]).
  41. -import(hoconsc, [mk/2, enum/1, ref/2]).
  %% Query type / template key used for plain "send message" requests.
  -define(ACTION_SEND_MESSAGE, send_message).

  %% Apply mode handed to ecpool:pick_and_do/3 by do_query/4.
  -define(SYNC_QUERY_MODE, handover).

  %% Options used when building the `server' schema field and when parsing
  %% the configured server string.
  -define(SQLSERVER_HOST_OPTIONS, #{
      default_port => ?SQLSERVER_DEFAULT_PORT
  }).

  %% Request timeout taken from the resource options, with a fallback.
  -define(REQUEST_TTL(RESOURCE_OPTS),
      maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
  ).

  %% Keys of the parsed SQL template maps.
  -define(BATCH_INSERT_TEMP, batch_insert_temp).
  -define(BATCH_INSERT_PART, batch_insert_part).
  -define(BATCH_PARAMS_TOKENS, batch_insert_tks).

  %% Expected `mode' of the odbcserver file as reported by file:read_file_info/1:
  %% 8#100755 =:= 33261
  %%          =:= 32768 + 8#00400 + 8#00200 + 8#00100 + 8#00040 + 8#00010 + 8#00004 + 8#00001
  %% See also
  %% https://www.erlang.org/doc/man/file.html#read_file_info-2
  -define(FILE_MODE_755, 8#100755).
  %% The types below mirror the odbc reference page:
  %% https://www.erlang.org/doc/man/odbc.html

  %% Connection handle, as returned by connect/2.
  -type connection_reference() :: odbc:connection_reference().

  -type milliseconds() :: pos_integer().
  -type time_out() :: milliseconds() | infinity.

  -type sql() :: string() | binary().

  %% One row of a result set: a variable-size tuple of column values.
  -type row() :: tuple().
  %% All rows of a result set.
  -type rows() :: list(row()).

  %% Name of a single column in the result set.
  -type col_name() :: string().
  %% Names of the selected columns in the result set.
  -type col_names() :: [col_name()].

  %% Extended ODBC error: ODBC and native database error codes, together with
  %% the base reason that would have been returned had extended_errors not
  %% been enabled.
  -type extended_error() :: {string(), integer(), _Reason :: term()}.
  %% Some kind of explanation of what went wrong.
  -type common_reason() :: connection_closed | extended_error() | term().

  %% Number of affected rows for UPDATE, INSERT or DELETE queries. For other
  %% query types the value is driver defined and should be ignored.
  -type n_rows() :: integer().

  %% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
  -type updated_tuple() :: {updated, n_rows()}.
  -type selected_tuple() :: {selected, col_names(), rows()}.
  86. %% These types are not used in this module yet, but we may use them later
  87. %% -type odbc_data_type() ::
  88. %% sql_integer
  89. %% | sql_smallint
  90. %% | sql_tinyint
  91. %% | {sql_decimal, precision(), scale()}
  92. %% | {sql_numeric, precision(), scale()}
  93. %% | {sql_char, size()}
  94. %% | {sql_wchar, size()}
  95. %% | {sql_varchar, size()}
  96. %% | {sql_wvarchar, size()}
  97. %% | {sql_float, precision()}
  98. %% | {sql_wlongvarchar, size()}
  99. %% | {sql_float, precision()}
  100. %% | sql_real
  101. %% | sql_double
  102. %% | sql_bit
  103. %% | atom().
  104. %% -type precision() :: integer().
  105. %% -type scale() :: integer().
  106. %% -type size() :: integer().
  %% Connector state built by on_start/2 and updated by the channel callbacks.
  -type state() :: #{
      %% channel id => channel state (see create_channel_state/1)
      installed_channels := map(),
      %% name of the connection pool; on_start/2 uses the instance id for it
      pool_name := binary(),
      %% `resource_opts' section of the connector config
      resource_opts := map()
  }.
  112. %%====================================================================
  113. %% Configuration and default values
  114. %%====================================================================
  %% Namespace of the Hocon config schema exported by this module.
  namespace() ->
      sqlserver.
  %% Schema roots: a single `config' root whose type refers to fields(config)
  %% of this module.
  roots() ->
      ConfigType = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigType}}].
  %% Fields of the `config' struct: the `server' field followed by the shared
  %% relational database fields, with the username default overridden.
  fields(config) ->
      CommonFields = emqx_connector_schema_lib:relational_db_fields(),
      [{server, server()} | add_default_username(CommonFields)].
  %% Wraps the schema fun of every `username' field so that it reports
  %% <<"sa">> as its default. All other fields are returned untouched.
  add_default_username(Fields) ->
      [with_default_username(Field) || Field <- Fields].

  %% Rewrites one field entry; only `{username, Fun}' entries are changed.
  with_default_username({username, OrigUsernameFn}) ->
      {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
  with_default_username(Field) ->
      Field.
  %% Returns a fun that answers `default' with Default and delegates every
  %% other argument to OrigFn.
  add_default_fn(OrigFn, Default) ->
      fun(Field) ->
          case Field of
              default -> Default;
              _ -> OrigFn(Field)
          end
      end.
  %% Schema of the `server' field, built with the SQL Server host options
  %% (default port).
  server() ->
      emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?SQLSERVER_HOST_OPTIONS).
  141. %%====================================================================
  142. %% Callbacks defined in emqx_resource
  143. %%====================================================================
  %% emqx_resource callback: this connector only offers synchronous queries.
  callback_mode() ->
      always_sync.
  %% emqx_resource callback: starts the connection pool for this connector.
  %%
  %% The instance id doubles as the pool name. Before the pool is started the
  %% odbcserver file shipped with the odbc application is checked and, if
  %% needed, its mode is reset to ?FILE_MODE_755.
  %%
  %% Returns `{ok, State}' with an empty channel map, or `{error, Reason}'
  %% when the pool could not be started.
  on_start(
      InstanceId = PoolName,
      #{
          server := Server,
          username := Username,
          driver := Driver,
          database := Database,
          pool_size := PoolSize,
          resource_opts := ResourceOpts
      } = Config
  ) ->
      ?SLOG(info, #{
          msg => "starting_sqlserver_connector",
          connector => InstanceId,
          config => emqx_utils:redact(Config)
      }),
      ok = ensure_odbcserver_executable(),
      %% odbc connection string required
      ConnectOptions = [
          {server, to_bin(Server)},
          {username, Username},
          {password, maps:get(password, Config, emqx_secret:wrap(""))},
          {driver, Driver},
          {database, Database},
          {pool_size, PoolSize}
      ],
      case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
          ok ->
              {ok, #{
                  %% also InstanceId
                  pool_name => PoolName,
                  installed_channels => #{},
                  resource_opts => ResourceOpts
              }};
          {error, Reason} = Error ->
              ?tp(
                  sqlserver_connector_start_failed,
                  #{error => Reason}
              ),
              Error
      end.

  %% Makes sure `bin/odbcserver' in the odbc priv dir has mode ?FILE_MODE_755.
  %% Reading the file info must succeed; a failure to write the new mode is
  %% ignored on purpose (best effort).
  ensure_odbcserver_executable() ->
      OdbcserverPath = filename:join(code:priv_dir(odbc), "bin/odbcserver"),
      {ok, Info} = file:read_file_info(OdbcserverPath),
      case Info of
          #file_info{mode = ?FILE_MODE_755} ->
              ok;
          #file_info{} ->
              _ = file:write_file_info(OdbcserverPath, Info#file_info{mode = ?FILE_MODE_755}),
              ok
      end.
  %% emqx_resource callback: builds the state for a new channel and stores it
  %% under ChannelId in `installed_channels'.
  on_add_channel(
      _InstId,
      #{installed_channels := InstalledChannels} = OldState,
      ChannelId,
      ChannelConfig
  ) ->
      {ok, ChannelState} = create_channel_state(ChannelConfig),
      {ok, OldState#{installed_channels => InstalledChannels#{ChannelId => ChannelState}}}.
  %% Builds the per-channel state: the SQL templates parsed from the channel's
  %% `parameters' section.
  create_channel_state(#{parameters := Conf} = _ChannelConfig) ->
      SQLTemplates = parse_sql_template(Conf),
      {ok, #{sql_templates => SQLTemplates}}.
  %% emqx_resource callback: drops ChannelId from `installed_channels'.
  %% Removing an unknown channel leaves the state unchanged.
  on_remove_channel(_InstId, #{installed_channels := InstalledChannels} = OldState, ChannelId) ->
      Remaining = maps:remove(ChannelId, InstalledChannels),
      {ok, OldState#{installed_channels => Remaining}}.
  %% emqx_resource callback: an installed channel shares the status of the
  %% connector itself; an unknown channel is reported as disconnected.
  on_get_channel_status(InstanceId, ChannelId, #{installed_channels := Channels} = State) ->
      case is_map_key(ChannelId, Channels) of
          true -> on_get_status(InstanceId, State);
          false -> ?status_disconnected
      end.
  %% emqx_resource callback: delegates to emqx_bridge_v2 to list the channels
  %% configured for this connector.
  on_get_channels(ConnectorResId) ->
      emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
  %% emqx_resource callback: emits a trace point, logs the stop and shuts down
  %% the connection pool named after the instance id.
  on_stop(InstanceId, _State) ->
      ?tp(sqlserver_connector_on_stop, #{instance_id => InstanceId}),
      ?SLOG(info, #{msg => "stopping_sqlserver_connector", connector => InstanceId}),
      emqx_resource_pool:stop(InstanceId).
  %% emqx_resource callback: traces a single request and executes it
  %% synchronously through do_query/4.
  -spec on_query(
      resource_id(),
      Query :: {channel_id(), map()},
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_query(ResourceId, {_ChannelId, _Msg} = Query, State) ->
      TraceMeta = #{requests => Query, connector => ResourceId, state => State},
      ?TRACE("SINGLE_QUERY_SYNC", "bridge_sqlserver_received", TraceMeta),
      do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
  %% emqx_resource callback: traces a batch of requests and executes it
  %% synchronously through do_query/4.
  -spec on_batch_query(
      resource_id(),
      [{channel_id(), map()}],
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_batch_query(ResourceId, BatchRequests, State) ->
      TraceMeta = #{requests => BatchRequests, connector => ResourceId, state => State},
      ?TRACE("BATCH_QUERY_SYNC", "bridge_sqlserver_received", TraceMeta),
      do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
  %% emqx_resource callback: turns `{ok, Rows}' into a map with `result' and
  %% `rows' keys; any other result is passed through unchanged.
  on_format_query_result(Result) ->
      case Result of
          {ok, Rows} -> #{result => ok, rows => Rows};
          _ -> Result
      end.
  %% emqx_resource callback: runs do_get_status/1 on the pool workers and maps
  %% the outcome to a resource status.
  on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
      HealthCheck = {?MODULE, do_get_status, []},
      status_result(emqx_resource_pool:health_check_workers(PoolName, HealthCheck)).
  %% Maps the boolean health-check outcome to a resource status.
  %% TODO:
  %% case for disconnected
  status_result(true) ->
      ?status_connected;
  status_result(false) ->
      ?status_connecting.
  292. %%====================================================================
  293. %% ecpool callback fns
  294. %%====================================================================
  %% ecpool callback: opens one ODBC connection. The connection string is
  %% built from Options by conn_str/2; extra odbc options are read from the
  %% `options' entry (default: none).
  -spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
  connect(Options) ->
      odbc:connect(
          lists:concat(conn_str(Options, [])),
          proplists:get_value(options, Options, [])
      ).
  %% Health check run on a pool worker: the connection is healthy only when
  %% `SELECT 1' returns exactly one unnamed column holding the row `{1}'.
  -spec do_get_status(connection_reference()) -> Result :: boolean().
  do_get_status(Conn) ->
      {selected, [[]], [{1}]} =:= execute(Conn, <<"SELECT 1">>).
  306. %%====================================================================
  307. %% Internal Functions
  308. %%====================================================================
  309. %% TODO && FIXME:
  310. %% About the connection string attribute `Encrypt`:
  311. %% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
  312. %% And encrypted connections always verify the server's certificate.
  313. %% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string
  314. %% when connecting to a server that has a self-signed certificate.
  315. %% See also:
  316. %% 'https://learn.microsoft.com/en-us/sql/connect/odbc/
  317. %% dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt'
  %% Builds the ODBC connection string attributes from the connect options.
  %%
  %% Every recognised option is prepended to Acc as a "Key=Value" string and
  %% the result is joined with ";". Unknown options are skipped.
  %%
  %% For msodbcsql 18+ we should prepend "Encrypt=YES" and
  %% "TrustServerCertificate=YES" here as well.
  conn_str(Options, Acc0) ->
      Attrs = lists:foldl(fun add_conn_attr/2, Acc0, Options),
      lists:join(";", Attrs).

  %% Prepends the connection string attribute for one option, if it has one.
  add_conn_attr({driver, Driver}, Acc) ->
      ["Driver=" ++ str(Driver) | Acc];
  add_conn_attr({server, Server}, Acc) ->
      #{hostname := Host, port := Port} =
          emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
      ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc];
  add_conn_attr({database, Database}, Acc) ->
      ["Database=" ++ str(Database) | Acc];
  add_conn_attr({username, Username}, Acc) ->
      ["UID=" ++ str(Username) | Acc];
  add_conn_attr({password, Password}, Acc) ->
      %% the password is stored wrapped and only unwrapped here
      ["PWD=" ++ str(emqx_secret:unwrap(Password)) | Acc];
  add_conn_attr({_, _}, Acc) ->
      Acc.
  %% Executes a single or a batch SQL statement.
  %%
  %% The channel's SQL template is rendered with the request data and the
  %% resulting statement is run on a pool worker via worker_do_insert/3.
  %% An `ecpool_empty' error is reported as recoverable; every other result
  %% is returned as is.
  -spec do_query(
      resource_id(),
      Query :: {channel_id(), map()} | [{channel_id(), map()}],
      ApplyMode :: handover,
      state()
  ) ->
      {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, {unrecoverable_error, term()}}
      | {error, term()}.
  do_query(
      ResourceId,
      Query,
      ApplyMode,
      #{pool_name := PoolName, installed_channels := Channels} = State
  ) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "sqlserver_connector_received",
          #{query => Query, connector => ResourceId, state => State}
      ),
      ChannelId = get_channel_id(Query),
      QueryTuple = get_query_tuple(Query),
      #{sql_templates := Templates} = maps:get(ChannelId, Channels),
      %% only insert sql statement for single query and batch query
      Result =
          case apply_template(QueryTuple, Templates) of
              {?ACTION_SEND_MESSAGE, SQL} ->
                  emqx_trace:rendered_action_template(ChannelId, #{sql => SQL}),
                  ecpool:pick_and_do(
                      PoolName,
                      {?MODULE, worker_do_insert, [SQL, State]},
                      ApplyMode
                  );
              QueryTuple ->
                  %% the template step handed the query back unchanged
                  {error, {unrecoverable_error, invalid_query}};
              _ ->
                  {error, {unrecoverable_error, failed_to_apply_sql_template}}
          end,
      finalize_query_result(ResourceId, Query, Result).

  %% Emits the trace point for a query result, logs failures and marks an
  %% empty pool as a recoverable error.
  finalize_query_result(ResourceId, Query, {error, Reason} = Result) ->
      ?tp(
          sqlserver_connector_query_return,
          #{error => Reason}
      ),
      ?SLOG(error, #{
          msg => "sqlserver_connector_do_query_failed",
          connector => ResourceId,
          query => Query,
          reason => Reason
      }),
      case Reason of
          ecpool_empty -> {error, {recoverable_error, Reason}};
          _ -> Result
      end;
  finalize_query_result(_ResourceId, _Query, Result) ->
      ?tp(
          sqlserver_connector_query_return,
          #{result => Result}
      ),
      Result.
  %% Runs SQL on the pool worker's connection, using the request TTL from the
  %% resource options as the query timeout.
  %%
  %% Returns
  %%   `{ok, Rows}' for a `{selected, ColNames, Rows}' result,
  %%   `ok'         for an `{updated, _}' result,
  %%   `{error, {unrecoverable_error, {invalid_request, Reason}}}' when the
  %%   query returns an error, returns an unexpected shape or raises.
  %%
  %% The pool name stored in the state is logged as the connector id.
  worker_do_insert(
      Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
  ) ->
      LogMeta = #{connector => ResourceId, state => State},
      try
          case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
              %% The second element holds the column names and the third the
              %% rows (see selected_tuple()); return the rows, not the names.
              {selected, _ColNames, Rows} ->
                  {ok, Rows};
              {updated, _} ->
                  ok;
              {error, ErrStr} ->
                  ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
                  {error, {unrecoverable_error, {invalid_request, ErrStr}}}
          end
      catch
          _Type:Reason ->
              ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}),
              {error, {unrecoverable_error, {invalid_request, Reason}}}
      end.
  %% Runs SQL (converted to a string) on Conn without an explicit timeout
  %% argument.
  -spec execute(connection_reference(), sql()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement).
  %% Runs SQL (converted to a string) on Conn with the given timeout.
  -spec execute(connection_reference(), sql(), time_out()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL, Timeout) ->
      Statement = str(SQL),
      odbc:sql_query(Conn, Statement, Timeout).
  %% Extracts the channel id from a single request, or from the first request
  %% of a batch.
  get_channel_id({Id, _Request}) ->
      Id;
  get_channel_id([{Id, _Request} | _Rest]) ->
      Id.
  %% Strips the channel id from a request and normalises it into the
  %% `{QueryType, Data}' form expected by apply_template/2.
  %%
  %%   {ChannelId, {QueryType, Data}} -> {QueryType, Data}
  %%   {ChannelId, Data}              -> {send_message, Data}
  %%   [Request, ...]                 -> [{send_message, Data}, ...]
  %%
  %% A batch is converted element by element so that every request of the
  %% batch reaches the SQL template; converting only the head would silently
  %% drop all but the first message.
  %%
  %% Raises `{unrecoverable_error, {invalid_request, _}}' when a batch starts
  %% with an explicitly typed query, because only inserts can be batched.
  get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
      {QueryType, Data};
  get_query_tuple({_ChannelId, Data} = _Query) ->
      {send_message, Data};
  get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
      error(
          {unrecoverable_error,
              {invalid_request, <<"The only query type that supports batching is insert.">>}}
      );
  get_query_tuple([_InsertQuery | _] = BatchRequests) ->
      lists:map(fun get_query_tuple/1, BatchRequests).
  %% Parses the SQL template used to bridge data to SQL Server.
  %% A missing or empty `sql' entry in the channel parameters yields no
  %% templates; otherwise the statement is registered under the send-message
  %% action.
  parse_sql_template(Config) ->
      RawSQLTemplates =
          case maps:get(sql, Config, undefined) of
              NoSQL when NoSQL =:= undefined; NoSQL =:= <<>> ->
                  [];
              SQLTemplate ->
                  [{?ACTION_SEND_MESSAGE, SQLTemplate}]
          end,
      parse_sql_template(RawSQLTemplates, #{}).
  %% Parses every `{Key, SQL}' template and collects the prepared insert
  %% statements under ?BATCH_INSERT_TEMP. Templates that are not inserts, or
  %% that fail to parse, are left out of the result.
  parse_sql_template(Templates, BatchInsertTks0) ->
      BatchInsertTks = lists:foldl(fun parse_one_sql_template/2, BatchInsertTks0, Templates),
      #{?BATCH_INSERT_TEMP => BatchInsertTks}.

  %% Adds the parsed form of one template to the accumulator when it is a
  %% valid insert statement; otherwise returns the accumulator unchanged.
  parse_one_sql_template({Key, SQL}, Acc) ->
      case emqx_utils_sql:get_statement_type(SQL) of
          select ->
              %% select statements are skipped without logging
              Acc;
          insert ->
              case emqx_utils_sql:parse_insert(SQL) of
                  {ok, {InsertSQL, Params}} ->
                      Acc#{
                          Key => #{
                              ?BATCH_INSERT_PART => InsertSQL,
                              ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
                          }
                      };
                  {error, Reason} ->
                      ?SLOG(error, #{msg => "split_sql_failed", sql => SQL, reason => Reason}),
                      Acc
              end;
          Type when is_atom(Type) ->
              ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}),
              Acc;
          {error, Reason} ->
              ?SLOG(error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}),
              Acc
      end.
  %% Renders the SQL statement for a query from the channel's templates.
  %%
  %% Returns `{Key, SQL}' on success. When no template is registered for the
  %% action the list of requests is returned unchanged; anything that is not a
  %% send-message query yields `{error, failed_to_apply_sql_template}'.

  %% single insert: handled as a batch of one
  apply_template({?ACTION_SEND_MESSAGE, _Msg} = Query, Templates) ->
      %% TODO: fix emqx_placeholder:proc_tmpl/2
      %% it won't add single quotes for string
      apply_template([Query], Templates);
  %% batch inserts
  apply_template(
      [{?ACTION_SEND_MESSAGE = Key, _Msg} | _] = BatchReqs,
      #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates
  ) ->
      render_batch_template(maps:get(Key, BatchInsertsTks, undefined), Key, BatchReqs);
  apply_template(Query, Templates) ->
      %% TODO: more detail information
      ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
      {error, failed_to_apply_sql_template}.

  %% Renders the batch with the prepared template, or hands the requests back
  %% when there is no template.
  render_batch_template(undefined, _Key, BatchReqs) ->
      BatchReqs;
  render_batch_template(
      #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks},
      Key,
      BatchReqs
  ) ->
      {Key, proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks)}.
  %% Builds one insert statement for the whole batch: the insert part of the
  %% template followed by " values " and the rendered parameter groups of all
  %% requests, separated by commas.
  proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
      RenderedRows = [
          emqx_placeholder:proc_sql_param_str(Tokens, Msg)
       || {_, Msg} <- BatchReqs
      ],
      Values = erlang:iolist_to_binary(lists:join($,, RenderedRows)),
      <<BatchInserts/binary, " values ", Values/binary>>.
  %% Converts a server address to a UTF-8 binary.
  %% A character list is encoded; a value that already is a binary is
  %% returned as is instead of crashing with `function_clause'.
  to_bin(List) when is_list(List) ->
      unicode:characters_to_binary(List, utf8);
  to_bin(Bin) when is_binary(Bin) ->
      Bin.