
feat(s3-bridge): implement aggregated upload action

Andrew Mayorov, 1 year ago
parent commit ccbcc0c4e3
23 files changed, 2338 additions and 199 deletions
  1.  + 2 - 1     apps/emqx_bridge/src/emqx_action_info.erl
  2.  + 8 - 2     apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src
  3.  + 0 - 104   apps/emqx_bridge_s3/src/emqx_bridge_s3.erl
  4.  + 6 - 1     apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
  5.  + 138 - 0   apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl
  6.  + 98 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl
  7.  + 162 - 0   apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl
  8.  + 240 - 0   apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
  9.  + 21 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl
  10. + 72 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl
  11. + 485 - 0   apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl
  12. + 15 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl
  13. + 16 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl
  14. + 161 - 22  apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
  15. + 1 - 1     apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl
  16. + 42 - 0    apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl
  17. + 142 - 0   apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl
  18. + 5 - 3     apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl
  19. + 47 - 65   apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl
  20. + 181 - 0   apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl
  21. + 72 - 0    apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl
  22. + 372 - 0   apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl
  23. + 52 - 0    apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -118,7 +118,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_pulsar_action_info,
         emqx_bridge_greptimedb_action_info,
         emqx_bridge_tdengine_action_info,
-        emqx_bridge_s3_action_info
+        emqx_bridge_s3_upload_action_info,
+        emqx_bridge_s3_aggreg_upload_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 8 - 2
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -10,9 +10,15 @@
         emqx_s3
     ]},
     {env, [
-        {emqx_action_info_modules, [emqx_bridge_s3_action_info]},
-        {emqx_connector_info_modules, [emqx_bridge_s3_connector_info]}
+        {emqx_action_info_modules, [
+            emqx_bridge_s3_upload_action_info,
+            emqx_bridge_s3_aggreg_upload_action_info
+        ]},
+        {emqx_connector_info_modules, [
+            emqx_bridge_s3_connector_info
+        ]}
     ]},
+    {mod, {emqx_bridge_s3_app, []}},
     {modules, []},
     {links, []}
 ]}.

+ 0 - 104
apps/emqx_bridge_s3/src/emqx_bridge_s3.erl

@@ -19,7 +19,6 @@
 ]).
 
 -export([
-    bridge_v2_examples/1,
     connector_examples/1
 ]).
 
@@ -39,58 +38,11 @@ fields(Field) when
     Field == "post_connector"
 ->
     emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config));
-fields(Field) when
-    Field == "get_bridge_v2";
-    Field == "put_bridge_v2";
-    Field == "post_bridge_v2"
-->
-    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
-fields(action) ->
-    {?ACTION,
-        hoconsc:mk(
-            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
-            #{
-                desc => <<"S3 Action Config">>,
-                required => false
-            }
-        )};
 fields("config_connector") ->
     emqx_connector_schema:common_fields() ++ fields(s3_connector_config);
-fields(?ACTION) ->
-    emqx_bridge_v2_schema:make_producer_action_schema(
-        hoconsc:mk(
-            ?R_REF(s3_upload_parameters),
-            #{
-                required => true,
-                desc => ?DESC(s3_upload)
-            }
-        ),
-        #{
-            resource_opts_ref => ?R_REF(s3_action_resource_opts)
-        }
-    );
 fields(s3_connector_config) ->
     emqx_s3_schema:fields(s3_client) ++
         emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts);
-fields(s3_upload_parameters) ->
-    emqx_s3_schema:fields(s3_upload) ++
-        [
-            {content,
-                hoconsc:mk(
-                    emqx_schema:template(),
-                    #{
-                        required => false,
-                        default => <<"${.}">>,
-                        desc => ?DESC(s3_object_content)
-                    }
-                )}
-        ];
-fields(s3_action_resource_opts) ->
-    UnsupportedOpts = [batch_size, batch_time],
-    lists:filter(
-        fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
-        emqx_bridge_v2_schema:action_resource_opts_fields()
-    );
 fields(s3_connector_resource_opts) ->
     CommonOpts = emqx_connector_schema:common_resource_opts_subfields(),
     lists:filter(
@@ -100,14 +52,6 @@ fields(s3_connector_resource_opts) ->
 
 desc("config_connector") ->
     ?DESC(config_connector);
-desc(?ACTION) ->
-    ?DESC(s3_upload);
-desc(s3_upload) ->
-    ?DESC(s3_upload);
-desc(s3_upload_parameters) ->
-    ?DESC(s3_upload_parameters);
-desc(s3_action_resource_opts) ->
-    ?DESC(emqx_resource_schema, resource_opts);
 desc(s3_connector_resource_opts) ->
     ?DESC(emqx_resource_schema, resource_opts);
 desc(_Name) ->
@@ -115,54 +59,6 @@ desc(_Name) ->
 
 %% Examples
 
-bridge_v2_examples(Method) ->
-    [
-        #{
-            <<"s3">> => #{
-                summary => <<"S3 Simple Upload">>,
-                value => action_example(Method)
-            }
-        }
-    ].
-
-action_example(post) ->
-    maps:merge(
-        action_example(put),
-        #{
-            type => atom_to_binary(?ACTION),
-            name => <<"my_s3_action">>
-        }
-    );
-action_example(get) ->
-    maps:merge(
-        action_example(put),
-        #{
-            status => <<"connected">>,
-            node_status => [
-                #{
-                    node => <<"emqx@localhost">>,
-                    status => <<"connected">>
-                }
-            ]
-        }
-    );
-action_example(put) ->
-    #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
-        parameters => #{
-            bucket => <<"${clientid}">>,
-            key => <<"${topic}">>,
-            content => <<"${payload}">>,
-            acl => <<"public_read">>
-        },
-        resource_opts => #{
-            query_mode => <<"sync">>,
-            inflight_window => 10
-        }
-    }.
-
 connector_examples(Method) ->
     [
         #{

+ 6 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl

@@ -5,7 +5,12 @@
 -ifndef(__EMQX_BRIDGE_S3_HRL__).
 -define(__EMQX_BRIDGE_S3_HRL__, true).
 
--define(ACTION, s3).
+%% Actions
+-define(ACTION_UPLOAD, s3).
+-define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
+-define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
+-define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
+
 -define(CONNECTOR, s3).
 
 -endif.

+ 138 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl

@@ -0,0 +1,138 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module provides a deliberately simple interface for writing and
+%% reading Erlang terms to/from a file descriptor (i.e. an IO device),
+%% with the goal of being able to write and read terms in a streaming
+%% fashion, and to survive partial corruption.
+%%
+%% Layout of the file is as follows:
+%% ```
+%% ETF(Header { Metadata })
+%% ETF(Record1 ByteSize)
+%% ETF(Record1)
+%% ETF(Record2 ByteSize)
+%% ETF(Record2)
+%% ...
+%% ```
+%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
+-module(emqx_bridge_s3_aggreg_buffer).
+
+-export([
+    new_writer/2,
+    write/2,
+    takeover/1
+]).
+
+-export([
+    new_reader/1,
+    read/1
+]).
+
+-export_type([writer/0, reader/0]).
+
+-record(reader, {
+    fd :: file:io_device() | eof,
+    buffer :: binary(),
+    hasread = 0 :: non_neg_integer()
+}).
+
+-type writer() :: file:io_device().
+-type reader() :: #reader{}.
+
+%%
+
+-define(VSN, 1).
+-define(HEADER(MD), [?MODULE, ?VSN, MD]).
+
+-define(READAHEAD_BYTES, 64 * 4096).
+-define(SANE_TERM_SIZE, 256 * 1024 * 1024).
+
+%%
+
+-spec new_writer(file:io_device(), _Meta) -> writer().
+new_writer(FD, Meta) ->
+    %% TODO: Validate header is not too big?
+    Header = term_to_iovec(?HEADER(Meta)),
+    case file:write(FD, Header) of
+        ok ->
+            FD;
+        {error, Reason} ->
+            error({buffer_write_failed, Reason})
+    end.
+
+-spec write(_Term, writer()) -> ok | {error, file:posix()}.
+write(Term, FD) ->
+    IOData = term_to_iovec(Term),
+    Marker = term_to_binary(iolist_size(IOData)),
+    file:write(FD, [Marker | IOData]).
+
+%%
+
+-spec new_reader(file:io_device()) -> {_Meta, reader()}.
+new_reader(FD) ->
+    Reader0 = #reader{fd = FD, buffer = <<>>},
+    Reader1 = read_buffered(?READAHEAD_BYTES, Reader0),
+    case read_next_term(Reader1) of
+        {?HEADER(MD), Reader} ->
+            {MD, Reader};
+        {UnexpectedHeader, _Reader} ->
+            error({buffer_unexpected_header, UnexpectedHeader});
+        eof ->
+            error({buffer_incomplete, header})
+    end.
+
+-spec read(reader()) -> {_Term, reader()} | eof.
+read(Reader0) ->
+    case read_next_term(read_buffered(_LargeEnough = 16, Reader0)) of
+        {Size, Reader1} when is_integer(Size) andalso Size > 0 andalso Size < ?SANE_TERM_SIZE ->
+            case read_next_term(read_buffered(Size, Reader1)) of
+                {Term, Reader} ->
+                    {Term, Reader};
+                eof ->
+                    error({buffer_incomplete, Size})
+            end;
+        {UnexpectedSize, _Reader} ->
+            error({buffer_unexpected_record_size, UnexpectedSize});
+        eof ->
+            eof
+    end.
+
+-spec takeover(reader()) -> writer().
+takeover(#reader{fd = FD, hasread = HasRead}) ->
+    case file:position(FD, HasRead) of
+        {ok, HasRead} ->
+            case file:truncate(FD) of
+                ok ->
+                    FD;
+                {error, Reason} ->
+                    error({buffer_takeover_failed, Reason})
+            end;
+        {error, Reason} ->
+            error({buffer_takeover_failed, Reason})
+    end.
+
+read_next_term(#reader{fd = eof, buffer = <<>>}) ->
+    eof;
+read_next_term(Reader = #reader{buffer = Buffer, hasread = HasRead}) ->
+    {Term, UsedBytes} = erlang:binary_to_term(Buffer, [safe, used]),
+    BufferSize = byte_size(Buffer),
+    BufferLeft = binary:part(Buffer, UsedBytes, BufferSize - UsedBytes),
+    {Term, Reader#reader{buffer = BufferLeft, hasread = HasRead + UsedBytes}}.
+
+read_buffered(_Size, Reader = #reader{fd = eof}) ->
+    Reader;
+read_buffered(Size, Reader = #reader{fd = FD, buffer = Buffer0}) ->
+    BufferSize = byte_size(Buffer0),
+    ReadSize = erlang:max(Size, ?READAHEAD_BYTES),
+    case BufferSize < Size andalso file:read(FD, ReadSize) of
+        false ->
+            Reader;
+        {ok, Data} ->
+            Reader#reader{buffer = <<Buffer0/binary, Data/binary>>};
+        eof ->
+            Reader#reader{fd = eof};
+        {error, Reason} ->
+            error({buffer_read_failed, Reason})
+    end.
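
A minimal usage sketch of this writer/reader API (file name and record contents are hypothetical, error handling elided):

    {ok, WFD} = file:open("buffer.dat", [write, binary]),
    Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, _Meta = []),
    ok = emqx_bridge_s3_aggreg_buffer:write([#{<<"clientid">> => <<"c1">>}], Writer),
    ok = file:close(WFD),
    %% Reading yields the header metadata first, then each written term:
    {ok, RFD} = file:open("buffer.dat", [read, binary, raw]),
    {_Meta2, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
    {[#{<<"clientid">> := <<"c1">>}], Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
    eof = emqx_bridge_s3_aggreg_buffer:read(Reader1).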

+ 98 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl

@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% CSV container implementation for `emqx_bridge_s3_aggregator`.
+-module(emqx_bridge_s3_aggreg_csv).
+
+%% Container API
+-export([
+    new/1,
+    fill/2,
+    close/1
+]).
+
+-export_type([container/0]).
+
+-record(csv, {
+    columns :: [binary()] | undefined,
+    order :: [binary()],
+    separator :: char() | iodata(),
+    delimiter :: char() | iodata(),
+    quoting_mp :: _ReMP
+}).
+
+-type container() :: #csv{}.
+-type options() :: #{column_order => [column()]}.
+
+-type record() :: emqx_bridge_s3_aggregator:record().
+-type column() :: binary().
+
+%%
+
+-spec new(options()) -> container().
+new(Opts) ->
+    {ok, MP} = re:compile("[\\[\\],\\r\\n\"]", [unicode]),
+    #csv{
+        order = maps:get(column_order, Opts, []),
+        separator = $,,
+        delimiter = $\n,
+        quoting_mp = MP
+    }.
+
+-spec fill([record()], container()) -> {iodata(), container()}.
+fill(Records = [Record | _], CSV0 = #csv{columns = undefined}) ->
+    Columns = mk_columns(Record, CSV0),
+    Header = emit_header(Columns, CSV0),
+    {Writes, CSV} = fill(Records, CSV0#csv{columns = Columns}),
+    {[Header | Writes], CSV};
+fill(Records, CSV) ->
+    Writes = [emit_row(R, CSV) || R <- Records],
+    {Writes, CSV}.
+
+-spec close(container()) -> iodata().
+close(#csv{}) ->
+    [].
+
+%%
+
+mk_columns(Record, #csv{order = ColumnOrder}) ->
+    Columns = lists:sort(maps:keys(Record)),
+    OrderedFirst = [CO || CO <- ColumnOrder, lists:member(CO, Columns)],
+    Unordered = Columns -- ColumnOrder,
+    OrderedFirst ++ Unordered.
+
+-spec emit_header([column()], container()) -> iodata().
+emit_header([C], #csv{delimiter = Delim}) ->
+    [C, Delim];
+emit_header([C | Rest], CSV = #csv{separator = Sep}) ->
+    [C, Sep | emit_header(Rest, CSV)];
+emit_header([], #csv{delimiter = Delim}) ->
+    [Delim].
+
+-spec emit_row(record(), container()) -> iodata().
+emit_row(Record, CSV = #csv{columns = Columns}) ->
+    emit_row(Record, Columns, CSV).
+
+emit_row(Record, [C], CSV = #csv{delimiter = Delim}) ->
+    [emit_cell(C, Record, CSV), Delim];
+emit_row(Record, [C | Rest], CSV = #csv{separator = Sep}) ->
+    [emit_cell(C, Record, CSV), Sep | emit_row(Record, Rest, CSV)];
+emit_row(#{}, [], #csv{delimiter = Delim}) ->
+    [Delim].
+
+emit_cell(Column, Record, CSV) ->
+    case maps:get(Column, Record, undefined) of
+        undefined ->
+            _Empty = "";
+        Value ->
+            encode_cell(emqx_template:to_string(Value), CSV)
+    end.
+
+encode_cell(V, #csv{quoting_mp = MP}) ->
+    case re:run(V, MP, []) of
+        nomatch ->
+            V;
+        _ ->
+            [$", re:replace(V, <<"\"">>, <<"\"\"">>, [global, unicode]), $"]
+    end.
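
A hedged sketch of the container contract as the delivery process uses it (record values are illustrative; the first fill/2 call also emits the header row):

    CSV0 = emqx_bridge_s3_aggreg_csv:new(#{column_order => [<<"clientid">>]}),
    Records = [#{<<"clientid">> => <<"c1">>, <<"payload">> => <<"a,\"b\"">>}],
    {IOData, CSV1} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
    %% Cells containing separators or quotes get quoted, with quotes doubled:
    %% iolist_to_binary(IOData) =:= <<"clientid,payload\nc1,\"a,\"\"b\"\"\"\n">>
    [] = emqx_bridge_s3_aggreg_csv:close(CSV1).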

+ 162 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module takes aggregated records from a buffer and delivers them to S3,
+%% wrapped in a configurable container (though currently there's only CSV).
+-module(emqx_bridge_s3_aggreg_delivery).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([start_link/3]).
+
+%% Internal exports
+-export([run_delivery/3]).
+
+-behaviour(emqx_template).
+-export([lookup/2]).
+
+-record(delivery, {
+    name :: _Name,
+    container :: emqx_bridge_s3_aggreg_csv:container(),
+    reader :: emqx_bridge_s3_aggreg_buffer:reader(),
+    upload :: emqx_s3_upload:t(),
+    empty :: boolean()
+}).
+
+%%
+
+start_link(Name, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, run_delivery, [Name, Buffer, Opts]).
+
+%%
+
+run_delivery(Name, Buffer, Opts) ->
+    ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+    Reader = open_buffer(Buffer),
+    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    ok = proc_lib:init_ack({ok, self()}),
+    loop_deliver(Delivery).
+
+init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
+    #delivery{
+        name = Name,
+        container = mk_container(ContainerOpts),
+        reader = Reader,
+        upload = mk_upload(Buffer, Opts),
+        empty = true
+    }.
+
+loop_deliver(Delivery = #delivery{reader = Reader0}) ->
+    case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
+        {Records = [#{} | _], Reader} ->
+            loop_deliver_records(Records, Delivery#delivery{reader = Reader});
+        {[], Reader} ->
+            loop_deliver(Delivery#delivery{reader = Reader});
+        eof ->
+            complete_delivery(Delivery);
+        {Unexpected, _Reader} ->
+            exit({buffer_unexpected_record, Unexpected})
+    end.
+
+loop_deliver_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
+    {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0),
+    {ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
+    loop_deliver_upload(Delivery#delivery{
+        container = Container,
+        upload = Upload,
+        empty = false
+    }).
+
+loop_deliver_upload(Delivery = #delivery{upload = Upload0}) ->
+    case emqx_s3_upload:write(Upload0) of
+        {ok, Upload} ->
+            loop_deliver(Delivery#delivery{upload = Upload});
+        {cont, Upload} ->
+            loop_deliver_upload(Delivery#delivery{upload = Upload});
+        {error, Reason} ->
+            %% TODO: retries
+            _ = emqx_s3_upload:abort(Upload0),
+            exit({upload_failed, Reason})
+    end.
+
+complete_delivery(#delivery{name = Name, empty = true}) ->
+    ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}),
+    exit({shutdown, {skipped, empty}});
+complete_delivery(#delivery{name = Name, container = Container, upload = Upload0}) ->
+    Trailer = emqx_bridge_s3_aggreg_csv:close(Container),
+    {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0),
+    case emqx_s3_upload:complete(Upload) of
+        {ok, Completed} ->
+            ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}),
+            ok;
+        {error, Reason} ->
+            %% TODO: retries
+            _ = emqx_s3_upload:abort(Upload),
+            exit({upload_failed, Reason})
+    end.
+
+mk_container(#{type := csv, column_order := OrderOpt}) ->
+    %% TODO: Deduplicate?
+    ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
+    emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).
+
+mk_upload(
+    Buffer,
+    Opts = #{
+        bucket := Bucket,
+        upload_options := UploadOpts,
+        client_config := Config,
+        uploader_config := UploaderConfig
+    }
+) ->
+    Client = emqx_s3_client:create(Bucket, Config),
+    Key = mk_object_key(Buffer, Opts),
+    emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
+
+mk_object_key(Buffer, #{action := Name, key := Template}) ->
+    emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
+
+open_buffer(#buffer{filename = Filename}) ->
+    case file:open(Filename, [read, binary, raw]) of
+        {ok, FD} ->
+            {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
+            Reader;
+        {error, Reason} ->
+            error({buffer_open_failed, Reason})
+    end.
+
+%%
+
+-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
+    {ok, integer() | string()} | {error, undefined}.
+lookup([<<"action">>], {Name, _Buffer}) ->
+    {ok, mk_fs_safe_string(Name)};
+lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
+    lookup_buffer_var(Accessor, Buffer);
+lookup(_Accessor, _Context) ->
+    {error, undefined}.
+
+lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
+    {ok, format_timestamp(Since, Format)};
+lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
+    {ok, format_timestamp(Until, Format)};
+lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
+    {ok, Seq};
+lookup_buffer_var([<<"node">>], #buffer{}) ->
+    {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
+lookup_buffer_var(_Binding, _Context) ->
+    {error, undefined}.
+
+format_timestamp(Timestamp, <<"rfc3339utc">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"rfc3339">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"unix">>) ->
+    Timestamp.
+
+mk_fs_safe_string(String) ->
+    unicode:characters_to_binary(string:replace(String, ":", "_", all)).
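
The lookup/2 callback supplies the bindings for the object key template; a hedged illustration of how a key renders against a buffer (node name, timestamps, and the resulting key are examples, #buffer{} as defined in emqx_bridge_s3_aggregator.hrl):

    Template = emqx_bridge_s3_aggreg_upload:mk_key_template(
        "${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv"
    ),
    Buffer = #buffer{since = 1712003600, until = 1712007200, seq = 0,
                     filename = "T1712003600_0000"},
    %% With lookup/2 resolving the bindings, this renders to something like
    %% "my_action/emqx@localhost/2024-04-01T20_33_20Z_N0.csv":
    Key = emqx_template:render_strict(
        Template, {emqx_bridge_s3_aggreg_delivery, {<<"my_action">>, Buffer}}
    ).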

+ 240 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl

@@ -0,0 +1,240 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_bridge_s3.hrl").
+
+-define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
+
+-define(DEFAULT_BATCH_SIZE, 100).
+-define(DEFAULT_BATCH_TIME, <<"10ms">>).
+
+-behaviour(hocon_schema).
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% Interpreting options
+-export([
+    mk_key_template/1
+]).
+
+%% emqx_bridge_v2_schema API
+-export([bridge_v2_examples/1]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_s3".
+
+roots() ->
+    [].
+
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "put_bridge_v2";
+    Field == "post_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
+fields(action) ->
+    {?ACTION,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
+            #{
+                desc => <<"S3 Aggregated Upload Action Config">>,
+                required => false
+            }
+        )};
+fields(?ACTION) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            ?R_REF(s3_aggregated_upload_parameters),
+            #{
+                required => true,
+                desc => ?DESC(s3_aggregated_upload)
+            }
+        ),
+        #{
+            resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
+        }
+    );
+fields(s3_aggregated_upload_parameters) ->
+    [
+        {container,
+            hoconsc:mk(
+                %% TODO: Support selectors once there are more than one container.
+                hoconsc:union(fun
+                    (all_union_members) -> [?REF(s3_container_csv)];
+                    ({value, _Value}) -> [?REF(s3_container_csv)]
+                end),
+                #{
+                    required => true,
+                    default => #{<<"type">> => <<"csv">>},
+                    desc => ?DESC(s3_aggregated_container)
+                }
+            )},
+        {aggregation,
+            hoconsc:mk(
+                ?REF(s3_aggregation),
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregation)
+                }
+            )}
+    ] ++
+        emqx_s3_schema:fields(s3_upload) ++
+        emqx_s3_schema:fields(s3_uploader);
+fields(s3_container_csv) ->
+    [
+        {type,
+            hoconsc:mk(
+                csv,
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregated_container_csv)
+                }
+            )},
+        {column_order,
+            hoconsc:mk(
+                hoconsc:array(string()),
+                #{
+                    required => false,
+                    default => [],
+                    desc => ?DESC(s3_aggregated_container_csv_column_order)
+                }
+            )}
+    ];
+fields(s3_aggregation) ->
+    [
+        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
+        {time_interval,
+            hoconsc:mk(
+                emqx_schema:duration_s(),
+                #{
+                    required => false,
+                    default => <<"1h">>,
+                    desc => ?DESC(s3_aggregation_interval)
+                }
+            )},
+        {max_records,
+            hoconsc:mk(
+                pos_integer(),
+                #{
+                    required => false,
+                    default => <<"1000000">>,
+                    desc => ?DESC(s3_aggregation_max_records)
+                }
+            )}
+    ];
+fields(s3_aggreg_upload_resource_opts) ->
+    %% NOTE: This action should benefit from generous batching defaults.
+    emqx_bridge_v2_schema:action_resource_opts_fields([
+        {batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
+        {batch_time, #{default => ?DEFAULT_BATCH_TIME}}
+    ]).
+
+desc(?ACTION) ->
+    ?DESC(s3_aggregated_upload);
+desc(s3_aggregated_upload_parameters) ->
+    ?DESC(s3_aggregated_upload_parameters);
+desc(s3_aggreg_upload_resource_opts) ->
+    ?DESC(emqx_resource_schema, resource_opts);
+desc(_Name) ->
+    undefined.
+
+%% Interpreting options
+
+-spec mk_key_template(string()) -> emqx_template:str().
+mk_key_template(Key) ->
+    Template = emqx_template:parse(Key),
+    {_, BindingErrors} = emqx_template:render(Template, #{}),
+    {UsedBindings, _} = lists:unzip(BindingErrors),
+    SuffixTemplate = mk_suffix_template(UsedBindings),
+    case emqx_template:is_const(SuffixTemplate) of
+        true ->
+            Template;
+        false ->
+            Template ++ SuffixTemplate
+    end.
+
+mk_suffix_template(UsedBindings) ->
+    RequiredBindings = ["action", "node", "datetime.", "sequence"],
+    SuffixBindings = [
+        mk_default_binding(RB)
+     || RB <- RequiredBindings,
+        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
+    ],
+    SuffixTemplate = [["/", B] || B <- SuffixBindings],
+    emqx_template:parse(SuffixTemplate).
+
+mk_default_binding("datetime.") ->
+    "${datetime.rfc3339utc}";
+mk_default_binding(Binding) ->
+    "${" ++ Binding ++ "}".
+
+%% Examples
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"s3_aggregated_upload">> => #{
+                summary => <<"S3 Aggregated Upload">>,
+                value => s3_action_example(Method)
+            }
+        }
+    ].
+
+s3_action_example(post) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            type => atom_to_binary(?ACTION_UPLOAD),
+            name => <<"my_s3_action">>
+        }
+    );
+s3_action_example(get) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+s3_action_example(put) ->
+    #{
+        enable => true,
+        connector => <<"my_s3_connector">>,
+        description => <<"My action">>,
+        parameters => #{
+            bucket => <<"mqtt-aggregated">>,
+            key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
+            acl => <<"public_read">>,
+            aggregation => #{
+                time_interval => <<"15m">>,
+                max_records => 100_000
+            },
+            <<"container">> => #{
+                type => <<"csv">>,
+                column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
+            }
+        },
+        resource_opts => #{
+            health_check_interval => <<"10s">>,
+            query_mode => <<"async">>,
+            inflight_window => 100
+        }
+    }.
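
mk_key_template/1 above keeps object keys unique: any of the action/node/datetime/sequence bindings the configured key omits are appended as a default suffix. A hedged example:

    %% A key template mentioning only ${action} is parsed as if it were
    %% "exports/${action}/${node}/${datetime.rfc3339utc}/${sequence}":
    Template = emqx_bridge_s3_aggreg_upload:mk_key_template("exports/${action}").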

+ 21 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl

@@ -0,0 +1,21 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_action_info).
+
+-behaviour(emqx_action_info).
+
+-include("emqx_bridge_s3.hrl").
+
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
+
+connector_type_name() -> s3.
+
+schema_module() -> emqx_bridge_s3_aggreg_upload.

+ 72 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl

@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_sup).
+
+-export([
+    start_link/3,
+    start_link_delivery_sup/2
+]).
+
+-export([
+    start_delivery/2,
+    start_delivery_proc/3
+]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
+
+%%
+
+start_link(Name, AggregOpts, DeliveryOpts) ->
+    supervisor:start_link(?MODULE, {root, Name, AggregOpts, DeliveryOpts}).
+
+start_link_delivery_sup(Name, DeliveryOpts) ->
+    supervisor:start_link(?SUPREF(Name), ?MODULE, {delivery, Name, DeliveryOpts}).
+
+%%
+
+start_delivery(Name, Buffer) ->
+    supervisor:start_child(?SUPREF(Name), [Buffer]).
+
+start_delivery_proc(Name, DeliveryOpts, Buffer) ->
+    emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
+
+%%
+
+init({root, Name, AggregOpts, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    AggregatorChildSpec = #{
+        id => aggregator,
+        start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]},
+        type => worker,
+        restart => permanent
+    },
+    DeliverySupChildSpec = #{
+        id => delivery_sup,
+        start => {?MODULE, start_link_delivery_sup, [Name, DeliveryOpts]},
+        type => supervisor,
+        restart => permanent
+    },
+    {ok, {SupFlags, [DeliverySupChildSpec, AggregatorChildSpec]}};
+init({delivery, Name, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 5
+    },
+    ChildSpec = #{
+        id => delivery,
+        start => {?MODULE, start_delivery_proc, [Name, DeliveryOpts]},
+        type => worker,
+        restart => temporary,
+        shutdown => 1000
+    },
+    {ok, {SupFlags, [ChildSpec]}}.
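
The same module serves as both levels of the per-action supervision tree; a sketch of what the two init/1 clauses build (names as in the child specs above):

    %% emqx_bridge_s3_aggreg_upload_sup (root, one_for_one)
    %% ├── delivery_sup: emqx_bridge_s3_aggreg_upload_sup (simple_one_for_one)
    %% │   └── emqx_bridge_s3_aggreg_delivery (temporary, one per closed buffer)
    %% └── aggregator: emqx_bridge_s3_aggregator (permanent)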

+ 485 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl

@@ -0,0 +1,485 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module manages buffers for aggregating records and offloads them
+%% to separate "delivery" processes once they are full or their time
+%% interval is over.
+-module(emqx_bridge_s3_aggregator).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([
+    start_link/2,
+    push_records/3,
+    tick/2,
+    take_error/1
+]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-export_type([
+    record/0,
+    timestamp/0
+]).
+
+%% Record.
+-type record() :: #{binary() => _}.
+
+%% Unix timestamp, seconds since epoch.
+-type timestamp() :: _Seconds :: non_neg_integer().
+
+%%
+
+-define(VSN, 1).
+-define(SRVREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
+
+%%
+
+start_link(Name, Opts) ->
+    gen_server:start_link(?SRVREF(Name), ?MODULE, mk_state(Name, Opts), []).
+
+push_records(Name, Timestamp, Records = [_ | _]) ->
+    %% FIXME: Error feedback.
+    case pick_buffer(Name, Timestamp) of
+        undefined ->
+            BufferNext = next_buffer(Name, Timestamp),
+            write_records_limited(Name, BufferNext, Records);
+        Buffer ->
+            write_records_limited(Name, Buffer, Records)
+    end;
+push_records(_Name, _Timestamp, []) ->
+    ok.
+
+tick(Name, Timestamp) ->
+    case pick_buffer(Name, Timestamp) of
+        #buffer{} ->
+            ok;
+        _Outdated ->
+            send_close_buffer(Name, Timestamp)
+    end.
+
+take_error(Name) ->
+    gen_server:call(?SRVREF(Name), take_error).
+
+%%
+
+write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
+    write_records(Name, Buffer, Records);
+write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) ->
+    NR = length(Records),
+    case inc_num_records(Buffer, NR) of
+        NR ->
+            %% NOTE: Allow unconditionally if it's the first write.
+            write_records(Name, Buffer, Records);
+        NWritten when NWritten > MaxRecords ->
+            NextBuffer = rotate_buffer(Name, Buffer),
+            write_records_limited(Name, NextBuffer, Records);
+        _ ->
+            write_records(Name, Buffer, Records)
+    end.
+
+write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
+    case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of
+        ok ->
+            ?tp(s3_aggreg_records_written, #{action => Name, records => Records}),
+            ok;
+        {error, Reason} when Reason == terminated orelse Reason == closed ->
+            BufferNext = rotate_buffer(Name, Buffer),
+            write_records(Name, BufferNext, Records);
+        {error, _} = Error ->
+            Error
+    end.
+
+inc_num_records(#buffer{cnt_records = Counter}, Size) ->
+    inc_counter(Counter, Size).
+
+next_buffer(Name, Timestamp) ->
+    gen_server:call(?SRVREF(Name), {next_buffer, Timestamp}).
+
+rotate_buffer(Name, #buffer{fd = FD}) ->
+    gen_server:call(?SRVREF(Name), {rotate_buffer, FD}).
+
+send_close_buffer(Name, Timestamp) ->
+    gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}).
+
+%%
+
+-record(st, {
+    name :: _Name,
+    tab :: ets:tid() | undefined,
+    buffer :: buffer() | undefined,
+    queued :: buffer() | undefined,
+    deliveries = #{} :: #{reference() => buffer()},
+    errors = queue:new() :: queue:queue(_Error),
+    interval :: emqx_schema:duration_s(),
+    max_records :: pos_integer(),
+    work_dir :: file:filename()
+}).
+
+-type state() :: #st{}.
+
+mk_state(Name, Opts) ->
+    Interval = maps:get(time_interval, Opts),
+    MaxRecords = maps:get(max_records, Opts),
+    WorkDir = maps:get(work_dir, Opts),
+    ok = ensure_workdir(WorkDir),
+    #st{
+        name = Name,
+        interval = Interval,
+        max_records = MaxRecords,
+        work_dir = WorkDir
+    }.
+
+ensure_workdir(WorkDir) ->
+    %% NOTE
+    %% Writing MANIFEST as a means to ensure the work directory is writable. It's not
+    %% (yet) read back because there's only one version of the implementation.
+    ok = filelib:ensure_path(WorkDir),
+    ok = write_manifest(WorkDir).
+
+write_manifest(WorkDir) ->
+    Manifest = #{<<"version">> => ?VSN},
+    file:write_file(filename:join(WorkDir, "MANIFEST"), hocon_pp:do(Manifest, #{})).
+
+%%
+
+-spec init(state()) -> {ok, state()}.
+init(St0 = #st{name = Name}) ->
+    _ = erlang:process_flag(trap_exit, true),
+    St1 = St0#st{tab = create_tab(Name)},
+    St = recover(St1),
+    _ = announce_current_buffer(St),
+    {ok, St}.
+
+handle_call({next_buffer, Timestamp}, _From, St0) ->
+    St = #st{buffer = Buffer} = handle_next_buffer(Timestamp, St0),
+    {reply, Buffer, St, 0};
+handle_call({rotate_buffer, FD}, _From, St0) ->
+    St = #st{buffer = Buffer} = handle_rotate_buffer(FD, St0),
+    {reply, Buffer, St, 0};
+handle_call(take_error, _From, St0) ->
+    {MaybeError, St} = handle_take_error(St0),
+    {reply, MaybeError, St}.
+
+handle_cast({close_buffer, Timestamp}, St) ->
+    {noreply, handle_close_buffer(Timestamp, St)};
+handle_cast(_Cast, St) ->
+    {noreply, St}.
+
+handle_info(timeout, St) ->
+    {noreply, handle_queued_buffer(St)};
+handle_info({'DOWN', MRef, _, Pid, Reason}, St0 = #st{name = Name, deliveries = Ds0}) ->
+    case maps:take(MRef, Ds0) of
+        {Buffer, Ds} ->
+            St = St0#st{deliveries = Ds},
+            {noreply, handle_delivery_exit(Buffer, Reason, St)};
+        error ->
+            ?SLOG(notice, #{
+                msg => "unexpected_down_signal",
+                action => Name,
+                pid => Pid,
+                reason => Reason
+            }),
+            {noreply, St0}
+    end;
+handle_info(_Msg, St) ->
+    {noreply, St}.
+
+terminate(_Reason, #st{name = Name}) ->
+    cleanup_tab(Name).
+
+%%
+
+handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until ->
+    St;
+handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) ->
+    BufferClosed = close_buffer(Buffer),
+    St = enqueue_closed_buffer(BufferClosed, St0),
+    handle_next_buffer(Timestamp, PrevSince, St);
+handle_next_buffer(Timestamp, St = #st{buffer = undefined}) ->
+    handle_next_buffer(Timestamp, Timestamp, St).
+
+handle_next_buffer(Timestamp, PrevSince, St0) ->
+    NextBuffer = allocate_next_buffer(Timestamp, PrevSince, St0),
+    St = St0#st{buffer = NextBuffer},
+    _ = announce_current_buffer(St),
+    St.
+
+handle_rotate_buffer(
+    FD,
+    St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}}
+) ->
+    BufferClosed = close_buffer(Buffer),
+    NextBuffer = allocate_buffer(Since, Seq + 1, St0),
+    St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}),
+    _ = announce_current_buffer(St),
+    St;
+handle_rotate_buffer(_ClosedFD, St) ->
+    St.
+
+enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) ->
+    St#st{queued = Buffer};
+enqueue_closed_buffer(Buffer, St0) ->
+    %% NOTE: Should never really happen unless interval / max records are too tight.
+    St = handle_queued_buffer(St0),
+    St#st{queued = Buffer}.
+
+handle_queued_buffer(St = #st{queued = undefined}) ->
+    St;
+handle_queued_buffer(St = #st{queued = Buffer}) ->
+    enqueue_delivery(Buffer, St#st{queued = undefined}).
+
+allocate_next_buffer(Timestamp, PrevSince, St = #st{interval = Interval}) ->
+    Since = compute_since(Timestamp, PrevSince, Interval),
+    allocate_buffer(Since, 0, St).
+
+compute_since(Timestamp, PrevSince, Interval) ->
+    Timestamp - (Timestamp - PrevSince) rem Interval.
+
+allocate_buffer(Since, Seq, St = #st{name = Name}) ->
+    Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St),
+    {ok, FD} = file:open(Filename, [write, binary]),
+    Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []),
+    _ = add_counter(Counter),
+    ?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
+    Buffer#buffer{fd = Writer}.
+
+recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
+    {ok, FD} = file:open(Filename, [read, write, binary]),
+    case recover_buffer_writer(FD, Filename) of
+        {ok, Writer, NWritten} ->
+            _ = add_counter(Counter, NWritten),
+            Buffer#buffer{fd = Writer};
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "existing_buffer_recovery_failed",
+                filename => Filename,
+                reason => Reason,
+                details => "Buffer is corrupted beyond repair, will be discarded."
+            }),
+            _ = file:close(FD),
+            _ = file:delete(Filename),
+            undefined
+    end.
+
+recover_buffer_writer(FD, Filename) ->
+    try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of
+        {_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0)
+    catch
+        error:Reason ->
+            {error, Reason}
+    end.
+
+recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
+    try emqx_bridge_s3_aggreg_buffer:read(Reader0) of
+        {Records, Reader} when is_list(Records) ->
+            recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records));
+        {Unexpected, _Reader} ->
+            %% Buffer is corrupted, should be discarded.
+            {error, {buffer_unexpected_record, Unexpected}};
+        eof ->
+            %% Buffer is fine, continue writing at the end.
+            {ok, FD, NWritten}
+    catch
+        error:Reason ->
+            %% Buffer is truncated or corrupted somewhere in the middle.
+            %% Continue writing after the last valid record.
+            ?SLOG(warning, #{
+                msg => "existing_buffer_recovered_partially",
+                filename => Filename,
+                reason => Reason,
+                details =>
+                    "Buffer is truncated or corrupted somewhere in the middle. "
+                    "Corrupted records will be discarded."
+            }),
+            Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0),
+            {ok, Writer, NWritten}
+    end.
+
+mk_buffer(
+    Since,
+    Seq,
+    #st{tab = Tab, interval = Interval, max_records = MaxRecords, work_dir = WorkDir}
+) ->
+    Name = mk_filename(Since, Seq),
+    Counter = {Tab, {Since, Seq}},
+    #buffer{
+        since = Since,
+        until = Since + Interval,
+        seq = Seq,
+        filename = filename:join(WorkDir, Name),
+        max_records = MaxRecords,
+        cnt_records = Counter
+    }.
+
+handle_close_buffer(
+    Timestamp,
+    St0 = #st{buffer = Buffer = #buffer{until = Until}}
+) when Timestamp >= Until ->
+    St = St0#st{buffer = undefined},
+    _ = announce_current_buffer(St),
+    enqueue_delivery(close_buffer(Buffer), St);
+handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) ->
+    St.
+
+close_buffer(Buffer = #buffer{fd = FD}) ->
+    ok = file:close(FD),
+    Buffer#buffer{fd = undefined}.
+
+discard_buffer(#buffer{filename = Filename, cnt_records = Counter}) ->
+    %% NOTE: Hopefully, no process is touching this counter anymore.
+    _ = del_counter(Counter),
+    file:delete(Filename).
+
+pick_buffer(Name, Timestamp) ->
+    case lookup_current_buffer(Name) of
+        #buffer{until = Until} = Buffer when Timestamp < Until ->
+            Buffer;
+        #buffer{since = Since} when Timestamp < Since ->
+            %% TODO: Support timestamps going back.
+            error({invalid_timestamp, Timestamp});
+        _Outdated ->
+            undefined
+    end.
+
+announce_current_buffer(#st{tab = Tab, buffer = Buffer}) ->
+    ets:insert(Tab, {buffer, Buffer}).
+
+lookup_current_buffer(Name) ->
+    ets:lookup_element(lookup_tab(Name), buffer, 2).
+
+%%
+
+enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
+    {ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer),
+    MRef = erlang:monitor(process, Pid),
+    St#st{deliveries = Ds#{MRef => Buffer}}.
+
+handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
+    Normal == normal; Normal == noproc
+->
+    ?SLOG(debug, #{
+        msg => "aggregated_buffer_delivery_completed",
+        action => Name,
+        buffer => Buffer#buffer.filename
+    }),
+    ok = discard_buffer(Buffer),
+    St;
+handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) ->
+    ?SLOG(info, #{
+        msg => "aggregated_buffer_delivery_skipped",
+        action => Name,
+        buffer => {Buffer#buffer.since, Buffer#buffer.seq},
+        reason => Reason
+    }),
+    ok = discard_buffer(Buffer),
+    St;
+handle_delivery_exit(Buffer, Error, St = #st{name = Name}) ->
+    ?SLOG(error, #{
+        msg => "aggregated_buffer_delivery_failed",
+        action => Name,
+        buffer => {Buffer#buffer.since, Buffer#buffer.seq},
+        filename => Buffer#buffer.filename,
+        reason => Error
+    }),
+    enqueue_status_error(Error, St).
+
+enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) ->
+    %% TODO
+    %% This code feels too specific, errors probably need classification.
+    St#st{errors = queue:in(Error, QErrors)};
+enqueue_status_error(_AnotherError, St) ->
+    St.
+
+handle_take_error(St = #st{errors = QErrors0}) ->
+    case queue:out(QErrors0) of
+        {{value, Error}, QErrors} ->
+            {[Error], St#st{errors = QErrors}};
+        {empty, QErrors} ->
+            {[], St#st{errors = QErrors}}
+    end.
+
+%%
+
+recover(St0 = #st{work_dir = WorkDir}) ->
+    {ok, Filenames} = file:list_dir(WorkDir),
+    ExistingBuffers = lists:flatmap(fun(FN) -> read_existing_file(FN, St0) end, Filenames),
+    case lists:reverse(lists:keysort(#buffer.since, ExistingBuffers)) of
+        [Buffer | ClosedBuffers] ->
+            St = lists:foldl(fun enqueue_delivery/2, St0, ClosedBuffers),
+            St#st{buffer = recover_buffer(Buffer)};
+        [] ->
+            St0
+    end.
+
+read_existing_file("MANIFEST", _St) ->
+    [];
+read_existing_file(Name, St) ->
+    case parse_filename(Name) of
+        {Since, Seq} ->
+            [read_existing_buffer(Since, Seq, Name, St)];
+        error ->
+            %% TODO: log?
+            []
+    end.
+
+read_existing_buffer(Since, Seq, Name, St = #st{work_dir = WorkDir}) ->
+    Filename = filename:join(WorkDir, Name),
+    Buffer = mk_buffer(Since, Seq, St),
+    Buffer#buffer{filename = Filename}.
+
+%%
+
+mk_filename(Since, Seq) ->
+    "T" ++ integer_to_list(Since) ++ "_" ++ pad_number(Seq, 4).
+
+parse_filename(Filename) ->
+    case re:run(Filename, "^T(\\d+)_(\\d+)$", [{capture, all_but_first, list}]) of
+        {match, [Since, Seq]} ->
+            {list_to_integer(Since), list_to_integer(Seq)};
+        nomatch ->
+            error
+    end.
+
+%%
+
+add_counter({Tab, Counter}) ->
+    add_counter({Tab, Counter}, 0).
+
+add_counter({Tab, Counter}, N) ->
+    ets:insert(Tab, {Counter, N}).
+
+inc_counter({Tab, Counter}, Size) ->
+    ets:update_counter(Tab, Counter, {2, Size}).
+
+del_counter({Tab, Counter}) ->
+    ets:delete(Tab, Counter).
+
+%%
+
+create_tab(Name) ->
+    Tab = ets:new(?MODULE, [public, set, {write_concurrency, auto}]),
+    ok = persistent_term:put({?MODULE, Name}, Tab),
+    Tab.
+
+lookup_tab(Name) ->
+    persistent_term:get({?MODULE, Name}).
+
+cleanup_tab(Name) ->
+    persistent_term:erase({?MODULE, Name}).
+
+%%
+
+pad_number(I, L) ->
+    string:pad(integer_to_list(I), L, leading, $0).
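
Buffer boundaries stay aligned to the configured interval grid, and buffer files round-trip through the filename scheme; a worked example with hypothetical values:

    %% With a 3600 s interval and a previous buffer starting at 1712000000,
    %% a record timestamped 1712005000 opens a buffer at:
    %%   compute_since(1712005000, 1712000000, 3600)
    %%     = 1712005000 - (5000 rem 3600) = 1712003600
    %% whose on-disk name round-trips:
    %%   mk_filename(1712003600, 0)         %% -> "T1712003600_0000"
    %%   parse_filename("T1712003600_0000") %% -> {1712003600, 0}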

+ 15 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl

@@ -0,0 +1,15 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-record(buffer, {
+    since :: emqx_bridge_s3_aggregator:timestamp(),
+    until :: emqx_bridge_s3_aggregator:timestamp(),
+    seq :: non_neg_integer(),
+    filename :: file:filename(),
+    fd :: file:io_device() | undefined,
+    max_records :: pos_integer() | undefined,
+    cnt_records :: {ets:tab(), _Counter} | undefined
+}).
+
+-type buffer() :: #buffer{}.

+ 16 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl

@@ -0,0 +1,16 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_app).
+
+-behaviour(application).
+-export([start/2, stop/1]).
+
+%%
+
+start(_StartType, _StartArgs) ->
+    emqx_bridge_s3_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 161 - 22
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -7,6 +7,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include("emqx_bridge_s3.hrl").
 
 -behaviour(emqx_resource).
 -export([
@@ -17,7 +18,7 @@
     on_remove_channel/3,
     on_get_channels/1,
     on_query/3,
-    % on_batch_query/3,
+    on_batch_query/3,
     on_get_status/2,
     on_get_channel_status/3
 ]).
@@ -31,12 +32,31 @@
 }.
 
 -type channel_config() :: #{
-    parameters := #{
-        bucket := string(),
-        key := string(),
-        content := string(),
-        acl => emqx_s3:acl()
-    }
+    bridge_type := binary(),
+    parameters := s3_upload_parameters() | s3_aggregated_upload_parameters()
+}.
+
+-type s3_upload_parameters() :: #{
+    bucket := string(),
+    key := string(),
+    content := string(),
+    acl => emqx_s3:acl()
+}.
+
+-type s3_aggregated_upload_parameters() :: #{
+    bucket := string(),
+    key := string(),
+    acl => emqx_s3:acl(),
+    aggregation => #{
+        time_interval := emqx_schema:duration_s(),
+        max_records := pos_integer()
+    },
+    container := #{
+        type := csv,
+        column_order => [string()]
+    },
+    min_part_size := emqx_schema:bytesize(),
+    max_part_size := emqx_schema:bytesize()
 }.
 
 -type channel_state() :: #{
@@ -123,12 +143,13 @@ on_get_status(_InstId, State = #{client_config := Config}) ->
 -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
     {ok, state()} | {error, _Reason}.
 on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
-    ChannelState = init_channel_state(Config),
+    ChannelState = start_channel(State, Config),
     {ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
 
 -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
     {ok, state()}.
 on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) ->
+    ok = stop_channel(maps:get(ChannelId, Channels, undefined)),
     {ok, State#{channels => maps:remove(ChannelId, Channels)}}.
 
 -spec on_get_channels(_InstanceId :: resource_id()) ->
@@ -138,27 +159,122 @@ on_get_channels(InstId) ->
 
 -spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) ->
     channel_status().
-on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) ->
+on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) ->
     case maps:get(ChannelId, Channels, undefined) of
-        _ChannelState = #{} ->
-            %% TODO
-            %% Since bucket name may be templated, we can't really provide any
-            %% additional information regarding the channel health.
-            ?status_connected;
+        ChannelState = #{} ->
+            channel_status(ChannelState, State);
         undefined ->
             ?status_disconnected
     end.
 
-init_channel_state(#{parameters := Parameters}) ->
+start_channel(_State, #{
+    bridge_type := ?BRIDGE_TYPE_UPLOAD,
+    parameters := Parameters = #{
+        bucket := Bucket,
+        key := Key,
+        content := Content
+    }
+}) ->
+    #{
+        type => ?ACTION_UPLOAD,
+        bucket => emqx_template:parse(Bucket),
+        key => emqx_template:parse(Key),
+        content => emqx_template:parse(Content),
+        upload_options => upload_options(Parameters)
+    };
+start_channel(State, #{
+    bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD,
+    bridge_name := Name,
+    parameters := Parameters = #{
+        aggregation := #{
+            time_interval := TimeInterval,
+            max_records := MaxRecords
+        },
+        container := Container,
+        bucket := Bucket,
+        key := Key
+    }
+}) ->
+    AggregOpts = #{
+        time_interval => TimeInterval,
+        max_records => MaxRecords,
+        work_dir => work_dir(Type, Name)
+    },
+    DeliveryOpts = #{
+        bucket => Bucket,
+        key => emqx_bridge_s3_aggreg_upload:mk_key_template(Key),
+        container => Container,
+        upload_options => upload_options(Parameters),
+        client_config => maps:get(client_config, State),
+        uploader_config => maps:with([min_part_size, max_part_size], Parameters)
+    },
+    _ = emqx_bridge_s3_sup:delete_child({Type, Name}),
+    {ok, SupPid} = emqx_bridge_s3_sup:start_child(#{
+        id => {Type, Name},
+        start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
+        type => supervisor,
+        restart => permanent
+    }),
     #{
-        bucket => emqx_template:parse(maps:get(bucket, Parameters)),
-        key => emqx_template:parse(maps:get(key, Parameters)),
-        content => emqx_template:parse(maps:get(content, Parameters)),
-        upload_options => #{
-            acl => maps:get(acl, Parameters, undefined)
-        }
+        type => ?ACTION_AGGREGATED_UPLOAD,
+        name => Name,
+        bucket => Bucket,
+        supervisor => SupPid,
+        on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end
     }.
 
+upload_options(Parameters) ->
+    #{acl => maps:get(acl, Parameters, undefined)}.
+
+work_dir(Type, Name) ->
+    filename:join([emqx:data_dir(), bridge, Type, Name]).
+
+stop_channel(#{on_stop := OnStop}) ->
+    OnStop();
+stop_channel(_ChannelState) ->
+    ok.
+
+channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
+    %% TODO
+    %% Since bucket name may be templated, we can't really provide any additional
+    %% information regarding the channel health.
+    ?status_connected;
+channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
+    %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
+    Timestamp = erlang:system_time(second),
+    ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp),
+    ok = check_bucket_accessible(Bucket, State),
+    ok = check_aggreg_upload_errors(Name),
+    ?status_connected.
+
+check_bucket_accessible(Bucket, #{client_config := Config}) ->
+    case emqx_s3_client:aws_config(Config) of
+        {error, Reason} ->
+            throw({unhealthy_target, Reason});
+        AWSConfig ->
+            try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
+                Props when is_list(Props) ->
+                    ok
+            catch
+                error:{aws_error, {http_error, 404, _, _Reason}} ->
+                    throw({unhealthy_target, "Bucket does not exist"});
+                error:{aws_error, {socket_error, Reason}} ->
+                    throw({unhealthy_target, emqx_utils:format(Reason)})
+            end
+    end.
+
+check_aggreg_upload_errors(Name) ->
+    case emqx_bridge_s3_aggregator:take_error(Name) of
+        [Error] ->
+            %% TODO
+            %% This approach means that, for example, 3 upload failures will cause
+            %% the channel to be marked as unhealthy for 3 consecutive health checks.
+            ErrorMessage = emqx_utils:format(Error),
+            throw({unhealthy_target, ErrorMessage});
+        [] ->
+            ok
+    end.
+
 %% Queries
 
 -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
@@ -167,8 +283,21 @@ init_channel_state(#{parameters := Parameters}) ->
     {ok, _Result} | {error, _Reason}.
 on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
     case maps:get(Tag, Channels, undefined) of
-        ChannelState = #{} ->
+        ChannelState = #{type := ?ACTION_UPLOAD} ->
             run_simple_upload(InstId, Tag, Data, ChannelState, Config);
+        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+            run_aggregated_upload(InstId, [Data], ChannelState);
+        undefined ->
+            {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
+    end.
+
+-spec on_batch_query(_InstanceId :: resource_id(), [query()], state()) ->
+    {ok, _Result} | {error, _Reason}.
+on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
+    case maps:get(Tag, Channels, undefined) of
+        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+            Records = [Data0 | [Data || {_, Data} <- Rest]],
+            run_aggregated_upload(InstId, Records, ChannelState);
         undefined ->
             {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
     end.
@@ -206,6 +335,16 @@ run_simple_upload(
             {error, map_error(Reason)}
     end.
 
+run_aggregated_upload(InstId, Records, #{name := Name}) ->
+    Timestamp = erlang:system_time(second),
+    case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of
+        ok ->
+            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
+            ok;
+        {error, Reason} ->
+            {error, {unrecoverable_error, Reason}}
+    end.
+
 map_error({socket_error, _} = Reason) ->
     {recoverable_error, Reason};
 map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
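
A hedged sketch of the data flow this change introduces (names as in the diff): a batch of queries for an aggregated-upload channel is flattened into records and pushed into the current buffer in a single call:

    %% on_batch_query(InstId, [{Tag, D1}, {Tag, D2}], State)
    %%   -> emqx_bridge_s3_aggregator:push_records(Name, Timestamp, [D1, D2])
    %% Closed buffers are later delivered by emqx_bridge_s3_aggreg_delivery
    %% processes started via emqx_bridge_s3_aggreg_upload_sup:start_delivery/2.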

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl

@@ -20,7 +20,7 @@ type_name() ->
     s3.
 
 bridge_types() ->
-    [s3].
+    [s3, s3_aggregated_upload].
 
 resource_callback_module() ->
     emqx_bridge_s3_connector.

+ 42 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl

@@ -0,0 +1,42 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_sup).
+
+-export([
+    start_link/0,
+    start_child/1,
+    delete_child/1
+]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPREF, ?MODULE).
+
+%%
+
+start_link() ->
+    supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
+
+start_child(ChildSpec) ->
+    supervisor:start_child(?SUPREF, ChildSpec).
+
+delete_child(ChildId) ->
+    case supervisor:terminate_child(?SUPREF, ChildId) of
+        ok ->
+            supervisor:delete_child(?SUPREF, ChildId);
+        Error ->
+            Error
+    end.
+
+%%
+
+init(root) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 1
+    },
+    {ok, {SupFlags, []}}.
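
The supervisor starts with no static children; per-action workers are attached at runtime via start_child/1. A hypothetical example of such a call (the child id, restart policy, and start_link/2 signature are illustrative assumptions, not taken from this commit):

%% Hypothetical sketch: attaching a per-action worker to emqx_bridge_s3_sup.
start_example_child(Name, Opts) ->
    ChildSpec = #{
        id => {aggregator, Name},
        %% start_link/2 arguments are assumed here for illustration.
        start => {emqx_bridge_s3_aggregator, start_link, [Name, Opts]},
        restart => transient,
        type => worker
    },
    emqx_bridge_s3_sup:start_child(ChildSpec).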

+ 142 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl

@@ -0,0 +1,142 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_upload).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_bridge_s3.hrl").
+
+-define(ACTION, ?ACTION_UPLOAD).
+
+-behaviour(hocon_schema).
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-export([
+    bridge_v2_examples/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_s3".
+
+roots() ->
+    [].
+
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "put_bridge_v2";
+    Field == "post_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
+fields(action) ->
+    {?ACTION,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
+            #{
+                desc => <<"S3 Upload Action Config">>,
+                required => false
+            }
+        )};
+fields(?ACTION) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            ?R_REF(s3_upload_parameters),
+            #{
+                required => true,
+                desc => ?DESC(s3_upload)
+            }
+        ),
+        #{
+            resource_opts_ref => ?R_REF(s3_action_resource_opts)
+        }
+    );
+fields(s3_upload_parameters) ->
+    emqx_s3_schema:fields(s3_upload) ++
+        [
+            {content,
+                hoconsc:mk(
+                    emqx_schema:template(),
+                    #{
+                        required => false,
+                        default => <<"${.}">>,
+                        desc => ?DESC(s3_object_content)
+                    }
+                )}
+        ];
+fields(s3_action_resource_opts) ->
+    UnsupportedOpts = [batch_size, batch_time],
+    lists:filter(
+        fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
+        emqx_bridge_v2_schema:action_resource_opts_fields()
+    ).
+
+desc(?ACTION) ->
+    ?DESC(s3_upload);
+desc(s3_upload) ->
+    ?DESC(s3_upload);
+desc(s3_upload_parameters) ->
+    ?DESC(s3_upload_parameters);
+desc(s3_action_resource_opts) ->
+    ?DESC(emqx_resource_schema, resource_opts);
+desc(_Name) ->
+    undefined.
+
+%% Examples
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"s3">> => #{
+                summary => <<"S3 Simple Upload">>,
+                value => s3_upload_action_example(Method)
+            }
+        }
+    ].
+
+s3_upload_action_example(post) ->
+    maps:merge(
+        s3_upload_action_example(put),
+        #{
+            type => atom_to_binary(?ACTION_UPLOAD),
+            name => <<"my_s3_action">>
+        }
+    );
+s3_upload_action_example(get) ->
+    maps:merge(
+        s3_upload_action_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+s3_upload_action_example(put) ->
+    #{
+        enable => true,
+        connector => <<"my_s3_connector">>,
+        description => <<"My action">>,
+        parameters => #{
+            bucket => <<"${clientid}">>,
+            key => <<"${topic}">>,
+            content => <<"${payload}">>,
+            acl => <<"public_read">>
+        },
+        resource_opts => #{
+            query_mode => <<"sync">>,
+            inflight_window => 10
+        }
+    }.
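
Once an action of this type is configured, it can be exercised the same way the test suites below do; a hedged sketch, where the action name and event contents are illustrative:

%% Sketch: pushing a Rule-SQL-like event through a configured `s3` action,
%% using the same helper and entry point as the test suites below.
send_example(ActionName) ->
    Event = emqx_bridge_s3_test_helpers:mk_message_event(
        <<"client1">>, <<"t/1">>, <<"hello">>
    ),
    emqx_bridge_v2:send_message(<<"s3">>, ActionName, Event, #{}).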

+ 5 - 3
apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl

@@ -2,18 +2,20 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_s3_action_info).
+-module(emqx_bridge_s3_upload_action_info).
 
 -behaviour(emqx_action_info).
 
+-include("emqx_bridge_s3.hrl").
+
 -export([
     action_type_name/0,
     connector_type_name/0,
     schema_module/0
 ]).
 
-action_type_name() -> s3.
+action_type_name() -> ?ACTION_UPLOAD.
 
 connector_type_name() -> s3.
 
-schema_module() -> emqx_bridge_s3.
+schema_module() -> emqx_bridge_s3_upload.

+ 47 - 65
apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl

@@ -11,8 +11,6 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/test_macros.hrl").
 
--import(emqx_utils_conv, [bin/1]).
-
 %% See `emqx_bridge_s3.hrl`.
 -define(BRIDGE_TYPE, <<"s3">>).
 -define(CONNECTOR_TYPE, <<"s3">>).
@@ -79,67 +77,56 @@ end_per_testcase(_TestCase, _Config) ->
 
 connector_config(Name, _Config) ->
     BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
-    parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
-        <<"enable">> => true,
-        <<"description">> => <<"S3 Connector">>,
-        <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
-        <<"port">> => maps:get(<<"port">>, BaseConf),
-        <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
-        <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
-        <<"transport_options">> => #{
-            <<"headers">> => #{
-                <<"content-type">> => <<?CONTENT_TYPE>>
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"connectors">>, ?CONNECTOR_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"description">> => <<"S3 Connector">>,
+            <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
+            <<"port">> => maps:get(<<"port">>, BaseConf),
+            <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
+            <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
+            <<"transport_options">> => #{
+                <<"headers">> => #{
+                    <<"content-type">> => <<?CONTENT_TYPE>>
+                },
+                <<"connect_timeout">> => <<"500ms">>,
+                <<"request_timeout">> => <<"1s">>,
+                <<"pool_size">> => 4,
+                <<"max_retries">> => 0,
+                <<"enable_pipelining">> => 1
             },
-            <<"connect_timeout">> => <<"500ms">>,
-            <<"request_timeout">> => <<"1s">>,
-            <<"pool_size">> => 4,
-            <<"max_retries">> => 0,
-            <<"enable_pipelining">> => 1
-        },
-        <<"resource_opts">> => #{
-            <<"health_check_interval">> => <<"5s">>,
-            <<"start_timeout">> => <<"5s">>
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"5s">>,
+                <<"start_timeout">> => <<"5s">>
+            }
         }
-    }).
+    ).
 
 action_config(Name, ConnectorId) ->
-    parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
-        <<"enable">> => true,
-        <<"connector">> => ConnectorId,
-        <<"parameters">> => #{
-            <<"bucket">> => <<"${clientid}">>,
-            <<"key">> => <<"${topic}">>,
-            <<"content">> => <<"${payload}">>,
-            <<"acl">> => <<"public_read">>
-        },
-        <<"resource_opts">> => #{
-            <<"buffer_mode">> => <<"memory_only">>,
-            <<"buffer_seg_bytes">> => <<"10MB">>,
-            <<"health_check_interval">> => <<"3s">>,
-            <<"inflight_window">> => 40,
-            <<"max_buffer_bytes">> => <<"256MB">>,
-            <<"metrics_flush_interval">> => <<"1s">>,
-            <<"query_mode">> => <<"sync">>,
-            <<"request_ttl">> => <<"60s">>,
-            <<"resume_interval">> => <<"3s">>,
-            <<"worker_pool_size">> => <<"4">>
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"actions">>, ?BRIDGE_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> => #{
+                <<"bucket">> => <<"${clientid}">>,
+                <<"key">> => <<"${topic}">>,
+                <<"content">> => <<"${payload}">>,
+                <<"acl">> => <<"public_read">>
+            },
+            <<"resource_opts">> => #{
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"3s">>,
+                <<"inflight_window">> => 40,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"60s">>,
+                <<"resume_interval">> => <<"3s">>,
+                <<"worker_pool_size">> => <<"4">>
+            }
         }
-    }).
-
-parse_and_check_config(Root, Type, Name, ConfigIn) ->
-    Schema =
-        case Root of
-            <<"connectors">> -> emqx_connector_schema;
-            <<"actions">> -> emqx_bridge_v2_schema
-        end,
-    #{Root := #{Type := #{Name := Config}}} =
-        hocon_tconf:check_plain(
-            Schema,
-            #{Root => #{Type => #{Name => ConfigIn}}},
-            #{required => false, atom_key => false}
-        ),
-    ct:pal("parsed config: ~p", [Config]),
-    ConfigIn.
+    ).
 
 t_start_stop(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
@@ -190,7 +177,7 @@ t_sync_query(Config) ->
     ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
     ok = emqx_bridge_v2_testlib:t_sync_query(
         Config,
-        fun() -> mk_message(Bucket, Topic, Payload) end,
+        fun() -> emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload) end,
         fun(Res) -> ?assertMatch(ok, Res) end,
         s3_bridge_connector_upload_ok
     ),
@@ -224,15 +211,10 @@ t_query_retry_recoverable(Config) ->
         heal_failure,
         [timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
     ),
-    Message = mk_message(Bucket, Topic, Payload),
+    Message = emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload),
     %% Verify that the message is sent eventually.
     ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
     ?assertMatch(
         #{content := Payload},
         maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
     ).
-
-mk_message(ClientId, Topic, Payload) ->
-    Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
-    {Event, _} = emqx_rule_events:eventmsg_publish(Message),
-    Event.

+ 181 - 0
apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl

@@ -0,0 +1,181 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_buffer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%% CT Setup
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    ok = filelib:ensure_path(WorkDir),
+    [{work_dir, WorkDir} | Config].
+
+end_per_suite(_Config) ->
+    ok.
+
+%% Testcases
+
+t_write_read_cycle(Config) ->
+    Filename = mk_filename(?FUNCTION_NAME, Config),
+    Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
+    {ok, WFD} = file:open(Filename, [write, binary]),
+    Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
+    Terms = [
+        [],
+        [[[[[[[[]]]]]]]],
+        123,
+        lists:seq(1, 100),
+        lists:seq(1, 1000),
+        lists:seq(1, 10000),
+        lists:seq(1, 100000),
+        #{<<"id">> => 123456789, <<"ts">> => <<"2028-02-29T12:34:56Z">>, <<"gauge">> => 42.42},
+        {<<"text/plain">>, _Huge = rand:bytes(1048576)},
+        {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}
+    ],
+    ok = lists:foreach(
+        fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
+        Terms
+    ),
+    ok = file:close(WFD),
+    {ok, RFD} = file:open(Filename, [read, binary, raw]),
+    {MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
+    ?assertEqual(Metadata, MetadataRead),
+    TermsRead = read_until_eof(Reader),
+    ?assertEqual(Terms, TermsRead).
+
+t_read_empty(Config) ->
+    Filename = mk_filename(?FUNCTION_NAME, Config),
+    {ok, WFD} = file:open(Filename, [write, binary]),
+    ok = file:close(WFD),
+    {ok, RFD} = file:open(Filename, [read, binary]),
+    ?assertError(
+        {buffer_incomplete, header},
+        emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
+    ).
+
+t_read_garbage(Config) ->
+    Filename = mk_filename(?FUNCTION_NAME, Config),
+    {ok, WFD} = file:open(Filename, [write, binary]),
+    ok = file:write(WFD, rand:bytes(1048576)),
+    ok = file:close(WFD),
+    {ok, RFD} = file:open(Filename, [read, binary]),
+    ?assertError(
+        badarg,
+        emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
+    ).
+
+t_read_truncated(Config) ->
+    Filename = mk_filename(?FUNCTION_NAME, Config),
+    {ok, WFD} = file:open(Filename, [write, binary]),
+    Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
+    Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
+    Terms = [
+        [[[[[[[[[[[]]]]]]]]]]],
+        lists:seq(1, 100000),
+        #{<<"id">> => 123456789, <<"ts">> => <<"2029-02-30T12:34:56Z">>, <<"gauge">> => 42.42},
+        {<<"text/plain">>, _Huge = rand:bytes(1048576)}
+    ],
+    LastTerm =
+        {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
+    ok = lists:foreach(
+        fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
+        Terms
+    ),
+    {ok, WPos} = file:position(WFD, cur),
+    ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)),
+    ok = file:close(WFD),
+    ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1),
+    {ok, RFD1} = file:open(Filename, [read, binary]),
+    {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1),
+    {ReadTerms1, Reader1} = read_terms(length(Terms), Reader0),
+    ?assertEqual(Terms, ReadTerms1),
+    ?assertError(
+        badarg,
+        emqx_bridge_s3_aggreg_buffer:read(Reader1)
+    ),
+    ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
+    {ok, RFD2} = file:open(Filename, [read, binary]),
+    {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2),
+    {ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2),
+    ?assertEqual(lists:sublist(Terms, 3), ReadTerms2),
+    ?assertError(
+        badarg,
+        emqx_bridge_s3_aggreg_buffer:read(Reader3)
+    ).
+
+t_read_truncated_takeover_write(Config) ->
+    Filename = mk_filename(?FUNCTION_NAME, Config),
+    {ok, WFD} = file:open(Filename, [write, binary]),
+    Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
+    Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
+    Terms1 = [
+        [[[[[[[[[[[]]]]]]]]]]],
+        lists:seq(1, 10000),
+        lists:duplicate(1000, ?FUNCTION_NAME),
+        {<<"text/plain">>, _Huge = rand:bytes(1048576)}
+    ],
+    Terms2 = [
+        {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
+        {<<"application/x-octet-stream">>, rand:bytes(102400)}
+    ],
+    ok = lists:foreach(
+        fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end,
+        Terms1
+    ),
+    {ok, WPos} = file:position(WFD, cur),
+    ok = file:close(WFD),
+    ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
+    {ok, RWFD} = file:open(Filename, [read, write, binary]),
+    {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD),
+    {ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0),
+    ?assertEqual(
+        lists:sublist(Terms1, 3),
+        ReadTerms1
+    ),
+    ?assertError(
+        badarg,
+        emqx_bridge_s3_aggreg_buffer:read(Reader1)
+    ),
+    Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1),
+    ok = lists:foreach(
+        fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end,
+        Terms2
+    ),
+    ok = file:close(RWFD),
+    {ok, RFD} = file:open(Filename, [read, binary]),
+    {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
+    ReadTerms2 = read_until_eof(Reader2),
+    ?assertEqual(
+        lists:sublist(Terms1, 3) ++ Terms2,
+        ReadTerms2
+    ).
+
+%%
+
+mk_filename(Name, Config) ->
+    filename:join(?config(work_dir, Config), Name).
+
+read_terms(0, Reader) ->
+    {[], Reader};
+read_terms(N, Reader0) ->
+    {Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
+    {Terms, Reader} = read_terms(N - 1, Reader1),
+    {[Term | Terms], Reader}.
+
+read_until_eof(Reader0) ->
+    case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
+        {Term, Reader} ->
+            [Term | read_until_eof(Reader)];
+        eof ->
+            []
+    end.
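
Taken together, these cases pin down a small recovery protocol: read intact terms until read/1 either returns eof or raises badarg on a corrupted tail, then hand the reader to takeover/1 and continue writing. A condensed sketch using only the API calls exercised above (that takeover/1 also accepts a reader that has hit eof is an assumption):

%% Sketch: recover a possibly-truncated buffer file and resume writing.
recover_and_resume(Filename, NewTerms) ->
    {ok, FD} = file:open(Filename, [read, write, binary]),
    {_Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
    Reader = skip_intact_terms(Reader0),
    %% takeover/1 turns the reader into a writer positioned after the last
    %% intact term, so any corrupted tail gets overwritten.
    Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader),
    ok = lists:foreach(
        fun(T) -> ok = emqx_bridge_s3_aggreg_buffer:write(T, Writer) end,
        NewTerms
    ),
    file:close(FD).

skip_intact_terms(Reader0) ->
    try emqx_bridge_s3_aggreg_buffer:read(Reader0) of
        {_Term, Reader} -> skip_intact_terms(Reader);
        eof -> Reader0
    catch
        error:badarg -> Reader0
    end.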

+ 72 - 0
apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl

@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_csv_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+encoding_test() ->
+    CSV = emqx_bridge_s3_aggreg_csv:new(#{}),
+    ?assertEqual(
+        "A,B,Ç\n"
+        "1.2345,string,0.0\n"
+        "0.3333333333,\"[]\",-0.0\n"
+        "111111,🫠,0.0\n"
+        "111.111,\"\"\"quoted\"\"\",\"line\r\nbreak\"\n"
+        "222.222,,\n",
+        fill_close(CSV, [
+            [
+                #{<<"A">> => 1.2345, <<"B">> => "string", <<"Ç"/utf8>> => +0.0},
+                #{<<"A">> => 1 / 3, <<"B">> => "[]", <<"Ç"/utf8>> => -0.0},
+                #{<<"A">> => 111111, <<"B">> => "🫠", <<"Ç"/utf8>> => 0.0},
+                #{<<"A">> => 111.111, <<"B">> => "\"quoted\"", <<"Ç"/utf8>> => "line\r\nbreak"},
+                #{<<"A">> => 222.222, <<"B">> => "", <<"Ç"/utf8>> => undefined}
+            ]
+        ])
+    ).
+
+column_order_test() ->
+    Order = [<<"ID">>, <<"TS">>],
+    CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}),
+    ?assertEqual(
+        "ID,TS,A,B,D\n"
+        "1,2024-01-01,12.34,str,\"[]\"\n"
+        "2,2024-01-02,23.45,ing,\n"
+        "3,,45,,'\n"
+        "4,2024-01-04,,,\n",
+        fill_close(CSV, [
+            [
+                #{
+                    <<"A">> => 12.34,
+                    <<"B">> => "str",
+                    <<"ID">> => 1,
+                    <<"TS">> => "2024-01-01",
+                    <<"D">> => <<"[]">>
+                },
+                #{
+                    <<"TS">> => "2024-01-02",
+                    <<"C">> => <<"null">>,
+                    <<"ID">> => 2,
+                    <<"A">> => 23.45,
+                    <<"B">> => "ing"
+                }
+            ],
+            [
+                #{<<"A">> => 45, <<"D">> => <<"'">>, <<"ID">> => 3},
+                #{<<"ID">> => 4, <<"TS">> => "2024-01-04"}
+            ]
+        ])
+    ).
+
+fill_close(CSV, LRecords) ->
+    string(fill_close_(CSV, LRecords)).
+
+fill_close_(CSV0, [Records | LRest]) ->
+    {Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
+    [Writes | fill_close_(CSV, LRest)];
+fill_close_(CSV, []) ->
+    [emqx_bridge_s3_aggreg_csv:close(CSV)].
+
+string(Writes) ->
+    unicode:characters_to_list(Writes).
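
The container contract exercised here is new/1, then repeated fill/2 calls (each returning iodata to append plus an updated state), then close/1 for the final iodata. A usage sketch streaming batches into a file, with the file handling and column order being illustrative:

%% Sketch: incrementally writing record batches as CSV via the container API.
write_csv(Filename, Batches) ->
    {ok, FD} = file:open(Filename, [write, binary]),
    CSV0 = emqx_bridge_s3_aggreg_csv:new(#{column_order => [<<"clientid">>, <<"topic">>]}),
    CSVN = lists:foldl(
        fun(Records, CSVAcc0) ->
            {IoData, CSVAcc1} = emqx_bridge_s3_aggreg_csv:fill(Records, CSVAcc0),
            ok = file:write(FD, IoData),
            CSVAcc1
        end,
        CSV0,
        Batches
    ),
    ok = file:write(FD, emqx_bridge_s3_aggreg_csv:close(CSVN)),
    file:close(FD).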

+ 372 - 0
apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl

@@ -0,0 +1,372 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
+-import(emqx_utils_conv, [bin/1]).
+
+%% See `emqx_bridge_s3.hrl`.
+-define(BRIDGE_TYPE, <<"s3_aggregated_upload">>).
+-define(CONNECTOR_TYPE, <<"s3">>).
+
+-define(PROXY_NAME, "minio_tcp").
+
+-define(CONF_TIME_INTERVAL, 4000).
+-define(CONF_MAX_RECORDS, 100).
+-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
+-define(CONF_COLUMN_ORDER(T), [
+    <<"publish_received_at">>,
+    <<"clientid">>,
+    <<"topic">>,
+    <<"payload">>
+    | T
+]).
+
+-define(LIMIT_TOLERANCE, 1.1).
+
+%% CT Setup
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    % Setup toxiproxy
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    _ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_s3,
+            emqx_bridge,
+            emqx_rule_engine,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
+    [
+        {apps, Apps},
+        {proxy_host, ProxyHost},
+        {proxy_port, ProxyPort},
+        {proxy_name, ?PROXY_NAME}
+        | Config
+    ].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
+
+%% Testcases
+
+init_per_testcase(TestCase, Config) ->
+    ct:timetrap(timer:seconds(10)),
+    ok = snabbkaffe:start_trace(),
+    TS = erlang:system_time(),
+    Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])),
+    Bucket = unicode:characters_to_list(string:replace(Name, "_", "-", all)),
+    ConnectorConfig = connector_config(Name, Config),
+    ActionConfig = action_config(Name, Name, Bucket),
+    ok = emqx_bridge_s3_test_helpers:create_bucket(Bucket),
+    [
+        {connector_type, ?CONNECTOR_TYPE},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {bridge_type, ?BRIDGE_TYPE},
+        {bridge_name, Name},
+        {bridge_config, ActionConfig},
+        {s3_bucket, Bucket}
+        | Config
+    ].
+
+end_per_testcase(_TestCase, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok.
+
+connector_config(Name, _Config) ->
+    BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"connectors">>, ?CONNECTOR_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"description">> => <<"S3 Connector">>,
+            <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
+            <<"port">> => maps:get(<<"port">>, BaseConf),
+            <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
+            <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
+            <<"transport_options">> => #{
+                <<"connect_timeout">> => <<"500ms">>,
+                <<"request_timeout">> => <<"1s">>,
+                <<"pool_size">> => 4,
+                <<"max_retries">> => 0
+            },
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"1s">>
+            }
+        }
+    ).
+
+action_config(Name, ConnectorId, Bucket) ->
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"actions">>, ?BRIDGE_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> => #{
+                <<"bucket">> => unicode:characters_to_binary(Bucket),
+                <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>,
+                <<"acl">> => <<"public_read">>,
+                <<"aggregation">> => #{
+                    <<"time_interval">> => <<"4s">>,
+                    <<"max_records">> => ?CONF_MAX_RECORDS
+                },
+                <<"container">> => #{
+                    <<"type">> => <<"csv">>,
+                    <<"column_order">> => ?CONF_COLUMN_ORDER
+                }
+            },
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"1s">>,
+                <<"max_buffer_bytes">> => <<"64MB">>,
+                <<"query_mode">> => <<"async">>,
+                <<"worker_pool_size">> => 4
+            }
+        }
+    ).
+
+t_start_stop(Config) ->
+    emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
+
+t_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config).
+
+t_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
+
+t_aggreg_upload(Config) ->
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    BridgeNameString = unicode:characters_to_list(BridgeName),
+    NodeString = atom_to_list(node()),
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Prepare some sample messages that look like Rule SQL productions.
+    MessageEvents = lists:map(fun mk_message_event/1, [
+        {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
+        {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>},
+        {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
+    ]),
+    ok = send_messages(BridgeName, MessageEvents),
+    %% Wait until the delivery is completed.
+    ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
+    %% Check the uploaded objects.
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    ?assertMatch(
+        [BridgeNameString, NodeString, _Datetime, _Seq = "0"],
+        string:split(Key, "/", all)
+    ),
+    _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
+    %% Verify that column order is respected.
+    ?assertMatch(
+        {ok, [
+            ?CONF_COLUMN_ORDER(_),
+            [TS, <<"C1">>, T1, P1 | _],
+            [TS, <<"C2">>, T2, P2 | _],
+            [TS, <<"C3">>, T3, P3 | _]
+        ]},
+        erl_csv:decode(Content)
+    ).
+
+t_aggreg_upload_restart(Config) ->
+    %% NOTE
+    %% This test verifies that the bridge reuses an existing aggregation buffer
+    %% after a restart.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Send some sample messages that look like Rule SQL productions.
+    MessageEvents = lists:map(fun mk_message_event/1, [
+        {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
+        {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>},
+        {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
+    ]),
+    ok = send_messages(BridgeName, MessageEvents),
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
+    %% Restart the bridge.
+    {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
+    {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
+    %% Send some more messages.
+    ok = send_messages(BridgeName, MessageEvents),
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
+    %% Wait until the delivery is completed.
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
+    %% Check there's still only one upload.
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
+    %% Verify that column order is respected.
+    ?assertMatch(
+        {ok, [
+            _Header = [_ | _],
+            [TS1, <<"C1">>, T1, P1 | _],
+            [TS1, <<"C2">>, T2, P2 | _],
+            [TS1, <<"C3">>, T3, P3 | _],
+            [TS2, <<"C1">>, T1, P1 | _],
+            [TS2, <<"C2">>, T2, P2 | _],
+            [TS2, <<"C3">>, T3, P3 | _]
+        ]},
+        erl_csv:decode(Content)
+    ).
+
+t_aggreg_upload_restart_corrupted(Config) ->
+    %% NOTE
+    %% This test verifies that the bridge can recover from buffer file corruption,
+    %% preserving the records that survived intact.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    BatchSize = ?CONF_MAX_RECORDS div 2,
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Send some sample messages that look like Rule SQL productions.
+    Messages1 = [
+        {integer_to_binary(N), <<"a/b/c">>, <<"{\"hello\":\"world\"}">>}
+     || N <- lists:seq(1, BatchSize)
+    ],
+    %% Ensure that they span multiple batch queries.
+    ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
+    {ok, _} = ?block_until(
+        #{?snk_kind := s3_aggreg_records_written, action := BridgeName},
+        infinity,
+        0
+    ),
+    %% Find out the buffer file.
+    {ok, #{filename := Filename}} = ?block_until(
+        #{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName}
+    ),
+    %% Stop the bridge, corrupt the buffer file, and restart the bridge.
+    {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
+    BufferFileSize = filelib:file_size(Filename),
+    ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2),
+    {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
+    %% Send some more messages.
+    Messages2 = [
+        {integer_to_binary(N), <<"c/d/e">>, <<"{\"hello\":\"world\"}">>}
+     || N <- lists:seq(1, BatchSize)
+    ],
+    ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
+    %% Wait until the delivery is completed.
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
+    %% Check that upload contains part of the first batch and all of the second batch.
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
+    NRows = length(Rows),
+    ?assert(
+        NRows > BatchSize,
+        CSV
+    ),
+    ?assertEqual(
+        lists:sublist(Messages1, NRows - BatchSize) ++ Messages2,
+        [{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
+        CSV
+    ).
+
+t_aggreg_next_rotate(Config) ->
+    %% NOTE
+    %% This is essentially a stress test verifying that buffer rotation and
+    %% windowing work correctly under high-rate, high-concurrency conditions.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    NSenders = 4,
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Start separate processes to send messages.
+    Senders = [
+        spawn_link(fun() -> run_message_sender(BridgeName, N) end)
+     || N <- lists:seq(1, NSenders)
+    ],
+    %% Give them some time to send messages so that rotation and windowing will happen.
+    ok = timer:sleep(round(?CONF_TIME_INTERVAL * 1.5)),
+    %% Stop the senders.
+    _ = [Sender ! {stop, self()} || Sender <- Senders],
+    NSent = receive_sender_reports(Senders),
+    %% Wait for the last delivery to complete.
+    ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
+    ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0),
+    %% There should be at least 2 time windows of aggregated records.
+    Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
+    DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]],
+    ?assert(
+        ordsets:size(ordsets:from_list(DTs)) > 1,
+        Uploads
+    ),
+    %% Uploads should not contain more than max allowed records.
+    CSVs = [{K, fetch_parse_csv(Bucket, K)} || K <- Uploads],
+    NRecords = [{K, length(CSV) - 1} || {K, CSV} <- CSVs],
+    ?assertEqual(
+        [],
+        [{K, NR} || {K, NR} <- NRecords, NR > ?CONF_MAX_RECORDS * ?LIMIT_TOLERANCE]
+    ),
+    %% No message should be lost.
+    ?assertEqual(
+        NSent,
+        lists:sum([NR || {_, NR} <- NRecords])
+    ).
+
+run_message_sender(BridgeName, N) ->
+    ClientID = integer_to_binary(N),
+    Topic = <<"a/b/c/", ClientID/binary>>,
+    run_message_sender(BridgeName, N, ClientID, Topic, N, 0).
+
+run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent) ->
+    Payload = integer_to_binary(N * 1_000_000 + NSent),
+    Message = emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload),
+    _ = send_message(BridgeName, Message),
+    receive
+        {stop, From} ->
+            From ! {sent, self(), NSent + 1}
+    after Delay ->
+        run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent + 1)
+    end.
+
+receive_sender_reports([Sender | Rest]) ->
+    receive
+        {sent, Sender, NSent} -> NSent + receive_sender_reports(Rest)
+    end;
+receive_sender_reports([]) ->
+    0.
+
+%%
+
+mk_message_event({ClientID, Topic, Payload}) ->
+    emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload).
+
+send_messages(BridgeName, MessageEvents) ->
+    lists:foreach(
+        fun(M) -> send_message(BridgeName, M) end,
+        MessageEvents
+    ).
+
+send_messages_delayed(BridgeName, MessageEvents, Delay) ->
+    lists:foreach(
+        fun(M) ->
+            send_message(BridgeName, M),
+            timer:sleep(Delay)
+        end,
+        MessageEvents
+    ).
+
+send_message(BridgeName, Message) ->
+    ?assertEqual(ok, emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{})).
+
+fetch_parse_csv(Bucket, Key) ->
+    #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
+    {ok, CSV} = erl_csv:decode(Content),
+    CSV.

+ 52 - 0
apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl

@@ -0,0 +1,52 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_test_helpers).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-import(emqx_utils_conv, [bin/1]).
+
+parse_and_check_config(Root, Type, Name, Config) ->
+    Schema =
+        case Root of
+            <<"connectors">> -> emqx_connector_schema;
+            <<"actions">> -> emqx_bridge_v2_schema
+        end,
+    #{Root := #{Type := #{Name := _ConfigParsed}}} =
+        hocon_tconf:check_plain(
+            Schema,
+            #{Root => #{Type => #{Name => Config}}},
+            #{required => false, atom_key => false}
+        ),
+    Config.
+
+mk_message_event(ClientId, Topic, Payload) ->
+    Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
+    {Event, _} = emqx_rule_events:eventmsg_publish(Message),
+    emqx_utils_maps:binary_key_map(Event).
+
+create_bucket(Bucket) ->
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    erlcloud_s3:create_bucket(Bucket, AwsConfig).
+
+list_objects(Bucket) ->
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    Response = erlcloud_s3:list_objects(Bucket, AwsConfig),
+    false = proplists:get_value(is_truncated, Response),
+    Contents = proplists:get_value(contents, Response),
+    lists:map(fun maps:from_list/1, Contents).
+
+get_object(Bucket, Key) ->
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)).
+
+%% File utilities
+
+truncate_at(Filename, Pos) ->
+    {ok, FD} = file:open(Filename, [read, write, binary]),
+    {ok, Pos} = file:position(FD, Pos),
+    ok = file:truncate(FD),
+    ok = file:close(FD).