|
|
@@ -34,8 +34,6 @@
|
|
|
on_stop/2,
|
|
|
on_query/3,
|
|
|
on_batch_query/3,
|
|
|
- on_query_async/4,
|
|
|
- on_batch_query_async/4,
|
|
|
on_get_status/2
|
|
|
]).
|
|
|
|
|
|
@@ -43,7 +41,7 @@
|
|
|
-export([connect/1]).
|
|
|
|
|
|
%% Internal exports used to execute code with ecpool worker
|
|
|
--export([do_get_status/1, worker_do_insert/3, do_async_reply/2]).
|
|
|
+-export([do_get_status/1, worker_do_insert/3]).
|
|
|
|
|
|
-import(emqx_plugin_libs_rule, [str/1]).
|
|
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
|
@@ -51,7 +49,6 @@
|
|
|
-define(ACTION_SEND_MESSAGE, send_message).
|
|
|
|
|
|
-define(SYNC_QUERY_MODE, handover).
|
|
|
--define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
|
|
|
|
|
|
-define(SQLSERVER_HOST_OPTIONS, #{
|
|
|
default_port => 1433
|
|
|
@@ -169,7 +166,7 @@ server() ->
|
|
|
%% Callbacks defined in emqx_resource
|
|
|
%%====================================================================
|
|
|
|
|
|
-callback_mode() -> async_if_possible.
|
|
|
+callback_mode() -> always_sync.
|
|
|
|
|
|
is_buffer_supported() -> false.
|
|
|
|
|
|
@@ -253,28 +250,6 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
|
|
),
|
|
|
do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State).
|
|
|
|
|
|
--spec on_query_async(
|
|
|
- manager_id(),
|
|
|
- {?ACTION_SEND_MESSAGE, map()},
|
|
|
- {ReplyFun :: function(), Args :: list()},
|
|
|
- state()
|
|
|
-) ->
|
|
|
- {ok, any()}
|
|
|
- | {error, term()}.
|
|
|
-on_query_async(
|
|
|
- InstanceId,
|
|
|
- {?ACTION_SEND_MESSAGE, _Msg} = Query,
|
|
|
- ReplyFunAndArgs,
|
|
|
- %% #{poolname := PoolName, sql_templates := Templates} = State
|
|
|
- State
|
|
|
-) ->
|
|
|
- ?TRACE(
|
|
|
- "SINGLE_QUERY_ASYNC",
|
|
|
- "bridge_sqlserver_received",
|
|
|
- #{requests => Query, connector => InstanceId, state => State}
|
|
|
- ),
|
|
|
- do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
|
|
-
|
|
|
-spec on_batch_query(
|
|
|
manager_id(),
|
|
|
[{?ACTION_SEND_MESSAGE, map()}],
|
|
|
@@ -292,20 +267,6 @@ on_batch_query(InstanceId, BatchRequests, State) ->
|
|
|
),
|
|
|
do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State).
|
|
|
|
|
|
--spec on_batch_query_async(
|
|
|
- manager_id(),
|
|
|
- [{?ACTION_SEND_MESSAGE, map()}],
|
|
|
- {ReplyFun :: function(), Args :: list()},
|
|
|
- state()
|
|
|
-) -> {ok, any()}.
|
|
|
-on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
|
|
|
- ?TRACE(
|
|
|
- "BATCH_QUERY_ASYNC",
|
|
|
- "bridge_sqlserver_received",
|
|
|
- #{requests => Requests, connector => InstanceId, state => State}
|
|
|
- ),
|
|
|
- do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
|
|
|
-
|
|
|
on_get_status(_InstanceId, #{poolname := Pool} = _State) ->
|
|
|
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
|
|
|
Pool, {?MODULE, do_get_status, []}
|
|
|
@@ -365,13 +326,11 @@ conn_str([{password, Password} | Opts], Acc) ->
|
|
|
conn_str([{_, _} | Opts], Acc) ->
|
|
|
conn_str(Opts, Acc).
|
|
|
|
|
|
-%% Sync & Async query with singe & batch sql statement
|
|
|
+%% Query with singe & batch sql statement
|
|
|
-spec do_query(
|
|
|
manager_id(),
|
|
|
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
|
|
|
- ApplyMode ::
|
|
|
- handover
|
|
|
- | {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}},
|
|
|
+ ApplyMode :: handover,
|
|
|
state()
|
|
|
) ->
|
|
|
{ok, list()}
|
|
|
@@ -531,6 +490,3 @@ apply_template(Query, Templates) ->
|
|
|
%% TODO: more detail infomatoin
|
|
|
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
|
|
|
{error, failed_to_apply_sql_template}.
|
|
|
-
|
|
|
-do_async_reply(Result, {ReplyFun, Args}) ->
|
|
|
- erlang:apply(ReplyFun, Args ++ [Result]).
|