emqx_postgresql.erl 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_resource/include/emqx_resource.hrl").
  19. -include_lib("emqx_connector/include/emqx_connector.hrl").
  20. -include_lib("typerefl/include/types.hrl").
  21. -include_lib("emqx/include/logger.hrl").
  22. -include_lib("hocon/include/hoconsc.hrl").
  23. -include_lib("epgsql/include/epgsql.hrl").
  24. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  25. -export([roots/0, fields/1, namespace/0]).
  26. -behaviour(emqx_resource).
  27. %% callbacks of behaviour emqx_resource
  28. -export([
  29. resource_type/0,
  30. callback_mode/0,
  31. on_start/2,
  32. on_stop/2,
  33. on_query/3,
  34. on_batch_query/3,
  35. on_get_status/2,
  36. on_add_channel/4,
  37. on_remove_channel/3,
  38. on_get_channels/1,
  39. on_get_channel_status/3,
  40. on_format_query_result/1
  41. ]).
  42. -export([connect/1]).
  43. -export([
  44. query/3,
  45. prepared_query/3,
  46. execute_batch/3
  47. ]).
  48. -export([disable_prepared_statements/0]).
  49. %% for ecpool workers usage
  50. -export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
  51. -define(PGSQL_HOST_OPTIONS, #{
  52. default_port => ?PGSQL_DEFAULT_PORT
  53. }).
  54. -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
  55. -type state() ::
  56. #{
  57. pool_name := binary(),
  58. query_templates := #{binary() => template()},
  59. prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
  60. }.
  61. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  62. %% We want to be able to call sync if any message from the backend leaves the driver in an
  63. %% inconsistent state needing sync.
  64. -dialyzer({nowarn_function, [execute_batch/3]}).
  65. %%=====================================================================
  66. namespace() -> postgres.
  67. roots() ->
  68. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
  69. fields(config) ->
  70. [
  71. {server, server()},
  72. disable_prepared_statements()
  73. ] ++
  74. adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
  75. emqx_connector_schema_lib:ssl_fields() ++
  76. emqx_connector_schema_lib:prepare_statement_fields().
  77. server() ->
  78. Meta = #{desc => ?DESC("server")},
  79. emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
  80. disable_prepared_statements() ->
  81. {disable_prepared_statements,
  82. hoconsc:mk(
  83. boolean(),
  84. #{
  85. default => false,
  86. required => false,
  87. desc => ?DESC("disable_prepared_statements")
  88. }
  89. )}.
  90. adjust_fields(Fields) ->
  91. lists:map(
  92. fun
  93. ({username, Sc}) ->
  94. %% to please dialyzer...
  95. Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
  96. {username, hocon_schema:override(Sc, Override)};
  97. (Field) ->
  98. Field
  99. end,
  100. Fields
  101. ).
  102. %% ===================================================================
  103. resource_type() -> pgsql.
  104. callback_mode() -> always_sync.
  105. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
  106. on_start(
  107. InstId,
  108. #{
  109. server := Server,
  110. disable_prepared_statements := DisablePreparedStatements,
  111. database := DB,
  112. username := User,
  113. pool_size := PoolSize,
  114. ssl := SSL
  115. } = Config
  116. ) ->
  117. #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
  118. ?SLOG(info, #{
  119. msg => "starting_postgresql_connector",
  120. connector => InstId,
  121. config => emqx_utils:redact(Config)
  122. }),
  123. SslOpts =
  124. case maps:get(enable, SSL) of
  125. true ->
  126. [
  127. %% note: this is converted to `required' in
  128. %% `conn_opts/2', and there's a boolean guard
  129. %% there; if this is set to `required' here,
  130. %% that'll require changing `conn_opts/2''s guard.
  131. {ssl, true},
  132. {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
  133. ];
  134. false ->
  135. [{ssl, false}]
  136. end,
  137. Options = [
  138. {host, Host},
  139. {port, Port},
  140. {username, User},
  141. {password, maps:get(password, Config, emqx_secret:wrap(""))},
  142. {database, DB},
  143. {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
  144. {pool_size, PoolSize}
  145. ],
  146. State1 = parse_sql_template(Config, <<"send_message">>),
  147. State2 = State1#{installed_channels => #{}},
  148. case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
  149. ok ->
  150. Prepares =
  151. case DisablePreparedStatements of
  152. true -> disabled;
  153. false -> #{}
  154. end,
  155. case init_prepare(State2#{pool_name => InstId, prepares => Prepares}) of
  156. #{prepares := {error, _} = Error} ->
  157. Error;
  158. State ->
  159. {ok, State}
  160. end;
  161. {error, Reason} ->
  162. ?tp(
  163. pgsql_connector_start_failed,
  164. #{error => Reason}
  165. ),
  166. {error, Reason}
  167. end.
  168. on_stop(InstId, State) ->
  169. ?SLOG(info, #{
  170. msg => "stopping_postgresql_connector",
  171. connector => InstId
  172. }),
  173. close_connections(State),
  174. Res = emqx_resource_pool:stop(InstId),
  175. ?tp(postgres_stopped, #{instance_id => InstId}),
  176. Res.
  177. close_connections(#{pool_name := PoolName} = _State) ->
  178. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  179. close_connections_with_worker_pids(WorkerPids),
  180. ok.
  181. close_connections_with_worker_pids([WorkerPid | Rest]) ->
  182. %% We ignore errors since any error probably means that the
  183. %% connection is closed already.
  184. try ecpool_worker:client(WorkerPid) of
  185. {ok, Conn} ->
  186. _ = epgsql:close(Conn),
  187. close_connections_with_worker_pids(Rest);
  188. _ ->
  189. close_connections_with_worker_pids(Rest)
  190. catch
  191. _:_ ->
  192. close_connections_with_worker_pids(Rest)
  193. end;
  194. close_connections_with_worker_pids([]) ->
  195. ok.
  196. on_add_channel(
  197. _InstId,
  198. #{
  199. installed_channels := InstalledChannels
  200. } = OldState,
  201. ChannelId,
  202. ChannelConfig
  203. ) ->
  204. %% The following will throw an exception if the bridge producers fails to start
  205. {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
  206. case ChannelState of
  207. #{prepares := {error, Reason}} ->
  208. {error, {unhealthy_target, Reason}};
  209. _ ->
  210. NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
  211. %% Update state
  212. NewState = OldState#{installed_channels => NewInstalledChannels},
  213. {ok, NewState}
  214. end.
  215. create_channel_state(
  216. ChannelId,
  217. #{
  218. pool_name := PoolName,
  219. prepares := Prepares
  220. } = _ConnectorState,
  221. #{parameters := Parameters} = _ChannelConfig
  222. ) ->
  223. State1 = parse_sql_template(Parameters, ChannelId),
  224. {ok,
  225. init_prepare(State1#{
  226. pool_name => PoolName,
  227. prepares => Prepares,
  228. prepare_statement => #{}
  229. })}.
  230. on_remove_channel(
  231. _InstId,
  232. #{
  233. installed_channels := InstalledChannels
  234. } = OldState,
  235. ChannelId
  236. ) ->
  237. %% Close prepared statements
  238. ok = close_prepared_statement(ChannelId, OldState),
  239. NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
  240. %% Update state
  241. NewState = OldState#{installed_channels => NewInstalledChannels},
  242. {ok, NewState}.
  243. close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
  244. ok;
  245. close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
  246. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  247. close_prepared_statement(WorkerPids, ChannelId, State),
  248. ok.
  249. close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
  250. %% We ignore errors since any error probably means that the
  251. %% prepared statement doesn't exist. If it exists when we try
  252. %% to insert one with the same name, we will try to remove it
  253. %% again anyway.
  254. try ecpool_worker:client(WorkerPid) of
  255. {ok, Conn} ->
  256. ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
  257. case get_templated_statement(ChannelId, State) of
  258. {ok, Statement} ->
  259. _ = epgsql:close(Conn, Statement),
  260. close_prepared_statement(Rest, ChannelId, State);
  261. error ->
  262. %% channel was not added
  263. ok
  264. end;
  265. _ ->
  266. close_prepared_statement(Rest, ChannelId, State)
  267. catch
  268. _:_ ->
  269. close_prepared_statement(Rest, ChannelId, State)
  270. end;
  271. close_prepared_statement([], _ChannelId, _State) ->
  272. ok.
  273. on_get_channel_status(
  274. _ResId,
  275. ChannelId,
  276. #{
  277. pool_name := PoolName,
  278. installed_channels := Channels
  279. } = _State
  280. ) ->
  281. ChannelState = maps:get(ChannelId, Channels),
  282. case
  283. do_check_channel_sql(
  284. PoolName,
  285. ChannelId,
  286. ChannelState
  287. )
  288. of
  289. ok ->
  290. ?status_connected;
  291. {error, undefined_table} ->
  292. {?status_disconnected, {unhealthy_target, <<"Table does not exist">>}}
  293. end.
  294. do_check_channel_sql(
  295. PoolName,
  296. ChannelId,
  297. #{query_templates := ChannelQueryTemplates} = _ChannelState
  298. ) ->
  299. {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
  300. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  301. validate_table_existence(WorkerPids, SQL).
  302. on_get_channels(ResId) ->
  303. emqx_bridge_v2:get_channels_for_connector(ResId).
  304. -spec on_query
  305. %% Called from authn and authz modules
  306. (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
  307. {ok, _} | {error, term()};
  308. %% Called from bridges
  309. (connector_resource_id(), {action_resource_id(), map()}, state()) ->
  310. {ok, _} | {error, term()}.
  311. on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
  312. on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
  313. on_query(
  314. InstId,
  315. {TypeOrKey, NameOrMap, Params},
  316. #{pool_name := PoolName} = State
  317. ) ->
  318. ?TRACE("QUERY", "postgresql_connector_received_sql_query", #{
  319. connector => InstId,
  320. type => TypeOrKey,
  321. sql => NameOrMap,
  322. state => State
  323. }),
  324. {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
  325. emqx_trace:rendered_action_template(
  326. TypeOrKey,
  327. #{
  328. statement_type => QueryType,
  329. statement_or_name => NameOrSQL2,
  330. data => Data
  331. }
  332. ),
  333. Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
  334. ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
  335. handle_result(Res).
  336. on_batch_query(
  337. InstId,
  338. [{Key, _} = Request | _] = BatchReq,
  339. #{pool_name := PoolName} = State
  340. ) ->
  341. BinKey = to_bin(Key),
  342. case get_template(BinKey, State) of
  343. undefined ->
  344. Log = #{
  345. connector => InstId,
  346. first_request => Request,
  347. state => State,
  348. msg => "batch prepare not implemented"
  349. },
  350. ?SLOG(error, Log),
  351. {error, {unrecoverable_error, batch_prepare_not_implemented}};
  352. {_Statement, RowTemplate} ->
  353. {ok, StatementTemplate} = get_templated_statement(BinKey, State),
  354. Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
  355. emqx_trace:rendered_action_template(
  356. Key,
  357. #{
  358. statement_type => execute_batch,
  359. statement_or_name => StatementTemplate,
  360. data => Rows
  361. }
  362. ),
  363. case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
  364. {error, _Error} = Result ->
  365. handle_result(Result);
  366. {_Column, Results} ->
  367. handle_batch_result(Results, 0)
  368. end
  369. end;
  370. on_batch_query(InstId, BatchReq, State) ->
  371. ?SLOG(error, #{
  372. connector => InstId,
  373. request => BatchReq,
  374. state => State,
  375. msg => "invalid request"
  376. }),
  377. {error, {unrecoverable_error, invalid_request}}.
  378. proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
  379. %% When this connector is called from actions/bridges.
  380. DisablePreparedStatements = prepared_statements_disabled(State),
  381. {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
  382. Rendered = render_prepare_sql_row(RowTemplate, Map),
  383. case DisablePreparedStatements of
  384. true ->
  385. {query, ExprTemplate, Rendered};
  386. false ->
  387. {prepared_query, ActionResId, Rendered}
  388. end;
  389. proc_sql_params(prepared_query, ConnResId, Params, State) ->
  390. %% When this connector is called from authn/authz modules
  391. DisablePreparedStatements = prepared_statements_disabled(State),
  392. case DisablePreparedStatements of
  393. true ->
  394. #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
  395. {query, ExprTemplate, Params};
  396. false ->
  397. %% Connector resource id itself is the prepared statement name
  398. {prepared_query, ConnResId, Params}
  399. end;
  400. proc_sql_params(QueryType, SQL, Params, _State) when
  401. is_atom(QueryType) andalso
  402. (is_binary(SQL) orelse is_list(SQL)) andalso
  403. is_list(Params)
  404. ->
  405. %% When called to do ad-hoc commands/queries.
  406. {QueryType, SQL, Params}.
  407. prepared_statements_disabled(State) ->
  408. maps:get(prepares, State, #{}) =:= disabled.
  409. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
  410. BinKey = to_bin(Key),
  411. ChannelState = maps:get(BinKey, Channels),
  412. ChannelQueryTemplates = maps:get(query_templates, ChannelState),
  413. maps:get(BinKey, ChannelQueryTemplates);
  414. get_template(Key, #{query_templates := Templates}) ->
  415. BinKey = to_bin(Key),
  416. maps:get(BinKey, Templates, undefined).
  417. get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
  418. BinKey = to_bin(Key),
  419. case is_map_key(BinKey, Channels) of
  420. true ->
  421. ChannelState = maps:get(BinKey, Channels),
  422. case ChannelState of
  423. #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
  424. {ok, ExprTemplate};
  425. #{prepares := #{BinKey := ExprTemplate}} ->
  426. {ok, ExprTemplate}
  427. end;
  428. false ->
  429. error
  430. end;
  431. get_templated_statement(Key, #{prepares := PrepStatements}) ->
  432. BinKey = to_bin(Key),
  433. {ok, maps:get(BinKey, PrepStatements)}.
  434. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
  435. try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
  436. {error, Reason} ->
  437. ?tp(
  438. pgsql_connector_query_return,
  439. #{error => Reason}
  440. ),
  441. TranslatedError = translate_to_log_context(Reason),
  442. ?SLOG(
  443. error,
  444. maps:merge(
  445. #{
  446. msg => "postgresql_connector_do_sql_query_failed",
  447. connector => InstId,
  448. type => Type,
  449. sql => NameOrSQL
  450. },
  451. TranslatedError
  452. )
  453. ),
  454. case Reason of
  455. sync_required ->
  456. {error, {recoverable_error, Reason}};
  457. ecpool_empty ->
  458. {error, {recoverable_error, Reason}};
  459. {error, error, _, undefined_table, _, _} ->
  460. {error, {unrecoverable_error, export_error(TranslatedError)}};
  461. _ ->
  462. {error, export_error(TranslatedError)}
  463. end;
  464. Result ->
  465. ?tp(
  466. pgsql_connector_query_return,
  467. #{result => Result}
  468. ),
  469. Result
  470. catch
  471. error:function_clause:Stacktrace ->
  472. ?SLOG(error, #{
  473. msg => "postgresql_connector_do_sql_query_failed",
  474. connector => InstId,
  475. type => Type,
  476. sql => NameOrSQL,
  477. reason => function_clause,
  478. stacktrace => Stacktrace
  479. }),
  480. {error, {unrecoverable_error, invalid_request}}
  481. end.
  482. on_get_status(_InstId, #{pool_name := PoolName} = State) ->
  483. case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
  484. true ->
  485. case do_check_prepares(State) of
  486. ok ->
  487. ?status_connected;
  488. {error, undefined_table} ->
  489. %% return error indicating that we are connected but the target table
  490. %% is not created
  491. {?status_disconnected, unhealthy_target}
  492. end;
  493. false ->
  494. ?status_connecting
  495. end.
  496. do_get_status(Conn) ->
  497. ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
  498. do_check_prepares(
  499. #{
  500. pool_name := PoolName,
  501. query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
  502. }
  503. ) ->
  504. WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  505. case validate_table_existence(WorkerPids, SQL) of
  506. ok ->
  507. ok;
  508. {error, Reason} ->
  509. {error, Reason}
  510. end;
  511. do_check_prepares(_) ->
  512. ok.
  513. -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  514. validate_table_existence([WorkerPid | Rest], SQL) ->
  515. try ecpool_worker:client(WorkerPid) of
  516. {ok, Conn} ->
  517. case epgsql:parse2(Conn, "", SQL, []) of
  518. {error, {_, _, _, undefined_table, _, _}} ->
  519. {error, undefined_table};
  520. Res when is_tuple(Res) andalso ok == element(1, Res) ->
  521. ok;
  522. Res ->
  523. ?tp(postgres_connector_bad_parse2, #{result => Res}),
  524. validate_table_existence(Rest, SQL)
  525. end;
  526. _ ->
  527. validate_table_existence(Rest, SQL)
  528. catch
  529. exit:{noproc, _} ->
  530. validate_table_existence(Rest, SQL)
  531. end;
  532. validate_table_existence([], _SQL) ->
  533. %% All workers either replied an unexpected error; we will retry
  534. %% on the next health check.
  535. ok.
  536. %% ===================================================================
  537. connect(Opts) ->
  538. Host = proplists:get_value(host, Opts),
  539. Username = proplists:get_value(username, Opts),
  540. %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
  541. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
  542. case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
  543. {ok, _Conn} = Ok ->
  544. Ok;
  545. {error, Reason} ->
  546. {error, Reason}
  547. end.
  548. query(Conn, SQL, Params) ->
  549. case epgsql:equery(Conn, SQL, Params) of
  550. {error, sync_required} = Res ->
  551. ok = epgsql:sync(Conn),
  552. Res;
  553. Res ->
  554. Res
  555. end.
  556. prepared_query(Conn, Name, Params) ->
  557. case epgsql:prepared_query2(Conn, Name, Params) of
  558. {error, sync_required} = Res ->
  559. ok = epgsql:sync(Conn),
  560. Res;
  561. Res ->
  562. Res
  563. end.
  564. execute_batch(Conn, Statement, Params) ->
  565. case epgsql:execute_batch(Conn, Statement, Params) of
  566. {error, sync_required} = Res ->
  567. ok = epgsql:sync(Conn),
  568. Res;
  569. Res ->
  570. Res
  571. end.
  572. conn_opts(Opts) ->
  573. conn_opts(Opts, []).
  574. conn_opts([], Acc) ->
  575. Acc;
  576. conn_opts([Opt = {database, _} | Opts], Acc) ->
  577. conn_opts(Opts, [Opt | Acc]);
  578. conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
  579. Flag =
  580. case Bool of
  581. true -> required;
  582. false -> false
  583. end,
  584. conn_opts(Opts, [{ssl, Flag} | Acc]);
  585. conn_opts([Opt = {port, _} | Opts], Acc) ->
  586. conn_opts(Opts, [Opt | Acc]);
  587. conn_opts([Opt = {timeout, _} | Opts], Acc) ->
  588. conn_opts(Opts, [Opt | Acc]);
  589. conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
  590. conn_opts(Opts, [Opt | Acc]);
  591. conn_opts([_Opt | Opts], Acc) ->
  592. conn_opts(Opts, Acc).
  593. parse_sql_template(Config, ChannelId) ->
  594. Queries =
  595. case Config of
  596. #{prepare_statement := Qs} ->
  597. Qs;
  598. #{sql := Query} ->
  599. #{ChannelId => Query};
  600. #{} ->
  601. #{}
  602. end,
  603. Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
  604. #{query_templates => Templates}.
  605. parse_sql_template(Key, Query, Acc) ->
  606. Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
  607. Acc#{Key => Template}.
  608. render_prepare_sql_row(RowTemplate, Data) ->
  609. % NOTE: ignoring errors here, missing variables will be replaced with `null`.
  610. {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
  611. Row.
  612. init_prepare(State = #{prepares := disabled}) ->
  613. State;
  614. init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
  615. State;
  616. init_prepare(State = #{}) ->
  617. case prepare_sql(State) of
  618. {ok, PrepStatements} ->
  619. State#{prepares => PrepStatements};
  620. Error ->
  621. TranslatedError = translate_to_log_context(Error),
  622. ?SLOG(
  623. error,
  624. maps:merge(
  625. #{msg => <<"postgresql_init_prepare_statement_failed">>},
  626. TranslatedError
  627. )
  628. ),
  629. %% mark the prepares failed
  630. State#{prepares => {error, export_error(TranslatedError)}}
  631. end.
  632. prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
  633. prepare_sql(maps:to_list(Templates), PoolName).
  634. prepare_sql(Templates, PoolName) ->
  635. case do_prepare_sql(Templates, PoolName) of
  636. {ok, _Sts} = Ok ->
  637. %% prepare for reconnect
  638. ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
  639. Ok;
  640. Error ->
  641. Error
  642. end.
  643. %% this callback accepts the arg list provided to
  644. %% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
  645. %% so ecpool_worker can de-duplicate the callbacks based on the signature.
  646. get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
  647. ChannelId.
  648. do_prepare_sql(Templates, PoolName) ->
  649. do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
  650. do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
  651. {ok, Conn} = ecpool_worker:client(Worker),
  652. case prepare_sql_to_conn(Conn, Templates) of
  653. {ok, Sts} ->
  654. do_prepare_sql(Rest, Templates, Sts);
  655. Error ->
  656. Error
  657. end;
  658. do_prepare_sql([], _Prepares, LastSts) ->
  659. {ok, LastSts}.
  660. prepare_sql_to_conn(Conn, Prepares) ->
  661. prepare_sql_to_conn(Conn, Prepares, #{}, 0).
  662. prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
  663. {ok, Statements};
  664. prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
  665. failed_to_remove_prev_prepared_statement_error();
  666. prepare_sql_to_conn(
  667. Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
  668. ) when is_pid(Conn) ->
  669. LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
  670. ?SLOG(info, LogMeta),
  671. case epgsql:parse2(Conn, Key, SQL, []) of
  672. {ok, Statement} ->
  673. prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
  674. {error, #error{severity = error, codename = undefined_table} = Error} ->
  675. %% Target table is not created
  676. ?tp(pgsql_undefined_table, #{}),
  677. LogMsg =
  678. maps:merge(
  679. LogMeta#{msg => "postgresql_parse_failed"},
  680. translate_to_log_context(Error)
  681. ),
  682. ?SLOG(error, LogMsg),
  683. {error, undefined_table};
  684. {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
  685. ?tp(pgsql_prepared_statement_exists, #{}),
  686. LogMsg =
  687. maps:merge(
  688. LogMeta#{
  689. msg => "postgresql_prepared_statment_with_same_name_already_exists",
  690. explain => <<
  691. "A prepared statement with the same name already "
  692. "exists in the driver. Will attempt to remove the "
  693. "previous prepared statement with the name and then "
  694. "try again."
  695. >>
  696. },
  697. translate_to_log_context(Error)
  698. ),
  699. ?SLOG(warning, LogMsg),
  700. case epgsql:close(Conn, statement, Key) of
  701. ok ->
  702. ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
  703. prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
  704. {error, CloseError} ->
  705. ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
  706. failed_to_remove_prev_prepared_statement_error()
  707. end;
  708. {error, Error} ->
  709. TranslatedError = translate_to_log_context(Error),
  710. LogMsg =
  711. maps:merge(
  712. LogMeta#{msg => "postgresql_parse_failed"},
  713. TranslatedError
  714. ),
  715. ?SLOG(error, LogMsg),
  716. {error, export_error(TranslatedError)}
  717. end.
  718. failed_to_remove_prev_prepared_statement_error() ->
  719. Msg =
  720. ("A previous prepared statement for the action already exists "
  721. "but cannot be closed. Please, try to disable and then enable "
  722. "the connector to resolve this issue."),
  723. {error, unicode:characters_to_binary(Msg)}.
  724. to_bin(Bin) when is_binary(Bin) ->
  725. Bin;
  726. to_bin(Atom) when is_atom(Atom) ->
  727. erlang:atom_to_binary(Atom).
  728. handle_result({error, {recoverable_error, _Error}} = Res) ->
  729. Res;
  730. handle_result({error, {unrecoverable_error, _Error}} = Res) ->
  731. Res;
  732. handle_result({error, disconnected}) ->
  733. {error, {recoverable_error, disconnected}};
  734. handle_result({error, Error}) ->
  735. TranslatedError = translate_to_log_context(Error),
  736. {error, {unrecoverable_error, export_error(TranslatedError)}};
  737. handle_result(Res) ->
  738. Res.
  739. on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
  740. #{result => ok, affected_rows => Cnt};
  741. on_format_query_result(Res) ->
  742. Res.
  743. handle_batch_result([{ok, Count} | Rest], Acc) ->
  744. handle_batch_result(Rest, Acc + Count);
  745. handle_batch_result([{error, Error} | _Rest], _Acc) ->
  746. TranslatedError = translate_to_log_context(Error),
  747. {error, {unrecoverable_error, export_error(TranslatedError)}};
  748. handle_batch_result([], Acc) ->
  749. ?tp("postgres_success_batch_result", #{row_count => Acc}),
  750. {ok, Acc}.
  751. translate_to_log_context({error, Reason}) ->
  752. translate_to_log_context(Reason);
  753. translate_to_log_context(#error{} = Reason) ->
  754. #error{
  755. severity = Severity,
  756. code = Code,
  757. codename = Codename,
  758. message = Message,
  759. extra = Extra
  760. } = Reason,
  761. #{
  762. driver_severity => Severity,
  763. driver_error_codename => Codename,
  764. driver_error_code => Code,
  765. driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
  766. driver_error_extra => Extra
  767. };
  768. translate_to_log_context(Reason) ->
  769. #{reason => Reason}.
  770. export_error(#{
  771. driver_severity := Severity,
  772. driver_error_codename := Codename,
  773. driver_error_code := Code
  774. }) ->
  775. %% Extra information has already been logged.
  776. #{
  777. error_code => Code,
  778. error_codename => Codename,
  779. severity => Severity
  780. };
  781. export_error(#{reason := Reason}) ->
  782. Reason;
  783. export_error(Error) ->
  784. Error.