|
|
@@ -129,7 +129,8 @@ shard_of_operation(DB, Operation, ShardBy) ->
|
|
|
n_bytes = 0 :: non_neg_integer(),
|
|
|
tref :: undefined | reference(),
|
|
|
queue :: queue:queue(emqx_ds:operation()),
|
|
|
- pending_replies = [] :: [gen_server:from()]
|
|
|
+ pending_replies = [] :: [gen_server:from()],
|
|
|
+ oldest_message_timestamp :: integer() | undefined
|
|
|
}).
|
|
|
|
|
|
init([CBM, CBMOptions, DB, Shard]) ->
|
|
|
@@ -216,7 +217,7 @@ enqueue(
|
|
|
Ops,
|
|
|
BatchSize,
|
|
|
BatchBytes,
|
|
|
- S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
|
|
|
+ S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0, oldest_message_timestamp = OldestTS0}
|
|
|
) ->
|
|
|
%% At this point we don't split the batches, even when they aren't
|
|
|
%% atomic. It wouldn't win us anything in terms of memory, and
|
|
|
@@ -235,8 +236,8 @@ enqueue(
|
|
|
false ->
|
|
|
%% The buffer is empty, we enqueue the atomic batch in its
|
|
|
%% entirety:
|
|
|
- Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
|
|
|
- S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
|
|
|
+ {Q1, OldestTS} = lists:foldl(fun enqueue_op/2, {Q0, OldestTS0}, Ops),
|
|
|
+ S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, oldest_message_timestamp = OldestTS},
|
|
|
case NMsgs >= NMax orelse NBytes >= NBytesMax of
|
|
|
true ->
|
|
|
flush(S1);
|
|
|
@@ -245,6 +246,17 @@ enqueue(
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
+enqueue_op(Msg = #message{timestamp = TS}, {Q, OldestTS}) ->
|
|
|
+ {
|
|
|
+ queue:in(Msg, Q),
|
|
|
+ min(OldestTS, TS)
|
|
|
+ };
|
|
|
+enqueue_op(Op, {Q, OldestTS}) ->
|
|
|
+ {
|
|
|
+ queue:in(Op, Q),
|
|
|
+ OldestTS
|
|
|
+ }.
|
|
|
+
|
|
|
-define(COOLDOWN_MIN, 1000).
|
|
|
-define(COOLDOWN_MAX, 5000).
|
|
|
|
|
|
@@ -263,7 +275,8 @@ do_flush(
|
|
|
shard = Shard,
|
|
|
metrics_id = Metrics,
|
|
|
n_retries = Retries,
|
|
|
- max_retries = MaxRetries
|
|
|
+ max_retries = MaxRetries,
|
|
|
+ oldest_message_timestamp = OTS
|
|
|
}
|
|
|
) ->
|
|
|
Messages = queue:to_list(Q),
|
|
|
@@ -274,20 +287,30 @@ do_flush(
|
|
|
emqx_ds_builtin_metrics:observe_buffer_flush_time(Metrics, T1 - T0),
|
|
|
case Result of
|
|
|
ok ->
|
|
|
+ %% Report metrics and events:
|
|
|
emqx_ds_builtin_metrics:inc_buffer_batches(Metrics),
|
|
|
emqx_ds_builtin_metrics:inc_buffer_messages(Metrics, S#s.n),
|
|
|
emqx_ds_builtin_metrics:inc_buffer_bytes(Metrics, S#s.n_bytes),
|
|
|
+ case is_integer(OTS) of
|
|
|
+ true ->
|
|
|
+ Latency = erlang:system_time(millisecond) - OTS,
|
|
|
+ emqx_ds_builtin_metrics:observe_buffer_latency(Metrics, Latency);
|
|
|
+ false ->
|
|
|
+ ok
|
|
|
+ end,
|
|
|
?tp(
|
|
|
emqx_ds_buffer_flush,
|
|
|
#{db => DB, shard => Shard, batch => Messages}
|
|
|
),
|
|
|
+ %% Unblock clients:
|
|
|
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
|
erlang:garbage_collect(),
|
|
|
S#s{
|
|
|
n = 0,
|
|
|
n_bytes = 0,
|
|
|
queue = queue:new(),
|
|
|
- pending_replies = []
|
|
|
+ pending_replies = [],
|
|
|
+ oldest_message_timestamp = undefined
|
|
|
};
|
|
|
{error, recoverable, Err} when Retries < MaxRetries ->
|
|
|
%% Note: this is a hot loop, so we report error messages
|
|
|
@@ -329,7 +352,8 @@ do_flush(
|
|
|
n_bytes = 0,
|
|
|
queue = queue:new(),
|
|
|
pending_replies = [],
|
|
|
- n_retries = 0
|
|
|
+ n_retries = 0,
|
|
|
+ oldest_message_timestamp = undefined
|
|
|
}
|
|
|
end.
|
|
|
|