Просмотр исходного кода

fix(ft-s3): make controllable use of max_retries and request_timeout arguments

Ilya Averyanov 3 лет назад
Родитель
Сommit
820e06d756

+ 22 - 16
apps/emqx_s3/src/emqx_s3_client.erl

@@ -58,11 +58,15 @@
     access_key_id := string() | undefined,
     secret_access_key := string() | undefined,
     http_pool := ehttpc:pool_name(),
-    request_timeout := timeout()
+    request_timeout := timeout() | undefined,
+    max_retries := non_neg_integer() | undefined
 }.
 
 -type s3_options() :: list({string(), string()}).
 
+-define(DEFAULT_REQUEST_TIMEOUT, 30000).
+-define(DEFAULT_MAX_RETRIES, 2).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -206,7 +210,8 @@ aws_config(#{
     access_key_id := AccessKeyId,
     secret_access_key := SecretAccessKey,
     http_pool := HttpPool,
-    request_timeout := Timeout
+    request_timeout := Timeout,
+    max_retries := MaxRetries
 }) ->
     #aws_config{
         s3_scheme = Scheme,
@@ -218,39 +223,37 @@ aws_config(#{
         access_key_id = AccessKeyId,
         secret_access_key = SecretAccessKey,
 
-        http_client = request_fun(HttpPool),
-        timeout = Timeout
+        http_client = request_fun(HttpPool, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)),
+
+        %% This value will be transparently passed to ehttpc
+        timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT),
+        %% We rely on retry mechanism of ehttpc
+        retry_num = 1
     }.
 
 -type http_pool() :: term().
 
--spec request_fun(http_pool()) -> erlcloud_httpc:request_fun().
-request_fun(HttpPool) ->
+-spec request_fun(http_pool(), non_neg_integer()) -> erlcloud_httpc:request_fun().
+request_fun(HttpPool, MaxRetries) ->
     fun(Url, Method, Headers, Body, Timeout, _Config) ->
         with_path_and_query_only(Url, fun(PathQuery) ->
             Request = make_request(
                 Method, PathQuery, headers_erlcloud_request_to_ehttpc(Headers), Body
             ),
-            ?SLOG(debug, #{
-                msg => "s3_ehttpc_request",
-                timeout => Timeout,
-                pool => HttpPool,
-                method => Method,
-                request => Request
-            }),
-            ehttpc_request(HttpPool, Method, Request, Timeout)
+            ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries)
         end)
     end.
 
-ehttpc_request(HttpPool, Method, Request, Timeout) ->
+ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
     ?SLOG(debug, #{
         msg => "s3_ehttpc_request",
         timeout => Timeout,
         pool => HttpPool,
         method => Method,
+        max_retries => MaxRetries,
         request => format_request(Request)
     }),
-    try ehttpc:request(HttpPool, Method, Request, Timeout) of
+    try ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) of
         {ok, StatusCode, RespHeaders} ->
             ?SLOG(debug, #{
                 msg => "s3_ehttpc_request_ok",
@@ -388,3 +391,6 @@ response_property(Name, Props) ->
         Value ->
             Value
     end.
+
+with_default(undefined, Default) -> Default;
+with_default(Value, _Default) -> Value.

+ 1 - 0
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -203,6 +203,7 @@ client_config(ProfileConfig, PoolName) ->
         access_key_id => maps:get(access_key_id, ProfileConfig, undefined),
         secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),
         request_timeout => maps:get(request_timeout, HTTPOpts, undefined),
+        max_retries => maps:get(max_retries, HTTPOpts, undefined),
         http_pool => PoolName
     }.
 

+ 1 - 1
apps/emqx_s3/test/emqx_s3_test_helpers.erl

@@ -99,7 +99,7 @@ unique_bucket() ->
 with_failure(_ConnType, ehttpc_500, Fun) ->
     try
         meck:new(ehttpc, [passthrough, no_history]),
-        meck:expect(ehttpc, request, fun(_, _, _, _) -> {ok, 500, []} end),
+        meck:expect(ehttpc, request, fun(_, _, _, _, _) -> {ok, 500, []} end),
         Fun()
     after
         meck:unload(ehttpc)

+ 0 - 1
apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl

@@ -13,7 +13,6 @@
 -define(assertProcessExited(Reason, Pid),
     receive
         {'DOWN', _, _, Pid, Reason} ->
-            % ct:print("uploader process exited with reason: ~p", [R]),
             ok
     after 3000 ->
         ct:fail("uploader process did not exit")