|
|
@@ -35,7 +35,11 @@
|
|
|
on_stop/2,
|
|
|
on_query/3,
|
|
|
on_batch_query/3,
|
|
|
- on_get_status/2
|
|
|
+ on_get_status/2,
|
|
|
+ on_add_channel/4,
|
|
|
+ on_remove_channel/3,
|
|
|
+ on_get_channels/1,
|
|
|
+ on_get_channel_status/3
|
|
|
]).
|
|
|
|
|
|
%% callbacks for ecpool
|
|
|
@@ -124,9 +128,9 @@
|
|
|
%% -type size() :: integer().
|
|
|
|
|
|
-type state() :: #{
|
|
|
+ installed_channels := map(),
|
|
|
pool_name := binary(),
|
|
|
- resource_opts := map(),
|
|
|
- sql_templates := map()
|
|
|
+ resource_opts := map()
|
|
|
}.
|
|
|
|
|
|
%%====================================================================
|
|
|
@@ -172,7 +176,7 @@ server() ->
|
|
|
callback_mode() -> always_sync.
|
|
|
|
|
|
on_start(
|
|
|
- ResourceId = PoolName,
|
|
|
+ InstanceId = PoolName,
|
|
|
#{
|
|
|
server := Server,
|
|
|
username := Username,
|
|
|
@@ -184,7 +188,7 @@ on_start(
|
|
|
) ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "starting_sqlserver_connector",
|
|
|
- connector => ResourceId,
|
|
|
+ connector => InstanceId,
|
|
|
config => emqx_utils:redact(Config)
|
|
|
}),
|
|
|
|
|
|
@@ -199,7 +203,8 @@ on_start(
|
|
|
ok
|
|
|
end,
|
|
|
|
|
|
- Options = [
|
|
|
+ %% odbc connection string required
|
|
|
+ ConnectOptions = [
|
|
|
{server, to_bin(Server)},
|
|
|
{username, Username},
|
|
|
{password, maps:get(password, Config, emqx_secret:wrap(""))},
|
|
|
@@ -209,12 +214,12 @@ on_start(
|
|
|
],
|
|
|
|
|
|
State = #{
|
|
|
- %% also ResourceId
|
|
|
+ %% also InstanceId
|
|
|
pool_name => PoolName,
|
|
|
- sql_templates => parse_sql_template(Config),
|
|
|
+ installed_channels => #{},
|
|
|
resource_opts => ResourceOpts
|
|
|
},
|
|
|
- case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
|
|
|
+ case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
|
|
|
ok ->
|
|
|
{ok, State};
|
|
|
{error, Reason} ->
|
|
|
@@ -225,23 +230,72 @@ on_start(
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
-on_stop(ResourceId, _State) ->
|
|
|
+on_add_channel(
|
|
|
+ _InstId,
|
|
|
+ #{
|
|
|
+ installed_channels := InstalledChannels
|
|
|
+ } = OldState,
|
|
|
+ ChannelId,
|
|
|
+ ChannelConfig
|
|
|
+) ->
|
|
|
+ {ok, ChannelState} = create_channel_state(ChannelConfig),
|
|
|
+ NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
|
|
|
+ %% Update state
|
|
|
+ NewState = OldState#{installed_channels => NewInstalledChannels},
|
|
|
+ {ok, NewState}.
|
|
|
+
|
|
|
+create_channel_state(
|
|
|
+ #{parameters := Conf} = _ChannelConfig
|
|
|
+) ->
|
|
|
+ State = #{sql_templates => parse_sql_template(Conf)},
|
|
|
+ {ok, State}.
|
|
|
+
|
|
|
+on_remove_channel(
|
|
|
+ _InstId,
|
|
|
+ #{
|
|
|
+ installed_channels := InstalledChannels
|
|
|
+ } = OldState,
|
|
|
+ ChannelId
|
|
|
+) ->
|
|
|
+ NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
|
|
|
+ %% Update state
|
|
|
+ NewState = OldState#{installed_channels => NewInstalledChannels},
|
|
|
+ {ok, NewState}.
|
|
|
+
|
|
|
+on_get_channel_status(
|
|
|
+ InstanceId,
|
|
|
+ ChannelId,
|
|
|
+ #{installed_channels := Channels} = State
|
|
|
+) ->
|
|
|
+ case maps:find(ChannelId, Channels) of
|
|
|
+ {ok, _} -> on_get_status(InstanceId, State);
|
|
|
+ error -> ?status_disconnected
|
|
|
+ end.
|
|
|
+
|
|
|
+on_get_channels(ResId) ->
|
|
|
+ emqx_bridge_v2:get_channels_for_connector(ResId).
|
|
|
+
|
|
|
+on_stop(InstanceId, _State) ->
|
|
|
+ ?tp(
|
|
|
+ sqlserver_connector_on_stop,
|
|
|
+ #{instance_id => InstanceId}
|
|
|
+ ),
|
|
|
?SLOG(info, #{
|
|
|
msg => "stopping_sqlserver_connector",
|
|
|
- connector => ResourceId
|
|
|
+ connector => InstanceId
|
|
|
}),
|
|
|
- emqx_resource_pool:stop(ResourceId).
|
|
|
+ emqx_resource_pool:stop(InstanceId).
|
|
|
|
|
|
-spec on_query(
|
|
|
resource_id(),
|
|
|
- {?ACTION_SEND_MESSAGE, map()},
|
|
|
+ Query :: {channel_id(), map()},
|
|
|
state()
|
|
|
) ->
|
|
|
ok
|
|
|
| {ok, list()}
|
|
|
| {error, {recoverable_error, term()}}
|
|
|
| {error, term()}.
|
|
|
-on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
|
|
+on_query(ResourceId, {_ChannelId, _Msg} = Query, State) ->
|
|
|
?TRACE(
|
|
|
"SINGLE_QUERY_SYNC",
|
|
|
"bridge_sqlserver_received",
|
|
|
@@ -251,7 +305,7 @@ on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
|
|
|
|
|
-spec on_batch_query(
|
|
|
resource_id(),
|
|
|
- [{?ACTION_SEND_MESSAGE, map()}],
|
|
|
+ [{channel_id(), map()}],
|
|
|
state()
|
|
|
) ->
|
|
|
ok
|
|
|
@@ -273,8 +327,8 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
|
|
|
),
|
|
|
status_result(Health).
|
|
|
|
|
|
-status_result(_Status = true) -> connected;
|
|
|
-status_result(_Status = false) -> connecting.
|
|
|
+status_result(_Status = true) -> ?status_connected;
|
|
|
+status_result(_Status = false) -> ?status_connecting.
|
|
|
%% TODO:
|
|
|
%% case for disconnected
|
|
|
|
|
|
@@ -296,7 +350,7 @@ do_get_status(Conn) ->
|
|
|
end.
|
|
|
|
|
|
%%====================================================================
|
|
|
-%% Internal Helper fns
|
|
|
+%% Internal Functions
|
|
|
%%====================================================================
|
|
|
|
|
|
%% TODO && FIXME:
|
|
|
@@ -329,7 +383,7 @@ conn_str([{_, _} | Opts], Acc) ->
|
|
|
%% Query with singe & batch sql statement
|
|
|
-spec do_query(
|
|
|
resource_id(),
|
|
|
- Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
|
|
|
+ Query :: {channel_id(), map()} | [{channel_id(), map()}],
|
|
|
ApplyMode :: handover,
|
|
|
state()
|
|
|
) ->
|
|
|
@@ -341,7 +395,10 @@ do_query(
|
|
|
ResourceId,
|
|
|
Query,
|
|
|
ApplyMode,
|
|
|
- #{pool_name := PoolName, sql_templates := Templates} = State
|
|
|
+ #{
|
|
|
+ pool_name := PoolName,
|
|
|
+ installed_channels := Channels
|
|
|
+ } = State
|
|
|
) ->
|
|
|
?TRACE(
|
|
|
"SINGLE_QUERY_SYNC",
|
|
|
@@ -349,15 +406,19 @@ do_query(
|
|
|
#{query => Query, connector => ResourceId, state => State}
|
|
|
),
|
|
|
|
|
|
+ ChannelId = get_channel_id(Query),
|
|
|
+ QueryTuple = get_query_tuple(Query),
|
|
|
+ #{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels),
|
|
|
+
|
|
|
%% only insert sql statement for single query and batch query
|
|
|
- case apply_template(Query, Templates) of
|
|
|
+ case apply_template(QueryTuple, Templates) of
|
|
|
{?ACTION_SEND_MESSAGE, SQL} ->
|
|
|
Result = ecpool:pick_and_do(
|
|
|
PoolName,
|
|
|
{?MODULE, worker_do_insert, [SQL, State]},
|
|
|
ApplyMode
|
|
|
);
|
|
|
- Query ->
|
|
|
+ QueryTuple ->
|
|
|
Result = {error, {unrecoverable_error, invalid_query}};
|
|
|
_ ->
|
|
|
Result = {error, {unrecoverable_error, failed_to_apply_sql_template}}
|
|
|
@@ -426,8 +487,22 @@ execute(Conn, SQL) ->
|
|
|
execute(Conn, SQL, Timeout) ->
|
|
|
odbc:sql_query(Conn, str(SQL), Timeout).
|
|
|
|
|
|
-to_bin(List) when is_list(List) ->
|
|
|
- unicode:characters_to_binary(List, utf8).
|
|
|
+get_channel_id([{ChannelId, _Req} | _]) ->
|
|
|
+ ChannelId;
|
|
|
+get_channel_id({ChannelId, _Req}) ->
|
|
|
+ ChannelId.
|
|
|
+
|
|
|
+get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
|
|
|
+ {QueryType, Data};
|
|
|
+get_query_tuple({_ChannelId, Data} = _Query) ->
|
|
|
+ {send_message, Data};
|
|
|
+get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
|
|
|
+ error(
|
|
|
+ {unrecoverable_error,
|
|
|
+ {invalid_request, <<"The only query type that supports batching is insert.">>}}
|
|
|
+ );
|
|
|
+get_query_tuple([InsertQuery | _]) ->
|
|
|
+ get_query_tuple(InsertQuery).
|
|
|
|
|
|
%% for bridge data to sql server
|
|
|
parse_sql_template(Config) ->
|
|
|
@@ -506,3 +581,6 @@ proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
|
|
|
])
|
|
|
),
|
|
|
<<BatchInserts/binary, " values ", Values/binary>>.
|
|
|
+
|
|
|
+to_bin(List) when is_list(List) ->
|
|
|
+ unicode:characters_to_binary(List, utf8).
|