
feat(dsrepl): transfer storage snapshot during ra snapshot recovery

Andrew Mayorov, 1 year ago
Parent commit: 77a022bd93

+ 21 - 2
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -21,7 +21,16 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
+-export([
+    start_db/2,
+    start_shard/1,
+    start_egress/1,
+    stop_shard/1,
+    terminate_storage/1,
+    restart_storage/1,
+    ensure_shard/1,
+    ensure_egress/1
+]).
 -export([which_shards/1]).
 
 %% behaviour callbacks:
@@ -64,12 +73,22 @@ start_shard({DB, Shard}) ->
 start_egress({DB, Shard}) ->
     supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
 
--spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
+-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
 stop_shard(Shard = {DB, _}) ->
     Sup = ?via(#?shards_sup{db = DB}),
     ok = supervisor:terminate_child(Sup, Shard),
     ok = supervisor:delete_child(Sup, Shard).
 
+-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
+terminate_storage({DB, Shard}) ->
+    Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
+    supervisor:terminate_child(Sup, {Shard, storage}).
+
+-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}.
+restart_storage({DB, Shard}) ->
+    Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
+    supervisor:restart_child(Sup, {Shard, storage}).
+
 -spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
     ok | {error, _Reason}.
 ensure_shard(Shard) ->
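
The two new calls, terminate_storage/1 and restart_storage/1, bracket snapshot acceptance: the storage process is stopped before its data directory gets overwritten and brought back once the transfer completes. A hedged sketch of the intended call order (ShardId is a {DB, Shard} pair; the chunk transfer itself is shown later in emqx_ds_replication_snapshot):

    %% Sketch only; mirrors how the new snapshot module (below) is expected to
    %% drive these calls while accepting a snapshot.
    _ = emqx_ds_builtin_db_sup:terminate_storage(ShardId),
    {ok, Writer} = emqx_ds_storage_layer:accept_snapshot(ShardId),
    %% ... stream snapshot chunks into Writer ...
    {ok, _Pid} = emqx_ds_builtin_db_sup:restart_storage(ShardId).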

+ 27 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -65,7 +65,9 @@
 
 -export([
     init/1,
-    apply/3
+    apply/3,
+
+    snapshot_module/0
 ]).
 
 -export_type([
@@ -80,6 +82,10 @@
     batch/0
 ]).
 
+-export_type([
+    ra_state/0
+]).
+
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include("emqx_ds_replication_layer.hrl").
 
@@ -140,6 +146,20 @@
 
 -type generation_rank() :: {shard_id(), term()}.
 
+%% Core state of the replication, i.e. the state of the ra machine.
+-type ra_state() :: #{
+    db_shard := {emqx_ds:db(), shard_id()},
+    latest := timestamp_us()
+}.
+
+%% Command. Each command is an entry in the replication log.
+-type ra_command() :: #{
+    ?tag := ?BATCH | add_generation | update_config | drop_generation,
+    _ => _
+}.
+
+-type timestamp_us() :: non_neg_integer().
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -635,9 +655,12 @@ ra_drop_shard(DB, Shard) ->
 
 %%
 
+-spec init(_Args :: map()) -> ra_state().
 init(#{db := DB, shard := Shard}) ->
     #{db_shard => {DB, Shard}, latest => 0}.
 
+-spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
+    {ra_state(), _Reply, _Effects}.
 apply(
     #{index := RaftIdx},
     #{
@@ -717,3 +740,6 @@ timestamp_to_timeus(TimestampMs) ->
 
 timeus_to_timestamp(TimestampUs) ->
     TimestampUs div 1000.
+
+snapshot_module() ->
+    emqx_ds_replication_snapshot.
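
Here snapshot_module/0 is assumed to be the optional ra_machine callback through which ra discovers the custom ra_snapshot implementation, and ra_state() is the small map the machine starts from. A minimal sketch with made-up values:

    %% Illustrative only; the DB name and shard id are invented.
    State0 = emqx_ds_replication_layer:init(#{db => messages, shard => <<"1">>}),
    %% State0 =:= #{db_shard => {messages, <<"1">>}, latest => 0}
    emqx_ds_replication_snapshot = emqx_ds_replication_layer:snapshot_module().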

+ 10 - 8
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -147,19 +147,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
             Bootstrap = false;
         {error, name_not_registered} ->
             Bootstrap = true,
+            Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
+            LogOpts = maps:with(
+                [
+                    snapshot_interval,
+                    resend_window
+                ],
+                ReplicationOpts
+            ),
             ok = ra:start_server(DB, #{
                 id => LocalServer,
                 uid => <<ClusterName/binary, "_", Site/binary>>,
                 cluster_name => ClusterName,
                 initial_members => Servers,
-                machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
-                log_init_args => maps:with(
-                    [
-                        snapshot_interval,
-                        resend_window
-                    ],
-                    ReplicationOpts
-                )
+                machine => Machine,
+                log_init_args => LogOpts
             })
     end,
     case Servers of
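
Only snapshot_interval and resend_window are forwarded to log_init_args; any other keys in replication_options are dropped by the maps:with/2 filter. A hedged example of such a map (values are illustrative, partly borrowed from the test suite below; the snapshot_interval semantics, roughly how many applied entries between ra snapshots, is an assumption not confirmed by this diff):

    #{
        wal_max_size_bytes => 128 * 1024,   %% not part of log_init_args
        wal_max_batch_size => 1024,         %% not part of log_init_args
        snapshot_interval => 128,
        resend_window => 5
    }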

+ 229 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_snapshot.erl

@@ -0,0 +1,229 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_snapshot).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-behaviour(ra_snapshot).
+-export([
+    prepare/2,
+    write/3,
+
+    begin_read/2,
+    read_chunk/3,
+
+    begin_accept/2,
+    accept_chunk/2,
+    complete_accept/2,
+
+    recover/1,
+    validate/1,
+    read_meta/1
+]).
+
+%% Read state.
+-record(rs, {
+    phase :: machine_state | storage_snapshot,
+    started_at :: _Time :: integer(),
+    state :: emqx_ds_replication_layer:ra_state() | undefined,
+    reader :: emqx_ds_storage_snapshot:reader() | undefined
+}).
+
+%% Write state.
+-record(ws, {
+    phase :: machine_state | storage_snapshot,
+    started_at :: _Time :: integer(),
+    dir :: file:filename(),
+    meta :: ra_snapshot:meta(),
+    state :: emqx_ds_replication_layer:ra_state() | undefined,
+    writer :: emqx_ds_storage_snapshot:writer() | undefined
+}).
+
+-type rs() :: #rs{}.
+-type ws() :: #ws{}.
+
+-type ra_state() :: emqx_ds_replication_layer:ra_state().
+
+%% Writing a snapshot.
+%% This process is exactly the same as writing a ra log snapshot: store the
+%% log meta and the machine state in a single snapshot file.
+
+-spec prepare(_RaftIndex, ra_state()) -> _State :: ra_state().
+prepare(Index, State) ->
+    ra_log_snapshot:prepare(Index, State).
+
+-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
+    ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
+write(Dir, Meta, MachineState) ->
+    ra_log_snapshot:write(Dir, Meta, MachineState).
+
+%% Reading a snapshot.
+%% This is triggered by the leader when it finds that a follower has fallen so
+%% far behind that no log segments cover the gap anymore. Unlike the write
+%% path, this process MUST also read the storage snapshot (in addition to the
+%% log snapshot) to reconstruct the storage state on the target node.
+
+-spec begin_read(_SnapshotDir :: file:filename(), _Context :: #{}) ->
+    {ok, ra_snapshot:meta(), rs()} | {error, _Reason :: term()}.
+begin_read(Dir, _Context) ->
+    RS = #rs{
+        phase = machine_state,
+        started_at = erlang:monotonic_time(millisecond)
+    },
+    case ra_log_snapshot:recover(Dir) of
+        {ok, Meta, MachineState} ->
+            start_snapshot_reader(Meta, RS#rs{state = MachineState});
+        Error ->
+            Error
+    end.
+
+start_snapshot_reader(Meta, RS) ->
+    ShardId = shard_id(RS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_read_started",
+        shard => ShardId
+    }),
+    {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(ShardId),
+    {ok, Meta, RS#rs{reader = SnapReader}}.
+
+-spec read_chunk(rs(), _Size :: non_neg_integer(), _SnapshotDir :: file:filename()) ->
+    {ok, binary(), {next, rs()} | last} | {error, _Reason :: term()}.
+read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) ->
+    Chunk = term_to_binary(MachineState),
+    {ok, Chunk, {next, RS#rs{phase = storage_snapshot}}};
+read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir) ->
+    case emqx_ds_storage_snapshot:read_chunk(SnapReader0, Size) of
+        {next, Chunk, SnapReader} ->
+            {ok, Chunk, {next, RS#rs{reader = SnapReader}}};
+        {last, Chunk, SnapReader} ->
+            %% TODO: idempotence?
+            ?tp(dsrepl_snapshot_read_complete, #{reader => SnapReader}),
+            _ = complete_read(RS#rs{reader = SnapReader}),
+            {ok, Chunk, last};
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_read_error, #{reason => Reason, reader => SnapReader0}),
+            _ = emqx_ds_storage_snapshot:release_reader(SnapReader0),
+            error(Reason)
+    end.
+
+complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
+    _ = emqx_ds_storage_snapshot:release_reader(SnapReader),
+    logger:info(#{
+        msg => "dsrepl_snapshot_read_complete",
+        shard => shard_id(RS),
+        duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
+        read_bytes => emqx_ds_storage_snapshot:reader_info(bytes_read, SnapReader)
+    }).
+
+%% Accepting a snapshot.
+%% This process runs on the target server once the leader decides that it is
+%% lagging too far behind. It is the receiving side of `begin_read/2` and
+%% `read_chunk/3`.
+
+-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
+    {ok, ws()}.
+begin_accept(Dir, Meta) ->
+    WS = #ws{
+        phase = machine_state,
+        started_at = erlang:monotonic_time(millisecond),
+        dir = Dir,
+        meta = Meta
+    },
+    {ok, WS}.
+
+-spec accept_chunk(binary(), ws()) ->
+    {ok, ws()} | {error, _Reason :: term()}.
+accept_chunk(Chunk, WS = #ws{phase = machine_state}) ->
+    MachineState = binary_to_term(Chunk),
+    start_snapshot_writer(WS#ws{state = MachineState});
+accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
+    %% TODO: idempotence?
+    case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
+        {next, SnapWriter} ->
+            {ok, WS#ws{writer = SnapWriter}};
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
+            _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
+            error(Reason)
+    end.
+
+start_snapshot_writer(WS) ->
+    ShardId = shard_id(WS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_write_started",
+        shard => ShardId
+    }),
+    _ = emqx_ds_builtin_db_sup:terminate_storage(ShardId),
+    {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(ShardId),
+    {ok, WS#ws{phase = storage_snapshot, writer = SnapWriter}}.
+
+-spec complete_accept(binary(), ws()) -> ok | {error, ra_snapshot:file_err()}.
+complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
+    %% TODO: idempotence?
+    case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
+        {last, SnapWriter} ->
+            ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
+            _ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
+            Result = complete_accept(WS#ws{writer = SnapWriter}),
+            ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
+            Result;
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
+            _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
+            error(Reason)
+    end.
+
+complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
+    ShardId = shard_id(WS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_write_complete",
+        shard => ShardId,
+        duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
+        bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
+    }),
+    {ok, _} = emqx_ds_builtin_db_sup:restart_storage(ShardId),
+    write_machine_snapshot(WS).
+
+write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
+    write(Dir, Meta, MachineState).
+
+%% Restoring machine state from a snapshot.
+%% This is equivalent to restoring from a log snapshot.
+
+-spec recover(_SnapshotDir :: file:filename()) ->
+    {ok, ra_snapshot:meta(), ra_state()} | {error, _Reason}.
+recover(Dir) ->
+    %% TODO: Verify that storage layer is online?
+    ra_log_snapshot:recover(Dir).
+
+-spec validate(_SnapshotDir :: file:filename()) ->
+    ok | {error, _Reason}.
+validate(Dir) ->
+    ra_log_snapshot:validate(Dir).
+
+-spec read_meta(_SnapshotDir :: file:filename()) ->
+    {ok, ra_snapshot:meta()} | {error, _Reason}.
+read_meta(Dir) ->
+    ra_log_snapshot:read_meta(Dir).
+
+shard_id(#rs{state = MachineState}) ->
+    shard_id(MachineState);
+shard_id(#ws{state = MachineState}) ->
+    shard_id(MachineState);
+shard_id(MachineState) ->
+    maps:get(db_shard, MachineState).
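
The accept path above is normally driven by ra on the target server. A hedged sketch of that driving loop (Dir, Meta and the chunk list are placeholders; in reality ra feeds chunks one by one as they arrive over the network):

    accept_all(Dir, Meta, [MachineStateChunk | StorageChunks]) ->
        {ok, WS0} = emqx_ds_replication_snapshot:begin_accept(Dir, Meta),
        %% The first chunk carries the serialized machine state; storage is then
        %% stopped and a snapshot writer opened before storage chunks are applied.
        {ok, WS1} = emqx_ds_replication_snapshot:accept_chunk(MachineStateChunk, WS0),
        accept_rest(StorageChunks, WS1).

    accept_rest([LastChunk], WS) ->
        %% The final chunk flushes the writer, restarts storage and persists the
        %% machine snapshot to disk.
        emqx_ds_replication_snapshot:complete_accept(LastChunk, WS);
    accept_rest([Chunk | Rest], WS) ->
        {ok, WS1} = emqx_ds_replication_snapshot:accept_chunk(Chunk, WS),
        accept_rest(Rest, WS1).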

+ 64 - 10
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -19,9 +19,12 @@
 
 %% Replication layer API:
 -export([
-    open_shard/2,
+    %% Lifecycle
+    start_link/2,
     drop_shard/1,
     shard_info/2,
+
+    %% Data
     store_batch/3,
     get_streams/3,
     get_delete_streams/3,
@@ -30,14 +33,20 @@
     update_iterator/3,
     next/3,
     delete_next/4,
+
+    %% Generations
     update_config/3,
     add_generation/2,
     list_generations_with_lifetimes/1,
-    drop_generation/2
+    drop_generation/2,
+
+    %% Snapshotting
+    take_snapshot/1,
+    accept_snapshot/1
 ]).
 
 %% gen_server
--export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([db_dir/1]).
@@ -230,10 +239,7 @@
 -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
-
--spec open_shard(shard_id(), options()) -> ok.
-open_shard(Shard, Options) ->
-    emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
+-record(call_take_snapshot, {}).
 
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
@@ -245,12 +251,24 @@ drop_shard(Shard) ->
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
-store_batch(Shard, Messages, Options) ->
+store_batch(Shard, Messages0, Options) ->
     %% We always store messages in the current generation:
     GenId = generation_current(Shard),
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
+    #{module := Mod, data := GenData, since := Since} = generation_get(Shard, GenId),
+    case Messages0 of
+        [{Time, _Msg} | Rest] when Time < Since ->
+            %% FIXME: log / feedback
+            Messages = skip_outdated_messages(Since, Rest);
+        _ ->
+            Messages = Messages0
+    end,
     Mod:store_batch(Shard, GenData, Messages, Options).
 
+skip_outdated_messages(Since, [{Time, _Msg} | Rest]) when Time < Since ->
+    skip_outdated_messages(Since, Rest);
+skip_outdated_messages(_Since, Messages) ->
+    Messages.
+
 -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{integer(), stream()}].
 get_streams(Shard, TopicFilter, StartTime) ->
@@ -445,6 +463,20 @@ shard_info(ShardId, status) ->
         error:badarg -> down
     end.
 
+-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
+take_snapshot(ShardId) ->
+    case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
+        {ok, Dir} ->
+            emqx_ds_storage_snapshot:new_reader(Dir);
+        Error ->
+            Error
+    end.
+
+-spec accept_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:writer()} | {error, _Reason}.
+accept_snapshot(ShardId) ->
+    ok = drop_shard(ShardId),
+    handle_accept_snapshot(ShardId).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
@@ -514,6 +546,9 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     commit_metadata(S),
     {reply, Reply, S};
+handle_call(#call_take_snapshot{}, _From, S) ->
+    Snapshot = handle_take_snapshot(S),
+    {reply, Snapshot, S};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -735,7 +770,11 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
+    filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
+
+-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
+checkpoint_dir({DB, ShardId}, Name) ->
+    filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]).
 
 -spec update_last_until(Schema, emqx_ds:time()) ->
     Schema | {error, exists | overlaps_existing_generations}
@@ -768,6 +807,21 @@ run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
     %% Different implementation modules
     NewGenData.
 
+handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
+    Name = integer_to_list(erlang:system_time(millisecond)),
+    Dir = checkpoint_dir(ShardId, Name),
+    _ = filelib:ensure_dir(Dir),
+    case rocksdb:checkpoint(DB, Dir) of
+        ok ->
+            {ok, Dir};
+        {error, _} = Error ->
+            Error
+    end.
+
+handle_accept_snapshot(ShardId) ->
+    Dir = db_dir(ShardId),
+    emqx_ds_storage_snapshot:new_writer(Dir).
+
 %%--------------------------------------------------------------------------------
 %% Schema access
 %%--------------------------------------------------------------------------------
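
Together with emqx_ds_storage_snapshot, take_snapshot/1 and accept_snapshot/1 are enough to copy a shard locally, which is essentially what the test suite below does. A hedged sketch (shard ids and chunk size are illustrative; the target shard's storage server is assumed to be stopped, since accept_snapshot/1 drops its data directory):

    copy_shard(SourceShard, TargetShard) ->
        {ok, Reader} = emqx_ds_storage_layer:take_snapshot(SourceShard),
        {ok, Writer} = emqx_ds_storage_layer:accept_snapshot(TargetShard),
        copy_chunks(Reader, Writer).

    copy_chunks(Reader0, Writer0) ->
        case emqx_ds_storage_snapshot:read_chunk(Reader0, 64 * 1024) of
            {next, Chunk, Reader} ->
                {next, Writer} =
                    emqx_ds_storage_snapshot:write_chunk(Writer0, iolist_to_binary(Chunk)),
                copy_chunks(Reader, Writer);
            {last, Chunk, Reader} ->
                {last, Writer} =
                    emqx_ds_storage_snapshot:write_chunk(Writer0, iolist_to_binary(Chunk)),
                ok = emqx_ds_storage_snapshot:release_reader(Reader),
                ok = emqx_ds_storage_snapshot:release_writer(Writer)
        end.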

+ 0 - 88
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -1,88 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_ds_storage_layer_sup).
-
--behaviour(supervisor).
-
-%% API:
--export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]).
-
-%% behaviour callbacks:
--export([init/1]).
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--define(SUP, ?MODULE).
-
-%%================================================================================
-%% API funcions
-%%================================================================================
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
-    supervisor:start_link(?MODULE, []).
-
--spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
-    supervisor:startchild_ret().
-start_shard(Shard, Options) ->
-    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
-
--spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
-stop_shard(Shard) ->
-    ok = supervisor:terminate_child(?SUP, Shard),
-    ok = supervisor:delete_child(?SUP, Shard).
-
--spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
-    ok | {error, _Reason}.
-ensure_shard(Shard, Options) ->
-    case start_shard(Shard, Options) of
-        {ok, _Pid} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-%%================================================================================
-%% behaviour callbacks
-%%================================================================================
-
-init([]) ->
-    Children = [],
-    SupFlags = #{
-        strategy => one_for_one,
-        intensity => 10,
-        period => 10
-    },
-    {ok, {SupFlags, Children}}.
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
--spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
-    supervisor:child_spec().
-shard_child_spec(Shard, Options) ->
-    #{
-        id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
-        shutdown => 5_000,
-        restart => permanent,
-        type => worker
-    }.

+ 325 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_snapshot.erl

@@ -0,0 +1,325 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_snapshot).
+
+-include_lib("kernel/include/file.hrl").
+
+-export([
+    new_reader/1,
+    read_chunk/2,
+    abort_reader/1,
+    release_reader/1,
+    reader_info/2
+]).
+
+-export([
+    new_writer/1,
+    write_chunk/2,
+    abort_writer/1,
+    release_writer/1,
+    writer_info/2
+]).
+
+-export_type([
+    reader/0,
+    writer/0
+]).
+
+%%
+
+-define(FILECHUNK(RELPATH, POS, MORE), #{
+    '$' => chunk,
+    rp => RELPATH,
+    pos => POS,
+    more => MORE
+}).
+-define(PAT_FILECHUNK(RELPATH, POS, MORE), #{
+    '$' := chunk,
+    rp := RELPATH,
+    pos := POS,
+    more := MORE
+}).
+
+-define(EOS(), #{
+    '$' => eos
+}).
+-define(PAT_EOS(), #{
+    '$' := eos
+}).
+
+-define(PAT_HEADER(), #{'$' := _}).
+
+%%
+
+-record(reader, {
+    dirpath :: file:filename(),
+    files :: #{_RelPath => reader_file()},
+    queue :: [_RelPath :: file:filename()]
+}).
+
+-record(rfile, {
+    abspath :: file:filename(),
+    fd :: file:io_device() | eof,
+    pos :: non_neg_integer()
+}).
+
+-opaque reader() :: #reader{}.
+-type reader_file() :: #rfile{}.
+
+-type reason() :: {atom(), _AbsPath :: file:filename(), _Details :: term()}.
+
+%% @doc Initialize a reader for a snapshot directory.
+%% A snapshot directory contains an arbitrary number of regular files in an
+%% arbitrary subdirectory structure. Files are read in indeterminate order.
+%% It's an error to have non-regular files in the directory (e.g. symlinks).
+-spec new_reader(_Dir :: file:filename()) -> {ok, reader()}.
+new_reader(DirPath) ->
+    %% NOTE
+    %% Opening all files at once, so there would be less error handling later
+    %% during transfer.
+    %% TODO
+    %% Beware of how errors are handled: if one file fails to open, the whole
+    %% process will exit. This is fine for the purpose of replication (because
+    %% ra spawns a separate process for each transfer), but may not be suitable
+    %% for other use cases.
+    Files = emqx_utils_fs:traverse_dir(
+        fun(Path, Info, Acc) -> new_reader_file(Path, Info, DirPath, Acc) end,
+        #{},
+        DirPath
+    ),
+    {ok, #reader{
+        dirpath = DirPath,
+        files = Files,
+        queue = maps:keys(Files)
+    }}.
+
+new_reader_file(Path, #file_info{type = regular}, DirPath, Acc) ->
+    case file:open(Path, [read, binary, raw]) of
+        {ok, IoDev} ->
+            RelPath = emqx_utils_fs:find_relpath(Path, DirPath),
+            File = #rfile{abspath = Path, fd = IoDev, pos = 0},
+            Acc#{RelPath => File};
+        {error, Reason} ->
+            error({open_failed, Path, Reason})
+    end;
+new_reader_file(Path, #file_info{type = Type}, _, _Acc) ->
+    error({bad_file_type, Path, Type});
+new_reader_file(Path, {error, Reason}, _, _Acc) ->
+    error({inaccessible, Path, Reason}).
+
+%% @doc Read a chunk of data from the snapshot.
+%% Returns `{last, Chunk, Reader}` when the last chunk is read. After that, one
+%% should call `release_reader/1` to finalize the process (or `abort_reader/1` if
+%% keeping the snapshot is desired).
+-spec read_chunk(reader(), _Size :: non_neg_integer()) ->
+    {last | next, _Chunk :: iodata(), reader()} | {error, reason()}.
+read_chunk(R = #reader{files = Files, queue = [RelPath | Rest]}, Size) ->
+    File = maps:get(RelPath, Files),
+    case read_chunk_file(RelPath, File, Size) of
+        {last, Chunk, FileRest} ->
+            {next, Chunk, R#reader{files = Files#{RelPath := FileRest}, queue = Rest}};
+        {next, Chunk, FileRest} ->
+            {next, Chunk, R#reader{files = Files#{RelPath := FileRest}}};
+        Error ->
+            Error
+    end;
+read_chunk(R = #reader{queue = []}, _Size) ->
+    {last, make_packet(?EOS()), R}.
+
+read_chunk_file(RelPath, RFile0 = #rfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Size) ->
+    case file:read(IoDev, Size) of
+        {ok, Chunk} ->
+            ChunkSize = byte_size(Chunk),
+            HasMore = ChunkSize div Size,
+            RFile1 = RFile0#rfile{pos = Pos + ChunkSize},
+            case ChunkSize < Size of
+                false ->
+                    Status = next,
+                    RFile = RFile1;
+                true ->
+                    Status = last,
+                    RFile = release_reader_file(RFile1)
+            end,
+            Packet = make_packet(?FILECHUNK(RelPath, Pos, HasMore), Chunk),
+            {Status, Packet, RFile};
+        eof ->
+            Packet = make_packet(?FILECHUNK(RelPath, Pos, 0)),
+            {last, Packet, release_reader_file(RFile0)};
+        {error, Reason} ->
+            {error, {read_failed, AbsPath, Reason}}
+    end.
+
+%% @doc Abort the snapshot reader, keeping the snapshot files on disk.
+-spec abort_reader(reader()) -> ok.
+abort_reader(#reader{files = Files}) ->
+    lists:foreach(fun release_reader_file/1, maps:values(Files)).
+
+%% @doc Release the snapshot reader and delete the snapshot files.
+-spec release_reader(reader()) -> ok.
+release_reader(R = #reader{dirpath = DirPath}) ->
+    ok = abort_reader(R),
+    file:del_dir_r(DirPath).
+
+release_reader_file(RFile = #rfile{fd = eof}) ->
+    RFile;
+release_reader_file(RFile = #rfile{fd = IoDev}) ->
+    _ = file:close(IoDev),
+    RFile#rfile{fd = eof}.
+
+-spec reader_info(bytes_read, reader()) -> _Bytes :: non_neg_integer().
+reader_info(bytes_read, #reader{files = Files}) ->
+    maps:fold(fun(_, RFile, Sum) -> Sum + RFile#rfile.pos end, 0, Files).
+
+%%
+
+-record(writer, {
+    dirpath :: file:filename(),
+    files :: #{_RelPath :: file:filename() => writer_file()}
+}).
+
+-record(wfile, {
+    abspath :: file:filename(),
+    fd :: file:io_device() | eof,
+    pos :: non_neg_integer()
+}).
+
+-opaque writer() :: #writer{}.
+-type writer_file() :: #wfile{}.
+
+%% @doc Initialize a writer into a snapshot directory.
+%% The directory does not need to exist; it will be created if missing.
+%% A non-empty directory is not an error: existing files will be overwritten.
+-spec new_writer(_Dir :: file:filename()) -> {ok, writer()} | {error, reason()}.
+new_writer(DirPath) ->
+    case filelib:ensure_path(DirPath) of
+        ok ->
+            {ok, #writer{dirpath = DirPath, files = #{}}};
+        {error, Reason} ->
+            {error, {mkdir_failed, DirPath, Reason}}
+    end.
+
+%% @doc Write a chunk of data to the snapshot.
+%% Returns `{last, Writer}` when the last chunk is written. After that, one
+%% should call `release_writer/1` to finalize the process.
+-spec write_chunk(writer(), _Chunk :: binary()) ->
+    {last | next, writer()} | {error, _Reason}.
+write_chunk(W, Packet) ->
+    case parse_packet(Packet) of
+        {?PAT_FILECHUNK(RelPath, Pos, More), Chunk} ->
+            write_chunk(W, RelPath, Pos, More, Chunk);
+        {?PAT_EOS(), _Rest} ->
+            %% TODO: Verify all files are `eof` at this point?
+            {last, W};
+        Error ->
+            Error
+    end.
+
+write_chunk(W = #writer{files = Files}, RelPath, Pos, More, Chunk) ->
+    case Files of
+        #{RelPath := WFile} ->
+            write_chunk(W, WFile, RelPath, Pos, More, Chunk);
+        #{} when Pos == 0 ->
+            case new_writer_file(W, RelPath) of
+                WFile = #wfile{} ->
+                    write_chunk(W, WFile, RelPath, Pos, More, Chunk);
+                Error ->
+                    Error
+            end;
+        #{} ->
+            {error, {bad_chunk, RelPath, Pos}}
+    end.
+
+write_chunk(W = #writer{files = Files}, WFile0, RelPath, Pos, More, Chunk) ->
+    case write_chunk_file(WFile0, Pos, More, Chunk) of
+        WFile = #wfile{} ->
+            {next, W#writer{files = Files#{RelPath => WFile}}};
+        Error ->
+            Error
+    end.
+
+new_writer_file(#writer{dirpath = DirPath}, RelPath) ->
+    AbsPath = filename:join(DirPath, RelPath),
+    _ = filelib:ensure_dir(AbsPath),
+    case file:open(AbsPath, [write, binary, raw]) of
+        {ok, IoDev} ->
+            #wfile{
+                abspath = AbsPath,
+                fd = IoDev,
+                pos = 0
+            };
+        {error, Reason} ->
+            {error, {open_failed, AbsPath, Reason}}
+    end.
+
+write_chunk_file(WFile0 = #wfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Pos, More, Chunk) ->
+    ChunkSize = byte_size(Chunk),
+    case file:write(IoDev, Chunk) of
+        ok ->
+            WFile1 = WFile0#wfile{pos = Pos + ChunkSize},
+            case More of
+                0 -> release_writer_file(WFile1);
+                _ -> WFile1
+            end;
+        {error, Reason} ->
+            {error, {write_failed, AbsPath, Reason}}
+    end;
+write_chunk_file(WFile = #wfile{pos = WPos}, Pos, _More, _Chunk) when Pos < WPos ->
+    WFile;
+write_chunk_file(#wfile{abspath = AbsPath}, Pos, _More, _Chunk) ->
+    {error, {bad_chunk, AbsPath, Pos}}.
+
+%% @doc Abort the writer and clean up unfinished snapshot files.
+-spec abort_writer(writer()) -> ok | {error, file:posix()}.
+abort_writer(W = #writer{dirpath = DirPath}) ->
+    ok = release_writer(W),
+    file:del_dir_r(DirPath).
+
+%% @doc Release the writer and close all snapshot files.
+-spec release_writer(writer()) -> ok.
+release_writer(#writer{files = Files}) ->
+    ok = lists:foreach(fun release_writer_file/1, maps:values(Files)).
+
+release_writer_file(WFile = #wfile{fd = eof}) ->
+    WFile;
+release_writer_file(WFile = #wfile{fd = IoDev}) ->
+    _ = file:close(IoDev),
+    WFile#wfile{fd = eof}.
+
+-spec writer_info(bytes_written, writer()) -> _Bytes :: non_neg_integer().
+writer_info(bytes_written, #writer{files = Files}) ->
+    maps:fold(fun(_, WFile, Sum) -> Sum + WFile#wfile.pos end, 0, Files).
+
+%%
+
+make_packet(Header) ->
+    term_to_binary(Header).
+
+make_packet(Header, Rest) ->
+    HeaderBytes = term_to_binary(Header),
+    <<HeaderBytes/binary, Rest/binary>>.
+
+parse_packet(Packet) ->
+    try binary_to_term(Packet, [safe, used]) of
+        {Header = ?PAT_HEADER(), Length} ->
+            Rest = binary:part(Packet, Length, byte_size(Packet) - Length),
+            {Header, Rest};
+        {Header, _} ->
+            {error, {bad_header, Header}}
+    catch
+        error:badarg ->
+            {error, bad_packet}
+    end.
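
The wire format is a term_to_binary-encoded header map followed by raw chunk bytes; binary_to_term/2 with the used option recovers the header together with the offset at which the payload starts. A minimal round-trip sketch (values illustrative):

    Header = #{'$' => chunk, rp => "gen/data", pos => 0, more => 1},
    Packet = <<(term_to_binary(Header))/binary, "payload bytes">>,
    {#{'$' := chunk, pos := 0}, Used} = binary_to_term(Packet, [safe, used]),
    <<_:Used/binary, Payload/binary>> = Packet,
    %% Payload =:= <<"payload bytes">>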

+ 202 - 0
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -0,0 +1,202 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_replication_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
+-define(DB, testdb).
+
+opts() ->
+    #{
+        backend => builtin,
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        n_shards => 1,
+        n_sites => 3,
+        replication_factor => 3,
+        replication_options => #{
+            wal_max_size_bytes => 128 * 1024,
+            wal_max_batch_size => 1024,
+            snapshot_interval => 128
+        }
+    }.
+
+t_replication_transfers_snapshots(Config) ->
+    NMsgs = 4000,
+    Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
+    _Specs = [_, SpecOffline | _] = ?config(specs, Config),
+
+    %% Initialize DB on all nodes and wait for it to be online.
+    ?assertEqual(
+        [{ok, ok} || _ <- Nodes],
+        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
+    ),
+    ?retry(
+        500,
+        10,
+        ?assertMatch([_], shards_online(Node, ?DB))
+    ),
+
+    %% Stop the DB on the "offline" node.
+    ok = emqx_cth_cluster:stop_node(NodeOffline),
+
+    %% Fill the storage with messages and a few additional generations.
+    Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}),
+
+    %% Restart the node.
+    [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{
+            ?snk_kind := dsrepl_snapshot_accepted,
+            ?snk_meta := #{node := NodeOffline}
+        })
+    ),
+    ?assertEqual(
+        ok,
+        erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
+    ),
+
+    %% Trigger a storage operation and wait for the replica to be restored.
+    _ = add_generation(Node, ?DB),
+    ?assertMatch(
+        {ok, _},
+        snabbkaffe:receive_events(SRef)
+    ),
+
+    %% Wait until any pending replication activities are finished (e.g. Raft log entries).
+    ok = timer:sleep(3_000),
+
+    %% Check that the DB has been restored.
+    Shard = hd(shards(NodeOffline, ?DB)),
+    MessagesOffline = lists:keysort(
+        #message.timestamp,
+        consume(NodeOffline, ?DB, Shard, ['#'], 0)
+    ),
+    ?assertEqual(
+        sample(40, Messages),
+        sample(40, MessagesOffline)
+    ),
+    ?assertEqual(
+        Messages,
+        MessagesOffline
+    ).
+
+shards(Node, DB) ->
+    erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
+
+shards_online(Node, DB) ->
+    erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
+
+fill_storage(Node, DB, NMsgs, Opts) ->
+    fill_storage(Node, DB, NMsgs, 0, Opts).
+
+fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
+    R1 = push_message(Node, DB, I),
+    R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
+    R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
+fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
+    [].
+
+push_message(Node, DB, I) ->
+    Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
+    {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
+    Message = message(Topic, Bytes, I * 100),
+    ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
+    [Message].
+
+add_generation(Node, DB) ->
+    ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
+    [].
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        from = <<?MODULE_STRING>>,
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
+
+consume(Node, DB, Shard, TopicFilter, StartTime) ->
+    Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [
+        {DB, Shard}, TopicFilter, StartTime
+    ]),
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            {ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [
+                {DB, Shard}, Stream, TopicFilter, StartTime
+            ]),
+            consume_stream(Node, DB, Shard, It)
+        end,
+        Streams
+    ).
+
+consume_stream(Node, DB, Shard, It) ->
+    case erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It, 100]) of
+        {ok, _NIt, _Msgs = []} ->
+            [];
+        {ok, NIt, Batch} ->
+            [Msg || {_Key, Msg} <- Batch] ++ consume_stream(Node, DB, Shard, NIt);
+        {ok, end_of_stream} ->
+            []
+    end.
+
+probably(P, Fun) ->
+    case rand:uniform() of
+        X when X < P -> Fun();
+        _ -> []
+    end.
+
+sample(N, List) ->
+    L = length(List),
+    H = N div 2,
+    Filler = integer_to_list(L - N) ++ " more",
+    lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L).
+
+%%
+
+suite() -> [{timetrap, {seconds, 60}}].
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(TCName, Config) ->
+    Apps = [
+        {emqx_durable_storage, #{
+            before_start => fun snabbkaffe:fix_ct_logging/0,
+            override_env => [{egress_flush_interval, 1}]
+        }}
+    ],
+    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {emqx_ds_replication_SUITE1, #{apps => Apps}},
+            {emqx_ds_replication_SUITE2, #{apps => Apps}},
+            {emqx_ds_replication_SUITE3, #{apps => Apps}}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    Nodes = emqx_cth_cluster:start(NodeSpecs),
+    ok = snabbkaffe:start_trace(),
+    [{nodes, Nodes}, {specs, NodeSpecs} | Config].
+
+end_per_testcase(_TCName, Config) ->
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).

+ 149 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_snapshot_SUITE.erl

@@ -0,0 +1,149 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_snapshot_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+opts() ->
+    #{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
+
+%%
+
+t_snapshot_take_restore(_Config) ->
+    Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
+    {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
+
+    %% Push some messages to the shard.
+    Msgs1 = [gen_message(N) || N <- lists:seq(1000, 2000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs1), #{})),
+
+    %% Add new generation and push some more.
+    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 3000)),
+    Msgs2 = [gen_message(N) || N <- lists:seq(4000, 5000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs2), #{})),
+    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 6000)),
+
+    %% Take a snapshot of the shard.
+    {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(Shard),
+
+    %% Push even more messages to the shard AFTER taking the snapshot.
+    Msgs3 = [gen_message(N) || N <- lists:seq(7000, 8000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs3), #{})),
+
+    %% Destroy the shard.
+    _ = unlink(Pid),
+    ok = proc_lib:stop(Pid, shutdown, infinity),
+    ok = emqx_ds_storage_layer:drop_shard(Shard),
+
+    %% Restore the shard from the snapshot.
+    {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(Shard),
+    ?assertEqual(ok, transfer_snapshot(SnapReader, SnapWriter)),
+
+    %% Verify that the restored shard contains the messages up until the snapshot.
+    {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
+    ?assertEqual(
+        Msgs1 ++ Msgs2,
+        lists:keysort(#message.timestamp, consume(Shard, ['#']))
+    ).
+
+mk_batch(Msgs) ->
+    [{emqx_message:timestamp(Msg, microsecond), Msg} || Msg <- Msgs].
+
+gen_message(N) ->
+    Topic = emqx_topic:join([<<"foo">>, <<"bar">>, integer_to_binary(N)]),
+    message(Topic, integer_to_binary(N), N * 100).
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        from = <<?MODULE_STRING>>,
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
+
+transfer_snapshot(Reader, Writer) ->
+    ChunkSize = rand:uniform(1024),
+    case emqx_ds_storage_snapshot:read_chunk(Reader, ChunkSize) of
+        {RStatus, Chunk, NReader} ->
+            Data = iolist_to_binary(Chunk),
+            {WStatus, NWriter} = emqx_ds_storage_snapshot:write_chunk(Writer, Data),
+            %% Verify idempotency.
+            ?assertEqual(
+                {WStatus, NWriter},
+                emqx_ds_storage_snapshot:write_chunk(Writer, Data)
+            ),
+            %% Verify convergence.
+            ?assertEqual(
+                RStatus,
+                WStatus,
+                #{reader => NReader, writer => NWriter}
+            ),
+            case WStatus of
+                last ->
+                    ?assertEqual(ok, emqx_ds_storage_snapshot:release_reader(NReader)),
+                    ?assertEqual(ok, emqx_ds_storage_snapshot:release_writer(NWriter)),
+                    ok;
+                next ->
+                    transfer_snapshot(NReader, NWriter)
+            end;
+        {error, Reason} ->
+            {error, Reason, Reader}
+    end.
+
+consume(Shard, TopicFilter) ->
+    consume(Shard, TopicFilter, 0).
+
+consume(Shard, TopicFilter, StartTime) ->
+    Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime),
+            consume_stream(Shard, It)
+        end,
+        Streams
+    ).
+
+consume_stream(Shard, It) ->
+    case emqx_ds_storage_layer:next(Shard, It, 100) of
+        {ok, _NIt, _Msgs = []} ->
+            [];
+        {ok, NIt, Batch} ->
+            [Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, NIt);
+        {ok, end_of_stream} ->
+            []
+    end.
+
+%%
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(TCName, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
+    Apps = emqx_cth_suite:start(
+        [{emqx_durable_storage, #{override_env => [{db_data_dir, WorkDir}]}}],
+        #{work_dir => WorkDir}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_testcase(_TCName, Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.