Browse Source

fix(buffer_worker): check request timeout and health check interval

Port of https://github.com/emqx/emqx/pull/10154 for `release-50`

Fixes https://emqx.atlassian.net/browse/EMQX-9099

Originally, the `resume_interval`, which is what defines how often a
buffer worker will attempt to retry its inflight window, was set to
the same as the `health_check_interval`.  This had the problem that,
with default values, `health_check_interval = request_timeout`.  This
meant that, if a buffer worker with those configs were ever blocked,
all requests would have timed out by the time it retried them.

Here we change the default `resume_interval` to a reasonable value
dependent on `health_check_interval` and `request_timeout`, and also
expose that as a hidden parameter for fine tuning if necessary.
Thales Macedo Garitezi 2 years ago
parent
commit
0b6fd7fe14

+ 11 - 0
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
     }
   }
 
+  resume_interval {
+    desc {
+      en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window."""
+      zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。"""
+    }
+    label {
+      en: """Resume Interval"""
+      zh: """重试时间间隔"""
+    }
+  }
+
   start_after_created {
     desc {
       en: """Whether start the resource right after created."""

+ 16 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -88,6 +88,8 @@
 -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
 -type request() :: term().
 -type request_from() :: undefined | gen_statem:from().
+-type request_timeout() :: infinity | timer:time().
+-type health_check_interval() :: timer:time().
 -type state() :: blocked | running.
 -type inflight_key() :: integer().
 -type data() :: #{
@@ -199,6 +201,8 @@ init({Id, Index, Opts}) ->
     RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
     BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
     BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
+    DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval),
+    ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
     Data = #{
         id => Id,
         index => Index,
@@ -207,7 +211,7 @@ init({Id, Index, Opts}) ->
         batch_size => BatchSize,
         batch_time => BatchTime,
         queue => Queue,
-        resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
+        resume_interval => ResumeInterval,
         tref => undefined
     },
     ?tp(buffer_worker_init, #{id => Id, index => Index}),
@@ -1679,6 +1683,17 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
     end,
     BatchTime.
 
+%% Computes the default resume interval.  The request timeout should be
+%% greater than the resume interval, as the latter defines how often the
+%% buffer worker tries to unblock.  If request timeout is <= resume
+%% interval and the buffer worker is ever blocked, then all queued
+%% requests will basically fail without being attempted.
+-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time().
+default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
+    max(1, HealthCheckInterval); % no request timeout to respect: follow the health check interval
+default_resume_interval(RequestTimeout, HealthCheckInterval) ->
+    max(1, min(HealthCheckInterval, RequestTimeout div 3)). % at most a third of the request timeout, never below 1
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 adjust_batch_time_test_() ->

+ 7 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -55,6 +55,7 @@ fields("creation_opts") ->
     [
         {worker_pool_size, fun worker_pool_size/1},
         {health_check_interval, fun health_check_interval/1},
+        {resume_interval, fun resume_interval/1},
         {start_after_created, fun start_after_created/1},
         {start_timeout, fun start_timeout/1},
         {auto_restart_interval, fun auto_restart_interval/1},
@@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE;
 worker_pool_size(required) -> false;
 worker_pool_size(_) -> undefined.
 
+resume_interval(type) -> emqx_schema:duration_ms();
+resume_interval(hidden) -> true; % hidden option, exposed only for fine tuning
+resume_interval(desc) -> ?DESC("resume_interval");
+resume_interval(required) -> false;
+resume_interval(_) -> undefined. % no `default' clause: that key falls through to `undefined'
+
 health_check_interval(type) -> emqx_schema:duration_ms();
 health_check_interval(desc) -> ?DESC("health_check_interval");
 health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;

+ 8 - 0
changes/ce/fix-10154.en.md

@@ -0,0 +1,8 @@
+Change the default `resume_interval` for bridges and connectors to be
+the minimum of `health_check_interval` and `request_timeout / 3`.
+Also expose it as a hidden configuration option to allow fine tuning.
+
+Before this change, the default values for `resume_interval` meant
+that, if a buffer ever got blocked due to resource errors or high
+message volumes, then, by the time the buffer would try to resume its
+normal operations, almost all requests would have timed out.

+ 12 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
         #{measurements := #{gauge_set := ExpectedValue}} ->
             ok;
         #{measurements := #{gauge_set := Value}} ->
+            ct:pal("events: ~p", [Events]),
             ct:fail(
                 "gauge ~p didn't reach expected value ~p; last value: ~p",
                 [GaugeName, ExpectedValue, Value]
@@ -972,7 +973,13 @@ t_publish_econnrefused(Config) ->
     ResourceId = ?config(resource_id, Config),
     %% set pipelining to 1 so that one of the 2 requests is `pending'
     %% in ehttpc.
-    {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}),
+    {ok, _} = create_bridge(
+        Config,
+        #{
+            <<"pipelining">> => 1,
+            <<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>}
+        }
+    ),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
     assert_empty_metrics(ResourceId),
@@ -986,7 +993,10 @@ t_publish_timeout(Config) ->
     %% requests are done separately.
     {ok, _} = create_bridge(Config, #{
         <<"pipelining">> => 1,
-        <<"resource_opts">> => #{<<"batch_size">> => 1}
+        <<"resource_opts">> => #{
+            <<"batch_size">> => 1,
+            <<"resume_interval">> => <<"15s">>
+        }
     }),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),