Explorar o código

feat(influxdb): add async callback

JimMoen %!s(int64=3) %!d(string=hai) anos
pai
achega
594d071c05

+ 8 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -114,6 +114,7 @@
 -optional_callbacks([
     on_query/3,
     on_batch_query/3,
+    on_query_async/4,
     on_get_status/2
 ]).
 
@@ -130,6 +131,13 @@
 %% when calling emqx_resource:on_batch_query/3
 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
+-callback on_query_async(
+    resource_id(),
+    Request :: term(),
+    {ReplyFun :: function(), Args :: list()},
+    resource_state()
+) -> query_result().
+
 %% when calling emqx_resource:health_check/2
 -callback on_get_status(resource_id(), resource_state()) ->
     resource_status()

+ 23 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -18,6 +18,7 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
+    on_query_async/4,
     on_get_status/2
 ]).
 
@@ -62,6 +63,20 @@ on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client
             {resource_down, disconnected}
     end.
 
+on_query_async(
+    InstId,
+    {send_message, Data},
+    {ReplayFun, Args},
+    _State = #{write_syntax := SyntaxLines, client := Client}
+) ->
+    case data_to_points(Data, SyntaxLines) of
+        {ok, Points} ->
+            do_async_query(InstId, Client, Points, {ReplayFun, Args});
+        {error, ErrorPoints} = Err ->
+            log_error_points(InstId, ErrorPoints),
+            Err
+    end.
+
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) of
         true ->
@@ -331,7 +346,6 @@ ssl_config(SSL = #{enable := true}) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% Query
-
 do_query(InstId, Client, Points) ->
     case influxdb:write(Client, Points) of
         ok ->
@@ -349,6 +363,14 @@ do_query(InstId, Client, Points) ->
             Err
     end.
 
+do_async_query(InstId, Client, Points, ReplayFunAndArgs) ->
+    ?SLOG(info, #{
+        msg => "influxdb write point async",
+        connector => InstId,
+        points => Points
+    }),
+    ok = influxdb:write_async(Client, Points, ReplayFunAndArgs).
+
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Config Trans