|
@@ -1540,6 +1540,12 @@ clear_disk_queue_dir(Id, Index) ->
|
|
|
ensure_flush_timer(Data = #{batch_time := T}) ->
|
|
ensure_flush_timer(Data = #{batch_time := T}) ->
|
|
|
ensure_flush_timer(Data, T).
|
|
ensure_flush_timer(Data, T).
|
|
|
|
|
|
|
|
|
|
+ensure_flush_timer(Data = #{tref := undefined}, 0) ->
|
|
|
|
|
+ %% if the batch_time is 0, we don't need to start a timer, which
|
|
|
|
|
+ %% can be costly at high rates.
|
|
|
|
|
+ Ref = make_ref(),
|
|
|
|
|
+ self() ! {flush, {Ref, Ref}},
|
|
|
|
|
+ Data#{tref => {Ref, Ref}};
|
|
|
ensure_flush_timer(Data = #{tref := undefined}, T) ->
|
|
ensure_flush_timer(Data = #{tref := undefined}, T) ->
|
|
|
Ref = make_ref(),
|
|
Ref = make_ref(),
|
|
|
TRef = erlang:send_after(T, self(), {flush, Ref}),
|
|
TRef = erlang:send_after(T, self(), {flush, Ref}),
|