|
@@ -17,6 +17,7 @@
|
|
|
%% @doc Buffer servers are responsible for collecting batches from the
|
|
%% @doc Buffer servers are responsible for collecting batches from the
|
|
|
%% local processes, sharding and repackaging them.
|
|
%% local processes, sharding and repackaging them.
|
|
|
-module(emqx_ds_buffer).
|
|
-module(emqx_ds_buffer).
|
|
|
|
|
+-feature(maybe_expr, enable).
|
|
|
|
|
|
|
|
-behaviour(gen_server).
|
|
-behaviour(gen_server).
|
|
|
|
|
|
|
@@ -60,6 +61,11 @@
|
|
|
-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
|
|
-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
|
|
|
_Shard.
|
|
_Shard.
|
|
|
|
|
|
|
|
|
|
+-callback buffer_config(emqx_ds:db(), _Shard, _State, batch_size | batch_bytes | flush_interval) ->
|
|
|
|
|
+ {ok, non_neg_integer() | infinity} | undefined.
|
|
|
|
|
+
|
|
|
|
|
+-optional_callbacks([buffer_config/4]).
|
|
|
|
|
+
|
|
|
%%================================================================================
|
|
%%================================================================================
|
|
|
%% API functions
|
|
%% API functions
|
|
|
%%================================================================================
|
|
%%================================================================================
|
|
@@ -123,7 +129,8 @@ shard_of_operation(DB, Operation, ShardBy) ->
|
|
|
n_bytes = 0 :: non_neg_integer(),
|
|
n_bytes = 0 :: non_neg_integer(),
|
|
|
tref :: undefined | reference(),
|
|
tref :: undefined | reference(),
|
|
|
queue :: queue:queue(emqx_ds:operation()),
|
|
queue :: queue:queue(emqx_ds:operation()),
|
|
|
- pending_replies = [] :: [gen_server:from()]
|
|
|
|
|
|
|
+ pending_replies = [] :: [gen_server:from()],
|
|
|
|
|
+ oldest_message_timestamp :: integer() | undefined
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
init([CBM, CBMOptions, DB, Shard]) ->
|
|
init([CBM, CBMOptions, DB, Shard]) ->
|
|
@@ -210,14 +217,14 @@ enqueue(
|
|
|
Ops,
|
|
Ops,
|
|
|
BatchSize,
|
|
BatchSize,
|
|
|
BatchBytes,
|
|
BatchBytes,
|
|
|
- S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
|
|
|
|
|
|
|
+ S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0, oldest_message_timestamp = OldestTS0}
|
|
|
) ->
|
|
) ->
|
|
|
%% At this point we don't split the batches, even when they aren't
|
|
%% At this point we don't split the batches, even when they aren't
|
|
|
%% atomic. It wouldn't win us anything in terms of memory, and
|
|
%% atomic. It wouldn't win us anything in terms of memory, and
|
|
|
%% EMQX currently feeds data to DS in very small batches, so
|
|
%% EMQX currently feeds data to DS in very small batches, so
|
|
|
%% granularity should be fine enough.
|
|
%% granularity should be fine enough.
|
|
|
- NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
|
|
|
|
|
- NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
|
|
|
|
|
|
|
+ NMax = get_config(S0, batch_size),
|
|
|
|
|
+ NBytesMax = get_config(S0, batch_bytes),
|
|
|
NMsgs = NOps0 + BatchSize,
|
|
NMsgs = NOps0 + BatchSize,
|
|
|
NBytes = NBytes0 + BatchBytes,
|
|
NBytes = NBytes0 + BatchBytes,
|
|
|
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
|
|
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
|
|
@@ -229,8 +236,8 @@ enqueue(
|
|
|
false ->
|
|
false ->
|
|
|
%% The buffer is empty, we enqueue the atomic batch in its
|
|
%% The buffer is empty, we enqueue the atomic batch in its
|
|
|
%% entirety:
|
|
%% entirety:
|
|
|
- Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
|
|
|
|
|
- S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
|
|
|
|
|
|
|
+ {Q1, OldestTS} = lists:foldl(fun enqueue_op/2, {Q0, OldestTS0}, Ops),
|
|
|
|
|
+ S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, oldest_message_timestamp = OldestTS},
|
|
|
case NMsgs >= NMax orelse NBytes >= NBytesMax of
|
|
case NMsgs >= NMax orelse NBytes >= NBytesMax of
|
|
|
true ->
|
|
true ->
|
|
|
flush(S1);
|
|
flush(S1);
|
|
@@ -239,6 +246,17 @@ enqueue(
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+enqueue_op(Msg = #message{timestamp = TS}, {Q, OldestTS}) ->
|
|
|
|
|
+ {
|
|
|
|
|
+ queue:in(Msg, Q),
|
|
|
|
|
+ min(OldestTS, TS)
|
|
|
|
|
+ };
|
|
|
|
|
+enqueue_op(Op, {Q, OldestTS}) ->
|
|
|
|
|
+ {
|
|
|
|
|
+ queue:in(Op, Q),
|
|
|
|
|
+ OldestTS
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
-define(COOLDOWN_MIN, 1000).
|
|
-define(COOLDOWN_MIN, 1000).
|
|
|
-define(COOLDOWN_MAX, 5000).
|
|
-define(COOLDOWN_MAX, 5000).
|
|
|
|
|
|
|
@@ -257,7 +275,8 @@ do_flush(
|
|
|
shard = Shard,
|
|
shard = Shard,
|
|
|
metrics_id = Metrics,
|
|
metrics_id = Metrics,
|
|
|
n_retries = Retries,
|
|
n_retries = Retries,
|
|
|
- max_retries = MaxRetries
|
|
|
|
|
|
|
+ max_retries = MaxRetries,
|
|
|
|
|
+ oldest_message_timestamp = OTS
|
|
|
}
|
|
}
|
|
|
) ->
|
|
) ->
|
|
|
Messages = queue:to_list(Q),
|
|
Messages = queue:to_list(Q),
|
|
@@ -268,20 +287,30 @@ do_flush(
|
|
|
emqx_ds_builtin_metrics:observe_buffer_flush_time(Metrics, T1 - T0),
|
|
emqx_ds_builtin_metrics:observe_buffer_flush_time(Metrics, T1 - T0),
|
|
|
case Result of
|
|
case Result of
|
|
|
ok ->
|
|
ok ->
|
|
|
|
|
+ %% Report metrics and events:
|
|
|
emqx_ds_builtin_metrics:inc_buffer_batches(Metrics),
|
|
emqx_ds_builtin_metrics:inc_buffer_batches(Metrics),
|
|
|
emqx_ds_builtin_metrics:inc_buffer_messages(Metrics, S#s.n),
|
|
emqx_ds_builtin_metrics:inc_buffer_messages(Metrics, S#s.n),
|
|
|
emqx_ds_builtin_metrics:inc_buffer_bytes(Metrics, S#s.n_bytes),
|
|
emqx_ds_builtin_metrics:inc_buffer_bytes(Metrics, S#s.n_bytes),
|
|
|
|
|
+ case is_integer(OTS) of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ Latency = erlang:system_time(millisecond) - OTS,
|
|
|
|
|
+ emqx_ds_builtin_metrics:observe_buffer_latency(Metrics, Latency);
|
|
|
|
|
+ false ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end,
|
|
|
?tp(
|
|
?tp(
|
|
|
emqx_ds_buffer_flush,
|
|
emqx_ds_buffer_flush,
|
|
|
#{db => DB, shard => Shard, batch => Messages}
|
|
#{db => DB, shard => Shard, batch => Messages}
|
|
|
),
|
|
),
|
|
|
|
|
+ %% Unblock clients:
|
|
|
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
|
erlang:garbage_collect(),
|
|
erlang:garbage_collect(),
|
|
|
S#s{
|
|
S#s{
|
|
|
n = 0,
|
|
n = 0,
|
|
|
n_bytes = 0,
|
|
n_bytes = 0,
|
|
|
queue = queue:new(),
|
|
queue = queue:new(),
|
|
|
- pending_replies = []
|
|
|
|
|
|
|
+ pending_replies = [],
|
|
|
|
|
+ oldest_message_timestamp = undefined
|
|
|
};
|
|
};
|
|
|
{error, recoverable, Err} when Retries < MaxRetries ->
|
|
{error, recoverable, Err} when Retries < MaxRetries ->
|
|
|
%% Note: this is a hot loop, so we report error messages
|
|
%% Note: this is a hot loop, so we report error messages
|
|
@@ -323,7 +352,8 @@ do_flush(
|
|
|
n_bytes = 0,
|
|
n_bytes = 0,
|
|
|
queue = queue:new(),
|
|
queue = queue:new(),
|
|
|
pending_replies = [],
|
|
pending_replies = [],
|
|
|
- n_retries = 0
|
|
|
|
|
|
|
+ n_retries = 0,
|
|
|
|
|
+ oldest_message_timestamp = undefined
|
|
|
}
|
|
}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
@@ -402,7 +432,7 @@ compose_errors(ErrAcc, _Err) ->
|
|
|
ErrAcc.
|
|
ErrAcc.
|
|
|
|
|
|
|
|
ensure_timer(S = #s{tref = undefined}) ->
|
|
ensure_timer(S = #s{tref = undefined}) ->
|
|
|
- Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
|
|
|
|
|
|
+ Interval = get_config(S, flush_interval),
|
|
|
Tref = erlang:send_after(Interval, self(), ?flush),
|
|
Tref = erlang:send_after(Interval, self(), ?flush),
|
|
|
S#s{tref = Tref};
|
|
S#s{tref = Tref};
|
|
|
ensure_timer(S) ->
|
|
ensure_timer(S) ->
|
|
@@ -420,3 +450,20 @@ payload_size(#message{payload = P, topic = T}) ->
|
|
|
size(P) + size(T);
|
|
size(P) + size(T);
|
|
|
payload_size({_OpName, _}) ->
|
|
payload_size({_OpName, _}) ->
|
|
|
0.
|
|
0.
|
|
|
|
|
+
|
|
|
|
|
+get_config(#s{db = DB, callback_module = Mod, callback_state = State, shard = Shard}, Item) ->
|
|
|
|
|
+ maybe
|
|
|
|
|
+ true ?= erlang:function_exported(Mod, buffer_config, 4),
|
|
|
|
|
+ {ok, Val} ?= Mod:buffer_config(DB, Shard, State, Item),
|
|
|
|
|
+ Val
|
|
|
|
|
+ else
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ case Item of
|
|
|
|
|
+ batch_size ->
|
|
|
|
|
+ application:get_env(emqx_durable_storage, egress_batch_size, 1000);
|
|
|
|
|
+ batch_bytes ->
|
|
|
|
|
+ application:get_env(emqx_durable_storage, egress_batch_bytes, infinity);
|
|
|
|
|
+ flush_interval ->
|
|
|
|
|
+ application:get_env(emqx_durable_storage, egress_flush_interval, 100)
|
|
|
|
|
+ end
|
|
|
|
|
+ end.
|