emqx_postgresql.erl 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1, namespace/0]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. callback_mode/0,
  29. on_start/2,
  30. on_stop/2,
  31. on_query/3,
  32. on_batch_query/3,
  33. on_get_status/2,
  34. on_add_channel/4,
  35. on_remove_channel/3,
  36. on_get_channels/1,
  37. on_get_channel_status/3,
  38. on_format_query_result/1
  39. ]).
  40. -export([connect/1]).
  41. -export([
  42. query/3,
  43. prepared_query/3,
  44. execute_batch/3
  45. ]).
  46. %% for ecpool workers usage
  47. -export([do_get_status/1, prepare_sql_to_conn/2]).
  48. -define(PGSQL_HOST_OPTIONS, #{
  49. default_port => ?PGSQL_DEFAULT_PORT
  50. }).
  51. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  52. -type state() ::
  53. #{
  54. pool_name := binary(),
  55. query_templates := #{binary() => template()},
  56. prepares := #{binary() => epgsql:statement()} | {error, _}
  57. }.
  58. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  59. %% We want to be able to call sync if any message from the backend leaves the driver in an
  60. %% inconsistent state needing sync.
  61. -dialyzer({nowarn_function, [execute_batch/3]}).
  62. %%=====================================================================
  %% Returns the atom `postgres' as the schema namespace of this module.
  namespace() ->
      postgres.
  %% Schema roots: a single `config' root that refers to `fields(config)'
  %% of this module.
  roots() ->
      ConfigType = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigType}}].
  %% Fields of the `config' root: the `server' field first, followed by the
  %% (adjusted) relational DB fields, the SSL fields and the prepared
  %% statement fields provided by `emqx_connector_schema_lib'.
  fields(config) ->
      ServerField = {server, server()},
      DbFields = adjust_fields(emqx_connector_schema_lib:relational_db_fields()),
      SslFields = emqx_connector_schema_lib:ssl_fields(),
      PrepStmtFields = emqx_connector_schema_lib:prepare_statement_fields(),
      [ServerField | DbFields ++ SslFields ++ PrepStmtFields].
  %% Schema of the `server' field, built by `emqx_schema:servers_sc/2' with
  %% the host options of this module (`?PGSQL_HOST_OPTIONS').
  server() ->
      emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?PGSQL_HOST_OPTIONS).
  %% Overrides the `username' field so that it is marked `required';
  %% every other field is passed through unchanged.
  adjust_fields(Fields) ->
      [adjust_field(Field) || Field <- Fields].

  %% Per-field worker of adjust_fields/1.
  adjust_field({username, Sc}) ->
      %% The type is re-stated in the override to please dialyzer.
      Type = hocon_schema:field_schema(Sc, type),
      {username, hocon_schema:override(Sc, #{type => Type, required => true})};
  adjust_field(Field) ->
      Field.
  86. %% ===================================================================
  %% emqx_resource callback: this connector always uses the `always_sync'
  %% callback mode.
  callback_mode() ->
      always_sync.
  %% emqx_resource callback: starts the connection pool for `InstId'.
  %%
  %% The server string is parsed into host and port, the pool options are
  %% assembled (password defaults to a wrapped empty string, SSL options are
  %% added according to `ssl.enable'), and the SQL templates found in the
  %% config are parsed under the key `<<"send_message">>'. When the pool
  %% starts, the templates are prepared via init_prepare/1 and the resulting
  %% state is returned; otherwise the pool's error is traced and returned.
  -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  on_start(
      InstId,
      #{
          server := ServerSpec,
          database := Database,
          username := Username,
          pool_size := PoolSize,
          ssl := SslConf
      } = Config
  ) ->
      #{hostname := Host, port := Port} =
          emqx_schema:parse_server(ServerSpec, ?PGSQL_HOST_OPTIONS),
      ?SLOG(info, #{
          msg => "starting_postgresql_connector",
          connector => InstId,
          config => emqx_utils:redact(Config)
      }),
      SslOpts =
          case maps:get(enable, SslConf) of
              false ->
                  [{ssl, false}];
              true ->
                  %% note: `{ssl, true}' is converted to `required' in
                  %% `conn_opts/2', which has a boolean guard; setting it
                  %% to `required' here would require changing that guard.
                  [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SslConf)}]
          end,
      Password = maps:get(password, Config, emqx_secret:wrap("")),
      PoolOpts =
          [
              {host, Host},
              {port, Port},
              {username, Username},
              {password, Password},
              {database, Database},
              {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
              {pool_size, PoolSize}
          ] ++ SslOpts,
      Templates = parse_prepare_sql(Config, <<"send_message">>),
      case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
          ok ->
              State = Templates#{
                  installed_channels => #{},
                  pool_name => InstId,
                  prepares => #{}
              },
              {ok, init_prepare(State)};
          {error, Reason} ->
              ?tp(pgsql_connector_start_failed, #{error => Reason}),
              {error, Reason}
      end.
  %% emqx_resource callback: closes the worker connections, stops the pool
  %% named `InstId' and returns the result of stopping the pool.
  on_stop(InstId, State) ->
      ?SLOG(info, #{msg => "stopping_postgresql_connector", connector => InstId}),
      close_connections(State),
      StopResult = emqx_resource_pool:stop(InstId),
      ?tp(postgres_stopped, #{instance_id => InstId}),
      StopResult.
  %% Closes the connection held by every worker of the pool named in the
  %% state. Always returns `ok'.
  close_connections(#{pool_name := PoolName} = _State) ->
      Workers = ecpool:workers(PoolName),
      close_connections_with_worker_pids([Pid || {_Name, Pid} <- Workers]),
      ok.
  %% Asks each worker for its client connection and closes it.
  %% Failing to obtain the client is ignored, since it most likely means
  %% that the connection is closed already. Returns `ok'.
  close_connections_with_worker_pids(WorkerPids) ->
      lists:foreach(fun close_worker_connection/1, WorkerPids).

  %% Closes the connection of a single worker, if it has one.
  close_worker_connection(WorkerPid) ->
      try ecpool_worker:client(WorkerPid) of
          {ok, Conn} ->
              _ = epgsql:close(Conn),
              ok;
          _ ->
              ok
      catch
          _:_ ->
              ok
      end.
  %% emqx_resource callback: builds the state of channel `ChannelId' and
  %% stores it under `installed_channels'.
  %%
  %% Returns `{error, {unhealthy_target, Reason}}' when preparing the
  %% channel's statement failed (the channel state carries
  %% `prepares := {error, Reason}'); the channel is not installed then.
  on_add_channel(
      _InstId,
      #{installed_channels := InstalledChannels} = OldState,
      ChannelId,
      ChannelConfig
  ) ->
      %% A non-`{ok, _}' result here is a badmatch, i.e. the call crashes.
      {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
      case ChannelState of
          #{prepares := {error, Reason}} ->
              {error, {unhealthy_target, Reason}};
          _ ->
              Installed = InstalledChannels#{ChannelId => ChannelState},
              {ok, OldState#{installed_channels => Installed}}
      end.
  %% Builds the state of a channel: parses the SQL found in the channel's
  %% `parameters' under the key `ChannelId' and prepares it on the
  %% connector's pool via init_prepare/1.
  %%
  %% NOTE(review): the `prepare_statement' key put into the state here does
  %% not appear to be read anywhere in this module (the other functions read
  %% `prepares') -- confirm whether `prepares' was intended.
  create_channel_state(
      ChannelId,
      #{pool_name := PoolName} = _ConnectorState,
      #{parameters := Parameters} = _ChannelConfig
  ) ->
      Templates = parse_prepare_sql(Parameters, ChannelId),
      ChannelState = init_prepare(Templates#{
          pool_name => PoolName,
          prepare_statement => #{}
      }),
      {ok, ChannelState}.
  %% emqx_resource callback: closes the channel's prepared statement on the
  %% pool workers and drops the channel from `installed_channels'.
  on_remove_channel(
      _InstId,
      #{installed_channels := InstalledChannels} = OldState,
      ChannelId
  ) ->
      ok = close_prepared_statement(ChannelId, OldState),
      Remaining = maps:remove(ChannelId, InstalledChannels),
      {ok, OldState#{installed_channels => Remaining}}.
  %% Closes the prepared statement of `ChannelId' on every worker of the
  %% pool named in the state. Best effort: always returns `ok'.
  close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
      WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
      close_prepared_statement(WorkerPids, ChannelId, State),
      ok.

  %% Walks the worker pids and closes the channel's statement on each
  %% worker's connection.
  %%
  %% We ignore errors since any error probably means that the prepared
  %% statement doesn't exist. If it exists when we try to insert one with
  %% the same name, we will try to remove it again anyway.
  %%
  %% The statement lookup and the close call are deliberately inside the
  %% protected part of the `try': the `of' section of a `try ... of' is not
  %% covered by `catch', so a missing statement or a connection dying during
  %% the close would otherwise crash the caller instead of being ignored.
  close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
      try
          case ecpool_worker:client(WorkerPid) of
              {ok, Conn} ->
                  Statement = get_prepared_statement(ChannelId, State),
                  _ = epgsql:close(Conn, Statement),
                  ok;
              _ ->
                  ok
          end
      catch
          _:_ ->
              ok
      end,
      close_prepared_statement(Rest, ChannelId, State);
  close_prepared_statement([], _ChannelId, _State) ->
      ok.
  %% emqx_resource callback: reports `connected' when the channel's SQL
  %% passes the table-existence check, and an `unhealthy_target' error when
  %% the check reports `undefined_table'.
  on_get_channel_status(
      _ResId,
      ChannelId,
      #{pool_name := PoolName, installed_channels := Channels} = _State
  ) ->
      ChannelState = maps:get(ChannelId, Channels),
      CheckResult = do_check_channel_sql(PoolName, ChannelId, ChannelState),
      case CheckResult of
          ok ->
              connected;
          {error, undefined_table} ->
              {error, {unhealthy_target, <<"Table does not exist">>}}
      end.
  %% Looks up the SQL of `ChannelId' in the channel's query templates and
  %% validates it against the pool workers (see validate_table_existence/2).
  do_check_channel_sql(
      PoolName,
      ChannelId,
      #{query_templates := ChannelQueryTemplates} = _ChannelState
  ) ->
      {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
      Workers = ecpool:workers(PoolName),
      validate_table_existence([Pid || {_Name, Pid} <- Workers], SQL).
  %% emqx_resource callback: delegates to
  %% `emqx_bridge_v2:get_channels_for_connector/1'.
  on_get_channels(ResId) ->
      emqx_bridge_v2:get_channels_for_connector(ResId).
  %% emqx_resource callback: runs a single query.
  %%
  %% The request is `{TypeOrKey, NameOrSQL}' or `{TypeOrKey, NameOrSQL,
  %% Params}'; the two-element form is treated as having no parameters.
  %% `TypeOrKey' selects the driver call (see pgsql_query_type/1) and, for
  %% keys with a known template, the data is rendered into a parameter row
  %% (see proc_sql_params/4). The result goes through handle_result/1.
  on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
      on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
  on_query(
      InstId,
      {TypeOrKey, NameOrSQL, Params},
      #{pool_name := PoolName} = State
  ) ->
      ?SLOG(debug, #{
          msg => "postgresql_connector_received_sql_query",
          connector => InstId,
          type => TypeOrKey,
          sql => NameOrSQL,
          state => State
      }),
      QueryType = pgsql_query_type(TypeOrKey),
      {Target, Args} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
      QueryResult = on_sql_query(TypeOrKey, InstId, PoolName, QueryType, Target, Args),
      ?tp(postgres_bridge_connector_on_query_return, #{
          instance_id => InstId, result => QueryResult
      }),
      handle_result(QueryResult).
  %% Maps a request type (or a bridge channel key) to the name of the
  %% function of this module that executes it: `sql' and `query' map to
  %% `query', anything else (including bridge keys) to `prepared_query'.
  pgsql_query_type(Type) when Type =:= sql; Type =:= query ->
      query;
  pgsql_query_type(_PreparedOrBridgeKey) ->
      prepared_query.
  %% emqx_resource callback: runs a batch of `{Key, Data}' requests.
  %%
  %% The key of the first request selects the template and the prepared
  %% statement; each `{_, Data}' element of the batch is rendered into a row
  %% and all rows are executed with `execute_batch'. Returns
  %% `{ok, TotalCount}' (see handle_batch_result/2) or an error tuple. A
  %% batch without a known template, or a request that is not a non-empty
  %% list of 2-tuples, is an unrecoverable error.
  on_batch_query(
      InstId,
      [{Key, _} = FirstRequest | _] = BatchReq,
      #{pool_name := PoolName} = State
  ) ->
      BinKey = to_bin(Key),
      case get_template(BinKey, State) of
          {_Statement, RowTemplate} ->
              Prepared = get_prepared_statement(BinKey, State),
              Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
              case on_sql_query(Key, InstId, PoolName, execute_batch, Prepared, Rows) of
                  {error, _Error} = Failure ->
                      handle_result(Failure);
                  {_Column, Results} ->
                      handle_batch_result(Results, 0)
              end;
          undefined ->
              ?SLOG(error, #{
                  connector => InstId,
                  first_request => FirstRequest,
                  state => State,
                  msg => "batch prepare not implemented"
              }),
              {error, {unrecoverable_error, batch_prepare_not_implemented}}
      end;
  on_batch_query(InstId, BatchReq, State) ->
      ?SLOG(error, #{
          connector => InstId,
          request => BatchReq,
          state => State,
          msg => "invalid request"
      }),
      {error, {unrecoverable_error, invalid_request}}.
  %% Decides what is sent to the driver for a request.
  %%
  %% For the plain types `query' and `prepared_query' the SQL/name and the
  %% parameters are passed through. For any other key, a template is looked
  %% up: when one exists, the result is the binary key plus the row rendered
  %% from the request data; otherwise the input is passed through unchanged.
  proc_sql_params(Type, SQLOrKey, Params, _State) when
      Type =:= query; Type =:= prepared_query
  ->
      {SQLOrKey, Params};
  proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
      BinKey = to_bin(TypeOrKey),
      case get_template(BinKey, State) of
          {_Statement, RowTemplate} ->
              {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)};
          undefined ->
              {SQLOrData, Params}
      end.
  %% Finds the template stored under `Key'.
  %%
  %% When `Key' is an installed channel, the template comes from that
  %% channel's `query_templates' (a missing entry is a `badkey' error).
  %% Otherwise the state's own `query_templates' are consulted and
  %% `undefined' is returned when there is no such template.
  get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
      BinKey = to_bin(Key),
      maps:get(BinKey, maps:get(query_templates, maps:get(BinKey, Channels)));
  get_template(Key, #{query_templates := Templates}) ->
      maps:get(to_bin(Key), Templates, undefined).
  %% Finds the prepared statement stored under `Key'.
  %%
  %% When `Key' is an installed channel, the statement comes from that
  %% channel's `prepares'; otherwise from the state's own `prepares'.
  %% A missing entry is a `badkey' error in both cases.
  get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
      is_map_key(Key, Channels)
  ->
      BinKey = to_bin(Key),
      maps:get(BinKey, maps:get(prepares, maps:get(BinKey, Channels)));
  get_prepared_statement(Key, #{prepares := PrepStatements}) ->
      maps:get(to_bin(Key), PrepStatements).
  %% Executes `?MODULE:Type(Conn, NameOrSQL, Data)' on a pool worker.
  %%
  %% The rendered request is first reported to `emqx_trace'. An
  %% `{error, Reason}' reply is traced, logged and classified:
  %% `sync_required' and `ecpool_empty' are recoverable, an
  %% `undefined_table' driver error is unrecoverable, and anything else is
  %% returned as `{error, ExportedReason}'. Any other reply is traced and
  %% returned as is. A `function_clause' error raised by the call is logged
  %% and reported as an unrecoverable `invalid_request'.
  on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) ->
      emqx_trace:rendered_action_template(
          Key,
          #{statement_type => Type, statement_or_name => NameOrSQL, data => Data}
      ),
      Action = {?MODULE, Type, [NameOrSQL, Data]},
      try ecpool:pick_and_do(PoolName, Action, no_handover) of
          {error, Reason} ->
              ?tp(pgsql_connector_query_return, #{error => Reason}),
              ErrorContext = translate_to_log_context(Reason),
              ?SLOG(
                  error,
                  maps:merge(
                      #{
                          msg => "postgresql_connector_do_sql_query_failed",
                          connector => InstId,
                          type => Type,
                          sql => NameOrSQL
                      },
                      ErrorContext
                  )
              ),
              case Reason of
                  _ when Reason =:= sync_required; Reason =:= ecpool_empty ->
                      {error, {recoverable_error, Reason}};
                  {error, error, _, undefined_table, _, _} ->
                      {error, {unrecoverable_error, export_error(ErrorContext)}};
                  _ ->
                      {error, export_error(ErrorContext)}
              end;
          Reply ->
              ?tp(pgsql_connector_query_return, #{result => Reply}),
              Reply
      catch
          error:function_clause:Stacktrace ->
              ?SLOG(error, #{
                  msg => "postgresql_connector_do_sql_query_failed",
                  connector => InstId,
                  type => Type,
                  sql => NameOrSQL,
                  reason => function_clause,
                  stacktrace => Stacktrace
              }),
              {error, {unrecoverable_error, invalid_request}}
      end.
  %% emqx_resource callback: health check of the connector.
  %%
  %% Reports `connecting' when the worker health check fails. Otherwise the
  %% outcome of do_check_prepares/1 decides the status (see
  %% status_from_prepares/2).
  on_get_status(_InstId, #{pool_name := PoolName} = State) ->
      WorkersHealthy =
          emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
      case WorkersHealthy of
          false ->
              connecting;
          true ->
              status_from_prepares(do_check_prepares(State), State)
      end.

  %% Translates the result of do_check_prepares/1 into a resource status.
  status_from_prepares(ok, _State) ->
      connected;
  status_from_prepares({ok, NState}, _State) ->
      %% return new state with prepared statements
      {connected, NState};
  status_from_prepares({error, undefined_table}, State) ->
      %% we are connected, but the target table is not created
      {disconnected, State, unhealthy_target};
  status_from_prepares({error, _Reason}, _State) ->
      %% do not log error, it is logged in prepare_sql_to_conn
      connecting.
  %% Per-connection health probe: runs a trivial query and returns `true'
  %% when the first element of the driver's reply is `ok'.
  do_get_status(Conn) ->
      Reply = epgsql:squery(Conn, "SELECT count(1) AS T"),
      element(1, Reply) == ok.
  %% Checks the prepared-statement side of the connector health.
  %%
  %% - With a `<<"send_message">>' template, its SQL is validated against
  %%   the pool workers; the result is `ok' or `{error, undefined_table}'.
  %% - Otherwise, when `prepares' is a map, there is nothing to do (`ok').
  %% - Otherwise, when `prepares' is `{error, _}', preparing is retried and
  %%   on success the state with the new statements is returned.
  do_check_prepares(
      #{
          pool_name := PoolName,
          query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
      }
  ) ->
      Workers = ecpool:workers(PoolName),
      validate_table_existence([Pid || {_Name, Pid} <- Workers], SQL);
  do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
      ok;
  do_check_prepares(#{prepares := {error, _}} = State) ->
      %% retry to prepare
      case prepare_sql(State) of
          {error, Reason} ->
              {error, Reason};
          {ok, PrepStatements} ->
              %% replace the error with the statements
              {ok, State#{prepares := PrepStatements}}
      end.
  %% Parses `SQL' as an unnamed statement on the workers' connections, one
  %% worker at a time, until one of them gives a conclusive answer:
  %% `{error, undefined_table}' when the driver reports that error, `ok'
  %% when parsing succeeds. Workers without a client, dead workers
  %% (`noproc') and unexpected parse results are skipped.
  -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  validate_table_existence([WorkerPid | Others], SQL) ->
      try ecpool_worker:client(WorkerPid) of
          {ok, Conn} ->
              case epgsql:parse2(Conn, "", SQL, []) of
                  {error, {_, _, _, undefined_table, _, _}} ->
                      {error, undefined_table};
                  Parsed when is_tuple(Parsed) andalso ok == element(1, Parsed) ->
                      ok;
                  Unexpected ->
                      ?tp(postgres_connector_bad_parse2, #{result => Unexpected}),
                      validate_table_existence(Others, SQL)
              end;
          _ ->
              validate_table_existence(Others, SQL)
      catch
          exit:{noproc, _} ->
              validate_table_existence(Others, SQL)
      end;
  validate_table_existence([], _SQL) ->
      %% No worker gave a conclusive answer (each one was unavailable or
      %% replied with an unexpected error); we will retry on the next
      %% health check.
      ok.
  480. %% ===================================================================
  %% Opens one driver connection from the pool options (a proplist).
  %% `host', `username' and `password' are taken from the options; the
  %% remaining driver options are selected by conn_opts/1.
  connect(Opts) ->
      Lookup = fun(Key) -> proplists:get_value(Key, Opts) end,
      Host = Lookup(host),
      Username = Lookup(username),
      %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
      Password = emqx_secret:unwrap(Lookup(password)),
      case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
          {error, Reason} ->
              {error, Reason};
          {ok, _Conn} = Connected ->
              Connected
      end.
  %% Runs `SQL' with `Params' through `epgsql:equery/3'. When the driver
  %% answers `{error, sync_required}', the connection is synced before that
  %% error is returned.
  query(Conn, SQL, Params) ->
      Reply = epgsql:equery(Conn, SQL, Params),
      case Reply of
          {error, sync_required} ->
              ok = epgsql:sync(Conn),
              Reply;
          _ ->
              Reply
      end.
  %% Runs the prepared statement `Name' with `Params' through
  %% `epgsql:prepared_query2/3'. When the driver answers
  %% `{error, sync_required}', the connection is synced before that error is
  %% returned.
  prepared_query(Conn, Name, Params) ->
      Reply = epgsql:prepared_query2(Conn, Name, Params),
      case Reply of
          {error, sync_required} ->
              ok = epgsql:sync(Conn),
              Reply;
          _ ->
              Reply
      end.
  %% Runs `Statement' once per parameter row through
  %% `epgsql:execute_batch/3'. When the driver answers
  %% `{error, sync_required}', the connection is synced before that error is
  %% returned.
  execute_batch(Conn, Statement, Params) ->
      Reply = epgsql:execute_batch(Conn, Statement, Params),
      case Reply of
          {error, sync_required} ->
              ok = epgsql:sync(Conn),
              Reply;
          _ ->
              Reply
      end.
  %% Selects, from the pool options, the options handed to the driver's
  %% connect call: `database', `port', `timeout', `ssl_opts' and a boolean
  %% `ssl' (where `true' becomes `required'). Everything else is dropped.
  %% Kept options are prepended to the accumulator as they are met, so they
  %% come out in reverse input order.
  conn_opts(Opts) ->
      conn_opts(Opts, []).

  conn_opts(Opts, Acc0) ->
      lists:foldl(fun add_conn_opt/2, Acc0, Opts).

  %% Folding step of conn_opts/2.
  add_conn_opt({ssl, true}, Acc) ->
      [{ssl, required} | Acc];
  add_conn_opt({ssl, false}, Acc) ->
      [{ssl, false} | Acc];
  add_conn_opt({Name, _} = Opt, Acc) when
      Name =:= database; Name =:= port; Name =:= timeout; Name =:= ssl_opts
  ->
      [Opt | Acc];
  add_conn_opt(_Ignored, Acc) ->
      Acc.
  %% Collects the SQL statements of a config and parses each one into a
  %% template. The result is `#{query_templates => #{Key => Template}}'.
  parse_prepare_sql(Config, SQLID) ->
      Queries = queries_of_config(Config, SQLID),
      #{query_templates => maps:fold(fun parse_prepare_sql/3, #{}, Queries)}.

  %% The statements of a config: the `prepare_statement' map when present,
  %% else the single `sql' statement keyed by `SQLID', else none.
  queries_of_config(#{prepare_statement := Qs}, _SQLID) ->
      Qs;
  queries_of_config(#{sql := Query}, SQLID) ->
      #{SQLID => Query};
  queries_of_config(#{}, _SQLID) ->
      #{}.
  %% Fold step of parse_prepare_sql/2: parses `Query' as a prepared
  %% statement with `$n' parameters and stores the result under `Key'.
  parse_prepare_sql(Key, Query, Acc) ->
      maps:put(Key, emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Acc).
  %% Renders one parameter row from `Data' using `RowTemplate'.
  %% NOTE: rendering errors are ignored here; missing variables will be
  %% replaced with `null`.
  render_prepare_sql_row(RowTemplate, Data) ->
      Rendered = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
      {Row, _Errors} = Rendered,
      Row.
  %% Prepares the state's templates on the pool and records the outcome
  %% under `prepares': the statements on success, `{error, Reason}' after
  %% logging on failure. A state without templates is returned untouched.
  init_prepare(#{query_templates := Templates} = State) when map_size(Templates) == 0 ->
      State;
  init_prepare(#{} = State) ->
      case prepare_sql(State) of
          {ok, PrepStatements} ->
              State#{prepares => PrepStatements};
          Failure ->
              ErrorContext = translate_to_log_context(Failure),
              ?SLOG(
                  error,
                  maps:merge(
                      #{msg => <<"postgresql_init_prepare_statement_failed">>},
                      ErrorContext
                  )
              ),
              %% mark the prepares failed
              State#{prepares => {error, export_error(ErrorContext)}}
      end.
  %% Prepares all templates of the state on the state's pool.
  prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
      prepare_sql(maps:to_list(Templates), PoolName).

  %% Prepares the `{Key, Template}' list on the pool. On success, a
  %% reconnect callback is registered on the pool that calls
  %% prepare_sql_to_conn/2 with the same templates.
  prepare_sql(Templates, PoolName) ->
      Outcome = do_prepare_sql(Templates, PoolName),
      case Outcome of
          {ok, _Sts} ->
              %% prepare for reconnect
              ecpool:add_reconnect_callback(
                  PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}
              ),
              Outcome;
          _ ->
              Outcome
      end.
  %% Prepares the templates on the connection of every worker of the pool.
  do_prepare_sql(Templates, PoolName) ->
      do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).

  %% Walks the pool workers, preparing the templates on each worker's
  %% connection. Returns `{ok, Statements}' with the statements of the last
  %% worker (an empty map when the pool has no workers), or the first error
  %% met.
  %%
  %% A worker that cannot hand out its client yields that `{error, _}'
  %% instead of crashing with a badmatch; the callers already treat
  %% `{error, _}' as "preparing failed".
  do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
      case ecpool_worker:client(Worker) of
          {ok, Conn} ->
              case prepare_sql_to_conn(Conn, Templates) of
                  {ok, Sts} ->
                      do_prepare_sql(Rest, Templates, Sts);
                  Error ->
                      Error
              end;
          {error, _Reason} = Error ->
              Error
      end;
  do_prepare_sql([], _Prepares, LastSts) ->
      {ok, LastSts}.
  %% Prepares every `{Key, {SQL, RowTemplate}}' of the list on connection
  %% `Conn', using `Key' as the statement name. Returns
  %% `{ok, #{Key => Statement}}' or the first error.
  prepare_sql_to_conn(Conn, Prepares) ->
      prepare_sql_to_conn(Conn, Prepares, #{}, 0).

  %% Worker of prepare_sql_to_conn/2. `Attempts' counts how often the
  %% statement at the head of the list hit "duplicate prepared statement";
  %% after each such failure the existing statement is closed and parsing is
  %% retried, and the third failure in a row gives up. The counter restarts
  %% at 0 for the next statement.
  prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
      {ok, Statements};
  prepare_sql_to_conn(Conn, [{Key, _} | _Rest], _Statements, _MaxAttempts = 3) when is_pid(Conn) ->
      {error, {failed_to_remove_prev_prepared_statement, Key}};
  prepare_sql_to_conn(
      Conn, [{Key, {SQL, _RowTemplate}} | Remaining] = Pending, Statements, Attempts
  ) when is_pid(Conn) ->
      LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
      ?SLOG(info, LogMeta),
      case epgsql:parse2(Conn, Key, SQL, []) of
          {ok, Statement} ->
              prepare_sql_to_conn(Conn, Remaining, Statements#{Key => Statement}, 0);
          {error, #error{severity = error, codename = undefined_table} = DriverError} ->
              %% Target table is not created
              ?tp(pgsql_undefined_table, #{}),
              ?SLOG(
                  error,
                  maps:merge(
                      LogMeta#{msg => "postgresql_parse_failed"},
                      translate_to_log_context(DriverError)
                  )
              ),
              {error, undefined_table};
          {error, #error{severity = error, codename = duplicate_prepared_statement}} = Duplicate ->
              ?tp(pgsql_prepared_statement_exists, #{}),
              DuplicateLog =
                  maps:merge(
                      LogMeta#{
                          msg => "postgresql_prepared_statment_with_same_name_already_exists",
                          explain => <<
                              "A prepared statement with the same name already "
                              "exists in the driver. Will attempt to remove the "
                              "previous prepared statement with the name and then "
                              "try again."
                          >>
                      },
                      translate_to_log_context(Duplicate)
                  ),
              ?SLOG(warning, DuplicateLog),
              case epgsql:close(Conn, statement, Key) of
                  ok ->
                      ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"});
                  {error, CloseError} ->
                      ?SLOG(warning, #{msg => "pqsql_close_statement_failed", cause => CloseError})
              end,
              prepare_sql_to_conn(Conn, Pending, Statements, Attempts + 1);
          {error, OtherError} ->
              ErrorContext = translate_to_log_context(OtherError),
              ?SLOG(
                  error,
                  maps:merge(LogMeta#{msg => "postgresql_parse_failed"}, ErrorContext)
              ),
              {error, export_error(ErrorContext)}
      end.
  %% Converts an atom to its UTF-8 binary name; binaries are returned as is.
  to_bin(Atom) when is_atom(Atom) ->
      atom_to_binary(Atom, utf8);
  to_bin(Bin) when is_binary(Bin) ->
      Bin.
  %% Normalizes a query result for the resource layer: errors already
  %% classified as recoverable/unrecoverable pass through, `disconnected'
  %% becomes recoverable, any other error becomes unrecoverable (with the
  %% exported reason), and non-error results are returned unchanged.
  handle_result({error, {Class, _Error}} = Classified) when
      Class =:= recoverable_error; Class =:= unrecoverable_error
  ->
      Classified;
  handle_result({error, disconnected}) ->
      {error, {recoverable_error, disconnected}};
  handle_result({error, Error}) ->
      Exported = export_error(translate_to_log_context(Error)),
      {error, {unrecoverable_error, Exported}};
  handle_result(Other) ->
      Other.
  %% emqx_resource callback: turns `{ok, Count}' with an integer count into
  %% a map with `result' and `affected_rows'; anything else is returned
  %% unchanged.
  on_format_query_result({ok, AffectedRows}) when is_integer(AffectedRows) ->
      #{affected_rows => AffectedRows, result => ok};
  on_format_query_result(Other) ->
      Other.
  %% Sums the counts of the `{ok, Count}' replies of a batch, starting from
  %% the given total. The first `{error, _}' reply stops the walk and is
  %% returned as an unrecoverable error with the exported reason.
  handle_batch_result([], Total) ->
      {ok, Total};
  handle_batch_result([{ok, Count} | Replies], Total) ->
      handle_batch_result(Replies, Total + Count);
  handle_batch_result([{error, Error} | _Replies], _Total) ->
      Exported = export_error(translate_to_log_context(Error)),
      {error, {unrecoverable_error, Exported}}.
  %% Turns an error reason into a map suitable for merging into a log event.
  %% `{error, Reason}' wrappers are peeled off; a driver `#error{}' record
  %% is spread into `driver_*' keys; any other term ends up under `reason'.
  translate_to_log_context({error, Reason}) ->
      translate_to_log_context(Reason);
  translate_to_log_context(#error{
      severity = Severity,
      code = Code,
      codename = Codename,
      message = Message,
      extra = Extra
  }) ->
      #{
          driver_severity => Severity,
          driver_error_codename => Codename,
          driver_error_code => Code,
          driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
          driver_error_extra => Extra
      };
  translate_to_log_context(Reason) ->
      #{reason => Reason}.
  %% Reduces a log context (see translate_to_log_context/1) to the error
  %% term returned to callers: driver errors keep only code, codename and
  %% severity (the rest has already been logged), a `reason' map yields the
  %% reason itself, and any other term is returned unchanged.
  export_error(#{
      driver_severity := Severity,
      driver_error_codename := Codename,
      driver_error_code := Code
  }) ->
      maps:from_list([
          {error_code, Code},
          {error_codename, Codename},
          {severity, Severity}
      ]);
  export_error(#{reason := Reason}) ->
      Reason;
  export_error(Other) ->
      Other.
  710. export_error(#{reason := Reason}) ->
  711. Reason;
  712. export_error(Error) ->
  713. Error.