|
|
@@ -120,11 +120,11 @@ namespace() -> "iotdb".
|
|
|
roots() ->
|
|
|
[].
|
|
|
|
|
|
-fields("config_rest") ->
|
|
|
+fields("config_restapi") ->
|
|
|
proplists_without(
|
|
|
[url, request, retry_interval, headers],
|
|
|
emqx_bridge_http_schema:fields("config_connector")
|
|
|
- ) ++ common_fields(rest) ++
|
|
|
+ ) ++ common_fields(restapi) ++
|
|
|
fields("connection_fields");
|
|
|
fields("connection_fields") ->
|
|
|
[
|
|
|
@@ -206,7 +206,7 @@ common_fields(Driver) ->
|
|
|
hoconsc:enum([Driver]),
|
|
|
#{
|
|
|
desc => ?DESC("config_driver"),
|
|
|
- default => <<"rest">>
|
|
|
+ default => <<"restapi">>
|
|
|
}
|
|
|
)}
|
|
|
].
|
|
|
@@ -227,7 +227,7 @@ desc(Struct) when is_list(Struct) ->
|
|
|
desc(_) ->
|
|
|
undefined.
|
|
|
|
|
|
-connector_config(#{driver := rest} = Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|
|
+connector_config(#{driver := restapi} = Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|
|
#{
|
|
|
base_url := BaseUrl,
|
|
|
authentication :=
|
|
|
@@ -266,13 +266,13 @@ resource_type() -> iotdb.
|
|
|
|
|
|
callback_mode() -> async_if_possible.
|
|
|
|
|
|
-callback_mode(#{driver := rest}) ->
|
|
|
+callback_mode(#{driver := restapi}) ->
|
|
|
async_if_possible;
|
|
|
callback_mode(#{driver := thrift}) ->
|
|
|
always_sync.
|
|
|
|
|
|
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
|
|
|
-on_start(InstanceId, #{driver := rest, iotdb_version := Version} = Config) ->
|
|
|
+on_start(InstanceId, #{driver := restapi, iotdb_version := Version} = Config) ->
|
|
|
%% [FIXME] The configuration passed in here is pre-processed and transformed
|
|
|
%% in emqx_bridge_resource:parse_confs/2.
|
|
|
case emqx_bridge_http_connector:on_start(InstanceId, Config) of
|
|
|
@@ -282,8 +282,8 @@ on_start(InstanceId, #{driver := rest, iotdb_version := Version} = Config) ->
|
|
|
instance_id => InstanceId,
|
|
|
request => emqx_utils:redact(maps:get(request, State, <<>>))
|
|
|
}),
|
|
|
- ?tp(iotdb_bridge_started, #{driver => rest, instance_id => InstanceId}),
|
|
|
- {ok, State#{driver => rest, iotdb_version => Version, channels => #{}}};
|
|
|
+ ?tp(iotdb_bridge_started, #{driver => restapi, instance_id => InstanceId}),
|
|
|
+ {ok, State#{driver => restapi, iotdb_version => Version, channels => #{}}};
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "failed_to_start_iotdb_bridge",
|
|
|
@@ -365,7 +365,7 @@ on_start(
|
|
|
end.
|
|
|
|
|
|
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
|
|
|
-on_stop(InstanceId, #{driver := rest} = State) ->
|
|
|
+on_stop(InstanceId, #{driver := restapi} = State) ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "stopping_iotdb_bridge",
|
|
|
connector => InstanceId
|
|
|
@@ -384,7 +384,7 @@ on_stop(InstanceId, #{driver := thrift} = _State) ->
|
|
|
|
|
|
-spec on_get_status(manager_id(), state()) ->
|
|
|
connected | connecting | {disconnected, state(), term()}.
|
|
|
-on_get_status(InstanceId, #{driver := rest} = State) ->
|
|
|
+on_get_status(InstanceId, #{driver := restapi} = State) ->
|
|
|
Func = fun(Worker, Timeout) ->
|
|
|
Request = {?IOTDB_PING_PATH, [], undefined},
|
|
|
NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
|
|
|
@@ -459,7 +459,7 @@ on_query_async(
|
|
|
InstanceId,
|
|
|
{ChannelId, _Message} = Req,
|
|
|
ReplyFunAndArgs0,
|
|
|
- #{driver := rest, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
+ #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
) ->
|
|
|
?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
|
|
|
?SLOG(debug, #{
|
|
|
@@ -503,7 +503,7 @@ on_batch_query_async(
|
|
|
InstId,
|
|
|
Requests,
|
|
|
Callback,
|
|
|
- #{driver := rest, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
+ #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
) ->
|
|
|
?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
|
|
|
[{ChannelId, _Message} | _] = Requests,
|
|
|
@@ -584,7 +584,7 @@ on_format_query_result(Result) ->
|
|
|
|
|
|
on_add_channel(
|
|
|
InstanceId,
|
|
|
- #{driver := rest, iotdb_version := Version, channels := Channels} = OldState0,
|
|
|
+ #{driver := restapi, iotdb_version := Version, channels := Channels} = OldState0,
|
|
|
ChannelId,
|
|
|
#{
|
|
|
parameters := #{data := Data} = Parameter
|
|
|
@@ -647,7 +647,7 @@ on_add_channel(
|
|
|
{ok, OldState#{channels := Channels2}}
|
|
|
end.
|
|
|
|
|
|
-on_remove_channel(InstanceId, #{driver := rest, channels := Channels} = OldState0, ChannelId) ->
|
|
|
+on_remove_channel(InstanceId, #{driver := restapi, channels := Channels} = OldState0, ChannelId) ->
|
|
|
{ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
|
|
|
Channels2 = maps:remove(ChannelId, Channels),
|
|
|
{ok, OldState#{channels => Channels2}};
|
|
|
@@ -1075,7 +1075,7 @@ get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
|
|
|
get_data_template(#{data := []}, Payloads) ->
|
|
|
preproc_data_list(Payloads).
|
|
|
|
|
|
-do_on_query(InstanceId, ChannelId, Data, #{driver := rest} = State) ->
|
|
|
+do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
|
|
|
emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
|
|
|
do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
|
|
|
ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).
|