emqx_postgresql.erl 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1, namespace/0]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. resource_type/0,
  29. callback_mode/0,
  30. on_start/2,
  31. on_stop/2,
  32. on_query/3,
  33. on_batch_query/3,
  34. on_get_status/2,
  35. on_add_channel/4,
  36. on_remove_channel/3,
  37. on_get_channels/1,
  38. on_get_channel_status/3,
  39. on_format_query_result/1
  40. ]).
  41. -export([connect/1]).
  42. -export([
  43. query/3,
  44. prepared_query/3,
  45. execute_batch/3
  46. ]).
  47. -export([disable_prepared_statements/0]).
  48. %% for ecpool workers usage
  49. -export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
  50. -define(PGSQL_HOST_OPTIONS, #{
  51. default_port => ?PGSQL_DEFAULT_PORT
  52. }).
  53. -type connector_resource_id() :: binary().
  54. -type action_resource_id() :: binary().
  55. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  56. -type state() ::
  57. #{
  58. pool_name := binary(),
  59. query_templates := #{binary() => template()},
  60. prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
  61. }.
  62. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  63. %% We want to be able to call sync if any message from the backend leaves the driver in an
  64. %% inconsistent state needing sync.
  65. -dialyzer({nowarn_function, [execute_batch/3]}).
  66. %%=====================================================================
  67. namespace() -> postgres.
  68. roots() ->
  69. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  70. fields(config) ->
  71. [
  72. {server, server()},
  73. disable_prepared_statements()
  74. ] ++
  75. adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
  76. emqx_connector_schema_lib:ssl_fields() ++
  77. emqx_connector_schema_lib:prepare_statement_fields().
  78. server() ->
  79. Meta = #{desc => ?DESC("server")},
  80. emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
  81. disable_prepared_statements() ->
  82. {disable_prepared_statements,
  83. hoconsc:mk(
  84. boolean(),
  85. #{
  86. default => false,
  87. required => false,
  88. desc => ?DESC("disable_prepared_statements")
  89. }
  90. )}.
  91. adjust_fields(Fields) ->
  92. lists:map(
  93. fun
  94. ({username, Sc}) ->
  95. %% to please dialyzer...
  96. Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
  97. {username, hocon_schema:override(Sc, Override)};
  98. (Field) ->
  99. Field
  100. end,
  101. Fields
  102. ).
  103. %% ===================================================================
  104. resource_type() -> pgsql.
  105. callback_mode() -> always_sync.
  106. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  107. on_start(
  108. InstId,
  109. #{
  110. server := Server,
  111. disable_prepared_statements := DisablePreparedStatements,
  112. database := DB,
  113. username := User,
  114. pool_size := PoolSize,
  115. ssl := SSL
  116. } = Config
  117. ) ->
  118. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
  119. ?SLOG(info, #{
  120. msg => "starting_postgresql_connector",
  121. connector => InstId,
  122. config => emqx_utils:redact(Config)
  123. }),
  124. SslOpts =
  125. case maps:get(enable, SSL) of
  126. true ->
  127. [
  128. %% note: this is converted to `required' in
  129. %% `conn_opts/2', and there's a boolean guard
  130. %% there; if this is set to `required' here,
  131. %% that'll require changing `conn_opts/2''s guard.
  132. {ssl, true},
  133. {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
  134. ];
  135. false ->
  136. [{ssl, false}]
  137. end,
  138. Options = [
  139. {host, Host},
  140. {port, Port},
  141. {username, User},
  142. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  143. {database, DB},
  144. {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
  145. {pool_size, PoolSize}
  146. ],
  147. State1 = parse_sql_template(Config, <<"send_message">>),
  148. State2 = State1#{installed_channels => #{}},
  149. case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
  150. ok ->
  151. Prepares =
  152. case DisablePreparedStatements of
  153. true -> disabled;
  154. false -> #{}
  155. end,
  156. {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
  157. {error, Reason} ->
  158. ?tp(
  159. pgsql_connector_start_failed,
  160. #{error => Reason}
  161. ),
  162. {error, Reason}
  163. end.
  164. on_stop(InstId, State) ->
  165. ?SLOG(info, #{
  166. msg => "stopping_postgresql_connector",
  167. connector => InstId
  168. }),
  169. close_connections(State),
  170. Res = emqx_resource_pool:stop(InstId),
  171. ?tp(postgres_stopped, #{instance_id => InstId}),
  172. Res.
  173. close_connections(#{pool_name := PoolName} = _State) ->
  174. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  175. close_connections_with_worker_pids(WorkerPids),
  176. ok.
  177. close_connections_with_worker_pids([WorkerPid | Rest]) ->
  178. %% We ignore errors since any error probably means that the
  179. %% connection is closed already.
  180. try ecpool_worker:client(WorkerPid) of
  181. {ok, Conn} ->
  182. _ = epgsql:close(Conn),
  183. close_connections_with_worker_pids(Rest);
  184. _ ->
  185. close_connections_with_worker_pids(Rest)
  186. catch
  187. _:_ ->
  188. close_connections_with_worker_pids(Rest)
  189. end;
  190. close_connections_with_worker_pids([]) ->
  191. ok.
  192. on_add_channel(
  193. _InstId,
  194. #{
  195. installed_channels := InstalledChannels
  196. } = OldState,
  197. ChannelId,
  198. ChannelConfig
  199. ) ->
  200. %% The following will throw an exception if the bridge producers fails to start
  201. {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
  202. case ChannelState of
  203. #{prepares := {error, Reason}} ->
  204. {error, {unhealthy_target, Reason}};
  205. _ ->
  206. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  207. %% Update state
  208. NewState = OldState#{installed_channels => NewInstalledChannels},
  209. {ok, NewState}
  210. end.
  211. create_channel_state(
  212. ChannelId,
  213. #{
  214. pool_name := PoolName,
  215. prepares := Prepares
  216. } = _ConnectorState,
  217. #{parameters := Parameters} = _ChannelConfig
  218. ) ->
  219. State1 = parse_sql_template(Parameters, ChannelId),
  220. {ok,
  221. init_prepare(State1#{
  222. pool_name => PoolName,
  223. prepares => Prepares,
  224. prepare_statement => #{}
  225. })}.
  226. on_remove_channel(
  227. _InstId,
  228. #{
  229. installed_channels := InstalledChannels
  230. } = OldState,
  231. ChannelId
  232. ) ->
  233. %% Close prepared statements
  234. ok = close_prepared_statement(ChannelId, OldState),
  235. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  236. %% Update state
  237. NewState = OldState#{installed_channels => NewInstalledChannels},
  238. {ok, NewState}.
  239. close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
  240. ok;
  241. close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
  242. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  243. close_prepared_statement(WorkerPids, ChannelId, State),
  244. ok.
  245. close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
  246. %% We ignore errors since any error probably means that the
  247. %% prepared statement doesn't exist. If it exists when we try
  248. %% to insert one with the same name, we will try to remove it
  249. %% again anyway.
  250. try ecpool_worker:client(WorkerPid) of
  251. {ok, Conn} ->
  252. ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
  253. case get_templated_statement(ChannelId, State) of
  254. {ok, Statement} ->
  255. _ = epgsql:close(Conn, Statement),
  256. close_prepared_statement(Rest, ChannelId, State);
  257. error ->
  258. %% channel was not added
  259. ok
  260. end;
  261. _ ->
  262. close_prepared_statement(Rest, ChannelId, State)
  263. catch
  264. _:_ ->
  265. close_prepared_statement(Rest, ChannelId, State)
  266. end;
  267. close_prepared_statement([], _ChannelId, _State) ->
  268. ok.
  269. on_get_channel_status(
  270. _ResId,
  271. ChannelId,
  272. #{
  273. pool_name := PoolName,
  274. installed_channels := Channels
  275. } = _State
  276. ) ->
  277. ChannelState = maps:get(ChannelId, Channels),
  278. case
  279. do_check_channel_sql(
  280. PoolName,
  281. ChannelId,
  282. ChannelState
  283. )
  284. of
  285. ok ->
  286. connected;
  287. {error, undefined_table} ->
  288. {error, {unhealthy_target, <<"Table does not exist">>}}
  289. end.
  290. do_check_channel_sql(
  291. PoolName,
  292. ChannelId,
  293. #{query_templates := ChannelQueryTemplates} = _ChannelState
  294. ) ->
  295. {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
  296. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  297. validate_table_existence(WorkerPids, SQL).
  298. on_get_channels(ResId) ->
  299. emqx_bridge_v2:get_channels_for_connector(ResId).
  300. -spec on_query
  301. %% Called from authn and authz modules
  302. (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
  303. {ok, _} | {error, term()};
  304. %% Called from bridges
  305. (connector_resource_id(), {action_resource_id(), map()}, state()) ->
  306. {ok, _} | {error, term()}.
  307. on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
  308. on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
  309. on_query(
  310. InstId,
  311. {TypeOrKey, NameOrMap, Params},
  312. #{pool_name := PoolName} = State
  313. ) ->
  314. ?TRACE("QUERY", "postgresql_connector_received_sql_query", #{
  315. connector => InstId,
  316. type => TypeOrKey,
  317. sql => NameOrMap,
  318. state => State
  319. }),
  320. {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
  321. emqx_trace:rendered_action_template(
  322. TypeOrKey,
  323. #{
  324. statement_type => QueryType,
  325. statement_or_name => NameOrSQL2,
  326. data => Data
  327. }
  328. ),
  329. Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
  330. ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
  331. handle_result(Res).
  332. on_batch_query(
  333. InstId,
  334. [{Key, _} = Request | _] = BatchReq,
  335. #{pool_name := PoolName} = State
  336. ) ->
  337. BinKey = to_bin(Key),
  338. case get_template(BinKey, State) of
  339. undefined ->
  340. Log = #{
  341. connector => InstId,
  342. first_request => Request,
  343. state => State,
  344. msg => "batch prepare not implemented"
  345. },
  346. ?SLOG(error, Log),
  347. {error, {unrecoverable_error, batch_prepare_not_implemented}};
  348. {_Statement, RowTemplate} ->
  349. {ok, StatementTemplate} = get_templated_statement(BinKey, State),
  350. Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
  351. emqx_trace:rendered_action_template(
  352. Key,
  353. #{
  354. statement_type => execute_batch,
  355. statement_or_name => StatementTemplate,
  356. data => Rows
  357. }
  358. ),
  359. case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
  360. {error, _Error} = Result ->
  361. handle_result(Result);
  362. {_Column, Results} ->
  363. handle_batch_result(Results, 0)
  364. end
  365. end;
  366. on_batch_query(InstId, BatchReq, State) ->
  367. ?SLOG(error, #{
  368. connector => InstId,
  369. request => BatchReq,
  370. state => State,
  371. msg => "invalid request"
  372. }),
  373. {error, {unrecoverable_error, invalid_request}}.
  374. proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
  375. %% When this connector is called from actions/bridges.
  376. DisablePreparedStatements = prepared_statements_disabled(State),
  377. {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
  378. Rendered = render_prepare_sql_row(RowTemplate, Map),
  379. case DisablePreparedStatements of
  380. true ->
  381. {query, ExprTemplate, Rendered};
  382. false ->
  383. {prepared_query, ActionResId, Rendered}
  384. end;
  385. proc_sql_params(prepared_query, ConnResId, Params, State) ->
  386. %% When this connector is called from authn/authz modules
  387. DisablePreparedStatements = prepared_statements_disabled(State),
  388. case DisablePreparedStatements of
  389. true ->
  390. #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
  391. {query, ExprTemplate, Params};
  392. false ->
  393. %% Connector resource id itself is the prepared statement name
  394. {prepared_query, ConnResId, Params}
  395. end;
  396. proc_sql_params(QueryType, SQL, Params, _State) when
  397. is_atom(QueryType) andalso
  398. (is_binary(SQL) orelse is_list(SQL)) andalso
  399. is_list(Params)
  400. ->
  401. %% When called to do ad-hoc commands/queries.
  402. {QueryType, SQL, Params}.
  403. prepared_statements_disabled(State) ->
  404. maps:get(prepares, State, #{}) =:= disabled.
  405. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
  406. BinKey = to_bin(Key),
  407. ChannelState = maps:get(BinKey, Channels),
  408. ChannelQueryTemplates = maps:get(query_templates, ChannelState),
  409. maps:get(BinKey, ChannelQueryTemplates);
  410. get_template(Key, #{query_templates := Templates}) ->
  411. BinKey = to_bin(Key),
  412. maps:get(BinKey, Templates, undefined).
  413. get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
  414. BinKey = to_bin(Key),
  415. case is_map_key(BinKey, Channels) of
  416. true ->
  417. ChannelState = maps:get(BinKey, Channels),
  418. case ChannelState of
  419. #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
  420. {ok, ExprTemplate};
  421. #{prepares := #{BinKey := ExprTemplate}} ->
  422. {ok, ExprTemplate}
  423. end;
  424. false ->
  425. error
  426. end;
  427. get_templated_statement(Key, #{prepares := PrepStatements}) ->
  428. BinKey = to_bin(Key),
  429. {ok, maps:get(BinKey, PrepStatements)}.
  430. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
  431. try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
  432. {error, Reason} ->
  433. ?tp(
  434. pgsql_connector_query_return,
  435. #{error => Reason}
  436. ),
  437. TranslatedError = translate_to_log_context(Reason),
  438. ?SLOG(
  439. error,
  440. maps:merge(
  441. #{
  442. msg => "postgresql_connector_do_sql_query_failed",
  443. connector => InstId,
  444. type => Type,
  445. sql => NameOrSQL
  446. },
  447. TranslatedError
  448. )
  449. ),
  450. case Reason of
  451. sync_required ->
  452. {error, {recoverable_error, Reason}};
  453. ecpool_empty ->
  454. {error, {recoverable_error, Reason}};
  455. {error, error, _, undefined_table, _, _} ->
  456. {error, {unrecoverable_error, export_error(TranslatedError)}};
  457. _ ->
  458. {error, export_error(TranslatedError)}
  459. end;
  460. Result ->
  461. ?tp(
  462. pgsql_connector_query_return,
  463. #{result => Result}
  464. ),
  465. Result
  466. catch
  467. error:function_clause:Stacktrace ->
  468. ?SLOG(error, #{
  469. msg => "postgresql_connector_do_sql_query_failed",
  470. connector => InstId,
  471. type => Type,
  472. sql => NameOrSQL,
  473. reason => function_clause,
  474. stacktrace => Stacktrace
  475. }),
  476. {error, {unrecoverable_error, invalid_request}}
  477. end.
  478. on_get_status(_InstId, #{pool_name := PoolName} = State) ->
  479. case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
  480. true ->
  481. case do_check_prepares(State) of
  482. ok ->
  483. connected;
  484. {ok, NState} ->
  485. %% return new state with prepared statements
  486. {connected, NState};
  487. {error, undefined_table} ->
  488. %% return new state indicating that we are connected but the target table is not created
  489. {disconnected, State, unhealthy_target};
  490. {error, _Reason} ->
  491. %% do not log error, it is logged in prepare_sql_to_conn
  492. connecting
  493. end;
  494. false ->
  495. connecting
  496. end.
  497. do_get_status(Conn) ->
  498. ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
  499. do_check_prepares(
  500. #{
  501. pool_name := PoolName,
  502. query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
  503. }
  504. ) ->
  505. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  506. case validate_table_existence(WorkerPids, SQL) of
  507. ok ->
  508. ok;
  509. {error, Reason} ->
  510. {error, Reason}
  511. end;
  512. do_check_prepares(#{prepares := disabled}) ->
  513. ok;
  514. do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
  515. ok;
  516. do_check_prepares(#{prepares := {error, _}} = State) ->
  517. %% retry to prepare
  518. case prepare_sql(State) of
  519. {ok, PrepStatements} ->
  520. %% remove the error
  521. {ok, State#{prepares := PrepStatements}};
  522. {error, Reason} ->
  523. {error, Reason}
  524. end.
  525. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  526. validate_table_existence([WorkerPid | Rest], SQL) ->
  527. try ecpool_worker:client(WorkerPid) of
  528. {ok, Conn} ->
  529. case epgsql:parse2(Conn, "", SQL, []) of
  530. {error, {_, _, _, undefined_table, _, _}} ->
  531. {error, undefined_table};
  532. Res when is_tuple(Res) andalso ok == element(1, Res) ->
  533. ok;
  534. Res ->
  535. ?tp(postgres_connector_bad_parse2, #{result => Res}),
  536. validate_table_existence(Rest, SQL)
  537. end;
  538. _ ->
  539. validate_table_existence(Rest, SQL)
  540. catch
  541. exit:{noproc, _} ->
  542. validate_table_existence(Rest, SQL)
  543. end;
  544. validate_table_existence([], _SQL) ->
  545. %% All workers either replied an unexpected error; we will retry
  546. %% on the next health check.
  547. ok.
  548. %% ===================================================================
  549. connect(Opts) ->
  550. Host = proplists:get_value(host, Opts),
  551. Username = proplists:get_value(username, Opts),
  552. %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
  553. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
  554. case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
  555. {ok, _Conn} = Ok ->
  556. Ok;
  557. {error, Reason} ->
  558. {error, Reason}
  559. end.
  560. query(Conn, SQL, Params) ->
  561. case epgsql:equery(Conn, SQL, Params) of
  562. {error, sync_required} = Res ->
  563. ok = epgsql:sync(Conn),
  564. Res;
  565. Res ->
  566. Res
  567. end.
  568. prepared_query(Conn, Name, Params) ->
  569. case epgsql:prepared_query2(Conn, Name, Params) of
  570. {error, sync_required} = Res ->
  571. ok = epgsql:sync(Conn),
  572. Res;
  573. Res ->
  574. Res
  575. end.
  576. execute_batch(Conn, Statement, Params) ->
  577. case epgsql:execute_batch(Conn, Statement, Params) of
  578. {error, sync_required} = Res ->
  579. ok = epgsql:sync(Conn),
  580. Res;
  581. Res ->
  582. Res
  583. end.
  584. conn_opts(Opts) ->
  585. conn_opts(Opts, []).
  586. conn_opts([], Acc) ->
  587. Acc;
  588. conn_opts([Opt = {database, _} | Opts], Acc) ->
  589. conn_opts(Opts, [Opt | Acc]);
  590. conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
  591. Flag =
  592. case Bool of
  593. true -> required;
  594. false -> false
  595. end,
  596. conn_opts(Opts, [{ssl, Flag} | Acc]);
  597. conn_opts([Opt = {port, _} | Opts], Acc) ->
  598. conn_opts(Opts, [Opt | Acc]);
  599. conn_opts([Opt = {timeout, _} | Opts], Acc) ->
  600. conn_opts(Opts, [Opt | Acc]);
  601. conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
  602. conn_opts(Opts, [Opt | Acc]);
  603. conn_opts([_Opt | Opts], Acc) ->
  604. conn_opts(Opts, Acc).
  605. parse_sql_template(Config, ChannelId) ->
  606. Queries =
  607. case Config of
  608. #{prepare_statement := Qs} ->
  609. Qs;
  610. #{sql := Query} ->
  611. #{ChannelId => Query};
  612. #{} ->
  613. #{}
  614. end,
  615. Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
  616. #{query_templates => Templates}.
  617. parse_sql_template(Key, Query, Acc) ->
  618. Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
  619. Acc#{Key => Template}.
  620. render_prepare_sql_row(RowTemplate, Data) ->
  621. % NOTE: ignoring errors here, missing variables will be replaced with `null`.
  622. {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
  623. Row.
  624. init_prepare(State = #{prepares := disabled}) ->
  625. State;
  626. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
  627. State;
  628. init_prepare(State = #{}) ->
  629. case prepare_sql(State) of
  630. {ok, PrepStatements} ->
  631. State#{prepares => PrepStatements};
  632. Error ->
  633. TranslatedError = translate_to_log_context(Error),
  634. ?SLOG(
  635. error,
  636. maps:merge(
  637. #{msg => <<"postgresql_init_prepare_statement_failed">>},
  638. TranslatedError
  639. )
  640. ),
  641. %% mark the prepares failed
  642. State#{prepares => {error, export_error(TranslatedError)}}
  643. end.
  644. prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
  645. prepare_sql(maps:to_list(Templates), PoolName).
  646. prepare_sql(Templates, PoolName) ->
  647. case do_prepare_sql(Templates, PoolName) of
  648. {ok, _Sts} = Ok ->
  649. %% prepare for reconnect
  650. ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
  651. Ok;
  652. Error ->
  653. Error
  654. end.
  655. %% this callback accepts the arg list provided to
  656. %% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
  657. %% so ecpool_worker can de-duplicate the callbacks based on the signature.
  658. get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
  659. ChannelId.
  660. do_prepare_sql(Templates, PoolName) ->
  661. do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
  662. do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
  663. {ok, Conn} = ecpool_worker:client(Worker),
  664. case prepare_sql_to_conn(Conn, Templates) of
  665. {ok, Sts} ->
  666. do_prepare_sql(Rest, Templates, Sts);
  667. Error ->
  668. Error
  669. end;
  670. do_prepare_sql([], _Prepares, LastSts) ->
  671. {ok, LastSts}.
  672. prepare_sql_to_conn(Conn, Prepares) ->
  673. prepare_sql_to_conn(Conn, Prepares, #{}, 0).
  674. prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
  675. {ok, Statements};
  676. prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
  677. failed_to_remove_prev_prepared_statement_error();
  678. prepare_sql_to_conn(
  679. Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
  680. ) when is_pid(Conn) ->
  681. LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
  682. ?SLOG(info, LogMeta),
  683. case epgsql:parse2(Conn, Key, SQL, []) of
  684. {ok, Statement} ->
  685. prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
  686. {error, #error{severity = error, codename = undefined_table} = Error} ->
  687. %% Target table is not created
  688. ?tp(pgsql_undefined_table, #{}),
  689. LogMsg =
  690. maps:merge(
  691. LogMeta#{msg => "postgresql_parse_failed"},
  692. translate_to_log_context(Error)
  693. ),
  694. ?SLOG(error, LogMsg),
  695. {error, undefined_table};
  696. {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
  697. ?tp(pgsql_prepared_statement_exists, #{}),
  698. LogMsg =
  699. maps:merge(
  700. LogMeta#{
  701. msg => "postgresql_prepared_statment_with_same_name_already_exists",
  702. explain => <<
  703. "A prepared statement with the same name already "
  704. "exists in the driver. Will attempt to remove the "
  705. "previous prepared statement with the name and then "
  706. "try again."
  707. >>
  708. },
  709. translate_to_log_context(Error)
  710. ),
  711. ?SLOG(warning, LogMsg),
  712. case epgsql:close(Conn, statement, Key) of
  713. ok ->
  714. ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
  715. prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
  716. {error, CloseError} ->
  717. ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
  718. failed_to_remove_prev_prepared_statement_error()
  719. end;
  720. {error, Error} ->
  721. TranslatedError = translate_to_log_context(Error),
  722. LogMsg =
  723. maps:merge(
  724. LogMeta#{msg => "postgresql_parse_failed"},
  725. TranslatedError
  726. ),
  727. ?SLOG(error, LogMsg),
  728. {error, export_error(TranslatedError)}
  729. end.
  730. failed_to_remove_prev_prepared_statement_error() ->
  731. Msg =
  732. ("A previous prepared statement for the action already exists "
  733. "but cannot be closed. Please, try to disable and then enable "
  734. "the connector to resolve this issue."),
  735. {error, unicode:characters_to_binary(Msg)}.
  736. to_bin(Bin) when is_binary(Bin) ->
  737. Bin;
  738. to_bin(Atom) when is_atom(Atom) ->
  739. erlang:atom_to_binary(Atom).
  740. handle_result({error, {recoverable_error, _Error}} = Res) ->
  741. Res;
  742. handle_result({error, {unrecoverable_error, _Error}} = Res) ->
  743. Res;
  744. handle_result({error, disconnected}) ->
  745. {error, {recoverable_error, disconnected}};
  746. handle_result({error, Error}) ->
  747. TranslatedError = translate_to_log_context(Error),
  748. {error, {unrecoverable_error, export_error(TranslatedError)}};
  749. handle_result(Res) ->
  750. Res.
  751. on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
  752. #{result => ok, affected_rows => Cnt};
  753. on_format_query_result(Res) ->
  754. Res.
  755. handle_batch_result([{ok, Count} | Rest], Acc) ->
  756. handle_batch_result(Rest, Acc + Count);
  757. handle_batch_result([{error, Error} | _Rest], _Acc) ->
  758. TranslatedError = translate_to_log_context(Error),
  759. {error, {unrecoverable_error, export_error(TranslatedError)}};
  760. handle_batch_result([], Acc) ->
  761. ?tp("postgres_success_batch_result", #{row_count => Acc}),
  762. {ok, Acc}.
  763. translate_to_log_context({error, Reason}) ->
  764. translate_to_log_context(Reason);
  765. translate_to_log_context(#error{} = Reason) ->
  766. #error{
  767. severity = Severity,
  768. code = Code,
  769. codename = Codename,
  770. message = Message,
  771. extra = Extra
  772. } = Reason,
  773. #{
  774. driver_severity => Severity,
  775. driver_error_codename => Codename,
  776. driver_error_code => Code,
  777. driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
  778. driver_error_extra => Extra
  779. };
  780. translate_to_log_context(Reason) ->
  781. #{reason => Reason}.
  782. export_error(#{
  783. driver_severity := Severity,
  784. driver_error_codename := Codename,
  785. driver_error_code := Code
  786. }) ->
  787. %% Extra information has already been logged.
  788. #{
  789. error_code => Code,
  790. error_codename => Codename,
  791. severity => Severity
  792. };
  793. export_error(#{reason := Reason}) ->
  794. Reason;
  795. export_error(Error) ->
  796. Error.