| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_postgresql).
- -include("emqx_postgresql.hrl").
- -include_lib("emqx_connector/include/emqx_connector.hrl").
- -include_lib("typerefl/include/types.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("hocon/include/hoconsc.hrl").
- -include_lib("epgsql/include/epgsql.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -export([roots/0, fields/1, namespace/0]).
- -behaviour(emqx_resource).
- %% callbacks of behaviour emqx_resource
- -export([
- resource_type/0,
- callback_mode/0,
- on_start/2,
- on_stop/2,
- on_query/3,
- on_batch_query/3,
- on_get_status/2,
- on_add_channel/4,
- on_remove_channel/3,
- on_get_channels/1,
- on_get_channel_status/3,
- on_format_query_result/1
- ]).
- -export([connect/1]).
- -export([
- query/3,
- prepared_query/3,
- execute_batch/3
- ]).
- -export([disable_prepared_statements/0]).
- %% for ecpool workers usage
- -export([do_get_status/1, prepare_sql_to_conn/2]).
- -define(PGSQL_HOST_OPTIONS, #{
- default_port => ?PGSQL_DEFAULT_PORT
- }).
%% Resource id of the connector; `on_start/2' also uses it as the pool name.
-type connector_resource_id() :: binary().
%% Resource id of an action (channel) attached to the connector.
-type action_resource_id() :: binary().
%% A parsed SQL template: the statement text and the row template used to
%% render the parameters of one row.
-type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
%% Connector state as built by `on_start/2'.
%%
%% `prepares' is `disabled' when prepared statements are turned off, a map of
%% prepared statements on success, or `{error, _}' when preparing failed.
%% `installed_channels' maps every added channel id to its channel state; it
%% is set by `on_start/2' and maintained by `on_add_channel/4' and
%% `on_remove_channel/3'.
-type state() ::
    #{
        pool_name := binary(),
        query_templates := #{binary() => template()},
        prepares := disabled | #{binary() => epgsql:statement()} | {error, _},
        installed_channels => #{action_resource_id() => map()}
    }.
- %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
- %% We want to be able to call sync if any message from the backend leaves the driver in an
- %% inconsistent state needing sync.
- -dialyzer({nowarn_function, [execute_batch/3]}).
- %%=====================================================================
%% HOCON namespace of this connector's schema.
namespace() ->
    postgres.
%% Schema roots: a single `config' root that refers to `fields(config)' of
%% this module.
roots() ->
    ConfigSchema = #{type => hoconsc:ref(?MODULE, config)},
    [{config, ConfigSchema}].
%% Fields of the `config' struct: this connector's own `server' and
%% `disable_prepared_statements' fields, followed by the shared relational-DB
%% fields (passed through `adjust_fields/1'), the SSL fields and the
%% prepare-statement fields.
fields(config) ->
    OwnFields = [{server, server()}, disable_prepared_statements()],
    DbFields = adjust_fields(emqx_connector_schema_lib:relational_db_fields()),
    lists:append([
        OwnFields,
        DbFields,
        emqx_connector_schema_lib:ssl_fields(),
        emqx_connector_schema_lib:prepare_statement_fields()
    ]).
%% Schema of the `server' field, built with the PostgreSQL host options.
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?PGSQL_HOST_OPTIONS).
%% Schema of the optional boolean `disable_prepared_statements' field
%% (defaults to `false').
disable_prepared_statements() ->
    Meta = #{
        default => false,
        required => false,
        desc => ?DESC("disable_prepared_statements")
    },
    {disable_prepared_statements, hoconsc:mk(boolean(), Meta)}.
%% Make the `username' field mandatory; every other field is passed through
%% untouched.
adjust_fields(Fields) ->
    [require_username(Field) || Field <- Fields].

%% Override the schema of the `username' field so that it is required.
require_username({username, Sc}) ->
    %% The type is restated only to please dialyzer.
    Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
    {username, hocon_schema:override(Sc, Override)};
require_username(Field) ->
    Field.
- %% ===================================================================
%% `emqx_resource' callback: the resource type handled by this module.
resource_type() ->
    postgresql.
%% `emqx_resource' callback: queries are always handled synchronously.
callback_mode() ->
    always_sync.
%% Start the connection pool of connector `InstId' and build its initial state.
%%
%% On success the state holds the pool name, the parsed `send_message' query
%% template (when the config carries one), an empty `installed_channels' map
%% and the `prepares' field: `disabled' when prepared statements are turned
%% off, otherwise whatever `init_prepare/1' produces. When the pool cannot be
%% started, `{error, Reason}' is returned.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
    InstId,
    #{
        server := Server,
        disable_prepared_statements := DisablePreparedStatements,
        database := DB,
        username := User,
        pool_size := PoolSize,
        ssl := SSL
    } = Config
) ->
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
    ?SLOG(info, #{
        msg => "starting_postgresql_connector",
        connector => InstId,
        config => emqx_utils:redact(Config)
    }),
    SslOpts =
        case maps:get(enable, SSL) of
            false ->
                [{ssl, false}];
            true ->
                %% Keep the boolean here: `conn_opts/2' guards on a boolean
                %% `ssl' value and turns `true' into `required' itself.
                %% Passing `required' from here would need that guard changed.
                [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}]
        end,
    Password = maps:get(password, Config, emqx_secret:wrap("")),
    PoolOpts =
        [
            {host, Host},
            {port, Port},
            {username, User},
            {password, Password},
            {database, DB},
            {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
            {pool_size, PoolSize}
        ] ++ SslOpts,
    Templates = parse_sql_template(Config, <<"send_message">>),
    BaseState = Templates#{installed_channels => #{}},
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        {error, Reason} ->
            ?tp(pgsql_connector_start_failed, #{error => Reason}),
            {error, Reason};
        ok ->
            Prepares =
                case DisablePreparedStatements of
                    false -> #{};
                    true -> disabled
                end,
            {ok, init_prepare(BaseState#{pool_name => InstId, prepares => Prepares})}
    end.
%% Close all pooled connections, stop the pool and return the result of
%% stopping the pool.
on_stop(InstId, State) ->
    ?SLOG(info, #{msg => "stopping_postgresql_connector", connector => InstId}),
    close_connections(State),
    StopResult = emqx_resource_pool:stop(InstId),
    ?tp(postgres_stopped, #{instance_id => InstId}),
    StopResult.
%% Close the client connection of every worker in the connector's pool.
close_connections(#{pool_name := PoolName}) ->
    Workers = ecpool:workers(PoolName),
    close_connections_with_worker_pids([Pid || {_Name, Pid} <- Workers]),
    ok.
%% Close the epgsql connection held by each of the given pool workers.
close_connections_with_worker_pids(WorkerPids) ->
    lists:foreach(fun close_worker_connection/1, WorkerPids).

%% Close the connection of a single worker. A worker without a client, or one
%% that cannot be asked for its client, is skipped: such an error most likely
%% means that the connection is closed already.
close_worker_connection(WorkerPid) ->
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            _ = epgsql:close(Conn),
            ok;
        _ ->
            ok
    catch
        _:_ ->
            ok
    end.
%% Build the state of channel `ChannelId' and register it in the connector's
%% `installed_channels'.
%%
%% When preparing the channel's statement failed (`prepares' is
%% `{error, Reason}') the channel is not installed and
%% `{error, {unhealthy_target, Reason}}' is returned instead.
on_add_channel(
    _InstId,
    #{installed_channels := InstalledChannels} = OldState,
    ChannelId,
    ChannelConfig
) ->
    {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
    case ChannelState of
        #{prepares := {error, Reason}} ->
            {error, {unhealthy_target, Reason}};
        _ ->
            Installed = InstalledChannels#{ChannelId => ChannelState},
            {ok, OldState#{installed_channels => Installed}}
    end.
%% Parse the SQL template found in the channel's `parameters' and run
%% `init_prepare/1' on the resulting channel state. The channel inherits the
%% connector's pool name and `prepares' value.
create_channel_state(
    ChannelId,
    #{pool_name := PoolName, prepares := Prepares},
    #{parameters := Parameters}
) ->
    Templates = parse_sql_template(Parameters, ChannelId),
    ChannelState = Templates#{
        pool_name => PoolName,
        prepares => Prepares,
        prepare_statement => #{}
    },
    {ok, init_prepare(ChannelState)}.
%% Close the channel's prepared statement on the pool workers and drop the
%% channel from `installed_channels'.
on_remove_channel(_InstId, #{installed_channels := InstalledChannels} = OldState, ChannelId) ->
    ok = close_prepared_statement(ChannelId, OldState),
    Remaining = maps:remove(ChannelId, InstalledChannels),
    {ok, OldState#{installed_channels => Remaining}}.
%% Close the prepared statement of `ChannelId' on every pool worker. Nothing
%% is done when prepared statements are disabled for the connector.
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
    ok;
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
    Workers = ecpool:workers(PoolName),
    close_prepared_statement([Pid || {_Name, Pid} <- Workers], ChannelId, State),
    ok.
%% Best-effort close of the prepared statement of `ChannelId' on each of the
%% given pool workers. Always returns `ok'.
close_prepared_statement(WorkerPids, ChannelId, State) ->
    lists:foreach(
        fun(WorkerPid) ->
            close_prepared_statement_on_worker(WorkerPid, ChannelId, State)
        end,
        WorkerPids
    ).

%% Close the statement on a single worker.
%%
%% Every error is ignored, since it most likely means that the prepared
%% statement does not exist. If it still exists when a statement with the same
%% name is prepared later, `prepare_sql_to_conn/2' tries to remove it again.
%%
%% The statement lookup and the close call live inside the protected part of
%% the `try': the `of' section of a `try ... of' is not covered by `catch', so
%% a failing lookup (e.g. for a channel that never got installed) would
%% otherwise crash the channel removal.
close_prepared_statement_on_worker(WorkerPid, ChannelId, State) ->
    try
        {ok, Conn} = ecpool_worker:client(WorkerPid),
        Statement = get_templated_statement(ChannelId, State),
        _ = epgsql:close(Conn, Statement),
        ok
    catch
        _:_ ->
            ok
    end.
%% Health check of an installed channel: `connected' when the channel's SQL
%% can be validated, an `unhealthy_target' error when the target table does
%% not exist.
on_get_channel_status(
    _ResId,
    ChannelId,
    #{pool_name := PoolName, installed_channels := Channels}
) ->
    ChannelState = maps:get(ChannelId, Channels),
    case do_check_channel_sql(PoolName, ChannelId, ChannelState) of
        {error, undefined_table} ->
            {error, {unhealthy_target, <<"Table does not exist">>}};
        ok ->
            connected
    end.
%% Validate the channel's SQL statement against the pool workers.
do_check_channel_sql(PoolName, ChannelId, #{query_templates := Templates}) ->
    {SQL, _RowTemplate} = maps:get(ChannelId, Templates),
    Workers = ecpool:workers(PoolName),
    validate_table_existence([Pid || {_Name, Pid} <- Workers], SQL).
%% Channels configured for the connector, as reported by `emqx_bridge_v2'.
on_get_channels(ResId) ->
    Channels = emqx_bridge_v2:get_channels_for_connector(ResId),
    Channels.
%% Run a single query on the connector's pool.
%%
%% A two-element request is treated as a three-element request with an empty
%% parameter list. The request is resolved by `proc_sql_params/4', traced,
%% executed by `on_sql_query/5' and the outcome is normalised by
%% `handle_result/1'.
-spec on_query
    %% Called from authn and authz modules
    (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
        {ok, _} | {error, term()};
    %% Called from bridges
    (connector_resource_id(), {action_resource_id(), map()}, state()) ->
        {ok, _} | {error, term()}.
on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
    on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
on_query(InstId, {TypeOrKey, NameOrMap, Params}, #{pool_name := PoolName} = State) ->
    ?SLOG(debug, #{
        msg => "postgresql_connector_received_sql_query",
        connector => InstId,
        type => TypeOrKey,
        sql => NameOrMap,
        state => State
    }),
    {QueryType, NameOrSQL, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
    TraceCtx = #{
        statement_type => QueryType,
        statement_or_name => NameOrSQL,
        data => Data
    },
    emqx_trace:rendered_action_template(TypeOrKey, TraceCtx),
    QueryResult = on_sql_query(InstId, PoolName, QueryType, NameOrSQL, Data),
    ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => QueryResult}),
    handle_result(QueryResult).
%% Run a batch of requests with a single `execute_batch' call.
%%
%% The key of the first request selects the template; every `{Key, Data}'
%% element of the batch is rendered into one parameter row. On success the
%% per-row results are summed up by `handle_batch_result/2'. A batch whose key
%% has no template, or a request that is not a non-empty list of pairs, yields
%% an unrecoverable error.
on_batch_query(
    InstId,
    [{Key, _} = FirstRequest | _] = BatchReq,
    #{pool_name := PoolName} = State
) ->
    BinKey = to_bin(Key),
    case get_template(BinKey, State) of
        {_Statement, RowTemplate} ->
            Statement = get_templated_statement(BinKey, State),
            Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
            TraceCtx = #{
                statement_type => execute_batch,
                statement_or_name => Statement,
                data => Rows
            },
            emqx_trace:rendered_action_template(Key, TraceCtx),
            case on_sql_query(InstId, PoolName, execute_batch, Statement, Rows) of
                {error, _Error} = Failure ->
                    handle_result(Failure);
                {_Column, Results} ->
                    handle_batch_result(Results, 0)
            end;
        undefined ->
            ?SLOG(error, #{
                connector => InstId,
                first_request => FirstRequest,
                state => State,
                msg => "batch prepare not implemented"
            }),
            {error, {unrecoverable_error, batch_prepare_not_implemented}}
    end;
on_batch_query(InstId, BatchReq, State) ->
    ?SLOG(error, #{
        connector => InstId,
        request => BatchReq,
        state => State,
        msg => "invalid request"
    }),
    {error, {unrecoverable_error, invalid_request}}.
%% Resolve an incoming request into `{QueryType, NameOrSQL, Params}'.
%%
%% Three kinds of requests are understood:
%%   * `{ActionResId, Map}' from actions/bridges: the action's row template is
%%     rendered with `Map';
%%   * `{prepared_query, ConnResId, Params}' from authn/authz modules;
%%   * ad-hoc `{QueryType, SQL, Params}' commands, passed through as they are.
%% With prepared statements disabled, the first two kinds become plain
%% `query' calls using the statement text of the template.
proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
    Disabled = prepared_statements_disabled(State),
    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
    Row = render_prepare_sql_row(RowTemplate, Map),
    case Disabled of
        false ->
            {prepared_query, ActionResId, Row};
        true ->
            {query, ExprTemplate, Row}
    end;
proc_sql_params(prepared_query, ConnResId, Params, State) ->
    case prepared_statements_disabled(State) of
        false ->
            %% The connector resource id itself is the prepared statement name.
            {prepared_query, ConnResId, Params};
        true ->
            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
            {query, ExprTemplate, Params}
    end;
proc_sql_params(QueryType, SQL, Params, _State) when
    is_atom(QueryType),
    is_binary(SQL) orelse is_list(SQL),
    is_list(Params)
->
    {QueryType, SQL, Params}.
%% `true' when the state's `prepares' field is `disabled'; a state without
%% that field counts as not disabled.
prepared_statements_disabled(State) ->
    case maps:get(prepares, State, #{}) of
        disabled -> true;
        _ -> false
    end.
%% Look up the template stored under `Key'.
%%
%% A key naming an installed channel is resolved in that channel's own
%% `query_templates'. Any other key is looked up in the connector-level
%% `query_templates', giving `undefined' when it is absent.
get_template(Key, #{installed_channels := Channels}) when is_map_key(Key, Channels) ->
    BinKey = to_bin(Key),
    Templates = maps:get(query_templates, maps:get(BinKey, Channels)),
    maps:get(BinKey, Templates);
get_template(Key, #{query_templates := Templates}) ->
    maps:get(to_bin(Key), Templates, undefined).
%% Return what has to be handed to the driver for `Key'.
%%
%% For an installed channel this is the statement text when prepared
%% statements are disabled, and the prepared statement otherwise. For any
%% other key the connector-level `prepares' map is consulted.
get_templated_statement(Key, #{installed_channels := Channels}) when
    is_map_key(Key, Channels)
->
    BinKey = to_bin(Key),
    case maps:get(BinKey, Channels) of
        #{prepares := disabled, query_templates := #{BinKey := {SQL, _}}} ->
            SQL;
        #{prepares := #{BinKey := Prepared}} ->
            Prepared
    end;
get_templated_statement(Key, #{prepares := PrepStatements}) ->
    maps:get(to_bin(Key), PrepStatements).
%% Execute `?MODULE:Type(Conn, NameOrSQL, Data)' on a worker picked from the
%% pool.
%%
%% Error replies are logged and classified by `classify_query_error/2'; any
%% other reply is returned unchanged. A `function_clause' error raised by the
%% pool call is reported as an unrecoverable `invalid_request'.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
    try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
        {error, Reason} ->
            ?tp(pgsql_connector_query_return, #{error => Reason}),
            ErrorCtx = translate_to_log_context(Reason),
            ?SLOG(
                error,
                maps:merge(
                    #{
                        msg => "postgresql_connector_do_sql_query_failed",
                        connector => InstId,
                        type => Type,
                        sql => NameOrSQL
                    },
                    ErrorCtx
                )
            ),
            classify_query_error(Reason, ErrorCtx);
        Result ->
            ?tp(pgsql_connector_query_return, #{result => Result}),
            Result
    catch
        error:function_clause:Stacktrace ->
            ?SLOG(error, #{
                msg => "postgresql_connector_do_sql_query_failed",
                connector => InstId,
                type => Type,
                sql => NameOrSQL,
                reason => function_clause,
                stacktrace => Stacktrace
            }),
            {error, {unrecoverable_error, invalid_request}}
    end.

%% Map a query error onto the value returned to the caller: a required sync
%% or an empty pool is recoverable, a missing table is unrecoverable, and
%% anything else is returned as a plain exported error.
classify_query_error(Reason, _ErrorCtx) when
    Reason =:= sync_required; Reason =:= ecpool_empty
->
    {error, {recoverable_error, Reason}};
classify_query_error(#error{severity = error, codename = undefined_table}, ErrorCtx) ->
    {error, {unrecoverable_error, export_error(ErrorCtx)}};
classify_query_error(_Reason, ErrorCtx) ->
    {error, export_error(ErrorCtx)}.
%% Connector health check.
%%
%% Reports `connecting' while the pool workers fail their health check.
%% Otherwise the outcome of `do_check_prepares/1' decides the status.
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
    WorkersHealthy =
        emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
    case WorkersHealthy of
        false ->
            connecting;
        true ->
            case do_check_prepares(State) of
                ok ->
                    connected;
                {ok, NewState} ->
                    %% Hand back the state carrying the fresh prepared statements.
                    {connected, NewState};
                {error, undefined_table} ->
                    %% The server is reachable but the target table is not created.
                    {disconnected, State, unhealthy_target};
                {error, _Reason} ->
                    %% Not logged here: `prepare_sql_to_conn' logs the error.
                    connecting
            end
    end.
%% Per-worker health check: `true' when a trivial query on `Conn' replies
%% with a tuple tagged `ok'.
do_get_status(Conn) ->
    Reply = epgsql:squery(Conn, "SELECT count(1) AS T"),
    element(1, Reply) =:= ok.
%% Check the connector-level prepared statements.
%%
%%   * With a `send_message' template the statement is validated against the
%%     pool workers.
%%   * Disabled or already prepared statements need no further work.
%%   * After an earlier failure, preparing is retried; on success the updated
%%     state is returned as `{ok, NewState}'.
do_check_prepares(
    #{
        pool_name := PoolName,
        query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
    }
) ->
    Workers = ecpool:workers(PoolName),
    case validate_table_existence([Pid || {_Name, Pid} <- Workers], SQL) of
        {error, Reason} ->
            {error, Reason};
        ok ->
            ok
    end;
do_check_prepares(#{prepares := disabled}) ->
    ok;
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
    ok;
do_check_prepares(#{prepares := {error, _}} = State) ->
    case prepare_sql(State) of
        {error, Reason} ->
            {error, Reason};
        {ok, PrepStatements} ->
            %% Replace the stored error with the prepared statements.
            {ok, State#{prepares := PrepStatements}}
    end.
%% Parse `SQL' on the pool workers, one after another, until one of them
%% gives a conclusive answer: `{error, undefined_table}' when the driver
%% reports a missing table, `ok' when parsing succeeds.
%%
%% Workers that have no client, cannot be reached, or return any other reply
%% are skipped. When no worker gave a conclusive answer, `ok' is returned and
%% the check is retried on the next health check.
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
validate_table_existence([], _SQL) ->
    ok;
validate_table_existence([WorkerPid | Rest], SQL) ->
    case probe_table_on_worker(WorkerPid, SQL) of
        skip ->
            validate_table_existence(Rest, SQL);
        {bad_parse, Res} ->
            ?tp(postgres_connector_bad_parse2, #{result => Res}),
            validate_table_existence(Rest, SQL);
        Verdict ->
            Verdict
    end.

%% Parse `SQL' on one worker and classify the reply. Only the call fetching
%% the worker's client is protected, and only against a dead worker process.
probe_table_on_worker(WorkerPid, SQL) ->
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            case epgsql:parse2(Conn, "", SQL, []) of
                {error, {_, _, _, undefined_table, _, _}} ->
                    {error, undefined_table};
                Res when is_tuple(Res) andalso ok == element(1, Res) ->
                    ok;
                Res ->
                    {bad_parse, Res}
            end;
        _ ->
            skip
    catch
        exit:{noproc, _} ->
            skip
    end.
- %% ===================================================================
%% Pool worker callback: open an epgsql connection from the pool options.
connect(Opts) ->
    Host = proplists:get_value(host, Opts),
    Username = proplists:get_value(username, Opts),
    %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
    WrappedPassword = proplists:get_value(password, Opts),
    Password = emqx_secret:unwrap(WrappedPassword),
    case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
        {error, Reason} ->
            {error, Reason};
        {ok, _Conn} = Connected ->
            Connected
    end.
%% Run an extended query with the given SQL text.
query(Conn, SQL, Params) ->
    sync_if_required(Conn, epgsql:equery(Conn, SQL, Params)).

%% Run the prepared statement registered under `Name'.
prepared_query(Conn, Name, Params) ->
    sync_if_required(Conn, epgsql:prepared_query2(Conn, Name, Params)).

%% Execute `Statement' once for every parameter row in `Params'.
execute_batch(Conn, Statement, Params) ->
    sync_if_required(Conn, epgsql:execute_batch(Conn, Statement, Params)).

%% When the driver replies `{error, sync_required}', sync the connection
%% before handing the (unchanged) reply back to the caller.
sync_if_required(Conn, {error, sync_required} = Reply) ->
    ok = epgsql:sync(Conn),
    Reply;
sync_if_required(_Conn, Reply) ->
    Reply.
%% Pick the options understood by `epgsql:connect/4' out of the pool options.
%% The kept options come out in reverse order of appearance.
conn_opts(Opts) ->
    conn_opts(Opts, []).

conn_opts(Opts, Acc) ->
    lists:foldl(fun keep_conn_opt/2, Acc, Opts).

%% Prepend `Opt' to the accumulator when it is a connection option. A boolean
%% `ssl' flag is translated (`true' becomes `required'); every other option
%% is dropped.
keep_conn_opt({ssl, Enabled}, Acc) when is_boolean(Enabled) ->
    Flag =
        case Enabled of
            false -> false;
            true -> required
        end,
    [{ssl, Flag} | Acc];
keep_conn_opt({Name, _} = Opt, Acc) when
    Name =:= database; Name =:= port; Name =:= timeout; Name =:= ssl_opts
->
    [Opt | Acc];
keep_conn_opt(_Other, Acc) ->
    Acc.
%% Parse the SQL statements found in `Config' into `query_templates'.
%%
%% A `prepare_statement' map is used as it is. Otherwise a lone `sql'
%% statement is registered under `SQLID'. A config with neither gives an empty
%% template map.
parse_sql_template(Config, SQLID) ->
    Statements =
        case Config of
            #{prepare_statement := Named} ->
                Named;
            #{sql := SQL} ->
                #{SQLID => SQL};
            #{} ->
                #{}
        end,
    #{query_templates => maps:fold(fun parse_sql_template/3, #{}, Statements)}.
%% Fold step: parse one statement with `$n'-style parameters and store the
%% resulting template under `Key'.
parse_sql_template(Key, Query, Acc) ->
    maps:put(Key, emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Acc).
%% Render the parameter row of a prepared statement from `Data'.
render_prepare_sql_row(RowTemplate, Data) ->
    Rendered = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
    %% NOTE: errors are ignored here, missing variables will be replaced with `null`.
    {Row, _Errors} = Rendered,
    Row.
%% Prepare the state's query templates on the pool, unless prepared
%% statements are disabled or there is nothing to prepare.
%%
%% On success `prepares' holds the prepared statements. On failure the error
%% is logged and `prepares' is set to `{error, Reason}'.
init_prepare(#{prepares := disabled} = State) ->
    State;
init_prepare(#{query_templates := Templates} = State) when map_size(Templates) =:= 0 ->
    State;
init_prepare(#{} = State) ->
    case prepare_sql(State) of
        {ok, PrepStatements} ->
            State#{prepares => PrepStatements};
        Failure ->
            ErrorCtx = translate_to_log_context(Failure),
            ?SLOG(
                error,
                maps:merge(
                    #{msg => <<"postgresql_init_prepare_statement_failed">>},
                    ErrorCtx
                )
            ),
            %% Remember that preparing failed.
            State#{prepares => {error, export_error(ErrorCtx)}}
    end.
%% Prepare all query templates of the state on every pool worker.
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
    prepare_sql(maps:to_list(Templates), PoolName).

%% Prepare the templates and, on success, register a reconnect callback so
%% that they are prepared again on a reconnected worker.
prepare_sql(Templates, PoolName) ->
    Outcome = do_prepare_sql(Templates, PoolName),
    case Outcome of
        {ok, _Statements} ->
            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
            Outcome;
        _ ->
            Outcome
    end.
%% Prepare the templates on every worker of the pool.
do_prepare_sql(Templates, PoolName) ->
    do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).

%% Walk the workers and prepare the templates on each worker's connection.
%%
%% Returns `{ok, Statements}' with the statements prepared on the last worker
%% (an empty map for an empty pool). Stops at the first failure and returns
%% it. A worker that has no client yields its `{error, Reason}' instead of
%% crashing with a `badmatch': the callers already treat any non-`{ok, _}'
%% return as a failed prepare.
do_prepare_sql([], _Templates, LastSts) ->
    {ok, LastSts};
do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
    case prepare_sql_on_worker(Worker, Templates) of
        {ok, Sts} ->
            do_prepare_sql(Rest, Templates, Sts);
        Error ->
            Error
    end.

%% Prepare the templates on the connection held by a single worker.
prepare_sql_on_worker(Worker, Templates) ->
    case ecpool_worker:client(Worker) of
        {ok, Conn} ->
            prepare_sql_to_conn(Conn, Templates);
        {error, _Reason} = Error ->
            Error
    end.
%% Prepare every `{Key, {SQL, RowTemplate}}' entry on connection `Conn'.
%% Returns `{ok, Statements}' (keyed like the input) or the first error.
prepare_sql_to_conn(Conn, Prepares) ->
    prepare_sql_to_conn(Conn, Prepares, #{}, 0).

%% `Attempts' counts how often the statement at the head of the list has been
%% retried after closing a statement of the same name. It is reset for each
%% new statement, and the second retry gives up.
prepare_sql_to_conn(Conn, [], Prepared, _Attempts) when is_pid(Conn) ->
    {ok, Prepared};
prepare_sql_to_conn(Conn, [{_Key, _} | _], _Prepared, 2) when is_pid(Conn) ->
    failed_to_remove_prev_prepared_statement_error();
prepare_sql_to_conn(
    Conn, [{Key, {SQL, _RowTemplate}} | Remaining] = Pending, Prepared, Attempts
) when is_pid(Conn) ->
    LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
    ?SLOG(info, LogMeta),
    case epgsql:parse2(Conn, Key, SQL, []) of
        {ok, Statement} ->
            prepare_sql_to_conn(Conn, Remaining, Prepared#{Key => Statement}, 0);
        {error, #error{severity = error, codename = undefined_table} = Error} ->
            %% The target table has not been created.
            ?tp(pgsql_undefined_table, #{}),
            ?SLOG(
                error,
                maps:merge(
                    LogMeta#{msg => "postgresql_parse_failed"},
                    translate_to_log_context(Error)
                )
            ),
            {error, undefined_table};
        {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
            ?tp(pgsql_prepared_statement_exists, #{}),
            ?SLOG(
                warning,
                maps:merge(
                    LogMeta#{
                        msg => "postgresql_prepared_statment_with_same_name_already_exists",
                        explain => <<
                            "A prepared statement with the same name already "
                            "exists in the driver. Will attempt to remove the "
                            "previous prepared statement with the name and then "
                            "try again."
                        >>
                    },
                    translate_to_log_context(Error)
                )
            ),
            case epgsql:close(Conn, statement, Key) of
                {error, CloseError} ->
                    ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
                    failed_to_remove_prev_prepared_statement_error();
                ok ->
                    ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
                    prepare_sql_to_conn(Conn, Pending, Prepared, Attempts + 1)
            end;
        {error, Error} ->
            ErrorCtx = translate_to_log_context(Error),
            ?SLOG(error, maps:merge(LogMeta#{msg => "postgresql_parse_failed"}, ErrorCtx)),
            {error, export_error(ErrorCtx)}
    end.
%% Error returned when a stale prepared statement of the same name could not
%% be removed from the connection.
failed_to_remove_prev_prepared_statement_error() ->
    {error, <<
        "A previous prepared statement for the action already exists "
        "but cannot be closed. Please, try to disable and then enable "
        "the connector to resolve this issue."
    >>}.
%% Convert an atom to its UTF-8 binary form; binaries are returned as they are.
to_bin(Term) when is_atom(Term) ->
    atom_to_binary(Term, utf8);
to_bin(Term) when is_binary(Term) ->
    Term.
%% Normalise a query result for the resource layer.
%%
%% Errors that are already classified pass through, `disconnected' is marked
%% recoverable and every other error becomes an unrecoverable exported error.
%% Anything that is not an error tuple is returned untouched.
handle_result({error, {Class, _Error}} = Res) when
    Class =:= recoverable_error; Class =:= unrecoverable_error
->
    Res;
handle_result({error, disconnected}) ->
    {error, {recoverable_error, disconnected}};
handle_result({error, Error}) ->
    Exported = export_error(translate_to_log_context(Error)),
    {error, {unrecoverable_error, Exported}};
handle_result(Res) ->
    Res.
%% Turn an `{ok, AffectedRows}' result into a map; every other result is
%% returned unchanged.
on_format_query_result(Result) ->
    case Result of
        {ok, Cnt} when is_integer(Cnt) ->
            #{result => ok, affected_rows => Cnt};
        _ ->
            Result
    end.
%% Sum up the row counts of a batch result. The first error found ends the
%% walk and is returned as an unrecoverable exported error.
handle_batch_result([], Total) ->
    ?tp("postgres_success_batch_result", #{row_count => Total}),
    {ok, Total};
handle_batch_result([{error, Error} | _], _Total) ->
    Exported = export_error(translate_to_log_context(Error)),
    {error, {unrecoverable_error, Exported}};
handle_batch_result([{ok, Count} | More], Total) ->
    handle_batch_result(More, Total + Count).
%% Turn an error into a map that can be merged into a log event.
%%
%% `{error, Reason}' wrappers are peeled off first. A driver `#error{}' record
%% is spread over `driver_*' keys; any other reason is stored under `reason'.
translate_to_log_context({error, Reason}) ->
    translate_to_log_context(Reason);
translate_to_log_context(#error{
    severity = Severity,
    code = Code,
    codename = Codename,
    message = Message,
    extra = Extra
}) ->
    #{
        driver_severity => Severity,
        driver_error_codename => Codename,
        driver_error_code => Code,
        driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
        driver_error_extra => Extra
    };
translate_to_log_context(Reason) ->
    #{reason => Reason}.
%% Reduce a log context to the error term returned to callers.
%%
%% A driver error keeps only its code, codename and severity, since the
%% remaining details have been logged already. A `reason' map is unwrapped and
%% anything else is returned as it is.
export_error(Error) ->
    case Error of
        #{
            driver_severity := Severity,
            driver_error_codename := Codename,
            driver_error_code := Code
        } ->
            #{
                error_code => Code,
                error_codename => Codename,
                severity => Severity
            };
        #{reason := Reason} ->
            Reason;
        _ ->
            Error
    end.
|