|
|
@@ -32,7 +32,9 @@
|
|
|
|
|
|
-export([connect/1]).
|
|
|
|
|
|
--export([query/3]).
|
|
|
+-export([ query/3
|
|
|
+ , prepared_query/4
|
|
|
+ ]).
|
|
|
|
|
|
-export([do_health_check/1]).
|
|
|
|
|
|
@@ -60,8 +62,7 @@ on_start(InstId, #{server := {Host, Port},
|
|
|
connector => InstId, config => Config}),
|
|
|
SslOpts = case maps:get(enable, SSL) of
|
|
|
true ->
|
|
|
- [{ssl, [{server_name_indication, disable} |
|
|
|
- emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
|
|
|
+ [{ssl, [emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
|
|
|
false -> []
|
|
|
end,
|
|
|
Options = [{host, Host},
|
|
|
@@ -80,12 +81,16 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|
|
connector => InstId}),
|
|
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
|
|
|
|
|
-on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
|
|
- on_query(InstId, {sql, SQL, []}, AfterQuery, State);
|
|
|
-on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
|
|
|
+on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
|
|
|
+ {Command, Args} = case QueryParams of
|
|
|
+ {query, SQL} -> {query, [SQL, []]};
|
|
|
+ {query, SQL, Params} -> {query, [SQL, Params]};
|
|
|
+ {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
|
|
|
+ {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
|
|
|
+ end,
|
|
|
?SLOG(debug, #{msg => "postgresql connector received sql query",
|
|
|
- connector => InstId, sql => SQL, state => State}),
|
|
|
- case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
|
|
|
+ connector => InstId, command => Command, args => Args, state => State}),
|
|
|
+ case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "postgresql connector do sql query failed",
|
|
|
@@ -115,6 +120,9 @@ connect(Opts) ->
|
|
|
query(Conn, SQL, Params) ->
|
|
|
epgsql:equery(Conn, SQL, Params).
|
|
|
|
|
|
+prepared_query(Conn, Name, SQL, Params) ->
|
|
|
+ epgsql:prepared_query2(Conn, Name, SQL, Params).
|
|
|
+
|
|
|
conn_opts(Opts) ->
|
|
|
conn_opts(Opts, []).
|
|
|
conn_opts([], Acc) ->
|