|
|
@@ -17,7 +17,6 @@
|
|
|
-export([
|
|
|
resource_type/0,
|
|
|
callback_mode/0,
|
|
|
- callback_mode/1,
|
|
|
on_start/2,
|
|
|
on_stop/2,
|
|
|
on_get_status/2,
|
|
|
@@ -32,8 +31,6 @@
|
|
|
on_format_query_result/1
|
|
|
]).
|
|
|
|
|
|
--export([connect/1, do_get_status/1]).
|
|
|
-
|
|
|
-export([
|
|
|
namespace/0,
|
|
|
roots/0,
|
|
|
@@ -48,8 +45,7 @@
|
|
|
|
|
|
-type config() ::
|
|
|
#{
|
|
|
- driver := driver(),
|
|
|
- request_base => #{
|
|
|
+ request_base := #{
|
|
|
scheme := http | https,
|
|
|
host := iolist(),
|
|
|
port := inet:port_number()
|
|
|
@@ -57,17 +53,18 @@
|
|
|
connect_timeout := pos_integer(),
|
|
|
pool_type := random | hash,
|
|
|
pool_size := pos_integer(),
|
|
|
- iotdb_version => atom(),
|
|
|
- protocol_version => atom(),
|
|
|
+ iotdb_version := atom(),
|
|
|
request => undefined | map(),
|
|
|
atom() => _
|
|
|
}.
|
|
|
|
|
|
-type state() ::
|
|
|
#{
|
|
|
- driver := driver(),
|
|
|
+ connect_timeout := pos_integer(),
|
|
|
+ pool_type := random | hash,
|
|
|
channels := map(),
|
|
|
iotdb_version := atom(),
|
|
|
+ request => undefined | map(),
|
|
|
atom() => _
|
|
|
}.
|
|
|
|
|
|
@@ -118,13 +115,13 @@ connector_example_values() ->
|
|
|
namespace() -> "iotdb".
|
|
|
|
|
|
roots() ->
|
|
|
- [].
|
|
|
+ [{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
|
|
|
|
|
-fields("config_restapi") ->
|
|
|
+fields(config) ->
|
|
|
proplists_without(
|
|
|
[url, request, retry_interval, headers],
|
|
|
emqx_bridge_http_schema:fields("config_connector")
|
|
|
- ) ++ common_fields(restapi) ++
|
|
|
+ ) ++
|
|
|
fields("connection_fields");
|
|
|
fields("connection_fields") ->
|
|
|
[
|
|
|
@@ -146,88 +143,38 @@ fields("connection_fields") ->
|
|
|
)},
|
|
|
{authentication,
|
|
|
mk(
|
|
|
- hoconsc:union([ref(?MODULE, authentication)]),
|
|
|
+ hoconsc:union([ref(?MODULE, auth_basic)]),
|
|
|
#{
|
|
|
default => auth_basic, desc => ?DESC("config_authentication")
|
|
|
}
|
|
|
)}
|
|
|
];
|
|
|
-fields(authentication) ->
|
|
|
+fields(auth_basic) ->
|
|
|
[
|
|
|
- {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_username")})},
|
|
|
+ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
|
|
|
{password,
|
|
|
emqx_schema_secret:mk(#{
|
|
|
required => true,
|
|
|
- desc => ?DESC("config_auth_password")
|
|
|
+ desc => ?DESC("config_auth_basic_password")
|
|
|
})}
|
|
|
];
|
|
|
-fields("config_thrift") ->
|
|
|
- Meta = #{desc => ?DESC("server")},
|
|
|
- emqx_connector_schema:common_fields() ++
|
|
|
- common_fields(thrift) ++
|
|
|
- [
|
|
|
- {server, emqx_schema:servers_sc(Meta, ?THRIFT_HOST_OPTIONS)},
|
|
|
- {protocol_version,
|
|
|
- mk(
|
|
|
- hoconsc:enum([?PROTOCOL_V1, ?PROTOCOL_V2, ?PROTOCOL_V3]),
|
|
|
- #{
|
|
|
- desc => ?DESC("config_protocol_version"),
|
|
|
- default => ?PROTOCOL_V3
|
|
|
- }
|
|
|
- )},
|
|
|
- {'zoneId',
|
|
|
- mk(
|
|
|
- binary(),
|
|
|
- #{default => <<"Asia/Shanghai">>, desc => ?DESC("config_zoneId")}
|
|
|
- )},
|
|
|
- {pool_size,
|
|
|
- mk(
|
|
|
- pos_integer(),
|
|
|
- #{
|
|
|
- default => 8,
|
|
|
- desc => ?DESC("pool_size")
|
|
|
- }
|
|
|
- )}
|
|
|
- ] ++ fields(authentication) ++ emqx_connector_schema_lib:ssl_fields() ++
|
|
|
- emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
|
|
-fields(connector_resource_opts) ->
|
|
|
- emqx_connector_schema:resource_opts_fields();
|
|
|
-fields("post_" ++ Driver) ->
|
|
|
- emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields("config_" ++ Driver);
|
|
|
-fields("put_" ++ Driver) ->
|
|
|
- fields("config_" ++ Driver);
|
|
|
-fields("get_" ++ Driver) ->
|
|
|
- emqx_bridge_schema:status_fields() ++ fields("post_" ++ Driver).
|
|
|
-
|
|
|
-common_fields(Driver) ->
|
|
|
- [
|
|
|
- {driver,
|
|
|
- mk(
|
|
|
- hoconsc:enum([Driver]),
|
|
|
- #{
|
|
|
- desc => ?DESC("config_driver"),
|
|
|
- default => <<"restapi">>
|
|
|
- }
|
|
|
- )}
|
|
|
- ].
|
|
|
-
|
|
|
-desc(authentication) ->
|
|
|
- ?DESC("config_authentication");
|
|
|
-desc(connector_resource_opts) ->
|
|
|
- "Connector resource options";
|
|
|
-desc(Struct) when is_list(Struct) ->
|
|
|
- case string:split(Struct, "_") of
|
|
|
- ["config", _] ->
|
|
|
- ?DESC("desc_config");
|
|
|
- [Method, _] when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
|
|
- ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
|
|
|
- _ ->
|
|
|
- undefined
|
|
|
- end;
|
|
|
+fields("post") ->
|
|
|
+ emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
|
|
|
+fields("put") ->
|
|
|
+ fields(config);
|
|
|
+fields("get") ->
|
|
|
+ emqx_bridge_schema:status_fields() ++ fields("post").
|
|
|
+
|
|
|
+desc(config) ->
|
|
|
+ ?DESC("desc_config");
|
|
|
+desc(auth_basic) ->
|
|
|
+ "Basic Authentication";
|
|
|
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
|
|
+ ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
|
|
|
desc(_) ->
|
|
|
undefined.
|
|
|
|
|
|
-connector_config(#{driver := restapi} = Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|
|
+connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
|
|
|
#{
|
|
|
base_url := BaseUrl,
|
|
|
authentication :=
|
|
|
@@ -252,9 +199,7 @@ connector_config(#{driver := restapi} = Conf, #{name := Name, parse_confs := Par
|
|
|
<<"http">>,
|
|
|
Name,
|
|
|
WebhookConfig
|
|
|
- );
|
|
|
-connector_config(Conf, _) ->
|
|
|
- Conf.
|
|
|
+ ).
|
|
|
|
|
|
proplists_without(Keys, List) ->
|
|
|
[El || El = {K, _} <- List, not lists:member(K, Keys)].
|
|
|
@@ -266,13 +211,8 @@ resource_type() -> iotdb.
|
|
|
|
|
|
callback_mode() -> async_if_possible.
|
|
|
|
|
|
-callback_mode(#{driver := restapi}) ->
|
|
|
- async_if_possible;
|
|
|
-callback_mode(#{driver := thrift}) ->
|
|
|
- always_sync.
|
|
|
-
|
|
|
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
|
|
|
-on_start(InstanceId, #{driver := restapi, iotdb_version := Version} = Config) ->
|
|
|
+on_start(InstanceId, #{iotdb_version := Version} = Config) ->
|
|
|
%% [FIXME] The configuration passed in here is pre-processed and transformed
|
|
|
%% in emqx_bridge_resource:parse_confs/2.
|
|
|
case emqx_bridge_http_connector:on_start(InstanceId, Config) of
|
|
|
@@ -282,78 +222,8 @@ on_start(InstanceId, #{driver := restapi, iotdb_version := Version} = Config) ->
|
|
|
instance_id => InstanceId,
|
|
|
request => emqx_utils:redact(maps:get(request, State, <<>>))
|
|
|
}),
|
|
|
- ?tp(iotdb_bridge_started, #{driver => restapi, instance_id => InstanceId}),
|
|
|
- {ok, State#{driver => restapi, iotdb_version => Version, channels => #{}}};
|
|
|
- {error, Reason} ->
|
|
|
- ?SLOG(error, #{
|
|
|
- msg => "failed_to_start_iotdb_bridge",
|
|
|
- instance_id => InstanceId,
|
|
|
- request => emqx_utils:redact(maps:get(request, Config, <<>>)),
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- throw(failed_to_start_iotdb_bridge)
|
|
|
- end;
|
|
|
-on_start(
|
|
|
- InstanceId,
|
|
|
- #{
|
|
|
- driver := thrift,
|
|
|
- protocol_version := ProtocolVsn,
|
|
|
- server := Server,
|
|
|
- pool_size := PoolSize,
|
|
|
- ssl := SSL
|
|
|
- } = Config
|
|
|
-) ->
|
|
|
- IoTDBOpts0 = maps:with(['zoneId', username, password], Config),
|
|
|
-
|
|
|
- Version =
|
|
|
- case ProtocolVsn of
|
|
|
- ?PROTOCOL_V1 ->
|
|
|
- 0;
|
|
|
- ?PROTOCOL_V2 ->
|
|
|
- 1;
|
|
|
- ?PROTOCOL_V3 ->
|
|
|
- 2
|
|
|
- end,
|
|
|
-
|
|
|
- #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
|
|
|
-
|
|
|
- TransportOpts =
|
|
|
- case maps:get(enable, SSL) of
|
|
|
- true ->
|
|
|
- #{
|
|
|
- ssltransport => true,
|
|
|
- ssloptions => emqx_tls_lib:to_client_opts(SSL)
|
|
|
- };
|
|
|
- false ->
|
|
|
- #{}
|
|
|
- end,
|
|
|
-
|
|
|
- IoTDBOpts = IoTDBOpts0#{
|
|
|
- version => Version,
|
|
|
- host => Host,
|
|
|
- port => Port,
|
|
|
- options => TransportOpts
|
|
|
- },
|
|
|
-
|
|
|
- Options = [
|
|
|
- {pool_size, PoolSize},
|
|
|
- {iotdb_options, IoTDBOpts}
|
|
|
- ],
|
|
|
-
|
|
|
- case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
|
|
|
- ok ->
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "iotdb_bridge_started",
|
|
|
- instance_id => InstanceId
|
|
|
- }),
|
|
|
-
|
|
|
- ?tp(iotdb_bridge_started, #{driver => thrift, instance_id => InstanceId}),
|
|
|
-
|
|
|
- {ok, #{
|
|
|
- driver => thrift,
|
|
|
- iotdb_version => ProtocolVsn,
|
|
|
- channels => #{}
|
|
|
- }};
|
|
|
+ ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
|
|
|
+ {ok, State#{iotdb_version => Version, channels => #{}}};
|
|
|
{error, Reason} ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "failed_to_start_iotdb_bridge",
|
|
|
@@ -365,26 +235,18 @@ on_start(
|
|
|
end.
|
|
|
|
|
|
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
|
|
|
-on_stop(InstanceId, #{driver := restapi} = State) ->
|
|
|
+on_stop(InstanceId, State) ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "stopping_iotdb_bridge",
|
|
|
connector => InstanceId
|
|
|
}),
|
|
|
Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
|
|
|
?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
|
|
|
- Res;
|
|
|
-on_stop(InstanceId, #{driver := thrift} = _State) ->
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "stopping_iotdb_bridge",
|
|
|
- connector => InstanceId
|
|
|
- }),
|
|
|
-
|
|
|
- ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
|
|
|
- emqx_resource_pool:stop(InstanceId).
|
|
|
+ Res.
|
|
|
|
|
|
-spec on_get_status(manager_id(), state()) ->
|
|
|
connected | connecting | {disconnected, state(), term()}.
|
|
|
-on_get_status(InstanceId, #{driver := restapi} = State) ->
|
|
|
+on_get_status(InstanceId, State) ->
|
|
|
Func = fun(Worker, Timeout) ->
|
|
|
Request = {?IOTDB_PING_PATH, [], undefined},
|
|
|
NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
|
|
|
@@ -403,27 +265,7 @@ on_get_status(InstanceId, #{driver := restapi} = State) ->
|
|
|
{error, {unexpected_ping_result, Result}}
|
|
|
end
|
|
|
end,
|
|
|
- emqx_bridge_http_connector:on_get_status(InstanceId, State, Func);
|
|
|
-on_get_status(InstanceId, #{driver := thrift} = _State) ->
|
|
|
- case emqx_resource_pool:health_check_workers(InstanceId, fun ?MODULE:do_get_status/1) of
|
|
|
- true ->
|
|
|
- ?status_connected;
|
|
|
- false ->
|
|
|
- ?status_disconnected
|
|
|
- end.
|
|
|
-
|
|
|
-do_get_status(Conn) ->
|
|
|
- case iotdb:ping(Conn) of
|
|
|
- {ok, _} ->
|
|
|
- true;
|
|
|
- {error, _} ->
|
|
|
- false
|
|
|
- end.
|
|
|
-
|
|
|
-connect(Opts) ->
|
|
|
- {iotdb_options, #{password := Password} = IoTDBOpts0} = lists:keyfind(iotdb_options, 1, Opts),
|
|
|
- IoTDBOpts = IoTDBOpts0#{password := emqx_secret:unwrap(Password)},
|
|
|
- iotdb:start_link(IoTDBOpts).
|
|
|
+ emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
|
|
|
|
|
|
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
|
|
{ok, pos_integer(), [term()], term()}
|
|
|
@@ -445,8 +287,8 @@ on_query(
|
|
|
case try_render_messages([Req], IoTDBVsn, Channels) of
|
|
|
{ok, [IoTDBPayload]} ->
|
|
|
handle_response(
|
|
|
- do_on_query(
|
|
|
- InstanceId, ChannelId, IoTDBPayload, State
|
|
|
+ emqx_bridge_http_connector:on_query(
|
|
|
+ InstanceId, {ChannelId, IoTDBPayload}, State
|
|
|
)
|
|
|
);
|
|
|
Error ->
|
|
|
@@ -459,7 +301,7 @@ on_query_async(
|
|
|
InstanceId,
|
|
|
{ChannelId, _Message} = Req,
|
|
|
ReplyFunAndArgs0,
|
|
|
- #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
+ #{iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
) ->
|
|
|
?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
|
|
|
?SLOG(debug, #{
|
|
|
@@ -483,27 +325,13 @@ on_query_async(
|
|
|
);
|
|
|
Error ->
|
|
|
Error
|
|
|
- end;
|
|
|
-on_query_async(
|
|
|
- InstanceId,
|
|
|
- Req,
|
|
|
- _ReplyFunAndArgs0,
|
|
|
- #{driver := thrift} = State
|
|
|
-) ->
|
|
|
- ?SLOG(error, #{
|
|
|
- msg => "iotdb_bridge_async_query_failed",
|
|
|
- instance_id => InstanceId,
|
|
|
- send_message => Req,
|
|
|
- reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
|
|
|
- state => emqx_utils:redact(State)
|
|
|
- }),
|
|
|
- {error, not_support}.
|
|
|
+ end.
|
|
|
|
|
|
on_batch_query_async(
|
|
|
InstId,
|
|
|
Requests,
|
|
|
Callback,
|
|
|
- #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
+ #{iotdb_version := IoTDBVsn, channels := Channels} = State
|
|
|
) ->
|
|
|
?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
|
|
|
[{ChannelId, _Message} | _] = Requests,
|
|
|
@@ -533,23 +361,8 @@ on_batch_query_async(
|
|
|
);
|
|
|
Error ->
|
|
|
Error
|
|
|
- end;
|
|
|
-on_batch_query_async(
|
|
|
- InstanceId,
|
|
|
- Req,
|
|
|
- _ReplyFunAndArgs0,
|
|
|
- #{driver := thrift} = State
|
|
|
-) ->
|
|
|
- ?SLOG(error, #{
|
|
|
- msg => "iotdb_bridge_async_query_failed",
|
|
|
- instance_id => InstanceId,
|
|
|
- send_message => Req,
|
|
|
- reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
|
|
|
- state => emqx_utils:redact(State)
|
|
|
- }),
|
|
|
- {error, not_support}.
|
|
|
+ end.
|
|
|
|
|
|
-%% todo
|
|
|
on_batch_query(
|
|
|
InstId,
|
|
|
[{ChannelId, _Message}] = Requests,
|
|
|
@@ -568,8 +381,8 @@ on_batch_query(
|
|
|
lists:map(
|
|
|
fun(IoTDBPayload) ->
|
|
|
handle_response(
|
|
|
- do_on_query(
|
|
|
- InstId, ChannelId, IoTDBPayload, State
|
|
|
+ emqx_bridge_http_connector:on_query(
|
|
|
+ InstId, {ChannelId, IoTDBPayload}, State
|
|
|
)
|
|
|
)
|
|
|
end,
|
|
|
@@ -584,7 +397,7 @@ on_format_query_result(Result) ->
|
|
|
|
|
|
on_add_channel(
|
|
|
InstanceId,
|
|
|
- #{driver := restapi, iotdb_version := Version, channels := Channels} = OldState0,
|
|
|
+ #{iotdb_version := Version, channels := Channels} = OldState0,
|
|
|
ChannelId,
|
|
|
#{
|
|
|
parameters := #{data := Data} = Parameter
|
|
|
@@ -616,27 +429,6 @@ on_add_channel(
|
|
|
InstanceId, OldState0, ChannelId, HTTPReq
|
|
|
),
|
|
|
|
|
|
- %% update IoTDB channel
|
|
|
- DeviceId = maps:get(device_id, Parameter, <<>>),
|
|
|
- Channel = Parameter#{
|
|
|
- device_id => emqx_placeholder:preproc_tmpl(DeviceId),
|
|
|
- data := preproc_data_template(Data)
|
|
|
- },
|
|
|
- Channels2 = Channels#{ChannelId => Channel},
|
|
|
- {ok, OldState#{channels := Channels2}}
|
|
|
- end;
|
|
|
-on_add_channel(
|
|
|
- _InstanceId,
|
|
|
- #{driver := thrift, channels := Channels} = OldState,
|
|
|
- ChannelId,
|
|
|
- #{
|
|
|
- parameters := #{data := Data} = Parameter
|
|
|
- }
|
|
|
-) ->
|
|
|
- case maps:is_key(ChannelId, Channels) of
|
|
|
- true ->
|
|
|
- {error, already_exists};
|
|
|
- _ ->
|
|
|
%% update IoTDB channel
|
|
|
DeviceId = maps:get(device_id, Parameter, <<>>),
|
|
|
Channel = Parameter#{
|
|
|
@@ -647,11 +439,8 @@ on_add_channel(
|
|
|
{ok, OldState#{channels := Channels2}}
|
|
|
end.
|
|
|
|
|
|
-on_remove_channel(InstanceId, #{driver := restapi, channels := Channels} = OldState0, ChannelId) ->
|
|
|
+on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
|
|
|
{ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
|
|
|
- Channels2 = maps:remove(ChannelId, Channels),
|
|
|
- {ok, OldState#{channels => Channels2}};
|
|
|
-on_remove_channel(_InstanceId, #{driver := thrift, channels := Channels} = OldState, ChannelId) ->
|
|
|
Channels2 = maps:remove(ChannelId, Channels),
|
|
|
{ok, OldState#{channels => Channels2}}.
|
|
|
|
|
|
@@ -747,16 +536,16 @@ proc_data(
|
|
|
],
|
|
|
Msg,
|
|
|
Nows,
|
|
|
- IoTDbVsn,
|
|
|
+ IotDbVsn,
|
|
|
Acc
|
|
|
) ->
|
|
|
DataType = list_to_binary(
|
|
|
string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
|
|
|
),
|
|
|
try
|
|
|
- proc_data(T, Msg, Nows, IoTDbVsn, [
|
|
|
+ proc_data(T, Msg, Nows, IotDbVsn, [
|
|
|
#{
|
|
|
- timestamp => iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows),
|
|
|
+ timestamp => iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows),
|
|
|
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
|
|
data_type => DataType,
|
|
|
value => proc_value(DataType, ValueTkn, Msg)
|
|
|
@@ -770,28 +559,28 @@ proc_data(
|
|
|
?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
|
|
|
{error, invalid_data}
|
|
|
end;
|
|
|
-proc_data([], _Msg, _Nows, _IoTDbVsn, Acc) ->
|
|
|
+proc_data([], _Msg, _Nows, _IotDbVsn, Acc) ->
|
|
|
{ok, lists:reverse(Acc)}.
|
|
|
|
|
|
-iot_timestamp(_IoTDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
|
|
|
+iot_timestamp(_IotDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
|
|
|
Timestamp;
|
|
|
-iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows) ->
|
|
|
- iot_timestamp(IoTDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
|
|
|
+iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows) ->
|
|
|
+ iot_timestamp(IotDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
|
|
|
|
|
|
%% > v1.3.0 don't allow write nanoseconds nor microseconds
|
|
|
iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) ->
|
|
|
NowMs;
|
|
|
iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) ->
|
|
|
NowMs;
|
|
|
-iot_timestamp(_IoTDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
|
|
|
+iot_timestamp(_IotDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
|
|
|
NowUs;
|
|
|
-iot_timestamp(_IoTDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
|
|
|
+iot_timestamp(_IotDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
|
|
|
NowNs;
|
|
|
-iot_timestamp(_IoTDbVsn, Timestamp, #{now_ms := NowMs}) when
|
|
|
+iot_timestamp(_IotDbVsn, Timestamp, #{now_ms := NowMs}) when
|
|
|
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
|
|
|
->
|
|
|
NowMs;
|
|
|
-iot_timestamp(_IoTDbVsn, Timestamp, _) when is_binary(Timestamp) ->
|
|
|
+iot_timestamp(_IotDbVsn, Timestamp, _) when is_binary(Timestamp) ->
|
|
|
binary_to_integer(Timestamp).
|
|
|
|
|
|
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
|
|
|
@@ -930,10 +719,6 @@ iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
|
|
|
<<"is_aligned">>;
|
|
|
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
|
|
|
<<"isAligned">>;
|
|
|
-iotdb_field_key(is_aligned, Vsn) when
|
|
|
- Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
|
|
|
-->
|
|
|
- 'isAligned';
|
|
|
iotdb_field_key(device_id, ?VSN_1_3_X) ->
|
|
|
<<"device">>;
|
|
|
iotdb_field_key(device_id, ?VSN_1_1_X) ->
|
|
|
@@ -942,10 +727,6 @@ iotdb_field_key(device_id, ?VSN_1_0_X) ->
|
|
|
<<"device">>;
|
|
|
iotdb_field_key(device_id, ?VSN_0_13_X) ->
|
|
|
<<"deviceId">>;
|
|
|
-iotdb_field_key(device_id, Vsn) when
|
|
|
- Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
|
|
|
-->
|
|
|
- 'deviceId';
|
|
|
iotdb_field_key(data_types, ?VSN_1_3_X) ->
|
|
|
<<"data_types">>;
|
|
|
iotdb_field_key(data_types, ?VSN_1_1_X) ->
|
|
|
@@ -953,11 +734,7 @@ iotdb_field_key(data_types, ?VSN_1_1_X) ->
|
|
|
iotdb_field_key(data_types, ?VSN_1_0_X) ->
|
|
|
<<"data_types">>;
|
|
|
iotdb_field_key(data_types, ?VSN_0_13_X) ->
|
|
|
- <<"dataTypes">>;
|
|
|
-iotdb_field_key(data_types, Vsn) when
|
|
|
- Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
|
|
|
-->
|
|
|
- dtypes.
|
|
|
+ <<"dataTypes">>.
|
|
|
|
|
|
to_list(List) when is_list(List) -> List;
|
|
|
to_list(Data) -> [Data].
|
|
|
@@ -979,8 +756,6 @@ handle_response({ok, Code, _Headers, Body}) ->
|
|
|
{error, #{code => Code, body => Body}};
|
|
|
handle_response({ok, Code, Body}) ->
|
|
|
{error, #{code => Code, body => Body}};
|
|
|
-handle_response({ok, _} = Resp) ->
|
|
|
- Resp;
|
|
|
handle_response({error, _} = Error) ->
|
|
|
Error.
|
|
|
|
|
|
@@ -1074,8 +849,3 @@ get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
|
|
|
%% This is a self-describing message
|
|
|
get_data_template(#{data := []}, Payloads) ->
|
|
|
preproc_data_list(Payloads).
|
|
|
-
|
|
|
-do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
|
|
|
- emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
|
|
|
-do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
|
|
|
- ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).
|