|
@@ -39,6 +39,9 @@
|
|
|
|
|
|
|
|
-type ts_precision() :: ns | us | ms | s.
|
|
-type ts_precision() :: ns | us | ms | s.
|
|
|
|
|
|
|
|
|
|
+%% Allocatable resources
|
|
|
|
|
+-define(influx_client, influx_client).
|
|
|
|
|
+
|
|
|
-define(INFLUXDB_DEFAULT_PORT, 8086).
|
|
-define(INFLUXDB_DEFAULT_PORT, 8086).
|
|
|
|
|
|
|
|
%% influxdb servers don't need parse
|
|
%% influxdb servers don't need parse
|
|
@@ -53,10 +56,20 @@
|
|
|
callback_mode() -> async_if_possible.
|
|
callback_mode() -> async_if_possible.
|
|
|
|
|
|
|
|
on_start(InstId, Config) ->
|
|
on_start(InstId, Config) ->
|
|
|
|
|
+ %% InstID as pool would be handled by influxdb client
|
|
|
|
|
+ %% so there is no need to allocate pool_name here
|
|
|
|
|
+ %% ehttpc for influxdb-v1/v2,
|
|
|
|
|
+ %% ecpool for influxdb-udp
|
|
|
|
|
+ %% See: influxdb:start_client/1
|
|
|
start_client(InstId, Config).
|
|
start_client(InstId, Config).
|
|
|
|
|
|
|
|
-on_stop(_InstId, #{client := Client}) ->
|
|
|
|
|
- influxdb:stop_client(Client).
|
|
|
|
|
|
|
+on_stop(InstId, _State) ->
|
|
|
|
|
+ case emqx_resource:get_allocated_resources(InstId) of
|
|
|
|
|
+ #{?influx_client := Client} ->
|
|
|
|
|
+ influxdb:stop_client(Client);
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end.
|
|
|
|
|
|
|
|
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
|
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
|
|
case data_to_points(Data, SyntaxLines) of
|
|
case data_to_points(Data, SyntaxLines) of
|
|
@@ -220,8 +233,12 @@ start_client(InstId, Config) ->
|
|
|
config => emqx_utils:redact(Config),
|
|
config => emqx_utils:redact(Config),
|
|
|
client_config => emqx_utils:redact(ClientConfig)
|
|
client_config => emqx_utils:redact(ClientConfig)
|
|
|
}),
|
|
}),
|
|
|
- try
|
|
|
|
|
- do_start_client(InstId, ClientConfig, Config)
|
|
|
|
|
|
|
+ try do_start_client(InstId, ClientConfig, Config) of
|
|
|
|
|
+ Res = {ok, #{client := Client}} ->
|
|
|
|
|
+ ok = emqx_resource:allocate_resource(InstId, ?influx_client, Client),
|
|
|
|
|
+ Res;
|
|
|
|
|
+ {error, Reason} ->
|
|
|
|
|
+ {error, Reason}
|
|
|
catch
|
|
catch
|
|
|
E:R:S ->
|
|
E:R:S ->
|
|
|
?tp(influxdb_connector_start_exception, #{error => {E, R}}),
|
|
?tp(influxdb_connector_start_exception, #{error => {E, R}}),
|