|
|
@@ -33,8 +33,8 @@ on_start(InstId, Config) ->
|
|
|
on_stop(_InstId, #{client := Client}) ->
|
|
|
influxdb:stop_client(Client).
|
|
|
|
|
|
-on_query(_InstId, {send_message, _Data}, _AfterQuery, _State) ->
|
|
|
- ok.
|
|
|
+on_query(InstId, {send_message, Data}, AfterQuery, State) ->
|
|
|
+ do_query(InstId, {send_message, Data}, AfterQuery, State).
|
|
|
|
|
|
on_get_status(_InstId, #{client := Client}) ->
|
|
|
case influxdb:is_alive(Client) of
|
|
|
@@ -184,18 +184,36 @@ start_client(InstId, Config) ->
|
|
|
{error, R}
|
|
|
end.
|
|
|
|
|
|
-do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadBin}}) ->
|
|
|
+do_start_client(
|
|
|
+ InstId,
|
|
|
+ ClientConfig,
|
|
|
+ Config = #{
|
|
|
+ egress := #{
|
|
|
+ measurement := Measurement,
|
|
|
+ timestamp := Timestamp,
|
|
|
+ tags := Tags,
|
|
|
+ fields := Fields
|
|
|
+ }
|
|
|
+ }
|
|
|
+) ->
|
|
|
case influxdb:start_client(ClientConfig) of
|
|
|
{ok, Client} ->
|
|
|
case influxdb:is_alive(Client) of
|
|
|
true ->
|
|
|
- Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
|
|
|
+ State = #{
|
|
|
+ client => Client,
|
|
|
+ measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
|
|
+ timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp),
|
|
|
+ tags => to_tags_config(Tags),
|
|
|
+ fields => to_fields_config(Fields)
|
|
|
+ },
|
|
|
?SLOG(info, #{
|
|
|
msg => "starting influxdb connector success",
|
|
|
connector => InstId,
|
|
|
- client => Client
|
|
|
+ client => Client,
|
|
|
+ state => State
|
|
|
}),
|
|
|
- #{client => Client, payload => Payload};
|
|
|
+ {ok, State};
|
|
|
false ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "starting influxdb connector failed",
|
|
|
@@ -231,21 +249,15 @@ client_config(
|
|
|
}
|
|
|
) ->
|
|
|
[
|
|
|
- {host, Host},
|
|
|
+ {host, binary_to_list(Host)},
|
|
|
{port, Port},
|
|
|
{pool_size, PoolSize},
|
|
|
{pool, binary_to_atom(InstId, utf8)},
|
|
|
- {precision, maps:get(precision, Config, ms)}
|
|
|
+ {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
|
|
|
] ++ protocol_config(Config).
|
|
|
|
|
|
+%% api v2 config
|
|
|
protocol_config(#{
|
|
|
- protocol := udp
|
|
|
-}) ->
|
|
|
- [
|
|
|
- {protocol, udp}
|
|
|
- ];
|
|
|
-protocol_config(#{
|
|
|
- protocol := api_v1,
|
|
|
username := Username,
|
|
|
password := Password,
|
|
|
database := DB,
|
|
|
@@ -254,13 +266,12 @@ protocol_config(#{
|
|
|
[
|
|
|
{protocol, http},
|
|
|
{version, v1},
|
|
|
- {username, Username},
|
|
|
- {password, Password},
|
|
|
- {database, DB},
|
|
|
- {ssl, SSL}
|
|
|
+ {username, binary_to_list(Username)},
|
|
|
+ {password, binary_to_list(Password)},
|
|
|
+ {database, binary_to_list(DB)}
|
|
|
] ++ ssl_config(SSL);
|
|
|
+%% api v1 config
|
|
|
protocol_config(#{
|
|
|
- protocol := api_v2,
|
|
|
bucket := Bucket,
|
|
|
org := Org,
|
|
|
token := Token,
|
|
|
@@ -269,11 +280,15 @@ protocol_config(#{
|
|
|
[
|
|
|
{protocol, http},
|
|
|
{version, v2},
|
|
|
- {bucket, Bucket},
|
|
|
- {org, Org},
|
|
|
- {token, Token},
|
|
|
- {ssl, SSL}
|
|
|
- ] ++ ssl_config(SSL).
|
|
|
+ {bucket, binary_to_list(Bucket)},
|
|
|
+ {org, binary_to_list(Org)},
|
|
|
+ {token, Token}
|
|
|
+ ] ++ ssl_config(SSL);
|
|
|
+%% udp config
|
|
|
+protocol_config(_) ->
|
|
|
+ [
|
|
|
+ {protocol, udp}
|
|
|
+ ].
|
|
|
|
|
|
ssl_config(#{enable := false}) ->
|
|
|
[
|
|
|
@@ -284,3 +299,110 @@ ssl_config(SSL = #{enable := true}) ->
|
|
|
{https_enabled, true},
|
|
|
{transport, ssl}
|
|
|
] ++ maps:to_list(maps:remove(enable, SSL)).
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%% Query
|
|
|
+
|
|
|
+do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) ->
|
|
|
+ case data_to_point(Data, State) of
|
|
|
+ {ok, Point} ->
|
|
|
+ case influxdb:write(Client, [Point]) of
|
|
|
+ ok ->
|
|
|
+ ?SLOG(debug, #{
|
|
|
+ msg => "influxdb write point success",
|
|
|
+ connector => InstId,
|
|
|
+ point => Point
|
|
|
+ }),
|
|
|
+ emqx_resource:query_success(AfterQuery);
|
|
|
+ {error, Reason} ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "influxdb write point failed",
|
|
|
+ connector => InstId,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
+ emqx_resource:query_failed(AfterQuery)
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "influxdb trans point failed",
|
|
|
+ connector => InstId,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
+ {error, Reason}
|
|
|
+ end.
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%% Tags & Fields Config Trans
|
|
|
+
|
|
|
+to_tags_config(Tags) ->
|
|
|
+ maps:fold(fun to_maps_config/3, #{}, Tags).
|
|
|
+
|
|
|
+to_fields_config(Fields) ->
|
|
|
+ maps:fold(fun to_maps_config/3, #{}, Fields).
|
|
|
+
|
|
|
+to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> ->
|
|
|
+ NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
|
|
|
+ NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
|
|
|
+ Res#{NK => {binary_to_atom(IntType, utf8), NV}};
|
|
|
+to_maps_config(K, V, Res) ->
|
|
|
+ NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
|
|
|
+ NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
|
|
|
+ Res#{NK => NV}.
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%% Tags & Fields Data Trans
|
|
|
+data_to_point(
|
|
|
+ Data,
|
|
|
+ #{
|
|
|
+ measurement := Measurement,
|
|
|
+ timestamp := Timestamp,
|
|
|
+ tags := Tags,
|
|
|
+ fields := Fields
|
|
|
+ }
|
|
|
+) ->
|
|
|
+ TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
|
|
+ case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
|
|
|
+ [TimestampInt] when is_integer(TimestampInt) ->
|
|
|
+ Point = #{
|
|
|
+ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
|
|
|
+ timestamp => TimestampInt,
|
|
|
+ tags => maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
|
|
|
+ fields => maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields)
|
|
|
+ },
|
|
|
+ {ok, Point};
|
|
|
+ BadTimestamp ->
|
|
|
+ {error, {bad_timestamp, BadTimestamp}}
|
|
|
+ end.
|
|
|
+
|
|
|
+maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint ->
|
|
|
+ TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
|
|
+ NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions),
|
|
|
+ NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions),
|
|
|
+ case {NK, NV} of
|
|
|
+ {[undefined], _} ->
|
|
|
+ Res;
|
|
|
+ {_, [undefined]} ->
|
|
|
+ Res;
|
|
|
+ {_, [IntV]} when is_integer(IntV) ->
|
|
|
+ Res#{NK => {IntType, IntV}}
|
|
|
+ end;
|
|
|
+maps_config_to_data(K, V, {Data, Res}) ->
|
|
|
+ TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
|
|
+ NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions),
|
|
|
+ NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions),
|
|
|
+ case {NK, NV} of
|
|
|
+ {[undefined], _} ->
|
|
|
+ Res;
|
|
|
+ {_, [undefined]} ->
|
|
|
+ Res;
|
|
|
+ _ ->
|
|
|
+ Res#{bin(NK) => NV}
|
|
|
+ end.
|
|
|
+
|
|
|
+data_filter(undefined) -> undefined;
|
|
|
+data_filter(Int) when is_integer(Int) -> Int;
|
|
|
+data_filter(Number) when is_number(Number) -> Number;
|
|
|
+data_filter(Bool) when is_boolean(Bool) -> Bool;
|
|
|
+data_filter(Data) -> bin(Data).
|
|
|
+
|
|
|
+bin(Data) -> emqx_plugin_libs_rule:bin(Data).
|