emqx_postgresql.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. callback_mode/0,
  29. on_start/2,
  30. on_stop/2,
  31. on_query/3,
  32. on_batch_query/3,
  33. on_get_status/2,
  34. on_add_channel/4,
  35. on_remove_channel/3,
  36. on_get_channels/1,
  37. on_get_channel_status/3
  38. ]).
  39. -export([connect/1]).
  40. -export([
  41. query/3,
  42. prepared_query/3,
  43. execute_batch/3
  44. ]).
  45. %% for ecpool workers usage
  46. -export([do_get_status/1, prepare_sql_to_conn/2]).
  47. -define(PGSQL_HOST_OPTIONS, #{
  48. default_port => ?PGSQL_DEFAULT_PORT
  49. }).
  50. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  51. -type state() ::
  52. #{
  53. pool_name := binary(),
  54. query_templates := #{binary() => template()},
  55. prepares := #{binary() => epgsql:statement()} | {error, _}
  56. }.
  57. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  58. %% We want to be able to call sync if any message from the backend leaves the driver in an
  59. %% inconsistent state needing sync.
  60. -dialyzer({nowarn_function, [execute_batch/3]}).
  61. %%=====================================================================
  62. roots() ->
  63. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  64. fields(config) ->
  65. [{server, server()}] ++
  66. adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
  67. emqx_connector_schema_lib:ssl_fields() ++
  68. emqx_connector_schema_lib:prepare_statement_fields().
  69. server() ->
  70. Meta = #{desc => ?DESC("server")},
  71. emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
  72. adjust_fields(Fields) ->
  73. lists:map(
  74. fun
  75. ({username, Sc}) ->
  76. %% to please dialyzer...
  77. Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
  78. {username, hocon_schema:override(Sc, Override)};
  79. (Field) ->
  80. Field
  81. end,
  82. Fields
  83. ).
  84. %% ===================================================================
  85. callback_mode() -> always_sync.
  86. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  87. on_start(
  88. InstId,
  89. #{
  90. server := Server,
  91. database := DB,
  92. username := User,
  93. pool_size := PoolSize,
  94. ssl := SSL
  95. } = Config
  96. ) ->
  97. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
  98. ?SLOG(info, #{
  99. msg => "starting_postgresql_connector",
  100. connector => InstId,
  101. config => emqx_utils:redact(Config)
  102. }),
  103. SslOpts =
  104. case maps:get(enable, SSL) of
  105. true ->
  106. [
  107. %% note: this is converted to `required' in
  108. %% `conn_opts/2', and there's a boolean guard
  109. %% there; if this is set to `required' here,
  110. %% that'll require changing `conn_opts/2''s guard.
  111. {ssl, true},
  112. {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
  113. ];
  114. false ->
  115. [{ssl, false}]
  116. end,
  117. Options = [
  118. {host, Host},
  119. {port, Port},
  120. {username, User},
  121. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  122. {database, DB},
  123. {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
  124. {pool_size, PoolSize}
  125. ],
  126. State1 = parse_prepare_sql(Config, <<"send_message">>),
  127. State2 = State1#{installed_channels => #{}},
  128. case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
  129. ok ->
  130. {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
  131. {error, Reason} ->
  132. ?tp(
  133. pgsql_connector_start_failed,
  134. #{error => Reason}
  135. ),
  136. {error, Reason}
  137. end.
  138. on_stop(InstId, State) ->
  139. ?SLOG(info, #{
  140. msg => "stopping_postgresql_connector",
  141. connector => InstId
  142. }),
  143. close_connections(State),
  144. emqx_resource_pool:stop(InstId).
  145. close_connections(#{pool_name := PoolName} = _State) ->
  146. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  147. close_connections_with_worker_pids(WorkerPids),
  148. ok.
  149. close_connections_with_worker_pids([WorkerPid | Rest]) ->
  150. %% We ignore errors since any error probably means that the
  151. %% connection is closed already.
  152. try ecpool_worker:client(WorkerPid) of
  153. {ok, Conn} ->
  154. _ = epgsql:close(Conn),
  155. close_connections_with_worker_pids(Rest);
  156. _ ->
  157. close_connections_with_worker_pids(Rest)
  158. catch
  159. _:_ ->
  160. close_connections_with_worker_pids(Rest)
  161. end;
  162. close_connections_with_worker_pids([]) ->
  163. ok.
  164. on_add_channel(
  165. _InstId,
  166. #{
  167. installed_channels := InstalledChannels
  168. } = OldState,
  169. ChannelId,
  170. ChannelConfig
  171. ) ->
  172. %% The following will throw an exception if the bridge producers fails to start
  173. {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
  174. case ChannelState of
  175. #{prepares := {error, Reason}} ->
  176. {error, {unhealthy_target, Reason}};
  177. _ ->
  178. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  179. %% Update state
  180. NewState = OldState#{installed_channels => NewInstalledChannels},
  181. {ok, NewState}
  182. end.
  183. create_channel_state(
  184. ChannelId,
  185. #{pool_name := PoolName} = _ConnectorState,
  186. #{parameters := Parameters} = _ChannelConfig
  187. ) ->
  188. State1 = parse_prepare_sql(Parameters, ChannelId),
  189. {ok,
  190. init_prepare(State1#{
  191. pool_name => PoolName,
  192. prepare_statement => #{}
  193. })}.
  194. on_remove_channel(
  195. _InstId,
  196. #{
  197. installed_channels := InstalledChannels
  198. } = OldState,
  199. ChannelId
  200. ) ->
  201. %% Close prepared statements
  202. ok = close_prepared_statement(ChannelId, OldState),
  203. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  204. %% Update state
  205. NewState = OldState#{installed_channels => NewInstalledChannels},
  206. {ok, NewState}.
  207. close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
  208. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  209. close_prepared_statement(WorkerPids, ChannelId, State),
  210. ok.
  211. close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
  212. %% We ignore errors since any error probably means that the
  213. %% prepared statement doesn't exist.
  214. try ecpool_worker:client(WorkerPid) of
  215. {ok, Conn} ->
  216. Statement = get_prepared_statement(ChannelId, State),
  217. _ = epgsql:close(Conn, Statement),
  218. close_prepared_statement(Rest, ChannelId, State);
  219. _ ->
  220. close_prepared_statement(Rest, ChannelId, State)
  221. catch
  222. _:_ ->
  223. close_prepared_statement(Rest, ChannelId, State)
  224. end;
  225. close_prepared_statement([], _ChannelId, _State) ->
  226. ok.
  227. on_get_channel_status(
  228. _ResId,
  229. ChannelId,
  230. #{
  231. pool_name := PoolName,
  232. installed_channels := Channels
  233. } = _State
  234. ) ->
  235. ChannelState = maps:get(ChannelId, Channels),
  236. case
  237. do_check_channel_sql(
  238. PoolName,
  239. ChannelId,
  240. ChannelState
  241. )
  242. of
  243. ok ->
  244. connected;
  245. {error, undefined_table} ->
  246. {error, {unhealthy_target, <<"Table does not exist">>}}
  247. end.
  248. do_check_channel_sql(
  249. PoolName,
  250. ChannelId,
  251. #{query_templates := ChannelQueryTemplates} = _ChannelState
  252. ) ->
  253. {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
  254. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  255. validate_table_existence(WorkerPids, SQL).
  256. on_get_channels(ResId) ->
  257. emqx_bridge_v2:get_channels_for_connector(ResId).
  258. on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
  259. on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
  260. on_query(
  261. InstId,
  262. {TypeOrKey, NameOrSQL, Params},
  263. #{pool_name := PoolName} = State
  264. ) ->
  265. ?SLOG(debug, #{
  266. msg => "postgresql_connector_received_sql_query",
  267. connector => InstId,
  268. type => TypeOrKey,
  269. sql => NameOrSQL,
  270. state => State
  271. }),
  272. Type = pgsql_query_type(TypeOrKey),
  273. {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
  274. Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
  275. handle_result(Res).
  276. pgsql_query_type(sql) ->
  277. query;
  278. pgsql_query_type(query) ->
  279. query;
  280. pgsql_query_type(prepared_query) ->
  281. prepared_query;
  282. %% for bridge
  283. pgsql_query_type(_) ->
  284. pgsql_query_type(prepared_query).
  285. on_batch_query(
  286. InstId,
  287. [{Key, _} = Request | _] = BatchReq,
  288. #{pool_name := PoolName} = State
  289. ) ->
  290. BinKey = to_bin(Key),
  291. case get_template(BinKey, State) of
  292. undefined ->
  293. Log = #{
  294. connector => InstId,
  295. first_request => Request,
  296. state => State,
  297. msg => "batch prepare not implemented"
  298. },
  299. ?SLOG(error, Log),
  300. {error, {unrecoverable_error, batch_prepare_not_implemented}};
  301. {_Statement, RowTemplate} ->
  302. PrepStatement = get_prepared_statement(BinKey, State),
  303. Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
  304. case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
  305. {error, _Error} = Result ->
  306. handle_result(Result);
  307. {_Column, Results} ->
  308. handle_batch_result(Results, 0)
  309. end
  310. end;
  311. on_batch_query(InstId, BatchReq, State) ->
  312. ?SLOG(error, #{
  313. connector => InstId,
  314. request => BatchReq,
  315. state => State,
  316. msg => "invalid request"
  317. }),
  318. {error, {unrecoverable_error, invalid_request}}.
  319. proc_sql_params(query, SQLOrKey, Params, _State) ->
  320. {SQLOrKey, Params};
  321. proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
  322. {SQLOrKey, Params};
  323. proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
  324. BinKey = to_bin(TypeOrKey),
  325. case get_template(BinKey, State) of
  326. undefined ->
  327. {SQLOrData, Params};
  328. {_Statement, RowTemplate} ->
  329. {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
  330. end.
  331. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
  332. BinKey = to_bin(Key),
  333. ChannelState = maps:get(BinKey, Channels),
  334. ChannelQueryTemplates = maps:get(query_templates, ChannelState),
  335. maps:get(BinKey, ChannelQueryTemplates);
  336. get_template(Key, #{query_templates := Templates}) ->
  337. BinKey = to_bin(Key),
  338. maps:get(BinKey, Templates, undefined).
  339. get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
  340. is_map_key(Key, Channels)
  341. ->
  342. BinKey = to_bin(Key),
  343. ChannelState = maps:get(BinKey, Channels),
  344. ChannelPreparedStatements = maps:get(prepares, ChannelState),
  345. maps:get(BinKey, ChannelPreparedStatements);
  346. get_prepared_statement(Key, #{prepares := PrepStatements}) ->
  347. BinKey = to_bin(Key),
  348. maps:get(BinKey, PrepStatements).
  349. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
  350. try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
  351. {error, Reason} ->
  352. ?tp(
  353. pgsql_connector_query_return,
  354. #{error => Reason}
  355. ),
  356. TranslatedError = translate_to_log_context(Reason),
  357. ?SLOG(
  358. error,
  359. maps:merge(
  360. #{
  361. msg => "postgresql_connector_do_sql_query_failed",
  362. connector => InstId,
  363. type => Type,
  364. sql => NameOrSQL
  365. },
  366. TranslatedError
  367. )
  368. ),
  369. case Reason of
  370. sync_required ->
  371. {error, {recoverable_error, Reason}};
  372. ecpool_empty ->
  373. {error, {recoverable_error, Reason}};
  374. {error, error, _, undefined_table, _, _} ->
  375. {error, {unrecoverable_error, export_error(TranslatedError)}};
  376. _ ->
  377. {error, export_error(TranslatedError)}
  378. end;
  379. Result ->
  380. ?tp(
  381. pgsql_connector_query_return,
  382. #{result => Result}
  383. ),
  384. Result
  385. catch
  386. error:function_clause:Stacktrace ->
  387. ?SLOG(error, #{
  388. msg => "postgresql_connector_do_sql_query_failed",
  389. connector => InstId,
  390. type => Type,
  391. sql => NameOrSQL,
  392. reason => function_clause,
  393. stacktrace => Stacktrace
  394. }),
  395. {error, {unrecoverable_error, invalid_request}}
  396. end.
  397. on_get_status(_InstId, #{pool_name := PoolName} = State) ->
  398. case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
  399. true ->
  400. case do_check_prepares(State) of
  401. ok ->
  402. connected;
  403. {ok, NState} ->
  404. %% return new state with prepared statements
  405. {connected, NState};
  406. {error, undefined_table} ->
  407. %% return new state indicating that we are connected but the target table is not created
  408. {disconnected, State, unhealthy_target};
  409. {error, _Reason} ->
  410. %% do not log error, it is logged in prepare_sql_to_conn
  411. connecting
  412. end;
  413. false ->
  414. connecting
  415. end.
  416. do_get_status(Conn) ->
  417. ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
  418. do_check_prepares(
  419. #{
  420. pool_name := PoolName,
  421. query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
  422. }
  423. ) ->
  424. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  425. case validate_table_existence(WorkerPids, SQL) of
  426. ok ->
  427. ok;
  428. {error, Reason} ->
  429. {error, Reason}
  430. end;
  431. do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
  432. ok;
  433. do_check_prepares(#{prepares := {error, _}} = State) ->
  434. %% retry to prepare
  435. case prepare_sql(State) of
  436. {ok, PrepStatements} ->
  437. %% remove the error
  438. {ok, State#{prepares := PrepStatements}};
  439. {error, Reason} ->
  440. {error, Reason}
  441. end.
  442. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  443. validate_table_existence([WorkerPid | Rest], SQL) ->
  444. try ecpool_worker:client(WorkerPid) of
  445. {ok, Conn} ->
  446. case epgsql:parse2(Conn, "", SQL, []) of
  447. {error, {_, _, _, undefined_table, _, _}} ->
  448. {error, undefined_table};
  449. Res when is_tuple(Res) andalso ok == element(1, Res) ->
  450. ok;
  451. Res ->
  452. ?tp(postgres_connector_bad_parse2, #{result => Res}),
  453. validate_table_existence(Rest, SQL)
  454. end;
  455. _ ->
  456. validate_table_existence(Rest, SQL)
  457. catch
  458. exit:{noproc, _} ->
  459. validate_table_existence(Rest, SQL)
  460. end;
  461. validate_table_existence([], _SQL) ->
  462. %% All workers either replied an unexpected error; we will retry
  463. %% on the next health check.
  464. ok.
  465. %% ===================================================================
  466. connect(Opts) ->
  467. Host = proplists:get_value(host, Opts),
  468. Username = proplists:get_value(username, Opts),
  469. %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
  470. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
  471. case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
  472. {ok, _Conn} = Ok ->
  473. Ok;
  474. {error, Reason} ->
  475. {error, Reason}
  476. end.
  477. query(Conn, SQL, Params) ->
  478. case epgsql:equery(Conn, SQL, Params) of
  479. {error, sync_required} = Res ->
  480. ok = epgsql:sync(Conn),
  481. Res;
  482. Res ->
  483. Res
  484. end.
  485. prepared_query(Conn, Name, Params) ->
  486. case epgsql:prepared_query2(Conn, Name, Params) of
  487. {error, sync_required} = Res ->
  488. ok = epgsql:sync(Conn),
  489. Res;
  490. Res ->
  491. Res
  492. end.
  493. execute_batch(Conn, Statement, Params) ->
  494. case epgsql:execute_batch(Conn, Statement, Params) of
  495. {error, sync_required} = Res ->
  496. ok = epgsql:sync(Conn),
  497. Res;
  498. Res ->
  499. Res
  500. end.
  501. conn_opts(Opts) ->
  502. conn_opts(Opts, []).
  503. conn_opts([], Acc) ->
  504. Acc;
  505. conn_opts([Opt = {database, _} | Opts], Acc) ->
  506. conn_opts(Opts, [Opt | Acc]);
  507. conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
  508. Flag =
  509. case Bool of
  510. true -> required;
  511. false -> false
  512. end,
  513. conn_opts(Opts, [{ssl, Flag} | Acc]);
  514. conn_opts([Opt = {port, _} | Opts], Acc) ->
  515. conn_opts(Opts, [Opt | Acc]);
  516. conn_opts([Opt = {timeout, _} | Opts], Acc) ->
  517. conn_opts(Opts, [Opt | Acc]);
  518. conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
  519. conn_opts(Opts, [Opt | Acc]);
  520. conn_opts([_Opt | Opts], Acc) ->
  521. conn_opts(Opts, Acc).
  522. parse_prepare_sql(Config, SQLID) ->
  523. Queries =
  524. case Config of
  525. #{prepare_statement := Qs} ->
  526. Qs;
  527. #{sql := Query} ->
  528. #{SQLID => Query};
  529. #{} ->
  530. #{}
  531. end,
  532. Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
  533. #{query_templates => Templates}.
  534. parse_prepare_sql(Key, Query, Acc) ->
  535. Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
  536. Acc#{Key => Template}.
  537. render_prepare_sql_row(RowTemplate, Data) ->
  538. % NOTE: ignoring errors here, missing variables will be replaced with `null`.
  539. {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
  540. Row.
  541. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
  542. State;
  543. init_prepare(State = #{}) ->
  544. case prepare_sql(State) of
  545. {ok, PrepStatements} ->
  546. State#{prepares => PrepStatements};
  547. Error ->
  548. TranslatedError = translate_to_log_context(Error),
  549. ?SLOG(
  550. error,
  551. maps:merge(
  552. #{msg => <<"postgresql_init_prepare_statement_failed">>},
  553. TranslatedError
  554. )
  555. ),
  556. %% mark the prepares failed
  557. State#{prepares => {error, export_error(TranslatedError)}}
  558. end.
  559. prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
  560. prepare_sql(maps:to_list(Templates), PoolName).
  561. prepare_sql(Templates, PoolName) ->
  562. case do_prepare_sql(Templates, PoolName) of
  563. {ok, _Sts} = Ok ->
  564. %% prepare for reconnect
  565. ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
  566. Ok;
  567. Error ->
  568. Error
  569. end.
  570. do_prepare_sql(Templates, PoolName) ->
  571. do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
  572. do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
  573. {ok, Conn} = ecpool_worker:client(Worker),
  574. case prepare_sql_to_conn(Conn, Templates) of
  575. {ok, Sts} ->
  576. do_prepare_sql(Rest, Templates, Sts);
  577. Error ->
  578. Error
  579. end;
  580. do_prepare_sql([], _Prepares, LastSts) ->
  581. {ok, LastSts}.
  582. prepare_sql_to_conn(Conn, Prepares) ->
  583. prepare_sql_to_conn(Conn, Prepares, #{}).
  584. prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
  585. {ok, Statements};
  586. prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
  587. LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
  588. ?SLOG(info, LogMeta),
  589. case epgsql:parse2(Conn, Key, SQL, []) of
  590. {ok, Statement} ->
  591. prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement});
  592. {error, {error, error, _, undefined_table, _, _} = Error} ->
  593. %% Target table is not created
  594. ?tp(pgsql_undefined_table, #{}),
  595. LogMsg =
  596. maps:merge(
  597. LogMeta#{msg => "postgresql_parse_failed"},
  598. translate_to_log_context(Error)
  599. ),
  600. ?SLOG(error, LogMsg),
  601. {error, undefined_table};
  602. {error, Error} ->
  603. TranslatedError = translate_to_log_context(Error),
  604. LogMsg =
  605. maps:merge(
  606. LogMeta#{msg => "postgresql_parse_failed"},
  607. TranslatedError
  608. ),
  609. ?SLOG(error, LogMsg),
  610. {error, export_error(TranslatedError)}
  611. end.
  612. to_bin(Bin) when is_binary(Bin) ->
  613. Bin;
  614. to_bin(Atom) when is_atom(Atom) ->
  615. erlang:atom_to_binary(Atom).
  616. handle_result({error, {recoverable_error, _Error}} = Res) ->
  617. Res;
  618. handle_result({error, {unrecoverable_error, _Error}} = Res) ->
  619. Res;
  620. handle_result({error, disconnected}) ->
  621. {error, {recoverable_error, disconnected}};
  622. handle_result({error, Error}) ->
  623. TranslatedError = translate_to_log_context(Error),
  624. {error, {unrecoverable_error, export_error(TranslatedError)}};
  625. handle_result(Res) ->
  626. Res.
  627. handle_batch_result([{ok, Count} | Rest], Acc) ->
  628. handle_batch_result(Rest, Acc + Count);
  629. handle_batch_result([{error, Error} | _Rest], _Acc) ->
  630. TranslatedError = translate_to_log_context(Error),
  631. {error, {unrecoverable_error, export_error(TranslatedError)}};
  632. handle_batch_result([], Acc) ->
  633. {ok, Acc}.
  634. translate_to_log_context({error, Reason}) ->
  635. translate_to_log_context(Reason);
  636. translate_to_log_context(#error{} = Reason) ->
  637. #error{
  638. severity = Severity,
  639. code = Code,
  640. codename = Codename,
  641. message = Message,
  642. extra = Extra
  643. } = Reason,
  644. #{
  645. driver_severity => Severity,
  646. driver_error_codename => Codename,
  647. driver_error_code => Code,
  648. driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
  649. driver_error_extra => Extra
  650. };
  651. translate_to_log_context(Reason) ->
  652. #{reason => Reason}.
  653. export_error(#{
  654. driver_severity := Severity,
  655. driver_error_codename := Codename,
  656. driver_error_code := Code
  657. }) ->
  658. %% Extra information has already been logged.
  659. #{
  660. error_code => Code,
  661. error_codename => Codename,
  662. severity => Severity
  663. };
  664. export_error(#{reason := Reason}) ->
  665. Reason;
  666. export_error(Error) ->
  667. Error.