Kaynağa Gözat

feat(s3): add S3 client application

Ilya Averyanov 3 yıl önce
ebeveyn
işleme
31b441a46e
32 değiştirilmiş dosya ile 3204 ekleme ve 4 silme
  1. 21 0
      .ci/docker-compose-file/docker-compose-minio-tcp.yaml
  2. 23 0
      .ci/docker-compose-file/docker-compose-minio-tls.yaml
  3. 16 0
      .ci/docker-compose-file/docker-compose-toxiproxy.yaml
  4. 12 0
      .ci/docker-compose-file/toxiproxy.json
  5. 1 1
      apps/emqx_machine/src/emqx_machine.app.src
  6. 2 1
      apps/emqx_machine/src/emqx_machine_boot.erl
  7. 94 0
      apps/emqx_s3/BSL.txt
  8. 133 0
      apps/emqx_s3/README.md
  9. 2 0
      apps/emqx_s3/docker-ct
  10. BIN
      apps/emqx_s3/docs/s3_app.png
  11. 6 0
      apps/emqx_s3/rebar.config
  12. 14 0
      apps/emqx_s3/src/emqx_s3.app.src
  13. 65 0
      apps/emqx_s3/src/emqx_s3.erl
  14. 16 0
      apps/emqx_s3/src/emqx_s3_app.erl
  15. 293 0
      apps/emqx_s3/src/emqx_s3_client.erl
  16. 390 0
      apps/emqx_s3/src/emqx_s3_profile_conf.erl
  17. 35 0
      apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl
  18. 123 0
      apps/emqx_s3/src/emqx_s3_profile_http_pools.erl
  19. 48 0
      apps/emqx_s3/src/emqx_s3_profile_sup.erl
  20. 73 0
      apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl
  21. 143 0
      apps/emqx_s3/src/emqx_s3_schema.erl
  22. 47 0
      apps/emqx_s3/src/emqx_s3_sup.erl
  23. 318 0
      apps/emqx_s3/src/emqx_s3_uploader.erl
  24. 29 0
      apps/emqx_s3/test/certs/ca.crt
  25. 66 0
      apps/emqx_s3/test/emqx_s3_SUITE.erl
  26. 104 0
      apps/emqx_s3/test/emqx_s3_client_SUITE.erl
  27. 293 0
      apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl
  28. 154 0
      apps/emqx_s3/test/emqx_s3_schema_SUITE.erl
  29. 135 0
      apps/emqx_s3/test/emqx_s3_test_helpers.erl
  30. 535 0
      apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl
  31. 2 1
      rebar.config.erl
  32. 11 1
      scripts/ct/run.sh

+ 21 - 0
.ci/docker-compose-file/docker-compose-minio-tcp.yaml

@@ -0,0 +1,21 @@
+version: '3.7'
+
+services:
+  minio:
+    hostname: minio
+    image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z
+    command: server --address ":9000" --console-address ":9001" /minio-data
+    expose:
+      - "9000"
+      - "9001"
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      interval: 30s
+      timeout: 5s
+      retries: 3
+    networks:
+      emqx_bridge:
+

+ 23 - 0
.ci/docker-compose-file/docker-compose-minio-tls.yaml

@@ -0,0 +1,23 @@
+version: '3.7'
+
+services:
+  minio_tls:
+    hostname: minio-tls
+    image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z
+    command: server --certs-dir /etc/certs --address ":9100" --console-address ":9101" /minio-data
+    volumes:
+      - ./certs/server.crt:/etc/certs/public.crt
+      - ./certs/server.key:/etc/certs/private.key
+    expose:
+      - "9100"
+      - "9101"
+    ports:
+      - "9100:9100"
+      - "9101:9101"
+    healthcheck:
+      test: ["CMD", "curl", "-k", "-f", "https://localhost:9100/minio/health/live"]
+      interval: 30s
+      timeout: 5s
+      retries: 3
+    networks:
+      emqx_bridge:

+ 16 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -13,18 +13,34 @@ services:
     volumes:
       - "./toxiproxy.json:/config/toxiproxy.json"
     ports:
+      # Toxiproxy management API
       - 8474:8474
+      # InfluxDB
       - 8086:8086
+      # InfluxDB TLS
       - 8087:8087
+      # MySQL
       - 13306:3306
+      # MySQL TLS
       - 13307:3307
+      # PostgreSQL
       - 15432:5432
+      # PostgreSQL TLS
       - 15433:5433
+      # TDEngine
       - 16041:6041
+      # DynamoDB
       - 18000:8000
+      # RocketMQ
       - 19876:9876
+      # Cassandra
       - 19042:9042
+      # Cassandra TLS
       - 19142:9142
+      # S3
+      - 19000:19000
+      # S3 TLS
+      - 19100:19100
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -95,5 +95,17 @@
     "listen": "0.0.0.0:9142",
     "upstream": "cassandra:9142",
     "enabled": true
+  },
+  {
+    "name": "minio_tcp",
+    "listen": "0.0.0.0:19000",
+    "upstream": "minio:9000",
+    "enabled": true
+  },
+  {
+    "name": "minio_tls",
+    "listen": "0.0.0.0:19100",
+    "upstream": "minio-tls:9100",
+    "enabled": true
   }
 ]

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 2 - 1
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -146,7 +146,8 @@ basic_reboot_apps() ->
                 emqx_authz,
                 emqx_slow_subs,
                 emqx_auto_subscribe,
-                emqx_plugins
+                emqx_plugins,
+                emqx_s3
             ],
     case emqx_release:edition() of
         ce -> CE;

+ 94 - 0
apps/emqx_s3/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 133 - 0
apps/emqx_s3/README.md

@@ -0,0 +1,133 @@
+# emqx_s3
+
+EMQX S3 Application
+
+## Description
+
+This application provides functionality for uploading files to S3.
+
+## Usage
+
+The steps to integrate this application are:
+* Integrate S3 configuration schema where needed.
+* On _client_ application start:
+    * Call `emqx_s3:start_profile(ProfileName, ProfileConfig)` with configuration.
+    * Add `emqx_config_handler` hook to call `emqx_s3:start_profile(ProfileName, ProfileConfig)` when configuration is updated.
+* On _client_ application stop, call `emqx_s3:stop_profile(ProfileName)`.
+
+`ProfileName` is a unique name used to distinguish different sets of S3 settings. Each profile has its own connection pool and configuration.
+
+To use S3 from a _client_ application:
+* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => MyKey})`.
+* Write data with `emqx_s3_uploader:write(Pid, <<"data">>)`.
+* Finish the uploader with `emqx_s3_uploader:complete(Pid)` or `emqx_s3_uploader:abort(Pid)`.
+
+### Configuration
+
+Example of integrating S3 configuration schema into a _client_ application `emqx_someapp`.
+
+```erlang
+-module(emqx_someapp_schema).
+
+...
+
+roots() -> [someapp]
+...
+
+fields(someapp) ->
+    [
+        {other_setting, ...},
+        {s3_settings,
+            mk(
+                hoconsc:ref(emqx_s3_schema, s3),
+                #{
+                    desc => ?DESC("s3_settings"),
+                    required => true
+                }
+            )}
+    ];
+...
+
+```
+
+### Application start and config hooks
+
+```erlang
+-module(emqx_someapp_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+-export([
+    pre_config_update/3,
+    post_config_update/5
+]).
+
+start(_StartType, _StartArgs) ->
+    ProfileConfig = emqx_config:get([someapp, s3_settings]),
+    ProfileName = someapp,
+    ok = emqx_s3:start_profile(ProfileName, ProfileConfig),
+    ok = emqx_config_handler:add_handler([someapp], ?MODULE).
+
+stop(_State) ->
+    ok = emqx_conf:remove_handler([someapp]),
+    ProfileName = someapp,
+    ok = emqx_s3:stop_profile(ProfileName).
+
+pre_config_update(_Path, NewConfig, _OldConfig) ->
+    {ok, NewConfig}.
+
+post_config_update(Path, _Req, NewConfig, _OldConfig, _AppEnvs) ->
+    NewProfileConfig = maps:get(s3_settings, NewConfig),
+    ProfileName = someapp,
+    %% more graceful error handling may be needed
+    ok = emqx_s3:update_profile(ProfileName, NewProfileConfig).
+
+```
+
+### Uploader usage
+
+```erlang
+-module(emqx_someapp_logic).
+...
+
+-spec do_upload_data(Key :: string(), Data :: binary()) -> ok.
+do_upload_data(Key, Data) ->
+    ProfileName = someapp,
+    {ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => Key}),
+    ok = emqx_s3_uploader:write(Pid, Data),
+    ok = emqx_s3_uploader:complete(Pid).
+
+```
+
+## Design
+
+![Design](./docs/s3_app.png)
+
+* Each profile has its own supervisor `emqx_s3_profile_sup`.
+* Under each profile supervisor, there is a
+    * `emqx_s3_profile_uploader_sup` supervisor for uploader processes.
+    * `emqx_s3_profile_conf` server for managing profile configuration.
+
+When an uploader process is started, it checkouts the actual S3 configuration for the profile from the `emqx_s3_profile_conf` server. It uses the obtained configuration and connection pool to upload data to S3 till the termination, even if the configuration is updated.
+
+`emqx_s3_profile_conf`:
+* Keeps actual S3 configuration for the profile and creates a connection pool for the actual configuration.
+* Creates a new connection pool when the configuration is updated.
+* Keeps track of uploaders using connection pools.
+* Drops connection pools when no uploaders are using it or after a timeout.
+
+The code is designed to allow a painless transition from `ehttpc` pool to any other HTTP pool/client.
+
+## Possible performance improvements
+
+One of the downsides of the current implementation is that there is a lot of message passing between the uploader client and the actual sockets.
+
+A possible improvement could be:
+* Use a process-less HTTP client, like [Mint](https://github.com/elixir-mint/mint).
+* Use a resource pool, like [NimblePool](https://github.com/dashbitco/nimble_pool) to manage the HTTP connections. It temporarily grants sockets to its clients.
+* Do the buffering logic locally in the uploader client.
+* Use `emqx_s3_client` directly from the uploader client.
+
+In this case, the data will be directly sent to the socket, without being sent to any intermediate processes.

+ 2 - 0
apps/emqx_s3/docker-ct

@@ -0,0 +1,2 @@
+minio
+toxiproxy

BIN
apps/emqx_s3/docs/s3_app.png


+ 6 - 0
apps/emqx_s3/rebar.config

@@ -0,0 +1,6 @@
+{deps, [
+    {emqx, {path, "../../apps/emqx"}},
+    {erlcloud, {git, "https://github.com/savonarola/erlcloud", {tag, "3.6.7-emqx-1"}}}
+]}.
+
+{project_plugins, [erlfmt]}.

+ 14 - 0
apps/emqx_s3/src/emqx_s3.app.src

@@ -0,0 +1,14 @@
+{application, emqx_s3, [
+    {description, "EMQX S3"},
+    {vsn, "5.0.6"},
+    {modules, []},
+    {registered, [emqx_s3_sup]},
+    {applications, [
+        kernel,
+        stdlib,
+        gproc,
+        erlcloud,
+        ehttpc
+    ]},
+    {mod, {emqx_s3_app, []}}
+]}.

+ 65 - 0
apps/emqx_s3/src/emqx_s3.erl

@@ -0,0 +1,65 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3).
+
+-include_lib("emqx/include/types.hrl").
+
+-export([
+    start_profile/2,
+    stop_profile/1,
+    update_profile/2,
+    start_uploader/2,
+    with_client/2
+]).
+
+-export_type([
+    profile_id/0,
+    profile_config/0
+]).
+
+-type profile_id() :: term().
+
+%% TODO: define fields
+-type profile_config() :: map().
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()).
+start_profile(ProfileId, ProfileConfig) ->
+    case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of
+        {ok, _} ->
+            ok;
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec stop_profile(profile_id()) -> ok_or_error(term()).
+stop_profile(ProfileId) ->
+    emqx_s3_sup:stop_profile(ProfileId).
+
+-spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()).
+update_profile(ProfileId, ProfileConfig) ->
+    emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig).
+
+-spec start_uploader(profile_id(), emqx_s3_uploader:opts()) ->
+    supervisor:start_ret() | {error, profile_not_found}.
+start_uploader(ProfileId, Opts) ->
+    emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts).
+
+-spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) ->
+    {error, profile_not_found} | Result.
+with_client(ProfileId, Fun) when is_function(Fun, 1) ->
+    case emqx_s3_profile_conf:checkout_config(ProfileId) of
+        {ok, ClientConfig, _UploadConfig} ->
+            try
+                Fun(emqx_s3_client:create(ClientConfig))
+            after
+                emqx_s3_profile_conf:checkin_config(ProfileId)
+            end;
+        {error, _} = Error ->
+            Error
+    end.

+ 16 - 0
apps/emqx_s3/src/emqx_s3_app.erl

@@ -0,0 +1,16 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_Type, _Args) ->
+    {ok, Sup} = emqx_s3_sup:start_link(),
+    {ok, Sup}.
+
+stop(_State) ->
+    ok.

+ 293 - 0
apps/emqx_s3/src/emqx_s3_client.erl

@@ -0,0 +1,293 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_client).
+
+-include_lib("emqx/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("erlcloud/include/erlcloud_aws.hrl").
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-export([
+    create/1,
+
+    put_object/3,
+
+    start_multipart/2,
+    upload_part/5,
+    complete_multipart/4,
+    abort_multipart/3,
+    list/2,
+
+    format/1
+]).
+
+-export_type([client/0]).
+
+-type s3_bucket_acl() ::
+    private
+    | public_read
+    | public_read_write
+    | authenticated_read
+    | bucket_owner_read
+    | bucket_owner_full_control.
+
+-type headers() :: #{binary() => binary()}.
+
+-type key() :: string().
+-type part_number() :: non_neg_integer().
+-type upload_id() :: string().
+-type etag() :: string().
+
+-type upload_options() :: list({acl, s3_bucket_acl()}).
+
+-opaque client() :: #{
+    aws_config := aws_config(),
+    options := upload_options(),
+    bucket := string(),
+    headers := headers()
+}.
+
+-type config() :: #{
+    scheme := string(),
+    host := string(),
+    port := part_number(),
+    bucket := string(),
+    headers := headers(),
+    acl := s3_bucket_acl(),
+    access_key_id := string() | undefined,
+    secret_access_key := string() | undefined,
+    http_pool := ecpool:pool_name(),
+    request_timeout := timeout()
+}.
+
+-type s3_options() :: list({string(), string()}).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec create(config()) -> client().
+create(Config) ->
+    #{
+        aws_config => aws_config(Config),
+        upload_options => upload_options(Config),
+        bucket => maps:get(bucket, Config),
+        headers => headers(Config)
+    }.
+
+-spec put_object(client(), key(), iodata()) -> ok_or_error(term()).
+put_object(
+    #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
+    Key,
+    Value
+) ->
+    try erlcloud_s3:put_object(Bucket, Key, Value, Options, Headers, AwsConfig) of
+        Props when is_list(Props) ->
+            ok
+    catch
+        error:{aws_error, Reason} ->
+            ?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec start_multipart(client(), key()) -> ok_or_error(upload_id(), term()).
+start_multipart(
+    #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
+    Key
+) ->
+    case erlcloud_s3:start_multipart(Bucket, Key, Options, Headers, AwsConfig) of
+        {ok, Props} ->
+            {ok, proplists:get_value(uploadId, Props)};
+        {error, Reason} ->
+            ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec upload_part(client(), key(), upload_id(), part_number(), iodata()) ->
+    ok_or_error(etag(), term()).
+upload_part(
+    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
+    Key,
+    UploadId,
+    PartNumber,
+    Value
+) ->
+    case erlcloud_s3:upload_part(Bucket, Key, UploadId, PartNumber, Value, Headers, AwsConfig) of
+        {ok, Props} ->
+            {ok, proplists:get_value(etag, Props)};
+        {error, Reason} ->
+            ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()).
+complete_multipart(
+    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId, ETags
+) ->
+    case erlcloud_s3:complete_multipart(Bucket, Key, UploadId, ETags, Headers, AwsConfig) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
+abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
+    case erlcloud_s3:abort_multipart(Bucket, Key, UploadId, [], Headers, AwsConfig) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec list(client(), s3_options()) -> ok_or_error(term()).
+list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
+    try
+        {ok, erlcloud_s3:list_objects(Bucket, Options, AwsConfig)}
+    catch
+        error:{aws_error, Reason} ->
+            ?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}),
+            {error, Reason}
+    end.
+
+-spec format(client()) -> term().
+format(#{aws_config := AwsConfig} = Client) ->
+    Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+upload_options(Config) ->
+    [
+        {acl, maps:get(acl, Config)}
+    ].
+
+headers(#{headers := Headers}) ->
+    maps:to_list(Headers).
+
+aws_config(#{
+    scheme := Scheme,
+    host := Host,
+    port := Port,
+    headers := Headers,
+    access_key_id := AccessKeyId,
+    secret_access_key := SecretAccessKey,
+    http_pool := HttpPool,
+    request_timeout := Timeout
+}) ->
+    #aws_config{
+        s3_scheme = Scheme,
+        s3_host = Host,
+        s3_port = Port,
+        s3_bucket_access_method = path,
+
+        access_key_id = AccessKeyId,
+        secret_access_key = SecretAccessKey,
+
+        http_client = request_fun(Headers, HttpPool),
+        timeout = Timeout
+    }.
+
+-type http_headers() :: [{binary(), binary()}].
+-type http_pool() :: term().
+
+-spec request_fun(http_headers(), http_pool()) -> erlcloud_httpc:request_fun().
+request_fun(CustomHeaders, HttpPool) ->
+    fun(Url, Method, Headers, Body, Timeout, _Config) ->
+        with_path_and_query_only(Url, fun(PathQuery) ->
+            JoinedHeaders = join_headers(Headers, CustomHeaders),
+            Request = make_request(Method, PathQuery, JoinedHeaders, Body),
+            ehttpc_request(HttpPool, Method, Request, Timeout)
+        end)
+    end.
+
+ehttpc_request(HttpPool, Method, Request, Timeout) ->
+    try ehttpc:request(HttpPool, Method, Request, Timeout) of
+        {ok, StatusCode, RespHeaders} ->
+            {ok, {{StatusCode, undefined}, string_headers(RespHeaders), undefined}};
+        {ok, StatusCode, RespHeaders, RespBody} ->
+            {ok, {{StatusCode, undefined}, string_headers(RespHeaders), RespBody}};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "s3_ehttpc_request_fail",
+                reason => Reason,
+                timeout => Timeout,
+                pool => HttpPool,
+                method => Method
+            }),
+            {error, Reason}
+    catch
+        error:badarg ->
+            ?SLOG(error, #{
+                msg => "s3_ehttpc_request_fail",
+                reason => badarg,
+                timeout => Timeout,
+                pool => HttpPool,
+                method => Method
+            }),
+            {error, no_ehttpc_pool};
+        error:Reason ->
+            ?SLOG(error, #{
+                msg => "s3_ehttpc_request_fail",
+                reason => Reason,
+                timeout => Timeout,
+                pool => HttpPool,
+                method => Method
+            }),
+            {error, Reason}
+    end.
+
+-define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)).
+-define(NEEDS_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)).
+
+make_request(Method, PathQuery, Headers, Body) when
+    ?IS_BODY_EMPTY(Body) andalso ?NEEDS_BODY(Method)
+->
+    {PathQuery, Headers};
+make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) ->
+    {PathQuery, [{<<"content-length">>, <<"0">>} | Headers], <<>>};
+make_request(_Method, PathQuery, Headers, Body) ->
+    {PathQuery, Headers, Body}.
+
+format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}.
+
+join_headers(Headers, CustomHeaders) ->
+    MapHeaders = lists:foldl(
+        fun({K, V}, MHeaders) ->
+            maps:put(to_binary(K), V, MHeaders)
+        end,
+        #{},
+        Headers ++ maps:to_list(CustomHeaders)
+    ),
+    maps:to_list(MapHeaders).
+
+with_path_and_query_only(Url, Fun) ->
+    case string:split(Url, "//", leading) of
+        [_Scheme, UrlRem] ->
+            case string:split(UrlRem, "/", leading) of
+                [_HostPort, PathQuery] ->
+                    Fun([$/ | PathQuery]);
+                _ ->
+                    {error, {invalid_url, Url}}
+            end;
+        _ ->
+            {error, {invalid_url, Url}}
+    end.
+
+to_binary(Val) when is_list(Val) -> list_to_binary(Val);
+to_binary(Val) when is_binary(Val) -> Val.
+
+string_headers(Hdrs) ->
+    [{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- Hdrs].
+
+to_list_string(Val) when is_binary(Val) ->
+    binary_to_list(Val);
+to_list_string(Val) when is_list(Val) ->
+    Val.

+ 390 - 0
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -0,0 +1,390 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_conf).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/types.hrl").
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([
+    start_link/2,
+    child_spec/2
+]).
+
+-export([
+    checkout_config/1,
+    checkout_config/2,
+    checkin_config/1,
+    checkin_config/2,
+
+    update_config/2,
+    update_config/3
+]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+%% For test purposes
+-export([
+    client_config/2,
+    start_http_pool/2,
+    id/1
+]).
+
+-define(DEFAULT_CALL_TIMEOUT, 5000).
+
+-define(DEFAULT_HTTP_POOL_TIMEOUT, 60000).
+-define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000).
+
+-spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec().
+child_spec(ProfileId, ProfileConfig) ->
+    #{
+        id => ProfileId,
+        start => {?MODULE, start_link, [ProfileId, ProfileConfig]},
+        restart => permanent,
+        shutdown => 5000,
+        type => worker,
+        modules => [?MODULE]
+    }.
+
+-spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> gen_server:start_ret().
+start_link(ProfileId, ProfileConfig) ->
+    gen_server:start_link(?MODULE, [ProfileId, ProfileConfig], []).
+
+-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config()) -> ok_or_error(term()).
+update_config(ProfileId, ProfileConfig) ->
+    update_config(ProfileId, ProfileConfig, ?DEFAULT_CALL_TIMEOUT).
+
+-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config(), timeout()) ->
+    ok_or_error(term()).
+update_config(ProfileId, ProfileConfig, Timeout) ->
+    case gproc:where({n, l, id(ProfileId)}) of
+        undefined ->
+            {error, profile_not_found};
+        Pid ->
+            gen_server:call(Pid, {update_config, ProfileConfig}, Timeout)
+    end.
+
+-spec checkout_config(emqx_s3:profile_id()) ->
+    {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
+checkout_config(ProfileId) ->
+    checkout_config(ProfileId, ?DEFAULT_CALL_TIMEOUT).
+
+-spec checkout_config(emqx_s3:profile_id(), timeout()) ->
+    {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
+checkout_config(ProfileId, Timeout) ->
+    case gproc:where({n, l, id(ProfileId)}) of
+        undefined ->
+            {error, profile_not_found};
+        Pid ->
+            gen_server:call(Pid, {checkout_config, self()}, Timeout)
+    end.
+
+-spec checkin_config(emqx_s3:profile_id()) -> ok | {error, profile_not_found}.
+checkin_config(ProfileId) ->
+    checkin_config(ProfileId, ?DEFAULT_CALL_TIMEOUT).
+
+-spec checkin_config(emqx_s3:profile_id(), timeout()) -> ok | {error, profile_not_found}.
+checkin_config(ProfileId, Timeout) ->
+    case gproc:where({n, l, id(ProfileId)}) of
+        undefined ->
+            {error, profile_not_found};
+        Pid ->
+            gen_server:call(Pid, {checkin_config, self()}, Timeout)
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([ProfileId, ProfileConfig]) ->
+    _ = process_flag(trap_exit, true),
+    ok = cleanup_orphaned_pools(ProfileId),
+    case start_http_pool(ProfileId, ProfileConfig) of
+        {ok, PoolName} ->
+            true = gproc:reg({n, l, id(ProfileId)}, ignored),
+            HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig),
+            {ok, #{
+                profile_id => ProfileId,
+                profile_config => ProfileConfig,
+                client_config => client_config(ProfileConfig, PoolName),
+                uploader_config => uploader_config(ProfileConfig),
+                pool_name => PoolName,
+                pool_clients => emqx_s3_profile_http_pool_clients:create_table(),
+                %% We don't expose these options to users currently, but use in tests
+                http_pool_timeout => http_pool_timeout(ProfileConfig),
+                http_pool_cleanup_interval => HttpPoolCleanupInterval,
+
+                outdated_pool_cleanup_tref => erlang:send_after(
+                    HttpPoolCleanupInterval, self(), cleanup_outdated
+                )
+            }};
+        {error, Reason} ->
+            {stop, Reason}
+    end.
+
+handle_call(
+    {checkout_config, Pid},
+    _From,
+    #{
+        client_config := ClientConfig,
+        uploader_config := UploaderConfig
+    } = State
+) ->
+    ok = register_client(Pid, State),
+    {reply, {ok, ClientConfig, UploaderConfig}, State};
+handle_call({checkin_config, Pid}, _From, State) ->
+    ok = unregister_client(Pid, State),
+    {reply, ok, State};
+handle_call(
+    {update_config, NewProfileConfig},
+    _From,
+    #{profile_id := ProfileId} = State
+) ->
+    case update_http_pool(ProfileId, NewProfileConfig, State) of
+        {ok, PoolName} ->
+            NewState = State#{
+                profile_config => NewProfileConfig,
+                client_config => client_config(NewProfileConfig, PoolName),
+                uploader_config => uploader_config(NewProfileConfig),
+                http_pool_timeout => http_pool_timeout(NewProfileConfig),
+                http_pool_cleanup_interval => http_pool_cleanup_interval(NewProfileConfig),
+                pool_name => PoolName
+            },
+            {reply, ok, NewState};
+        {error, Reason} ->
+            {reply, {error, Reason}, State}
+    end;
+handle_call(_Request, _From, State) ->
+    {reply, {error, not_implemented}, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
+    ok = unregister_client(Pid, State),
+    {noreply, State};
+handle_info(cleanup_outdated, #{http_pool_cleanup_interval := HttpPoolCleanupInterval} = State0) ->
+    %% Maybe cleanup asynchoronously
+    ok = cleanup_outdated_pools(State0),
+    State1 = State0#{
+        outdated_pool_cleanup_tref => erlang:send_after(
+            HttpPoolCleanupInterval, self(), cleanup_outdated
+        )
+    },
+    {noreply, State1};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #{profile_id := ProfileId}) ->
+    lists:foreach(
+        fun(PoolName) ->
+            ok = stop_http_pool(ProfileId, PoolName)
+        end,
+        emqx_s3_profile_http_pools:all(ProfileId)
+    ).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+id(ProfileId) ->
+    {?MODULE, ProfileId}.
+
+client_config(ProfileConfig, PoolName) ->
+    HTTPOpts = maps:get(transport_options, ProfileConfig, #{}),
+    #{
+        scheme => scheme(HTTPOpts),
+        host => maps:get(host, ProfileConfig),
+        port => maps:get(port, ProfileConfig),
+        headers => maps:get(headers, HTTPOpts, #{}),
+        acl => maps:get(acl, ProfileConfig),
+        bucket => maps:get(bucket, ProfileConfig),
+        access_key_id => maps:get(access_key_id, ProfileConfig, undefined),
+        secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),
+        request_timeout => maps:get(request_timeout, HTTPOpts, undefined),
+        http_pool => PoolName
+    }.
+
+uploader_config(#{max_part_size := MaxPartSize, min_part_size := MinPartSize} = _ProfileConfig) ->
+    #{
+        min_part_size => MinPartSize,
+        max_part_size => MaxPartSize
+    }.
+
+scheme(#{ssl := #{enable := true}}) -> "https://";
+scheme(_TransportOpts) -> "http://".
+
+start_http_pool(ProfileId, ProfileConfig) ->
+    HttpConfig = http_config(ProfileConfig),
+    PoolName = pool_name(ProfileId),
+    case do_start_http_pool(PoolName, HttpConfig) of
+        ok ->
+            ok = emqx_s3_profile_http_pools:register(ProfileId, PoolName),
+            ok = ?tp(debug, "s3_start_http_pool", #{pool_name => PoolName, profile_id => ProfileId}),
+            {ok, PoolName};
+        {error, _} = Error ->
+            Error
+    end.
+
+update_http_pool(ProfileId, ProfileConfig, #{pool_name := OldPoolName} = State) ->
+    HttpConfig = http_config(ProfileConfig),
+    OldHttpConfig = old_http_config(State),
+    case OldHttpConfig =:= HttpConfig of
+        true ->
+            {ok, OldPoolName};
+        false ->
+            PoolName = pool_name(ProfileId),
+            case do_start_http_pool(PoolName, HttpConfig) of
+                ok ->
+                    ok = set_old_pool_outdated(State),
+                    ok = emqx_s3_profile_http_pools:register(ProfileId, PoolName),
+                    {ok, PoolName};
+                {error, _} = Error ->
+                    Error
+            end
+    end.
+
+pool_name(ProfileId) ->
+    iolist_to_binary([
+        <<"s3-http-">>,
+        ProfileId,
+        <<"-">>,
+        integer_to_binary(erlang:system_time(millisecond)),
+        <<"-">>,
+        integer_to_binary(erlang:unique_integer([positive]))
+    ]).
+
+old_http_config(#{profile_config := ProfileConfig}) -> http_config(ProfileConfig).
+
+set_old_pool_outdated(#{
+    profile_id := ProfileId, pool_name := PoolName, http_pool_timeout := HttpPoolTimeout
+}) ->
+    _ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout),
+    ok.
+
+cleanup_orphaned_pools(ProfileId) ->
+    lists:foreach(
+        fun(PoolName) ->
+            ok = stop_http_pool(ProfileId, PoolName)
+        end,
+        emqx_s3_profile_http_pools:all(ProfileId)
+    ).
+
+register_client(Pid, #{profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName}) ->
+    MRef = monitor(process, Pid),
+    ok = emqx_s3_profile_http_pool_clients:register(PoolClients, Pid, MRef, PoolName),
+    _ = emqx_s3_profile_http_pools:register_client(ProfileId, PoolName),
+    ok.
+
+unregister_client(
+    Pid,
+    #{
+        profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName
+    }
+) ->
+    case emqx_s3_profile_http_pool_clients:unregister(PoolClients, Pid) of
+        undefined ->
+            ok;
+        {MRef, PoolName} ->
+            true = erlang:demonitor(MRef, [flush]),
+            _ = emqx_s3_profile_http_pools:unregister_client(ProfileId, PoolName),
+            ok;
+        {MRef, OutdatedPoolName} ->
+            true = erlang:demonitor(MRef, [flush]),
+            ClientNum = emqx_s3_profile_http_pools:unregister_client(ProfileId, OutdatedPoolName),
+            maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, ClientNum)
+    end.
+
+maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, 0) ->
+    ok = stop_http_pool(ProfileId, OutdatedPoolName);
+maybe_stop_outdated_pool(_ProfileId, _OutdatedPoolName, _ClientNum) ->
+    ok.
+
+cleanup_outdated_pools(#{profile_id := ProfileId}) ->
+    lists:foreach(
+        fun(PoolName) ->
+            ok = stop_http_pool(ProfileId, PoolName)
+        end,
+        emqx_s3_profile_http_pools:outdated(ProfileId)
+    ).
+
+%%--------------------------------------------------------------------
+%% HTTP Pool implementation dependent functions
+%%--------------------------------------------------------------------
+
+http_config(
+    #{
+        host := Host,
+        port := Port,
+        transport_options := #{
+            pool_type := PoolType,
+            pool_size := PoolSize,
+            enable_pipelining := EnablePipelining,
+            connect_timeout := ConnectTimeout
+        } = HTTPOpts
+    }
+) ->
+    {Transport, TransportOpts} =
+        case scheme(HTTPOpts) of
+            "http://" ->
+                {tcp, []};
+            "https://" ->
+                SSLOpts = emqx_tls_lib:to_client_opts(maps:get(ssl, HTTPOpts)),
+                {tls, SSLOpts}
+        end,
+    NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
+    [
+        {host, Host},
+        {port, Port},
+        {connect_timeout, ConnectTimeout},
+        {keepalive, 30000},
+        {pool_type, PoolType},
+        {pool_size, PoolSize},
+        {transport, Transport},
+        {transport_opts, NTransportOpts},
+        {enable_pipelining, EnablePipelining}
+    ].
+
+http_pool_cleanup_interval(ProfileConfig) ->
+    maps:get(
+        http_pool_cleanup_interval, ProfileConfig, ?DEAFULT_HTTP_POOL_CLEANUP_INTERVAL
+    ).
+
+http_pool_timeout(ProfileConfig) ->
+    maps:get(
+        http_pool_timeout, ProfileConfig, ?DEFAULT_HTTP_POOL_TIMEOUT
+    ).
+
+stop_http_pool(ProfileId, PoolName) ->
+    case ehttpc_sup:stop_pool(PoolName) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "ehttpc_pool_stop_fail", pool_name => PoolName, reason => Reason}),
+            ok
+    end,
+    ok = emqx_s3_profile_http_pools:unregister(ProfileId, PoolName),
+    ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}).
+
+do_start_http_pool(PoolName, HttpConfig) ->
+    case ehttpc_sup:start_pool(PoolName, HttpConfig) of
+        {ok, _} ->
+            ok;
+        {error, _} = Error ->
+            Error
+    end.

+ 35 - 0
apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_http_pool_clients).
+
+-export([
+    create_table/0,
+
+    register/4,
+    unregister/2
+]).
+
+-define(TAB, ?MODULE).
+
+-spec create_table() -> ok.
+create_table() ->
+    ets:new(?TAB, [
+        private,
+        set
+    ]).
+
+-spec register(ets:tid(), pid(), reference(), emqx_s3_profile_http_pools:pool_name()) -> true.
+register(Tab, Pid, MRef, PoolName) ->
+    true = ets:insert(Tab, {Pid, {MRef, PoolName}}),
+    ok.
+
+-spec unregister(ets:tid(), pid()) -> emqx_s3_profile_http_pools:pool_name() | undefined.
+unregister(Tab, Pid) ->
+    case ets:take(Tab, Pid) of
+        [{Pid, {MRef, PoolName}}] ->
+            {MRef, PoolName};
+        [] ->
+            undefined
+    end.

+ 123 - 0
apps/emqx_s3/src/emqx_s3_profile_http_pools.erl

@@ -0,0 +1,123 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_http_pools).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([
+    create_table/0,
+
+    register/2,
+    unregister/2,
+
+    register_client/2,
+    unregister_client/2,
+
+    set_outdated/3,
+
+    outdated/1,
+    all/1
+]).
+
+-export_type([pool_name/0]).
+
+-define(TAB, ?MODULE).
+
+-type pool_name() :: ecpool:pool_name().
+
+-type pool_key() :: {emqx_s3:profile_id(), pool_name()}.
+
+-record(pool, {
+    key :: pool_key(),
+    client_count = 0 :: integer(),
+    deadline = undefined :: undefined | integer(),
+    extra = #{} :: map()
+}).
+
+-spec create_table() -> ok.
+create_table() ->
+    _ = ets:new(?TAB, [
+        named_table,
+        public,
+        ordered_set,
+        {keypos, #pool.key},
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ]),
+    ok.
+
+-spec register(emqx_s3:profile_id(), pool_name()) ->
+    ok.
+register(ProfileId, PoolName) ->
+    Key = key(ProfileId, PoolName),
+    true = ets:insert(?TAB, #pool{
+        key = Key,
+        client_count = 0,
+        deadline = undefined,
+        extra = #{}
+    }),
+    ok.
+
+-spec unregister(emqx_s3:profile_id(), pool_name()) ->
+    ok.
+unregister(ProfileId, PoolName) ->
+    Key = key(ProfileId, PoolName),
+    true = ets:delete(?TAB, Key),
+    ok.
+
+-spec register_client(emqx_s3:profile_id(), pool_name()) ->
+    integer().
+register_client(ProfileId, PoolName) ->
+    Key = key(ProfileId, PoolName),
+    ets:update_counter(?TAB, Key, {#pool.client_count, 1}).
+
+-spec unregister_client(emqx_s3:profile_id(), pool_name()) ->
+    integer().
+unregister_client(ProfileId, PoolName) ->
+    Key = key(ProfileId, PoolName),
+    try
+        ets:update_counter(?TAB, Key, {#pool.client_count, -1})
+    catch
+        error:badarg ->
+            undefined
+    end.
+
+-spec set_outdated(emqx_s3:profile_id(), pool_name(), integer()) ->
+    ok.
+set_outdated(ProfileId, PoolName, Timeout) ->
+    Key = key(ProfileId, PoolName),
+    Now = erlang:monotonic_time(millisecond),
+    ets:update_element(?TAB, Key, {#pool.deadline, Now + Timeout}).
+
+-spec outdated(emqx_s3:profile_id()) ->
+    [pool_name()].
+outdated(ProfileId) ->
+    Now = erlang:monotonic_time(millisecond),
+    MS = ets:fun2ms(
+        fun(#pool{key = {ProfileId_, PoolName}, deadline = Deadline_}) when
+            ProfileId_ =:= ProfileId andalso
+                Deadline_ =/= undefined andalso Deadline_ < Now
+        ->
+            PoolName
+        end
+    ),
+    ets:select(?TAB, MS).
+
+-spec all(emqx_s3:profile_id()) ->
+    [pool_name()].
+all(ProfileId) ->
+    MS = ets:fun2ms(
+        fun(#pool{key = {ProfileId_, PoolName}}) when ProfileId_ =:= ProfileId ->
+            PoolName
+        end
+    ),
+    ets:select(?TAB, MS).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+key(ProfileId, PoolName) ->
+    {ProfileId, PoolName}.

+ 48 - 0
apps/emqx_s3/src/emqx_s3_profile_sup.erl

@@ -0,0 +1,48 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_sup).
+
+-behaviour(supervisor).
+
+-include_lib("emqx/include/types.hrl").
+
+-export([
+    start_link/2,
+    child_spec/2
+]).
+
+-export([init/1]).
+
+-spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:start_ret().
+start_link(ProfileId, ProfileConfig) ->
+    supervisor:start_link(?MODULE, [ProfileId, ProfileConfig]).
+
+-spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec().
+child_spec(ProfileId, ProfileConfig) ->
+    #{
+        id => ProfileId,
+        start => {?MODULE, start_link, [ProfileId, ProfileConfig]},
+        restart => permanent,
+        shutdown => 5000,
+        type => supervisor,
+        modules => [?MODULE]
+    }.
+
+%%--------------------------------------------------------------------
+%% supervisor callbacks
+%%-------------------------------------------------------------------
+
+init([ProfileId, ProfileConfig]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    ChildSpecs = [
+        %% Order matters
+        emqx_s3_profile_conf:child_spec(ProfileId, ProfileConfig),
+        emqx_s3_profile_uploader_sup:child_spec(ProfileId)
+    ],
+    {ok, {SupFlags, ChildSpecs}}.

+ 73 - 0
apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl

@@ -0,0 +1,73 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_uploader_sup).
+
+-behaviour(supervisor).
+
+-include_lib("emqx/include/types.hrl").
+
+-export([
+    start_link/1,
+    child_spec/1,
+    id/1,
+    start_uploader/2
+]).
+
+-export([init/1]).
+
+-export_type([id/0]).
+
+-type id() :: {?MODULE, emqx_s3:profile_id()}.
+
+-spec start_link(emqx_s3:profile_id()) -> supervisor:start_ret().
+start_link(ProfileId) ->
+    supervisor:start_link(?MODULE, [ProfileId]).
+
+-spec child_spec(emqx_s3:profile_id()) -> supervisor:child_spec().
+child_spec(ProfileId) ->
+    #{
+        id => id(ProfileId),
+        start => {?MODULE, start_link, [ProfileId]},
+        restart => permanent,
+        shutdown => 5000,
+        type => supervisor,
+        modules => [?MODULE]
+    }.
+
+-spec id(emqx_s3:profile_id()) -> id().
+id(ProfileId) ->
+    {?MODULE, ProfileId}.
+
+-spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) ->
+    supervisor:start_ret() | {error, profile_not_found}.
+start_uploader(ProfileId, Opts) ->
+    Id = id(ProfileId),
+    case gproc:where({n, l, Id}) of
+        undefined -> {error, profile_not_found};
+        Pid -> supervisor:start_child(Pid, [Opts])
+    end.
+
+%%--------------------------------------------------------------------
+%% supervisor callbacks
+%%-------------------------------------------------------------------
+
+init([ProfileId]) ->
+    true = gproc:reg({n, l, id(ProfileId)}, ignored),
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    ChildSpecs = [
+        #{
+            id => emqx_s3_uploader,
+            start => {emqx_s3_uploader, start_link, [ProfileId]},
+            restart => temporary,
+            shutdown => 5000,
+            type => worker,
+            modules => [emqx_s3_uploader]
+        }
+    ],
+    {ok, {SupFlags, ChildSpecs}}.

+ 143 - 0
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -0,0 +1,143 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, ref/1]).
+
+-export([roots/0, fields/1, namespace/0, tags/0]).
+
+-export([translate/1]).
+
+roots() ->
+    [s3].
+
+namespace() -> "s3".
+
+tags() ->
+    [<<"S3">>].
+
+fields(s3) ->
+    [
+        {access_key_id,
+            mk(
+                string(),
+                #{
+                    desc => ?DESC("access_key_id"),
+                    required => false
+                }
+            )},
+        {secret_access_key,
+            mk(
+                string(),
+                #{
+                    desc => ?DESC("secret_access_key"),
+                    required => false
+                }
+            )},
+        {bucket,
+            mk(
+                string(),
+                #{
+                    desc => ?DESC("bucket"),
+                    required => true
+                }
+            )},
+        {host,
+            mk(
+                string(),
+                #{
+                    desc => ?DESC("host"),
+                    required => true
+                }
+            )},
+        {port,
+            mk(
+                pos_integer(),
+                #{
+                    desc => ?DESC("port"),
+                    required => true
+                }
+            )},
+        {min_part_size,
+            mk(
+                emqx_schema:bytesize(),
+                #{
+                    default => "5mb",
+                    desc => ?DESC("min_part_size"),
+                    required => true,
+                    validator => fun part_size_validator/1
+                }
+            )},
+        {max_part_size,
+            mk(
+                emqx_schema:bytesize(),
+                #{
+                    default => "5gb",
+                    desc => ?DESC("max_part_size"),
+                    required => true,
+                    validator => fun part_size_validator/1
+                }
+            )},
+        {acl,
+            mk(
+                hoconsc:enum([
+                    private,
+                    public_read,
+                    public_read_write,
+                    authenticated_read,
+                    bucket_owner_read,
+                    bucket_owner_full_control
+                ]),
+                #{
+                    default => private,
+                    desc => ?DESC("acl"),
+                    required => true
+                }
+            )},
+        {transport_options,
+            mk(
+                ref(transport_options),
+                #{
+                    desc => ?DESC("transport_options"),
+                    required => false
+                }
+            )}
+    ];
+fields(transport_options) ->
+    props_without(
+        [base_url, max_retries, retry_interval, request], emqx_connector_http:fields(config)
+    ) ++
+        props_with(
+            [headers, max_retries, request_timeout], emqx_connector_http:fields("request")
+        ).
+
+translate(Conf) ->
+    Options = #{atom_key => true},
+    #{s3 := TranslatedConf} = hocon_tconf:check_plain(
+        emqx_s3_schema, #{<<"s3">> => Conf}, Options, [s3]
+    ),
+    TranslatedConf.
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+props_with(Keys, Proplist) ->
+    lists:filter(fun({K, _}) -> lists:member(K, Keys) end, Proplist).
+
+props_without(Keys, Proplist) ->
+    lists:filter(fun({K, _}) -> not lists:member(K, Keys) end, Proplist).
+
+part_size_validator(PartSizeLimit) ->
+    case
+        PartSizeLimit >= 5 * 1024 * 1024 andalso
+            PartSizeLimit =< 5 * 1024 * 1024 * 1024
+    of
+        true -> ok;
+        false -> {error, "must be at least 5mb and less than 5gb"}
+    end.

+ 47 - 0
apps/emqx_s3/src/emqx_s3_sup.erl

@@ -0,0 +1,47 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_sup).
+
+-behaviour(supervisor).
+
+-include_lib("emqx/include/types.hrl").
+
+-export([
+    start_link/0,
+    start_profile/2,
+    stop_profile/1
+]).
+
+-export([init/1]).
+
+-spec start_link() -> supervisor:start_ret().
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+-spec start_profile(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:startchild_ret().
+start_profile(ProfileId, ProfileConfig) ->
+    supervisor:start_child(?MODULE, emqx_s3_profile_sup:child_spec(ProfileId, ProfileConfig)).
+
+-spec stop_profile(emqx_s3:profile_id()) -> ok_or_error(term()).
+stop_profile(ProfileId) ->
+    case supervisor:terminate_child(?MODULE, ProfileId) of
+        ok ->
+            supervisor:delete_child(?MODULE, ProfileId);
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------
+%% supervisor callbacks
+%%-------------------------------------------------------------------
+
+init([]) ->
+    ok = emqx_s3_profile_http_pools:create_table(),
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    {ok, {SupFlags, []}}.

+ 318 - 0
apps/emqx_s3/src/emqx_s3_uploader.erl

@@ -0,0 +1,318 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_uploader).
+
+-include_lib("emqx/include/types.hrl").
+
+-behaviour(gen_statem).
+
+-export([
+    start_link/2,
+
+    write/2,
+    complete/1,
+    abort/1
+]).
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4,
+    terminate/3,
+    code_change/4,
+    format_status/1,
+    format_status/2
+]).
+
+-export_type([opts/0, config/0]).
+
+-type opts() :: #{
+    name := string()
+}.
+
+-type config() :: #{
+    min_part_size := pos_integer()
+}.
+
+-type data() :: #{
+    profile_id := emqx_s3:profile_id(),
+    client := emqx_s3_client:client(),
+    key := emqx_s3_client:key(),
+    buffer := iodata(),
+    buffer_size := non_neg_integer(),
+    min_part_size := pos_integer(),
+    max_part_size := pos_integer(),
+    upload_id := undefined | emqx_s3_client:upload_id(),
+    etags := [emqx_s3_client:etag()],
+    part_number := emqx_s3_client:part_number()
+}.
+
+%% 5MB
+-define(DEFAULT_MIN_PART_SIZE, 5242880).
+%% 5GB
+-define(DEFAULT_MAX_PART_SIZE, 5368709120).
+
+-spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret().
+start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
+    gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
+
+-spec write(pid(), binary()) -> ok_or_error(term()).
+write(Pid, WriteData) when is_binary(WriteData) ->
+    write(Pid, WriteData, infinity).
+
+-spec write(pid(), binary(), timeout()) -> ok_or_error(term()).
+write(Pid, WriteData, Timeout) when is_binary(WriteData) ->
+    gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout).
+
+-spec complete(pid()) -> ok_or_error(term()).
+complete(Pid) ->
+    complete(Pid, infinity).
+
+-spec complete(pid(), timeout()) -> ok_or_error(term()).
+complete(Pid, Timeout) ->
+    gen_statem:call(Pid, complete, Timeout).
+
+-spec abort(pid()) -> ok_or_error(term()).
+abort(Pid) ->
+    abort(Pid, infinity).
+
+-spec abort(pid(), timeout()) -> ok_or_error(term()).
+abort(Pid, Timeout) ->
+    gen_statem:call(Pid, abort, Timeout).
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+callback_mode() -> handle_event_function.
+
+init([ProfileId, #{key := Key}]) ->
+    process_flag(trap_exit, true),
+    {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId),
+    Client = emqx_s3_client:create(ClientConfig),
+    {ok, upload_not_started, #{
+        profile_id => ProfileId,
+        client => Client,
+        key => Key,
+        buffer => [],
+        buffer_size => 0,
+        min_part_size => maps:get(min_part_size, UploaderConfig, ?DEFAULT_MIN_PART_SIZE),
+        max_part_size => maps:get(max_part_size, UploaderConfig, ?DEFAULT_MAX_PART_SIZE),
+        upload_id => undefined,
+        etags => [],
+        part_number => 1
+    }}.
+
+handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) ->
+    WriteData = unwrap(WriteDataWrapped),
+    case is_valid_part(WriteData, Data0) of
+        true ->
+            handle_write(State, From, WriteData, Data0);
+        false ->
+            {keep_state_and_data, {reply, From, {error, {too_large, byte_size(WriteData)}}}}
+    end;
+handle_event({call, From}, complete, upload_not_started, Data0) ->
+    case put_object(Data0) of
+        ok ->
+            {stop_and_reply, normal, {reply, From, ok}};
+        {error, _} = Error ->
+            {stop_and_reply, Error, {reply, From, Error}, Data0}
+    end;
+handle_event({call, From}, complete, upload_started, Data0) ->
+    case complete_upload(Data0) of
+        {ok, Data1} ->
+            {stop_and_reply, normal, {reply, From, ok}, Data1};
+        {error, _} = Error ->
+            {stop_and_reply, Error, {reply, From, Error}, Data0}
+    end;
+handle_event({call, From}, abort, upload_not_started, _Data) ->
+    {stop_and_reply, normal, {reply, From, ok}};
+handle_event({call, From}, abort, upload_started, Data0) ->
+    case abort_upload(Data0) of
+        ok ->
+            {stop_and_reply, normal, {reply, From, ok}};
+        {error, _} = Error ->
+            {stop_and_reply, Error, {reply, From, Error}, Data0}
+    end.
+
+handle_write(upload_not_started, From, WriteData, Data0) ->
+    Data1 = append_buffer(Data0, WriteData),
+    case maybe_start_upload(Data1) of
+        not_started ->
+            {keep_state, Data1, {reply, From, ok}};
+        {started, Data2} ->
+            case upload_part(Data2) of
+                {ok, Data3} ->
+                    {next_state, upload_started, Data3, {reply, From, ok}};
+                {error, _} = Error ->
+                    {stop_and_reply, Error, {reply, From, Error}, Data2}
+            end;
+        {error, _} = Error ->
+            {stop_and_reply, Error, {reply, From, Error}, Data1}
+    end;
+handle_write(upload_started, From, WriteData, Data0) ->
+    Data1 = append_buffer(Data0, WriteData),
+    case maybe_upload_part(Data1) of
+        {ok, Data2} ->
+            {keep_state, Data2, {reply, From, ok}};
+        {error, _} = Error ->
+            {stop_and_reply, Error, {reply, From, Error}, Data1}
+    end.
+
+terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when
+    (UploadId =/= undefined) andalso (Reason =/= normal)
+->
+    emqx_s3_client:abort_multipart(Client, Key, UploadId);
+terminate(_Reason, _State, _Data) ->
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+format_status(#{data := #{client := Client} = Data} = Status) ->
+    Status#{
+        data => Data#{
+            client => emqx_s3_client:format(Client),
+            buffer => [<<"...">>]
+        }
+    }.
+
+format_status(_Opt, [PDict, State, #{client := Client} = Data]) ->
+    #{
+        data => Data#{
+            client => emqx_s3_client:format(Client),
+            buffer => [<<"...">>]
+        },
+        state => State,
+        pdict => PDict
+    }.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+-spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}.
+maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
+    case BufferSize >= MinPartSize of
+        true ->
+            start_upload(Data);
+        false ->
+            not_started
+    end.
+
+-spec start_upload(data()) -> {started, data()} | {error, term()}.
+start_upload(#{client := Client, key := Key} = Data) ->
+    case emqx_s3_client:start_multipart(Client, Key) of
+        {ok, UploadId} ->
+            NewData = Data#{upload_id => UploadId},
+            {started, NewData};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec maybe_upload_part(data()) -> ok_or_error(data(), term()).
+maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
+    case BufferSize >= MinPartSize of
+        true ->
+            upload_part(Data);
+        false ->
+            % ct:print("buffer size: ~p, max part size: ~p, no upload", [BufferSize, MinPartSize]),
+            {ok, Data}
+    end.
+
+-spec upload_part(data()) -> ok_or_error(data(), term()).
+upload_part(#{buffer_size := 0} = Data) ->
+    {ok, Data};
+upload_part(
+    #{
+        client := Client,
+        key := Key,
+        upload_id := UploadId,
+        buffer := Buffer,
+        part_number := PartNumber,
+        etags := ETags
+    } = Data
+) ->
+    case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of
+        {ok, ETag} ->
+            % ct:print("upload part ~p, etag: ~p", [PartNumber, ETag]),
+            NewData = Data#{
+                buffer => [],
+                buffer_size => 0,
+                part_number => PartNumber + 1,
+                etags => [{PartNumber, ETag} | ETags]
+            },
+            {ok, NewData};
+        {error, _} = Error ->
+            % ct:print("upload part ~p failed: ~p", [PartNumber, Error]),
+            Error
+    end.
+
+-spec complete_upload(data()) -> ok_or_error(term()).
+complete_upload(
+    #{
+        client := Client,
+        key := Key,
+        upload_id := UploadId
+    } = Data0
+) ->
+    case upload_part(Data0) of
+        {ok, #{etags := ETags} = Data1} ->
+            case emqx_s3_client:complete_multipart(Client, Key, UploadId, lists:reverse(ETags)) of
+                ok ->
+                    {ok, Data1};
+                {error, _} = Error ->
+                    Error
+            end;
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec abort_upload(data()) -> ok_or_error(term()).
+abort_upload(
+    #{
+        client := Client,
+        key := Key,
+        upload_id := UploadId
+    }
+) ->
+    case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
+        ok ->
+            ok;
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec put_object(data()) -> ok_or_error(term()).
+put_object(
+    #{
+        client := Client,
+        key := Key,
+        buffer := Buffer
+    }
+) ->
+    case emqx_s3_client:put_object(Client, Key, lists:reverse(Buffer)) of
+        ok ->
+            ok;
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec append_buffer(data(), binary()) -> data().
+append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
+    Data#{
+        buffer => [WriteData | Buffer],
+        buffer_size => BufferSize + byte_size(WriteData)
+    }.
+
+-compile({inline, [wrap/1, unwrap/1]}).
+wrap(Data) ->
+    fun() -> Data end.
+
+unwrap(WrappedData) ->
+    WrappedData().
+
+is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
+    BufferSize + byte_size(WriteData) =< MaxPartSize.

+ 29 - 0
apps/emqx_s3/test/certs/ca.crt

@@ -0,0 +1,29 @@
+-----BEGIN CERTIFICATE-----
+MIIE5DCCAswCCQCF3o0gIdaNDjANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQKDAlF
+TVFYIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMTEy
+MzAwODQxMTFaFw00OTA1MTcwODQxMTFaMDQxEjAQBgNVBAoMCUVNUVggVGVzdDEe
+MBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF
+AAOCAg8AMIICCgKCAgEAqmqSrxyH16j63QhqGLT1UO8I+m6BM3HfnJQM8laQdtJ0
+WgHqCh0/OphH3S7v4SfF4fNJDEJWMWuuzJzU9cTqHPLzhvo3+ZHcMIENgtY2p2Cf
+7AQjEqFViEDyv2ZWNEe76BJeShntdY5NZr4gIPar99YGG/Ln8YekspleV+DU38rE
+EX9WzhgBr02NN9z4NzIxeB+jdvPnxcXs3WpUxzfnUjOQf/T1tManvSdRbFmKMbxl
+A8NLYK3oAYm8EbljWUINUNN6loqYhbigKv8bvo5S4xvRqmX86XB7sc0SApngtNcg
+O0EKn8z/KVPDskE+8lMfGMiU2e2Tzw6Rph57mQPOPtIp5hPiKRik7ST9n0p6piXW
+zRLplJEzSjf40I1u+VHmpXlWI/Fs8b1UkDSMiMVJf0LyWb4ziBSZOY2LtZzWHbWj
+LbNgxQcwSS29tKgUwfEFmFcm+iOM59cPfkl2IgqVLh5h4zmKJJbfQKSaYb5fcKRf
+50b1qsN40VbR3Pk/0lJ0/WqgF6kZCExmT1qzD5HJES/5grjjKA4zIxmHOVU86xOF
+ouWvtilVR4PGkzmkFvwK5yRhBUoGH/A9BurhqOc0QCGay1kqHQFA6se4JJS+9KOS
+x8Rn1Nm6Pi7sd6Le3cKmHTlyl5a/ofKqTCX2Qh+v/7y62V1V1wnoh3ipRjdPTnMC
+AwEAATANBgkqhkiG9w0BAQsFAAOCAgEARCqaocvlMFUQjtFtepO2vyG1krn11xJ0
+e7md26i+g8SxCCYqQ9IqGmQBg0Im8fyNDKRN/LZoj5+A4U4XkG1yya91ZIrPpWyF
+KUiRAItchNj3g1kHmI2ckl1N//6Kpx3DPaS7qXZaN3LTExf6Ph+StE1FnS0wVF+s
+tsNIf6EaQ+ZewW3pjdlLeAws3jvWKUkROc408Ngvx74zbbKo/zAC4tz8oH9ZcpsT
+WD8enVVEeUQKI6ItcpZ9HgTI9TFWgfZ1vYwvkoRwNIeabYI62JKmLEo2vGfGwWKr
+c+GjnJ/tlVI2DpPljfWOnQ037/7yyJI/zo65+HPRmGRD6MuW/BdPDYOvOZUTcQKh
+kANi5THSbJJgZcG3jb1NLebaUQ1H0zgVjn0g3KhUV+NJQYk8RQ7rHtB+MySqTKlM
+kRkRjfTfR0Ykxpks7Mjvsb6NcZENf08ZFPd45+e/ptsxpiKu4e4W4bV7NZDvNKf9
+0/aD3oGYNMiP7s+KJ1lRSAjnBuG21Yk8FpzG+yr8wvJhV8aFgNQ5wIH86SuUTmN0
+5bVzFEIcUejIwvGoQEctNHBlOwHrb7zmB6OwyZeMapdXBQ+9UDhYg8ehDqdDOdfn
+wsBcnjD2MwNhlE1hjL+tZWLNwSHiD6xx3LvNoXZu2HK8Cp3SOrkE69cFghYMIZZb
+T+fp6tNL6LE=
+-----END CERTIFICATE-----

+ 66 - 0
apps/emqx_s3/test/emqx_s3_SUITE.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_s3),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(emqx_s3).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_start_stop_update(_Config) ->
+    ProfileId = <<"test">>,
+    ProfileConfig = profile_config(),
+
+    ?assertMatch(
+        ok,
+        emqx_s3:start_profile(ProfileId, ProfileConfig)
+    ),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3:start_profile(ProfileId, ProfileConfig)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_s3:update_profile(ProfileId, ProfileConfig)
+    ),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3:update_profile(<<"unknown">>, ProfileConfig)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_s3:stop_profile(ProfileId)
+    ),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3:stop_profile(ProfileId)
+    ).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+profile_config() ->
+    emqx_s3_test_helpers:base_config(tcp).

+ 104 - 0
apps/emqx_s3/test/emqx_s3_client_SUITE.erl

@@ -0,0 +1,104 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_client_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(PROFILE_ID, atom_to_binary(?MODULE)).
+
+all() ->
+    [
+        {group, tcp},
+        {group, tls}
+    ].
+
+groups() ->
+    AllCases = emqx_common_test_helpers:all(?MODULE),
+    [
+        {tcp, [], AllCases},
+        {tls, [], AllCases}
+    ].
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_s3),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(emqx_s3).
+
+init_per_group(ConnType, Config) ->
+    [{conn_type, ConnType} | Config].
+end_per_group(_ConnType, _Config) ->
+    ok.
+
+init_per_testcase(_TestCase, Config0) ->
+    ConnType = ?config(conn_type, Config0),
+
+    Bucket = emqx_s3_test_helpers:unique_bucket(),
+    TestAwsConfig = emqx_s3_test_helpers:aws_config(ConnType),
+    ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
+    Config1 = [
+        {key, emqx_s3_test_helpers:unique_key()},
+        {bucket, Bucket}
+        | Config0
+    ],
+    {ok, PoolName} = emqx_s3_profile_conf:start_http_pool(?PROFILE_ID, profile_config(Config1)),
+    [{ehttpc_pool_name, PoolName} | Config1].
+
+end_per_testcase(_TestCase, Config) ->
+    ok = ehttpc_sup:stop_pool(?config(ehttpc_pool_name, Config)).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_multipart_upload(Config) ->
+    Key = ?config(key, Config),
+
+    Client = client(Config),
+
+    {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key),
+
+    Data = data(6_000_000),
+
+    {ok, Etag1} = emqx_s3_client:upload_part(Client, Key, UploadId, 1, Data),
+    {ok, Etag2} = emqx_s3_client:upload_part(Client, Key, UploadId, 2, Data),
+
+    ok = emqx_s3_client:complete_multipart(
+        Client, Key, UploadId, [{1, Etag1}, {2, Etag2}]
+    ).
+
+t_simple_put(Config) ->
+    Key = ?config(key, Config),
+
+    Client = client(Config),
+
+    Data = data(6_000_000),
+
+    ok = emqx_s3_client:put_object(Client, Key, Data).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+client(Config) ->
+    ClientConfig = emqx_s3_profile_conf:client_config(
+        profile_config(Config), ?config(ehttpc_pool_name, Config)
+    ),
+    emqx_s3_client:create(ClientConfig).
+
+profile_config(Config) ->
+    maps:put(
+        bucket,
+        ?config(bucket, Config),
+        emqx_s3_test_helpers:base_config(?config(conn_type, Config))
+    ).
+
+data(Size) ->
+    iolist_to_binary([$a || _ <- lists:seq(1, Size)]).

+ 293 - 0
apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl

@@ -0,0 +1,293 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_profile_conf_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(assertWaitEvent(Code, EventMatch, Timeout),
+    ?assertMatch(
+        {_, {ok, EventMatch}},
+        ?wait_async_action(
+            Code,
+            EventMatch,
+            Timeout
+        )
+    )
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+suite() -> [{timetrap, {minutes, 1}}].
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_s3),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(emqx_s3).
+
+init_per_testcase(_TestCase, Config) ->
+    ok = snabbkaffe:start_trace(),
+    TestAwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+
+    Bucket = emqx_s3_test_helpers:unique_bucket(),
+    ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
+
+    ProfileBaseConfig = emqx_s3_test_helpers:base_config(tcp),
+    ProfileConfig = ProfileBaseConfig#{bucket => Bucket},
+    ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
+
+    [{profile_config, ProfileConfig} | Config].
+
+end_per_testcase(_TestCase, _Config) ->
+    ok = snabbkaffe:stop(),
+    _ = emqx_s3:stop_profile(profile_id()).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_regular_outdated_pool_cleanup(Config) ->
+    _ = process_flag(trap_exit, true),
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
+
+    ProfileBaseConfig = ?config(profile_config, Config),
+    ProfileConfig = emqx_map_lib:deep_put(
+        [transport_options, pool_size], ProfileBaseConfig, 16
+    ),
+    ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
+
+    ?assertEqual(
+        2,
+        length(emqx_s3_profile_http_pools:all(profile_id()))
+    ),
+
+    ?assertWaitEvent(
+        ok = emqx_s3_uploader:abort(Pid),
+        #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
+        1000
+    ),
+
+    [NewPool] = emqx_s3_profile_http_pools:all(profile_id()),
+
+    ?assertWaitEvent(
+        ok = emqx_s3:stop_profile(profile_id()),
+        #{?snk_kind := "s3_stop_http_pool", pool_name := NewPool},
+        1000
+    ),
+
+    ?assertEqual(
+        0,
+        length(emqx_s3_profile_http_pools:all(profile_id()))
+    ).
+
+t_timeout_pool_cleanup(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    %% We restart the profile to set `http_pool_timeout` value suitable for test
+    ok = emqx_s3:stop_profile(profile_id()),
+    ProfileBaseConfig = ?config(profile_config, Config),
+    ProfileConfig = ProfileBaseConfig#{
+        http_pool_timeout => 500,
+        http_pool_cleanup_interval => 100
+    },
+    ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
+
+    %% Start uploader
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    ok = emqx_s3_uploader:write(Pid, <<"data">>),
+
+    [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
+
+    NewProfileConfig = emqx_map_lib:deep_put(
+        [transport_options, pool_size], ProfileConfig, 16
+    ),
+
+    %% We update profile to create new pool and wait for the old one to be stopped by timeout
+    ?assertWaitEvent(
+        ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
+        #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
+        1000
+    ),
+
+    %% The uploader now has no valid pool and should fail
+    ?assertMatch(
+        {error, _},
+        emqx_s3_uploader:complete(Pid)
+    ).
+
+t_checkout_no_profile(_Config) ->
+    ?assertEqual(
+        {error, profile_not_found},
+        emqx_s3_profile_conf:checkout_config(<<"no_such_profile">>)
+    ).
+
+t_httpc_pool_start_error(Config) ->
+    %% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
+    %% passing some bad connection options.
+    %% So we emulate some unknown crash with `meck`.
+    meck:new(ehttpc_pool, [passthrough]),
+    meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3:start_profile(<<"profile">>, ?config(profile_config, Config))
+    ).
+
+t_httpc_pool_update_error(Config) ->
+    %% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
+    %% passing some bad connection options.
+    %% So we emulate some unknown crash with `meck`.
+    meck:new(ehttpc_pool, [passthrough]),
+    meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
+
+    ProfileBaseConfig = ?config(profile_config, Config),
+    NewProfileConfig = emqx_map_lib:deep_put(
+        [transport_options, pool_size], ProfileBaseConfig, 16
+    ),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3:start_profile(<<"profile">>, NewProfileConfig)
+    ).
+
+t_orphaned_pools_cleanup(_Config) ->
+    ProfileId = profile_id(),
+    Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
+
+    %% We kill conf and wait for it to restart
+    %% and create a new pool
+    ?assertWaitEvent(
+        exit(Pid, kill),
+        #{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
+        1000
+    ),
+
+    %% We should still have only one pool
+    ?assertEqual(
+        1,
+        length(emqx_s3_profile_http_pools:all(ProfileId))
+    ).
+
+t_orphaned_pools_cleanup_non_graceful(_Config) ->
+    ProfileId = profile_id(),
+    Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
+
+    %% We stop pool, conf server should not fail when attempting to stop it once more
+    [PoolName] = emqx_s3_profile_http_pools:all(ProfileId),
+    ok = ehttpc_pool:stop_pool(PoolName),
+
+    %% We kill conf and wait for it to restart
+    %% and create a new pool
+    ?assertWaitEvent(
+        exit(Pid, kill),
+        #{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
+        1000
+    ),
+
+    %% We should still have only one pool
+    ?assertEqual(
+        1,
+        length(emqx_s3_profile_http_pools:all(ProfileId))
+    ).
+
+t_checkout_client(Config) ->
+    ProfileId = profile_id(),
+    Key = emqx_s3_test_helpers:unique_key(),
+    Caller = self(),
+    Pid = spawn_link(fun() ->
+        emqx_s3:with_client(
+            ProfileId,
+            fun(Client) ->
+                receive
+                    put_object ->
+                        Caller ! {put_object, emqx_s3_client:put_object(Client, Key, <<"data">>)}
+                end,
+                receive
+                    list_objects ->
+                        Caller ! {list_objects, emqx_s3_client:list(Client, [])}
+                end
+            end
+        ),
+        Caller ! client_released,
+        receive
+            stop -> ok
+        end
+    end),
+
+    %% Ask spawned process to put object
+    Pid ! put_object,
+    receive
+        {put_object, ok} -> ok
+    after 1000 ->
+        ct:fail("put_object fail")
+    end,
+
+    %% Now change config for the profile
+    ProfileBaseConfig = ?config(profile_config, Config),
+    NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>},
+    NewProfileConfig1 = emqx_map_lib:deep_put(
+        [transport_options, pool_size], NewProfileConfig0, 16
+    ),
+    ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
+
+    %% We should have two pools now, because the old one is still in use
+    %% by the spawned process
+    ?assertEqual(
+        2,
+        length(emqx_s3_profile_http_pools:all(ProfileId))
+    ),
+
+    %% Ask spawned process to list objects
+    Pid ! list_objects,
+    receive
+        {list_objects, Result} ->
+            {ok, OkResult} = Result,
+            Contents = proplists:get_value(contents, OkResult),
+            ?assertEqual(1, length(Contents)),
+            ?assertEqual(Key, proplists:get_value(key, hd(Contents)))
+    after 1000 ->
+        ct:fail("list_objects fail")
+    end,
+
+    %% Wait till spawned process releases client
+    receive
+        client_released -> ok
+    after 1000 ->
+        ct:fail("client not released")
+    end,
+
+    %% We should have only one pool now, because the old one is released
+    ?assertEqual(
+        1,
+        length(emqx_s3_profile_http_pools:all(ProfileId))
+    ).
+
+t_unknown_messages(_Config) ->
+    Pid = gproc:where({n, l, emqx_s3_profile_conf:id(profile_id())}),
+
+    Pid ! unknown,
+    ok = gen_server:cast(Pid, unknown),
+
+    ?assertEqual(
+        {error, not_implemented},
+        gen_server:call(Pid, unknown)
+    ).
+
+%%--------------------------------------------------------------------
+%% Test helpers
+%%--------------------------------------------------------------------
+
+profile_id() ->
+    <<"test">>.

+ 154 - 0
apps/emqx_s3/test/emqx_s3_schema_SUITE.erl

@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_schema_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_minimal_config(_Config) ->
+    ?assertMatch(
+        #{
+            bucket := "bucket",
+            host := "s3.us-east-1.endpoint.com",
+            port := 443,
+            acl := private,
+            min_part_size := 5242880,
+            transport_options :=
+                #{
+                    connect_timeout := 15000,
+                    enable_pipelining := 100,
+                    pool_size := 8,
+                    pool_type := random,
+                    ssl := #{enable := false}
+                }
+        },
+        emqx_s3_schema:translate(#{
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443
+        })
+    ).
+
+t_full_config(_Config) ->
+    ?assertMatch(
+        #{
+            access_key_id := "access_key_id",
+            acl := public_read,
+            bucket := "bucket",
+            host := "s3.us-east-1.endpoint.com",
+            min_part_size := 10485760,
+            port := 443,
+            secret_access_key := "secret_access_key",
+            transport_options :=
+                #{
+                    connect_timeout := 30000,
+                    enable_pipelining := 200,
+                    headers := #{<<"x-amz-acl">> := <<"public-read">>},
+                    max_retries := 3,
+                    pool_size := 10,
+                    pool_type := random,
+                    request_timeout := 10000,
+                    ssl :=
+                        #{
+                            cacertfile := <<"cacertfile.crt">>,
+                            certfile := <<"server.crt">>,
+                            ciphers := ["ECDHE-RSA-AES256-GCM-SHA384"],
+                            depth := 10,
+                            enable := true,
+                            keyfile := <<"server.key">>,
+                            reuse_sessions := true,
+                            secure_renegotiate := true,
+                            server_name_indication := "some-host",
+                            verify := verify_peer,
+                            versions := ['tlsv1.2']
+                        }
+                }
+        },
+        emqx_s3_schema:translate(#{
+            <<"access_key_id">> => <<"access_key_id">>,
+            <<"secret_access_key">> => <<"secret_access_key">>,
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443,
+            <<"min_part_size">> => <<"10mb">>,
+            <<"acl">> => <<"public_read">>,
+            <<"transport_options">> => #{
+                <<"connect_timeout">> => 30000,
+                <<"enable_pipelining">> => 200,
+                <<"pool_size">> => 10,
+                <<"pool_type">> => <<"random">>,
+                <<"ssl">> => #{
+                    <<"enable">> => true,
+                    <<"keyfile">> => <<"server.key">>,
+                    <<"certfile">> => <<"server.crt">>,
+                    <<"cacertfile">> => <<"cacertfile.crt">>,
+                    <<"server_name_indication">> => <<"some-host">>,
+                    <<"verify">> => <<"verify_peer">>,
+                    <<"versions">> => [<<"tlsv1.2">>],
+                    <<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>]
+                },
+                <<"request_timeout">> => <<"10s">>,
+                <<"max_retries">> => 3,
+                <<"headers">> => #{
+                    <<"x-amz-acl">> => <<"public-read">>
+                }
+            }
+        })
+    ).
+
+t_invalid_limits(_Config) ->
+    ?assertException(
+        throw,
+        {emqx_s3_schema, [#{kind := validation_error, path := "s3.min_part_size"}]},
+        emqx_s3_schema:translate(#{
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443,
+            <<"min_part_size">> => <<"1mb">>
+        })
+    ),
+
+    ?assertException(
+        throw,
+        {emqx_s3_schema, [#{kind := validation_error, path := "s3.min_part_size"}]},
+        emqx_s3_schema:translate(#{
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443,
+            <<"min_part_size">> => <<"100000gb">>
+        })
+    ),
+
+    ?assertException(
+        throw,
+        {emqx_s3_schema, [#{kind := validation_error, path := "s3.max_part_size"}]},
+        emqx_s3_schema:translate(#{
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443,
+            <<"max_part_size">> => <<"1mb">>
+        })
+    ),
+
+    ?assertException(
+        throw,
+        {emqx_s3_schema, [#{kind := validation_error, path := "s3.max_part_size"}]},
+        emqx_s3_schema:translate(#{
+            <<"bucket">> => <<"bucket">>,
+            <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+            <<"port">> => 443,
+            <<"max_part_size">> => <<"100000gb">>
+        })
+    ).

+ 135 - 0
apps/emqx_s3/test/emqx_s3_test_helpers.erl

@@ -0,0 +1,135 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_test_helpers).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-define(ACCESS_KEY_ID, "minioadmin").
+-define(SECRET_ACCESS_KEY, "minioadmin").
+
+-define(TOXIPROXY_HOST, "toxiproxy").
+-define(TOXIPROXY_PORT, 8474).
+
+-define(TCP_HOST, ?TOXIPROXY_HOST).
+-define(TCP_PORT, 19000).
+-define(TLS_HOST, ?TOXIPROXY_HOST).
+-define(TLS_PORT, 19100).
+
+-include_lib("erlcloud/include/erlcloud_aws.hrl").
+
+-export([
+    aws_config/1,
+    base_raw_config/1,
+    base_config/1,
+
+    unique_key/0,
+    unique_bucket/0,
+
+    with_failure/3
+]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+aws_config(tcp) ->
+    erlcloud_s3_new(
+        ?ACCESS_KEY_ID,
+        ?SECRET_ACCESS_KEY,
+        ?TCP_HOST,
+        ?TCP_PORT,
+        "http://"
+    );
+aws_config(tls) ->
+    erlcloud_s3_new(
+        ?ACCESS_KEY_ID,
+        ?SECRET_ACCESS_KEY,
+        ?TLS_HOST,
+        ?TLS_PORT,
+        "https://"
+    ).
+
+base_raw_config(tcp) ->
+    #{
+        <<"bucket">> => <<"bucket">>,
+        <<"access_key_id">> => bin(?ACCESS_KEY_ID),
+        <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
+        <<"host">> => ?TCP_HOST,
+        <<"port">> => ?TCP_PORT,
+        <<"max_part_size">> => 10 * 1024 * 1024,
+        <<"transport_options">> =>
+            #{
+                <<"request_timeout">> => 2000
+            }
+    };
+base_raw_config(tls) ->
+    #{
+        <<"bucket">> => <<"bucket">>,
+        <<"access_key_id">> => bin(?ACCESS_KEY_ID),
+        <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
+        <<"host">> => ?TLS_HOST,
+        <<"port">> => ?TLS_PORT,
+        <<"max_part_size">> => 10 * 1024 * 1024,
+        <<"transport_options">> =>
+            #{
+                <<"request_timeout">> => 2000,
+                <<"ssl">> => #{
+                    <<"enable">> => true,
+                    <<"cacertfile">> => bin(cert_path("ca.crt")),
+                    <<"server_name_indication">> => <<"authn-server">>,
+                    <<"verify">> => <<"verify_peer">>
+                }
+            }
+    }.
+
+base_config(ConnType) ->
+    emqx_s3_schema:translate(base_raw_config(ConnType)).
+
+unique_key() ->
+    "key-" ++ integer_to_list(erlang:system_time(millisecond)) ++ "-" ++
+        integer_to_list(erlang:unique_integer([positive])).
+
+unique_bucket() ->
+    "bucket-" ++ integer_to_list(erlang:system_time(millisecond)) ++ "-" ++
+        integer_to_list(erlang:unique_integer([positive])).
+
+with_failure(_ConnType, ehttpc_500, Fun) ->
+    try
+        meck:new(ehttpc, [passthrough, no_history]),
+        meck:expect(ehttpc, request, fun(_, _, _, _) -> {ok, 500, []} end),
+        Fun()
+    after
+        meck:unload(ehttpc)
+    end;
+with_failure(ConnType, FailureType, Fun) ->
+    emqx_common_test_helpers:with_failure(
+        FailureType,
+        toxproxy_name(ConnType),
+        ?TOXIPROXY_HOST,
+        ?TOXIPROXY_PORT,
+        Fun
+    ).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+toxproxy_name(tcp) -> "minio_tcp";
+toxproxy_name(tls) -> "minio_tls".
+
+cert_path(FileName) ->
+    Dir = code:lib_dir(emqx_s3, test),
+    filename:join([Dir, <<"certs">>, FileName]).
+
+bin(String) when is_list(String) -> list_to_binary(String);
+bin(Binary) when is_binary(Binary) -> Binary.
+
+erlcloud_s3_new(AccessKeyId, SecretAccessKey, Host, Port, Scheme) ->
+    AwsConfig = erlcloud_s3:new(AccessKeyId, SecretAccessKey, Host, Port),
+    AwsConfig#aws_config{
+        s3_scheme = Scheme,
+        s3_bucket_access_method = path
+    }.

+ 535 - 0
apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl

@@ -0,0 +1,535 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_s3_uploader_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(assertProcessExited(Reason, Pid),
+    receive
+        {'DOWN', _, _, Pid, Reason} ->
+            % ct:print("uploader process exited with reason: ~p", [R]),
+            ok
+    after 3000 ->
+        ct:fail("uploader process did not exit")
+    end
+).
+
+-define(assertObjectEqual(Value, AwsConfig, Bucket, Key),
+    ?assertEqual(
+        Value,
+        proplists:get_value(
+            content,
+            erlcloud_s3:get_object(
+                Bucket,
+                Key,
+                AwsConfig
+            )
+        )
+    )
+).
+
+all() ->
+    [
+        {group, tcp},
+        {group, tls}
+    ].
+
+groups() ->
+    [
+        {tcp, [
+            {group, common_cases},
+            {group, tcp_cases}
+        ]},
+        {tls, [
+            {group, common_cases},
+            {group, tls_cases}
+        ]},
+        {common_cases, [], [
+            t_happy_path_simple_put,
+            t_happy_path_multi,
+            t_abort_multi,
+            t_abort_simple_put,
+
+            {group, noconn_errors},
+            {group, timeout_errors},
+            {group, http_errors}
+        ]},
+
+        {tcp_cases, [
+            t_config_switch,
+            t_config_switch_http_settings,
+            t_too_large,
+            t_no_profile
+        ]},
+
+        {tls_cases, [
+            t_tls_error
+        ]},
+
+        {noconn_errors, [{group, transport_errors}]},
+        {timeout_errors, [{group, transport_errors}]},
+        {http_errors, [{group, transport_errors}]},
+
+        {transport_errors, [
+            t_start_multipart_error,
+            t_upload_part_error,
+            t_complete_multipart_error,
+            t_abort_multipart_error,
+            t_put_object_error
+        ]}
+    ].
+
+suite() -> [{timetrap, {minutes, 1}}].
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_s3),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(emqx_s3).
+
+init_per_group(Group, Config) when Group =:= tcp orelse Group =:= tls ->
+    [{conn_type, Group} | Config];
+init_per_group(noconn_errors, Config) ->
+    [{failure, down} | Config];
+init_per_group(timeout_errors, Config) ->
+    [{failure, timeout} | Config];
+init_per_group(http_errors, Config) ->
+    [{failure, ehttpc_500} | Config];
+init_per_group(_ConnType, Config) ->
+    Config.
+
+end_per_group(_ConnType, _Config) ->
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    ok = snabbkaffe:start_trace(),
+    ConnType = ?config(conn_type, Config),
+    TestAwsConfig = emqx_s3_test_helpers:aws_config(ConnType),
+
+    Bucket = emqx_s3_test_helpers:unique_bucket(),
+    ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
+
+    ProfileBaseConfig = emqx_s3_test_helpers:base_config(ConnType),
+    ProfileConfig = ProfileBaseConfig#{bucket => Bucket},
+    ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
+
+    [{bucket, Bucket}, {test_aws_config, TestAwsConfig}, {profile_config, ProfileConfig} | Config].
+
+end_per_testcase(_TestCase, _Config) ->
+    ok = snabbkaffe:stop(),
+    _ = emqx_s3:stop_profile(profile_id()).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_happy_path_simple_put(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    Data = data($a, 1024, 10),
+
+    lists:foreach(
+        fun(Chunk) ->
+            ?assertEqual(
+                ok,
+                emqx_s3_uploader:write(Pid, Chunk)
+            )
+        end,
+        Data
+    ),
+
+    ok = emqx_s3_uploader:complete(Pid),
+
+    ?assertProcessExited(
+        normal,
+        Pid
+    ),
+
+    ?assertObjectEqual(
+        iolist_to_binary(Data),
+        ?config(test_aws_config, Config),
+        ?config(bucket, Config),
+        Key
+    ).
+
+t_happy_path_multi(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    Data = data($a, 1024 * 1024, 10),
+
+    lists:foreach(
+        fun(Chunk) ->
+            ?assertEqual(
+                ok,
+                emqx_s3_uploader:write(Pid, Chunk)
+            )
+        end,
+        Data
+    ),
+
+    ok = emqx_s3_uploader:complete(Pid),
+
+    ?assertProcessExited(
+        normal,
+        Pid
+    ),
+
+    ?assertObjectEqual(
+        iolist_to_binary(Data),
+        ?config(test_aws_config, Config),
+        ?config(bucket, Config),
+        Key
+    ).
+
+t_abort_multi(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 6 * 1024 * 1024, 1),
+
+    ok = emqx_s3_uploader:write(Pid, Data),
+
+    ?assertMatch(
+        [],
+        list_objects(Config)
+    ),
+
+    ok = emqx_s3_uploader:abort(Pid),
+
+    ?assertMatch(
+        [],
+        list_objects(Config)
+    ),
+
+    ?assertProcessExited(
+        normal,
+        Pid
+    ).
+
+t_abort_simple_put(_Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 10 * 1024, 1),
+
+    ok = emqx_s3_uploader:write(Pid, Data),
+
+    ok = emqx_s3_uploader:abort(Pid),
+
+    ?assertProcessExited(
+        normal,
+        Pid
+    ).
+
+t_config_switch(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    OldBucket = ?config(bucket, Config),
+    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    [Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
+
+    ok = emqx_s3_uploader:write(Pid0, Data0),
+
+    %% Switch to the new config, but without changing HTTP settings
+    ProfileConfig = ?config(profile_config, Config),
+    NewBucket = emqx_s3_test_helpers:unique_bucket(),
+    ok = erlcloud_s3:create_bucket(NewBucket, ?config(test_aws_config, Config)),
+    NewProfileConfig = ProfileConfig#{bucket => NewBucket},
+
+    ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
+
+    %% Already started uploader should be OK and use previous config
+    ok = emqx_s3_uploader:write(Pid0, Data1),
+    ok = emqx_s3_uploader:complete(Pid0),
+
+    ?assertObjectEqual(
+        iolist_to_binary([Data0, Data1]),
+        ?config(test_aws_config, Config),
+        OldBucket,
+        Key
+    ),
+
+    %% Now check that new uploader uses new config
+    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    ok = emqx_s3_uploader:write(Pid1, Data0),
+    ok = emqx_s3_uploader:complete(Pid1),
+
+    ?assertObjectEqual(
+        iolist_to_binary(Data0),
+        ?config(test_aws_config, Config),
+        NewBucket,
+        Key
+    ).
+
+t_config_switch_http_settings(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    OldBucket = ?config(bucket, Config),
+    {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    [Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
+
+    ok = emqx_s3_uploader:write(Pid0, Data0),
+
+    %% Switch to the new config, completely changing HTTP settings (tcp -> tls)
+    NewBucket = emqx_s3_test_helpers:unique_bucket(),
+    NewTestAwsConfig = emqx_s3_test_helpers:aws_config(tls),
+    ok = erlcloud_s3:create_bucket(NewBucket, NewTestAwsConfig),
+    NewProfileConfig0 = emqx_s3_test_helpers:base_config(tls),
+    NewProfileConfig1 = NewProfileConfig0#{bucket => NewBucket},
+
+    ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
+
+    %% Already started uploader should be OK and use previous config
+    ok = emqx_s3_uploader:write(Pid0, Data1),
+    ok = emqx_s3_uploader:complete(Pid0),
+
+    ?assertObjectEqual(
+        iolist_to_binary([Data0, Data1]),
+        ?config(test_aws_config, Config),
+        OldBucket,
+        Key
+    ),
+
+    %% Now check that new uploader uses new config
+    {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+    ok = emqx_s3_uploader:write(Pid1, Data0),
+    ok = emqx_s3_uploader:complete(Pid1),
+
+    ?assertObjectEqual(
+        iolist_to_binary(Data0),
+        NewTestAwsConfig,
+        NewBucket,
+        Key
+    ).
+
+t_start_multipart_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 6 * 1024 * 1024, 1),
+
+    emqx_s3_test_helpers:with_failure(
+        ?config(conn_type, Config),
+        ?config(failure, Config),
+        fun() ->
+            ?assertMatch(
+                {error, _},
+                emqx_s3_uploader:write(Pid, Data)
+            )
+        end
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_upload_part_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
+
+    ok = emqx_s3_uploader:write(Pid, Data0),
+
+    emqx_s3_test_helpers:with_failure(
+        ?config(conn_type, Config),
+        ?config(failure, Config),
+        fun() ->
+            ?assertMatch(
+                {error, _},
+                emqx_s3_uploader:write(Pid, Data1)
+            )
+        end
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_abort_multipart_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 6 * 1024 * 1024, 1),
+
+    ok = emqx_s3_uploader:write(Pid, Data),
+
+    emqx_s3_test_helpers:with_failure(
+        ?config(conn_type, Config),
+        ?config(failure, Config),
+        fun() ->
+            ?assertMatch(
+                {error, _},
+                emqx_s3_uploader:abort(Pid)
+            )
+        end
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_complete_multipart_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 6 * 1024 * 1024, 1),
+
+    ok = emqx_s3_uploader:write(Pid, Data),
+
+    emqx_s3_test_helpers:with_failure(
+        ?config(conn_type, Config),
+        ?config(failure, Config),
+        fun() ->
+            ?assertMatch(
+                {error, _},
+                emqx_s3_uploader:complete(Pid)
+            )
+        end
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_put_object_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    %% Little data to avoid multipart upload
+    [Data] = data($a, 1024, 1),
+
+    emqx_s3_test_helpers:with_failure(
+        ?config(conn_type, Config),
+        ?config(failure, Config),
+        fun() ->
+            ok = emqx_s3_uploader:write(Pid, Data),
+            ?assertMatch(
+                {error, _},
+                emqx_s3_uploader:complete(Pid)
+            )
+        end
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_too_large(Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 1024, 1),
+
+    [DataLarge] = data($a, 20 * 1024 * 1024, 1),
+
+    ?assertMatch(
+        {error, {too_large, _}},
+        emqx_s3_uploader:write(Pid, DataLarge)
+    ),
+
+    ok = emqx_s3_uploader:write(Pid, Data),
+    ok = emqx_s3_uploader:complete(Pid),
+
+    ?assertProcessExited(
+        normal,
+        Pid
+    ),
+
+    ?assertObjectEqual(
+        iolist_to_binary(Data),
+        ?config(test_aws_config, Config),
+        ?config(bucket, Config),
+        Key
+    ).
+
+t_tls_error(Config) ->
+    _ = process_flag(trap_exit, true),
+
+    ProfileBaseConfig = ?config(profile_config, Config),
+    ProfileConfig = emqx_map_lib:deep_put(
+        [transport_options, ssl, server_name_indication], ProfileBaseConfig, "invalid-hostname"
+    ),
+    ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
+    Key = emqx_s3_test_helpers:unique_key(),
+    {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
+
+    _ = erlang:monitor(process, Pid),
+
+    [Data] = data($a, 6 * 1024 * 1024, 1),
+
+    ?assertMatch(
+        {error, _},
+        emqx_s3_uploader:write(Pid, Data)
+    ),
+
+    ?assertProcessExited(
+        {error, _},
+        Pid
+    ).
+
+t_no_profile(_Config) ->
+    Key = emqx_s3_test_helpers:unique_key(),
+    ?assertMatch(
+        {error, profile_not_found},
+        emqx_s3:start_uploader(<<"no-profile">>, #{key => Key})
+    ).
+
+%%--------------------------------------------------------------------
+%% Test helpers
+%%--------------------------------------------------------------------
+
+profile_id() ->
+    <<"test">>.
+
+data(Byte, ChunkSize, ChunkCount) ->
+    Chunk = iolist_to_binary([Byte || _ <- lists:seq(1, ChunkSize)]),
+    [Chunk || _ <- lists:seq(1, ChunkCount)].
+
+list_objects(Config) ->
+    Props = erlcloud_s3:list_objects(?config(bucket, Config), [], ?config(test_aws_config, Config)),
+    proplists:get_value(contents, Props).

+ 2 - 1
rebar.config.erl

@@ -401,7 +401,8 @@ relx_apps(ReleaseType, Edition) ->
             emqx_psk,
             emqx_slow_subs,
             emqx_plugins,
-            emqx_ft
+            emqx_ft,
+            emqx_s3
         ] ++
         [quicer || is_quicer_supported()] ++
         [bcrypt || provide_bcrypt_release(ReleaseType)] ++

+ 11 - 1
scripts/ct/run.sh

@@ -91,6 +91,12 @@ if [ "${WHICH_APP}" = 'novalue' ]; then
     exit 1
 fi
 
+if [ ! -d "${WHICH_APP}" ]; then
+    echo "must provide an existing path for --app arg"
+    help
+    exit 1
+fi
+
 if [[ "${WHICH_APP}" == lib-ee* && (-z "${PROFILE+x}" || "${PROFILE}" != emqx-enterprise) ]]; then
     echo 'You are trying to run an enterprise test case without the emqx-enterprise profile.'
     echo 'This will most likely not work.'
@@ -172,10 +178,14 @@ for dep in ${CT_DEPS}; do
             ;;
         rocketmq)
             FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' )
-            ;; 
+            ;;
         cassandra)
             FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )
             ;;
+        minio)
+            FILES+=( '.ci/docker-compose-file/docker-compose-minio-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-minio-tls.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1