Ilya Averyanov 3 лет назад
Родитель
Сommit
bd7250cb13

+ 27 - 17
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -137,10 +137,11 @@ s3_headers({ClientId, FileId}, Filemeta) ->
 list(Client, Options) ->
     case list_key_info(Client, Options) of
         {ok, KeyInfos} ->
-            {ok,
-                lists:map(
-                    fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
-                )};
+            MaybeExportInfos = lists:map(
+                fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
+            ),
+            ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos],
+            {ok, ExportInfos};
         {error, _Reason} = Error ->
             Error
     end.
@@ -170,14 +171,18 @@ next_marker(KeyInfos) ->
 
 key_info_to_exportinfo(Client, KeyInfo, _Options) ->
     Key = proplists:get_value(key, KeyInfo),
-    {Transfer, Name} = parse_transfer_and_name(Key),
-    #{
-        transfer => Transfer,
-        name => unicode:characters_to_binary(Name),
-        uri => emqx_s3_client:uri(Client, Key),
-        timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
-        size => proplists:get_value(size, KeyInfo)
-    }.
+    case parse_transfer_and_name(Key) of
+        {ok, {Transfer, Name}} ->
+            {ok, #{
+                transfer => Transfer,
+                name => unicode:characters_to_binary(Name),
+                uri => emqx_s3_client:uri(Client, Key),
+                timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
+                size => proplists:get_value(size, KeyInfo)
+            }};
+        {error, _Reason} = Error ->
+            Error
+    end.
 
 -define(EPOCH_START, 62167219200).
 
@@ -185,8 +190,13 @@ datetime_to_epoch_second(DateTime) ->
     calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
 
 parse_transfer_and_name(Key) ->
-    [ClientId, FileId, Name] = string:split(Key, "/", all),
-    Transfer = {
-        emqx_ft_fs_util:unescape_filename(ClientId), emqx_ft_fs_util:unescape_filename(FileId)
-    },
-    {Transfer, Name}.
+    case string:split(Key, "/", all) of
+        [ClientId, FileId, Name] ->
+            Transfer = {
+                emqx_ft_fs_util:unescape_filename(ClientId),
+                emqx_ft_fs_util:unescape_filename(FileId)
+            },
+            {ok, {Transfer, Name}};
+        _ ->
+            {error, invalid_key}
+    end.

+ 50 - 22
apps/emqx_s3/src/emqx_s3_client.erl

@@ -38,6 +38,8 @@
 -type part_number() :: non_neg_integer().
 -type upload_id() :: string().
 -type etag() :: string().
+-type http_pool() :: ehttpc:pool_name().
+-type pool_type() :: random | hash.
 -type upload_options() :: list({acl, emqx_s3:acl()}).
 
 -opaque client() :: #{
@@ -59,6 +61,7 @@
     access_key_id := string() | undefined,
     secret_access_key := string() | undefined,
     http_pool := ehttpc:pool_name(),
+    pool_type := pool_type(),
     request_timeout := timeout() | undefined,
     max_retries := non_neg_integer() | undefined
 }.
@@ -79,7 +82,8 @@ create(Config) ->
         upload_options => upload_options(Config),
         bucket => maps:get(bucket, Config),
         url_expire_time => maps:get(url_expire_time, Config),
-        headers => headers(Config)
+        headers => headers(Config),
+        pool_type => maps:get(pool_type, Config)
     }.
 
 -spec put_object(client(), key(), iodata()) -> ok_or_error(term()).
@@ -211,6 +215,7 @@ aws_config(#{
     access_key_id := AccessKeyId,
     secret_access_key := SecretAccessKey,
     http_pool := HttpPool,
+    pool_type := PoolType,
     request_timeout := Timeout,
     max_retries := MaxRetries
 }) ->
@@ -224,7 +229,9 @@ aws_config(#{
         access_key_id = AccessKeyId,
         secret_access_key = SecretAccessKey,
 
-        http_client = request_fun(HttpPool, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)),
+        http_client = request_fun(
+            HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)
+        ),
 
         %% This value will be transparently passed to ehttpc
         timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT),
@@ -232,55 +239,63 @@ aws_config(#{
         retry_num = 1
     }.
 
--type http_pool() :: term().
-
--spec request_fun(http_pool(), non_neg_integer()) -> erlcloud_httpc:request_fun().
-request_fun(HttpPool, MaxRetries) ->
+-spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun().
+request_fun(HttpPool, PoolType, MaxRetries) ->
     fun(Url, Method, Headers, Body, Timeout, _Config) ->
         with_path_and_query_only(Url, fun(PathQuery) ->
             Request = make_request(
                 Method, PathQuery, headers_erlcloud_request_to_ehttpc(Headers), Body
             ),
-            ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries)
+            case pick_worker_safe(HttpPool, PoolType) of
+                {ok, Worker} ->
+                    ehttpc_request(Worker, Method, Request, Timeout, MaxRetries);
+                {error, Reason} ->
+                    ?SLOG(error, #{
+                        msg => "s3_request_fun_fail",
+                        reason => Reason,
+                        http_pool => HttpPool,
+                        pool_type => PoolType,
+                        method => Method,
+                        request => Request,
+                        timeout => Timeout,
+                        max_retries => MaxRetries
+                    }),
+                    {error, Reason}
+            end
         end)
     end.
 
 ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
-    ?SLOG(debug, #{
-        msg => "s3_ehttpc_request",
-        timeout => Timeout,
-        pool => HttpPool,
-        method => Method,
-        max_retries => MaxRetries,
-        request => format_request(Request)
-    }),
-    try ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) of
-        {ok, StatusCode, RespHeaders} ->
+    try timer:tc(fun() -> ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) end) of
+        {Time, {ok, StatusCode, RespHeaders}} ->
             ?SLOG(debug, #{
                 msg => "s3_ehttpc_request_ok",
                 status_code => StatusCode,
-                headers => RespHeaders
+                headers => RespHeaders,
+                time => Time
             }),
             {ok, {
                 {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined
             }};
-        {ok, StatusCode, RespHeaders, RespBody} ->
+        {Time, {ok, StatusCode, RespHeaders, RespBody}} ->
             ?SLOG(debug, #{
                 msg => "s3_ehttpc_request_ok",
                 status_code => StatusCode,
                 headers => RespHeaders,
-                body => RespBody
+                body => RespBody,
+                time => Time
             }),
             {ok, {
                 {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody
             }};
-        {error, Reason} ->
+        {Time, {error, Reason}} ->
             ?SLOG(error, #{
                 msg => "s3_ehttpc_request_fail",
                 reason => Reason,
                 timeout => Timeout,
                 pool => HttpPool,
-                method => Method
+                method => Method,
+                time => Time
             }),
             {error, Reason}
     catch
@@ -304,6 +319,19 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
             {error, Reason}
     end.
 
+pick_worker_safe(HttpPool, PoolType) ->
+    try
+        {ok, pick_worker(HttpPool, PoolType)}
+    catch
+        error:badarg ->
+            {error, no_ehttpc_pool}
+    end.
+
+pick_worker(HttpPool, random) ->
+    ehttpc_pool:pick_worker(HttpPool);
+pick_worker(HttpPool, hash) ->
+    ehttpc_pool:pick_worker(HttpPool, self()).
+
 -define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)).
 -define(NEEDS_NO_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)).
 

+ 4 - 0
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -204,6 +204,7 @@ client_config(ProfileConfig, PoolName) ->
         secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),
         request_timeout => maps:get(request_timeout, HTTPOpts, undefined),
         max_retries => maps:get(max_retries, HTTPOpts, undefined),
+        pool_type => maps:get(pool_type, HTTPOpts, random),
         http_pool => PoolName
     }.
 
@@ -371,9 +372,12 @@ stop_http_pool(ProfileId, PoolName) ->
     ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}).
 
 do_start_http_pool(PoolName, HttpConfig) ->
+    ?SLOG(warning, #{msg => "s3_start_http_pool", pool_name => PoolName, config => HttpConfig}),
     case ehttpc_sup:start_pool(PoolName, HttpConfig) of
         {ok, _} ->
+            ?SLOG(warning, #{msg => "s3_start_http_pool_success", pool_name => PoolName}),
             ok;
         {error, _} = Error ->
+            ?SLOG(error, #{msg => "s3_start_http_pool_fail", pool_name => PoolName, error => Error}),
             Error
     end.

+ 29 - 7
apps/emqx_s3/test/emqx_s3_client_SUITE.erl

@@ -20,9 +20,15 @@ all() ->
 
 groups() ->
     AllCases = emqx_common_test_helpers:all(?MODULE),
+    PoolGroups = [
+        {group, pool_random},
+        {group, pool_hash}
+    ],
     [
-        {tcp, [], AllCases},
-        {tls, [], AllCases}
+        {tcp, [], PoolGroups},
+        {tls, [], PoolGroups},
+        {pool_random, [], AllCases},
+        {pool_hash, [], AllCases}
     ].
 
 init_per_suite(Config) ->
@@ -32,8 +38,17 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     ok = application:stop(emqx_s3).
 
-init_per_group(ConnType, Config) ->
-    [{conn_type, ConnType} | Config].
+init_per_group(ConnTypeGroup, Config) when ConnTypeGroup =:= tcp; ConnTypeGroup =:= tls ->
+    [{conn_type, ConnTypeGroup} | Config];
+init_per_group(PoolTypeGroup, Config) when
+    PoolTypeGroup =:= pool_random; PoolTypeGroup =:= pool_hash
+->
+    PoolType =
+        case PoolTypeGroup of
+            pool_random -> random;
+            pool_hash -> hash
+        end,
+    [{pool_type, PoolType} | Config].
 end_per_group(_ConnType, _Config) ->
     ok.
 
@@ -127,11 +142,18 @@ client(Config) ->
     emqx_s3_client:create(ClientConfig).
 
 profile_config(Config) ->
-    maps:put(
+    ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)),
+    ProfileConfig1 = maps:put(
         bucket,
         ?config(bucket, Config),
-        emqx_s3_test_helpers:base_config(?config(conn_type, Config))
-    ).
+        ProfileConfig0
+    ),
+    ProfileConfig2 = emqx_map_lib:deep_put(
+        [transport_options, pool_type],
+        ProfileConfig1,
+        ?config(pool_type, Config)
+    ),
+    ProfileConfig2.
 
 data(Size) ->
     iolist_to_binary([$a || _ <- lists:seq(1, Size)]).