Просмотр исходного кода

fix(buffer_worker): add batch time automatic adjustment

To avoid message loss due to misconfigurations, we adjust `batch_time`
based on `request_timeout`.  If `batch_time` > `request_timeout`, all
requests will timeout before being sent if the message rate is low.
Even worse if `pool_size` is high.  We cap `batch_time` at
`request_timeout div 2` as a rule of thumb.
Thales Macedo Garitezi 3 года назад
Родитель
Commit
e9ffabf936
1 измененный файл: 47 добавлено, 1 удалено
  1. 47 1
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl

+ 47 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -196,13 +196,16 @@ init({Id, Index, Opts}) ->
     InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightTID = inflight_new(InflightWinSize, Id, Index),
     InflightTID = inflight_new(InflightWinSize, Id, Index),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+    RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
+    BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+    BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
     Data = #{
     Data = #{
         id => Id,
         id => Id,
         index => Index,
         index => Index,
         inflight_tid => InflightTID,
         inflight_tid => InflightTID,
         async_workers => #{},
         async_workers => #{},
         batch_size => BatchSize,
         batch_size => BatchSize,
-        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        batch_time => BatchTime,
         queue => Queue,
         queue => Queue,
         resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
         resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
         tref => undefined
         tref => undefined
@@ -1639,3 +1642,46 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
 -else.
 -else.
 do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
 do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
 -endif.
 -endif.
+
%% @doc To avoid message loss due to misconfigurations, we adjust
%% `batch_time' based on `request_timeout'.  If `batch_time' >
%% `request_timeout', all requests will timeout before being sent if
%% the message rate is low.  Even worse if `pool_size' is high.
%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
%%
%% NOTE(review): `request_timeout' may presumably be configured as the
%% atom `infinity' (no request expiry) — confirm against the resource
%% options schema.  In that case there is nothing to cap against, so
%% the configured batch time is kept; without this clause
%% `infinity div 2' would crash `init/1' with `badarith'.
adjust_batch_time(_Id, infinity = _RequestTimeout, BatchTime0) ->
    BatchTime0;
adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
    %% Cap at half the request timeout; `max(0, _)' guards against a
    %% degenerate `RequestTimeout' of 0 or 1 producing a negative-ish
    %% or surprising value.
    BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
    %% Log only when the configured value was actually overridden, so
    %% operators can tell their configuration was adjusted.
    case BatchTime =:= BatchTime0 of
        false ->
            ?SLOG(info, #{
                id => Id,
                msg => adjusting_buffer_worker_batch_time,
                new_batch_time => BatchTime
            });
        true ->
            ok
    end,
    BatchTime.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Table-driven checks for the batch-time capping rule: each case is
%% {Title, RequestTimeout, ConfiguredBatchTime, ExpectedBatchTime}.
adjust_batch_time_test_() ->
    %% just for logging
    Id = some_id,
    Cases = [
        {"batch time smaller than request_time/2", 500, 100, 100},
        {"batch time equal to request_time/2", 200, 100, 100},
        {"batch time greater than request_time/2", 100, 100, 50}
    ],
    [
        {Title, ?_assertEqual(Expected, adjust_batch_time(Id, ReqTimeout, ConfiguredBT))}
     || {Title, ReqTimeout, ConfiguredBT, Expected} <- Cases
    ].
-endif.