|
|
@@ -17,6 +17,7 @@
|
|
|
%% @doc Buffer servers are responsible for collecting batches from the
|
|
|
%% local processes, sharding and repackaging them.
|
|
|
-module(emqx_ds_buffer).
|
|
|
+-feature(maybe_expr, enable).
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
@@ -60,6 +61,11 @@
|
|
|
-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
|
|
|
_Shard.
|
|
|
|
|
|
+-callback buffer_config(emqx_ds:db(), _Shard, _State, batch_size | batch_bytes | flush_interval) ->
|
|
|
+ {ok, non_neg_integer() | infinity} | undefined.
|
|
|
+
|
|
|
+-optional_callbacks([buffer_config/4]).
|
|
|
+
|
|
|
%%================================================================================
|
|
|
%% API functions
|
|
|
%%================================================================================
|
|
|
@@ -216,8 +222,8 @@ enqueue(
|
|
|
%% atomic. It wouldn't win us anything in terms of memory, and
|
|
|
%% EMQX currently feeds data to DS in very small batches, so
|
|
|
%% granularity should be fine enough.
|
|
|
- NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
|
|
|
- NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
|
|
|
+ NMax = get_config(S0, batch_size),
|
|
|
+ NBytesMax = get_config(S0, batch_bytes),
|
|
|
NMsgs = NOps0 + BatchSize,
|
|
|
NBytes = NBytes0 + BatchBytes,
|
|
|
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
|
|
|
@@ -402,7 +408,7 @@ compose_errors(ErrAcc, _Err) ->
|
|
|
ErrAcc.
|
|
|
|
|
|
ensure_timer(S = #s{tref = undefined}) ->
|
|
|
- Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
|
|
+ Interval = get_config(S, flush_interval),
|
|
|
Tref = erlang:send_after(Interval, self(), ?flush),
|
|
|
S#s{tref = Tref};
|
|
|
ensure_timer(S) ->
|
|
|
@@ -420,3 +426,20 @@ payload_size(#message{payload = P, topic = T}) ->
|
|
|
size(P) + size(T);
|
|
|
payload_size({_OpName, _}) ->
|
|
|
0.
|
|
|
+
|
|
|
+get_config(#s{db = DB, callback_module = Mod, callback_state = State, shard = Shard}, Item) ->
|
|
|
+ maybe
|
|
|
+ true ?= erlang:function_exported(Mod, buffer_config, 4),
|
|
|
+ {ok, Val} ?= Mod:buffer_config(DB, Shard, State, Item),
|
|
|
+ Val
|
|
|
+ else
|
|
|
+ _ ->
|
|
|
+ case Item of
|
|
|
+ batch_size ->
|
|
|
+ application:get_env(emqx_durable_storage, egress_batch_size, 1000);
|
|
|
+ batch_bytes ->
|
|
|
+ application:get_env(emqx_durable_storage, egress_batch_bytes, infinity);
|
|
|
+ flush_interval ->
|
|
|
+ application:get_env(emqx_durable_storage, egress_flush_interval, 100)
|
|
|
+ end
|
|
|
+ end.
|