Преглед изворни кода

feat(s3): separate concepts to make app reusable in bridges

Andrew Mayorov пре 2 година
родитељ
комит
4ff04ab1f3

+ 17 - 17
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -69,11 +69,9 @@
 -spec start_export(options(), transfer(), filemeta()) ->
     {ok, export_st()} | {error, term()}.
 start_export(_Options, Transfer, Filemeta) ->
-    Options = #{
-        key => s3_key(Transfer, Filemeta),
-        headers => s3_headers(Transfer, Filemeta)
-    },
-    case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of
+    Key = s3_key(Transfer, Filemeta),
+    UploadOpts = #{headers => s3_headers(Transfer, Filemeta)},
+    case emqx_s3:start_uploader(?S3_PROFILE_ID, Key, UploadOpts) of
         {ok, Pid} ->
             true = erlang:link(Pid),
             {ok, #{filemeta => Filemeta, pid => Pid}};
@@ -180,22 +178,24 @@ list_pages(Client, Marker, Limit, Acc) ->
     ListOptions = [{marker, Marker} || Marker =/= undefined],
     case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of
         {ok, {Exports, NextMarker}} ->
-            list_accumulate(Client, Limit, NextMarker, [Exports | Acc]);
+            Left = update_limit(Limit, Exports),
+            NextAcc = [Exports | Acc],
+            case NextMarker of
+                undefined ->
+                    {ok, {flatten_pages(NextAcc), undefined}};
+                _ when Left =< 0 ->
+                    {ok, {flatten_pages(NextAcc), NextMarker}};
+                _ ->
+                    list_pages(Client, NextMarker, Left, NextAcc)
+            end;
         {error, _Reason} = Error ->
             Error
     end.
 
-list_accumulate(_Client, _Limit, undefined, Acc) ->
-    {ok, {flatten_pages(Acc), undefined}};
-list_accumulate(Client, undefined, Marker, Acc) ->
-    list_pages(Client, Marker, undefined, Acc);
-list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) ->
-    case Limit - length(Exports) of
-        0 ->
-            {ok, {flatten_pages(Acc), Marker}};
-        Left ->
-            list_pages(Client, Marker, Left, Acc)
-    end.
+update_limit(undefined, _Exports) ->
+    undefined;
+update_limit(Limit, Exports) ->
+    Limit - length(Exports).
 
 flatten_pages(Pages) ->
     lists:append(lists:reverse(Pages)).

+ 7 - 6
apps/emqx_s3/src/emqx_s3.erl

@@ -10,7 +10,7 @@
     start_profile/2,
     stop_profile/1,
     update_profile/2,
-    start_uploader/2,
+    start_uploader/3,
     with_client/2
 ]).
 
@@ -22,6 +22,7 @@
 -export_type([
     profile_id/0,
     profile_config/0,
+    transport_options/0,
     acl/0
 ]).
 
@@ -81,18 +82,18 @@ stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) ->
 update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
     emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig).
 
--spec start_uploader(profile_id(), emqx_s3_uploader:opts()) ->
+-spec start_uploader(profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
     emqx_types:startlink_ret() | {error, profile_not_found}.
-start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) ->
-    emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts).
+start_uploader(ProfileId, Key, Props) when ?IS_PROFILE_ID(ProfileId) ->
+    emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Key, Props).
 
 -spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) ->
     {error, profile_not_found} | Result.
 with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) ->
     case emqx_s3_profile_conf:checkout_config(ProfileId) of
-        {ok, ClientConfig, _UploadConfig} ->
+        {Bucket, ClientConfig, _UploadOpts, _UploadConfig} ->
             try
-                Fun(emqx_s3_client:create(ClientConfig))
+                Fun(emqx_s3_client:create(Bucket, ClientConfig))
             after
                 emqx_s3_profile_conf:checkin_config(ProfileId)
             end;

+ 37 - 30
apps/emqx_s3/src/emqx_s3_client.erl

@@ -9,12 +9,11 @@
 -include_lib("erlcloud/include/erlcloud_aws.hrl").
 
 -export([
-    create/1,
+    create/2,
 
     put_object/3,
     put_object/4,
 
-    start_multipart/2,
     start_multipart/3,
     upload_part/5,
     complete_multipart/4,
@@ -26,10 +25,15 @@
     format_request/1
 ]).
 
+%% For connectors
+-export([aws_config/1]).
+
 -export_type([
     client/0,
     headers/0,
+    bucket/0,
     key/0,
+    upload_options/0,
     upload_id/0,
     etag/0,
     part_number/0,
@@ -39,18 +43,17 @@
 -type headers() :: #{binary() | string() => iodata()}.
 -type erlcloud_headers() :: list({string(), iodata()}).
 
+-type bucket() :: string().
 -type key() :: string().
 -type part_number() :: non_neg_integer().
 -type upload_id() :: string().
 -type etag() :: string().
 -type http_pool() :: ehttpc:pool_name().
 -type pool_type() :: random | hash.
--type upload_options() :: list({acl, emqx_s3:acl()}).
 
 -opaque client() :: #{
     aws_config := aws_config(),
-    upload_options := upload_options(),
-    bucket := string(),
+    bucket := bucket(),
     headers := erlcloud_headers(),
     url_expire_time := non_neg_integer(),
     pool_type := pool_type()
@@ -60,9 +63,7 @@
     scheme := string(),
     host := string(),
     port := part_number(),
-    bucket := string(),
     headers := headers(),
-    acl := emqx_s3:acl() | undefined,
     url_expire_time := pos_integer(),
     access_key_id := string() | undefined,
     secret_access_key := emqx_secret:t(string()) | undefined,
@@ -72,6 +73,11 @@
     max_retries := non_neg_integer() | undefined
 }.
 
+-type upload_options() :: #{
+    acl => emqx_s3:acl() | undefined,
+    headers => headers()
+}.
+
 -type s3_options() :: proplists:proplist().
 
 -define(DEFAULT_REQUEST_TIMEOUT, 30000).
@@ -81,12 +87,11 @@
 %% API
 %%--------------------------------------------------------------------
 
--spec create(config()) -> client().
-create(Config) ->
+-spec create(bucket(), config()) -> client().
+create(Bucket, Config) ->
     #{
         aws_config => aws_config(Config),
-        upload_options => upload_options(Config),
-        bucket => maps:get(bucket, Config),
+        bucket => Bucket,
         url_expire_time => maps:get(url_expire_time, Config),
         headers => headers(Config),
         pool_type => maps:get(pool_type, Config)
@@ -94,17 +99,19 @@ create(Config) ->
 
 -spec put_object(client(), key(), iodata()) -> ok_or_error(term()).
 put_object(Client, Key, Value) ->
-    put_object(Client, #{}, Key, Value).
+    put_object(Client, Key, #{}, Value).
 
--spec put_object(client(), headers(), key(), iodata()) -> ok_or_error(term()).
+-spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()).
 put_object(
-    #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
-    SpecialHeaders,
+    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
     Key,
-    Value
+    UploadOpts,
+    Content
 ) ->
-    AllHeaders = join_headers(Headers, SpecialHeaders),
-    try erlcloud_s3:put_object(Bucket, erlcloud_key(Key), Value, Options, AllHeaders, AwsConfig) of
+    ECKey = erlcloud_key(Key),
+    ECOpts = erlcloud_upload_options(UploadOpts),
+    Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
+    try erlcloud_s3:put_object(Bucket, ECKey, Content, ECOpts, Headers, AwsConfig) of
         Props when is_list(Props) ->
             ok
     catch
@@ -113,18 +120,16 @@ put_object(
             {error, Reason}
     end.
 
--spec start_multipart(client(), key()) -> ok_or_error(upload_id(), term()).
-start_multipart(Client, Key) ->
-    start_multipart(Client, #{}, Key).
-
--spec start_multipart(client(), headers(), key()) -> ok_or_error(upload_id(), term()).
+-spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()).
 start_multipart(
-    #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
-    SpecialHeaders,
-    Key
+    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
+    Key,
+    UploadOpts
 ) ->
-    AllHeaders = join_headers(Headers, SpecialHeaders),
-    case erlcloud_s3:start_multipart(Bucket, erlcloud_key(Key), Options, AllHeaders, AwsConfig) of
+    ECKey = erlcloud_key(Key),
+    ECOpts = erlcloud_upload_options(UploadOpts),
+    Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
+    case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of
         {ok, Props} ->
             {ok, response_property('uploadId', Props)};
         {error, Reason} ->
@@ -204,11 +209,11 @@ format(#{aws_config := AwsConfig} = Client) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-upload_options(#{acl := Acl}) when Acl =/= undefined ->
+erlcloud_upload_options(#{acl := Acl}) when Acl =/= undefined ->
     [
         {acl, Acl}
     ];
-upload_options(#{}) ->
+erlcloud_upload_options(#{}) ->
     [].
 
 headers(#{headers := Headers}) ->
@@ -401,6 +406,8 @@ headers_ehttpc_to_erlcloud_response(EhttpcHeaders) ->
 headers_erlcloud_request_to_ehttpc(ErlcloudHeaders) ->
     [{to_binary(K), V} || {K, V} <- ErlcloudHeaders].
 
+join_headers(ErlcloudHeaders, undefined) ->
+    ErlcloudHeaders;
 join_headers(ErlcloudHeaders, UserSpecialHeaders) ->
     ErlcloudHeaders ++ headers_user_to_erlcloud_request(UserSpecialHeaders).
 

+ 28 - 6
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -37,13 +37,25 @@
     code_change/3
 ]).
 
-%% For test purposes
+%% For connectors
 -export([
     client_config/2,
+    http_config/1
+]).
+
+%% For test purposes
+-export([
     start_http_pool/2,
     id/1
 ]).
 
+-type config_checkout() :: {
+    emqx_s3_client:bucket(),
+    emqx_s3_client:config(),
+    emqx_s3_client:upload_options(),
+    emqx_s3_uploader:config()
+}.
+
 -define(DEFAULT_CALL_TIMEOUT, 5000).
 
 -define(DEFAULT_HTTP_POOL_TIMEOUT, 60000).
@@ -78,12 +90,12 @@ update_config(ProfileId, ProfileConfig, Timeout) ->
     ?SAFE_CALL_VIA_GPROC(ProfileId, {update_config, ProfileConfig}, Timeout).
 
 -spec checkout_config(emqx_s3:profile_id()) ->
-    {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
+    config_checkout() | {error, profile_not_found}.
 checkout_config(ProfileId) ->
     checkout_config(ProfileId, ?DEFAULT_CALL_TIMEOUT).
 
 -spec checkout_config(emqx_s3:profile_id(), timeout()) ->
-    {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
+    config_checkout() | {error, profile_not_found}.
 checkout_config(ProfileId, Timeout) ->
     ?SAFE_CALL_VIA_GPROC(ProfileId, {checkout_config, self()}, Timeout).
 
@@ -108,6 +120,8 @@ init([ProfileId, ProfileConfig]) ->
             {ok, #{
                 profile_id => ProfileId,
                 profile_config => ProfileConfig,
+                bucket => bucket(ProfileConfig),
+                upload_options => upload_options(ProfileConfig),
                 client_config => client_config(ProfileConfig, PoolName),
                 uploader_config => uploader_config(ProfileConfig),
                 pool_name => PoolName,
@@ -128,12 +142,14 @@ handle_call(
     {checkout_config, Pid},
     _From,
     #{
+        bucket := Bucket,
+        upload_options := Options,
         client_config := ClientConfig,
         uploader_config := UploaderConfig
     } = State
 ) ->
     ok = register_client(Pid, State),
-    {reply, {ok, ClientConfig, UploaderConfig}, State};
+    {reply, {Bucket, ClientConfig, Options, UploaderConfig}, State};
 handle_call({checkin_config, Pid}, _From, State) ->
     ok = unregister_client(Pid, State),
     {reply, ok, State};
@@ -146,6 +162,8 @@ handle_call(
         {ok, PoolName} ->
             NewState = State#{
                 profile_config => NewProfileConfig,
+                bucket => bucket(NewProfileConfig),
+                upload_options => upload_options(NewProfileConfig),
                 client_config => client_config(NewProfileConfig, PoolName),
                 uploader_config => uploader_config(NewProfileConfig),
                 http_pool_timeout => http_pool_timeout(NewProfileConfig),
@@ -198,8 +216,6 @@ client_config(ProfileConfig, PoolName) ->
         port => maps:get(port, ProfileConfig),
         url_expire_time => maps:get(url_expire_time, ProfileConfig),
         headers => maps:get(headers, HTTPOpts, #{}),
-        acl => maps:get(acl, ProfileConfig, undefined),
-        bucket => maps:get(bucket, ProfileConfig),
         access_key_id => maps:get(access_key_id, ProfileConfig, undefined),
         secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),
         request_timeout => maps:get(request_timeout, HTTPOpts, undefined),
@@ -214,6 +230,12 @@ uploader_config(#{max_part_size := MaxPartSize, min_part_size := MinPartSize} =
         max_part_size => MaxPartSize
     }.
 
+bucket(ProfileConfig) ->
+    maps:get(bucket, ProfileConfig).
+
+upload_options(ProfileConfig) ->
+    #{acl => maps:get(acl, ProfileConfig, undefined)}.
+
 scheme(#{ssl := #{enable := true}}) -> "https://";
 scheme(_TransportOpts) -> "http://".
 

+ 4 - 4
apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl

@@ -15,7 +15,7 @@
     start_link/1,
     child_spec/1,
     id/1,
-    start_uploader/2
+    start_uploader/3
 ]).
 
 -export([init/1]).
@@ -43,10 +43,10 @@ child_spec(ProfileId) ->
 id(ProfileId) ->
     {?MODULE, ProfileId}.
 
--spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) ->
+-spec start_uploader(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
     emqx_types:startlink_ret() | {error, profile_not_found}.
-start_uploader(ProfileId, Opts) ->
-    try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Opts]) of
+start_uploader(ProfileId, Key, UploadOpts) ->
+    try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Key, UploadOpts]) of
         Result -> Result
     catch
         exit:{noproc, _} -> {error, profile_not_found}

+ 57 - 29
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -23,6 +23,13 @@ tags() ->
     [<<"S3">>].
 
 fields(s3) ->
+    lists:append([
+        fields(s3_client),
+        fields(s3_uploader),
+        fields(s3_url_options),
+        props_with([bucket, acl], fields(s3_upload))
+    ]);
+fields(s3_client) ->
     [
         {access_key_id,
             mk(
@@ -38,14 +45,6 @@ fields(s3) ->
                     desc => ?DESC("secret_access_key")
                 }
             )},
-        {bucket,
-            mk(
-                string(),
-                #{
-                    desc => ?DESC("bucket"),
-                    required => true
-                }
-            )},
         {host,
             mk(
                 string(),
@@ -62,34 +61,31 @@ fields(s3) ->
                     required => true
                 }
             )},
-        {url_expire_time,
+        {transport_options,
             mk(
-                %% not used in a `receive ... after' block, just timestamp comparison
-                emqx_schema:duration_s(),
+                ref(?MODULE, transport_options),
                 #{
-                    default => <<"1h">>,
-                    desc => ?DESC("url_expire_time"),
+                    desc => ?DESC("transport_options"),
                     required => false
                 }
-            )},
-        {min_part_size,
+            )}
+    ];
+fields(s3_upload) ->
+    [
+        {bucket,
             mk(
-                emqx_schema:bytesize(),
+                string(),
                 #{
-                    default => <<"5mb">>,
-                    desc => ?DESC("min_part_size"),
-                    required => true,
-                    validator => fun part_size_validator/1
+                    desc => ?DESC("bucket"),
+                    required => true
                 }
             )},
-        {max_part_size,
+        {key,
             mk(
-                emqx_schema:bytesize(),
+                string(),
                 #{
-                    default => <<"5gb">>,
-                    desc => ?DESC("max_part_size"),
-                    required => true,
-                    validator => fun part_size_validator/1
+                    desc => ?DESC("key"),
+                    required => true
                 }
             )},
         {acl,
@@ -106,12 +102,40 @@ fields(s3) ->
                     desc => ?DESC("acl"),
                     required => false
                 }
+            )}
+    ];
+fields(s3_uploader) ->
+    [
+        {min_part_size,
+            mk(
+                emqx_schema:bytesize(),
+                #{
+                    default => <<"5mb">>,
+                    desc => ?DESC("min_part_size"),
+                    required => true,
+                    validator => fun part_size_validator/1
+                }
             )},
-        {transport_options,
+        {max_part_size,
             mk(
-                ref(?MODULE, transport_options),
+                emqx_schema:bytesize(),
                 #{
-                    desc => ?DESC("transport_options"),
+                    default => <<"5gb">>,
+                    desc => ?DESC("max_part_size"),
+                    required => true,
+                    validator => fun part_size_validator/1
+                }
+            )}
+    ];
+fields(s3_url_options) ->
+    [
+        {url_expire_time,
+            mk(
+                %% not used in a `receive ... after' block, just timestamp comparison
+                emqx_schema:duration_s(),
+                #{
+                    default => <<"1h">>,
+                    desc => ?DESC("url_expire_time"),
                     required => false
                 }
             )}
@@ -138,6 +162,10 @@ fields(transport_options) ->
 
 desc(s3) ->
     "S3 connection options";
+desc(s3_client) ->
+    "S3 connection options";
+desc(s3_upload) ->
+    "S3 upload options";
 desc(transport_options) ->
     "Options for the HTTP transport layer used by the S3 client".
 

+ 32 - 35
apps/emqx_s3/src/emqx_s3_uploader.erl

@@ -9,7 +9,7 @@
 -behaviour(gen_statem).
 
 -export([
-    start_link/2,
+    start_link/3,
 
     write/2,
     write/3,
@@ -33,30 +33,25 @@
     format_status/2
 ]).
 
--export_type([opts/0, config/0]).
+-export_type([config/0]).
 
 -type config() :: #{
     min_part_size => pos_integer(),
     max_part_size => pos_integer()
 }.
 
--type opts() :: #{
-    key := string(),
-    headers => emqx_s3_client:headers()
-}.
-
 -type data() :: #{
-    profile_id := emqx_s3:profile_id(),
+    profile_id => emqx_s3:profile_id(),
     client := emqx_s3_client:client(),
     key := emqx_s3_client:key(),
+    upload_opts := emqx_s3_client:upload_options(),
     buffer := iodata(),
     buffer_size := non_neg_integer(),
     min_part_size := pos_integer(),
     max_part_size := pos_integer(),
     upload_id := undefined | emqx_s3_client:upload_id(),
     etags := [emqx_s3_client:etag()],
-    part_number := emqx_s3_client:part_number(),
-    headers := emqx_s3_client:headers()
+    part_number := emqx_s3_client:part_number()
 }.
 
 %% 5MB
@@ -66,9 +61,10 @@
 
 -define(DEFAULT_TIMEOUT, 30000).
 
--spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret().
-start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
-    gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
+-spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
+    gen_statem:start_ret().
+start_link(ProfileId, Key, UploadOpts) when is_list(Key) ->
+    gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
 
 -spec write(pid(), iodata()) -> ok_or_error(term()).
 write(Pid, WriteData) ->
@@ -105,19 +101,23 @@ shutdown(Pid) ->
 
 callback_mode() -> handle_event_function.
 
-init([ProfileId, #{key := Key} = Opts]) ->
-    process_flag(trap_exit, true),
-    {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId),
-    Client = client(ClientConfig),
-    {ok, upload_not_started, #{
+init({profile, ProfileId, Key, UploadOpts}) ->
+    {Bucket, ClientConfig, BaseOpts, UploaderConfig} =
+        emqx_s3_profile_conf:checkout_config(ProfileId),
+    Upload = #{
         profile_id => ProfileId,
-        client => Client,
-        headers => maps:get(headers, Opts, #{}),
+        client => client(Bucket, ClientConfig),
         key => Key,
+        upload_opts => maps:merge(BaseOpts, UploadOpts)
+    },
+    init({upload, UploaderConfig, Upload});
+init({upload, Config, Upload}) ->
+    process_flag(trap_exit, true),
+    {ok, upload_not_started, Upload#{
         buffer => [],
         buffer_size => 0,
-        min_part_size => maps:get(min_part_size, UploaderConfig, ?DEFAULT_MIN_PART_SIZE),
-        max_part_size => maps:get(max_part_size, UploaderConfig, ?DEFAULT_MAX_PART_SIZE),
+        min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
+        max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
         upload_id => undefined,
         etags => [],
         part_number => 1
@@ -221,8 +221,8 @@ maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} =
     end.
 
 -spec start_upload(data()) -> {started, data()} | {error, term()}.
-start_upload(#{client := Client, key := Key, headers := Headers} = Data) ->
-    case emqx_s3_client:start_multipart(Client, Headers, Key) of
+start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
+    case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
         {ok, UploadId} ->
             NewData = Data#{upload_id => UploadId},
             {started, NewData};
@@ -274,12 +274,9 @@ complete_upload(
     } = Data0
 ) ->
     case upload_part(Data0) of
-        {ok, #{etags := ETags} = Data1} ->
-            case
-                emqx_s3_client:complete_multipart(
-                    Client, Key, UploadId, lists:reverse(ETags)
-                )
-            of
+        {ok, #{etags := ETagsRev} = Data1} ->
+            ETags = lists:reverse(ETagsRev),
+            case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
                 ok ->
                     {ok, Data1};
                 {error, _} = Error ->
@@ -309,11 +306,11 @@ put_object(
     #{
         client := Client,
         key := Key,
-        buffer := Buffer,
-        headers := Headers
+        upload_opts := UploadOpts,
+        buffer := Buffer
     }
 ) ->
-    case emqx_s3_client:put_object(Client, Headers, Key, Buffer) of
+    case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
         ok ->
             ok;
         {error, _} = Error ->
@@ -337,5 +334,5 @@ unwrap(WrappedData) ->
 is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
     BufferSize + iolist_size(WriteData) =< MaxPartSize.
 
-client(Config) ->
-    emqx_s3_client:create(Config).
+client(Bucket, Config) ->
+    emqx_s3_client:create(Bucket, Config).

+ 9 - 11
apps/emqx_s3/test/emqx_s3_client_SUITE.erl

@@ -79,7 +79,7 @@ t_multipart_upload(Config) ->
 
     Client = client(Config),
 
-    {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key),
+    {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key, #{}),
 
     Data = data(6_000_000),
 
@@ -97,7 +97,7 @@ t_simple_put(Config) ->
 
     Data = data(6_000_000),
 
-    ok = emqx_s3_client:put_object(Client, Key, Data).
+    ok = emqx_s3_client:put_object(Client, Key, #{acl => private}, Data).
 
 t_list(Config) ->
     Key = ?config(key, Config),
@@ -123,7 +123,7 @@ t_url(Config) ->
     Key = ?config(key, Config),
 
     Client = client(Config),
-    ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
+    ok = emqx_s3_client:put_object(Client, Key, #{acl => public_read}, <<"data">>),
 
     Url = emqx_s3_client:uri(Client, Key),
 
@@ -135,20 +135,18 @@ t_url(Config) ->
 t_no_acl(Config) ->
     Key = ?config(key, Config),
 
-    ClientConfig = emqx_s3_profile_conf:client_config(
-        profile_config(Config), ?config(ehttpc_pool_name, Config)
-    ),
-    Client = emqx_s3_client:create(maps:without([acl], ClientConfig)),
+    Client = client(Config),
 
-    ok = emqx_s3_client:put_object(Client, Key, <<"data">>).
+    ok = emqx_s3_client:put_object(Client, Key, #{}, <<"data">>).
 
 t_extra_headers(Config0) ->
     Config = [{extra_headers, #{'Content-Type' => <<"application/json">>}} | Config0],
     Key = ?config(key, Config),
 
     Client = client(Config),
+    Opts = #{acl => public_read},
     Data = #{foo => bar},
-    ok = emqx_s3_client:put_object(Client, Key, emqx_utils_json:encode(Data)),
+    ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)),
 
     Url = emqx_s3_client:uri(Client, Key),
 
@@ -164,10 +162,11 @@ t_extra_headers(Config0) ->
 %%--------------------------------------------------------------------
 
 client(Config) ->
+    Bucket = ?config(bucket, Config),
     ClientConfig = emqx_s3_profile_conf:client_config(
         profile_config(Config), ?config(ehttpc_pool_name, Config)
     ),
-    emqx_s3_client:create(ClientConfig).
+    emqx_s3_client:create(Bucket, ClientConfig).
 
 profile_config(Config) ->
     ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)),
@@ -175,7 +174,6 @@ profile_config(Config) ->
         fun inject_config/3,
         ProfileConfig0,
         #{
-            bucket => ?config(bucket, Config),
             [transport_options, pool_type] => ?config(pool_type, Config),
             [transport_options, headers] => ?config(extra_headers, Config)
         }

+ 2 - 2
apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl

@@ -46,7 +46,7 @@ end_per_testcase(_TestCase, _Config) ->
 t_regular_outdated_pool_cleanup(Config) ->
     _ = process_flag(trap_exit, true),
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
 
@@ -94,7 +94,7 @@ t_timeout_pool_cleanup(Config) ->
 
     %% Start uploader
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
     ok = emqx_s3_uploader:write(Pid, <<"data">>),
 
     [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),

+ 17 - 17
apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl

@@ -133,7 +133,7 @@ end_per_testcase(_TestCase, _Config) ->
 
 t_happy_path_simple_put(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -165,7 +165,7 @@ t_happy_path_simple_put(Config) ->
 
 t_happy_path_multi(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -233,7 +233,7 @@ t_signed_nonascii_url_download(_Config) ->
 
 t_abort_multi(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -260,7 +260,7 @@ t_abort_multi(Config) ->
 
 t_abort_simple_put(_Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -278,7 +278,7 @@ t_abort_simple_put(_Config) ->
 t_config_switch(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
     OldBucket = ?config(bucket, Config),
-    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     [Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
 
@@ -304,7 +304,7 @@ t_config_switch(Config) ->
     ),
 
     %% Now check that new uploader uses new config
-    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
     ok = emqx_s3_uploader:write(Pid1, Data0),
     ok = emqx_s3_uploader:complete(Pid1),
 
@@ -318,7 +318,7 @@ t_config_switch(Config) ->
 t_config_switch_http_settings(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
     OldBucket = ?config(bucket, Config),
-    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     [Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
 
@@ -345,7 +345,7 @@ t_config_switch_http_settings(Config) ->
     ),
 
     %% Now check that new uploader uses new config
-    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
     ok = emqx_s3_uploader:write(Pid1, Data0),
     ok = emqx_s3_uploader:complete(Pid1),
 
@@ -360,7 +360,7 @@ t_start_multipart_error(Config) ->
     _ = process_flag(trap_exit, true),
 
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -386,7 +386,7 @@ t_upload_part_error(Config) ->
     _ = process_flag(trap_exit, true),
 
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -414,7 +414,7 @@ t_abort_multipart_error(Config) ->
     _ = process_flag(trap_exit, true),
 
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -442,7 +442,7 @@ t_complete_multipart_error(Config) ->
     _ = process_flag(trap_exit, true),
 
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -470,7 +470,7 @@ t_put_object_error(Config) ->
     _ = process_flag(trap_exit, true),
 
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -496,7 +496,7 @@ t_put_object_error(Config) ->
 
 t_too_large(Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -533,7 +533,7 @@ t_tls_error(Config) ->
     ),
     ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
     Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),
 
@@ -553,7 +553,7 @@ t_no_profile(_Config) ->
     Key = emqx_s3_test_helpers:unique_key(),
     ?assertMatch(
         {error, profile_not_found},
-        emqx_s3:start_uploader(<<"no-profile">>, #{key => Key})
+        emqx_s3:start_uploader(<<"no-profile">>, Key, #{})
     ).
 
 %%--------------------------------------------------------------------
@@ -572,7 +572,7 @@ list_objects(Config) ->
     proplists:get_value(contents, Props).
 
 upload(Key, ChunkSize, ChunkCount) ->
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
 
     _ = erlang:monitor(process, Pid),