Explorar o código

feat(influxdb): influxdb connector add `on_batch_query/3` callback

JimMoen %!s(int64=3) %!d(string=hai) anos
pai
achega
0f6c371760

+ 1 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -126,6 +126,7 @@
 %% when calling emqx_resource:query/3
 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
+%% when calling emqx_resource:on_batch_query/3
 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
 %% when calling emqx_resource:health_check/2

+ 7 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -70,7 +70,8 @@ values(Protocol, post) ->
         write_syntax =>
             <<"${topic},clientid=${clientid}", " ", "payload=${payload},",
                 "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary,
-                "bool=${payload.bool}">>
+                "bool=${payload.bool}">>,
+        batch => #{enable_batch => false, batch_size => 5, batch_time => <<"1m">>}
     };
 values(Protocol, put) ->
     values(Protocol, post).
@@ -109,7 +110,9 @@ fields("get_api_v2") ->
 fields(Name) when
     Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2
 ->
-    fields(basic) ++ connector_field(Name).
+    fields(basic) ++
+        emqx_resource_schema:fields('batch&async&queue') ++
+        connector_field(Name).
 
 method_fileds(post, ConnectorType) ->
     fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType);
@@ -162,6 +165,8 @@ write_syntax(converter) ->
     fun to_influx_lines/1;
 write_syntax(desc) ->
     ?DESC("write_syntax");
+write_syntax(format) ->
+    <<"sql">>;
 write_syntax(_) ->
     undefined.
 

+ 83 - 23
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -17,6 +17,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_batch_query/3,
     on_get_status/2
 ]).
 
@@ -37,8 +38,29 @@ on_start(InstId, Config) ->
 on_stop(_InstId, #{client := Client}) ->
     influxdb:stop_client(Client).
 
-on_query(InstId, {send_message, Data}, State) ->
-    do_query(InstId, {send_message, Data}, State).
+on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
+    case data_to_points(Data, SyntaxLines) of
+        {ok, Points} ->
+            do_query(InstId, Client, Points);
+        {error, ErrorPoints} = Err ->
+            log_error_points(InstId, ErrorPoints),
+            Err
+    end.
+
+%% Once a Batched Data trans to points failed.
+%% This batch query failed
+on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) ->
+    case on_get_status(InstId, State) of
+        connected ->
+            case parse_batch_data(InstId, BatchData, SyntaxLines) of
+                {ok, Points} ->
+                    do_query(InstId, Client, Points);
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        disconnected ->
+            {resource_down, disconnected}
+    end.
 
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) of
@@ -79,7 +101,7 @@ fields("api_v2_put") ->
 fields(basic) ->
     [
         {host,
-            mk(binary(), #{required => true, default => <<"120.0.0.1">>, desc => ?DESC("host")})},
+            mk(binary(), #{required => true, default => <<"127.0.0.1">>, desc => ?DESC("host")})},
         {port, mk(pos_integer(), #{required => true, default => 8086, desc => ?DESC("port")})},
         {precision,
             mk(enum([ns, us, ms, s, m, h]), #{
@@ -310,18 +332,7 @@ ssl_config(SSL = #{enable := true}) ->
 %% -------------------------------------------------------------------------------------------------
 %% Query
 
-do_query(InstId, {send_message, Data}, State = #{client := Client}) ->
-    {Points, Errs} = data_to_points(Data, State),
-    lists:foreach(
-        fun({error, Reason}) ->
-            ?SLOG(error, #{
-                msg => "influxdb trans point failed",
-                connector => InstId,
-                reason => Reason
-            })
-        end,
-        Errs
-    ),
+do_query(InstId, Client, Points) ->
     case influxdb:write(Client, Points) of
         ok ->
             ?SLOG(debug, #{
@@ -376,11 +387,45 @@ to_maps_config(K, V, Res) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Data Trans
-data_to_points(Data, #{write_syntax := Lines}) ->
-    lines_to_points(Data, Lines, [], []).
+parse_batch_data(InstId, BatchData, SyntaxLines) ->
+    {Points, Errors} = lists:foldl(
+        fun({send_message, Data}, {AccIn, ErrAccIn}) ->
+            case data_to_points(Data, SyntaxLines) of
+                {ok, Points} ->
+                    {[Points | AccIn], ErrAccIn};
+                {error, ErrorPoints} ->
+                    log_error_points(InstId, ErrorPoints),
+                    {AccIn, ErrAccIn + 1}
+            end
+        end,
+        {[], 0},
+        BatchData
+    ),
+    case Errors of
+        0 ->
+            {ok, Points};
+        _ ->
+            ?SLOG(error, #{
+                msg => io_lib:format("InfluxDB trans point failed, count: ~p", [Errors]),
+                connector => InstId,
+                reason => points_trans_failed
+            }),
+            {error, points_trans_failed}
+    end.
 
-lines_to_points(_, [], Points, Errs) ->
-    {Points, Errs};
+data_to_points(Data, SyntaxLines) ->
+    lines_to_points(Data, SyntaxLines, [], []).
+
+%% When converting multiple rows data into InfluxDB Line Protocol, they are considered to be strongly correlated.
+%% And once a row fails to convert, all of them are considered to have failed.
+lines_to_points(_, [], Points, ErrorPoints) ->
+    case ErrorPoints of
+        [] ->
+            {ok, Points};
+        _ ->
+            %% ignore trans succeeded points
+            {error, ErrorPoints}
+    end;
 lines_to_points(
     Data,
     [
@@ -392,8 +437,8 @@ lines_to_points(
         }
         | Rest
     ],
-    ResAcc,
-    ErrAcc
+    ResultPointsAcc,
+    ErrorPointsAcc
 ) ->
     TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
     case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of
@@ -406,9 +451,11 @@ lines_to_points(
                 tags => EncodeTags,
                 fields => EncodeFields
             },
-            lines_to_points(Data, Rest, [Point | ResAcc], ErrAcc);
+            lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc);
         BadTimestamp ->
-            lines_to_points(Data, Rest, ResAcc, [{error, {bad_timestamp, BadTimestamp}} | ErrAcc])
+            lines_to_points(Data, Rest, ResultPointsAcc, [
+                {error, {bad_timestamp, BadTimestamp}} | ErrorPointsAcc
+            ])
     end.
 
 maps_config_to_data(K, V, {Data, Res}) ->
@@ -461,3 +508,16 @@ data_filter(Bool) when is_boolean(Bool) -> Bool;
 data_filter(Data) -> bin(Data).
 
 bin(Data) -> emqx_plugin_libs_rule:bin(Data).
+
+%% helper funcs
+log_error_points(InstId, Errs) ->
+    lists:foreach(
+        fun({error, Reason}) ->
+            ?SLOG(error, #{
+                msg => "influxdb trans point failed",
+                connector => InstId,
+                reason => Reason
+            })
+        end,
+        Errs
+    ).