emqx_bridge_sqlserver_connector.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_sqlserver_connector).
  5. -behaviour(emqx_resource).
  6. -include("emqx_bridge_sqlserver.hrl").
  7. -include_lib("kernel/include/file.hrl").
  8. -include_lib("emqx/include/logger.hrl").
  9. -include_lib("emqx_resource/include/emqx_resource.hrl").
  10. -include_lib("typerefl/include/types.hrl").
  11. -include_lib("hocon/include/hoconsc.hrl").
  12. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  13. %%====================================================================
  14. %% Exports
  15. %%====================================================================
  16. %% Hocon config schema exports
  17. -export([
  18. roots/0,
  19. fields/1
  20. ]).
  21. %% callbacks for behaviour emqx_resource
  22. -export([
  23. callback_mode/0,
  24. on_start/2,
  25. on_stop/2,
  26. on_query/3,
  27. on_batch_query/3,
  28. on_get_status/2
  29. ]).
  30. %% callbacks for ecpool
  31. -export([connect/1]).
  32. %% Internal exports used to execute code with ecpool worker
  33. -export([do_get_status/1, worker_do_insert/3]).
  34. -import(emqx_utils_conv, [str/1]).
  35. -import(hoconsc, [mk/2, enum/1, ref/2]).
  36. -define(ACTION_SEND_MESSAGE, send_message).
  37. -define(SYNC_QUERY_MODE, handover).
  38. -define(SQLSERVER_HOST_OPTIONS, #{
  39. default_port => ?SQLSERVER_DEFAULT_PORT
  40. }).
  41. -define(REQUEST_TTL(RESOURCE_OPTS),
  42. maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
  43. ).
  44. -define(BATCH_INSERT_TEMP, batch_insert_temp).
  45. -define(BATCH_INSERT_PART, batch_insert_part).
  46. -define(BATCH_PARAMS_TOKENS, batch_insert_tks).
  47. -define(FILE_MODE_755, 33261).
  48. %% 32768 + 8#00400 + 8#00200 + 8#00100 + 8#00040 + 8#00010 + 8#00004 + 8#00001
  49. %% See also
  50. %% https://www.erlang.org/doc/man/file.html#read_file_info-2
  51. %% Copied from odbc reference page
  52. %% https://www.erlang.org/doc/man/odbc.html
  53. %% as returned by connect/2
  54. -type connection_reference() :: pid().
  55. -type time_out() :: milliseconds() | infinity.
  56. -type sql() :: string() | binary().
  57. -type milliseconds() :: pos_integer().
  58. %% Tuple of column values e.g. one row of the result set.
  59. %% it's a variable size tuple of column values.
  60. -type row() :: tuple().
  61. %% Some kind of explanation of what went wrong
  62. -type common_reason() :: connection_closed | extended_error() | term().
  63. %% extended error type with ODBC
  64. %% and native database error codes, as well as the base reason that would have been
  65. %% returned had extended_errors not been enabled.
  66. -type extended_error() :: {string(), integer(), _Reason :: term()}.
  67. %% Name of column in the result set
  68. -type col_name() :: string().
  69. %% e.g. a list of the names of the selected columns in the result set.
  70. -type col_names() :: [col_name()].
  71. %% A list of rows from the result set.
  72. -type rows() :: list(row()).
  73. %% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}.
  74. -type updated_tuple() :: {updated, n_rows()}.
  75. -type selected_tuple() :: {selected, col_names(), rows()}.
  76. %% The number of affected rows for UPDATE,
  77. %% INSERT, or DELETE queries. For other query types the value
  78. %% is driver defined, and hence should be ignored.
  79. -type n_rows() :: integer().
  80. %% These types are not used in this module yet, but we may use them later
  81. %% -type odbc_data_type() ::
  82. %% sql_integer
  83. %% | sql_smallint
  84. %% | sql_tinyint
  85. %% | {sql_decimal, precision(), scale()}
  86. %% | {sql_numeric, precision(), scale()}
  87. %% | {sql_char, size()}
  88. %% | {sql_wchar, size()}
  89. %% | {sql_varchar, size()}
  90. %% | {sql_wvarchar, size()}
  91. %% | {sql_float, precision()}
  92. %% | {sql_wlongvarchar, size()}
  93. %% | {sql_float, precision()}
  94. %% | sql_real
  95. %% | sql_double
  96. %% | sql_bit
  97. %% | atom().
  98. %% -type precision() :: integer().
  99. %% -type scale() :: integer().
  100. %% -type size() :: integer().
  101. -type state() :: #{
  102. pool_name := binary(),
  103. resource_opts := map(),
  104. sql_templates := map()
  105. }.
  106. %%====================================================================
  107. %% Configuration and default values
  108. %%====================================================================
  %% Hocon schema roots: a single `config' root whose type is a reference to
  %% the `config' struct declared by fields/1 in this module.
  roots() ->
      ConfigType = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigType}}].
  %% Fields of the `config' struct: the `server' field, followed by the fields
  %% returned by emqx_connector_schema_lib:relational_db_fields/0 after the
  %% `username' default has been overridden by add_default_username/1.
  fields(config) ->
      ServerField = {server, server()},
      RelationalFields = emqx_connector_schema_lib:relational_db_fields(),
      [ServerField | add_default_username(RelationalFields)].
  %% Wraps the schema fun of the `username' field so that its `default' is
  %% <<"sa">>; every other field is returned unchanged.
  add_default_username(Fields) ->
      [
          case Field of
              {username, OrigUsernameFn} ->
                  {username, add_default_fn(OrigUsernameFn, <<"sa">>)};
              _ ->
                  Field
          end
       || Field <- Fields
      ].
  %% Returns a fun that answers `Default' when asked for `default' and
  %% delegates every other argument to `OrigFn'.
  add_default_fn(OrigFn, Default) ->
      fun(Field) ->
          case Field of
              default -> Default;
              _ -> OrigFn(Field)
          end
      end.
  %% Schema of the `server' field, built by emqx_schema:servers_sc/2 with the
  %% SQL Server host options (default port).
  server() ->
      emqx_schema:servers_sc(
          #{desc => ?DESC("server")},
          ?SQLSERVER_HOST_OPTIONS
      ).
  134. %%====================================================================
  135. %% Callbacks defined in emqx_resource
  136. %%====================================================================
  %% emqx_resource callback: this connector reports the `always_sync'
  %% callback mode.
  callback_mode() ->
      always_sync.
  %% emqx_resource callback: starts the connector.
  %%
  %% Steps, in order:
  %%   1. log the (redacted) configuration;
  %%   2. make sure the `odbcserver' program found under the `odbc' application's
  %%      priv dir has file mode ?FILE_MODE_755;
  %%   3. build the pool options and the connector state (including the parsed
  %%      SQL templates);
  %%   4. start the pool named after the resource id.
  %%
  %% Returns `{ok, State}' when the pool starts, `{error, Reason}' otherwise.
  on_start(
      ResourceId = PoolName,
      #{
          server := Server,
          username := Username,
          driver := Driver,
          database := Database,
          pool_size := PoolSize,
          resource_opts := ResourceOpts
      } = Config
  ) ->
      ?SLOG(info, #{
          msg => "starting_sqlserver_connector",
          connector => ResourceId,
          config => emqx_utils:redact(Config)
      }),
      ensure_odbcserver_mode(),
      %% The password is wrapped so it is not carried around in clear form;
      %% conn_str/2 unwraps it when the connection string is built.
      Password = emqx_secret:wrap(maps:get(password, Config, "")),
      PoolOptions = [
          {server, to_bin(Server)},
          {username, Username},
          {password, Password},
          {driver, Driver},
          {database, Database},
          {pool_size, PoolSize}
      ],
      State = #{
          %% also ResourceId
          pool_name => PoolName,
          sql_templates => parse_sql_template(Config),
          resource_opts => ResourceOpts
      },
      case emqx_resource_pool:start(PoolName, ?MODULE, PoolOptions) of
          ok ->
              {ok, State};
          {error, Reason} = Error ->
              ?tp(
                  sqlserver_connector_start_failed,
                  #{error => Reason}
              ),
              Error
      end.

  %% Sets the mode of `<odbc priv dir>/bin/odbcserver' to ?FILE_MODE_755 when
  %% it currently has a different mode. Crashes if the file info cannot be
  %% read; the result of the mode change itself is deliberately ignored.
  ensure_odbcserver_mode() ->
      OdbcserverPath = filename:join(code:priv_dir(odbc), "bin/odbcserver"),
      {ok, #file_info{mode = Mode} = Info} = file:read_file_info(OdbcserverPath),
      case Mode of
          ?FILE_MODE_755 ->
              ok;
          _ ->
              _ = file:write_file_info(
                  OdbcserverPath, Info#file_info{mode = ?FILE_MODE_755}
              ),
              ok
      end.
  %% emqx_resource callback: logs the shutdown and stops the pool that was
  %% started under the resource id. The connector state is not needed.
  on_stop(ResourceId, _State) ->
      ?SLOG(info, #{
          connector => ResourceId,
          msg => "stopping_sqlserver_connector"
      }),
      StopResult = emqx_resource_pool:stop(ResourceId),
      StopResult.
  %% emqx_resource callback for a single `send_message' request: traces the
  %% request and hands it to do_query/4 using the synchronous apply mode.
  -spec on_query(
      resource_id(),
      {?ACTION_SEND_MESSAGE, map()},
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{
              connector => ResourceId,
              requests => Query,
              state => State
          }
      ),
      do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State).
  %% emqx_resource callback for a batch of `send_message' requests: traces the
  %% batch and hands it to do_query/4 using the synchronous apply mode.
  -spec on_batch_query(
      resource_id(),
      [{?ACTION_SEND_MESSAGE, map()}],
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, term()}.
  on_batch_query(ResourceId, BatchRequests, State) ->
      ?TRACE(
          "BATCH_QUERY_SYNC",
          "bridge_sqlserver_received",
          #{
              connector => ResourceId,
              requests => BatchRequests,
              state => State
          }
      ),
      do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
  %% emqx_resource callback: runs do_get_status/1 on the pool workers through
  %% emqx_resource_pool:health_check_workers/2 and maps the boolean outcome to
  %% a resource status.
  on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
      CheckMFA = {?MODULE, do_get_status, []},
      status_result(emqx_resource_pool:health_check_workers(PoolName, CheckMFA)).
  %% Maps the health-check outcome to a resource status:
  %% `true' -> `connected', `false' -> `connecting'.
  %% TODO:
  %% case for disconnected
  status_result(true) ->
      connected;
  status_result(false) ->
      connecting.
  236. %%====================================================================
  237. %% ecpool callback fns
  238. %%====================================================================
  %% ecpool callback: builds the ODBC connection string from the pool options
  %% and opens the connection. Extra odbc options are read from the `options'
  %% key of the pool options (empty list when absent).
  -spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}.
  connect(Options) ->
      ConnStrParts = conn_str(Options, []),
      ConnectStr = lists:concat(ConnStrParts),
      OdbcOpts = proplists:get_value(options, Options, []),
      odbc:connect(ConnectStr, OdbcOpts).
  %% Health check executed on a pool worker: the connection is healthy only
  %% when `SELECT 1' returns exactly one unnamed column holding the integer 1.
  -spec do_get_status(connection_reference()) -> Result :: boolean().
  do_get_status(Conn) ->
      Expected = {selected, [[]], [{1}]},
      execute(Conn, <<"SELECT 1">>) =:= Expected.
  250. %%====================================================================
  251. %% Internal Helper fns
  252. %%====================================================================
  %% Builds the parts of the ODBC connection string from the pool options.
  %% Recognised options (driver, server, database, username, password) are
  %% prepended to `Acc0' as "Key=Value" strings in the order they are met, any
  %% other `{_, _}' option is skipped, and the parts are finally interleaved
  %% with ";". The result is a list of strings for the caller to concatenate.
  %%
  %% TODO && FIXME:
  %% About the connection string attribute `Encrypt`:
  %% The default value is `yes` in odbc version 18.0+ and `no` in previous versions.
  %% And encrypted connections always verify the server's certificate.
  %% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string
  %% when connecting to a server that has a self-signed certificate.
  %% For msodbcsql 18+ the final join should become:
  %%   lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc])
  %% See also:
  %% 'https://learn.microsoft.com/en-us/sql/connect/odbc/
  %% dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt'
  %%
  %% NOTE(review): values are concatenated verbatim; a value containing `;'
  %% would presumably break the connection string - confirm whether escaping
  %% is needed.
  conn_str(Options, Acc0) ->
      Parts = lists:foldl(
          fun
              ({driver, Driver}, Acc) ->
                  ["Driver=" ++ str(Driver) | Acc];
              ({server, Server}, Acc) ->
                  #{hostname := Host, port := Port} =
                      emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
                  ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc];
              ({database, Database}, Acc) ->
                  ["Database=" ++ str(Database) | Acc];
              ({username, Username}, Acc) ->
                  ["UID=" ++ str(Username) | Acc];
              ({password, Password}, Acc) ->
                  %% the password arrives wrapped; unwrap it only here
                  ["PWD=" ++ str(emqx_secret:unwrap(Password)) | Acc];
              ({_, _}, Acc) ->
                  Acc
          end,
          Acc0,
          Options
      ),
      lists:join(";", Parts).
  %% Query with a single or batch SQL statement.
  %%
  %% Renders the request(s) through the configured SQL template and runs the
  %% resulting statement on a pool worker via worker_do_insert/3.
  %%
  %% Returns:
  %%   * `ok' or `{ok, list()}' - whatever worker_do_insert/3 returned on success;
  %%   * `{error, {recoverable_error, ecpool_empty}}' - the pool reported
  %%     `ecpool_empty';
  %%   * `{error, {unrecoverable_error, invalid_query}}' - apply_template/2 gave
  %%     the query back unchanged (a batch with no template configured);
  %%   * `{error, {unrecoverable_error, failed_to_apply_sql_template}}' - any
  %%     other template failure;
  %%   * any other `{error, _}' produced by the worker, returned as is.
  -spec do_query(
      resource_id(),
      Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
      ApplyMode :: handover,
      state()
  ) ->
      ok
      | {ok, list()}
      | {error, {recoverable_error, term()}}
      | {error, {unrecoverable_error, term()}}
      | {error, term()}.
  do_query(
      ResourceId,
      Query,
      ApplyMode,
      #{pool_name := PoolName, sql_templates := Templates} = State
  ) ->
      ?TRACE(
          "SINGLE_QUERY_SYNC",
          "sqlserver_connector_received",
          #{query => Query, connector => ResourceId, state => State}
      ),
      %% only insert sql statement for single query and batch query
      Result =
          case apply_template(Query, Templates) of
              {?ACTION_SEND_MESSAGE, SQL} ->
                  ecpool:pick_and_do(
                      PoolName,
                      {?MODULE, worker_do_insert, [SQL, State]},
                      ApplyMode
                  );
              %% `Query' is already bound: this clause matches only when the
              %% template step returned the input untouched.
              Query ->
                  {error, {unrecoverable_error, invalid_query}};
              _ ->
                  {error, {unrecoverable_error, failed_to_apply_sql_template}}
          end,
      case Result of
          {error, Reason} ->
              ?tp(
                  sqlserver_connector_query_return,
                  #{error => Reason}
              ),
              ?SLOG(error, #{
                  msg => "sqlserver_connector_do_query_failed",
                  connector => ResourceId,
                  query => Query,
                  reason => Reason
              }),
              case Reason of
                  %% an empty pool is reported as recoverable
                  ecpool_empty ->
                      {error, {recoverable_error, Reason}};
                  _ ->
                      Result
              end;
          _ ->
              ?tp(
                  sqlserver_connector_query_return,
                  #{result => Result}
              ),
              Result
      end.
  %% Runs `SQL' on the pool worker's ODBC connection, using the request TTL
  %% from the resource options as the query timeout.
  %%
  %% Returns:
  %%   * `{ok, Rows}' for a `{selected, ColNames, Rows}' result;
  %%   * `ok' for an `{updated, _}' result;
  %%   * `{error, {unrecoverable_error, {invalid_request, Reason}}}' when the
  %%     driver returns `{error, Reason}' or when anything raises while
  %%     executing (including an unexpected result shape).
  worker_do_insert(
      Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State
  ) ->
      LogMeta = #{connector => ResourceId, state => State},
      try
          case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
              %% The second element holds the column names and the third one
              %% the rows (see selected_tuple()); return the rows.
              {selected, _ColNames, Rows} ->
                  {ok, Rows};
              {updated, _} ->
                  ok;
              {error, ErrStr} ->
                  ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}),
                  {error, {unrecoverable_error, {invalid_request, ErrStr}}}
          end
      catch
          _Type:Reason ->
              ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}),
              {error, {unrecoverable_error, {invalid_request, Reason}}}
      end.
  %% Runs `SQL' on the ODBC connection with odbc:sql_query/2 (no explicit
  %% timeout). The statement is converted with emqx_utils_conv:str/1 first.
  -spec execute(pid(), sql()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL) ->
      Statement = emqx_utils_conv:str(SQL),
      odbc:sql_query(Conn, Statement).
  %% Runs `SQL' on the ODBC connection with odbc:sql_query/3, passing `Timeout'
  %% through. The statement is converted with emqx_utils_conv:str/1 first.
  -spec execute(pid(), sql(), time_out()) ->
      updated_tuple()
      | selected_tuple()
      | [updated_tuple()]
      | [selected_tuple()]
      | {error, common_reason()}.
  execute(Conn, SQL, Timeout) ->
      Statement = emqx_utils_conv:str(SQL),
      odbc:sql_query(Conn, Statement, Timeout).
  %% Converts character data to a UTF-8 binary.
  %% A list is encoded with unicode:characters_to_binary/2; a value that is
  %% already a binary is returned unchanged, so the conversion is idempotent.
  to_bin(Bin) when is_binary(Bin) ->
      Bin;
  to_bin(List) when is_list(List) ->
      unicode:characters_to_binary(List, utf8).
  %% for bridge data to sql server
  %% Reads the optional `sql' entry of the connector config and parses it as
  %% the template of the `send_message' action. A missing, `undefined' or
  %% empty template yields no template at all.
  parse_sql_template(Config) ->
      RawTemplates =
          case Config of
              #{sql := undefined} -> [];
              #{sql := <<>>} -> [];
              #{sql := SQLTemplate} -> [{?ACTION_SEND_MESSAGE, SQLTemplate}];
              #{} -> []
          end,
      parse_sql_template(RawTemplates, #{}).
  %% Walks the `{Key, SQL}' templates and collects, per key, the pre-processed
  %% insert template. Only `insert' statements are kept: `select' statements
  %% are skipped silently, while unsupported or undetectable statement types
  %% are logged and skipped. The collected map is returned under the
  %% ?BATCH_INSERT_TEMP key.
  parse_sql_template([], BatchInsertTks) ->
      #{?BATCH_INSERT_TEMP => BatchInsertTks};
  parse_sql_template([{Key, SQL} | Rest], BatchInsertTks0) ->
      BatchInsertTks =
          case emqx_utils_sql:get_statement_type(SQL) of
              select ->
                  BatchInsertTks0;
              insert ->
                  add_insert_template(Key, SQL, BatchInsertTks0);
              Type when is_atom(Type) ->
                  ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}),
                  BatchInsertTks0;
              {error, Reason} ->
                  ?SLOG(error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}),
                  BatchInsertTks0
          end,
      parse_sql_template(Rest, BatchInsertTks).

  %% Splits an insert statement into its insert part and its parameter
  %% template, and stores both (the latter pre-processed into tokens) under
  %% `Key'. When the split fails the error is logged and the accumulator is
  %% returned unchanged.
  add_insert_template(Key, SQL, BatchInsertTks) ->
      case emqx_utils_sql:parse_insert(SQL) of
          {ok, {InsertSQL, Params}} ->
              BatchInsertTks#{
                  Key =>
                      #{
                          ?BATCH_INSERT_PART => InsertSQL,
                          ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
                      }
              };
          {error, Reason} ->
              ?SLOG(error, #{msg => "split_sql_failed", sql => SQL, reason => Reason}),
              BatchInsertTks
      end.
  %% Renders request(s) into `{send_message, SQL}' using the parsed templates.
  %%
  %% single insert: treated as a batch holding one request
  apply_template({?ACTION_SEND_MESSAGE, _Msg} = Query, Templates) ->
      %% TODO: fix emqx_placeholder:proc_tmpl/2
      %% it won't add single quotes for string
      apply_template([Query], Templates);
  %% batch inserts: when no template is stored for the action the batch is
  %% handed back unchanged, otherwise the batch SQL is built from it
  apply_template(
      [{?ACTION_SEND_MESSAGE, _Msg} | _Rest] = BatchReqs,
      #{?BATCH_INSERT_TEMP := InsertTemplates}
  ) ->
      case maps:get(?ACTION_SEND_MESSAGE, InsertTemplates, undefined) of
          #{?BATCH_INSERT_PART := InsertPart, ?BATCH_PARAMS_TOKENS := ParamTokens} ->
              {?ACTION_SEND_MESSAGE, proc_batch_sql(BatchReqs, InsertPart, ParamTokens)};
          undefined ->
              BatchReqs
      end;
  %% anything else cannot be rendered
  apply_template(Query, Templates) ->
      %% TODO: more detail information
      ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
      {error, failed_to_apply_sql_template}.
  %% Builds the batch insert statement: the insert part, the literal
  %% " values ", then one rendered parameter group per request, separated by
  %% commas. Each group is rendered from the message of a request with
  %% emqx_placeholder:proc_sql_param_str/2.
  proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
      RenderedRows = [
          emqx_placeholder:proc_sql_param_str(Tokens, Msg)
       || {_, Msg} <- BatchReqs
      ],
      Values = erlang:iolist_to_binary(lists:join(<<",">>, RenderedRows)),
      <<BatchInserts/binary, " values ", Values/binary>>.