%% emqx_postgresql.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1, namespace/0]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. callback_mode/0,
  29. on_start/2,
  30. on_stop/2,
  31. on_query/3,
  32. on_batch_query/3,
  33. on_get_status/2,
  34. on_add_channel/4,
  35. on_remove_channel/3,
  36. on_get_channels/1,
  37. on_get_channel_status/3
  38. ]).
  39. -export([connect/1]).
  40. -export([
  41. query/3,
  42. prepared_query/3,
  43. execute_batch/3
  44. ]).
  45. %% for ecpool workers usage
  46. -export([do_get_status/1, prepare_sql_to_conn/2]).
  47. -define(PGSQL_HOST_OPTIONS, #{
  48. default_port => ?PGSQL_DEFAULT_PORT
  49. }).
  50. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  51. -type state() ::
  52. #{
  53. pool_name := binary(),
  54. query_templates := #{binary() => template()},
  55. prepares := #{binary() => epgsql:statement()} | {error, _}
  56. }.
  57. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  58. %% We want to be able to call sync if any message from the backend leaves the driver in an
  59. %% inconsistent state needing sync.
  60. -dialyzer({nowarn_function, [execute_batch/3]}).
  61. %%=====================================================================
  62. namespace() -> postgres.
  63. roots() ->
  64. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  65. fields(config) ->
  66. [{server, server()}] ++
  67. adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
  68. emqx_connector_schema_lib:ssl_fields() ++
  69. emqx_connector_schema_lib:prepare_statement_fields().
  70. server() ->
  71. Meta = #{desc => ?DESC("server")},
  72. emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
  73. adjust_fields(Fields) ->
  74. lists:map(
  75. fun
  76. ({username, Sc}) ->
  77. %% to please dialyzer...
  78. Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
  79. {username, hocon_schema:override(Sc, Override)};
  80. (Field) ->
  81. Field
  82. end,
  83. Fields
  84. ).
  85. %% ===================================================================
  86. callback_mode() -> always_sync.
  87. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  88. on_start(
  89. InstId,
  90. #{
  91. server := Server,
  92. database := DB,
  93. username := User,
  94. pool_size := PoolSize,
  95. ssl := SSL
  96. } = Config
  97. ) ->
  98. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
  99. ?SLOG(info, #{
  100. msg => "starting_postgresql_connector",
  101. connector => InstId,
  102. config => emqx_utils:redact(Config)
  103. }),
  104. SslOpts =
  105. case maps:get(enable, SSL) of
  106. true ->
  107. [
  108. %% note: this is converted to `required' in
  109. %% `conn_opts/2', and there's a boolean guard
  110. %% there; if this is set to `required' here,
  111. %% that'll require changing `conn_opts/2''s guard.
  112. {ssl, true},
  113. {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
  114. ];
  115. false ->
  116. [{ssl, false}]
  117. end,
  118. Options = [
  119. {host, Host},
  120. {port, Port},
  121. {username, User},
  122. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  123. {database, DB},
  124. {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
  125. {pool_size, PoolSize}
  126. ],
  127. State1 = parse_prepare_sql(Config, <<"send_message">>),
  128. State2 = State1#{installed_channels => #{}},
  129. case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
  130. ok ->
  131. {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
  132. {error, Reason} ->
  133. ?tp(
  134. pgsql_connector_start_failed,
  135. #{error => Reason}
  136. ),
  137. {error, Reason}
  138. end.
  139. on_stop(InstId, State) ->
  140. ?SLOG(info, #{
  141. msg => "stopping_postgresql_connector",
  142. connector => InstId
  143. }),
  144. close_connections(State),
  145. Res = emqx_resource_pool:stop(InstId),
  146. ?tp(postgres_stopped, #{instance_id => InstId}),
  147. Res.
  148. close_connections(#{pool_name := PoolName} = _State) ->
  149. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  150. close_connections_with_worker_pids(WorkerPids),
  151. ok.
  152. close_connections_with_worker_pids([WorkerPid | Rest]) ->
  153. %% We ignore errors since any error probably means that the
  154. %% connection is closed already.
  155. try ecpool_worker:client(WorkerPid) of
  156. {ok, Conn} ->
  157. _ = epgsql:close(Conn),
  158. close_connections_with_worker_pids(Rest);
  159. _ ->
  160. close_connections_with_worker_pids(Rest)
  161. catch
  162. _:_ ->
  163. close_connections_with_worker_pids(Rest)
  164. end;
  165. close_connections_with_worker_pids([]) ->
  166. ok.
  167. on_add_channel(
  168. _InstId,
  169. #{
  170. installed_channels := InstalledChannels
  171. } = OldState,
  172. ChannelId,
  173. ChannelConfig
  174. ) ->
  175. %% The following will throw an exception if the bridge producers fails to start
  176. {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
  177. case ChannelState of
  178. #{prepares := {error, Reason}} ->
  179. {error, {unhealthy_target, Reason}};
  180. _ ->
  181. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  182. %% Update state
  183. NewState = OldState#{installed_channels => NewInstalledChannels},
  184. {ok, NewState}
  185. end.
  186. create_channel_state(
  187. ChannelId,
  188. #{pool_name := PoolName} = _ConnectorState,
  189. #{parameters := Parameters} = _ChannelConfig
  190. ) ->
  191. State1 = parse_prepare_sql(Parameters, ChannelId),
  192. {ok,
  193. init_prepare(State1#{
  194. pool_name => PoolName,
  195. prepare_statement => #{}
  196. })}.
  197. on_remove_channel(
  198. _InstId,
  199. #{
  200. installed_channels := InstalledChannels
  201. } = OldState,
  202. ChannelId
  203. ) ->
  204. %% Close prepared statements
  205. ok = close_prepared_statement(ChannelId, OldState),
  206. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  207. %% Update state
  208. NewState = OldState#{installed_channels => NewInstalledChannels},
  209. {ok, NewState}.
  210. close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
  211. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  212. close_prepared_statement(WorkerPids, ChannelId, State),
  213. ok.
  214. close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
  215. %% We ignore errors since any error probably means that the
  216. %% prepared statement doesn't exist.
  217. try ecpool_worker:client(WorkerPid) of
  218. {ok, Conn} ->
  219. Statement = get_prepared_statement(ChannelId, State),
  220. _ = epgsql:close(Conn, Statement),
  221. close_prepared_statement(Rest, ChannelId, State);
  222. _ ->
  223. close_prepared_statement(Rest, ChannelId, State)
  224. catch
  225. _:_ ->
  226. close_prepared_statement(Rest, ChannelId, State)
  227. end;
  228. close_prepared_statement([], _ChannelId, _State) ->
  229. ok.
  230. on_get_channel_status(
  231. _ResId,
  232. ChannelId,
  233. #{
  234. pool_name := PoolName,
  235. installed_channels := Channels
  236. } = _State
  237. ) ->
  238. ChannelState = maps:get(ChannelId, Channels),
  239. case
  240. do_check_channel_sql(
  241. PoolName,
  242. ChannelId,
  243. ChannelState
  244. )
  245. of
  246. ok ->
  247. connected;
  248. {error, undefined_table} ->
  249. {error, {unhealthy_target, <<"Table does not exist">>}}
  250. end.
  251. do_check_channel_sql(
  252. PoolName,
  253. ChannelId,
  254. #{query_templates := ChannelQueryTemplates} = _ChannelState
  255. ) ->
  256. {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
  257. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  258. validate_table_existence(WorkerPids, SQL).
  259. on_get_channels(ResId) ->
  260. emqx_bridge_v2:get_channels_for_connector(ResId).
  261. on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
  262. on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
  263. on_query(
  264. InstId,
  265. {TypeOrKey, NameOrSQL, Params},
  266. #{pool_name := PoolName} = State
  267. ) ->
  268. ?SLOG(debug, #{
  269. msg => "postgresql_connector_received_sql_query",
  270. connector => InstId,
  271. type => TypeOrKey,
  272. sql => NameOrSQL,
  273. state => State
  274. }),
  275. Type = pgsql_query_type(TypeOrKey),
  276. {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
  277. Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
  278. ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
  279. handle_result(Res).
  280. pgsql_query_type(sql) ->
  281. query;
  282. pgsql_query_type(query) ->
  283. query;
  284. pgsql_query_type(prepared_query) ->
  285. prepared_query;
  286. %% for bridge
  287. pgsql_query_type(_) ->
  288. pgsql_query_type(prepared_query).
  289. on_batch_query(
  290. InstId,
  291. [{Key, _} = Request | _] = BatchReq,
  292. #{pool_name := PoolName} = State
  293. ) ->
  294. BinKey = to_bin(Key),
  295. case get_template(BinKey, State) of
  296. undefined ->
  297. Log = #{
  298. connector => InstId,
  299. first_request => Request,
  300. state => State,
  301. msg => "batch prepare not implemented"
  302. },
  303. ?SLOG(error, Log),
  304. {error, {unrecoverable_error, batch_prepare_not_implemented}};
  305. {_Statement, RowTemplate} ->
  306. PrepStatement = get_prepared_statement(BinKey, State),
  307. Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
  308. case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
  309. {error, _Error} = Result ->
  310. handle_result(Result);
  311. {_Column, Results} ->
  312. handle_batch_result(Results, 0)
  313. end
  314. end;
  315. on_batch_query(InstId, BatchReq, State) ->
  316. ?SLOG(error, #{
  317. connector => InstId,
  318. request => BatchReq,
  319. state => State,
  320. msg => "invalid request"
  321. }),
  322. {error, {unrecoverable_error, invalid_request}}.
  323. proc_sql_params(query, SQLOrKey, Params, _State) ->
  324. {SQLOrKey, Params};
  325. proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
  326. {SQLOrKey, Params};
  327. proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
  328. BinKey = to_bin(TypeOrKey),
  329. case get_template(BinKey, State) of
  330. undefined ->
  331. {SQLOrData, Params};
  332. {_Statement, RowTemplate} ->
  333. {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
  334. end.
  335. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
  336. BinKey = to_bin(Key),
  337. ChannelState = maps:get(BinKey, Channels),
  338. ChannelQueryTemplates = maps:get(query_templates, ChannelState),
  339. maps:get(BinKey, ChannelQueryTemplates);
  340. get_template(Key, #{query_templates := Templates}) ->
  341. BinKey = to_bin(Key),
  342. maps:get(BinKey, Templates, undefined).
  343. get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
  344. is_map_key(Key, Channels)
  345. ->
  346. BinKey = to_bin(Key),
  347. ChannelState = maps:get(BinKey, Channels),
  348. ChannelPreparedStatements = maps:get(prepares, ChannelState),
  349. maps:get(BinKey, ChannelPreparedStatements);
  350. get_prepared_statement(Key, #{prepares := PrepStatements}) ->
  351. BinKey = to_bin(Key),
  352. maps:get(BinKey, PrepStatements).
  353. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
  354. try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
  355. {error, Reason} ->
  356. ?tp(
  357. pgsql_connector_query_return,
  358. #{error => Reason}
  359. ),
  360. TranslatedError = translate_to_log_context(Reason),
  361. ?SLOG(
  362. error,
  363. maps:merge(
  364. #{
  365. msg => "postgresql_connector_do_sql_query_failed",
  366. connector => InstId,
  367. type => Type,
  368. sql => NameOrSQL
  369. },
  370. TranslatedError
  371. )
  372. ),
  373. case Reason of
  374. sync_required ->
  375. {error, {recoverable_error, Reason}};
  376. ecpool_empty ->
  377. {error, {recoverable_error, Reason}};
  378. {error, error, _, undefined_table, _, _} ->
  379. {error, {unrecoverable_error, export_error(TranslatedError)}};
  380. _ ->
  381. {error, export_error(TranslatedError)}
  382. end;
  383. Result ->
  384. ?tp(
  385. pgsql_connector_query_return,
  386. #{result => Result}
  387. ),
  388. Result
  389. catch
  390. error:function_clause:Stacktrace ->
  391. ?SLOG(error, #{
  392. msg => "postgresql_connector_do_sql_query_failed",
  393. connector => InstId,
  394. type => Type,
  395. sql => NameOrSQL,
  396. reason => function_clause,
  397. stacktrace => Stacktrace
  398. }),
  399. {error, {unrecoverable_error, invalid_request}}
  400. end.
  401. on_get_status(_InstId, #{pool_name := PoolName} = State) ->
  402. case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
  403. true ->
  404. case do_check_prepares(State) of
  405. ok ->
  406. connected;
  407. {ok, NState} ->
  408. %% return new state with prepared statements
  409. {connected, NState};
  410. {error, undefined_table} ->
  411. %% return new state indicating that we are connected but the target table is not created
  412. {disconnected, State, unhealthy_target};
  413. {error, _Reason} ->
  414. %% do not log error, it is logged in prepare_sql_to_conn
  415. connecting
  416. end;
  417. false ->
  418. connecting
  419. end.
  420. do_get_status(Conn) ->
  421. ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
  422. do_check_prepares(
  423. #{
  424. pool_name := PoolName,
  425. query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
  426. }
  427. ) ->
  428. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  429. case validate_table_existence(WorkerPids, SQL) of
  430. ok ->
  431. ok;
  432. {error, Reason} ->
  433. {error, Reason}
  434. end;
  435. do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
  436. ok;
  437. do_check_prepares(#{prepares := {error, _}} = State) ->
  438. %% retry to prepare
  439. case prepare_sql(State) of
  440. {ok, PrepStatements} ->
  441. %% remove the error
  442. {ok, State#{prepares := PrepStatements}};
  443. {error, Reason} ->
  444. {error, Reason}
  445. end.
  446. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  447. validate_table_existence([WorkerPid | Rest], SQL) ->
  448. try ecpool_worker:client(WorkerPid) of
  449. {ok, Conn} ->
  450. case epgsql:parse2(Conn, "", SQL, []) of
  451. {error, {_, _, _, undefined_table, _, _}} ->
  452. {error, undefined_table};
  453. Res when is_tuple(Res) andalso ok == element(1, Res) ->
  454. ok;
  455. Res ->
  456. ?tp(postgres_connector_bad_parse2, #{result => Res}),
  457. validate_table_existence(Rest, SQL)
  458. end;
  459. _ ->
  460. validate_table_existence(Rest, SQL)
  461. catch
  462. exit:{noproc, _} ->
  463. validate_table_existence(Rest, SQL)
  464. end;
  465. validate_table_existence([], _SQL) ->
  466. %% All workers either replied an unexpected error; we will retry
  467. %% on the next health check.
  468. ok.
  469. %% ===================================================================
  470. connect(Opts) ->
  471. Host = proplists:get_value(host, Opts),
  472. Username = proplists:get_value(username, Opts),
  473. %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
  474. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
  475. case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
  476. {ok, _Conn} = Ok ->
  477. Ok;
  478. {error, Reason} ->
  479. {error, Reason}
  480. end.
  481. query(Conn, SQL, Params) ->
  482. case epgsql:equery(Conn, SQL, Params) of
  483. {error, sync_required} = Res ->
  484. ok = epgsql:sync(Conn),
  485. Res;
  486. Res ->
  487. Res
  488. end.
  489. prepared_query(Conn, Name, Params) ->
  490. case epgsql:prepared_query2(Conn, Name, Params) of
  491. {error, sync_required} = Res ->
  492. ok = epgsql:sync(Conn),
  493. Res;
  494. Res ->
  495. Res
  496. end.
  497. execute_batch(Conn, Statement, Params) ->
  498. case epgsql:execute_batch(Conn, Statement, Params) of
  499. {error, sync_required} = Res ->
  500. ok = epgsql:sync(Conn),
  501. Res;
  502. Res ->
  503. Res
  504. end.
  505. conn_opts(Opts) ->
  506. conn_opts(Opts, []).
  507. conn_opts([], Acc) ->
  508. Acc;
  509. conn_opts([Opt = {database, _} | Opts], Acc) ->
  510. conn_opts(Opts, [Opt | Acc]);
  511. conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
  512. Flag =
  513. case Bool of
  514. true -> required;
  515. false -> false
  516. end,
  517. conn_opts(Opts, [{ssl, Flag} | Acc]);
  518. conn_opts([Opt = {port, _} | Opts], Acc) ->
  519. conn_opts(Opts, [Opt | Acc]);
  520. conn_opts([Opt = {timeout, _} | Opts], Acc) ->
  521. conn_opts(Opts, [Opt | Acc]);
  522. conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
  523. conn_opts(Opts, [Opt | Acc]);
  524. conn_opts([_Opt | Opts], Acc) ->
  525. conn_opts(Opts, Acc).
  526. parse_prepare_sql(Config, SQLID) ->
  527. Queries =
  528. case Config of
  529. #{prepare_statement := Qs} ->
  530. Qs;
  531. #{sql := Query} ->
  532. #{SQLID => Query};
  533. #{} ->
  534. #{}
  535. end,
  536. Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
  537. #{query_templates => Templates}.
  538. parse_prepare_sql(Key, Query, Acc) ->
  539. Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
  540. Acc#{Key => Template}.
  541. render_prepare_sql_row(RowTemplate, Data) ->
  542. % NOTE: ignoring errors here, missing variables will be replaced with `null`.
  543. {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
  544. Row.
  545. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
  546. State;
  547. init_prepare(State = #{}) ->
  548. case prepare_sql(State) of
  549. {ok, PrepStatements} ->
  550. State#{prepares => PrepStatements};
  551. Error ->
  552. TranslatedError = translate_to_log_context(Error),
  553. ?SLOG(
  554. error,
  555. maps:merge(
  556. #{msg => <<"postgresql_init_prepare_statement_failed">>},
  557. TranslatedError
  558. )
  559. ),
  560. %% mark the prepares failed
  561. State#{prepares => {error, export_error(TranslatedError)}}
  562. end.
  563. prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
  564. prepare_sql(maps:to_list(Templates), PoolName).
  565. prepare_sql(Templates, PoolName) ->
  566. case do_prepare_sql(Templates, PoolName) of
  567. {ok, _Sts} = Ok ->
  568. %% prepare for reconnect
  569. ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
  570. Ok;
  571. Error ->
  572. Error
  573. end.
  574. do_prepare_sql(Templates, PoolName) ->
  575. do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
  576. do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
  577. {ok, Conn} = ecpool_worker:client(Worker),
  578. case prepare_sql_to_conn(Conn, Templates) of
  579. {ok, Sts} ->
  580. do_prepare_sql(Rest, Templates, Sts);
  581. Error ->
  582. Error
  583. end;
  584. do_prepare_sql([], _Prepares, LastSts) ->
  585. {ok, LastSts}.
  586. prepare_sql_to_conn(Conn, Prepares) ->
  587. prepare_sql_to_conn(Conn, Prepares, #{}).
  588. prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
  589. {ok, Statements};
  590. prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
  591. LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
  592. ?SLOG(info, LogMeta),
  593. case epgsql:parse2(Conn, Key, SQL, []) of
  594. {ok, Statement} ->
  595. prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement});
  596. {error, {error, error, _, undefined_table, _, _} = Error} ->
  597. %% Target table is not created
  598. ?tp(pgsql_undefined_table, #{}),
  599. LogMsg =
  600. maps:merge(
  601. LogMeta#{msg => "postgresql_parse_failed"},
  602. translate_to_log_context(Error)
  603. ),
  604. ?SLOG(error, LogMsg),
  605. {error, undefined_table};
  606. {error, Error} ->
  607. TranslatedError = translate_to_log_context(Error),
  608. LogMsg =
  609. maps:merge(
  610. LogMeta#{msg => "postgresql_parse_failed"},
  611. TranslatedError
  612. ),
  613. ?SLOG(error, LogMsg),
  614. {error, export_error(TranslatedError)}
  615. end.
  616. to_bin(Bin) when is_binary(Bin) ->
  617. Bin;
  618. to_bin(Atom) when is_atom(Atom) ->
  619. erlang:atom_to_binary(Atom).
  620. handle_result({error, {recoverable_error, _Error}} = Res) ->
  621. Res;
  622. handle_result({error, {unrecoverable_error, _Error}} = Res) ->
  623. Res;
  624. handle_result({error, disconnected}) ->
  625. {error, {recoverable_error, disconnected}};
  626. handle_result({error, Error}) ->
  627. TranslatedError = translate_to_log_context(Error),
  628. {error, {unrecoverable_error, export_error(TranslatedError)}};
  629. handle_result(Res) ->
  630. Res.
  631. handle_batch_result([{ok, Count} | Rest], Acc) ->
  632. handle_batch_result(Rest, Acc + Count);
  633. handle_batch_result([{error, Error} | _Rest], _Acc) ->
  634. TranslatedError = translate_to_log_context(Error),
  635. {error, {unrecoverable_error, export_error(TranslatedError)}};
  636. handle_batch_result([], Acc) ->
  637. {ok, Acc}.
  638. translate_to_log_context({error, Reason}) ->
  639. translate_to_log_context(Reason);
  640. translate_to_log_context(#error{} = Reason) ->
  641. #error{
  642. severity = Severity,
  643. code = Code,
  644. codename = Codename,
  645. message = Message,
  646. extra = Extra
  647. } = Reason,
  648. #{
  649. driver_severity => Severity,
  650. driver_error_codename => Codename,
  651. driver_error_code => Code,
  652. driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
  653. driver_error_extra => Extra
  654. };
  655. translate_to_log_context(Reason) ->
  656. #{reason => Reason}.
  657. export_error(#{
  658. driver_severity := Severity,
  659. driver_error_codename := Codename,
  660. driver_error_code := Code
  661. }) ->
  662. %% Extra information has already been logged.
  663. #{
  664. error_code => Code,
  665. error_codename => Codename,
  666. severity => Severity
  667. };
  668. export_error(#{reason := Reason}) ->
  669. Reason;
  670. export_error(Error) ->
  671. Error.