
test(ds): Use streams to create traffic

ieQu1 1 year ago
Parent
Commit
68ca891f41

+ 35 - 16
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -88,6 +88,7 @@
 ]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 -include("emqx_ds_replication_layer.hrl").
 
 %%================================================================================
@@ -691,37 +692,39 @@ apply(
         ?tag := ?BATCH,
         ?batch_messages := MessagesIn
     },
-    #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State
+    #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
 ) ->
     %% NOTE
     %% Unique timestamp tracking real time closely.
     %% With microsecond granularity it should be nearly impossible for it to run
    %% too far ahead of the real time clock.
+    ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}),
     {Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
     Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
-    NState = State#{latest := Latest},
+    State = State0#{latest := Latest},
+    set_ts(DBShard, Latest),
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
-    Effect = {release_cursor, RaftIdx, NState},
-    emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest),
-    {NState, Result, Effect};
+    Effect = {release_cursor, RaftIdx, State},
+    {State, Result, Effect};
 apply(
     _RaftMeta,
     #{?tag := add_generation, ?since := Since},
-    #{db_shard := DBShard, latest := Latest} = State
+    #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
-    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
-    NState = State#{latest := NLatest},
-    {NState, Result};
+    State = State0#{latest := Latest},
+    set_ts(DBShard, Latest),
+    {State, Result};
 apply(
     _RaftMeta,
     #{?tag := update_config, ?since := Since, ?config := Opts},
-    #{db_shard := DBShard, latest := Latest} = State
+    #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
-    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
-    NState = State#{latest := NLatest},
-    {NState, Result};
+    State = State0#{latest := Latest},
+    {State, Result};
 apply(
     _RaftMeta,
     #{?tag := drop_generation, ?generation := GenId},
@@ -730,17 +733,28 @@ apply(
     Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
     {State, Result};
 apply(
-    _RaftMeta,
+    #{index := RaftIdx},
     #{?tag := storage_event, ?payload := CustomEvent},
     #{db_shard := DBShard, latest := Latest0} = State
 ) ->
     {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0),
+    set_ts(DBShard, Latest),
+    ?tp(
+        debug,
+        emqx_ds_replication_layer_storage_event,
+        #{
+            shard => DBShard, ts => Timestamp, payload => CustomEvent
+        }
+    ),
     Effects = handle_custom_event(DBShard, Timestamp, CustomEvent),
     {State#{latest := Latest}, ok, Effects}.
 
 -spec tick(integer(), ra_state()) -> ra_machine:effects().
-tick(TimeMs, #{db_shard := DBShard, latest := Latest}) ->
+tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
+    %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
     {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest),
+    ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
+    set_ts(DBShard, Latest),
     handle_custom_event(DBShard, Timestamp, tick).
 
 assign_timestamps(Latest, Messages) ->
@@ -781,6 +795,11 @@ handle_custom_event(DBShard, Latest, Event) ->
         [{append, #{?tag => storage_event, ?payload => I}} || I <- Events]
     catch
         EC:Err:Stacktrace ->
-            logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}),
+            ?tp(error, ds_storage_custom_event_fail, #{
+                EC => Err, stacktrace => Stacktrace, event => Event
+            }),
             []
     end.
+
+set_ts({DB, Shard}, TS) ->
+    emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS).
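For context, a minimal sketch of the monotonic timestamp assignment that the batch apply/3 clause above relies on. The module and function below are hypothetical illustrations (the real assign_timestamps/2 is not part of this diff): each message receives a microsecond timestamp strictly greater than the previous one, so "latest" only moves forward even if the wall clock stalls.

    %% Hypothetical sketch, not the module's actual code:
    -module(ds_ts_sketch).
    -export([assign_timestamps/2]).

    %% Returns {NewLatest, [{Timestamp, Message}]} with strictly increasing timestamps.
    assign_timestamps(Latest0, Messages) ->
        {Stamped, Latest} = lists:mapfoldl(
            fun(Msg, Latest1) ->
                %% Never go backwards and never reuse a timestamp:
                TS = max(erlang:system_time(microsecond), Latest1 + 1),
                {{TS, Msg}, TS}
            end,
            Latest0,
            Messages
        ),
        {Latest, Stamped}.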

+ 3 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -309,7 +309,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
     Servers = shard_servers(DB, Shard),
-    case ra:restart_server(DB, LocalServer) of
+    MutableConfig = #{tick_timeout => 100},
+    case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
             Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
@@ -320,7 +321,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 ],
                 ReplicationOpts
             ),
-            ok = ra:start_server(DB, #{
+            ok = ra:start_server(DB, MutableConfig#{
                 id => LocalServer,
                 uid => server_uid(DB, Shard),
                 cluster_name => ClusterName,

+ 9 - 5
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -482,13 +482,17 @@ delete_next_until(
     end.
 
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
-    %% Cause replication layer to bump timestamp when idle
     case ets:lookup(Gvars, ?IDLE_DETECT) of
-        [{?IDLE_DETECT, false, LastWrittenTs}] when
-            ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time)
-        ->
+        [{?IDLE_DETECT, Latch, LastWrittenTs}] ->
+            ok;
+        [] ->
+            Latch = false,
+            LastWrittenTs = 0
+    end,
+    case Latch of
+        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
-            [emqx_ds_storage_bitfield_lts_dummy_event];
+            [dummy_event];
         _ ->
             []
     end;
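The rewritten handle_event clause implements a simple latch: the first case only binds Latch and LastWrittenTs (defaulting to {false, 0} when the gvars entry is absent), and the second case emits a single dummy_event once the current epoch has advanced past the epoch of the last write. A self-contained sketch of the same idea (hypothetical names; the ?EPOCH macro is replaced by a passed-in fun):

    %% Sketch only: emit one dummy event per idle epoch, guarded by an ets latch.
    -module(idle_latch_sketch).
    -export([on_tick/3]).

    on_tick(Gvars, EpochOf, Time) ->
        {Latch, LastWrittenTs} =
            case ets:lookup(Gvars, idle_detect) of
                [{idle_detect, L, T}] -> {L, T};
                [] -> {false, 0}
            end,
        case Latch =:= false andalso EpochOf(Time) > EpochOf(LastWrittenTs) of
            true ->
                %% Latch so the event fires only once until the next write resets it:
                true = ets:insert(Gvars, {idle_detect, true, LastWrittenTs}),
                [dummy_event];
            false ->
                []
        end.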

+ 7 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -287,6 +287,13 @@ get_streams(Shard, TopicFilter, StartTime) ->
             case generation_get(Shard, GenId) of
                 #{module := Mod, data := GenData} ->
                     Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
+                    ?tp(get_streams_get_gen_topic, #{
+                        gen_id => GenId,
+                        topic => TopicFilter,
+                        start_time => StartTime,
+                        streams => Streams,
+                        gen_data => GenData
+                    }),
                     [
                         {GenId, ?stream_v2(GenId, InnerStream)}
                      || InnerStream <- Streams
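The commit adds several snabbkaffe trace points (ds_ra_apply_batch, emqx_ds_replication_layer_tick, emqx_ds_replication_layer_storage_event, get_streams_get_gen_topic). A minimal sketch of how a test could assert on them in the check phase of ?check_trace, assuming the usual macros from snabbkaffe.hrl and stdlib assert.hrl are in scope; the function name is illustrative:

    %% Illustrative only:
    trace_points_sketch() ->
        ?check_trace(
            begin
                %% ... run a scenario that writes to and reads from the DB ...
                ok
            end,
            fun(_Result, Trace) ->
                %% At least one batch was applied by the Ra state machine:
                ?assertMatch([_ | _], ?of_kind(ds_ra_apply_batch, Trace)),
                %% get_streams/3 inspected at least one generation:
                ?assertMatch([_ | _], ?of_kind(get_streams_get_gen_topic, Trace))
            end
        ).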

+ 298 - 109
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -21,10 +21,18 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
--include_lib("snabbkaffe/include/test_macros.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(DB, testdb).
 
+-define(ON(NODE, BODY),
+    erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
+).
+
+-define(diff_opts, #{
+    context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2
+}).
+
 opts() ->
     opts(#{}).
 
@@ -32,7 +40,8 @@ opts(Overrides) ->
     maps:merge(
         #{
             backend => builtin,
-            storage => {emqx_ds_storage_bitfield_lts, #{}},
+            %% storage => {emqx_ds_storage_reference, #{}},
+            storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 1}},
             n_shards => 16,
             n_sites => 1,
             replication_factor => 3,
@@ -142,112 +151,140 @@ t_rebalance(init, Config) ->
 t_rebalance('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
+%% This testcase verifies that the storage rebalancing works correctly:
+%% 1. Join/leave operations are applied successfully.
+%% 2. Message data survives the rebalancing.
+%% 3. Shard cluster membership converges to the target replica allocation.
+%% 4. Replication factor is respected.
 t_rebalance(Config) ->
-    %% This testcase verifies that the storage rebalancing works correctly:
-    %% 1. Join/leave operations are applied successfully.
-    %% 2. Message data survives the rebalancing.
-    %% 3. Shard cluster membership converges to the target replica allocation.
-    %% 4. Replication factor is respected.
-
-    NMsgs = 800,
+    NMsgs = 50,
     NClients = 5,
+    %% List of fake client IDs:
+    Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
+    %% List of streams that generate messages for each "client" in its own topic:
+    TopicStreams = [
+        {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))}
+     || ClientId <- Clients
+    ],
+    %% Interleaved list of events:
+    Stream = emqx_utils_stream:interleave([{2, Stream} || {_ClientId, Stream} <- TopicStreams]),
     Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
-
-    %% Initialize DB on the first node.
-    Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
-    ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
-    ?assertMatch(
-        Shards when length(Shards) == 16,
-        shards_online(N1, ?DB)
-    ),
-
-    %% Open DB on the rest of the nodes.
-    ?assertEqual(
-        [{ok, ok} || _ <- [N2, N3, N4]],
-        erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts])
-    ),
-
-    Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes],
-    ct:pal("Sites: ~p~n", [Sites]),
-
-    %% Only N1 should be responsible for all shards initially.
-    ?assertEqual(
-        [[S1] || _ <- Nodes],
-        [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes]
-    ),
-
-    %% Fill the storage with messages and few additional generations.
-    %% This will force shards to trigger snapshot transfers during rebalance.
-    ClientMessages = emqx_utils:pmap(
-        fun(CID) ->
-            N = lists:nth(1 + (CID rem length(Nodes)), Nodes),
-            fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)})
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            %% 0. Inject schedulings to make sure the messages are
+            %% written to the storage before, during, and after
+            %% rebalance:
+            ?force_ordering(#{?snk_kind := test_push_message, n := 10}, #{
+                ?snk_kind := test_start_rebalance
+            }),
+            ?force_ordering(#{?snk_kind := test_start_rebalance}, #{
+                ?snk_kind := test_push_message, n := 20
+            }),
+            ?force_ordering(#{?snk_kind := test_end_rebalance}, #{
+                ?snk_kind := test_push_message, n := 30
+            }),
+            %% 1. Initialize DB on the first node.
+            Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
+
+            ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))),
+            ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)),
+
+            %% 1.1 Open DB on the rest of the nodes:
+            [
+                ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
+             || Node <- Nodes
+            ],
+
+            Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
+            ct:pal("Sites: ~p~n", [Sites]),
+
+            %% 1.2 Verify that all nodes have the same view of metadata storage:
+            [
+                ?defer_assert(
+                    ?assertEqual(
+                        [S1],
+                        ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)),
+                        #{
+                            msg => "Initially, only S1 should be responsible for all shards",
+                            node => Node
+                        }
+                    )
+                )
+             || Node <- Nodes
+            ],
+
+            %% 2. Start filling the storage:
+            spawn_link(
+                fun() ->
+                    NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
+                    apply_stream(?DB, NodeStream, Stream, 0)
+                end
+            ),
+
+            %% 3. Start rebalance in the meanwhile:
+            ?tp(test_start_rebalance, #{}),
+            %% 3.1 Join the second site to the DB replication sites.
+            ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))),
+            %% Should be no-op.
+            ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))),
+            ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
+
+            ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
+
+            %% Now join the rest of the sites.
+            ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
+            ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
+
+            ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
+
+            ?tp(test_end_rebalance, #{}),
+            [
+                ?defer_assert(
+                    ?assertEqual(
+                        16 * 3 div length(Nodes),
+                        n_shards_online(Node, ?DB),
+                        "Each node is now responsible for 3/4 of the shards"
+                    )
+                )
+             || Node <- Nodes
+            ],
+
+            %% Verify that the set of shard servers matches the target allocation.
+            Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
+            ShardServers = [
+                shard_server_info(N, ?DB, Shard, Site, readiness)
+             || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
+                Shard <- Shards
+            ],
+            ?assert(
+                lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
+                ShardServers
+            ),
+
+            %% Verify that the messages are preserved after the rebalance:
+            ?block_until(#{?snk_kind := all_done}),
+            timer:sleep(5000),
+            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams),
+
+            %% Scale down the cluster by removing the first node.
+            ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
+            ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
+            ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
+
+            %% Verify that N1 holds no shards and each remaining node hosts all 16 shards.
+            ?defer_assert(
+                ?assertEqual(
+                    [0, 16, 16, 16],
+                    [n_shards_online(N, ?DB) || N <- Nodes]
+                )
+            ),
+
+            %% Verify that the messages are once again preserved after the rebalance:
+            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
         end,
-        lists:seq(1, NClients),
-        infinity
-    ),
-    Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)),
-
-    %% Join the second site to the DB replication sites.
-    ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
-    %% Should be no-op.
-    ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])),
-    ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
-
-    %% Fill in some more messages *during* the rebalance.
-    MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}),
-
-    ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
-
-    %% Now join the rest of the sites.
-    ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
-    ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
-
-    %% Fill in some more messages *during* the rebalance.
-    MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}),
-
-    ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
-
-    %% Verify that each node is now responsible for 3/4 of the shards.
-    ?assertEqual(
-        [(16 * 3) div length(Nodes) || _ <- Nodes],
-        [n_shards_online(N, ?DB) || N <- Nodes]
-    ),
-
-    %% Verify that the set of shard servers matches the target allocation.
-    Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
-    ShardServers = [
-        shard_server_info(N, ?DB, Shard, Site, readiness)
-     || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
-        Shard <- Shards
-    ],
-    ?assert(
-        lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
-        ShardServers
-    ),
-
-    %% Verify that the messages are preserved after the rebalance.
-    Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2,
-    MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)),
-    ?assertEqual(sample(20, Messages), sample(20, MessagesN4)),
-    ?assertEqual(Messages, MessagesN4),
-
-    %% Scale down the cluster by removing the first node.
-    ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
-    ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
-
-    ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
-
-    %% Verify that each node is now responsible for each shard.
-    ?assertEqual(
-        [0, 16, 16, 16],
-        [n_shards_online(N, ?DB) || N <- Nodes]
-    ),
-
-    %% Verify that the messages are once again preserved after the rebalance.
-    MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)),
-    ?assertEqual(sample(20, Messages), sample(20, MessagesN3)),
-    ?assertEqual(Messages, MessagesN3).
+        []
+    ).
 
 t_join_leave_errors(init, Config) ->
     Apps = [appspec(emqx_durable_storage)],
@@ -400,6 +437,9 @@ t_rebalance_chaotic_converges(Config) ->
         ds_repl_meta(N1, db_sites, [?DB])
     ),
 
+    %% Wait until the LTS timestamp is updated
+    timer:sleep(5000),
+
     %% Check that all messages are still there.
     Messages = lists:append(TransitionMessages) ++ Messages0,
     MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)),
@@ -502,14 +542,16 @@ fill_storage(Node, DB, NMsgs, Opts) ->
 fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs ->
     PAddGen = maps:get(p_addgen, Opts, 0.001),
     R1 = push_message(Node, DB, I, Opts),
-    R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
+    %probably(PAddGen, fun() -> add_generation(Node, DB) end),
+    R2 = [],
     R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
 fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
     [].
 
 push_message(Node, DB, I, Opts) ->
     Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
-    {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
+    %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)),
+    Bytes = integer_to_binary(I),
     ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
     Message = message(ClientId, Topic, Bytes, I * 100),
     ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
@@ -545,9 +587,14 @@ probably(P, Fun) ->
 
 sample(N, List) ->
     L = length(List),
-    H = N div 2,
-    Filler = integer_to_list(L - N) ++ " more",
-    lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L).
+    case L =< N of
+        true ->
+            List;
+        false ->
+            H = N div 2,
+            Filler = integer_to_list(L - N) ++ " more",
+            lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L)
+    end.
 
 %%
 
@@ -563,3 +610,145 @@ init_per_testcase(TCName, Config0) ->
 end_per_testcase(TCName, Config) ->
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
+
+without_extra(L) ->
+    [I#message{extra = #{}} || I <- L].
+
+%% Consume data from the DS storage on a given node as a stream:
+-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
+
+%% Create a stream from the topic (wildcards are NOT supported for a
+%% good reason: order of messages is implementation-dependent!):
+-spec ds_topic_stream(binary(), binary(), node()) -> ds_stream().
+ds_topic_stream(ClientId, TopicBin, Node) ->
+    Topic = emqx_topic:words(TopicBin),
+    Shard = shard_of_clientid(Node, ClientId),
+    {ShardId, DSStreams} =
+        ?ON(
+            Node,
+            begin
+                DBShard = {?DB, Shard},
+                {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
+            end
+        ),
+    %% Sort streams by their rank Y, and chain them together:
+    emqx_utils_stream:chain([
+        ds_topic_generation_stream(Node, ShardId, Topic, S)
+     || {_RankY, S} <- lists:sort(DSStreams)
+    ]).
+
+%% Note: produces messages with keys
+ds_topic_generation_stream(Node, Shard, Topic, Stream) ->
+    {ok, Iterator} = ?ON(
+        Node,
+        emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
+    ),
+    do_ds_topic_generation_stream(Node, Shard, Iterator).
+
+do_ds_topic_generation_stream(Node, Shard, It0) ->
+    Now = 99999999999999999999,
+    fun() ->
+        case ?ON(Node, emqx_ds_storage_layer:next(Shard, It0, 1, Now)) of
+            {ok, It, []} ->
+                [];
+            {ok, It, [KeyMsg]} ->
+                [KeyMsg | do_ds_topic_generation_stream(Node, Shard, It)]
+        end
+    end.
+
+%% Payload generation:
+
+apply_stream(DB, NodeStream0, Stream0, N) ->
+    case emqx_utils_stream:next(Stream0) of
+        [] ->
+            ?tp(all_done, #{});
+        [Msg = #message{from = From} | Stream] ->
+            [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            ?tp(
+                test_push_message,
+                maps:merge(
+                    emqx_message:to_map(Msg),
+                    #{n => N}
+                )
+            ),
+            ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
+            apply_stream(DB, NodeStream, Stream, N + 1)
+    end.
+
+%% @doc Create an infinite list of messages from a given client:
+topic_messages(TestCase, ClientId) ->
+    topic_messages(TestCase, ClientId, 0).
+
+topic_messages(TestCase, ClientId, N) ->
+    fun() ->
+        Msg = #message{
+            from = ClientId,
+            topic = client_topic(TestCase, ClientId),
+            timestamp = N * 100,
+            payload = integer_to_binary(N)
+        },
+        [Msg | topic_messages(TestCase, ClientId, N + 1)]
+    end.
+
+client_topic(TestCase, ClientId) when is_atom(TestCase) ->
+    client_topic(atom_to_binary(TestCase, utf8), ClientId);
+client_topic(TestCase, ClientId) when is_binary(TestCase) ->
+    <<TestCase/binary, "/", ClientId/binary>>.
+
+message_eq(Msg1, {_Key, Msg2}) ->
+    %% Timestamps can be modified by the replication layer, ignore them:
+    Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
+
+%% Stream comparison:
+
+-spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok.
+verify_stream_effects(TestCase, Nodes0, L) ->
+    lists:foreach(
+        fun({ClientId, Stream}) ->
+            Nodes = nodes_of_clientid(ClientId, Nodes0),
+            ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
+            ?defer_assert(
+                ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId])
+            ),
+            [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes]
+        end,
+        L
+    ).
+
+-spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
+verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) ->
+    ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
+    ?defer_assert(
+        begin
+            snabbkaffe_diff:assert_lists_eq(
+                ExpectedStream,
+                ds_topic_stream(ClientId, client_topic(TestCase, ClientId), Node),
+                ?diff_opts#{comment => #{clientid => ClientId, node => Node}}
+            ),
+            ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
+        end
+    ).
+
+%% Find which nodes from the list contain the shards for the given
+%% client ID:
+nodes_of_clientid(ClientId, Nodes = [N0 | _]) ->
+    Shard = shard_of_clientid(N0, ClientId),
+    SiteNodes = ?ON(
+        N0,
+        begin
+            Sites = emqx_ds_replication_layer_meta:replica_set(?DB, Shard),
+            lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites)
+        end
+    ),
+    lists:filter(
+        fun(N) ->
+            lists:member(N, SiteNodes)
+        end,
+        Nodes
+    ).
+
+shard_of_clientid(Node, ClientId) ->
+    ?ON(
+        Node,
+        emqx_ds_replication_layer:shard_of_message(?DB, #message{from = ClientId}, clientid)
+    ).

+ 2 - 0
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -18,6 +18,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
 %% RPC mocking
 
 mock_rpc() ->

+ 63 - 1
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -23,8 +23,11 @@
     mqueue/1,
     map/2,
     transpose/1,
+    chain/1,
     chain/2,
-    repeat/1
+    repeat/1,
+    interleave/1,
+    limit_length/2
 ]).
 
 %% Evaluating
@@ -118,6 +121,11 @@ transpose_tail(S, Tail) ->
         end
     end.
 
+%% @doc Make a stream by concatenating multiple streams.
+-spec chain([stream(X)]) -> stream(X).
+chain(L) ->
+    lists:foldl(fun chain/2, empty(), L).
+
 %% @doc Make a stream by chaining (concatenating) two streams.
 %% The second stream begins to produce values only after the first one is exhausted.
 -spec chain(stream(X), stream(Y)) -> stream(X | Y).
@@ -144,6 +152,41 @@ repeat(S) ->
         end
     end.
 
+%% @doc Interleave the elements of the streams.
+%%
+%% This function accepts a list of tuples where the first element
+%% specifies the size of the "batch" to be consumed from the stream at
+%% a time (the stream itself is the second tuple element). If an element
+%% of the list is a plain stream, the batch size is assumed to be 1.
+-spec interleave([stream(X) | {non_neg_integer(), stream(X)}]) -> stream(X).
+interleave(L0) ->
+    L = lists:map(
+        fun
+            (Stream) when is_function(Stream) ->
+                {1, Stream};
+            (A = {N, _}) when N >= 0 ->
+                A
+        end,
+        L0
+    ),
+    fun() ->
+        do_interleave(0, L, [])
+    end.
+
+%% @doc Truncate a stream to the given length.
+-spec limit_length(non_neg_integer(), stream(X)) -> stream(X).
+limit_length(0, _) ->
+    fun() -> [] end;
+limit_length(N, S) when N >= 0 ->
+    fun() ->
+        case next(S) of
+            [] ->
+                [];
+            [X | S1] ->
+                [X | limit_length(N - 1, S1)]
+        end
+    end.
+
 %%
 
 %% @doc Produce the next value from the stream.
@@ -237,3 +280,22 @@ csv_read_line([Line | Lines]) ->
     {Fields, Lines};
 csv_read_line([]) ->
     eof.
+
+do_interleave(_, [], []) ->
+    [];
+do_interleave(N, [{N, S} | Rest], Rev) ->
+    do_interleave(0, Rest, [{N, S} | Rev]);
+do_interleave(_, [], Rev) ->
+    do_interleave(0, lists:reverse(Rev), []);
+do_interleave(I, [{N, S} | Rest], Rev) when I < N ->
+    case next(S) of
+        [] ->
+            do_interleave(0, Rest, Rev);
+        [X | S1] ->
+            [
+                X
+                | fun() ->
+                    do_interleave(I + 1, [{N, S1} | Rest], Rev)
+                end
+            ]
+    end.
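A usage sketch for the new stream combinators (interleave/1 and limit_length/2), mirroring how the test suite above builds interleaved per-client traffic; the function name and values are illustrative only:

    %% Illustrative only: interleave a batched stream with a plain one, then truncate.
    interleave_usage_sketch() ->
        S1 = emqx_utils_stream:list([1, 2, 3, 4]),
        S2 = emqx_utils_stream:list([a, b]),
        %% A plain stream defaults to a batch size of 1:
        Mixed = emqx_utils_stream:interleave([{2, S1}, S2]),
        %% Without truncation this consumes to [1, 2, a, 3, 4, b]:
        [1, 2, a, 3] =
            emqx_utils_stream:consume(emqx_utils_stream:limit_length(4, Mixed)),
        ok.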

+ 8 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -157,6 +157,14 @@ mqueue_test() ->
         emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
     ).
 
+interleave_test() ->
+    S1 = emqx_utils_stream:list([1, 2, 3]),
+    S2 = emqx_utils_stream:list([a, b, c, d]),
+    ?assertEqual(
+        [1, 2, a, b, 3, c, d],
+        emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}]))
+    ).
+
 csv_test() ->
     Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
     ?assertEqual(