|
|
@@ -4,7 +4,7 @@
|
|
|
-module(emqx_bridge_greptimedb_connector).
|
|
|
|
|
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
|
|
-
|
|
|
+-include_lib("emqx_resource/include/emqx_resource.hrl").
|
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
@@ -19,6 +19,10 @@
|
|
|
callback_mode/0,
|
|
|
on_start/2,
|
|
|
on_stop/2,
|
|
|
+ on_add_channel/4,
|
|
|
+ on_remove_channel/3,
|
|
|
+ on_get_channel_status/3,
|
|
|
+ on_get_channels/1,
|
|
|
on_query/3,
|
|
|
on_batch_query/3,
|
|
|
on_query_async/4,
|
|
|
@@ -34,6 +38,8 @@
|
|
|
desc/1
|
|
|
]).
|
|
|
|
|
|
+-export([precision_field/0]).
|
|
|
+
|
|
|
%% only for test
|
|
|
-ifdef(TEST).
|
|
|
-export([is_unrecoverable_error/1]).
|
|
|
@@ -62,6 +68,38 @@
|
|
|
%% resource callback
|
|
|
callback_mode() -> async_if_possible.
|
|
|
|
|
|
+on_add_channel(
|
|
|
+ _InstanceId,
|
|
|
+ #{channels := Channels} = OldState,
|
|
|
+ ChannelId,
|
|
|
+ #{parameters := Parameters} = ChannelConfig0
|
|
|
+) ->
|
|
|
+ #{write_syntax := WriteSyntaxTmpl} = Parameters,
|
|
|
+ Precision = maps:get(precision, Parameters, ms),
|
|
|
+ ChannelConfig = maps:merge(
|
|
|
+ Parameters,
|
|
|
+ ChannelConfig0#{
|
|
|
+ precision => Precision,
|
|
|
+ write_syntax => to_config(WriteSyntaxTmpl, Precision)
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ {ok, OldState#{
|
|
|
+ channels => Channels#{ChannelId => ChannelConfig}
|
|
|
+ }}.
|
|
|
+
|
|
|
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
|
|
|
+ NewState = State#{channels => maps:remove(ChannelId, Channels)},
|
|
|
+ {ok, NewState}.
|
|
|
+
|
|
|
+on_get_channel_status(InstanceId, _ChannelId, State) ->
|
|
|
+ case on_get_status(InstanceId, State) of
|
|
|
+ ?status_connected -> ?status_connected;
|
|
|
+ _ -> ?status_connecting
|
|
|
+ end.
|
|
|
+
|
|
|
+on_get_channels(InstanceId) ->
|
|
|
+ emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
|
|
+
|
|
|
on_start(InstId, Config) ->
|
|
|
%% InstID as pool would be handled by greptimedb client
|
|
|
%% so there is no need to allocate pool_name here
|
|
|
@@ -78,8 +116,13 @@ on_stop(InstId, _State) ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
-on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
|
|
- case data_to_points(Data, SyntaxLines) of
|
|
|
+on_query(InstId, {Channel, Message}, State) ->
|
|
|
+ #{
|
|
|
+ channels := #{Channel := #{write_syntax := SyntaxLines}},
|
|
|
+ client := Client,
|
|
|
+ dbname := DbName
|
|
|
+ } = State,
|
|
|
+ case data_to_points(Message, DbName, SyntaxLines) of
|
|
|
{ok, Points} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query,
|
|
|
@@ -97,8 +140,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
|
|
|
|
|
|
%% Once a Batched Data trans to points failed.
|
|
|
%% This batch query failed
|
|
|
-on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
|
|
- case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
|
|
+on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) ->
|
|
|
+ #{
|
|
|
+ channels := #{Channel := #{write_syntax := SyntaxLines}},
|
|
|
+ client := Client,
|
|
|
+ dbname := DbName
|
|
|
+ } = State,
|
|
|
+ case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of
|
|
|
{ok, Points} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query,
|
|
|
@@ -113,13 +161,13 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
|
|
|
{error, {unrecoverable_error, Reason}}
|
|
|
end.
|
|
|
|
|
|
-on_query_async(
|
|
|
- InstId,
|
|
|
- {send_message, Data},
|
|
|
- {ReplyFun, Args},
|
|
|
- _State = #{write_syntax := SyntaxLines, client := Client}
|
|
|
-) ->
|
|
|
- case data_to_points(Data, SyntaxLines) of
|
|
|
+on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) ->
|
|
|
+ #{
|
|
|
+ channels := #{Channel := #{write_syntax := SyntaxLines}},
|
|
|
+ client := Client,
|
|
|
+ dbname := DbName
|
|
|
+ } = State,
|
|
|
+ case data_to_points(Message, DbName, SyntaxLines) of
|
|
|
{ok, Points} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query,
|
|
|
@@ -135,13 +183,13 @@ on_query_async(
|
|
|
Err
|
|
|
end.
|
|
|
|
|
|
-on_batch_query_async(
|
|
|
- InstId,
|
|
|
- BatchData,
|
|
|
- {ReplyFun, Args},
|
|
|
- #{write_syntax := SyntaxLines, client := Client}
|
|
|
-) ->
|
|
|
- case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
|
|
+on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, State) ->
|
|
|
+ #{
|
|
|
+ channels := #{Channel := #{write_syntax := SyntaxLines}},
|
|
|
+ client := Client,
|
|
|
+ dbname := DbName
|
|
|
+ } = State,
|
|
|
+ case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of
|
|
|
{ok, Points} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query,
|
|
|
@@ -159,9 +207,9 @@ on_batch_query_async(
|
|
|
on_get_status(_InstId, #{client := Client}) ->
|
|
|
case greptimedb:is_alive(Client) of
|
|
|
true ->
|
|
|
- connected;
|
|
|
+ ?status_connected;
|
|
|
false ->
|
|
|
- disconnected
|
|
|
+ ?status_disconnected
|
|
|
end.
|
|
|
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
@@ -179,22 +227,36 @@ roots() ->
|
|
|
}}
|
|
|
].
|
|
|
|
|
|
+fields("connector") ->
|
|
|
+ [server_field()] ++
|
|
|
+ credentials_fields() ++
|
|
|
+ emqx_connector_schema_lib:ssl_fields();
|
|
|
+%% ============ begin: schema for old bridge configs ============
|
|
|
fields(common) ->
|
|
|
[
|
|
|
- {server, server()},
|
|
|
- {precision,
|
|
|
- %% The greptimedb only supports these 4 precision
|
|
|
- mk(enum([ns, us, ms, s]), #{
|
|
|
- required => false, default => ms, desc => ?DESC("precision")
|
|
|
- })}
|
|
|
+ server_field(),
|
|
|
+ precision_field()
|
|
|
];
|
|
|
fields(greptimedb) ->
|
|
|
fields(common) ++
|
|
|
- [
|
|
|
- {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
|
|
|
- {username, mk(binary(), #{desc => ?DESC("username")})},
|
|
|
- {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
|
|
|
- ] ++ emqx_connector_schema_lib:ssl_fields().
|
|
|
+ credentials_fields() ++
|
|
|
+ emqx_connector_schema_lib:ssl_fields().
|
|
|
+%% ============ end: schema for old bridge configs ============
|
|
|
+
|
|
|
+desc(common) ->
|
|
|
+ ?DESC("common");
|
|
|
+desc(greptimedb) ->
|
|
|
+ ?DESC("greptimedb").
|
|
|
+
|
|
|
+precision_field() ->
|
|
|
+ {precision,
|
|
|
+ %% The greptimedb only supports these 4 precision
|
|
|
+ mk(enum([ns, us, ms, s]), #{
|
|
|
+ required => false, default => ms, desc => ?DESC("precision")
|
|
|
+ })}.
|
|
|
+
|
|
|
+server_field() ->
|
|
|
+ {server, server()}.
|
|
|
|
|
|
server() ->
|
|
|
Meta = #{
|
|
|
@@ -205,10 +267,12 @@ server() ->
|
|
|
},
|
|
|
emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS).
|
|
|
|
|
|
-desc(common) ->
|
|
|
- ?DESC("common");
|
|
|
-desc(greptimedb) ->
|
|
|
- ?DESC("greptimedb").
|
|
|
+credentials_fields() ->
|
|
|
+ [
|
|
|
+ {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
|
|
|
+ {username, mk(binary(), #{desc => ?DESC("username")})},
|
|
|
+ {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
|
|
|
+ ].
|
|
|
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% internal functions
|
|
|
@@ -243,9 +307,8 @@ start_client(InstId, Config) ->
|
|
|
do_start_client(
|
|
|
InstId,
|
|
|
ClientConfig,
|
|
|
- Config = #{write_syntax := Lines}
|
|
|
+ Config
|
|
|
) ->
|
|
|
- Precision = maps:get(precision, Config, ms),
|
|
|
case greptimedb:start_client(ClientConfig) of
|
|
|
{ok, Client} ->
|
|
|
case greptimedb:is_alive(Client, true) of
|
|
|
@@ -253,7 +316,7 @@ do_start_client(
|
|
|
State = #{
|
|
|
client => Client,
|
|
|
dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB),
|
|
|
- write_syntax => to_config(Lines, Precision)
|
|
|
+ channels => #{}
|
|
|
},
|
|
|
?SLOG(info, #{
|
|
|
msg => "starting_greptimedb_connector_success",
|
|
|
@@ -314,8 +377,7 @@ client_config(
|
|
|
{pool, InstId},
|
|
|
{pool_type, random},
|
|
|
{auto_reconnect, ?AUTO_RECONNECT_S},
|
|
|
- {gprc_options, grpc_config()},
|
|
|
- {timeunit, maps:get(precision, Config, ms)}
|
|
|
+ {gprc_options, grpc_config()}
|
|
|
] ++ protocol_config(Config).
|
|
|
|
|
|
protocol_config(
|
|
|
@@ -469,10 +531,10 @@ to_maps_config(K, V, Res) ->
|
|
|
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% Tags & Fields Data Trans
|
|
|
-parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
|
|
+parse_batch_data(InstId, DbName, BatchData, SyntaxLines) ->
|
|
|
{Points, Errors} = lists:foldl(
|
|
|
- fun({send_message, Data}, {ListOfPoints, ErrAccIn}) ->
|
|
|
- case data_to_points(Data, SyntaxLines) of
|
|
|
+ fun({_, Data}, {ListOfPoints, ErrAccIn}) ->
|
|
|
+ case data_to_points(Data, DbName, SyntaxLines) of
|
|
|
{ok, Points} ->
|
|
|
{[Points | ListOfPoints], ErrAccIn};
|
|
|
{error, ErrorPoints} ->
|
|
|
@@ -496,21 +558,25 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
|
|
{error, points_trans_failed}
|
|
|
end.
|
|
|
|
|
|
--spec data_to_points(map(), [
|
|
|
- #{
|
|
|
- fields := [{binary(), binary()}],
|
|
|
- measurement := binary(),
|
|
|
- tags := [{binary(), binary()}],
|
|
|
- timestamp := emqx_placeholder:tmpl_token() | integer(),
|
|
|
- precision := {From :: ts_precision(), To :: ts_precision()}
|
|
|
- }
|
|
|
-]) -> {ok, [map()]} | {error, term()}.
|
|
|
-data_to_points(Data, SyntaxLines) ->
|
|
|
- lines_to_points(Data, SyntaxLines, [], []).
|
|
|
+-spec data_to_points(
|
|
|
+ map(),
|
|
|
+ binary(),
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ fields := [{binary(), binary()}],
|
|
|
+ measurement := binary(),
|
|
|
+ tags := [{binary(), binary()}],
|
|
|
+ timestamp := emqx_placeholder:tmpl_token() | integer(),
|
|
|
+ precision := {From :: ts_precision(), To :: ts_precision()}
|
|
|
+ }
|
|
|
+ ]
|
|
|
+) -> {ok, [map()]} | {error, term()}.
|
|
|
+data_to_points(Data, DbName, SyntaxLines) ->
|
|
|
+ lines_to_points(Data, DbName, SyntaxLines, [], []).
|
|
|
|
|
|
%% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated.
|
|
|
%% And once a row fails to convert, all of them are considered to have failed.
|
|
|
-lines_to_points(_, [], Points, ErrorPoints) ->
|
|
|
+lines_to_points(_Data, _DbName, [], Points, ErrorPoints) ->
|
|
|
case ErrorPoints of
|
|
|
[] ->
|
|
|
{ok, Points};
|
|
|
@@ -518,23 +584,27 @@ lines_to_points(_, [], Points, ErrorPoints) ->
|
|
|
%% ignore trans succeeded points
|
|
|
{error, ErrorPoints}
|
|
|
end;
|
|
|
-lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
|
|
|
+lines_to_points(
|
|
|
+ Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc
|
|
|
+) when
|
|
|
is_list(Ts)
|
|
|
->
|
|
|
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
|
|
case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
|
|
|
{ok, TsInt} ->
|
|
|
Item1 = Item#{timestamp => TsInt},
|
|
|
- continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
|
|
|
+ continue_lines_to_points(Data, DbName, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
|
|
|
{error, BadTs} ->
|
|
|
- lines_to_points(Data, Rest, ResultPointsAcc, [
|
|
|
+ lines_to_points(Data, DbName, Rest, ResultPointsAcc, [
|
|
|
{error, {bad_timestamp, BadTs}} | ErrorPointsAcc
|
|
|
])
|
|
|
end;
|
|
|
-lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when
|
|
|
+lines_to_points(
|
|
|
+ Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc
|
|
|
+) when
|
|
|
is_integer(Ts)
|
|
|
->
|
|
|
- continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
|
|
|
+ continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
|
|
|
|
|
|
parse_timestamp([TsInt]) when is_integer(TsInt) ->
|
|
|
{ok, TsInt};
|
|
|
@@ -546,30 +616,32 @@ parse_timestamp([TsBin]) ->
|
|
|
{error, TsBin}
|
|
|
end.
|
|
|
|
|
|
-continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
|
|
|
- case line_to_point(Data, Item) of
|
|
|
+continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
|
|
|
+ case line_to_point(Data, DbName, Item) of
|
|
|
{_, [#{fields := Fields}]} when map_size(Fields) =:= 0 ->
|
|
|
%% greptimedb client doesn't like empty field maps...
|
|
|
ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
|
|
|
- lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1);
|
|
|
+ lines_to_points(Data, DbName, Rest, ResultPointsAcc, ErrorPointsAcc1);
|
|
|
Point ->
|
|
|
- lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
|
|
|
+ lines_to_points(Data, DbName, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
|
|
|
end.
|
|
|
|
|
|
line_to_point(
|
|
|
Data,
|
|
|
+ DbName,
|
|
|
#{
|
|
|
measurement := Measurement,
|
|
|
tags := Tags,
|
|
|
fields := Fields,
|
|
|
timestamp := Ts,
|
|
|
- precision := Precision
|
|
|
+ precision := {_, ToPrecision} = Precision
|
|
|
} = Item
|
|
|
) ->
|
|
|
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
|
|
|
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
|
|
|
TableName = emqx_placeholder:proc_tmpl(Measurement, Data),
|
|
|
- {TableName, [
|
|
|
+ Metric = #{dbname => DbName, table => TableName, timeunit => ToPrecision},
|
|
|
+ {Metric, [
|
|
|
maps:without([precision, measurement], Item#{
|
|
|
tags => EncodedTags,
|
|
|
fields => EncodedFields,
|