
feat(resource): first commit for batching/async/caching mechanism

Shawn 3 years ago
parent
commit
12904d797f

+ 1 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -54,3 +54,4 @@
 -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
 
 -define(TEST_ID_PREFIX, "_test_:").
+-define(RES_METRICS, resource_metrics).

+ 2 - 3
apps/emqx_resource/include/emqx_resource_utils.hrl

@@ -15,7 +15,7 @@
 %%--------------------------------------------------------------------
 
 -define(SAFE_CALL(_EXP_),
-    ?SAFE_CALL(_EXP_, ok)
+    ?SAFE_CALL(_EXP_, {error, {_EXCLASS_, _EXCPTION_, _ST_}})
 ).
 
 -define(SAFE_CALL(_EXP_, _EXP_ON_FAIL_),
@@ -24,8 +24,7 @@
             (_EXP_)
         catch
             _EXCLASS_:_EXCPTION_:_ST_ ->
-                _EXP_ON_FAIL_,
-                {error, {_EXCLASS_, _EXCPTION_, _ST_}}
+                _EXP_ON_FAIL_
         end
     end()
 ).
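
With this change, the one-argument form returns the crash as {error, {Class, Reason, Stacktrace}} instead of swallowing it and returning ok. A minimal sketch of the behavior, assuming this header's include path (the demo module is hypothetical, not part of this commit):

-module(safe_call_demo).
-include_lib("emqx_resource/include/emqx_resource_utils.hrl").
-export([run/0]).

run() ->
    %% one-argument form: a crash inside the expression is now returned
    %% as {error, {Class, Reason, Stacktrace}} instead of ok
    {error, {error, badarith, _ST}} = ?SAFE_CALL(1 / 0),
    %% two-argument form: the caller-supplied on-fail expression is
    %% still evaluated in the catch clause
    fallback = ?SAFE_CALL(error(boom), fallback),
    ok.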

+ 22 - 38
apps/emqx_resource/src/emqx_resource.erl

@@ -83,8 +83,7 @@
     stop/1,
     %% query the instance
     query/2,
-    %% query the instance with after_query()
-    query/3
+    query_async/3
 ]).
 
 %% Direct calls to the callback module
@@ -111,6 +110,8 @@
     list_group_instances/1
 ]).
 
+-export([inc_metrics_funcs/1, inc_success/1, inc_failed/1]).
+
 -optional_callbacks([
     on_query/4,
     on_get_status/2
@@ -150,16 +151,16 @@ is_resource_mod(Module) ->
 
 -spec query_success(after_query()) -> ok.
 query_success(undefined) -> ok;
-query_success({OnSucc, _}) -> apply_query_after_calls(OnSucc).
+query_success({OnSucc, _}) -> exec_query_after_calls(OnSucc).
 
 -spec query_failed(after_query()) -> ok.
 query_failed(undefined) -> ok;
-query_failed({_, OnFailed}) -> apply_query_after_calls(OnFailed).
+query_failed({_, OnFailed}) -> exec_query_after_calls(OnFailed).
 
-apply_query_after_calls(Funcs) ->
+exec_query_after_calls(Funcs) ->
     lists:foreach(
-        fun({Fun, Args}) ->
-            safe_apply(Fun, Args)
+        fun({Fun, Arg}) ->
+            emqx_resource_utils:safe_exec(Fun, Arg)
         end,
         Funcs
     ).
@@ -243,29 +244,12 @@ reset_metrics(ResId) ->
 %% =================================================================================
 -spec query(resource_id(), Request :: term()) -> Result :: term().
 query(ResId, Request) ->
-    query(ResId, Request, inc_metrics_funcs(ResId)).
-
-%% same to above, also defines what to do when the Module:on_query success or failed
-%% it is the duty of the Module to apply the `after_query()` functions.
--spec query(resource_id(), Request :: term(), after_query()) -> Result :: term().
-query(ResId, Request, AfterQuery) ->
-    case emqx_resource_manager:ets_lookup(ResId) of
-        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            %% the resource state is readonly to Module:on_query/4
-            %% and the `after_query()` functions should be thread safe
-            ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched),
-            try
-                Mod:on_query(ResId, Request, AfterQuery, ResourceState)
-            catch
-                Err:Reason:ST ->
-                    emqx_metrics_worker:inc(resource_metrics, ResId, exception),
-                    erlang:raise(Err, Reason, ST)
-            end;
-        {ok, _Group, _Data} ->
-            query_error(not_connected, <<"resource not connected">>);
-        {error, not_found} ->
-            query_error(not_found, <<"resource not found">>)
-    end.
+    emqx_resource_worker:query(ResId, Request).
+
+-spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) ->
+    ok.
+query_async(ResId, Request, ReplyFun) ->
+    emqx_resource_worker:query_async(ResId, Request, ReplyFun).
 
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 start(ResId) ->
@@ -429,16 +413,16 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
+inc_success(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
+
+inc_failed(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, failed).
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
 inc_metrics_funcs(ResId) ->
-    OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}],
-    OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}],
+    OnSucc = [{fun ?MODULE:inc_success/1, ResId}],
+    OnFailed = [{fun ?MODULE:inc_failed/1, ResId}],
     {OnSucc, OnFailed}.
-
-safe_apply(Func, Args) ->
-    ?SAFE_CALL(erlang:apply(Func, Args)).
-
-query_error(Reason, Msg) ->
-    {error, {?MODULE, #{reason => Reason, msg => Msg}}}.
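
After this change, query/2 routes through the new worker and the after_query variant is replaced by an async call carrying a reply fun. A hedged usage sketch, assuming a worker is running for the resource id (the id and request payload below are made up):

%% synchronous: blocks in the worker's gen_statem:call until a result comes back
Result = emqx_resource:query(<<"bridge:mqtt:demo">>, {send_message, #{payload => <<"hi">>}}),

%% asynchronous: returns ok immediately; the reply fun is applied as
%% Fun(Result, Args) once the (possibly batched) query completes
ReplyFun = {fun(Res, Ctx) -> io:format("got ~p in ~p~n", [Res, Ctx]) end, my_ctx},
ok = emqx_resource:query_async(<<"bridge:mqtt:demo">>, {send_message, #{payload => <<"hi">>}}, ReplyFun)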

+ 16 - 14
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -109,9 +109,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     % The state machine will make the actual call to the callback/resource module after init
     ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts),
     ok = emqx_metrics_worker:create_metrics(
-        resource_metrics,
+        ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception],
+        [matched, success, failed, exception, resource_error],
         [matched]
     ),
     case maps:get(start_after_created, Opts, true) of
@@ -207,12 +207,12 @@ ets_lookup(ResId) ->
 
 %% @doc Get the metrics for the specified resource
 get_metrics(ResId) ->
-    emqx_metrics_worker:get_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:get_metrics(?RES_METRICS, ResId).
 
 %% @doc Reset the metrics for the specified resource
 -spec reset_metrics(resource_id()) -> ok.
 reset_metrics(ResId) ->
-    emqx_metrics_worker:reset_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
 
 %% @doc Returns the data for all resources
 -spec list_all() -> [resource_data()] | [].
@@ -298,8 +298,7 @@ handle_event({call, From}, stop, stopped, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
 handle_event({call, From}, stop, _State, Data) ->
     Result = stop_resource(Data),
-    UpdatedData = Data#data{status = disconnected},
-    {next_state, stopped, UpdatedData, [{reply, From, Result}]};
+    {next_state, stopped, Data, [{reply, From, Result}]};
 % Called when a resource is to be stopped and removed.
 handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
     handle_remove_event(From, ClearMetrics, Data);
@@ -315,9 +314,10 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manually_health_check(From, Data);
 % State: CONNECTING
 handle_event(enter, _OldState, connecting, Data) ->
+    UpdatedData = Data#data{status = connecting},
     insert_cache(Data#data.id, Data#data.group, Data),
     Actions = [{state_timeout, 0, health_check}],
-    {keep_state_and_data, Actions};
+    {keep_state, UpdatedData, Actions};
 handle_event(internal, start_resource, connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, connecting, Data) ->
@@ -326,22 +326,24 @@ handle_event(state_timeout, health_check, connecting, Data) ->
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
 handle_event(enter, _OldState, connected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
+    UpdatedData = Data#data{status = connected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
     _ = emqx_alarm:deactivate(Data#data.id),
     Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
-    {next_state, connected, Data, Actions};
+    {next_state, connected, UpdatedData, Actions};
 handle_event(state_timeout, health_check, connected, Data) ->
     handle_connected_health_check(Data);
 %% State: DISCONNECTED
 handle_event(enter, _OldState, disconnected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
-    handle_disconnected_state_enter(Data);
+    UpdatedData = Data#data{status = disconnected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
+    handle_disconnected_state_enter(UpdatedData);
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
     start_resource(Data, undefined);
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
 handle_event(enter, _OldState, stopped, Data) ->
-    UpdatedData = Data#data{status = disconnected},
+    UpdatedData = Data#data{status = stopped},
     insert_cache(Data#data.id, Data#data.group, UpdatedData),
     {next_state, stopped, UpdatedData};
 % Ignore all other events
@@ -415,7 +417,7 @@ handle_disconnected_state_enter(Data) ->
 handle_remove_event(From, ClearMetrics, Data) ->
     stop_resource(Data),
     case ClearMetrics of
-        true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
+        true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
         false -> ok
     end,
     {stop_and_reply, normal, [{reply, From, ok}]}.
@@ -433,7 +435,7 @@ start_resource(Data, From) ->
             _ = maybe_alarm(disconnected, Data#data.id),
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
-            UpdatedData = Data#data{status = disconnected, error = Reason},
+            UpdatedData = Data#data{error = Reason},
             Actions = maybe_reply([], From, Err),
             {next_state, disconnected, UpdatedData, Actions}
     end.
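
Since every enter handler now writes the status derived from the state machine state into the cache, a stopped resource is reported as stopped rather than disconnected. A small sketch of what a caller observes (the resource id is hypothetical):

ok = emqx_resource:stop(<<"my_res">>),
%% the cached data now carries status stopped, not disconnected
{ok, _Group, #{status := stopped}} = emqx_resource_manager:ets_lookup(<<"my_res">>)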

+ 3 - 1
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -15,6 +15,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_resource_sup).
 
+-include("emqx_resource.hrl").
+
 -behaviour(supervisor).
 
 -export([start_link/0]).
@@ -29,7 +31,7 @@ start_link() ->
 
 init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
-    Metrics = emqx_metrics_worker:child_spec(resource_metrics),
+    Metrics = emqx_metrics_worker:child_spec(?RES_METRICS),
 
     ResourceManager =
         #{

+ 17 - 0
apps/emqx_resource/src/emqx_resource_utils.erl

@@ -0,0 +1,17 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_utils).
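
Note that exec_query_after_calls/1 in emqx_resource.erl above calls emqx_resource_utils:safe_exec/2, which this new module does not define yet. A minimal sketch of the missing function, assuming it simply wraps the call in ?SAFE_CALL:

-include("emqx_resource_utils.hrl").

-export([safe_exec/2]).

%% sketch (assumption, not in this commit): apply Fun to Arg, turning
%% any crash into {error, {Class, Reason, Stacktrace}} via ?SAFE_CALL
safe_exec(Fun, Arg) ->
    ?SAFE_CALL(Fun(Arg)).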

+ 252 - 0
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -0,0 +1,252 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% A FIFO queue worker using replayq as the backend.
+
+-module(emqx_resource_worker).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-behaviour(gen_statem).
+
+-export([
+    start_link/2,
+    query/2,
+    query_async/3,
+    query_mfa/3
+]).
+
+-export([
+    callback_mode/0,
+    init/1
+]).
+
+-export([do/3]).
+
+%% count
+-define(DEFAULT_BATCH_SIZE, 100).
+%% milliseconds
+-define(DEFAULT_BATCH_TIME, 10).
+
+-define(QUERY(FROM, REQUEST), {FROM, REQUEST}).
+-define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}).
+-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
+
+-type id() :: binary().
+-type request() :: term().
+-type result() :: term().
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()}.
+-type from() :: pid() | reply_fun().
+
+-callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
+    {[{from(), result()}], NewCbState :: term()}.
+
+callback_mode() -> [state_functions].
+
+start_link(Id, Opts) ->
+    gen_statem:start_link({local, name(Id)}, ?MODULE, {Id, Opts}, []).
+
+-spec query(id(), request()) -> result().
+query(Id, Request) ->
+    gen_statem:call(name(Id), {query, Request}).
+
+-spec query_async(id(), request(), reply_fun()) -> ok.
+query_async(Id, Request, ReplyFun) ->
+    gen_statem:cast(name(Id), {query, Request, ReplyFun}).
+
+-spec name(id()) -> atom().
+name(Id) ->
+    Mod = atom_to_binary(?MODULE, utf8),
+    binary_to_atom(<<Mod/binary, ":", Id/binary>>, utf8).
+
+disk_cache_dir(Id) ->
+    filename:join([emqx:data_dir(), Id, cache]).
+
+init({Id, Opts}) ->
+    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
+    Queue =
+        case maps:get(cache_enabled, Opts, true) of
+            true -> replayq:open(#{dir => disk_cache_dir(Id), seg_bytes => 10000000});
+            false -> undefined
+        end,
+    St = #{
+        id => Id,
+        batch_enabled => maps:get(batch_enabled, Opts, true),
+        batch_size => BatchSize,
+        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        cache_queue => Queue,
+        acc => [],
+        acc_left => BatchSize,
+        tref => undefined
+    },
+    {ok, do, St}.
+
+do(cast, {query, Request, ReplyFun}, #{batch_enabled := true} = State) ->
+    do_acc(ReplyFun, Request, State);
+do(cast, {query, Request, ReplyFun}, #{batch_enabled := false} = State) ->
+    do_query(ReplyFun, Request, State);
+do({call, From}, {query, Request}, #{batch_enabled := true} = State) ->
+    do_acc(From, Request, State);
+do({call, From}, {query, Request}, #{batch_enabled := false} = State) ->
+    do_query(From, Request, State);
+do(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
+    {keep_state, flush(St#{tref := undefined})};
+do(info, {flush, _Ref}, _St) ->
+    keep_state_and_data;
+do(info, Info, _St) ->
+    ?SLOG(error, #{msg => unexpected_msg, info => Info}),
+    keep_state_and_data.
+
+do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
+    Acc1 = [?QUERY(From, Request) | Acc],
+    St = St0#{acc := Acc1, acc_left := Left - 1},
+    case Left =< 1 of
+        true -> {keep_state, flush(St)};
+        false -> {keep_state, ensure_flush_timer(St)}
+    end.
+
+do_query(From, Request, #{id := Id, cache_queue := Q0} = St0) ->
+    Result = call_query(Id, Request),
+    Q1 = reply_caller(Id, Q0, ?REPLY(From, Request, Result)),
+    {keep_state, St0#{cache_queue := Q1}}.
+
+flush(#{acc := []} = St) ->
+    St;
+flush(
+    #{
+        id := Id,
+        acc := Batch,
+        batch_size := Size,
+        cache_queue := Q0
+    } = St
+) ->
+    BatchResults = call_batch_query(Id, lists:reverse(Batch)),
+    Q1 = batch_reply_caller(Id, Q0, BatchResults),
+    cancel_flush_timer(
+        St#{
+            acc_left := Size,
+            acc := [],
+            cache_queue := Q1
+        }
+    ).
+
+maybe_append_cache(undefined, _Request) -> undefined;
+maybe_append_cache(Q, Request) -> replayq:append(Q, [Request]).
+
+batch_reply_caller(Id, Q, BatchResults) ->
+    lists:foldl(
+        fun(Reply, Q1) ->
+            reply_caller(Id, Q1, Reply)
+        end,
+        Q,
+        BatchResults
+    ).
+
+reply_caller(Id, Q, ?REPLY({ReplyFun, Args}, Request, Result)) when is_function(ReplyFun) ->
+    ?SAFE_CALL(ReplyFun(Result, Args)),
+    handle_query_result(Id, Q, Request, Result);
+reply_caller(Id, Q, ?REPLY(From, Request, Result)) ->
+    gen_statem:reply(From, Result),
+    handle_query_result(Id, Q, Request, Result).
+
+handle_query_result(Id, Q, _Request, ok) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    Q;
+handle_query_result(Id, Q, _Request, {ok, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    Q;
+handle_query_result(Id, Q, Request, {error, {resource_error, #{reason := not_connected}}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
+    maybe_append_cache(Q, Request);
+handle_query_result(Id, Q, _Request, {error, {resource_error, #{}}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
+    Q;
+handle_query_result(Id, Q, Request, {error, {exception, _}}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
+    maybe_append_cache(Q, Request);
+handle_query_result(Id, Q, _Request, {error, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    Q.
+
+call_query(Id, Request) ->
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    case emqx_resource_manager:ets_lookup(Id) of
+        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
+            try Mod:on_query(Id, Request, ResourceState) of
+                Result -> Result
+            catch
+                Err:Reason:ST ->
+                    ModB = atom_to_binary(Mod, utf8),
+                    Msg = <<"call failed, func: ", ModB/binary, ":on_query/3">>,
+                    exception_error(Reason, Msg, {Err, Reason, ST})
+            end;
+        {ok, _Group, #{status := stopped}} ->
+            resource_error(stopped, <<"resource stopped or disabled">>);
+        {ok, _Group, _Data} ->
+            resource_error(not_connected, <<"resource not connected">>);
+        {error, not_found} ->
+            resource_error(not_found, <<"resource not found">>)
+    end.
+
+call_batch_query(Id, Batch) ->
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+    case emqx_resource_manager:ets_lookup(Id) of
+        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
+            try Mod:on_batch_query(Id, Batch, ResourceState) of
+                BatchResults -> BatchResults
+            catch
+                Err:Reason:ST ->
+                    ModB = atom_to_binary(Mod, utf8),
+                    Msg = <<"call failed, func: ", ModB/binary, ":on_batch_query/3">>,
+                    ?EXPAND(exception_error(Reason, Msg, {Err, Reason, ST}), Batch)
+            end;
+        {ok, _Group, _Data} ->
+            ?EXPAND(resource_error(not_connected, <<"resource not connected">>), Batch);
+        {error, not_found} ->
+            ?EXPAND(resource_error(not_found, <<"resource not found">>), Batch)
+    end.
+
+resource_error(Reason, Msg) ->
+    {error, {resource_error, #{reason => Reason, msg => Msg}}}.
+exception_error(Reason, Msg, Details) ->
+    {error, {exception, #{reason => Reason, msg => Msg, details => Details}}}.
+
+%% ==========================================
+ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
+    Ref = make_ref(),
+    TRef = erlang:send_after(T, self(), {flush, Ref}),
+    St#{tref => {TRef, Ref}};
+ensure_flush_timer(St) ->
+    St.
+
+cancel_flush_timer(St = #{tref := undefined}) ->
+    St;
+cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
+    _ = erlang:cancel_timer(TRef),
+    St#{tref => undefined}.
+
+query_mfa(InsertMode, Request, SyncTimeout) ->
+    {?MODULE, query_fun(InsertMode), query_args(InsertMode, Request, SyncTimeout)}.
+
+query_fun(<<"sync">>) -> query;
+query_fun(<<"async">>) -> query_async.
+
+query_args(<<"sync">>, Request, SyncTimeout) ->
+    [Request, SyncTimeout];
+query_args(<<"async">>, Request, _) ->
+    [Request].
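
A hedged sketch of how the worker's batching and caching options fit together (the option values and resource id are made up; the defaults in init/1 above apply when omitted):

{ok, _Pid} = emqx_resource_worker:start_link(<<"my_res">>, #{
    batch_enabled => true,  %% accumulate queries instead of sending one by one
    batch_size => 50,       %% flush after 50 accumulated queries...
    batch_time => 20,       %% ...or 20 ms after the first one, whichever comes first
    cache_enabled => true   %% open a replayq under the data dir for disk caching
}),
%% each call/cast lands in do/3; with batching enabled it goes through
%% do_acc/3 and is flushed to the callback module as one batch
ok = emqx_resource_worker:query_async(<<"my_res">>, {send, <<"m1">>},
    {fun(Result, _) -> io:format("reply: ~p~n", [Result]) end, undefined})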

+ 779 - 0
apps/emqx_resource/src/replayq.erl

@@ -0,0 +1,779 @@
+-module(replayq).
+
+-export([open/1, close/1]).
+-export([append/2, pop/2, ack/2, ack_sync/2, peek/1, overflow/1]).
+-export([count/1, bytes/1, is_empty/1, is_mem_only/1]).
+%% exported for troubleshooting
+-export([do_read_items/2]).
+
+%% internal exports for beam reload
+-export([committer_loop/2, default_sizer/1, default_marshaller/1]).
+
+-export_type([config/0, q/0, ack_ref/0, sizer/0, marshaller/0]).
+
+-define(NOTHING_TO_ACK, nothing_to_ack).
+-define(PENDING_ACKS(Ref), {replayq_pending_acks, Ref}).
+
+-type segno() :: pos_integer().
+-type item() :: term().
+-type count() :: non_neg_integer().
+-type id() :: count().
+-type bytes() :: non_neg_integer().
+-type filename() :: file:filename_all().
+-type dir() :: filename().
+-type ack_ref() :: ?NOTHING_TO_ACK | {segno(), ID :: pos_integer()}.
+-type sizer() :: fun((item()) -> bytes()).
+-type marshaller() :: fun((item()) -> binary()).
+
+-type config() :: #{
+    dir => dir(),
+    seg_bytes => bytes(),
+    mem_only => boolean(),
+    max_total_bytes => bytes(),
+    offload => boolean(),
+    sizer => sizer(),
+    marshaller => marshaller()
+}.
+%% writer cursor
+-define(NO_FD, no_fd).
+-type w_cur() :: #{
+    segno := segno(),
+    bytes := bytes(),
+    count := count(),
+    fd := ?NO_FD | file:fd()
+}.
+
+-type stats() :: #{
+    bytes := bytes(),
+    count := count()
+}.
+
+-opaque q() :: #{
+    config := mem_only | config(),
+    stats := stats(),
+    in_mem := queue:queue(in_mem_item()),
+    w_cur => w_cur(),
+    committer => pid(),
+    head_segno => segno(),
+    sizer := sizer(),
+    marshaller => marshaller(),
+    max_total_bytes := bytes()
+}.
+
+-define(LAYOUT_VSN_0, 0).
+-define(LAYOUT_VSN_1, 1).
+-define(MAGIC, 841265288).
+-define(SUFFIX, "replaylog").
+-define(DEFAULT_POP_BYTES_LIMIT, 2000000).
+-define(DEFAULT_POP_COUNT_LIMIT, 1000).
+-define(DEFAULT_REPLAYQ_LIMIT, 2000000000).
+-define(COMMIT(SEGNO, ID, From), {commit, SEGNO, ID, From}).
+-define(NO_COMMIT_HIST, no_commit_hist).
+-define(FIRST_SEGNO, 1).
+-define(NEXT_SEGNO(N), (N + 1)).
+-define(STOP, stop).
+-define(MEM_ONLY_ITEM(Bytes, Item), {Bytes, Item}).
+-define(DISK_CP_ITEM(Id, Bytes, Item), {Id, Bytes, Item}).
+
+-type in_mem_item() ::
+    ?MEM_ONLY_ITEM(bytes(), item())
+    | ?DISK_CP_ITEM(id(), bytes(), item()).
+
+-spec open(config()) -> q().
+open(#{mem_only := true} = C) ->
+    #{
+        stats => #{bytes => 0, count => 0},
+        in_mem => queue:new(),
+        sizer => get_sizer(C),
+        config => mem_only,
+        max_total_bytes => maps:get(max_total_bytes, C, ?DEFAULT_REPLAYQ_LIMIT)
+    };
+open(#{dir := Dir, seg_bytes := _} = Config) ->
+    ok = filelib:ensure_dir(filename:join(Dir, "foo")),
+    Sizer = get_sizer(Config),
+    Marshaller = get_marshaller(Config),
+    IsOffload = is_offload_mode(Config),
+    Q =
+        case delete_consumed_and_list_rest(Dir) of
+            [] ->
+                %% no old segments, start over from zero
+                #{
+                    stats => #{bytes => 0, count => 0},
+                    w_cur => init_writer(Dir, empty, IsOffload),
+                    committer => spawn_committer(?FIRST_SEGNO, Dir),
+                    head_segno => ?FIRST_SEGNO,
+                    in_mem => queue:new()
+                };
+            Segs ->
+                LastSegno = lists:last(Segs),
+                CommitHist = get_commit_hist(Dir),
+                Reader = fun(Seg, Ch) -> read_items(Dir, Seg, Ch, Sizer, Marshaller) end,
+                HeadItems = Reader(hd(Segs), CommitHist),
+                #{
+                    stats => collect_stats(HeadItems, tl(Segs), Reader),
+                    w_cur => init_writer(Dir, LastSegno, IsOffload),
+                    committer => spawn_committer(hd(Segs), Dir),
+                    head_segno => hd(Segs),
+                    in_mem => queue:from_list(HeadItems)
+                }
+        end,
+    Q#{
+        sizer => Sizer,
+        marshaller => Marshaller,
+        config => maps:without([sizer, marshaller], Config),
+        max_total_bytes => maps:get(max_total_bytes, Config, ?DEFAULT_REPLAYQ_LIMIT)
+    }.
+
+-spec close(q() | w_cur()) -> ok | {error, any()}.
+close(#{config := mem_only}) ->
+    ok;
+close(#{w_cur := W_Cur, committer := Pid} = Q) ->
+    MRef = erlang:monitor(process, Pid),
+    Pid ! ?STOP,
+    unlink(Pid),
+    receive
+        {'DOWN', MRef, process, Pid, _Reason} ->
+            ok
+    end,
+    ok = maybe_dump_back_to_disk(Q),
+    do_close(W_Cur).
+
+do_close(#{fd := ?NO_FD}) -> ok;
+do_close(#{fd := Fd}) -> file:close(Fd).
+
+%% In offload mode, dump the unacked (and un-popped) items to disk
+%% before close. This serves as best-effort data-loss protection.
+maybe_dump_back_to_disk(#{config := Config} = Q) ->
+    case is_offload_mode(Config) of
+        true -> dump_back_to_disk(Q);
+        false -> ok
+    end.
+
+dump_back_to_disk(#{
+    config := #{dir := Dir},
+    head_segno := ReaderSegno,
+    in_mem := InMem,
+    marshaller := Marshaller
+}) ->
+    IoData0 = get_unacked(process_info(self(), dictionary), ReaderSegno, Marshaller),
+    Items1 = queue:to_list(InMem),
+    IoData1 = lists:map(fun(?DISK_CP_ITEM(_, _, I)) -> make_iodata(I, Marshaller) end, Items1),
+    %% ensure old segment file is deleted
+    ok = ensure_deleted(filename(Dir, ReaderSegno)),
+    %% rewrite the segment with what's currently in memory
+    IoData = [IoData0, IoData1],
+    case iolist_size(IoData) > 0 of
+        true ->
+            #{fd := Fd} = open_segment(Dir, ReaderSegno),
+            ok = file:write(Fd, IoData),
+            ok = file:close(Fd);
+        false ->
+            %% nothing to write
+            ok
+    end.
+
+get_unacked({dictionary, Dict}, ReaderSegno, Marshaller) ->
+    F = fun
+        ({?PENDING_ACKS(AckRef), Items}) ->
+            erase(?PENDING_ACKS(AckRef)),
+            {Segno, Id} = AckRef,
+            Segno =:= ReaderSegno andalso
+                {true, {Id, Items}};
+        (_) ->
+            false
+    end,
+    Pendings0 = lists:filtermap(F, Dict),
+    Pendings = lists:keysort(1, Pendings0),
+    do_get_unacked(Pendings, Marshaller).
+
+do_get_unacked([], _Marshaller) ->
+    [];
+do_get_unacked([{_, Items} | Rest], Marshaller) ->
+    [
+        [make_iodata(I, Marshaller) || I <- Items]
+        | do_get_unacked(Rest, Marshaller)
+    ].
+
+-spec append(q(), [item()]) -> q().
+append(Q, []) ->
+    Q;
+append(
+    #{
+        config := mem_only,
+        in_mem := InMem,
+        stats := #{bytes := Bytes0, count := Count0},
+        sizer := Sizer
+    } = Q,
+    Items0
+) ->
+    {CountDiff, BytesDiff, Items} = transform(false, Items0, Sizer),
+
+    Stats = #{count => Count0 + CountDiff, bytes => Bytes0 + BytesDiff},
+    Q#{
+        stats := Stats,
+        in_mem := append_in_mem(Items, InMem)
+    };
+append(
+    #{
+        config := #{seg_bytes := BytesLimit, dir := Dir} = Config,
+        stats := #{bytes := Bytes0, count := Count0},
+        w_cur := #{count := CountInSeg, segno := WriterSegno} = W_Cur0,
+        head_segno := ReaderSegno,
+        sizer := Sizer,
+        marshaller := Marshaller,
+        in_mem := HeadItems0
+    } = Q,
+    Items0
+) ->
+    IoData = lists:map(fun(I) -> make_iodata(I, Marshaller) end, Items0),
+    {CountDiff, BytesDiff, Items} = transform(CountInSeg + 1, Items0, Sizer),
+    TotalBytes = Bytes0 + BytesDiff,
+    Stats = #{count => Count0 + CountDiff, bytes => TotalBytes},
+    IsOffload = is_offload_mode(Config),
+    W_Cur1 = do_append(W_Cur0, CountDiff, BytesDiff, IoData),
+    W_Cur =
+        case is_segment_full(W_Cur1, TotalBytes, BytesLimit, ReaderSegno, IsOffload) of
+            true ->
+                ok = do_close(W_Cur1),
+                %% get ready for the next append
+                open_segment(Dir, ?NEXT_SEGNO(WriterSegno));
+            false ->
+                W_Cur1
+        end,
+    HeadItems =
+        case ReaderSegno =:= WriterSegno of
+            true -> append_in_mem(Items, HeadItems0);
+            false -> HeadItems0
+        end,
+    Q#{
+        stats := Stats,
+        w_cur := W_Cur,
+        in_mem := HeadItems
+    }.
+
+%% @doc Pop out at least one item from the queue.
+%% Volume is limited by `bytes_limit' and `count_limit'.
+-spec pop(q(), #{bytes_limit => bytes(), count_limit => count()}) ->
+    {q(), ack_ref(), [item()]}.
+pop(Q, Opts) ->
+    Bytes = maps:get(bytes_limit, Opts, ?DEFAULT_POP_BYTES_LIMIT),
+    Count = maps:get(count_limit, Opts, ?DEFAULT_POP_COUNT_LIMIT),
+    true = (Count > 0),
+    pop(Q, Bytes, Count, ?NOTHING_TO_ACK, []).
+
+%% @doc peek the queue front item.
+-spec peek(q()) -> empty | item().
+peek(#{in_mem := HeadItems}) ->
+    case queue:peek(HeadItems) of
+        empty -> empty;
+        {value, ?MEM_ONLY_ITEM(_, Item)} -> Item;
+        {value, ?DISK_CP_ITEM(_, _, Item)} -> Item
+    end.
+
+%% @doc Asynchronously write the consumed item's segment number + ID to a file.
+-spec ack(q(), ack_ref()) -> ok.
+ack(_, ?NOTHING_TO_ACK) ->
+    ok;
+ack(#{committer := Pid}, {Segno, Id} = AckRef) ->
+    _ = erlang:erase(?PENDING_ACKS(AckRef)),
+    Pid ! ?COMMIT(Segno, Id, false),
+    ok.
+
+%% @hidden Synced ack, for deterministic tests only
+-spec ack_sync(q(), ack_ref()) -> ok.
+ack_sync(_, ?NOTHING_TO_ACK) ->
+    ok;
+ack_sync(#{committer := Pid}, {Segno, Id} = AckRef) ->
+    _ = erlang:erase(?PENDING_ACKS(AckRef)),
+    Ref = make_ref(),
+    Pid ! ?COMMIT(Segno, Id, {self(), Ref}),
+    receive
+        {Ref, ok} -> ok
+    end.
+
+-spec count(q()) -> count().
+count(#{stats := #{count := Count}}) -> Count.
+
+-spec bytes(q()) -> bytes().
+bytes(#{stats := #{bytes := Bytes}}) -> Bytes.
+
+is_empty(#{config := mem_only, in_mem := All}) ->
+    queue:is_empty(All);
+is_empty(
+    #{
+        w_cur := #{segno := WriterSegno},
+        head_segno := ReaderSegno,
+        in_mem := HeadItems
+    } = Q
+) ->
+    Result = ((WriterSegno =:= ReaderSegno) andalso queue:is_empty(HeadItems)),
+    %% assert
+    Result = (count(Q) =:= 0).
+
+%% @doc Returns the number of bytes by which the queue size exceeds the
+%% total bytes limit. The result is negative when the queue has not overflowed.
+-spec overflow(q()) -> integer().
+overflow(#{
+    max_total_bytes := MaxTotalBytes,
+    stats := #{bytes := Bytes}
+}) ->
+    Bytes - MaxTotalBytes.
+
+-spec is_mem_only(q()) -> boolean().
+is_mem_only(#{config := mem_only}) ->
+    true;
+is_mem_only(_) ->
+    false.
+
+%% internals =========================================================
+
+transform(Id, Items, Sizer) ->
+    transform(Id, Items, Sizer, 0, 0, []).
+
+transform(_Id, [], _Sizer, Count, Bytes, Acc) ->
+    {Count, Bytes, lists:reverse(Acc)};
+transform(Id, [Item0 | Rest], Sizer, Count, Bytes, Acc) ->
+    Size = Sizer(Item0),
+    {NextId, Item} =
+        case Id of
+            false -> {false, ?MEM_ONLY_ITEM(Size, Item0)};
+            N -> {N + 1, ?DISK_CP_ITEM(Id, Size, Item0)}
+        end,
+    transform(NextId, Rest, Sizer, Count + 1, Bytes + Size, [Item | Acc]).
+
+append_in_mem([], Q) -> Q;
+append_in_mem([Item | Rest], Q) -> append_in_mem(Rest, queue:in(Item, Q)).
+
+pop(Q, _Bytes, 0, AckRef, Acc) ->
+    Result = lists:reverse(Acc),
+    ok = maybe_save_pending_acks(AckRef, Q, Result),
+    {Q, AckRef, Result};
+pop(#{config := Cfg} = Q, Bytes, Count, AckRef, Acc) ->
+    case is_empty(Q) of
+        true ->
+            {Q, AckRef, lists:reverse(Acc)};
+        false when Cfg =:= mem_only ->
+            pop_mem(Q, Bytes, Count, Acc);
+        false ->
+            pop2(Q, Bytes, Count, AckRef, Acc)
+    end.
+
+pop_mem(
+    #{
+        in_mem := InMem,
+        stats := #{count := TotalCount, bytes := TotalBytes} = Stats
+    } = Q,
+    Bytes,
+    Count,
+    Acc
+) ->
+    case queue:out(InMem) of
+        {{value, ?MEM_ONLY_ITEM(Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
+            {Q, ?NOTHING_TO_ACK, lists:reverse(Acc)};
+        {{value, ?MEM_ONLY_ITEM(Sz, Item)}, Rest} ->
+            NewQ = Q#{
+                in_mem := Rest,
+                stats := Stats#{
+                    count := TotalCount - 1,
+                    bytes := TotalBytes - Sz
+                }
+            },
+            pop(NewQ, Bytes - Sz, Count - 1, ?NOTHING_TO_ACK, [Item | Acc])
+    end.
+
+pop2(
+    #{
+        head_segno := ReaderSegno,
+        in_mem := HeadItems,
+        stats := #{count := TotalCount, bytes := TotalBytes} = Stats,
+        w_cur := #{segno := WriterSegno}
+    } = Q,
+    Bytes,
+    Count,
+    AckRef,
+    Acc
+) ->
+    case queue:out(HeadItems) of
+        {{value, ?DISK_CP_ITEM(_, Sz, _Item)}, _} when Sz > Bytes andalso Acc =/= [] ->
+            %% taking the head item would cause exceeding size limit
+            {Q, AckRef, lists:reverse(Acc)};
+        {{value, ?DISK_CP_ITEM(Id, Sz, Item)}, Rest} ->
+            Q1 = Q#{
+                in_mem := Rest,
+                stats := Stats#{
+                    count := TotalCount - 1,
+                    bytes := TotalBytes - Sz
+                }
+            },
+            %% read the next segment in case current is drained
+            NewQ =
+                case queue:is_empty(Rest) andalso ReaderSegno < WriterSegno of
+                    true -> read_next_seg(Q1);
+                    false -> Q1
+                end,
+            NewAckRef = {ReaderSegno, Id},
+            pop(NewQ, Bytes - Sz, Count - 1, NewAckRef, [Item | Acc])
+    end.
+
+%% For backward compatibility of the ack API we cannot track pending
+%% acks in q() -- hence the use of the process dictionary.
+maybe_save_pending_acks(?NOTHING_TO_ACK, _, _) ->
+    ok;
+maybe_save_pending_acks(AckRef, #{config := Config}, Items) ->
+    case is_offload_mode(Config) of
+        true ->
+            _ = erlang:put(?PENDING_ACKS(AckRef), Items),
+            ok;
+        false ->
+            ok
+    end.
+
+read_next_seg(
+    #{
+        config := #{dir := Dir} = Config,
+        head_segno := ReaderSegno,
+        w_cur := #{segno := WriterSegno, fd := Fd} = WCur0,
+        sizer := Sizer,
+        marshaller := Marshaller
+    } = Q
+) ->
+    NextSegno = ReaderSegno + 1,
+    %% reader has caught up to latest segment
+    case NextSegno =:= WriterSegno of
+        true ->
+            %% force flush to disk so the next read can get all bytes
+            ok = file:sync(Fd);
+        false ->
+            ok
+    end,
+    IsOffload = is_offload_mode(Config),
+    WCur =
+        case IsOffload andalso NextSegno =:= WriterSegno of
+            true ->
+                %% reader has caught up to latest segment in offload mode,
+                %% close the writer's fd. Continue in mem-only mode for the head segment
+                ok = do_close(WCur0),
+                WCur0#{fd := ?NO_FD};
+            false ->
+                WCur0
+        end,
+    NextSegItems = read_items(Dir, NextSegno, ?NO_COMMIT_HIST, Sizer, Marshaller),
+    Q#{
+        head_segno := NextSegno,
+        in_mem := queue:from_list(NextSegItems),
+        w_cur := WCur
+    }.
+
+delete_consumed_and_list_rest(Dir0) ->
+    Dir = unicode:characters_to_list(Dir0),
+    Segnos0 = lists:sort([parse_segno(N) || N <- filelib:wildcard("*." ?SUFFIX, Dir)]),
+    {SegnosToDelete, Segnos} = find_segnos_to_delete(Dir, Segnos0),
+    ok = lists:foreach(fun(Segno) -> ensure_deleted(filename(Dir, Segno)) end, SegnosToDelete),
+    case Segnos of
+        [] ->
+            %% delete the commit file in case there are no segments left;
+            %% segment numbers will restart from ?FIRST_SEGNO.
+            ensure_deleted(commit_filename(Dir)),
+            [];
+        X ->
+            X
+    end.
+
+find_segnos_to_delete(Dir, Segnos) ->
+    CommitHist = get_commit_hist(Dir),
+    do_find_segnos_to_delete(Dir, Segnos, CommitHist).
+
+do_find_segnos_to_delete(_Dir, Segnos, ?NO_COMMIT_HIST) ->
+    {[], Segnos};
+do_find_segnos_to_delete(Dir, Segnos0, {CommittedSegno, CommittedId}) ->
+    {SegnosToDelete, Segnos} = lists:partition(fun(N) -> N < CommittedSegno end, Segnos0),
+    case
+        Segnos =/= [] andalso
+            hd(Segnos) =:= CommittedSegno andalso
+            is_all_consumed(Dir, CommittedSegno, CommittedId)
+    of
+        true ->
+            %% assert
+            CommittedSegno = hd(Segnos),
+            %% all items in the oldest segment have been consumed,
+            %% no need to keep this segment
+            {[CommittedSegno | SegnosToDelete], tl(Segnos)};
+        _ ->
+            {SegnosToDelete, Segnos}
+    end.
+
+%% ALL items are consumed if the committed item ID is no less than the
+%% number of items in this segment
+is_all_consumed(Dir, CommittedSegno, CommittedId) ->
+    CommittedId >= erlang:length(do_read_items(Dir, CommittedSegno)).
+
+ensure_deleted(Filename) ->
+    case file:delete(Filename) of
+        ok -> ok;
+        {error, enoent} -> ok
+    end.
+
+%% The committer writes the consumer's acked segment number + item ID
+%% to a file. The file is only read at start/restart.
+spawn_committer(ReaderSegno, Dir) ->
+    Name = iolist_to_binary(filename:join([Dir, committer])),
+    %% register a name to avoid having two committers spawned for the same dir
+    RegName = binary_to_atom(Name, utf8),
+    Pid = erlang:spawn_link(fun() -> committer_loop(ReaderSegno, Dir) end),
+    true = erlang:register(RegName, Pid),
+    Pid.
+
+committer_loop(ReaderSegno, Dir) ->
+    receive
+        ?COMMIT(Segno0, Id0, false) ->
+            {Segno, Id} = collect_async_commits(Segno0, Id0),
+            ok = handle_commit(ReaderSegno, Dir, Segno, Id, false),
+            ?MODULE:committer_loop(Segno, Dir);
+        ?COMMIT(Segno, Id, From) ->
+            ok = handle_commit(ReaderSegno, Dir, Segno, Id, From),
+            ?MODULE:committer_loop(Segno, Dir);
+        ?STOP ->
+            ok;
+        Msg ->
+            exit({replayq_committer_unkown_msg, Msg})
+    after 200 ->
+        ?MODULE:committer_loop(ReaderSegno, Dir)
+    end.
+
+handle_commit(ReaderSegno, Dir, Segno, Id, From) ->
+    IoData = io_lib:format("~p.\n", [#{segno => Segno, id => Id}]),
+    ok = do_commit(Dir, IoData),
+    case Segno > ReaderSegno of
+        true ->
+            SegnosToDelete = lists:seq(ReaderSegno, Segno - 1),
+            lists:foreach(fun(N) -> ok = ensure_deleted(filename(Dir, N)) end, SegnosToDelete);
+        false ->
+            ok
+    end,
+    ok = reply_ack_ok(From).
+
+%% Collect async acks which are already sent in the mailbox,
+%% and keep only the last one for the current segment.
+collect_async_commits(Segno, Id) ->
+    receive
+        ?COMMIT(Segno, AnotherId, false) ->
+            collect_async_commits(Segno, AnotherId)
+    after 0 ->
+        {Segno, Id}
+    end.
+
+reply_ack_ok({Pid, Ref}) ->
+    Pid ! {Ref, ok},
+    ok;
+reply_ack_ok(_) ->
+    ok.
+
+get_commit_hist(Dir) ->
+    CommitFile = commit_filename(Dir),
+    case filelib:is_regular(CommitFile) of
+        true ->
+            {ok, [#{segno := Segno, id := Id}]} = file:consult(CommitFile),
+            {Segno, Id};
+        false ->
+            ?NO_COMMIT_HIST
+    end.
+
+do_commit(Dir, IoData) ->
+    TmpName = commit_filename(Dir, "COMMIT.tmp"),
+    Name = commit_filename(Dir),
+    ok = file:write_file(TmpName, IoData),
+    ok = file:rename(TmpName, Name).
+
+commit_filename(Dir) ->
+    commit_filename(Dir, "COMMIT").
+
+commit_filename(Dir, Name) ->
+    filename:join([Dir, Name]).
+
+do_append(
+    #{fd := ?NO_FD, bytes := Bytes0, count := Count0} = Cur,
+    Count,
+    Bytes,
+    _IoData
+) ->
+    %% offload mode, fd is not initialized yet
+    Cur#{
+        bytes => Bytes0 + Bytes,
+        count => Count0 + Count
+    };
+do_append(
+    #{fd := Fd, bytes := Bytes0, count := Count0} = Cur,
+    Count,
+    Bytes,
+    IoData
+) ->
+    ok = file:write(Fd, IoData),
+    Cur#{
+        bytes => Bytes0 + Bytes,
+        count => Count0 + Count
+    }.
+
+read_items(Dir, Segno, CommitHist, Sizer, Marshaller) ->
+    Items0 = do_read_items(Dir, Segno),
+    Items =
+        case CommitHist of
+            ?NO_COMMIT_HIST ->
+                %% no commit hist, return all
+                Items0;
+            {CommittedSegno, _} when CommittedSegno < Segno ->
+                %% committed at an older segment
+                Items0;
+            {Segno, CommittedId} ->
+                %% committed at the current segment, keep only the tail
+                {_, R} = lists:splitwith(fun({I, _}) -> I =< CommittedId end, Items0),
+                R
+        end,
+    lists:map(
+        fun({Id, Bin}) ->
+            Item = Marshaller(Bin),
+            Size = Sizer(Item),
+            ?DISK_CP_ITEM(Id, Size, Item)
+        end,
+        Items
+    ).
+
+do_read_items(Dir, Segno) ->
+    Filename = filename(Dir, Segno),
+    {ok, Bin} = file:read_file(Filename),
+    case parse_items(Bin, 1, []) of
+        {Items, <<>>} ->
+            Items;
+        {Items, Corrupted} ->
+            error_logger:error_msg(
+                "corrupted replayq log: ~s, skipped ~p bytes",
+                [Filename, size(Corrupted)]
+            ),
+            Items
+    end.
+
+parse_items(<<>>, _Id, Acc) ->
+    {lists:reverse(Acc), <<>>};
+parse_items(
+    <<?LAYOUT_VSN_1:8, ?MAGIC:32/unsigned-integer, CRC:32/unsigned-integer,
+        Size:32/unsigned-integer, Item:Size/binary, Rest/binary>> = All,
+    Id,
+    Acc
+) ->
+    case CRC =:= erlang:crc32(Item) of
+        true -> parse_items(Rest, Id + 1, [{Id, Item} | Acc]);
+        false -> {lists:reverse(Acc), All}
+    end;
+parse_items(
+    <<?LAYOUT_VSN_0:8, CRC:32/unsigned-integer, Size:32/unsigned-integer, Item:Size/binary,
+        Rest/binary>> = All,
+    Id,
+    Acc
+) ->
+    case CRC =:= erlang:crc32(Item) andalso Item =/= <<>> of
+        true -> parse_items(Rest, Id + 1, [{Id, Item} | Acc]);
+        false -> {lists:reverse(Acc), All}
+    end;
+parse_items(Corrupted, _Id, Acc) ->
+    {lists:reverse(Acc), Corrupted}.
+
+make_iodata(Item0, Marshaller) ->
+    Item = Marshaller(Item0),
+    Size = size(Item),
+    CRC = erlang:crc32(Item),
+    [
+        <<?LAYOUT_VSN_1:8, ?MAGIC:32/unsigned-integer, CRC:32/unsigned-integer,
+            Size:32/unsigned-integer>>,
+        Item
+    ].
+
+collect_stats(HeadItems, SegsOnDisk, Reader) ->
+    ItemF = fun(?DISK_CP_ITEM(_Id, Sz, _Item), {B, C}) ->
+        {B + Sz, C + 1}
+    end,
+    Acc0 = lists:foldl(ItemF, {0, 0}, HeadItems),
+    {Bytes, Count} =
+        lists:foldl(
+            fun(Segno, Acc) ->
+                Items = Reader(Segno, ?NO_COMMIT_HIST),
+                lists:foldl(ItemF, Acc, Items)
+            end,
+            Acc0,
+            SegsOnDisk
+        ),
+    #{bytes => Bytes, count => Count}.
+
+parse_segno(Filename) ->
+    [Segno, ?SUFFIX] = string:tokens(Filename, "."),
+    list_to_integer(Segno).
+
+filename(Dir, Segno) ->
+    Name = lists:flatten(io_lib:format("~10.10.0w." ?SUFFIX, [Segno])),
+    filename:join(Dir, Name).
+
+%% Open the current segment for write if it is empty,
+%% otherwise roll over to the next segment.
+-spec init_writer(dir(), empty | segno(), boolean()) -> w_cur().
+init_writer(_Dir, empty, true) ->
+    %% clean start for offload mode
+    #{fd => ?NO_FD, segno => ?FIRST_SEGNO, bytes => 0, count => 0};
+init_writer(Dir, empty, false) ->
+    open_segment(Dir, ?FIRST_SEGNO);
+init_writer(Dir, Segno, _IsOffload) when is_number(Segno) ->
+    Filename = filename(Dir, Segno),
+    case filelib:file_size(Filename) of
+        0 -> open_segment(Dir, Segno);
+        _ -> open_segment(Dir, ?NEXT_SEGNO(Segno))
+    end.
+
+-spec open_segment(dir(), segno()) -> w_cur().
+open_segment(Dir, Segno) ->
+    Filename = filename(Dir, Segno),
+    %% raw so there is no need to go through the single gen_server file_server
+    {ok, Fd} = file:open(Filename, [raw, read, write, binary, delayed_write]),
+    #{fd => Fd, segno => Segno, bytes => 0, count => 0}.
+
+get_sizer(C) ->
+    maps:get(sizer, C, fun ?MODULE:default_sizer/1).
+
+get_marshaller(C) ->
+    maps:get(marshaller, C, fun ?MODULE:default_marshaller/1).
+
+is_offload_mode(Config) when is_map(Config) ->
+    maps:get(offload, Config, false).
+
+default_sizer(I) when is_binary(I) -> erlang:size(I).
+
+default_marshaller(I) when is_binary(I) -> I.
+
+is_segment_full(
+    #{segno := WriterSegno, bytes := SegmentBytes},
+    TotalBytes,
+    SegmentBytesLimit,
+    ReaderSegno,
+    true
+) ->
+    %% In offload mode: when the reader is lagging behind, the writer
+    %% rolls to a new segment once the file size limit is reached;
+    %% when the reader is reading off the same segment as the writer
+    %% (i.e. the in-memory queue), we only start writing to a segment
+    %% file once the total bytes (in memory) reach the segment limit.
+    %%
+    %% NOTE: we never shrink segment bytes, even when popping out
+    %% from the in-memory queue.
+    case ReaderSegno < WriterSegno of
+        true -> SegmentBytes >= SegmentBytesLimit;
+        false -> TotalBytes >= SegmentBytesLimit
+    end;
+is_segment_full(
+    #{bytes := SegmentBytes},
+    _TotalBytes,
+    SegmentBytesLimit,
+    _ReaderSegno,
+    false
+) ->
+    %% Here we check whether the segment size exceeds the limit only
+    %% after the append, on the assumption that items are usually very
+    %% small compared to the segment size.
+    %% We can change the implementation to split the items list to
+    %% avoid segment overflow if that ever proves necessary.
+    SegmentBytes >= SegmentBytesLimit.
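
To make the queue semantics concrete, a minimal mem_only round trip (a sanity sketch; disk mode additionally needs dir and seg_bytes, as in open/1 above):

Q0 = replayq:open(#{mem_only => true}),
Q1 = replayq:append(Q0, [<<"a">>, <<"b">>, <<"c">>]),
%% pop at most two items; in mem_only mode there is nothing to ack,
%% so the ack ref is the nothing_to_ack marker and ack/2 is a no-op
{Q2, AckRef, [<<"a">>, <<"b">>]} = replayq:pop(Q1, #{count_limit => 2}),
ok = replayq:ack(Q2, AckRef),
1 = replayq:count(Q2),
ok = replayq:close(Q2)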