|
|
@@ -1,10 +1,13 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
%%--------------------------------------------------------------------
|
|
|
--module(emqx_bridge_iotdb_impl).
|
|
|
+-module(emqx_bridge_iotdb_connector).
|
|
|
+
|
|
|
+-behaviour(emqx_resource).
|
|
|
|
|
|
-include("emqx_bridge_iotdb.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
+-include_lib("hocon/include/hoconsc.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
%% `emqx_resource' API
|
|
|
@@ -14,9 +17,25 @@
|
|
|
on_stop/2,
|
|
|
on_get_status/2,
|
|
|
on_query/3,
|
|
|
- on_query_async/4
|
|
|
+ on_query_async/4,
|
|
|
+ on_add_channel/4,
|
|
|
+ on_remove_channel/3,
|
|
|
+ on_get_channels/1,
|
|
|
+ on_get_channel_status/3
|
|
|
]).
|
|
|
|
|
|
+-export([
|
|
|
+ namespace/0,
|
|
|
+ roots/0,
|
|
|
+ fields/1,
|
|
|
+ desc/1,
|
|
|
+ connector_examples/1,
|
|
|
+ connector_example_values/0
|
|
|
+]).
|
|
|
+
|
|
|
+%% emqx_connector_resource behaviour callbacks
|
|
|
+-export([connector_config/2]).
|
|
|
+
|
|
|
-type config() ::
|
|
|
#{
|
|
|
base_url := #{
|
|
|
@@ -29,33 +48,140 @@
|
|
|
pool_type := random | hash,
|
|
|
pool_size := pos_integer(),
|
|
|
request => undefined | map(),
|
|
|
- is_aligned => boolean(),
|
|
|
- iotdb_version => binary(),
|
|
|
- device_id => binary() | undefined,
|
|
|
atom() => _
|
|
|
}.
|
|
|
|
|
|
-type state() ::
|
|
|
#{
|
|
|
base_path := _,
|
|
|
- base_url := #{
|
|
|
- scheme := http | https,
|
|
|
- host := iolist(),
|
|
|
- port := inet:port_number(),
|
|
|
- path := _
|
|
|
- },
|
|
|
connect_timeout := pos_integer(),
|
|
|
pool_type := random | hash,
|
|
|
- pool_size := pos_integer(),
|
|
|
+ channels := map(),
|
|
|
request => undefined | map(),
|
|
|
- is_aligned => boolean(),
|
|
|
- iotdb_version => binary(),
|
|
|
- device_id => binary() | undefined,
|
|
|
atom() => _
|
|
|
}.
|
|
|
|
|
|
-type manager_id() :: binary().
|
|
|
|
|
|
+-define(CONNECTOR_TYPE, iotdb).
|
|
|
+
|
|
|
+-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
|
+
|
|
|
+%%-------------------------------------------------------------------------------------
|
|
|
+%% connector examples
|
|
|
+%%-------------------------------------------------------------------------------------
|
|
|
+connector_examples(Method) ->
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ <<"iotdb">> =>
|
|
|
+ #{
|
|
|
+ summary => <<"Apache IoTDB Connector">>,
|
|
|
+ value => emqx_connector_schema:connector_values(
|
|
|
+ Method, ?CONNECTOR_TYPE, connector_example_values()
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ].
|
|
|
+
|
|
|
+connector_example_values() ->
|
|
|
+ #{
|
|
|
+ name => <<"iotdb_connector">>,
|
|
|
+ type => iotdb,
|
|
|
+ enable => true,
|
|
|
+ authentication => #{
|
|
|
+ <<"username">> => <<"root">>,
|
|
|
+ <<"password">> => <<"*****">>
|
|
|
+ },
|
|
|
+ base_url => <<"http://iotdb.local:18080/">>,
|
|
|
+ connect_timeout => <<"15s">>,
|
|
|
+ pool_type => <<"random">>,
|
|
|
+ pool_size => 8,
|
|
|
+ enable_pipelining => 100,
|
|
|
+ ssl => #{enable => false}
|
|
|
+ }.
|
|
|
+
|
|
|
+%%-------------------------------------------------------------------------------------
|
|
|
+%% schema
|
|
|
+%%-------------------------------------------------------------------------------------
|
|
|
+namespace() -> "iotdb".
|
|
|
+
|
|
|
+roots() ->
|
|
|
+ [{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
|
|
+
|
|
|
+fields(config) ->
|
|
|
+ proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++
|
|
|
+ fields("connection_fields");
|
|
|
+fields("connection_fields") ->
|
|
|
+ [
|
|
|
+ {base_url,
|
|
|
+ mk(
|
|
|
+ emqx_schema:url(),
|
|
|
+ #{
|
|
|
+ required => true,
|
|
|
+ desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
|
|
|
+ }
|
|
|
+ )},
|
|
|
+ {authentication,
|
|
|
+ mk(
|
|
|
+ hoconsc:union([ref(?MODULE, auth_basic)]),
|
|
|
+ #{
|
|
|
+ default => auth_basic, desc => ?DESC("config_authentication")
|
|
|
+ }
|
|
|
+ )}
|
|
|
+ ];
|
|
|
+fields(auth_basic) ->
|
|
|
+ [
|
|
|
+ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
|
|
|
+ {password,
|
|
|
+ emqx_schema_secret:mk(#{
|
|
|
+ required => true,
|
|
|
+ desc => ?DESC("config_auth_basic_password")
|
|
|
+ })}
|
|
|
+ ];
|
|
|
+fields("post") ->
|
|
|
+ emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
|
|
|
+fields("put") ->
|
|
|
+ fields(config);
|
|
|
+fields("get") ->
|
|
|
+ emqx_bridge_schema:status_fields() ++ fields("post").
|
|
|
+
|
|
|
+desc(config) ->
|
|
|
+ ?DESC("desc_config");
|
|
|
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
|
|
+ ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
|
|
|
+desc(_) ->
|
|
|
+ undefined.
|
|
|
+
|
|
|
+connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|
|
+ #{
|
|
|
+ base_url := BaseUrl,
|
|
|
+ authentication :=
|
|
|
+ #{
|
|
|
+ username := Username,
|
|
|
+ password := Password0
|
|
|
+ }
|
|
|
+ } = Conf,
|
|
|
+
|
|
|
+ Password = emqx_secret:unwrap(Password0),
|
|
|
+ BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
|
|
|
+
|
|
|
+ WebhookConfig =
|
|
|
+ Conf#{
|
|
|
+ url => BaseUrl,
|
|
|
+ headers => [
|
|
|
+ {<<"Content-type">>, <<"application/json">>},
|
|
|
+ {<<"Authorization">>, BasicToken}
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ ParseConfs(
|
|
|
+ <<"http">>,
|
|
|
+ Name,
|
|
|
+ WebhookConfig
|
|
|
+ ).
|
|
|
+
|
|
|
+proplists_without(Keys, List) ->
|
|
|
+ [El || El = {K, _} <- List, not lists:member(K, Keys)].
|
|
|
+
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
%% `emqx_resource' API
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
@@ -73,7 +199,7 @@ on_start(InstanceId, Config) ->
|
|
|
request => maps:get(request, State, <<>>)
|
|
|
}),
|
|
|
?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
|
|
|
- {ok, maps:merge(Config, State)};
|
|
|
+ {ok, State#{channels => #{}}};
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "failed_to_start_iotdb_bridge",
|
|
|
@@ -103,19 +229,20 @@ on_get_status(InstanceId, State) ->
|
|
|
{ok, pos_integer(), [term()], term()}
|
|
|
| {ok, pos_integer(), [term()]}
|
|
|
| {error, term()}.
|
|
|
-on_query(InstanceId, {send_message, Message}, State) ->
|
|
|
+on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) ->
|
|
|
?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
|
|
|
?SLOG(debug, #{
|
|
|
msg => "iotdb_bridge_on_query_called",
|
|
|
instance_id => InstanceId,
|
|
|
- send_message => Message,
|
|
|
+ send_message => Req,
|
|
|
state => emqx_utils:redact(State)
|
|
|
}),
|
|
|
- case make_iotdb_insert_request(Message, State) of
|
|
|
+
|
|
|
+ case try_render_message(Req, Channels) of
|
|
|
{ok, IoTDBPayload} ->
|
|
|
handle_response(
|
|
|
emqx_bridge_http_connector:on_query(
|
|
|
- InstanceId, {send_message, IoTDBPayload}, State
|
|
|
+ InstanceId, {ChannelId, IoTDBPayload}, State
|
|
|
)
|
|
|
);
|
|
|
Error ->
|
|
|
@@ -124,15 +251,17 @@ on_query(InstanceId, {send_message, Message}, State) ->
|
|
|
|
|
|
-spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
|
|
|
{ok, pid()} | {error, empty_request}.
|
|
|
-on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
|
|
|
+on_query_async(
|
|
|
+ InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
|
|
|
+) ->
|
|
|
?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
|
|
|
?SLOG(debug, #{
|
|
|
msg => "iotdb_bridge_on_query_async_called",
|
|
|
instance_id => InstanceId,
|
|
|
- send_message => Message,
|
|
|
+ send_message => Req,
|
|
|
state => emqx_utils:redact(State)
|
|
|
}),
|
|
|
- case make_iotdb_insert_request(Message, State) of
|
|
|
+ case try_render_message(Req, Channels) of
|
|
|
{ok, IoTDBPayload} ->
|
|
|
ReplyFunAndArgs =
|
|
|
{
|
|
|
@@ -143,12 +272,71 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
|
|
|
[]
|
|
|
},
|
|
|
emqx_bridge_http_connector:on_query_async(
|
|
|
- InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
|
|
|
+ InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
|
|
|
);
|
|
|
Error ->
|
|
|
Error
|
|
|
end.
|
|
|
|
|
|
+on_add_channel(
|
|
|
+ InstanceId,
|
|
|
+ #{channels := Channels} = OldState0,
|
|
|
+ ChannelId,
|
|
|
+ #{
|
|
|
+ parameters := #{iotdb_version := Version, data := Data} = Parameter
|
|
|
+ }
|
|
|
+) ->
|
|
|
+ case maps:is_key(ChannelId, Channels) of
|
|
|
+ true ->
|
|
|
+ {error, already_exists};
|
|
|
+ _ ->
|
|
|
+ %% update HTTP channel
|
|
|
+ InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
|
|
|
+ InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
|
|
|
+
|
|
|
+ Path =
|
|
|
+ case Version of
|
|
|
+ ?VSN_1_1_X -> InsertTabletPathV2;
|
|
|
+ _ -> InsertTabletPathV1
|
|
|
+ end,
|
|
|
+
|
|
|
+ HTTPReq = #{
|
|
|
+ parameters => Parameter#{
|
|
|
+ path => Path,
|
|
|
+ method => <<"post">>
|
|
|
+ }
|
|
|
+ },
|
|
|
+
|
|
|
+ {ok, OldState} = emqx_bridge_http_connector:on_add_channel(
|
|
|
+ InstanceId, OldState0, ChannelId, HTTPReq
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% update IoTDB channel
|
|
|
+ DeviceId = maps:get(device_id, Parameter, <<>>),
|
|
|
+ Channel = Parameter#{
|
|
|
+ device_id => emqx_placeholder:preproc_tmpl(DeviceId),
|
|
|
+ data := preproc_data_template(Data)
|
|
|
+ },
|
|
|
+ Channels2 = Channels#{ChannelId => Channel},
|
|
|
+ {ok, OldState#{channels := Channels2}}
|
|
|
+ end.
|
|
|
+
|
|
|
+on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
|
|
|
+ {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
|
|
|
+ Channels2 = maps:remove(ChannelId, Channels),
|
|
|
+ {ok, OldState#{channels => Channels2}}.
|
|
|
+
|
|
|
+on_get_channels(InstanceId) ->
|
|
|
+ emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
|
|
+
|
|
|
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
|
|
|
+ case maps:is_key(ChannelId, Channels) of
|
|
|
+ true ->
|
|
|
+ connected;
|
|
|
+ _ ->
|
|
|
+ {error, not_exists}
|
|
|
+ end.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -238,14 +426,14 @@ iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
|
|
|
iot_timestamp(TimestampTkn, Msg, Nows) ->
|
|
|
iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
|
|
|
|
|
|
+iot_timestamp(<<"now_us">>, #{now_us := NowUs}) ->
|
|
|
+ NowUs;
|
|
|
+iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) ->
|
|
|
+ NowNs;
|
|
|
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
|
|
|
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
|
|
|
->
|
|
|
NowMs;
|
|
|
-iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> ->
|
|
|
- NowUs;
|
|
|
-iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> ->
|
|
|
- NowNs;
|
|
|
iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
|
|
|
binary_to_integer(Timestamp).
|
|
|
|
|
|
@@ -304,25 +492,13 @@ convert_float(Str) when is_binary(Str) ->
|
|
|
convert_float(undefined) ->
|
|
|
null.
|
|
|
|
|
|
-make_iotdb_insert_request(Message, State) ->
|
|
|
- Payloads = to_list(parse_payload(get_payload(Message))),
|
|
|
- IsAligned = maps:get(is_aligned, State, false),
|
|
|
- IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
|
|
|
- case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
|
|
|
- {undefined, _} ->
|
|
|
- {error, device_id_missing};
|
|
|
- {_, []} ->
|
|
|
- {error, invalid_data};
|
|
|
- {DeviceId, PreProcessedData} ->
|
|
|
- DataList = proc_data(PreProcessedData, Message),
|
|
|
- InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
|
|
|
- Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
|
|
|
- {ok,
|
|
|
- maps:merge(Rows, #{
|
|
|
- iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
|
|
|
- iotdb_field_key(device_id, IotDBVsn) => DeviceId
|
|
|
- })}
|
|
|
- end.
|
|
|
+make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) ->
|
|
|
+ InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
|
|
|
+ Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
|
|
|
+ maps:merge(Rows, #{
|
|
|
+ iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
|
|
|
+ iotdb_field_key(device_id, IotDBVsn) => DeviceId
|
|
|
+ }).
|
|
|
|
|
|
replace_dtypes(Rows0, IotDBVsn) ->
|
|
|
{Types, Rows} = maps:take(dtypes, Rows0),
|
|
|
@@ -404,13 +580,12 @@ iotdb_field_key(data_types, ?VSN_0_13_X) ->
|
|
|
to_list(List) when is_list(List) -> List;
|
|
|
to_list(Data) -> [Data].
|
|
|
|
|
|
-device_id(Message, Payloads, State) ->
|
|
|
- case maps:get(device_id, State, undefined) of
|
|
|
- undefined ->
|
|
|
- %% [FIXME] there could be conflicting device-ids in the Payloads
|
|
|
+%% If device_id is missing from the channel data, try to find it from the payload
|
|
|
+device_id(Message, Payloads, Channel) ->
|
|
|
+ case maps:get(device_id, Channel, <<>>) of
|
|
|
+ <<>> ->
|
|
|
maps:get(<<"device_id">>, hd(Payloads), undefined);
|
|
|
- DeviceId ->
|
|
|
- DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
|
|
|
+ DeviceIdTkn ->
|
|
|
emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
|
|
|
end.
|
|
|
|
|
|
@@ -430,3 +605,47 @@ eval_response_body(Body, Resp) ->
|
|
|
#{<<"code">> := 200} -> Resp;
|
|
|
Reason -> {error, Reason}
|
|
|
end.
|
|
|
+
|
|
|
+preproc_data_template(DataList) ->
|
|
|
+ lists:map(
|
|
|
+ fun(
|
|
|
+ #{
|
|
|
+ timestamp := Timestamp,
|
|
|
+ measurement := Measurement,
|
|
|
+ data_type := DataType,
|
|
|
+ value := Value
|
|
|
+ }
|
|
|
+ ) ->
|
|
|
+ #{
|
|
|
+ timestamp => emqx_placeholder:preproc_tmpl(Timestamp),
|
|
|
+ measurement => emqx_placeholder:preproc_tmpl(Measurement),
|
|
|
+ data_type => DataType,
|
|
|
+ value => emqx_placeholder:preproc_tmpl(Value)
|
|
|
+ }
|
|
|
+ end,
|
|
|
+ DataList
|
|
|
+ ).
|
|
|
+
|
|
|
+try_render_message({ChannelId, Msg}, Channels) ->
|
|
|
+ case maps:find(ChannelId, Channels) of
|
|
|
+ {ok, Channel} ->
|
|
|
+ {ok, render_channel_message(Channel, Msg)};
|
|
|
+ _ ->
|
|
|
+ {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
|
|
|
+ end.
|
|
|
+
|
|
|
+render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) ->
|
|
|
+ Payloads = to_list(parse_payload(get_payload(Message))),
|
|
|
+ DataTemplate = get_data_template(Channel, Payloads),
|
|
|
+ DataList = proc_data(DataTemplate, Message),
|
|
|
+ DeviceId = device_id(Message, Payloads, Channel),
|
|
|
+ make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn).
|
|
|
+
|
|
|
+%% Get the message template.
|
|
|
+%% In order to be compatible with 4.4, the template version has higher priority
|
|
|
+%% This is a template, using it
|
|
|
+get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
|
|
|
+ Data;
|
|
|
+%% This is a self-describing message
|
|
|
+get_data_template(#{data := []}, Payloads) ->
|
|
|
+ preproc_data_list(Payloads).
|