Explorar o código

feat(s3-aggreg): handle delivery shutdowns gracefully

Andrew Mayorov hai 1 ano
pai
achega
f6e5eea4f7

+ 96 - 46
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl

@@ -12,11 +12,21 @@
 -export([start_link/3]).
 
 %% Internal exports
--export([run_delivery/3]).
+-export([
+    init/4,
+    loop/3
+]).
 
 -behaviour(emqx_template).
 -export([lookup/2]).
 
+%% Sys
+-export([
+    system_continue/3,
+    system_terminate/4,
+    format_status/2
+]).
+
 -record(delivery, {
     name :: _Name,
     container :: emqx_bridge_s3_aggreg_csv:container(),
@@ -25,19 +35,23 @@
     empty :: boolean()
 }).
 
+-type state() :: #delivery{}.
+
 %%
 
 start_link(Name, Buffer, Opts) ->
-    proc_lib:start_link(?MODULE, run_delivery, [Name, Buffer, Opts]).
+    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
 
 %%
 
-run_delivery(Name, Buffer, Opts) ->
+-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return().
+init(Parent, Name, Buffer, Opts) ->
     ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
     Reader = open_buffer(Buffer),
     Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    _ = erlang:process_flag(trap_exit, true),
     ok = proc_lib:init_ack({ok, self()}),
-    loop_deliver(Delivery).
+    loop(Delivery, Parent, []).
 
 init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
     #delivery{
@@ -48,43 +62,87 @@ init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
         empty = true
     }.
 
-loop_deliver(Delivery = #delivery{reader = Reader0}) ->
+open_buffer(#buffer{filename = Filename}) ->
+    case file:open(Filename, [read, binary, raw]) of
+        {ok, FD} ->
+            {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
+            Reader;
+        {error, Reason} ->
+            error({buffer_open_failed, Reason})
+    end.
+
+mk_container(#{type := csv, column_order := OrderOpt}) ->
+    %% TODO: Deduplicate?
+    ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
+    emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).
+
+mk_upload(
+    Buffer,
+    Opts = #{
+        bucket := Bucket,
+        upload_options := UploadOpts,
+        client_config := Config,
+        uploader_config := UploaderConfig
+    }
+) ->
+    Client = emqx_s3_client:create(Bucket, Config),
+    Key = mk_object_key(Buffer, Opts),
+    emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
+
+mk_object_key(Buffer, #{action := Name, key := Template}) ->
+    emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
+
+%%
+
+-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
+loop(Delivery, Parent, Debug) ->
+    %% NOTE: This function is mocked in tests.
+    receive
+        Msg -> handle_msg(Msg, Delivery, Parent, Debug)
+    after 0 ->
+        process_delivery(Delivery, Parent, Debug)
+    end.
+
+process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) ->
     case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
         {Records = [#{} | _], Reader} ->
-            loop_deliver_records(Records, Delivery#delivery{reader = Reader});
+            Delivery1 = Delivery0#delivery{reader = Reader},
+            Delivery2 = process_append_records(Records, Delivery1),
+            Delivery = process_write(Delivery2),
+            loop(Delivery, Parent, Debug);
         {[], Reader} ->
-            loop_deliver(Delivery#delivery{reader = Reader});
+            Delivery = Delivery0#delivery{reader = Reader},
+            loop(Delivery, Parent, Debug);
         eof ->
-            complete_delivery(Delivery);
+            process_complete(Delivery0);
         {Unexpected, _Reader} ->
             exit({buffer_unexpected_record, Unexpected})
     end.
 
-loop_deliver_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
+process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
     {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0),
     {ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
-    loop_deliver_upload(Delivery#delivery{
+    Delivery#delivery{
         container = Container,
         upload = Upload,
         empty = false
-    }).
+    }.
 
-loop_deliver_upload(Delivery = #delivery{upload = Upload0}) ->
+process_write(Delivery = #delivery{upload = Upload0}) ->
     case emqx_s3_upload:write(Upload0) of
         {ok, Upload} ->
-            loop_deliver(Delivery#delivery{upload = Upload});
+            Delivery#delivery{upload = Upload};
         {cont, Upload} ->
-            loop_deliver_upload(Delivery#delivery{upload = Upload});
+            process_write(Delivery#delivery{upload = Upload});
         {error, Reason} ->
-            %% TODO: retries
             _ = emqx_s3_upload:abort(Upload0),
             exit({upload_failed, Reason})
     end.
 
-complete_delivery(#delivery{name = Name, empty = true}) ->
+process_complete(#delivery{name = Name, empty = true}) ->
     ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}),
     exit({shutdown, {skipped, empty}});
-complete_delivery(#delivery{name = Name, container = Container, upload = Upload0}) ->
+process_complete(#delivery{name = Name, container = Container, upload = Upload0}) ->
     Trailer = emqx_bridge_s3_aggreg_csv:close(Container),
     {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0),
     case emqx_s3_upload:complete(Upload) of
@@ -92,40 +150,32 @@ complete_delivery(#delivery{name = Name, container = Container, upload = Upload0
             ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}),
             ok;
         {error, Reason} ->
-            %% TODO: retries
             _ = emqx_s3_upload:abort(Upload),
             exit({upload_failed, Reason})
     end.
 
-mk_container(#{type := csv, column_order := OrderOpt}) ->
-    %% TODO: Deduplicate?
-    ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
-    emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).
-
-mk_upload(
-    Buffer,
-    Opts = #{
-        bucket := Bucket,
-        upload_options := UploadOpts,
-        client_config := Config,
-        uploader_config := UploaderConfig
-    }
-) ->
-    Client = emqx_s3_client:create(Bucket, Config),
-    Key = mk_object_key(Buffer, Opts),
-    emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
-
-mk_object_key(Buffer, #{action := Name, key := Template}) ->
-    emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
+%%
 
-open_buffer(#buffer{filename = Filename}) ->
-    case file:open(Filename, [read, binary, raw]) of
-        {ok, FD} ->
-            {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
-            Reader;
-        {error, Reason} ->
-            error({buffer_open_failed, Reason})
-    end.
+handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
+    sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
+handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
+    system_terminate(Reason, Parent, Debug, Delivery);
+handle_msg(_Msg, Delivery, Parent, Debug) ->
+    loop(Parent, Debug, Delivery).
+
+-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
+system_continue(Parent, Debug, Delivery) ->
+    loop(Delivery, Parent, Debug).
+
+-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
+system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) ->
+    emqx_s3_upload:abort(Upload).
+
+-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
+format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
+    Delivery#delivery{
+        upload = emqx_s3_upload:format(Delivery#delivery.upload)
+    }.
 
 %%
 

+ 1 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl

@@ -393,6 +393,7 @@ handle_delivery_exit(Buffer, Error, St = #st{name = Name}) ->
         filename => Buffer#buffer.filename,
         reason => Error
     }),
+    %% TODO: Retries?
     enqueue_status_error(Error, St).
 
 enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) ->

+ 36 - 1
apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl

@@ -219,7 +219,6 @@ t_aggreg_upload_restart(Config) ->
     %% Check there's still only one upload.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
-    %% Verify that column order is respected.
     ?assertMatch(
         {ok, [
             _Header = [_ | _],
@@ -285,6 +284,42 @@ t_aggreg_upload_restart_corrupted(Config) ->
         CSV
     ).
 
+t_aggreg_pending_upload_restart(Config) ->
+    %% NOTE
+    %% This test verifies that the bridge will finish uploading a buffer file after
+    %% a restart.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Send few large messages that will require multipart upload.
+    %% Ensure that they span multiple batch queries.
+    Payload = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")),
+    Messages = [{integer_to_binary(N), <<"a/b/c">>, Payload} || N <- lists:seq(1, 10)],
+    ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages), 10),
+    %% Wait until the multipart upload is started.
+    {ok, #{key := ObjectKey}} =
+        ?block_until(#{?snk_kind := s3_client_multipart_started, bucket := Bucket}),
+    %% Stop the bridge.
+    {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
+    %% Verify that pending uploads have been gracefully aborted.
+    %% NOTE: Minio does not support multipart upload listing w/o prefix.
+    ?assertEqual(
+        [],
+        emqx_bridge_s3_test_helpers:list_pending_uploads(Bucket, ObjectKey)
+    ),
+    %% Restart the bridge.
+    {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
+    %% Wait until the delivery is completed.
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
+    %% Check that delivery contains all the messages.
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    [_Header | Rows] = fetch_parse_csv(Bucket, Key),
+    ?assertEqual(
+        Messages,
+        [{CID, Topic, PL} || [_TS, CID, Topic, PL | _] <- Rows]
+    ).
+
 t_aggreg_next_rotate(Config) ->
     %% NOTE
     %% This is essentially a stress test that tries to verify that buffer rotation

+ 6 - 0
apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl

@@ -43,6 +43,12 @@ get_object(Bucket, Key) ->
     AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
     maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)).
 
+list_pending_uploads(Bucket, Key) ->
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    {ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig),
+    Uploads = proplists:get_value(uploads, Props),
+    lists:map(fun maps:from_list/1, Uploads).
+
 %% File utilities
 
 truncate_at(Filename, Pos) ->

+ 18 - 1
apps/emqx_s3/src/emqx_s3_client.erl

@@ -6,6 +6,7 @@
 
 -include_lib("emqx/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("erlcloud/include/erlcloud_aws.hrl").
 
 -export([
@@ -133,7 +134,13 @@ start_multipart(
     Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
     case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of
         {ok, Props} ->
-            {ok, response_property('uploadId', Props)};
+            UploadId = response_property('uploadId', Props),
+            ?tp(s3_client_multipart_started, #{
+                bucket => Bucket,
+                key => Key,
+                upload_id => UploadId
+            }),
+            {ok, UploadId};
         {error, Reason} ->
             ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
             {error, Reason}
@@ -177,6 +184,11 @@ complete_multipart(
         )
     of
         ok ->
+            ?tp(s3_client_multipart_completed, #{
+                bucket => Bucket,
+                key => Key,
+                upload_id => UploadId
+            }),
             ok;
         {error, Reason} ->
             ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}),
@@ -193,6 +205,11 @@ abort_multipart(
 ) ->
     case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of
         ok ->
+            ?tp(s3_client_multipart_aborted, #{
+                bucket => Bucket,
+                key => Key,
+                upload_id => UploadId
+            }),
             ok;
         {error, Reason} ->
             ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),