Parcourir la source

perf(webhook): add retry attempts for async

This is a performance improvement for the webhook bridge.

This bridge is called using the `async` callback mode, and `ehttpc`
frequently returns errors of the form `normal` and `{shutdown,
normal}`.  `ehttpc` itself retries those errors "for free", so we add
the same behavior to async requests: such retries do not count towards
the attempt limit.  Other errors are retried too, but they are not
"free": at most 3 attempts are made in total.

This is important because, when using buffer workers, we should avoid
making them enter the `blocked` state, since that halts all progress
and makes throughput plummet.
Thales Macedo Garitezi il y a 2 ans
Parent
commit
a7b41e1cdf

+ 106 - 4
apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl

@@ -23,6 +23,7 @@
 -compile(export_all).
 -compile(export_all).
 
 
 -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
 -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
+-import(emqx_common_test_helpers, [on_exit/1]).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -52,6 +53,13 @@ end_per_suite(_Config) ->
 suite() ->
 suite() ->
     [{timetrap, {seconds, 60}}].
     [{timetrap, {seconds, 60}}].
 
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% HTTP server for testing
 %% HTTP server for testing
 %% (Originally copied from emqx_bridge_api_SUITE)
 %% (Originally copied from emqx_bridge_api_SUITE)
@@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) ->
     QueryMode = maps:get(query_mode, Config, "async"),
     QueryMode = maps:get(query_mode, Config, "async"),
     ConnectTimeout = maps:get(connect_timeout, Config, 1),
     ConnectTimeout = maps:get(connect_timeout, Config, 1),
     RequestTimeout = maps:get(request_timeout, Config, 10000),
     RequestTimeout = maps:get(request_timeout, Config, 10000),
-    ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
+    ResumeInterval = maps:get(resume_interval, Config, "1s"),
+    ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
     ConfigString = io_lib:format(
     ConfigString = io_lib:format(
         "bridges.~s.~s {\n"
         "bridges.~s.~s {\n"
         "  url = \"http://localhost:~p\"\n"
         "  url = \"http://localhost:~p\"\n"
@@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) ->
         "    health_check_interval = \"15s\"\n"
         "    health_check_interval = \"15s\"\n"
         "    max_buffer_bytes = \"1GB\"\n"
         "    max_buffer_bytes = \"1GB\"\n"
         "    query_mode = \"~s\"\n"
         "    query_mode = \"~s\"\n"
-        "    request_timeout = \"~s\"\n"
+        "    request_timeout = \"~p\"\n"
+        "    resume_interval = \"~s\"\n"
         "    start_after_created = \"true\"\n"
         "    start_after_created = \"true\"\n"
         "    start_timeout = \"5s\"\n"
         "    start_timeout = \"5s\"\n"
         "    worker_pool_size = \"1\"\n"
         "    worker_pool_size = \"1\"\n"
@@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) ->
             PoolSize,
             PoolSize,
             RequestTimeout,
             RequestTimeout,
             QueryMode,
             QueryMode,
-            ResourceRequestTimeout
+            ResourceRequestTimeout,
+            ResumeInterval
         ]
         ]
     ),
     ),
     ct:pal(ConfigString),
     ct:pal(ConfigString),
@@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) ->
         query_mode => "async",
         query_mode => "async",
         connect_timeout => ResponseDelayMS * 2,
         connect_timeout => ResponseDelayMS * 2,
         request_timeout => 10000,
         request_timeout => 10000,
-        resouce_request_timeout => "infinity"
+        resource_request_timeout => "infinity"
     }),
     }),
     NumberOfMessagesToSend = 10,
     NumberOfMessagesToSend = 10,
     [
     [
@@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) ->
     stop_http_server(Server),
     stop_http_server(Server),
     ok.
     ok.
 
 
+t_async_free_retries(_Config) ->
+    #{port := Port} = start_http_server(#{response_delay_ms => 0}),
+    BridgeID = make_bridge(#{
+        port => Port,
+        pool_size => 1,
+        query_mode => "sync",
+        connect_timeout => 1_000,
+        request_timeout => 10_000,
+        resource_request_timeout => "10000s"
+    }),
+    %% Fail 5 times then succeed.
+    Context = #{error_attempts => 5},
+    ExpectedAttempts = 6,
+    Fn = fun(Get, Error) ->
+        ?assertMatch(
+            {ok, 200, _, _},
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    do_t_async_retries(Context, {error, normal}, Fn),
+    do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
+    ok.
+
+t_async_common_retries(_Config) ->
+    #{port := Port} = start_http_server(#{response_delay_ms => 0}),
+    BridgeID = make_bridge(#{
+        port => Port,
+        pool_size => 1,
+        query_mode => "sync",
+        resume_interval => "100ms",
+        connect_timeout => 1_000,
+        request_timeout => 10_000,
+        resource_request_timeout => "10000s"
+    }),
+    %% Keeps failing until connector gives up.
+    Context = #{error_attempts => infinity},
+    ExpectedAttempts = 3,
+    FnSucceed = fun(Get, Error) ->
+        ?assertMatch(
+            {ok, 200, _, _},
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error, attempts => Get()}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    FnFail = fun(Get, Error) ->
+        ?assertMatch(
+            Error,
+            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            #{error => Error, attempts => Get()}
+        ),
+        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
+    end,
+    %% These two succeed because they're further retried by the buffer
+    %% worker synchronously, and we're not mocking that call.
+    do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed),
+    do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed),
+    %% This fails because this error is treated as unrecoverable.
+    do_t_async_retries(Context, {error, something_else}, FnFail),
+    ok.
+
+do_t_async_retries(TestContext, Error, Fn) ->
+    #{error_attempts := ErrorAttempts} = TestContext,
+    persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),
+    on_exit(fun() -> persistent_term:erase({?MODULE, ?FUNCTION_NAME, attempts}) end),
+    Get = fun() -> persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}) end,
+    GetAndBump = fun() ->
+        Attempts = persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}),
+        persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, Attempts + 1),
+        Attempts + 1
+    end,
+    emqx_common_test_helpers:with_mock(
+        emqx_connector_http,
+        reply_delegator,
+        fun(Context, ReplyFunAndArgs, Result) ->
+            Attempts = GetAndBump(),
+            case Attempts > ErrorAttempts of
+                true ->
+                    ct:pal("succeeding ~p : ~p", [Error, Attempts]),
+                    meck:passthrough([Context, ReplyFunAndArgs, Result]);
+                false ->
+                    ct:pal("failing ~p : ~p", [Error, Attempts]),
+                    meck:passthrough([Context, ReplyFunAndArgs, Error])
+            end
+        end,
+        fun() -> Fn(Get, Error) end
+    ),
+    ok.
+
 receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
 receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
     ok;
     ok;
 receive_request_notifications(MessageIDs, ResponseDelay) ->
 receive_request_notifications(MessageIDs, ResponseDelay) ->

+ 53 - 9
apps/emqx_connector/src/emqx_connector_http.erl

@@ -32,7 +32,7 @@
     on_query/3,
     on_query/3,
     on_query_async/4,
     on_query_async/4,
     on_get_status/2,
     on_get_status/2,
-    reply_delegator/2
+    reply_delegator/3
 ]).
 ]).
 
 
 -type url() :: emqx_http_lib:uri_map().
 -type url() :: emqx_http_lib:uri_map().
@@ -267,7 +267,7 @@ on_query(InstId, {send_message, Msg}, State) ->
                 request_timeout := Timeout
                 request_timeout := Timeout
             } = process_request(Request, Msg),
             } = process_request(Request, Msg),
             %% bridge buffer worker has its own retry, so keep ehttpc's retry count low
             %% bridge buffer worker has its own retry, so keep ehttpc's retry count low
-            Retry = 0,
+            Retry = 2,
             ClientId = maps:get(clientid, Msg, undefined),
             ClientId = maps:get(clientid, Msg, undefined),
             on_query(
             on_query(
                 InstId,
                 InstId,
@@ -396,12 +396,22 @@ on_query_async(
         }
         }
     ),
     ),
     NRequest = formalize_request(Method, BasePath, Request),
     NRequest = formalize_request(Method, BasePath, Request),
+    MaxAttempts = maps:get(max_attempts, State, 3),
+    Context = #{
+        attempt => 1,
+        max_attempts => MaxAttempts,
+        state => State,
+        key_or_num => KeyOrNum,
+        method => Method,
+        request => NRequest,
+        timeout => Timeout
+    },
     ok = ehttpc:request_async(
     ok = ehttpc:request_async(
         Worker,
         Worker,
         Method,
         Method,
         NRequest,
         NRequest,
         Timeout,
         Timeout,
-        {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
+        {fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]}
     ),
     ),
     {ok, Worker}.
     {ok, Worker}.
 
 
@@ -634,7 +644,10 @@ to_bin(Str) when is_list(Str) ->
 to_bin(Atom) when is_atom(Atom) ->
 to_bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
     atom_to_binary(Atom, utf8).
 
 
-reply_delegator(ReplyFunAndArgs, Result) ->
+reply_delegator(Context, ReplyFunAndArgs, Result) ->
+    maybe_retry(Result, Context, ReplyFunAndArgs).
+
+transform_result(Result) ->
     case Result of
     case Result of
         %% The normal reason happens when the HTTP connection times out before
         %% The normal reason happens when the HTTP connection times out before
         %% the request has been fully processed
         %% the request has been fully processed
@@ -645,16 +658,47 @@ reply_delegator(ReplyFunAndArgs, Result) ->
             Reason =:= {shutdown, normal};
             Reason =:= {shutdown, normal};
             Reason =:= {shutdown, closed}
             Reason =:= {shutdown, closed}
         ->
         ->
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+            {error, {recoverable_error, Reason}};
         {error, {closed, _Message} = Reason} ->
         {error, {closed, _Message} = Reason} ->
             %% _Message = "The connection was lost."
             %% _Message = "The connection was lost."
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+            {error, {recoverable_error, Reason}};
         _ ->
         _ ->
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
+            Result
     end.
     end.
 
 
+maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
+    N >= Max
+->
+    Result = transform_result(Result0),
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
+    #{
+        state := State,
+        attempt := Attempt,
+        key_or_num := KeyOrNum,
+        method := Method,
+        request := Request,
+        timeout := Timeout
+    } = Context,
+    %% TODO: reset the expiration time for free retries?
+    IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
+    NContext =
+        case IsFreeRetry of
+            true -> Context;
+            false -> Context#{attempt := Attempt + 1}
+        end,
+    Worker = resolve_pool_worker(State, KeyOrNum),
+    ok = ehttpc:request_async(
+        Worker,
+        Method,
+        Request,
+        Timeout,
+        {fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]}
+    ),
+    ok;
+maybe_retry(Result, _Context, ReplyFunAndArgs) ->
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
+
 %% The HOCON schema system may generate sensitive keys with this format
 %% The HOCON schema system may generate sensitive keys with this format
 is_sensitive_key([{str, StringKey}]) ->
 is_sensitive_key([{str, StringKey}]) ->
     is_sensitive_key(StringKey);
     is_sensitive_key(StringKey);

+ 3 - 0
changes/ce/perf-10690.en.md

@@ -0,0 +1,3 @@
+Added a retry mechanism to the webhook bridge to improve throughput.
+
+This optimization retries failed requests without blocking the buffering layer, which can improve throughput under high message rates.