Jelajahi Sumber

Merge pull request #12780 from keynslug/ft/EMQX-11979/snapshot-transfer

feat(dsrepl): transfer storage snapshot during ra snapshot recovery
Andrew Mayorov 1 tahun lalu
induk
melakukan
879709e686

+ 21 - 2
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -21,7 +21,16 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
+-export([
+    start_db/2,
+    start_shard/1,
+    start_egress/1,
+    stop_shard/1,
+    terminate_storage/1,
+    restart_storage/1,
+    ensure_shard/1,
+    ensure_egress/1
+]).
 -export([which_shards/1]).
 
 %% behaviour callbacks:
@@ -64,12 +73,22 @@ start_shard({DB, Shard}) ->
 start_egress({DB, Shard}) ->
     supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
 
--spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
+-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
 stop_shard(Shard = {DB, _}) ->
     Sup = ?via(#?shards_sup{db = DB}),
     ok = supervisor:terminate_child(Sup, Shard),
     ok = supervisor:delete_child(Sup, Shard).
 
+-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
+terminate_storage({DB, Shard}) ->
+    Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
+    supervisor:terminate_child(Sup, {Shard, storage}).
+
+-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}.
+restart_storage({DB, Shard}) ->
+    Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
+    supervisor:restart_child(Sup, {Shard, storage}).
+
 -spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
     ok | {error, _Reason}.
 ensure_shard(Shard) ->

+ 4 - 2
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -263,12 +263,14 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
     end.
 
 -spec get_id_for_key(trie(), state(), edge()) -> static_key().
-get_id_for_key(#trie{static_key_size = Size}, _State, _Token) ->
+get_id_for_key(#trie{static_key_size = Size}, State, Token) when Size =< 32 ->
     %% Requirements for the return value:
     %%
     %% It should be globally unique for the `{State, Token}` pair. Other
     %% than that, there's no requirements. The return value doesn't even
     %% have to be deterministic, since the states are saved in the trie.
+    %% Yet, it helps a lot if it is, so that applying the same sequence
+    %% of topics to different tries will result in the same trie state.
     %%
     %% The generated value becomes the ID of the topic in the durable
     %% storage. Its size should be relatively small to reduce the
@@ -277,7 +279,7 @@ get_id_for_key(#trie{static_key_size = Size}, _State, _Token) ->
     %% If we want to impress computer science crowd, sorry, I mean to
     %% minimize storage requirements, we can even employ Huffman coding
     %% based on the frequency of messages.
-    <<Int:(Size * 8)>> = crypto:strong_rand_bytes(Size),
+    <<Int:(Size * 8), _/bytes>> = crypto:hash(sha256, term_to_binary([State | Token])),
     Int.
 
 %% erlfmt-ignore

+ 37 - 11
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -43,7 +43,6 @@
 -export([
     %% RPC Targets:
     do_drop_db_v1/1,
-    do_store_batch_v1/4,
     do_get_streams_v1/4,
     do_get_streams_v2/4,
     do_make_iterator_v2/5,
@@ -53,11 +52,11 @@
     do_get_delete_streams_v4/4,
     do_make_delete_iterator_v4/5,
     do_delete_next_v4/5,
-    %% Unused:
-    do_drop_generation_v3/3,
     %% Obsolete:
+    do_store_batch_v1/4,
     do_make_iterator_v1/5,
     do_add_generation_v2/1,
+    do_drop_generation_v3/3,
 
     %% Egress API:
     ra_store_batch/3
@@ -65,7 +64,9 @@
 
 -export([
     init/1,
-    apply/3
+    apply/3,
+
+    snapshot_module/0
 ]).
 
 -export_type([
@@ -80,6 +81,10 @@
     batch/0
 ]).
 
+-export_type([
+    ra_state/0
+]).
+
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include("emqx_ds_replication_layer.hrl").
 
@@ -133,6 +138,8 @@
 
 -type message_id() :: emqx_ds:message_id().
 
+%% TODO: this type is obsolete and is kept only for compatibility with
+%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
 -type batch() :: #{
     ?tag := ?BATCH,
     ?batch_messages := [emqx_types:message()]
@@ -140,6 +147,20 @@
 
 -type generation_rank() :: {shard_id(), term()}.
 
+%% Core state of the replication, i.e. the state of ra machine.
+-type ra_state() :: #{
+    db_shard := {emqx_ds:db(), shard_id()},
+    latest := timestamp_us()
+}.
+
+%% Command. Each command is an entry in the replication log.
+-type ra_command() :: #{
+    ?tag := ?BATCH | add_generation | update_config | drop_generation,
+    _ => _
+}.
+
+-type timestamp_us() :: non_neg_integer().
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -380,10 +401,9 @@ do_drop_db_v1(DB) ->
     batch(),
     emqx_ds:message_store_opts()
 ) ->
-    emqx_ds:store_batch_result().
-do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
-    Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages],
-    emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
+    no_return().
+do_store_batch_v1(_DB, _Shard, _Batch, _Options) ->
+    error(obsolete_api).
 
 %% Remove me in EMQX 5.6
 -dialyzer({nowarn_function, do_get_streams_v1/4}).
@@ -496,9 +516,9 @@ do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ).
 
 -spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
-    ok | {error, _}.
-do_drop_generation_v3(DB, ShardId, GenId) ->
-    emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
+    no_return().
+do_drop_generation_v3(_DB, _ShardId, _GenId) ->
+    error(obsolete_api).
 
 -spec do_get_delete_streams_v4(
     emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
@@ -635,9 +655,12 @@ ra_drop_shard(DB, Shard) ->
 
 %%
 
+-spec init(_Args :: map()) -> ra_state().
 init(#{db := DB, shard := Shard}) ->
     #{db_shard => {DB, Shard}, latest => 0}.
 
+-spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
+    {ra_state(), _Reply, _Effects}.
 apply(
     #{index := RaftIdx},
     #{
@@ -717,3 +740,6 @@ timestamp_to_timeus(TimestampMs) ->
 
 timeus_to_timestamp(TimestampUs) ->
     TimestampUs div 1000.
+
+snapshot_module() ->
+    emqx_ds_replication_snapshot.

+ 10 - 8
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -147,19 +147,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
             Bootstrap = false;
         {error, name_not_registered} ->
             Bootstrap = true,
+            Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
+            LogOpts = maps:with(
+                [
+                    snapshot_interval,
+                    resend_window
+                ],
+                ReplicationOpts
+            ),
             ok = ra:start_server(DB, #{
                 id => LocalServer,
                 uid => <<ClusterName/binary, "_", Site/binary>>,
                 cluster_name => ClusterName,
                 initial_members => Servers,
-                machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
-                log_init_args => maps:with(
-                    [
-                        snapshot_interval,
-                        resend_window
-                    ],
-                    ReplicationOpts
-                )
+                machine => Machine,
+                log_init_args => LogOpts
             })
     end,
     case Servers of

+ 256 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_snapshot.erl

@@ -0,0 +1,256 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_snapshot).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-behaviour(ra_snapshot).
+-export([
+    prepare/2,
+    write/3,
+
+    begin_read/2,
+    read_chunk/3,
+
+    begin_accept/2,
+    accept_chunk/2,
+    complete_accept/2,
+
+    recover/1,
+    validate/1,
+    read_meta/1
+]).
+
+%% Read state.
+-record(rs, {
+    phase :: machine_state | storage_snapshot,
+    started_at :: _Time :: integer(),
+    state :: emqx_ds_replication_layer:ra_state() | undefined,
+    reader :: emqx_ds_storage_snapshot:reader() | undefined
+}).
+
+%% Write state.
+-record(ws, {
+    phase :: machine_state | storage_snapshot,
+    started_at :: _Time :: integer(),
+    dir :: file:filename(),
+    meta :: ra_snapshot:meta(),
+    state :: emqx_ds_replication_layer:ra_state() | undefined,
+    writer :: emqx_ds_storage_snapshot:writer() | undefined
+}).
+
+-type rs() :: #rs{}.
+-type ws() :: #ws{}.
+
+-type ra_state() :: emqx_ds_replication_layer:ra_state().
+
+%% Writing a snapshot.
+%% This process is exactly the same as writing a ra log snapshot: store the
+%% log meta and the machine state in a single snapshot file.
+
+-spec prepare(_RaftIndex, ra_state()) -> _State :: ra_state().
+prepare(Index, State) ->
+    ra_log_snapshot:prepare(Index, State).
+
+-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
+    ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
+write(Dir, Meta, MachineState) ->
+    ra_log_snapshot:write(Dir, Meta, MachineState).
+
+%% Reading a snapshot.
+%%
+%% This is triggered by the leader when it finds out that a follower is
+%% behind so much that there are no log segments covering the gap anymore.
+%% This process, on the other hand, MUST involve reading the storage snapshot,
+%% (in addition to the log snapshot) to reconstruct the storage state on the
+%% target server.
+%%
+%% Currently, a snapshot reader is owned by a special "snapshot sender" process
+%% spawned by the leader `ra` server, which sends chunks to the target server
+%% in a tight loop. This process terminates under the following conditions:
+%% 1. The snapshot is completely read and sent.
+%% 2. Remote server fails to accept a chunk, either due to network failure (most
+%%    likely) or a logic error (very unlikely).
+%%
+%% TODO
+%% In the latter case the process terminates without the chance to clean up the
+%% snapshot reader resource, which will cause the snapshot to linger indefinitely.
+%% For better control over resources, observability, and niceties like flow
+%% control and backpressure we need to move this into a dedicated process tree.
+
+-spec begin_read(_SnapshotDir :: file:filename(), _Context :: #{}) ->
+    {ok, ra_snapshot:meta(), rs()} | {error, _Reason :: term()}.
+begin_read(Dir, _Context) ->
+    RS = #rs{
+        phase = machine_state,
+        started_at = erlang:monotonic_time(millisecond)
+    },
+    case ra_log_snapshot:recover(Dir) of
+        {ok, Meta, MachineState} ->
+            start_snapshot_reader(Meta, RS#rs{state = MachineState});
+        Error ->
+            Error
+    end.
+
+start_snapshot_reader(Meta, RS) ->
+    ShardId = shard_id(RS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_read_started",
+        shard => ShardId
+    }),
+    {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(ShardId),
+    {ok, Meta, RS#rs{reader = SnapReader}}.
+
+-spec read_chunk(rs(), _Size :: non_neg_integer(), _SnapshotDir :: file:filename()) ->
+    {ok, binary(), {next, rs()} | last} | {error, _Reason :: term()}.
+read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) ->
+    Chunk = term_to_binary(MachineState),
+    {ok, Chunk, {next, RS#rs{phase = storage_snapshot}}};
+read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir) ->
+    case emqx_ds_storage_snapshot:read_chunk(SnapReader0, Size) of
+        {next, Chunk, SnapReader} ->
+            {ok, Chunk, {next, RS#rs{reader = SnapReader}}};
+        {last, Chunk, SnapReader} ->
+            %% TODO: idempotence?
+            ?tp(dsrepl_snapshot_read_complete, #{reader => SnapReader}),
+            _ = complete_read(RS#rs{reader = SnapReader}),
+            {ok, Chunk, last};
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_read_error, #{reason => Reason, reader => SnapReader0}),
+            _ = emqx_ds_storage_snapshot:release_reader(SnapReader0),
+            error(Reason)
+    end.
+
+complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
+    _ = emqx_ds_storage_snapshot:release_reader(SnapReader),
+    logger:info(#{
+        msg => "dsrepl_snapshot_read_complete",
+        shard => shard_id(RS),
+        duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
+        read_bytes => emqx_ds_storage_snapshot:reader_info(bytes_read, SnapReader)
+    }).
+
+%% Accepting a snapshot.
+%%
+%% This process is triggered by the target server, when the leader finds out
+%% that the target server is severely lagging behind. This is receiving side of
+%% `begin_read/2` and `read_chunk/3`.
+%%
+%% Currently, a snapshot writer is owned by the follower `ra` server process
+%% residing in dedicated `receive_snapshot` state. This process reverts back
+%% to the regular `follower` state under the following conditions:
+%% 1. The snapshot is completely accepted, and the machine state is recovered.
+%% 2. The process times out waiting for the next chunk.
+%% 3. The process encounters a logic error (very unlikely).
+%%
+%% TODO
+%% In the latter cases, the snapshot writer will not have a chance to clean up.
+%% For better control over resources, observability, and niceties like flow
+%% control and backpressure we need to move this into a dedicated process tree.
+
+-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
+    {ok, ws()}.
+begin_accept(Dir, Meta) ->
+    WS = #ws{
+        phase = machine_state,
+        started_at = erlang:monotonic_time(millisecond),
+        dir = Dir,
+        meta = Meta
+    },
+    {ok, WS}.
+
+-spec accept_chunk(binary(), ws()) ->
+    {ok, ws()} | {error, _Reason :: term()}.
+accept_chunk(Chunk, WS = #ws{phase = machine_state}) ->
+    MachineState = binary_to_term(Chunk),
+    start_snapshot_writer(WS#ws{state = MachineState});
+accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
+    %% TODO: idempotence?
+    case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
+        {next, SnapWriter} ->
+            {ok, WS#ws{writer = SnapWriter}};
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
+            _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
+            error(Reason)
+    end.
+
+start_snapshot_writer(WS) ->
+    ShardId = shard_id(WS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_write_started",
+        shard => ShardId
+    }),
+    _ = emqx_ds_builtin_db_sup:terminate_storage(ShardId),
+    {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(ShardId),
+    {ok, WS#ws{phase = storage_snapshot, writer = SnapWriter}}.
+
+-spec complete_accept(ws()) -> ok | {error, ra_snapshot:file_err()}.
+complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
+    %% TODO: idempotence?
+    case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
+        {last, SnapWriter} ->
+            ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
+            _ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
+            Result = complete_accept(WS#ws{writer = SnapWriter}),
+            ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
+            Result;
+        {error, Reason} ->
+            ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
+            _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
+            error(Reason)
+    end.
+
+complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
+    ShardId = shard_id(WS),
+    logger:info(#{
+        msg => "dsrepl_snapshot_read_complete",
+        shard => ShardId,
+        duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
+        bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
+    }),
+    {ok, _} = emqx_ds_builtin_db_sup:restart_storage(ShardId),
+    write_machine_snapshot(WS).
+
+write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
+    write(Dir, Meta, MachineState).
+
+%% Restoring machine state from a snapshot.
+%% This is equivalent to restoring from a log snapshot.
+
+-spec recover(_SnapshotDir :: file:filename()) ->
+    {ok, ra_snapshot:meta(), ra_state()} | {error, _Reason}.
+recover(Dir) ->
+    %% TODO: Verify that storage layer is online?
+    ra_log_snapshot:recover(Dir).
+
+-spec validate(_SnapshotDir :: file:filename()) ->
+    ok | {error, _Reason}.
+validate(Dir) ->
+    ra_log_snapshot:validate(Dir).
+
+-spec read_meta(_SnapshotDir :: file:filename()) ->
+    {ok, ra_snapshot:meta()} | {error, _Reason}.
+read_meta(Dir) ->
+    ra_log_snapshot:read_meta(Dir).
+
+shard_id(#rs{state = MachineState}) ->
+    shard_id(MachineState);
+shard_id(#ws{state = MachineState}) ->
+    shard_id(MachineState);
+shard_id(MachineState) ->
+    maps:get(db_shard, MachineState).

+ 104 - 43
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -19,9 +19,12 @@
 
 %% Replication layer API:
 -export([
-    open_shard/2,
+    %% Lifecycle
+    start_link/2,
     drop_shard/1,
     shard_info/2,
+
+    %% Data
     store_batch/3,
     get_streams/3,
     get_delete_streams/3,
@@ -30,14 +33,20 @@
     update_iterator/3,
     next/3,
     delete_next/4,
+
+    %% Generations
     update_config/3,
     add_generation/2,
     list_generations_with_lifetimes/1,
-    drop_generation/2
+    drop_generation/2,
+
+    %% Snapshotting
+    take_snapshot/1,
+    accept_snapshot/1
 ]).
 
 %% gen_server
--export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([db_dir/1]).
@@ -230,10 +239,7 @@
 -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
-
--spec open_shard(shard_id(), options()) -> ok.
-open_shard(Shard, Options) ->
-    emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
+-record(call_take_snapshot, {}).
 
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
@@ -245,11 +251,13 @@ drop_shard(Shard) ->
     emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
-store_batch(Shard, Messages, Options) ->
-    %% We always store messages in the current generation:
-    GenId = generation_current(Shard),
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    Mod:store_batch(Shard, GenData, Messages, Options).
+store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
+    %% NOTE
+    %% We assume that batches do not span generations. Callers should enforce this.
+    #{module := Mod, data := GenData} = generation_at(Shard, Time),
+    Mod:store_batch(Shard, GenData, Messages, Options);
+store_batch(_Shard, [], _Options) ->
+    ok.
 
 -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{integer(), stream()}].
@@ -259,14 +267,14 @@ get_streams(Shard, TopicFilter, StartTime) ->
     lists:flatmap(
         fun(GenId) ->
             ?tp(get_streams_get_gen, #{gen_id => GenId}),
-            case generation_get_safe(Shard, GenId) of
-                {ok, #{module := Mod, data := GenData}} ->
+            case generation_get(Shard, GenId) of
+                #{module := Mod, data := GenData} ->
                     Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
                     [
                         {GenId, ?stream_v2(GenId, InnerStream)}
                      || InnerStream <- Streams
                     ];
-                {error, not_found} ->
+                not_found ->
                     %% race condition: generation was dropped before getting its streams?
                     []
             end
@@ -282,14 +290,14 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
     lists:flatmap(
         fun(GenId) ->
             ?tp(get_streams_get_gen, #{gen_id => GenId}),
-            case generation_get_safe(Shard, GenId) of
-                {ok, #{module := Mod, data := GenData}} ->
+            case generation_get(Shard, GenId) of
+                #{module := Mod, data := GenData} ->
                     Streams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime),
                     [
                         ?delete_stream(GenId, InnerStream)
                      || InnerStream <- Streams
                     ];
-                {error, not_found} ->
+                not_found ->
                     %% race condition: generation was dropped before getting its streams?
                     []
             end
@@ -302,8 +310,8 @@ get_delete_streams(Shard, TopicFilter, StartTime) ->
 make_iterator(
     Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
 ) ->
-    case generation_get_safe(Shard, GenId) of
-        {ok, #{module := Mod, data := GenData}} ->
+    case generation_get(Shard, GenId) of
+        #{module := Mod, data := GenData} ->
             case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
                 {ok, Iter} ->
                     {ok, #{
@@ -314,7 +322,7 @@ make_iterator(
                 {error, _} = Err ->
                     Err
             end;
-        {error, not_found} ->
+        not_found ->
             {error, unrecoverable, generation_not_found}
     end.
 
@@ -323,8 +331,8 @@ make_iterator(
 make_delete_iterator(
     Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime
 ) ->
-    case generation_get_safe(Shard, GenId) of
-        {ok, #{module := Mod, data := GenData}} ->
+    case generation_get(Shard, GenId) of
+        #{module := Mod, data := GenData} ->
             case Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
                 {ok, Iter} ->
                     {ok, #{
@@ -335,7 +343,7 @@ make_delete_iterator(
                 {error, _} = Err ->
                     Err
             end;
-        {error, not_found} ->
+        not_found ->
             {error, end_of_stream}
     end.
 
@@ -346,8 +354,8 @@ update_iterator(
     #{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
     DSKey
 ) ->
-    case generation_get_safe(Shard, GenId) of
-        {ok, #{module := Mod, data := GenData}} ->
+    case generation_get(Shard, GenId) of
+        #{module := Mod, data := GenData} ->
             case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
                 {ok, Iter} ->
                     {ok, #{
@@ -358,15 +366,15 @@ update_iterator(
                 {error, _} = Err ->
                     Err
             end;
-        {error, not_found} ->
+        not_found ->
             {error, unrecoverable, generation_not_found}
     end.
 
 -spec next(shard_id(), iterator(), pos_integer()) ->
     emqx_ds:next_result(iterator()).
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
-    case generation_get_safe(Shard, GenId) of
-        {ok, #{module := Mod, data := GenData}} ->
+    case generation_get(Shard, GenId) of
+        #{module := Mod, data := GenData} ->
             Current = generation_current(Shard),
             case Mod:next(Shard, GenData, GenIter0, BatchSize) of
                 {ok, _GenIter, []} when GenId < Current ->
@@ -379,7 +387,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
                 Error = {error, _, _} ->
                     Error
             end;
-        {error, not_found} ->
+        not_found ->
             %% generation was possibly dropped by GC
             {error, unrecoverable, generation_not_found}
     end.
@@ -392,8 +400,8 @@ delete_next(
     Selector,
     BatchSize
 ) ->
-    case generation_get_safe(Shard, GenId) of
-        {ok, #{module := Mod, data := GenData}} ->
+    case generation_get(Shard, GenId) of
+        #{module := Mod, data := GenData} ->
             Current = generation_current(Shard),
             case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of
                 {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
@@ -406,7 +414,7 @@ delete_next(
                 Error = {error, _} ->
                     Error
             end;
-        {error, not_found} ->
+        not_found ->
             %% generation was possibly dropped by GC
             {ok, end_of_stream}
     end.
@@ -445,6 +453,20 @@ shard_info(ShardId, status) ->
         error:badarg -> down
     end.
 
+-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
+take_snapshot(ShardId) ->
+    case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
+        {ok, Dir} ->
+            emqx_ds_storage_snapshot:new_reader(Dir);
+        Error ->
+            Error
+    end.
+
+-spec accept_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:writer()} | {error, _Reason}.
+accept_snapshot(ShardId) ->
+    ok = drop_shard(ShardId),
+    handle_accept_snapshot(ShardId).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
@@ -514,6 +536,9 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     commit_metadata(S),
     {reply, Reply, S};
+handle_call(#call_take_snapshot{}, _From, S) ->
+    Snapshot = handle_take_snapshot(S),
+    {reply, Snapshot, S};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -680,7 +705,7 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
     {gen_id(), shard_schema(), cf_refs()}.
 new_generation(ShardId, DB, Schema0, Since) ->
     #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
-    GenId = PrevGenId + 1,
+    GenId = next_generation_id(PrevGenId),
     {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
     GenSchema = #{
         module => Mod,
@@ -696,6 +721,14 @@ new_generation(ShardId, DB, Schema0, Since) ->
     },
     {GenId, Schema, NewCFRefs}.
 
+-spec next_generation_id(gen_id()) -> gen_id().
+next_generation_id(GenId) ->
+    GenId + 1.
+
+-spec prev_generation_id(gen_id()) -> gen_id().
+prev_generation_id(GenId) when GenId > 0 ->
+    GenId - 1.
+
 %% @doc Commit current state of the server to both rocksdb and the persistent term
 -spec commit_metadata(server_state()) -> ok.
 commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
@@ -735,7 +768,11 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
+    filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
+
+-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
+checkpoint_dir({DB, ShardId}, Name) ->
+    filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]).
 
 -spec update_last_until(Schema, emqx_ds:time()) ->
     Schema | {error, exists | overlaps_existing_generations}
@@ -768,6 +805,21 @@ run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
     %% Different implementation modules
     NewGenData.
 
+handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
+    Name = integer_to_list(erlang:system_time(millisecond)),
+    Dir = checkpoint_dir(ShardId, Name),
+    _ = filelib:ensure_dir(Dir),
+    case rocksdb:checkpoint(DB, Dir) of
+        ok ->
+            {ok, Dir};
+        {error, _} = Error ->
+            Error
+    end.
+
+handle_accept_snapshot(ShardId) ->
+    Dir = db_dir(ShardId),
+    emqx_ds_storage_snapshot:new_writer(Dir).
+
 %%--------------------------------------------------------------------------------
 %% Schema access
 %%--------------------------------------------------------------------------------
@@ -777,18 +829,13 @@ generation_current(Shard) ->
     #{current_generation := Current} = get_schema_runtime(Shard),
     Current.
 
--spec generation_get(shard_id(), gen_id()) -> generation().
+-spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
 generation_get(Shard, GenId) ->
-    {ok, GenData} = generation_get_safe(Shard, GenId),
-    GenData.
-
--spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
-generation_get_safe(Shard, GenId) ->
     case get_schema_runtime(Shard) of
         #{?GEN_KEY(GenId) := GenData} ->
-            {ok, GenData};
+            GenData;
         #{} ->
-            {error, not_found}
+            not_found
     end.
 
 -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
@@ -805,6 +852,20 @@ generations_since(Shard, Since) ->
         Schema
     ).
 
+-spec generation_at(shard_id(), emqx_ds:time()) -> generation().
+generation_at(Shard, Time) ->
+    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
+    generation_at(Time, Current, Schema).
+
+generation_at(Time, GenId, Schema) ->
+    #{?GEN_KEY(GenId) := Gen} = Schema,
+    case Gen of
+        #{since := Since} when Time < Since andalso GenId > 0 ->
+            generation_at(Time, prev_generation_id(GenId), Schema);
+        _ ->
+            Gen
+    end.
+
 -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
 
 -spec get_schema_runtime(shard_id()) -> shard().

+ 0 - 88
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -1,88 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_ds_storage_layer_sup).
-
--behaviour(supervisor).
-
-%% API:
--export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]).
-
-%% behaviour callbacks:
--export([init/1]).
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--define(SUP, ?MODULE).
-
-%%================================================================================
-%% API funcions
-%%================================================================================
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
-    supervisor:start_link(?MODULE, []).
-
--spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
-    supervisor:startchild_ret().
-start_shard(Shard, Options) ->
-    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
-
--spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
-stop_shard(Shard) ->
-    ok = supervisor:terminate_child(?SUP, Shard),
-    ok = supervisor:delete_child(?SUP, Shard).
-
--spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
-    ok | {error, _Reason}.
-ensure_shard(Shard, Options) ->
-    case start_shard(Shard, Options) of
-        {ok, _Pid} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-%%================================================================================
-%% behaviour callbacks
-%%================================================================================
-
-init([]) ->
-    Children = [],
-    SupFlags = #{
-        strategy => one_for_one,
-        intensity => 10,
-        period => 10
-    },
-    {ok, {SupFlags, Children}}.
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
--spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
-    supervisor:child_spec().
-shard_child_spec(Shard, Options) ->
-    #{
-        id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
-        shutdown => 5_000,
-        restart => permanent,
-        type => worker
-    }.

+ 325 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_snapshot.erl

@@ -0,0 +1,325 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_snapshot).
+
+-include_lib("kernel/include/file.hrl").
+
+-export([
+    new_reader/1,
+    read_chunk/2,
+    abort_reader/1,
+    release_reader/1,
+    reader_info/2
+]).
+
+-export([
+    new_writer/1,
+    write_chunk/2,
+    abort_writer/1,
+    release_writer/1,
+    writer_info/2
+]).
+
+-export_type([
+    reader/0,
+    writer/0
+]).
+
+%%
+
+-define(FILECHUNK(RELPATH, POS, MORE), #{
+    '$' => chunk,
+    rp => RELPATH,
+    pos => POS,
+    more => MORE
+}).
+-define(PAT_FILECHUNK(RELPATH, POS, MORE), #{
+    '$' := chunk,
+    rp := RELPATH,
+    pos := POS,
+    more := MORE
+}).
+
+-define(EOS(), #{
+    '$' => eos
+}).
+-define(PAT_EOS(), #{
+    '$' := eos
+}).
+
+-define(PAT_HEADER(), #{'$' := _}).
+
+%%
+
+-record(reader, {
+    dirpath :: file:filename(),
+    files :: #{_RelPath => reader_file()},
+    queue :: [_RelPath :: file:filename()]
+}).
+
+-record(rfile, {
+    abspath :: file:filename(),
+    fd :: file:io_device() | eof,
+    pos :: non_neg_integer()
+}).
+
+-opaque reader() :: #reader{}.
+-type reader_file() :: #rfile{}.
+
+-type reason() :: {atom(), _AbsPath :: file:filename(), _Details :: term()}.
+
+%% @doc Initialize a reader for a snapshot directory.
+%% Snapshot directory is a directory containing arbitrary number of regular
+%% files in arbitrary subdirectory structure. Files are read in indeterminate
+%% order. It's an error to have non-regular files in the directory (e.g. symlinks).
+-spec new_reader(_Dir :: file:filename()) -> {ok, reader()}.
+new_reader(DirPath) ->
+    %% NOTE
+    %% Opening all files at once, so there would be less error handling later
+    %% during transfer.
+    %% TODO
+    %% Beware of how errors are handled: if one file fails to open, the whole
+    %% process will exit. This is fine for the purpose of replication (because
+    %% ra spawns separate process for each transfer), but may not be suitable
+    %% for other use cases.
+    Files = emqx_utils_fs:traverse_dir(
+        fun(Path, Info, Acc) -> new_reader_file(Path, Info, DirPath, Acc) end,
+        #{},
+        DirPath
+    ),
+    {ok, #reader{
+        dirpath = DirPath,
+        files = Files,
+        queue = maps:keys(Files)
+    }}.
+
+new_reader_file(Path, #file_info{type = regular}, DirPath, Acc) ->
+    case file:open(Path, [read, binary, raw]) of
+        {ok, IoDev} ->
+            RelPath = emqx_utils_fs:find_relpath(Path, DirPath),
+            File = #rfile{abspath = Path, fd = IoDev, pos = 0},
+            Acc#{RelPath => File};
+        {error, Reason} ->
+            error({open_failed, Path, Reason})
+    end;
+new_reader_file(Path, #file_info{type = Type}, _, _Acc) ->
+    error({bad_file_type, Path, Type});
+new_reader_file(Path, {error, Reason}, _, _Acc) ->
+    error({inaccessible, Path, Reason}).
+
+%% @doc Read a chunk of data from the snapshot.
+%% Returns `{last, Chunk, Reader}` when the last chunk is read. After that, one
+%% should call `release_reader/1` to finalize the process (or `abort_reader/1` if
+%% keeping the snapshot is desired).
+-spec read_chunk(reader(), _Size :: non_neg_integer()) ->
+    {last | next, _Chunk :: iodata(), reader()} | {error, reason()}.
+read_chunk(R = #reader{files = Files, queue = [RelPath | Rest]}, Size) ->
+    File = maps:get(RelPath, Files),
+    case read_chunk_file(RelPath, File, Size) of
+        {last, Chunk, FileRest} ->
+            {next, Chunk, R#reader{files = Files#{RelPath := FileRest}, queue = Rest}};
+        {next, Chunk, FileRest} ->
+            {next, Chunk, R#reader{files = Files#{RelPath := FileRest}}};
+        Error ->
+            Error
+    end;
+read_chunk(R = #reader{queue = []}, _Size) ->
+    {last, make_packet(?EOS()), R}.
+
+read_chunk_file(RelPath, RFile0 = #rfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Size) ->
+    case file:read(IoDev, Size) of
+        {ok, Chunk} ->
+            ChunkSize = byte_size(Chunk),
+            HasMore = ChunkSize div Size,
+            RFile1 = RFile0#rfile{pos = Pos + ChunkSize},
+            case HasMore of
+                _Yes = 1 ->
+                    Status = next,
+                    RFile = RFile1;
+                _No = 0 ->
+                    Status = last,
+                    RFile = release_reader_file(RFile1)
+            end,
+            Packet = make_packet(?FILECHUNK(RelPath, Pos, HasMore), Chunk),
+            {Status, Packet, RFile};
+        eof ->
+            Packet = make_packet(?FILECHUNK(RelPath, Pos, 0)),
+            {last, Packet, release_reader_file(RFile0)};
+        {error, Reason} ->
+            {error, {read_failed, AbsPath, Reason}}
+    end.
+
+%% @doc Aborts the snapshot reader, but does not release the snapshot files.
+-spec abort_reader(reader()) -> ok.
+abort_reader(#reader{files = Files}) ->
+    lists:foreach(fun release_reader_file/1, maps:values(Files)).
+
+%% @doc Aborts the snapshot reader and deletes the snapshot files.
+-spec release_reader(reader()) -> ok.
+release_reader(R = #reader{dirpath = DirPath}) ->
+    ok = abort_reader(R),
+    file:del_dir_r(DirPath).
+
+release_reader_file(RFile = #rfile{fd = eof}) ->
+    RFile;
+release_reader_file(RFile = #rfile{fd = IoDev}) ->
+    _ = file:close(IoDev),
+    RFile#rfile{fd = eof}.
+
+-spec reader_info(bytes_read, reader()) -> _Bytes :: non_neg_integer().
+reader_info(bytes_read, #reader{files = Files}) ->
+    maps:fold(fun(_, RFile, Sum) -> Sum + RFile#rfile.pos end, 0, Files).
+
+%%
+
+-record(writer, {
+    dirpath :: file:filename(),
+    files :: #{_RelPath :: file:filename() => writer_file()}
+}).
+
+-record(wfile, {
+    abspath :: file:filename(),
+    fd :: file:io_device() | eof,
+    pos :: non_neg_integer()
+}).
+
+-opaque writer() :: #writer{}.
+-type writer_file() :: #wfile{}.
+
+%% @doc Initialize a writer into a snapshot directory.
+%% The directory needs not to exist, it will be created if it doesn't.
+%% Having non-empty directory is not an error, existing files will be
+%% overwritten.
+-spec new_writer(_Dir :: file:filename()) -> {ok, writer()} | {error, reason()}.
+new_writer(DirPath) ->
+    case filelib:ensure_path(DirPath) of
+        ok ->
+            {ok, #writer{dirpath = DirPath, files = #{}}};
+        {error, Reason} ->
+            {error, {mkdir_failed, DirPath, Reason}}
+    end.
+
+%% @doc Write a chunk of data to the snapshot.
+%% Returns `{last, Writer}` when the last chunk is written. After that, one
+%% should call `release_writer/1` to finalize the process.
+-spec write_chunk(writer(), _Chunk :: binary()) ->
+    {last | next, writer()} | {error, _Reason}.
+write_chunk(W, Packet) ->
+    case parse_packet(Packet) of
+        {?PAT_FILECHUNK(RelPath, Pos, More), Chunk} ->
+            write_chunk(W, RelPath, Pos, More, Chunk);
+        {?PAT_EOS(), _Rest} ->
+            %% TODO: Verify all files are `eof` at this point?
+            {last, W};
+        Error ->
+            Error
+    end.
+
+write_chunk(W = #writer{files = Files}, RelPath, Pos, More, Chunk) ->
+    case Files of
+        #{RelPath := WFile} ->
+            write_chunk(W, WFile, RelPath, Pos, More, Chunk);
+        #{} when Pos == 0 ->
+            case new_writer_file(W, RelPath) of
+                WFile = #wfile{} ->
+                    write_chunk(W, WFile, RelPath, Pos, More, Chunk);
+                Error ->
+                    Error
+            end;
+        #{} ->
+            {error, {bad_chunk, RelPath, Pos}}
+    end.
+
+write_chunk(W = #writer{files = Files}, WFile0, RelPath, Pos, More, Chunk) ->
+    case write_chunk_file(WFile0, Pos, More, Chunk) of
+        WFile = #wfile{} ->
+            {next, W#writer{files = Files#{RelPath => WFile}}};
+        Error ->
+            Error
+    end.
+
+new_writer_file(#writer{dirpath = DirPath}, RelPath) ->
+    AbsPath = filename:join(DirPath, RelPath),
+    _ = filelib:ensure_dir(AbsPath),
+    case file:open(AbsPath, [write, binary, raw]) of
+        {ok, IoDev} ->
+            #wfile{
+                abspath = AbsPath,
+                fd = IoDev,
+                pos = 0
+            };
+        {error, Reason} ->
+            {error, {open_failed, AbsPath, Reason}}
+    end.
+
+write_chunk_file(WFile0 = #wfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Pos, More, Chunk) ->
+    ChunkSize = byte_size(Chunk),
+    case file:write(IoDev, Chunk) of
+        ok ->
+            WFile1 = WFile0#wfile{pos = Pos + ChunkSize},
+            case More of
+                0 -> release_writer_file(WFile1);
+                _ -> WFile1
+            end;
+        {error, Reason} ->
+            {error, {write_failed, AbsPath, Reason}}
+    end;
+write_chunk_file(WFile = #wfile{pos = WPos}, Pos, _More, _Chunk) when Pos < WPos ->
+    WFile;
+write_chunk_file(#wfile{abspath = AbsPath}, Pos, _More, _Chunk) ->
+    {error, {bad_chunk, AbsPath, Pos}}.
+
+%% @doc Abort the writer and clean up unfinished snapshot files.
+-spec abort_writer(writer()) -> ok | {error, file:posix()}.
+abort_writer(W = #writer{dirpath = DirPath}) ->
+    ok = release_writer(W),
+    file:del_dir_r(DirPath).
+
+%% @doc Release the writer and close all snapshot files.
+-spec release_writer(writer()) -> ok.
+release_writer(#writer{files = Files}) ->
+    ok = lists:foreach(fun release_writer_file/1, maps:values(Files)).
+
+release_writer_file(WFile = #wfile{fd = eof}) ->
+    WFile;
+release_writer_file(WFile = #wfile{fd = IoDev}) ->
+    _ = file:close(IoDev),
+    WFile#wfile{fd = eof}.
+
+-spec writer_info(bytes_written, writer()) -> _Bytes :: non_neg_integer().
+writer_info(bytes_written, #writer{files = Files}) ->
+    maps:fold(fun(_, WFile, Sum) -> Sum + WFile#wfile.pos end, 0, Files).
+
+%%
+
+make_packet(Header) ->
+    term_to_binary(Header).
+
+make_packet(Header, Rest) ->
+    HeaderBytes = term_to_binary(Header),
+    <<HeaderBytes/binary, Rest/binary>>.
+
+parse_packet(Packet) ->
+    try binary_to_term(Packet, [safe, used]) of
+        {Header = ?PAT_HEADER(), Length} ->
+            {_, Rest} = split_binary(Packet, Length),
+            {Header, Rest};
+        {Header, _} ->
+            {error, {bad_header, Header}}
+    catch
+        error:badarg ->
+            {error, bad_packet}
+    end.

+ 22 - 58
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -98,8 +98,8 @@ t_03_smoke_iterate(_Config) ->
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
-    {ok, Iter, Batch} = iterate(DB, Iter0, 1),
-    ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
+    {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 %% Verify that iterators survive restart of the application. This is
 %% an important property, since the lifetime of the iterators is tied
@@ -125,8 +125,8 @@ t_04_restart(_Config) ->
     {ok, _} = application:ensure_all_started(emqx_durable_storage),
     ok = emqx_ds:open_db(DB, opts()),
     %% The old iterator should be still operational:
-    {ok, Iter, Batch} = iterate(DB, Iter0, 1),
-    ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
+    {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 %% Check that we can create iterators directly from DS keys.
 t_05_update_iterator(_Config) ->
@@ -148,9 +148,8 @@ t_05_update_iterator(_Config) ->
     Res1 = emqx_ds:update_iterator(DB, OldIter, Key0),
     ?assertMatch({ok, _Iter1}, Res1),
     {ok, Iter1} = Res1,
-    {ok, FinalIter, Batch} = iterate(DB, Iter1, 1),
-    AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]],
-    ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
+    {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}),
+    ?assertEqual(Msgs, [Msg0 | Batch], #{from_key => Iter1, final_iter => Iter}),
     ok.
 
 t_06_update_config(_Config) ->
@@ -190,9 +189,9 @@ t_06_update_config(_Config) ->
         ),
 
     Checker = fun({StartTime, Msgs0}, Acc) ->
-        Msgs = Msgs0 ++ Acc,
-        Batch = fetch_all(DB, TopicFilter, StartTime),
-        ?assertEqual(Msgs, Batch, {StartTime}),
+        Msgs = Acc ++ Msgs0,
+        Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
+        ?assertEqual(Msgs, Batch, StartTime),
         Msgs
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
@@ -234,9 +233,9 @@ t_07_add_generation(_Config) ->
         ),
 
     Checker = fun({StartTime, Msgs0}, Acc) ->
-        Msgs = Msgs0 ++ Acc,
-        Batch = fetch_all(DB, TopicFilter, StartTime),
-        ?assertEqual(Msgs, Batch, {StartTime}),
+        Msgs = Acc ++ Msgs0,
+        Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
+        ?assertEqual(Msgs, Batch, StartTime),
         Msgs
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
@@ -398,9 +397,8 @@ t_smoke_delete_next(_Config) ->
 
             TopicFilterHash = ['#'],
             [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilterHash, StartTime),
-            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilterHash, StartTime),
-            {ok, _Iter, Batch} = iterate(DB, Iter0, 1),
-            ?assertEqual([Msg1, Msg3], [Msg || {_Key, Msg} <- Batch]),
+            Batch = emqx_ds_test_helpers:consume_stream(DB, Stream, TopicFilterHash, StartTime),
+            ?assertEqual([Msg1, Msg3], Batch),
 
             ok = emqx_ds:add_generation(DB),
 
@@ -444,9 +442,9 @@ t_drop_generation_with_never_used_iterator(_Config) ->
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
 
-    ?assertMatch(
-        {error, unrecoverable, generation_not_found, []},
-        iterate(DB, Iter0, 1)
+    ?assertError(
+        {error, unrecoverable, generation_not_found},
+        emqx_ds_test_helpers:consume_iter(DB, Iter0)
     ),
 
     %% New iterator for the new stream will only see the later messages.
@@ -454,9 +452,9 @@ t_drop_generation_with_never_used_iterator(_Config) ->
     ?assertNotEqual(Stream0, Stream1),
     {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime),
 
-    {ok, Iter, Batch} = iterate(DB, Iter1, 1),
+    {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}),
     ?assertNotEqual(end_of_stream, Iter),
-    ?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]),
+    ?assertEqual(Msgs1, Batch),
 
     ok.
 
@@ -496,9 +494,9 @@ t_drop_generation_with_used_once_iterator(_Config) ->
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
 
-    ?assertMatch(
-        {error, unrecoverable, generation_not_found, []},
-        iterate(DB, Iter1, 1)
+    ?assertError(
+        {error, unrecoverable, generation_not_found},
+        emqx_ds_test_helpers:consume_iter(DB, Iter1)
     ).
 
 t_drop_generation_update_iterator(_Config) ->
@@ -702,25 +700,6 @@ update_data_set() ->
         ]
     ].
 
-fetch_all(DB, TopicFilter, StartTime) ->
-    Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-    Streams = lists:sort(
-        fun({{_, A}, _}, {{_, B}, _}) ->
-            A < B
-        end,
-        Streams0
-    ),
-    lists:foldl(
-        fun({_, Stream}, Acc) ->
-            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
-            {ok, _, Msgs0} = iterate(DB, Iter0, StartTime),
-            Msgs = lists:map(fun({_, Msg}) -> Msg end, Msgs0),
-            Acc ++ Msgs
-        end,
-        [],
-        Streams
-    ).
-
 message(ClientId, Topic, Payload, PublishedAt) ->
     Msg = message(Topic, Payload, PublishedAt),
     Msg#message{from = ClientId}.
@@ -733,21 +712,6 @@ message(Topic, Payload, PublishedAt) ->
         id = emqx_guid:gen()
     }.
 
-iterate(DB, It, BatchSize) ->
-    iterate(DB, It, BatchSize, []).
-
-iterate(DB, It0, BatchSize, Acc) ->
-    case emqx_ds:next(DB, It0, BatchSize) of
-        {ok, It, []} ->
-            {ok, It, Acc};
-        {ok, It, Msgs} ->
-            iterate(DB, It, BatchSize, Acc ++ Msgs);
-        {ok, end_of_stream} ->
-            {ok, end_of_stream, Acc};
-        {error, Class, Reason} ->
-            {error, Class, Reason, Acc}
-    end.
-
 delete(DB, It, Selector, BatchSize) ->
     delete(DB, It, Selector, BatchSize, 0).
 

+ 181 - 0
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -0,0 +1,181 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_replication_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
+-define(DB, testdb).
+
+opts() ->
+    #{
+        backend => builtin,
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        n_shards => 1,
+        n_sites => 3,
+        replication_factor => 3,
+        replication_options => #{
+            wal_max_size_bytes => 128 * 1024,
+            wal_max_batch_size => 1024,
+            snapshot_interval => 128
+        }
+    }.
+
+t_replication_transfers_snapshots(Config) ->
+    NMsgs = 4000,
+    Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
+    _Specs = [_, SpecOffline | _] = ?config(specs, Config),
+
+    %% Initialize DB on all nodes and wait for it to be online.
+    ?assertEqual(
+        [{ok, ok} || _ <- Nodes],
+        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
+    ),
+    ?retry(
+        500,
+        10,
+        ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
+    ),
+
+    %% Stop the DB on the "offline" node.
+    ok = emqx_cth_cluster:stop_node(NodeOffline),
+
+    %% Fill the storage with messages and few additional generations.
+    Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}),
+
+    %% Restart the node.
+    [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{
+            ?snk_kind := dsrepl_snapshot_accepted,
+            ?snk_meta := #{node := NodeOffline}
+        })
+    ),
+    ?assertEqual(
+        ok,
+        erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
+    ),
+
+    %% Trigger storage operation and wait the replica to be restored.
+    _ = add_generation(Node, ?DB),
+    ?assertMatch(
+        {ok, _},
+        snabbkaffe:receive_events(SRef)
+    ),
+
+    %% Wait until any pending replication activities are finished (e.g. Raft log entries).
+    ok = timer:sleep(3_000),
+
+    %% Check that the DB has been restored.
+    Shard = hd(shards(NodeOffline, ?DB)),
+    MessagesOffline = lists:keysort(
+        #message.timestamp,
+        consume(NodeOffline, ?DB, Shard, ['#'], 0)
+    ),
+    ?assertEqual(
+        sample(40, Messages),
+        sample(40, MessagesOffline)
+    ),
+    ?assertEqual(
+        Messages,
+        MessagesOffline
+    ).
+
+shards(Node, DB) ->
+    erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
+
+shards_online(Node, DB) ->
+    erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
+
+fill_storage(Node, DB, NMsgs, Opts) ->
+    fill_storage(Node, DB, NMsgs, 0, Opts).
+
+fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
+    R1 = push_message(Node, DB, I),
+    R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
+    R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
+fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
+    [].
+
+push_message(Node, DB, I) ->
+    Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
+    {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
+    Message = message(Topic, Bytes, I * 100),
+    ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
+    [Message].
+
+add_generation(Node, DB) ->
+    ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
+    [].
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        from = <<?MODULE_STRING>>,
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
+
+consume(Node, DB, Shard, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]).
+
+probably(P, Fun) ->
+    case rand:uniform() of
+        X when X < P -> Fun();
+        _ -> []
+    end.
+
+sample(N, List) ->
+    L = length(List),
+    H = N div 2,
+    Filler = integer_to_list(L - N) ++ " more",
+    lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L).
+
+%%
+
+suite() -> [{timetrap, {seconds, 60}}].
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(TCName, Config) ->
+    Apps = [
+        {emqx_durable_storage, #{
+            before_start => fun snabbkaffe:fix_ct_logging/0,
+            override_env => [{egress_flush_interval, 1}]
+        }}
+    ],
+    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {emqx_ds_replication_SUITE1, #{apps => Apps}},
+            {emqx_ds_replication_SUITE2, #{apps => Apps}},
+            {emqx_ds_replication_SUITE3, #{apps => Apps}}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    Nodes = emqx_cth_cluster:start(NodeSpecs),
+    ok = snabbkaffe:start_trace(),
+    [{nodes, Nodes}, {specs, NodeSpecs} | Config].
+
+end_per_testcase(_TCName, Config) ->
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).

+ 148 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl

@@ -0,0 +1,148 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+opts() ->
+    #{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
+
+%%
+
+t_idempotent_store_batch(_Config) ->
+    Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
+    {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
+    %% Push some messages to the shard.
+    Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)],
+    GenTs = 30,
+    Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
+    %% Add new generation and push the same batch + some more.
+    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)),
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
+    %% First batch should have been handled idempotently.
+    ?assertEqual(
+        Msgs1 ++ Msgs2,
+        lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
+    ),
+    ok = stop_shard(Pid).
+
+t_snapshot_take_restore(_Config) ->
+    Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
+    {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
+
+    %% Push some messages to the shard.
+    Msgs1 = [gen_message(N) || N <- lists:seq(1000, 2000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
+
+    %% Add new generation and push some more.
+    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 3000)),
+    Msgs2 = [gen_message(N) || N <- lists:seq(4000, 5000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
+    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 6000)),
+
+    %% Take a snapshot of the shard.
+    {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(Shard),
+
+    %% Push even more messages to the shard AFTER taking the snapshot.
+    Msgs3 = [gen_message(N) || N <- lists:seq(7000, 8000)],
+    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs3), #{})),
+
+    %% Destroy the shard.
+    ok = stop_shard(Pid),
+    ok = emqx_ds_storage_layer:drop_shard(Shard),
+
+    %% Restore the shard from the snapshot.
+    {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(Shard),
+    ?assertEqual(ok, transfer_snapshot(SnapReader, SnapWriter)),
+
+    %% Verify that the restored shard contains the messages up until the snapshot.
+    {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
+    ?assertEqual(
+        Msgs1 ++ Msgs2,
+        lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
+    ).
+
+transfer_snapshot(Reader, Writer) ->
+    ChunkSize = rand:uniform(1024),
+    ReadResult = emqx_ds_storage_snapshot:read_chunk(Reader, ChunkSize),
+    ?assertMatch({RStatus, _, _} when RStatus == next; RStatus == last, ReadResult),
+    {RStatus, Chunk, NReader} = ReadResult,
+    Data = iolist_to_binary(Chunk),
+    {WStatus, NWriter} = emqx_ds_storage_snapshot:write_chunk(Writer, Data),
+    %% Verify idempotency.
+    ?assertMatch(
+        {WStatus, NWriter},
+        emqx_ds_storage_snapshot:write_chunk(NWriter, Data)
+    ),
+    %% Verify convergence.
+    ?assertEqual(
+        RStatus,
+        WStatus,
+        #{reader => NReader, writer => NWriter}
+    ),
+    case WStatus of
+        last ->
+            ?assertEqual(ok, emqx_ds_storage_snapshot:release_reader(NReader)),
+            ?assertEqual(ok, emqx_ds_storage_snapshot:release_writer(NWriter)),
+            ok;
+        next ->
+            transfer_snapshot(NReader, NWriter)
+    end.
+
+%%
+
+batch(Msgs) ->
+    [{emqx_message:timestamp(Msg), Msg} || Msg <- Msgs].
+
+gen_message(N) ->
+    Topic = emqx_topic:join([<<"foo">>, <<"bar">>, integer_to_binary(N)]),
+    message(Topic, crypto:strong_rand_bytes(16), N).
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        from = <<?MODULE_STRING>>,
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
+
+stop_shard(Pid) ->
+    _ = unlink(Pid),
+    proc_lib:stop(Pid, shutdown, infinity).
+
+%%
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(TCName, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
+    Apps = emqx_cth_suite:start(
+        [{emqx_durable_storage, #{override_env => [{db_data_dir, WorkDir}]}}],
+        #{work_dir => WorkDir}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_testcase(_TCName, Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.

+ 60 - 0
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -56,3 +56,63 @@ mock_rpc_result(gen_rpc, ExpectFun) ->
                 {badrpc, timeout}
         end
     end).
+
+%% Consuming streams and iterators
+
+consume(DB, TopicFilter) ->
+    consume(DB, TopicFilter, 0).
+
+consume(DB, TopicFilter, StartTime) ->
+    Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    lists:flatmap(
+        fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end,
+        Streams
+    ).
+
+consume_stream(DB, Stream, TopicFilter, StartTime) ->
+    {ok, It0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+    {ok, _It, Msgs} = consume_iter(DB, It0),
+    Msgs.
+
+consume_iter(DB, It) ->
+    consume_iter(DB, It, #{}).
+
+consume_iter(DB, It, Opts) ->
+    consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts).
+
+storage_consume(ShardId, TopicFilter) ->
+    storage_consume(ShardId, TopicFilter, 0).
+
+storage_consume(ShardId, TopicFilter, StartTime) ->
+    Streams = emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime),
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            storage_consume_stream(ShardId, Stream, TopicFilter, StartTime)
+        end,
+        Streams
+    ).
+
+storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) ->
+    {ok, It0} = emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime),
+    {ok, _It, Msgs} = storage_consume_iter(ShardId, It0),
+    Msgs.
+
+storage_consume_iter(ShardId, It) ->
+    storage_consume_iter(ShardId, It, #{}).
+
+storage_consume_iter(ShardId, It, Opts) ->
+    consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts).
+
+consume_iter_with(NextFun, Args, It0, Opts) ->
+    BatchSize = maps:get(batch_size, Opts, 5),
+    case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of
+        {ok, It, _Msgs = []} ->
+            {ok, It, []};
+        {ok, It1, Batch} ->
+            {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts),
+            {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs};
+        {ok, Eos = end_of_stream} ->
+            {ok, Eos, []};
+        {error, Class, Reason} ->
+            error({error, Class, Reason})
+    end.