|
|
@@ -583,10 +583,11 @@ do_flush(
|
|
|
),
|
|
|
case queue_count(Q1) > 0 of
|
|
|
true ->
|
|
|
- {keep_state, Data1, [{next_event, internal, flush}]};
|
|
|
+ flush_worker(self());
|
|
|
false ->
|
|
|
- {keep_state, Data1}
|
|
|
- end
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {keep_state, Data1}
|
|
|
end;
|
|
|
do_flush(Data0, #{
|
|
|
is_batch := true,
|
|
|
@@ -659,15 +660,17 @@ do_flush(Data0, #{
|
|
|
}
|
|
|
),
|
|
|
CurrentCount = queue_count(Q1),
|
|
|
- case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
|
|
- {false, _} ->
|
|
|
- {keep_state, Data1};
|
|
|
- {true, true} ->
|
|
|
- {keep_state, Data1, [{next_event, internal, flush}]};
|
|
|
- {true, false} ->
|
|
|
- Data2 = ensure_flush_timer(Data1),
|
|
|
- {keep_state, Data2}
|
|
|
- end
|
|
|
+ Data2 =
|
|
|
+ case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
|
|
+ {false, _} ->
|
|
|
+ Data1;
|
|
|
+ {true, true} ->
|
|
|
+ flush_worker(self()),
|
|
|
+ Data1;
|
|
|
+ {true, false} ->
|
|
|
+ ensure_flush_timer(Data1)
|
|
|
+ end,
|
|
|
+ {keep_state, Data2}
|
|
|
end.
|
|
|
|
|
|
batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
|