Explorar o código

chore(connector aggregator): trigger delivery when max records is pushed

Fixes https://emqx.atlassian.net/browse/EMQX-12927
Thales Macedo Garitezi hai 1 ano
pai
achega
750123b4af

+ 1 - 0
apps/emqx_connector_aggregator/mix.exs

@@ -24,6 +24,7 @@ defmodule EMQXConnectorAggregator.MixProject do
   def deps() do
     [
       {:emqx, in_umbrella: true},
+      UMP.common_dep(:gproc),
       {:erl_csv, "0.2.0"}
     ]
   end

+ 3 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src

@@ -1,10 +1,11 @@
 {application, emqx_connector_aggregator, [
     {description, "EMQX Enterprise Connector Data Aggregator"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [
         kernel,
-        stdlib
+        stdlib,
+        gproc
     ]},
     {env, []},
     {modules, []},

+ 23 - 7
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -85,28 +85,34 @@ buffer_to_map(#buffer{} = Buffer) ->
 %%
 
 write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
-    write_records(Name, Buffer, Records);
+    write_records(Name, Buffer, Records, _NumWritten = undefined);
 write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) ->
     NR = length(Records),
     case inc_num_records(Buffer, NR) of
         NR ->
             %% NOTE: Allow unconditionally if it's the first write.
-            write_records(Name, Buffer, Records);
+            write_records(Name, Buffer, Records, NR);
         NWritten when NWritten > MaxRecords ->
             NextBuffer = rotate_buffer(Name, Buffer),
             write_records_limited(Name, NextBuffer, Records);
-        _ ->
-            write_records(Name, Buffer, Records)
+        NWritten ->
+            write_records(Name, Buffer, Records, NWritten)
     end.
 
-write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
+write_records(Name, Buffer = #buffer{fd = Writer, max_records = MaxRecords}, Records, NumWritten) ->
     case emqx_connector_aggreg_buffer:write(Records, Writer) of
         ok ->
             ?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
+            case is_number(NumWritten) andalso NumWritten >= MaxRecords of
+                true ->
+                    rotate_buffer_async(Name, Buffer);
+                false ->
+                    ok
+            end,
             ok;
         {error, terminated} ->
             BufferNext = rotate_buffer(Name, Buffer),
-            write_records(Name, BufferNext, Records);
+            write_records_limited(Name, BufferNext, Records);
         {error, _} = Error ->
             Error
     end.
@@ -120,6 +126,9 @@ next_buffer(Name, Timestamp) ->
 rotate_buffer(Name, #buffer{fd = FD}) ->
     gen_server:call(?SRVREF(Name), {rotate_buffer, FD}).
 
+rotate_buffer_async(Name, #buffer{fd = FD}) ->
+    gen_server:cast(?SRVREF(Name), {rotate_buffer, FD}).
+
 send_close_buffer(Name, Timestamp) ->
     gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}).
 
@@ -184,6 +193,9 @@ handle_call(take_error, _From, St0) ->
 
 handle_cast({close_buffer, Timestamp}, St) ->
     {noreply, handle_close_buffer(Timestamp, St)};
+handle_cast({rotate_buffer, FD}, St0) ->
+    St = handle_rotate_buffer(FD, St0),
+    {noreply, St, 0};
 handle_cast(_Cast, St) ->
     {noreply, St}.
 
@@ -469,6 +481,8 @@ parse_filename(Filename) ->
 
 %%
 
+-define(COUNTER_POS, 2).
+
 add_counter({Tab, Counter}) ->
     add_counter({Tab, Counter}, 0).
 
@@ -476,11 +490,13 @@ add_counter({Tab, Counter}, N) ->
     ets:insert(Tab, {Counter, N}).
 
 inc_counter({Tab, Counter}, Size) ->
-    ets:update_counter(Tab, Counter, {2, Size}).
+    ets:update_counter(Tab, Counter, {?COUNTER_POS, Size}).
 
 del_counter({Tab, Counter}) ->
     ets:delete(Tab, Counter).
 
+-undef(COUNTER_POS).
+
 %%
 
 create_tab(Name) ->

+ 98 - 0
apps/emqx_connector_aggregator/test/emqx_connector_aggregator_SUITE.erl

@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_connector_aggregator_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx_connector_aggregator],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% `emqx_connector_aggreg_delivery' API
+%%------------------------------------------------------------------------------
+
+init_transfer_state(_Buffer, Opts) ->
+    #{opts => Opts}.
+
+process_append(_IOData, State) ->
+    State.
+
+process_write(State) ->
+    {ok, State}.
+
+process_complete(_State) ->
+    {ok, done}.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+%% Verifies that a new delivery is triggered when we reach the configured maximum number
+%% of records.
+t_trigger_max_records(Config) ->
+    ?check_trace(
+        #{timetrap => 3_000},
+        begin
+            AggregId = ?FUNCTION_NAME,
+            MaxRecords = 3,
+            AggregOpts = #{
+                max_records => MaxRecords,
+                time_interval => 120_000,
+                work_dir => emqx_cth_suite:work_dir(Config)
+            },
+            ContainerOpts = #{
+                type => csv,
+                column_order => []
+            },
+            DeliveryOpts = #{
+                callback_module => ?MODULE,
+                container => ContainerOpts,
+                upload_options => #{}
+            },
+            {ok, _Sup} = emqx_connector_aggreg_upload_sup:start_link(
+                AggregId, AggregOpts, DeliveryOpts
+            ),
+            Timestamp = now_ms(),
+            Records = lists:duplicate(MaxRecords, #{}),
+            %% Should immediately trigger a delivery process to be kicked off.
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqx_connector_aggregator:push_records(AggregId, Timestamp, Records),
+                    #{?snk_kind := connector_aggreg_delivery_completed}
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/fix-13724.en.md

@@ -0,0 +1 @@
+Azure Blob Storage and S3 actions in aggregated mode now trigger sending aggregated data sooner after the maximum number of records is reached.