emqx_postgresql.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_postgresql).
  17. -include("emqx_postgresql.hrl").
  18. -include_lib("emqx_connector/include/emqx_connector.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include_lib("hocon/include/hoconsc.hrl").
  22. -include_lib("epgsql/include/epgsql.hrl").
  23. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  24. -export([roots/0, fields/1]).
  25. -behaviour(emqx_resource).
  26. %% callbacks of behaviour emqx_resource
  27. -export([
  28. callback_mode/0,
  29. on_start/2,
  30. on_stop/2,
  31. on_query/3,
  32. on_batch_query/3,
  33. on_get_status/2
  34. ]).
  35. -export([connect/1]).
  36. -export([
  37. query/3,
  38. prepared_query/3,
  39. execute_batch/3
  40. ]).
  41. %% for ecpool workers usage
  42. -export([do_get_status/1, prepare_sql_to_conn/2]).
  %% Options used when parsing the `server' field: a server given without a
  %% port gets ?PGSQL_DEFAULT_PORT.
  -define(PGSQL_HOST_OPTIONS, #{default_port => ?PGSQL_DEFAULT_PORT}).

  %% A parsed statement template: the SQL text paired with the row template
  %% used to render each request's parameters (see parse_prepare_sql/3).
  -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.

  %% Connector state returned by on_start/2.
  %% `prepares' holds the statements prepared on the pool connections, keyed
  %% like `query_templates', or the `{error, _}' of the last failed attempt.
  -type state() :: #{
      pool_name := binary(),
      query_templates := #{binary() => template()},
      prepares := #{binary() => epgsql:statement()} | {error, _}
  }.

  %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
  %% We want to be able to call sync if any message from the backend leaves the driver in an
  %% inconsistent state needing sync.
  -dialyzer({nowarn_function, [execute_batch/3]}).
  57. %%=====================================================================
  %% HOCON schema roots: a single `config' struct whose fields come from
  %% fields(config).
  roots() ->
      ConfigRef = hoconsc:ref(?MODULE, config),
      [{config, #{type => ConfigRef}}].
  %% Fields of the `config' struct, in order: the server address, the shared
  %% relational-DB fields (with `username' made mandatory by adjust_fields/1),
  %% the TLS fields and the prepared-statement fields.
  fields(config) ->
      lists:append([
          [{server, server()}],
          adjust_fields(emqx_connector_schema_lib:relational_db_fields()),
          emqx_connector_schema_lib:ssl_fields(),
          emqx_connector_schema_lib:prepare_statement_fields()
      ]).
  %% Schema of the `server' field; ?PGSQL_HOST_OPTIONS supplies the default port.
  server() ->
      emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?PGSQL_HOST_OPTIONS).
  %% Marks the `username' field as required; every other field is returned
  %% unchanged and in the same order.
  adjust_fields(Fields) ->
      [
          case Field of
              {username, Sc} ->
                  %% the field's own type is restated explicitly to please dialyzer
                  {username,
                      hocon_schema:override(Sc, #{
                          type => hocon_schema:field_schema(Sc, type),
                          required => true
                      })};
              _Other ->
                  Field
          end
       || Field <- Fields
      ].
  80. %% ===================================================================
  %% emqx_resource callback: this connector only handles queries synchronously.
  callback_mode() ->
      always_sync.
  %% emqx_resource callback: starts the connection pool for InstId (with this
  %% module as the pool's connect module, see connect/1) and prepares the
  %% configured statements on it.
  %% A pool start failure is returned as {error, Reason}. A prepare failure is
  %% not: init_prepare/1 records it in the `prepares' field of the state.
  -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
  on_start(
      InstId,
      #{
          server := Server,
          database := DB,
          username := User,
          pool_size := PoolSize,
          ssl := SSL
      } = Config
  ) ->
      #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
      ?SLOG(info, #{
          msg => "starting_postgresql_connector",
          connector => InstId,
          config => emqx_utils:redact(Config)
      }),
      %% `{ssl, true}' is rewritten to `required' by conn_opts/2, whose clause
      %% is guarded by is_boolean/1; keep both sides in sync when changing this.
      SslOpts =
          case maps:get(enable, SSL) of
              false ->
                  [{ssl, false}];
              true ->
                  [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}]
          end,
      %% the password stays wrapped in the pool options; connect/1 unwraps it
      Password = maps:get(password, Config, emqx_secret:wrap("")),
      BaseOpts = [
          {host, Host},
          {port, Port},
          {username, User},
          {password, Password},
          {database, DB},
          {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
          {pool_size, PoolSize}
      ],
      InitState = parse_prepare_sql(Config),
      case emqx_resource_pool:start(InstId, ?MODULE, BaseOpts ++ SslOpts) of
          {error, Reason} ->
              ?tp(pgsql_connector_start_failed, #{error => Reason}),
              {error, Reason};
          ok ->
              {ok, init_prepare(InitState#{pool_name => InstId, prepares => #{}})}
      end.
  %% emqx_resource callback: stops the connection pool registered under
  %% InstId by on_start/2; the connector state itself is not needed.
  on_stop(InstId, _State) ->
      ?SLOG(info, #{msg => "stopping_postgresql_connector", connector => InstId}),
      emqx_resource_pool:stop(InstId).
  %% emqx_resource callback: executes one request on the pool.
  %% A {TypeOrKey, NameOrSQL} request is treated as having no parameters.
  %% `sql'/`query' run NameOrSQL as plain SQL and `prepared_query' runs a
  %% prepared statement by name. Any other TypeOrKey is looked up among the
  %% statement templates by proc_sql_params/4, which may render the request
  %% data into parameters.
  on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
      on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
  on_query(InstId, {TypeOrKey, NameOrSQL, Params}, #{pool_name := PoolName} = State) ->
      ?SLOG(debug, #{
          msg => "postgresql_connector_received_sql_query",
          connector => InstId,
          type => TypeOrKey,
          sql => NameOrSQL,
          state => State
      }),
      {SQLOrName, Args} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
      QueryFun = pgsql_query_type(TypeOrKey),
      handle_result(on_sql_query(InstId, PoolName, QueryFun, SQLOrName, Args)).
  %% Picks the worker function (query/3 or prepared_query/3) for a request:
  %% `sql' and `query' are plain queries; everything else, including the
  %% template keys used by bridges, runs as a prepared query.
  pgsql_query_type(Type) when Type =:= sql; Type =:= query ->
      query;
  pgsql_query_type(_PreparedOrTemplateKey) ->
      prepared_query.
  %% emqx_resource callback: executes a batch of {Key, Data} requests that all
  %% target the statement template of the first request's Key, as a single
  %% epgsql batch on one pooled connection.
  %%
  %% Returns {ok, AffectedRows} or
  %% {error, {recoverable_error | unrecoverable_error, Reason}}:
  %%  - no template for Key: unrecoverable `batch_prepare_not_implemented';
  %%  - a template exists but no prepared statement is available for it
  %%    (`prepares' holds the error of a failed prepare attempt, or the
  %%    prepared map lacks the key): logged, then an unrecoverable
  %%    `{statement_not_prepared, Key}' instead of a badmap/badkey crash;
  %%  - a batch that is not a list starting with a {Key, Data} pair:
  %%    unrecoverable `invalid_request'.
  on_batch_query(
      InstId,
      [{Key, _} = Request | _] = BatchReq,
      #{pool_name := PoolName, query_templates := Templates, prepares := PrepStatements} = State
  ) ->
      BinKey = to_bin(Key),
      case maps:get(BinKey, Templates, undefined) of
          undefined ->
              Log = #{
                  connector => InstId,
                  first_request => Request,
                  state => State,
                  msg => "batch prepare not implemented"
              },
              ?SLOG(error, Log),
              {error, {unrecoverable_error, batch_prepare_not_implemented}};
          {_Statement, RowTemplate} ->
              case PrepStatements of
                  #{BinKey := PrepStatement} ->
                      Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
                      case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
                          {error, _Error} = Result ->
                              handle_result(Result);
                          {_Column, Results} ->
                              handle_batch_result(Results, 0)
                      end;
                  _NotPrepared ->
                      %% `prepares' is `{error, _}' (the last prepare attempt
                      %% failed) or a map without this key; a plain maps:get/2
                      %% would crash here instead of reporting the problem.
                      ?SLOG(error, #{
                          msg => "postgresql_batch_statement_not_prepared",
                          connector => InstId,
                          statement => BinKey,
                          prepares => PrepStatements
                      }),
                      {error, {unrecoverable_error, {statement_not_prepared, BinKey}}}
              end
      end;
  on_batch_query(InstId, BatchReq, State) ->
      ?SLOG(error, #{
          connector => InstId,
          request => BatchReq,
          state => State,
          msg => "invalid request"
      }),
      {error, {unrecoverable_error, invalid_request}}.
  %% Resolves the statement (SQL text or statement name) and the parameters
  %% for one request.
  %% `query' and `prepared_query' requests are passed through unchanged. Any
  %% other type is treated as a template key. When a template exists, the
  %% statement is the binary key and the parameters are rendered from the
  %% request data; otherwise the request is passed through unchanged.
  proc_sql_params(Type, SQLOrKey, Params, _State) when Type =:= query; Type =:= prepared_query ->
      {SQLOrKey, Params};
  proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) ->
      Key = to_bin(TypeOrKey),
      case maps:get(Key, Templates, undefined) of
          {_Statement, RowTemplate} ->
              {Key, render_prepare_sql_row(RowTemplate, SQLOrData)};
          undefined ->
              {SQLOrData, Params}
      end.
  %% Runs ?MODULE:Type(Conn, NameOrSQL, Data) on a connection picked from
  %% PoolName and classifies failures:
  %%  - `sync_required' and `ecpool_empty' become recoverable errors;
  %%  - a server `undefined_table' error becomes an unrecoverable error;
  %%  - any other {error, _} is returned unchanged (see handle_result/1);
  %%  - a function_clause raised by the pool call is logged and reported as
  %%    an unrecoverable `invalid_request'.
  %% Successful results are returned unchanged.
  on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
      try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
          {error, Reason} = Failure ->
              ?tp(pgsql_connector_query_return, #{error => Reason}),
              ?SLOG(
                  error,
                  maps:merge(
                      #{
                          msg => "postgresql_connector_do_sql_query_failed",
                          connector => InstId,
                          type => Type,
                          sql => NameOrSQL
                      },
                      translate_to_log_context(Reason)
                  )
              ),
              case Reason of
                  _ when Reason =:= sync_required; Reason =:= ecpool_empty ->
                      {error, {recoverable_error, Reason}};
                  {error, error, _, undefined_table, _, _} ->
                      {error, {unrecoverable_error, Reason}};
                  _ ->
                      Failure
              end;
          Success ->
              ?tp(pgsql_connector_query_return, #{result => Success}),
              Success
      catch
          error:function_clause:Stacktrace ->
              ?SLOG(error, #{
                  msg => "postgresql_connector_do_sql_query_failed",
                  connector => InstId,
                  type => Type,
                  sql => NameOrSQL,
                  reason => function_clause,
                  stacktrace => Stacktrace
              }),
              {error, {unrecoverable_error, invalid_request}}
      end.
  %% emqx_resource health check. Every pool worker must pass do_get_status/1,
  %% then the prepared statements are checked by do_check_prepares/1.
  %% Reports `connected' (with a new state when statements were re-prepared).
  %% A missing target table is reported as `disconnected' with the
  %% `unhealthy_target' hint, and anything else as `connecting'.
  on_get_status(_InstId, #{pool_name := PoolName} = State) ->
      HealthCheckFun = fun ?MODULE:do_get_status/1,
      case emqx_resource_pool:health_check_workers(PoolName, HealthCheckFun) of
          false ->
              connecting;
          true ->
              case do_check_prepares(State) of
                  ok ->
                      connected;
                  {ok, NewState} ->
                      %% statements were re-prepared: hand back the new state
                      {connected, NewState};
                  {error, undefined_table} ->
                      %% connected, but the target table does not exist
                      {disconnected, State, unhealthy_target};
                  {error, _Reason} ->
                      %% not logged here: prepare_sql_to_conn/3 logs its failures
                      connecting
              end
      end.
  %% Per-connection liveness probe: true when a trivial simple query returns
  %% an `ok'-tagged result.
  do_get_status(Conn) ->
      Result = epgsql:squery(Conn, "SELECT count(1) AS T"),
      element(1, Result) =:= ok.
  %% Health-check step for prepared statements. Returns ok when nothing needs
  %% to change, {ok, NewState} when a failed prepare was retried successfully,
  %% or {error, Reason}.
  %%  - Bridge configs (a `send_message' template): check that the SQL still
  %%    parses on one of the worker connections, to detect a missing table.
  %%  - Statements already prepared: nothing to do.
  %%  - Last prepare attempt failed: retry it.
  %% NOTE(review): the bridge clause matches before the retry clause, so a
  %% bridge whose prepare failed is never re-prepared here; confirm another
  %% path (e.g. a resource restart) covers that case.
  do_check_prepares(
      #{
          pool_name := PoolName,
          query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
      }
  ) ->
      Workers = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
      validate_table_existence(Workers, SQL);
  do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
      ok;
  do_check_prepares(#{prepares := {error, _}} = State) ->
      case prepare_sql(State) of
          {ok, PrepStatements} ->
              %% replace the recorded error with the fresh statements
              {ok, State#{prepares := PrepStatements}};
          {error, _Reason} = Error ->
              Error
      end.
  %% Parses SQL as an unnamed statement on the given workers' connections,
  %% one worker at a time, to detect a missing target table.
  %% Returns {error, undefined_table} as soon as a connection reports it, and
  %% ok as soon as one parses successfully. Workers without a connection, dead
  %% workers and unexpected parse results are skipped. When no worker gives
  %% an answer the result is ok, and the next health check tries again.
  -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
  validate_table_existence([], _SQL) ->
      ok;
  validate_table_existence([Worker | Others], SQL) ->
      try ecpool_worker:client(Worker) of
          {ok, Conn} ->
              case epgsql:parse2(Conn, "", SQL, []) of
                  {error, {_, _, _, undefined_table, _, _}} ->
                      {error, undefined_table};
                  Parsed when is_tuple(Parsed), element(1, Parsed) =:= ok ->
                      ok;
                  Unexpected ->
                      ?tp(postgres_connector_bad_parse2, #{result => Unexpected}),
                      validate_table_existence(Others, SQL)
              end;
          _NoClient ->
              validate_table_existence(Others, SQL)
      catch
          exit:{noproc, _} ->
              validate_table_existence(Others, SQL)
      end.
  327. %% ===================================================================
  %% Opens one epgsql connection from the pool options built by on_start/2
  %% (this module is the pool's connect module). The password arrives wrapped
  %% and is unwrapped only here.
  connect(Opts) ->
      ConnOpts = conn_opts(Opts),
      Host = proplists:get_value(host, Opts),
      Username = proplists:get_value(username, Opts),
      %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
      Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
      case epgsql:connect(Host, Username, Password, ConnOpts) of
          {ok, _Conn} = Connected -> Connected;
          {error, _Reason} = Failed -> Failed
      end.
  %% Pool action: runs SQL with Params as an extended query on Conn.
  %% On `sync_required' the connection is synced before the error is
  %% returned, so the driver is not left in an inconsistent state.
  query(Conn, SQL, Params) ->
      Result = epgsql:equery(Conn, SQL, Params),
      case Result of
          {error, sync_required} -> ok = epgsql:sync(Conn);
          _ -> ok
      end,
      Result.
  %% Pool action: runs the statement prepared under Name with Params on Conn.
  %% On `sync_required' the connection is synced before the error is
  %% returned, so the driver is not left in an inconsistent state.
  prepared_query(Conn, Name, Params) ->
      Result = epgsql:prepared_query2(Conn, Name, Params),
      case Result of
          {error, sync_required} -> ok = epgsql:sync(Conn);
          _ -> ok
      end,
      Result.
  %% Pool action: executes the prepared Statement as one epgsql batch, with
  %% Params being the rendered parameter rows (see on_batch_query/3).
  %% On `sync_required' the connection is synced before the error is returned.
  execute_batch(Conn, Statement, Params) ->
      Result = epgsql:execute_batch(Conn, Statement, Params),
      case Result of
          {error, sync_required} -> ok = epgsql:sync(Conn);
          _ -> ok
      end,
      Result.
  %% Builds the option list for epgsql:connect/4 from the pool options;
  %% host, username and password are passed positionally by connect/1.
  conn_opts(Opts) ->
      Acc = [],
      conn_opts(Opts, Acc).
  %% Keeps only `database', `port', `timeout', `ssl_opts' and boolean `ssl'
  %% options, prepending each kept option to Acc, so kept options end up in
  %% reverse order in front of Acc. `{ssl, true}' is rewritten to
  %% `{ssl, required}'. Non-boolean `ssl' values and all other options are
  %% dropped.
  conn_opts(Opts, Acc0) ->
      lists:foldl(
          fun
              ({ssl, true}, Acc) ->
                  [{ssl, required} | Acc];
              ({ssl, false} = Opt, Acc) ->
                  [Opt | Acc];
              ({Name, _} = Opt, Acc) when
                  Name =:= database; Name =:= port; Name =:= timeout; Name =:= ssl_opts
              ->
                  [Opt | Acc];
              (_Dropped, Acc) ->
                  Acc
          end,
          Acc0,
          Opts
      ).
  %% Builds the initial state's `query_templates' from the config.
  %% `prepare_statement' (named statements) takes precedence. Otherwise a
  %% single `sql' becomes the `send_message' statement (bridge configs).
  %% With neither, no templates are created.
  parse_prepare_sql(Config) ->
      Queries =
          case Config of
              #{prepare_statement := Named} -> Named;
              #{sql := SQL} -> #{<<"send_message">> => SQL};
              #{} -> #{}
          end,
      #{query_templates => maps:fold(fun parse_prepare_sql/3, #{}, Queries)}.
  %% maps:fold/3 step: parses Query into a statement template using `$n'
  %% parameter placeholders, and stores it under Key in Acc.
  parse_prepare_sql(Key, Query, Acc) ->
      maps:put(Key, emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Acc).
  %% Renders one request's Data into the parameter row of a prepared statement.
  %% Rendering errors are deliberately discarded: missing variables are
  %% rendered as `null'.
  render_prepare_sql_row(RowTemplate, Data) ->
      {Params, _IgnoredErrors} =
          emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
      Params.
  %% Prepares every template right after the pool has started.
  %% With no templates the state is returned unchanged. On failure the error
  %% is logged and stored in `prepares', where do_check_prepares/1 can find it.
  init_prepare(#{query_templates := Templates} = State) when map_size(Templates) =:= 0 ->
      State;
  init_prepare(#{} = State) ->
      case prepare_sql(State) of
          {ok, PrepStatements} ->
              State#{prepares => PrepStatements};
          Error ->
              ?SLOG(
                  error,
                  maps:merge(
                      #{msg => <<"postgresql_init_prepare_statement_failed">>},
                      translate_to_log_context(Error)
                  )
              ),
              %% remember the failure instead of a statements map
              State#{prepares => Error}
      end.
  %% Prepares all templates of the state on its pool.
  prepare_sql(#{pool_name := PoolName, query_templates := Templates}) ->
      TemplateList = maps:to_list(Templates),
      prepare_sql(TemplateList, PoolName).
  %% Prepares Templates on every worker of PoolName. On success it also
  %% registers prepare_sql_to_conn/2 as the pool's reconnect callback, so the
  %% statements are prepared again on reconnect. The prepare result is
  %% returned either way.
  prepare_sql(Templates, PoolName) ->
      Result = do_prepare_sql(Templates, PoolName),
      case Result of
          {ok, _Statements} ->
              ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]});
          _Failed ->
              ok
      end,
      Result.
  %% Prepares Templates on each current worker of PoolName in turn.
  do_prepare_sql(Templates, PoolName) ->
      Workers = ecpool:workers(PoolName),
      do_prepare_sql(Workers, Templates, #{}).
  %% Prepares Templates on each worker's connection in turn.
  %% Returns {ok, Statements} with the statements prepared on the last worker
  %% visited (the initial accumulator, #{}, when there are no workers), or the
  %% first error encountered.
  %% A worker that currently has no connection (ecpool_worker:client/1
  %% returning {error, _}) makes the whole attempt fail with that error. It
  %% does not crash with badmatch. Callers (init_prepare/1,
  %% do_check_prepares/1) already record or report {error, _} results, so the
  %% prepare can be retried later.
  do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
      case ecpool_worker:client(Worker) of
          {ok, Conn} ->
              case prepare_sql_to_conn(Conn, Templates) of
                  {ok, Sts} ->
                      do_prepare_sql(Rest, Templates, Sts);
                  Error ->
                      Error
              end;
          {error, _Reason} = NoClient ->
              %% nothing to prepare on right now; let the caller retry later
              NoClient
      end;
  do_prepare_sql([], _Templates, LastSts) ->
      {ok, LastSts}.
  %% Prepares the {Key, {SQL, RowTemplate}} templates on one connection.
  %% Exported because prepare_sql/2 registers it as the pool's reconnect
  %% callback.
  prepare_sql_to_conn(Conn, Prepares) ->
      NoStatements = #{},
      prepare_sql_to_conn(Conn, Prepares, NoStatements).
  %% Parses each template's SQL on Conn as a statement named after its Key,
  %% accumulating the resulting statements by key.
  %% Returns {ok, StatementsByKey}. It stops at the first failure, which it
  %% logs: {error, undefined_table} when the target table does not exist,
  %% otherwise the {error, _} returned by epgsql.
  prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
      {ok, Statements};
  prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
      LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
      ?SLOG(info, LogMeta),
      case epgsql:parse2(Conn, Key, SQL, []) of
          {ok, Statement} ->
              prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement});
          {error, {error, error, _, undefined_table, _, _} = Reason} ->
              %% the target table has not been created
              ?tp(pgsql_undefined_table, #{}),
              UndefinedTableLog =
                  maps:merge(
                      LogMeta#{msg => "postgresql_parse_failed"},
                      translate_to_log_context(Reason)
                  ),
              ?SLOG(error, UndefinedTableLog),
              {error, undefined_table};
          {error, Reason} = Failure ->
              FailureLog =
                  maps:merge(
                      LogMeta#{msg => "postgresql_parse_failed"},
                      translate_to_log_context(Reason)
                  ),
              ?SLOG(error, FailureLog),
              Failure
      end.
  %% Normalises a template key given as an atom or a binary to a binary.
  to_bin(Key) when is_atom(Key) ->
      erlang:atom_to_binary(Key);
  to_bin(Key) when is_binary(Key) ->
      Key.
  %% Normalises a single-query result for emqx_resource. Errors already
  %% classified as recoverable/unrecoverable pass through, `disconnected'
  %% becomes recoverable, and any other error becomes unrecoverable.
  %% Non-error results are returned as-is.
  handle_result({error, {Class, _}} = Classified) when
      Class =:= recoverable_error; Class =:= unrecoverable_error
  ->
      Classified;
  handle_result({error, disconnected}) ->
      {error, {recoverable_error, disconnected}};
  handle_result({error, Reason}) ->
      {error, {unrecoverable_error, Reason}};
  handle_result(Other) ->
      Other.
  %% Folds the per-statement results of epgsql:execute_batch/3 into the total
  %% number of affected rows, starting from Acc.
  %% Accepted results: {ok, Count}, and {ok, Count, Rows}, which epgsql
  %% returns for batched statements that also return rows (e.g.
  %% INSERT ... RETURNING). The latter previously crashed with
  %% function_clause. The first {error, Reason} stops the fold and is
  %% reported as unrecoverable.
  handle_batch_result([{ok, Count} | Rest], Acc) ->
      handle_batch_result(Rest, Acc + Count);
  handle_batch_result([{ok, Count, _Rows} | Rest], Acc) when is_integer(Count) ->
      %% returned rows are not forwarded; only the affected-row count is kept
      handle_batch_result(Rest, Acc + Count);
  handle_batch_result([{error, Error} | _Rest], _Acc) ->
      {error, {unrecoverable_error, Error}};
  handle_batch_result([], Acc) ->
      {ok, Acc}.
  %% Turns a failure reason into log metadata. An epgsql #error{} is
  %% flattened into driver_* fields, with its message passed through
  %% emqx_logger_textfmt:try_format_unicode/1. Any other reason is logged
  %% under `reason'.
  translate_to_log_context(#error{
      severity = Severity,
      code = Code,
      codename = Codename,
      message = Message,
      extra = Extra
  }) ->
      #{
          driver_severity => Severity,
          driver_error_codename => Codename,
          driver_error_code => Code,
          driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
          driver_error_extra => Extra
      };
  translate_to_log_context(Reason) ->
      #{reason => Reason}.