浏览代码

Merge pull request #12903 from ieQu1/dev/ds-egress-flush-condition

fix(ds): Fix egress flush condition
ieQu1 1 年之前
父节点
当前提交
f9eda1883f
共有 1 个文件被更改,包括 12 次插入12 次删除
  1. 12 12
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

+ 12 - 12
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -127,7 +127,7 @@ init([DB, Shard]) ->
         metrics_id = MetricsId,
         queue = queue:new()
     },
-    {ok, start_timer(S)}.
+    {ok, S}.
 
 handle_call(
     #enqueue_req{
@@ -195,7 +195,6 @@ enqueue(
         true ->
             %% Adding this batch would cause buffer to overflow. Flush
             %% it now, and retry:
-            cancel_timer(S0),
             S1 = flush(S0),
             enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
         false ->
@@ -203,12 +202,11 @@ enqueue(
             %% entirety:
             Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
             S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
-            case NMsgs >= NMax orelse NBytes >= NBytes of
+            case NMsgs >= NMax orelse NBytes >= NBytesMax of
                 true ->
-                    cancel_timer(S1),
                     flush(S1);
                 false ->
-                    S1
+                    ensure_timer(S1)
             end
     end.
 
@@ -216,7 +214,7 @@ enqueue(
 -define(COOLDOWN_MAX, 5000).
 
 flush(S) ->
-    start_timer(do_flush(S)).
+    do_flush(cancel_timer(S)).
 
 do_flush(S0 = #s{n = 0}) ->
     S0;
@@ -372,16 +370,18 @@ compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) ->
 compose_errors(ErrAcc, _Err) ->
     ErrAcc.
 
-start_timer(S) ->
+ensure_timer(S = #s{tref = undefined}) ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
     Tref = erlang:send_after(Interval, self(), ?flush),
-    S#s{tref = Tref}.
+    S#s{tref = Tref};
+ensure_timer(S) ->
+    S.
 
-cancel_timer(#s{tref = undefined}) ->
-    ok;
-cancel_timer(#s{tref = TRef}) ->
+cancel_timer(S = #s{tref = undefined}) ->
+    S;
+cancel_timer(S = #s{tref = TRef}) ->
     _ = erlang:cancel_timer(TRef),
-    ok.
+    S#s{tref = undefined}.
 
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% all things into account, for example headers and extras)