Ver código fonte

fix(webhook): treat http status code 429 as recoverable

Thales Macedo Garitezi 2 anos atrás
pai
commit
ca435975de

+ 97 - 9
apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl

@@ -69,12 +69,24 @@ init_per_testcase(t_path_not_found, Config) ->
     ),
     ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()),
     [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
+init_per_testcase(t_too_many_requests, Config) ->
+    HTTPPath = <<"/path">>,
+    ServerSSLOpts = false,
+    {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
+        _Port = random, HTTPPath, ServerSSLOpts
+    ),
+    ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()),
+    [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
     [{http_server, Server} | Config].
 
-end_per_testcase(t_path_not_found, _Config) ->
+end_per_testcase(TestCase, _Config) when
+    TestCase =:= t_path_not_found;
+    TestCase =:= t_too_many_requests
+->
     ok = emqx_connector_web_hook_server:stop(),
+    persistent_term:erase({?MODULE, times_called}),
     emqx_bridge_testlib:delete_all_bridges(),
     emqx_common_test_helpers:call_janitor(),
     ok;
@@ -220,7 +232,7 @@ bridge_async_config(#{port := Port} = Config) ->
         "  body = \"${id}\"\n"
         "  resource_opts {\n"
         "    inflight_window = 100\n"
-        "    health_check_interval = \"15s\"\n"
+        "    health_check_interval = \"200ms\"\n"
         "    max_buffer_bytes = \"1GB\"\n"
         "    query_mode = \"~s\"\n"
         "    request_ttl = \"~p\"\n"
@@ -284,6 +296,47 @@ not_found_http_handler() ->
         {ok, Rep, State}
     end.
 
+too_many_requests_http_handler() ->
+    GetAndBump =
+        fun() ->
+            NCalled = persistent_term:get({?MODULE, times_called}, 0),
+            persistent_term:put({?MODULE, times_called}, NCalled + 1),
+            NCalled + 1
+        end,
+    TestPid = self(),
+    fun(Req0, State) ->
+        N = GetAndBump(),
+        {ok, Body, Req} = cowboy_req:read_body(Req0),
+        TestPid ! {http, cowboy_req:headers(Req), Body},
+        Rep =
+            case N >= 2 of
+                true ->
+                    cowboy_req:reply(
+                        200,
+                        #{<<"content-type">> => <<"text/plain">>},
+                        <<"ok">>,
+                        Req
+                    );
+                false ->
+                    cowboy_req:reply(
+                        429,
+                        #{<<"content-type">> => <<"text/plain">>},
+                        <<"slow down, buddy">>,
+                        Req
+                    )
+            end,
+        {ok, Rep, State}
+    end.
+
+wait_http_request() ->
+    receive
+        {http, _Headers, _Req} ->
+            ok
+    after 1_000 ->
+        ct:pal("mailbox: ~p", [process_info(self(), messages)]),
+        ct:fail("http request not made")
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -422,13 +475,7 @@ t_path_not_found(Config) ->
             {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
             Msg = emqx_message:make(MQTTTopic, <<"{}">>),
             emqx:publish(Msg),
-            receive
-                {http, _Headers, _Req} ->
-                    ok
-            after 1_000 ->
-                ct:pal("mailbox: ~p", [process_info(self(), messages)]),
-                ct:fail("http request not made")
-            end,
+            wait_http_request(),
             ?retry(
                 _Interval = 500,
                 _NAttempts = 20,
@@ -452,6 +499,47 @@ t_path_not_found(Config) ->
     ),
     ok.
 
+t_too_many_requests(Config) ->
+    ?check_trace(
+        begin
+            #{port := Port, path := Path} = ?config(http_server, Config),
+            MQTTTopic = <<"t/webhook">>,
+            BridgeConfig = bridge_async_config(#{
+                type => ?BRIDGE_TYPE,
+                name => ?BRIDGE_NAME,
+                local_topic => MQTTTopic,
+                port => Port,
+                path => Path
+            }),
+            {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
+            Msg = emqx_message:make(MQTTTopic, <<"{}">>),
+            emqx:publish(Msg),
+            %% should retry
+            wait_http_request(),
+            wait_http_request(),
+            ?retry(
+                _Interval = 500,
+                _NAttempts = 20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            matched := 1,
+                            failed := 0,
+                            success := 1
+                        }
+                    },
+                    emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)),
+            ok
+        end
+    ),
+    ok.
+
 %% helpers
 do_t_async_retries(TestContext, Error, Fn) ->
     #{error_attempts := ErrorAttempts} = TestContext,

+ 7 - 0
apps/emqx_connector/src/emqx_connector_http.erl

@@ -642,8 +642,15 @@ transform_result(Result) ->
             Result;
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
             Result;
+        {ok, _TooManyRequests = StatusCode = 429, Headers} ->
+            {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}};
         {ok, StatusCode, Headers} ->
             {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}};
+        {ok, _TooManyRequests = StatusCode = 429, Headers, Body} ->
+            {error,
+                {recoverable_error, #{
+                    status_code => StatusCode, headers => Headers, body => Body
+                }}};
         {ok, StatusCode, Headers, Body} ->
             {error,
                 {unrecoverable_error, #{