|
@@ -44,6 +44,16 @@
|
|
|
default_port => ?MYSQL_DEFAULT_PORT
|
|
default_port => ?MYSQL_DEFAULT_PORT
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
|
|
+-type prepares() :: #{atom() => binary()}.
|
|
|
|
|
+-type params_tokens() :: #{atom() => list()}.
|
|
|
|
|
+-type state() ::
|
|
|
|
|
+ #{
|
|
|
|
|
+ poolname := atom(),
|
|
|
|
|
+ prepare_statement := prepares(),
|
|
|
|
|
+ auto_reconnect := boolean(),
|
|
|
|
|
+ params_tokens := params_tokens()
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
%%=====================================================================
|
|
%%=====================================================================
|
|
|
%% Hocon schema
|
|
%% Hocon schema
|
|
|
roots() ->
|
|
roots() ->
|
|
@@ -63,6 +73,7 @@ server(desc) -> ?DESC("server");
|
|
|
server(_) -> undefined.
|
|
server(_) -> undefined.
|
|
|
|
|
|
|
|
%% ===================================================================
|
|
%% ===================================================================
|
|
|
|
|
+-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
|
|
|
on_start(
|
|
on_start(
|
|
|
InstId,
|
|
InstId,
|
|
|
#{
|
|
#{
|
|
@@ -97,8 +108,8 @@ on_start(
|
|
|
{pool_size, PoolSize}
|
|
{pool_size, PoolSize}
|
|
|
],
|
|
],
|
|
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
|
- Prepares = maps:get(prepare_statement, Config, #{}),
|
|
|
|
|
- State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn},
|
|
|
|
|
|
|
+ Prepares = parse_prepare_sql(maps:get(prepare_statement, Config, #{})),
|
|
|
|
|
+ State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
|
|
|
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
|
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
|
|
ok -> {ok, init_prepare(State)};
|
|
ok -> {ok, init_prepare(State)};
|
|
|
{error, Reason} -> {error, Reason}
|
|
{error, Reason} -> {error, Reason}
|
|
@@ -111,13 +122,13 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|
|
}),
|
|
}),
|
|
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
|
emqx_plugin_libs_pool:stop_pool(PoolName).
|
|
|
|
|
|
|
|
-on_query(InstId, {Type, SQLOrKey}, AfterQuery, State) ->
|
|
|
|
|
- on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State);
|
|
|
|
|
-on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, State) ->
|
|
|
|
|
- on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State);
|
|
|
|
|
|
|
+on_query(InstId, {TypeOrKey, SQLOrKey}, AfterQuery, State) ->
|
|
|
|
|
+ on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, AfterQuery, State);
|
|
|
|
|
+on_query(InstId, {TypeOrKey, SQLOrKey, Params}, AfterQuery, State) ->
|
|
|
|
|
+ on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, AfterQuery, State);
|
|
|
on_query(
|
|
on_query(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {Type, SQLOrKey, Params, Timeout},
|
|
|
|
|
|
|
+ {TypeOrKey, SQLOrKey, Params, Timeout},
|
|
|
AfterQuery,
|
|
AfterQuery,
|
|
|
#{poolname := PoolName, prepare_statement := Prepares} = State
|
|
#{poolname := PoolName, prepare_statement := Prepares} = State
|
|
|
) ->
|
|
) ->
|
|
@@ -125,8 +136,9 @@ on_query(
|
|
|
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
|
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
|
|
Worker = ecpool:get_client(PoolName),
|
|
Worker = ecpool:get_client(PoolName),
|
|
|
{ok, Conn} = ecpool_worker:client(Worker),
|
|
{ok, Conn} = ecpool_worker:client(Worker),
|
|
|
- MySqlFunction = mysql_function(Type),
|
|
|
|
|
- Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey, Params, Timeout]),
|
|
|
|
|
|
|
+ MySqlFunction = mysql_function(TypeOrKey),
|
|
|
|
|
+ {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
|
|
|
|
+ Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]),
|
|
|
case Result of
|
|
case Result of
|
|
|
{error, disconnected} ->
|
|
{error, disconnected} ->
|
|
|
?SLOG(
|
|
?SLOG(
|
|
@@ -145,7 +157,7 @@ on_query(
|
|
|
case prepare_sql(Prepares, PoolName) of
|
|
case prepare_sql(Prepares, PoolName) of
|
|
|
ok ->
|
|
ok ->
|
|
|
%% not return result, next loop will try again
|
|
%% not return result, next loop will try again
|
|
|
- on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State);
|
|
|
|
|
|
|
+ on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, AfterQuery, State);
|
|
|
{error, Reason} ->
|
|
{error, Reason} ->
|
|
|
?SLOG(
|
|
?SLOG(
|
|
|
error,
|
|
error,
|
|
@@ -166,8 +178,13 @@ on_query(
|
|
|
Result
|
|
Result
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-mysql_function(sql) -> query;
|
|
|
|
|
-mysql_function(prepared_query) -> execute.
|
|
|
|
|
|
|
+mysql_function(sql) ->
|
|
|
|
|
+ query;
|
|
|
|
|
+mysql_function(prepared_query) ->
|
|
|
|
|
+ execute;
|
|
|
|
|
+%% for bridge
|
|
|
|
|
+mysql_function(_) ->
|
|
|
|
|
+ mysql_function(prepared_query).
|
|
|
|
|
|
|
|
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
|
|
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
|
|
|
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
|
|
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
|
|
@@ -287,3 +304,24 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
|
|
|
|
|
|
|
|
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
|
|
unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
|
|
|
mysql:unprepare(Conn, PrepareSqlKey).
|
|
mysql:unprepare(Conn, PrepareSqlKey).
|
|
|
|
|
+
|
|
|
|
|
+parse_prepare_sql(SQL) ->
|
|
|
|
|
+ parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
|
|
|
|
+
|
|
|
|
|
+parse_prepare_sql([{Key, H} | T], SQL, Tokens) ->
|
|
|
|
|
+ {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
|
|
|
|
|
+ parse_prepare_sql(T, SQL#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens});
|
|
|
|
|
+parse_prepare_sql([], SQL, Tokens) ->
|
|
|
|
|
+ #{prepare_statement => SQL, params_tokens => Tokens}.
|
|
|
|
|
+
|
|
|
|
|
+proc_sql_params(query, SQLOrKey, Params, _State) ->
|
|
|
|
|
+ {SQLOrKey, Params};
|
|
|
|
|
+proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
|
|
|
|
|
+ {SQLOrKey, Params};
|
|
|
|
|
+proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
|
|
|
|
|
+ case maps:get(TypeOrKey, ParamsTokens, undefined) of
|
|
|
|
|
+ undefined ->
|
|
|
|
|
+ {SQLOrData, Params};
|
|
|
|
|
+ Tokens ->
|
|
|
|
|
+ {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
|
|
|
|
+ end.
|