emqx_postgresql.erl 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1, namespace/0]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. callback_mode/0,
  29. on_start/2,
  30. on_stop/2,
  31. on_query/3,
  32. on_batch_query/3,
  33. on_get_status/2,
  34. on_add_channel/4,
  35. on_remove_channel/3,
  36. on_get_channels/1,
  37. on_get_channel_status/3,
  38. on_format_query_result/1
  39. ]).
  40. -export([connect/1]).
  41. -export([
  42. query/3,
  43. prepared_query/3,
  44. execute_batch/3
  45. ]).
  46. -export([disable_prepared_statements/0]).
  47. %% for ecpool workers usage
  48. -export([do_get_status/1, prepare_sql_to_conn/2]).
  49. -define(PGSQL_HOST_OPTIONS, #{
  50. default_port => ?PGSQL_DEFAULT_PORT
  51. }).
  52. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  53. -type state() ::
  54. #{
  55. pool_name := binary(),
  56. query_templates := #{binary() => template()},
  57. prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
  58. }.
  59. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  60. %% We want to be able to call sync if any message from the backend leaves the driver in an
  61. %% inconsistent state needing sync.
  62. -dialyzer({nowarn_function, [execute_batch/3]}).
  63. %%=====================================================================
  64. namespace() -> postgres.
  65. roots() ->
  66. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  67. fields(config) ->
  68. [
  69. {server, server()},
  70. disable_prepared_statements()
  71. ] ++
  72. adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
  73. emqx_connector_schema_lib:ssl_fields() ++
  74. emqx_connector_schema_lib:prepare_statement_fields().
  75. server() ->
  76. Meta = #{desc => ?DESC("server")},
  77. emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
  78. disable_prepared_statements() ->
  79. {disable_prepared_statements,
  80. hoconsc:mk(
  81. boolean(),
  82. #{
  83. default => false,
  84. required => false,
  85. desc => ?DESC("disable_prepared_statements")
  86. }
  87. )}.
  88. adjust_fields(Fields) ->
  89. lists:map(
  90. fun
  91. ({username, Sc}) ->
  92. %% to please dialyzer...
  93. Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
  94. {username, hocon_schema:override(Sc, Override)};
  95. (Field) ->
  96. Field
  97. end,
  98. Fields
  99. ).
  100. %% ===================================================================
  101. callback_mode() -> always_sync.
  102. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  103. on_start(
  104. InstId,
  105. #{
  106. server := Server,
  107. disable_prepared_statements := DisablePreparedStatements,
  108. database := DB,
  109. username := User,
  110. pool_size := PoolSize,
  111. ssl := SSL
  112. } = Config
  113. ) ->
  114. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
  115. ?SLOG(info, #{
  116. msg => "starting_postgresql_connector",
  117. connector => InstId,
  118. config => emqx_utils:redact(Config)
  119. }),
  120. SslOpts =
  121. case maps:get(enable, SSL) of
  122. true ->
  123. [
  124. %% note: this is converted to `required' in
  125. %% `conn_opts/2', and there's a boolean guard
  126. %% there; if this is set to `required' here,
  127. %% that'll require changing `conn_opts/2''s guard.
  128. {ssl, true},
  129. {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
  130. ];
  131. false ->
  132. [{ssl, false}]
  133. end,
  134. Options = [
  135. {host, Host},
  136. {port, Port},
  137. {username, User},
  138. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  139. {database, DB},
  140. {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
  141. {pool_size, PoolSize}
  142. ],
  143. State1 = parse_sql_template(Config, <<"send_message">>),
  144. State2 = State1#{installed_channels => #{}},
  145. case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
  146. ok ->
  147. Prepares =
  148. case DisablePreparedStatements of
  149. true -> disabled;
  150. false -> #{}
  151. end,
  152. {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
  153. {error, Reason} ->
  154. ?tp(
  155. pgsql_connector_start_failed,
  156. #{error => Reason}
  157. ),
  158. {error, Reason}
  159. end.
  160. on_stop(InstId, State) ->
  161. ?SLOG(info, #{
  162. msg => "stopping_postgresql_connector",
  163. connector => InstId
  164. }),
  165. close_connections(State),
  166. Res = emqx_resource_pool:stop(InstId),
  167. ?tp(postgres_stopped, #{instance_id => InstId}),
  168. Res.
  169. close_connections(#{pool_name := PoolName} = _State) ->
  170. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  171. close_connections_with_worker_pids(WorkerPids),
  172. ok.
  173. close_connections_with_worker_pids([WorkerPid | Rest]) ->
  174. %% We ignore errors since any error probably means that the
  175. %% connection is closed already.
  176. try ecpool_worker:client(WorkerPid) of
  177. {ok, Conn} ->
  178. _ = epgsql:close(Conn),
  179. close_connections_with_worker_pids(Rest);
  180. _ ->
  181. close_connections_with_worker_pids(Rest)
  182. catch
  183. _:_ ->
  184. close_connections_with_worker_pids(Rest)
  185. end;
  186. close_connections_with_worker_pids([]) ->
  187. ok.
  188. on_add_channel(
  189. _InstId,
  190. #{
  191. installed_channels := InstalledChannels
  192. } = OldState,
  193. ChannelId,
  194. ChannelConfig
  195. ) ->
  196. %% The following will throw an exception if the bridge producers fails to start
  197. {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
  198. case ChannelState of
  199. #{prepares := {error, Reason}} ->
  200. {error, {unhealthy_target, Reason}};
  201. _ ->
  202. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  203. %% Update state
  204. NewState = OldState#{installed_channels => NewInstalledChannels},
  205. {ok, NewState}
  206. end.
  207. create_channel_state(
  208. ChannelId,
  209. #{
  210. pool_name := PoolName,
  211. prepares := Prepares
  212. } = _ConnectorState,
  213. #{parameters := Parameters} = _ChannelConfig
  214. ) ->
  215. State1 = parse_sql_template(Parameters, ChannelId),
  216. {ok,
  217. init_prepare(State1#{
  218. pool_name => PoolName,
  219. prepares => Prepares,
  220. prepare_statement => #{}
  221. })}.
  222. on_remove_channel(
  223. _InstId,
  224. #{
  225. installed_channels := InstalledChannels
  226. } = OldState,
  227. ChannelId
  228. ) ->
  229. %% Close prepared statements
  230. ok = close_prepared_statement(ChannelId, OldState),
  231. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  232. %% Update state
  233. NewState = OldState#{installed_channels => NewInstalledChannels},
  234. {ok, NewState}.
  235. close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
  236. ok;
  237. close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
  238. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  239. close_prepared_statement(WorkerPids, ChannelId, State),
  240. ok.
  241. close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
  242. %% We ignore errors since any error probably means that the
  243. %% prepared statement doesn't exist. If it exists when we try
  244. %% to insert one with the same name, we will try to remove it
  245. %% again anyway.
  246. try ecpool_worker:client(WorkerPid) of
  247. {ok, Conn} ->
  248. Statement = get_templated_statement(ChannelId, State),
  249. _ = epgsql:close(Conn, Statement),
  250. close_prepared_statement(Rest, ChannelId, State);
  251. _ ->
  252. close_prepared_statement(Rest, ChannelId, State)
  253. catch
  254. _:_ ->
  255. close_prepared_statement(Rest, ChannelId, State)
  256. end;
  257. close_prepared_statement([], _ChannelId, _State) ->
  258. ok.
  259. on_get_channel_status(
  260. _ResId,
  261. ChannelId,
  262. #{
  263. pool_name := PoolName,
  264. installed_channels := Channels
  265. } = _State
  266. ) ->
  267. ChannelState = maps:get(ChannelId, Channels),
  268. case
  269. do_check_channel_sql(
  270. PoolName,
  271. ChannelId,
  272. ChannelState
  273. )
  274. of
  275. ok ->
  276. connected;
  277. {error, undefined_table} ->
  278. {error, {unhealthy_target, <<"Table does not exist">>}}
  279. end.
  280. do_check_channel_sql(
  281. PoolName,
  282. ChannelId,
  283. #{query_templates := ChannelQueryTemplates} = _ChannelState
  284. ) ->
  285. {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
  286. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  287. validate_table_existence(WorkerPids, SQL).
  288. on_get_channels(ResId) ->
  289. emqx_bridge_v2:get_channels_for_connector(ResId).
  290. on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
  291. on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
  292. on_query(
  293. InstId,
  294. {TypeOrKey, NameOrSQL, Params},
  295. #{pool_name := PoolName} = State
  296. ) ->
  297. ?SLOG(debug, #{
  298. msg => "postgresql_connector_received_sql_query",
  299. connector => InstId,
  300. type => TypeOrKey,
  301. sql => NameOrSQL,
  302. state => State
  303. }),
  304. Type = pgsql_query_type(TypeOrKey, State),
  305. {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
  306. Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
  307. ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
  308. handle_result(Res).
  309. pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
  310. query;
  311. pgsql_query_type(sql, _ConnectorState) ->
  312. query;
  313. pgsql_query_type(query, _ConnectorState) ->
  314. query;
  315. pgsql_query_type(prepared_query, _ConnectorState) ->
  316. prepared_query;
  317. %% for bridge
  318. pgsql_query_type(_, ConnectorState) ->
  319. pgsql_query_type(prepared_query, ConnectorState).
  320. on_batch_query(
  321. InstId,
  322. [{Key, _} = Request | _] = BatchReq,
  323. #{pool_name := PoolName} = State
  324. ) ->
  325. BinKey = to_bin(Key),
  326. case get_template(BinKey, State) of
  327. undefined ->
  328. Log = #{
  329. connector => InstId,
  330. first_request => Request,
  331. state => State,
  332. msg => "batch prepare not implemented"
  333. },
  334. ?SLOG(error, Log),
  335. {error, {unrecoverable_error, batch_prepare_not_implemented}};
  336. {_Statement, RowTemplate} ->
  337. StatementTemplate = get_templated_statement(BinKey, State),
  338. Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
  339. case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
  340. {error, _Error} = Result ->
  341. handle_result(Result);
  342. {_Column, Results} ->
  343. handle_batch_result(Results, 0)
  344. end
  345. end;
  346. on_batch_query(InstId, BatchReq, State) ->
  347. ?SLOG(error, #{
  348. connector => InstId,
  349. request => BatchReq,
  350. state => State,
  351. msg => "invalid request"
  352. }),
  353. {error, {unrecoverable_error, invalid_request}}.
  354. proc_sql_params(query, SQLOrKey, Params, _State) ->
  355. {SQLOrKey, Params};
  356. proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
  357. {SQLOrKey, Params};
  358. proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
  359. DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
  360. BinKey = to_bin(TypeOrKey),
  361. case get_template(BinKey, State) of
  362. undefined ->
  363. {SQLOrData, Params};
  364. {Statement, RowTemplate} ->
  365. Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
  366. case DisablePreparedStatements of
  367. true ->
  368. {Statement, Rendered};
  369. false ->
  370. {BinKey, Rendered}
  371. end
  372. end.
  373. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
  374. BinKey = to_bin(Key),
  375. ChannelState = maps:get(BinKey, Channels),
  376. ChannelQueryTemplates = maps:get(query_templates, ChannelState),
  377. maps:get(BinKey, ChannelQueryTemplates);
  378. get_template(Key, #{query_templates := Templates}) ->
  379. BinKey = to_bin(Key),
  380. maps:get(BinKey, Templates, undefined).
  381. get_templated_statement(Key, #{installed_channels := Channels} = _State) when
  382. is_map_key(Key, Channels)
  383. ->
  384. BinKey = to_bin(Key),
  385. ChannelState = maps:get(BinKey, Channels),
  386. ChannelPreparedStatements = maps:get(prepares, ChannelState),
  387. maps:get(BinKey, ChannelPreparedStatements);
  388. get_templated_statement(Key, #{prepares := PrepStatements}) ->
  389. BinKey = to_bin(Key),
  390. maps:get(BinKey, PrepStatements).
  391. on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) ->
  392. emqx_trace:rendered_action_template(
  393. Key,
  394. #{
  395. statement_type => Type,
  396. statement_or_name => NameOrSQL,
  397. data => Data
  398. }
  399. ),
  400. try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
  401. {error, Reason} ->
  402. ?tp(
  403. pgsql_connector_query_return,
  404. #{error => Reason}
  405. ),
  406. TranslatedError = translate_to_log_context(Reason),
  407. ?SLOG(
  408. error,
  409. maps:merge(
  410. #{
  411. msg => "postgresql_connector_do_sql_query_failed",
  412. connector => InstId,
  413. type => Type,
  414. sql => NameOrSQL
  415. },
  416. TranslatedError
  417. )
  418. ),
  419. case Reason of
  420. sync_required ->
  421. {error, {recoverable_error, Reason}};
  422. ecpool_empty ->
  423. {error, {recoverable_error, Reason}};
  424. {error, error, _, undefined_table, _, _} ->
  425. {error, {unrecoverable_error, export_error(TranslatedError)}};
  426. _ ->
  427. {error, export_error(TranslatedError)}
  428. end;
  429. Result ->
  430. ?tp(
  431. pgsql_connector_query_return,
  432. #{result => Result}
  433. ),
  434. Result
  435. catch
  436. error:function_clause:Stacktrace ->
  437. ?SLOG(error, #{
  438. msg => "postgresql_connector_do_sql_query_failed",
  439. connector => InstId,
  440. type => Type,
  441. sql => NameOrSQL,
  442. reason => function_clause,
  443. stacktrace => Stacktrace
  444. }),
  445. {error, {unrecoverable_error, invalid_request}}
  446. end.
  447. on_get_status(_InstId, #{pool_name := PoolName} = State) ->
  448. case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
  449. true ->
  450. case do_check_prepares(State) of
  451. ok ->
  452. connected;
  453. {ok, NState} ->
  454. %% return new state with prepared statements
  455. {connected, NState};
  456. {error, undefined_table} ->
  457. %% return new state indicating that we are connected but the target table is not created
  458. {disconnected, State, unhealthy_target};
  459. {error, _Reason} ->
  460. %% do not log error, it is logged in prepare_sql_to_conn
  461. connecting
  462. end;
  463. false ->
  464. connecting
  465. end.
  466. do_get_status(Conn) ->
  467. ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
  468. do_check_prepares(
  469. #{
  470. pool_name := PoolName,
  471. query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
  472. }
  473. ) ->
  474. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  475. case validate_table_existence(WorkerPids, SQL) of
  476. ok ->
  477. ok;
  478. {error, Reason} ->
  479. {error, Reason}
  480. end;
  481. do_check_prepares(#{prepares := disabled}) ->
  482. ok;
  483. do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
  484. ok;
  485. do_check_prepares(#{prepares := {error, _}} = State) ->
  486. %% retry to prepare
  487. case prepare_sql(State) of
  488. {ok, PrepStatements} ->
  489. %% remove the error
  490. {ok, State#{prepares := PrepStatements}};
  491. {error, Reason} ->
  492. {error, Reason}
  493. end.
  494. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  495. validate_table_existence([WorkerPid | Rest], SQL) ->
  496. try ecpool_worker:client(WorkerPid) of
  497. {ok, Conn} ->
  498. case epgsql:parse2(Conn, "", SQL, []) of
  499. {error, {_, _, _, undefined_table, _, _}} ->
  500. {error, undefined_table};
  501. Res when is_tuple(Res) andalso ok == element(1, Res) ->
  502. ok;
  503. Res ->
  504. ?tp(postgres_connector_bad_parse2, #{result => Res}),
  505. validate_table_existence(Rest, SQL)
  506. end;
  507. _ ->
  508. validate_table_existence(Rest, SQL)
  509. catch
  510. exit:{noproc, _} ->
  511. validate_table_existence(Rest, SQL)
  512. end;
  513. validate_table_existence([], _SQL) ->
  514. %% All workers either replied an unexpected error; we will retry
  515. %% on the next health check.
  516. ok.
  517. %% ===================================================================
  518. connect(Opts) ->
  519. Host = proplists:get_value(host, Opts),
  520. Username = proplists:get_value(username, Opts),
  521. %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
  522. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
  523. case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
  524. {ok, _Conn} = Ok ->
  525. Ok;
  526. {error, Reason} ->
  527. {error, Reason}
  528. end.
  529. query(Conn, SQL, Params) ->
  530. case epgsql:equery(Conn, SQL, Params) of
  531. {error, sync_required} = Res ->
  532. ok = epgsql:sync(Conn),
  533. Res;
  534. Res ->
  535. Res
  536. end.
  537. prepared_query(Conn, Name, Params) ->
  538. case epgsql:prepared_query2(Conn, Name, Params) of
  539. {error, sync_required} = Res ->
  540. ok = epgsql:sync(Conn),
  541. Res;
  542. Res ->
  543. Res
  544. end.
  545. execute_batch(Conn, Statement, Params) ->
  546. case epgsql:execute_batch(Conn, Statement, Params) of
  547. {error, sync_required} = Res ->
  548. ok = epgsql:sync(Conn),
  549. Res;
  550. Res ->
  551. Res
  552. end.
  553. conn_opts(Opts) ->
  554. conn_opts(Opts, []).
  555. conn_opts([], Acc) ->
  556. Acc;
  557. conn_opts([Opt = {database, _} | Opts], Acc) ->
  558. conn_opts(Opts, [Opt | Acc]);
  559. conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
  560. Flag =
  561. case Bool of
  562. true -> required;
  563. false -> false
  564. end,
  565. conn_opts(Opts, [{ssl, Flag} | Acc]);
  566. conn_opts([Opt = {port, _} | Opts], Acc) ->
  567. conn_opts(Opts, [Opt | Acc]);
  568. conn_opts([Opt = {timeout, _} | Opts], Acc) ->
  569. conn_opts(Opts, [Opt | Acc]);
  570. conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
  571. conn_opts(Opts, [Opt | Acc]);
  572. conn_opts([_Opt | Opts], Acc) ->
  573. conn_opts(Opts, Acc).
  574. parse_sql_template(Config, SQLID) ->
  575. Queries =
  576. case Config of
  577. #{prepare_statement := Qs} ->
  578. Qs;
  579. #{sql := Query} ->
  580. #{SQLID => Query};
  581. #{} ->
  582. #{}
  583. end,
  584. Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
  585. #{query_templates => Templates}.
  586. parse_sql_template(Key, Query, Acc) ->
  587. Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
  588. Acc#{Key => Template}.
  589. render_prepare_sql_row(RowTemplate, Data) ->
  590. % NOTE: ignoring errors here, missing variables will be replaced with `null`.
  591. {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
  592. Row.
  593. init_prepare(State = #{prepares := disabled}) ->
  594. State;
  595. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
  596. State;
  597. init_prepare(State = #{}) ->
  598. case prepare_sql(State) of
  599. {ok, PrepStatements} ->
  600. State#{prepares => PrepStatements};
  601. Error ->
  602. TranslatedError = translate_to_log_context(Error),
  603. ?SLOG(
  604. error,
  605. maps:merge(
  606. #{msg => <<"postgresql_init_prepare_statement_failed">>},
  607. TranslatedError
  608. )
  609. ),
  610. %% mark the prepares failed
  611. State#{prepares => {error, export_error(TranslatedError)}}
  612. end.
  613. prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
  614. prepare_sql(maps:to_list(Templates), PoolName).
  615. prepare_sql(Templates, PoolName) ->
  616. case do_prepare_sql(Templates, PoolName) of
  617. {ok, _Sts} = Ok ->
  618. %% prepare for reconnect
  619. ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
  620. Ok;
  621. Error ->
  622. Error
  623. end.
  624. do_prepare_sql(Templates, PoolName) ->
  625. do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
  626. do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
  627. {ok, Conn} = ecpool_worker:client(Worker),
  628. case prepare_sql_to_conn(Conn, Templates) of
  629. {ok, Sts} ->
  630. do_prepare_sql(Rest, Templates, Sts);
  631. Error ->
  632. Error
  633. end;
  634. do_prepare_sql([], _Prepares, LastSts) ->
  635. {ok, LastSts}.
  636. prepare_sql_to_conn(Conn, Prepares) ->
  637. prepare_sql_to_conn(Conn, Prepares, #{}, 0).
  638. prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
  639. {ok, Statements};
  640. prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
  641. failed_to_remove_prev_prepared_statement_error();
  642. prepare_sql_to_conn(
  643. Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
  644. ) when is_pid(Conn) ->
  645. LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
  646. ?SLOG(info, LogMeta),
  647. case epgsql:parse2(Conn, Key, SQL, []) of
  648. {ok, Statement} ->
  649. prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
  650. {error, #error{severity = error, codename = undefined_table} = Error} ->
  651. %% Target table is not created
  652. ?tp(pgsql_undefined_table, #{}),
  653. LogMsg =
  654. maps:merge(
  655. LogMeta#{msg => "postgresql_parse_failed"},
  656. translate_to_log_context(Error)
  657. ),
  658. ?SLOG(error, LogMsg),
  659. {error, undefined_table};
  660. {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
  661. ?tp(pgsql_prepared_statement_exists, #{}),
  662. LogMsg =
  663. maps:merge(
  664. LogMeta#{
  665. msg => "postgresql_prepared_statment_with_same_name_already_exists",
  666. explain => <<
  667. "A prepared statement with the same name already "
  668. "exists in the driver. Will attempt to remove the "
  669. "previous prepared statement with the name and then "
  670. "try again."
  671. >>
  672. },
  673. translate_to_log_context(Error)
  674. ),
  675. ?SLOG(warning, LogMsg),
  676. case epgsql:close(Conn, statement, Key) of
  677. ok ->
  678. ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
  679. prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
  680. {error, CloseError} ->
  681. ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
  682. failed_to_remove_prev_prepared_statement_error()
  683. end;
  684. {error, Error} ->
  685. TranslatedError = translate_to_log_context(Error),
  686. LogMsg =
  687. maps:merge(
  688. LogMeta#{msg => "postgresql_parse_failed"},
  689. TranslatedError
  690. ),
  691. ?SLOG(error, LogMsg),
  692. {error, export_error(TranslatedError)}
  693. end.
  694. failed_to_remove_prev_prepared_statement_error() ->
  695. Msg =
  696. ("A previous prepared statement for the action already exists "
  697. "but cannot be closed. Please, try to disable and then enable "
  698. "the connector to resolve this issue."),
  699. {error, unicode:characters_to_binary(Msg)}.
  700. to_bin(Bin) when is_binary(Bin) ->
  701. Bin;
  702. to_bin(Atom) when is_atom(Atom) ->
  703. erlang:atom_to_binary(Atom).
  704. handle_result({error, {recoverable_error, _Error}} = Res) ->
  705. Res;
  706. handle_result({error, {unrecoverable_error, _Error}} = Res) ->
  707. Res;
  708. handle_result({error, disconnected}) ->
  709. {error, {recoverable_error, disconnected}};
  710. handle_result({error, Error}) ->
  711. TranslatedError = translate_to_log_context(Error),
  712. {error, {unrecoverable_error, export_error(TranslatedError)}};
  713. handle_result(Res) ->
  714. Res.
  715. on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
  716. #{result => ok, affected_rows => Cnt};
  717. on_format_query_result(Res) ->
  718. Res.
  719. handle_batch_result([{ok, Count} | Rest], Acc) ->
  720. handle_batch_result(Rest, Acc + Count);
  721. handle_batch_result([{error, Error} | _Rest], _Acc) ->
  722. TranslatedError = translate_to_log_context(Error),
  723. {error, {unrecoverable_error, export_error(TranslatedError)}};
  724. handle_batch_result([], Acc) ->
  725. {ok, Acc}.
  726. translate_to_log_context({error, Reason}) ->
  727. translate_to_log_context(Reason);
  728. translate_to_log_context(#error{} = Reason) ->
  729. #error{
  730. severity = Severity,
  731. code = Code,
  732. codename = Codename,
  733. message = Message,
  734. extra = Extra
  735. } = Reason,
  736. #{
  737. driver_severity => Severity,
  738. driver_error_codename => Codename,
  739. driver_error_code => Code,
  740. driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
  741. driver_error_extra => Extra
  742. };
  743. translate_to_log_context(Reason) ->
  744. #{reason => Reason}.
  745. export_error(#{
  746. driver_severity := Severity,
  747. driver_error_codename := Codename,
  748. driver_error_code := Code
  749. }) ->
  750. %% Extra information has already been logged.
  751. #{
  752. error_code => Code,
  753. error_codename => Codename,
  754. severity => Severity
  755. };
  756. export_error(#{reason := Reason}) ->
  757. Reason;
  758. export_error(Error) ->
  759. Error.