Преглед изворни кода

fix(s3-bridge): handle recoverable AWS errors

Andrew Mayorov пре 2 година
родитељ
комит
a4eac75b25

+ 3 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -196,8 +196,10 @@ run_simple_upload(
 
 map_error({socket_error, _} = Reason) ->
     {recoverable_error, Reason};
+map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
+    %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+    {recoverable_error, Reason};
 map_error(Reason) ->
-    %% TODO: Recoverable errors.
     {unrecoverable_error, Reason}.
 
 render_bucket(Template, Data) ->

+ 35 - 4
apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl

@@ -9,6 +9,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
 
 -import(emqx_utils_conv, [bin/1]).
 
@@ -89,8 +90,8 @@ connector_config(Name, _Config) ->
             <<"headers">> => #{
                 <<"content-type">> => <<?CONTENT_TYPE>>
             },
-            <<"connect_timeout">> => 1000,
-            <<"request_timeout">> => 1000,
+            <<"connect_timeout">> => <<"500ms">>,
+            <<"request_timeout">> => <<"1s">>,
             <<"pool_size">> => 4,
             <<"max_retries">> => 0,
             <<"enable_pipelining">> => 1
@@ -110,13 +111,13 @@ action_config(Name, ConnectorId) ->
         <<"resource_opts">> => #{
             <<"buffer_mode">> => <<"memory_only">>,
             <<"buffer_seg_bytes">> => <<"10MB">>,
-            <<"health_check_interval">> => <<"5s">>,
+            <<"health_check_interval">> => <<"3s">>,
             <<"inflight_window">> => 40,
             <<"max_buffer_bytes">> => <<"256MB">>,
             <<"metrics_flush_interval">> => <<"1s">>,
             <<"query_mode">> => <<"sync">>,
             <<"request_ttl">> => <<"60s">>,
-            <<"resume_interval">> => <<"5s">>,
+            <<"resume_interval">> => <<"3s">>,
             <<"worker_pool_size">> => <<"4">>
         }
     }).
@@ -165,6 +166,36 @@ t_sync_query(Config) ->
         maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
     ).
 
+t_query_retry_recoverable(Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    BridgeName = ?config(bridge_name, Config),
+    Bucket = emqx_s3_test_helpers:unique_bucket(),
+    Topic = "d/e/f",
+    Payload = rand:bytes(1024),
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
+    %% Create a bridge with the sample configuration.
+    ?assertMatch(
+        {ok, _Bridge},
+        emqx_bridge_v2_testlib:create_bridge(Config)
+    ),
+    %% Simulate recoverable failure.
+    _ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
+    _ = timer:apply_after(
+        _Timeout = 5000,
+        emqx_common_test_helpers,
+        heal_failure,
+        [timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
+    ),
+    Message = mk_message(Bucket, Topic, Payload),
+    %% Verify that the message is sent eventually.
+    ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
+    ?assertMatch(
+        #{content := Payload},
+        maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
+    ).
+
 mk_message(ClientId, Topic, Payload) ->
     Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
     {Event, _} = emqx_rule_events:eventmsg_publish(Message),