فهرست منبع

fix(webhook): treat 404 and other error replies as errors in async requests

Fixes https://emqx.atlassian.net/browse/EMQX-10405

The problem here was that, for async requests, ehttpc responses of the form `{ok, 4__, _,
_}` and similar were being treated as successes.
Thales Macedo Garitezi 2 سال پیش
والد
کامیت
59b109eb5c
3فایلهای تغییر یافته به همراه140 افزوده شده و 54 حذف شده
  1. 87 2
      apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
  2. 52 52
      apps/emqx_connector/src/emqx_connector_http.erl
  3. 1 0
      changes/ce/fix-11162.en.md

+ 87 - 2
apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl

@@ -27,6 +27,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(BRIDGE_TYPE, <<"webhook">>).
 -define(BRIDGE_NAME, atom_to_binary(?MODULE)).
@@ -60,10 +61,23 @@ init_per_testcase(t_send_async_connection_timeout, Config) ->
     ResponseDelayMS = 500,
     Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
     [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
+init_per_testcase(t_path_not_found, Config) ->
+    HTTPPath = <<"/nonexisting/path">>,
+    ServerSSLOpts = false,
+    {ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
+        _Port = random, HTTPPath, ServerSSLOpts
+    ),
+    ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()),
+    [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
     [{http_server, Server} | Config].
 
+end_per_testcase(t_path_not_found, _Config) ->
+    ok = emqx_connector_web_hook_server:stop(),
+    emqx_bridge_testlib:delete_all_bridges(),
+    emqx_common_test_helpers:call_janitor(),
+    ok;
 end_per_testcase(_TestCase, Config) ->
     case ?config(http_server, Config) of
         undefined -> ok;
@@ -176,24 +190,34 @@ parse_http_request_assertive(ReqStr0) ->
 bridge_async_config(#{port := Port} = Config) ->
     Type = maps:get(type, Config, ?BRIDGE_TYPE),
     Name = maps:get(name, Config, ?BRIDGE_NAME),
+    Path = maps:get(path, Config, ""),
     PoolSize = maps:get(pool_size, Config, 1),
     QueryMode = maps:get(query_mode, Config, "async"),
     ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
     RequestTimeout = maps:get(request_timeout, Config, "10s"),
     ResumeInterval = maps:get(resume_interval, Config, "1s"),
     ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
+    LocalTopic =
+        case maps:find(local_topic, Config) of
+            {ok, LT} ->
+                lists:flatten(["local_topic = \"", LT, "\""]);
+            error ->
+                ""
+        end,
     ConfigString = io_lib:format(
         "bridges.~s.~s {\n"
-        "  url = \"http://localhost:~p\"\n"
+        "  url = \"http://localhost:~p~s\"\n"
         "  connect_timeout = \"~p\"\n"
         "  enable = true\n"
+        %% local_topic
+        "  ~s\n"
         "  enable_pipelining = 100\n"
         "  max_retries = 2\n"
         "  method = \"post\"\n"
         "  pool_size = ~p\n"
         "  pool_type = \"random\"\n"
         "  request_timeout = \"~s\"\n"
-        "  body = \"${id}\""
+        "  body = \"${id}\"\n"
         "  resource_opts {\n"
         "    inflight_window = 100\n"
         "    health_check_interval = \"15s\"\n"
@@ -213,7 +237,9 @@ bridge_async_config(#{port := Port} = Config) ->
             Type,
             Name,
             Port,
+            Path,
             ConnectTimeout,
+            LocalTopic,
             PoolSize,
             RequestTimeout,
             QueryMode,
@@ -244,6 +270,20 @@ make_bridge(Config) ->
     ),
     emqx_bridge_resource:bridge_id(Type, Name).
 
+not_found_http_handler() ->
+    TestPid = self(),
+    fun(Req0, State) ->
+        {ok, Body, Req} = cowboy_req:read_body(Req0),
+        TestPid ! {http, cowboy_req:headers(Req), Body},
+        Rep = cowboy_req:reply(
+            404,
+            #{<<"content-type">> => <<"text/plain">>},
+            <<"not found">>,
+            Req
+        ),
+        {ok, Rep, State}
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -367,6 +407,51 @@ t_start_stop(Config) ->
         ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped
     ).
 
+t_path_not_found(Config) ->
+    ?check_trace(
+        begin
+            #{port := Port, path := Path} = ?config(http_server, Config),
+            MQTTTopic = <<"t/webhook">>,
+            BridgeConfig = bridge_async_config(#{
+                type => ?BRIDGE_TYPE,
+                name => ?BRIDGE_NAME,
+                local_topic => MQTTTopic,
+                port => Port,
+                path => Path
+            }),
+            {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
+            Msg = emqx_message:make(MQTTTopic, <<"{}">>),
+            emqx:publish(Msg),
+            receive
+                {http, _Headers, _Req} ->
+                    ok
+            after 1_000 ->
+                ct:pal("mailbox: ~p", [process_info(self(), messages)]),
+                ct:fail("http request not made")
+            end,
+            ?retry(
+                _Interval = 500,
+                _NAttempts = 20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            matched := 1,
+                            failed := 1,
+                            success := 0
+                        }
+                    },
+                    emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)),
+            ok
+        end
+    ),
+    ok.
+
 %% helpers
 do_t_async_retries(TestContext, Error, Fn) ->
     #{error_attempts := ErrorAttempts} = TestContext,

+ 52 - 52
apps/emqx_connector/src/emqx_connector_http.erl

@@ -301,65 +301,41 @@ on_query(
     ),
     NRequest = formalize_request(Method, BasePath, Request),
     Worker = resolve_pool_worker(State, KeyOrNum),
-    case
-        ehttpc:request(
-            Worker,
-            Method,
-            NRequest,
-            Timeout,
-            Retry
-        )
-    of
-        {error, Reason} when
-            Reason =:= econnrefused;
-            Reason =:= timeout;
-            Reason =:= {shutdown, normal};
-            Reason =:= {shutdown, closed}
-        ->
-            ?SLOG(warning, #{
-                msg => "http_connector_do_request_failed",
-                reason => Reason,
-                connector => InstId
-            }),
-            {error, {recoverable_error, Reason}};
-        {error, {closed, _Message} = Reason} ->
-            %% _Message = "The connection was lost."
+    Result0 = ehttpc:request(
+        Worker,
+        Method,
+        NRequest,
+        Timeout,
+        Retry
+    ),
+    Result = transform_result(Result0),
+    case Result of
+        {error, {recoverable_error, Reason}} ->
             ?SLOG(warning, #{
                 msg => "http_connector_do_request_failed",
                 reason => Reason,
                 connector => InstId
             }),
             {error, {recoverable_error, Reason}};
-        {error, Reason} = Result ->
-            ?SLOG(error, #{
-                msg => "http_connector_do_request_failed",
-                request => redact(NRequest),
-                reason => Reason,
-                connector => InstId
-            }),
-            Result;
-        {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
-            Result;
-        {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
-            Result;
-        {ok, StatusCode, Headers} ->
+        {error, #{status_code := StatusCode}} ->
             ?SLOG(error, #{
-                msg => "http connector do request, received error response",
+                msg => "http connector do request, received error response.",
                 note => "the body will be redacted due to security reasons",
                 request => redact_request(NRequest),
                 connector => InstId,
                 status_code => StatusCode
             }),
-            {error, #{status_code => StatusCode, headers => Headers}};
-        {ok, StatusCode, Headers, Body} ->
+            Result;
+        {error, Reason} ->
             ?SLOG(error, #{
-                msg => "http connector do request, received error response.",
-                note => "the body will be redacted due to security reasons",
-                request => redact_request(NRequest),
-                connector => InstId,
-                status_code => StatusCode
+                msg => "http_connector_do_request_failed",
+                request => redact(NRequest),
+                reason => Reason,
+                connector => InstId
             }),
-            {error, #{status_code => StatusCode, headers => Headers, body => Body}}
+            Result;
+        _Success ->
+            Result
     end.
 
 on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
@@ -639,8 +615,11 @@ to_bin(Str) when is_list(Str) ->
 to_bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
 
-reply_delegator(Context, ReplyFunAndArgs, Result) ->
-    spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).
+reply_delegator(Context, ReplyFunAndArgs, Result0) ->
+    spawn(fun() ->
+        Result = transform_result(Result0),
+        maybe_retry(Result, Context, ReplyFunAndArgs)
+    end).
 
 transform_result(Result) ->
     case Result of
@@ -657,14 +636,29 @@ transform_result(Result) ->
         {error, {closed, _Message} = Reason} ->
             %% _Message = "The connection was lost."
             {error, {recoverable_error, Reason}};
-        _ ->
-            Result
+        {error, _Reason} ->
+            Result;
+        {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
+            Result;
+        {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
+            Result;
+        {ok, StatusCode, Headers} ->
+            {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}};
+        {ok, StatusCode, Headers, Body} ->
+            {error,
+                {unrecoverable_error, #{
+                    status_code => StatusCode, headers => Headers, body => Body
+                }}}
     end.
 
-maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
+maybe_retry(Result, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
     N >= Max
 ->
-    Result = transform_result(Result0),
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+maybe_retry(
+    {error, {unrecoverable_error, #{status_code := _}}} = Result, _Context, ReplyFunAndArgs
+) ->
+    %% request was successful, but we got an error response; no need to retry
     emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
 maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
     #{
@@ -676,12 +670,18 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
         timeout := Timeout
     } = Context,
     %% TODO: reset the expiration time for free retries?
-    IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
+    IsFreeRetry =
+        case Reason of
+            {recoverable_error, normal} -> true;
+            {recoverable_error, {shutdown, normal}} -> true;
+            _ -> false
+        end,
     NContext =
         case IsFreeRetry of
             true -> Context;
             false -> Context#{attempt := Attempt + 1}
         end,
+    ?tp(webhook_will_retry_async, #{}),
     Worker = resolve_pool_worker(State, KeyOrNum),
     ok = ehttpc:request_async(
         Worker,

+ 1 - 0
changes/ce/fix-11162.en.md

@@ -0,0 +1 @@
+Fixed an issue in webhook bridge where, in async query mode, HTTP status codes like 4XX and 5XX would be treated as successes in the bridge metrics.