|
|
@@ -117,13 +117,16 @@ init({Id, Index, Opts}) ->
|
|
|
true = gproc_pool:connect_worker(Id, {Id, Index}),
|
|
|
Name = name(Id, Index),
|
|
|
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
|
|
|
+ SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
|
|
|
+ TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
|
+ SegBytes = min(SegBytes0, TotalBytes),
|
|
|
Queue =
|
|
|
case maps:get(enable_queue, Opts, false) of
|
|
|
true ->
|
|
|
replayq:open(#{
|
|
|
dir => disk_queue_dir(Id, Index),
|
|
|
- seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
|
|
|
- max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
|
+ seg_bytes => SegBytes,
|
|
|
+ max_total_bytes => TotalBytes,
|
|
|
sizer => fun ?MODULE:estimate_size/1,
|
|
|
marshaller => fun ?MODULE:queue_item_marshaller/1
|
|
|
});
|