|
|
@@ -3,7 +3,6 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
-module(emqx_bridge_tablestore_connector).
|
|
|
|
|
|
--include("emqx_bridge_tablestore.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
|
@@ -20,6 +19,7 @@
|
|
|
on_add_channel/4,
|
|
|
on_remove_channel/3,
|
|
|
on_get_channel_status/3,
|
|
|
+ on_get_channels/1,
|
|
|
on_query/3,
|
|
|
on_batch_query/3,
|
|
|
on_get_status/2
|
|
|
@@ -52,7 +52,7 @@ on_add_channel(_InstId, #{channels := Channels} = OldState, ChannelId, Conf) ->
|
|
|
measurement => maybe_preproc(maps:get(measurement, Params)),
|
|
|
tags => preproc_tags(maps:get(tags, Params)),
|
|
|
fields => preproc_fields(maps:get(fields, Params)),
|
|
|
- data_source => maybe_preproc(maps:get(data_source, Params)),
|
|
|
+ data_source => maybe_preproc(maps:get(data_source, Params, <<>>)),
|
|
|
meta_update_model => maps:get(meta_update_model, Params)
|
|
|
}
|
|
|
}
|
|
|
@@ -67,20 +67,25 @@ on_remove_channel(_InstId, #{channels := Channels} = State, ChannelId) ->
|
|
|
on_get_channel_status(InstId, _ChannelId, State) ->
|
|
|
on_get_status(InstId, State).
|
|
|
|
|
|
+on_get_channels(InstId) ->
|
|
|
+ emqx_bridge_influxdb_connector:on_get_channels(InstId).
|
|
|
+
|
|
|
on_start(InstId, Config) ->
|
|
|
- OtsOpts = [
|
|
|
+ BaseOpts = [
|
|
|
{instance, maps:get(instance_name, Config)},
|
|
|
{pool, ?OTS_CLIENT_NAME(InstId)},
|
|
|
{endpoint, maps:get(endpoint, Config)},
|
|
|
- {access_key, emqx_secret:unwrap(maps:get(access_key_id, Config))},
|
|
|
- {access_secret, emqx_secret:unwrap(maps:get(access_key_secret, Config))},
|
|
|
{pool_size, maps:get(pool_size, Config)}
|
|
|
],
|
|
|
- ?LOG_T(info, #{msg => ots_start, opts => OtsOpts}),
|
|
|
- {ok, ClientRef} = start_ots_ts_client(InstId, OtsOpts),
|
|
|
+ SecretOpts = [
|
|
|
+ {access_key, maps:get(access_key_id, Config)},
|
|
|
+ {access_secret, maps:get(access_key_secret, Config)}
|
|
|
+ ],
|
|
|
+ ?LOG_T(info, #{msg => ots_start, ots_opts => BaseOpts}),
|
|
|
+ {ok, ClientRef} = start_ots_ts_client(InstId, BaseOpts ++ SecretOpts),
|
|
|
case list_ots_tables(ClientRef) of
|
|
|
{ok, _} ->
|
|
|
- {ok, #{client_ref => ClientRef, channels => #{}, ots_opts => OtsOpts}};
|
|
|
+ {ok, #{client_ref => ClientRef, channels => #{}, ots_opts => BaseOpts}};
|
|
|
{error, Reason} ->
|
|
|
_ = ots_ts_client:stop(ClientRef),
|
|
|
{error, Reason}
|
|
|
@@ -273,7 +278,8 @@ do_mk_tablestore_data_row(Message, ChannelState, Measurement) ->
|
|
|
<<"now">> -> os:system_time(microsecond);
|
|
|
<<"NOW">> -> os:system_time(microsecond);
|
|
|
undefined -> os:system_time(microsecond);
|
|
|
- Ts1 -> Ts1
|
|
|
+ Ts1 when is_integer(Ts1) -> Ts1;
|
|
|
+ Ts2 -> throw({bad_ots_data, {bad_timestamp, Ts2}})
|
|
|
end,
|
|
|
#{
|
|
|
measurement => Measurement,
|