|
|
@@ -86,7 +86,6 @@ on_start(InstId, #{server := {Host, Port},
|
|
|
{pool_size, PoolSize}],
|
|
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
|
Prepares = maps:get(prepare_statement, Config, #{}),
|
|
|
- io:format("Prepares ~p~n", [Prepares]),
|
|
|
State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}),
|
|
|
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
|
|
ok -> {ok, State};
|
|
|
@@ -116,26 +115,30 @@ on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery,
|
|
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}),
|
|
|
%% kill the poll worker to trigger reconnection
|
|
|
_ = exit(Conn, restart),
|
|
|
- emqx_resource:query_failed(AfterQuery);
|
|
|
+ emqx_resource:query_failed(AfterQuery),
|
|
|
+ Result;
|
|
|
{error, not_prepared} ->
|
|
|
?SLOG(warning,
|
|
|
- LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => not_prepared}),
|
|
|
+ LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}),
|
|
|
case prepare_sql(Prepares, PoolName) of
|
|
|
ok ->
|
|
|
+ %% not return result, next loop will try again
|
|
|
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State);
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error,
|
|
|
- LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
|
|
|
- emqx_resource:query_failed(AfterQuery)
|
|
|
+ LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}),
|
|
|
+ emqx_resource:query_failed(AfterQuery),
|
|
|
+ {error, Reason}
|
|
|
end;
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error,
|
|
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
|
|
|
- emqx_resource:query_failed(AfterQuery);
|
|
|
+ emqx_resource:query_failed(AfterQuery),
|
|
|
+ Result;
|
|
|
_ ->
|
|
|
- emqx_resource:query_success(AfterQuery)
|
|
|
- end,
|
|
|
- Result.
|
|
|
+ emqx_resource:query_success(AfterQuery),
|
|
|
+ Result
|
|
|
+ end.
|
|
|
|
|
|
mysql_function(sql) -> query;
|
|
|
mysql_function(prepared_query) -> execute.
|