Преглед изворни кода

test(ds): Use streams to fill the storage

ieQu1 пре 1 година
родитељ
комит
63e51fca66

+ 183 - 216
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -30,7 +30,7 @@
 ).
 
 -define(diff_opts, #{
-    context => 20, window => 1000, max_failures => 1000, compare_fun => fun message_eq/2
+    context => 20, window => 1000, compare_fun => fun message_eq/2
 }).
 
 opts() ->
@@ -76,64 +76,59 @@ t_replication_transfers_snapshots('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_replication_transfers_snapshots(Config) ->
-    NMsgs = 4000,
+    NMsgs = 400,
+    NClients = 5,
+    {Stream, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
+
     Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
     _Specs = [_, SpecOffline | _] = ?config(specs, Config),
+    ?check_trace(
+        begin
+            %% Initialize DB on all nodes and wait for it to be online.
+            Opts = opts(#{n_shards => 1, n_sites => 3}),
+            ?assertEqual(
+                [{ok, ok} || _ <- Nodes],
+                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
+            ),
+            ?retry(
+                500,
+                10,
+                ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
+            ),
 
-    %% Initialize DB on all nodes and wait for it to be online.
-    Opts = opts(#{n_shards => 1, n_sites => 3}),
-    ?assertEqual(
-        [{ok, ok} || _ <- Nodes],
-        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
-    ),
-    ?retry(
-        500,
-        10,
-        ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
-    ),
-
-    %% Stop the DB on the "offline" node.
-    ok = emqx_cth_cluster:stop_node(NodeOffline),
+            %% Stop the DB on the "offline" node.
+            ok = emqx_cth_cluster:stop_node(NodeOffline),
 
-    %% Fill the storage with messages and few additional generations.
-    Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}),
+            %% Fill the storage with messages and few additional generations.
+            apply_stream(?DB, Nodes -- [NodeOffline], Stream),
 
-    %% Restart the node.
-    [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
-    {ok, SRef} = snabbkaffe:subscribe(
-        ?match_event(#{
-            ?snk_kind := dsrepl_snapshot_accepted,
-            ?snk_meta := #{node := NodeOffline}
-        })
-    ),
-    ?assertEqual(
-        ok,
-        erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
-    ),
+            %% Restart the node.
+            [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{
+                    ?snk_kind := dsrepl_snapshot_accepted,
+                    ?snk_meta := #{node := NodeOffline}
+                })
+            ),
+            ?assertEqual(
+                ok,
+                erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
+            ),
 
-    %% Trigger storage operation and wait the replica to be restored.
-    _ = add_generation(Node, ?DB),
-    ?assertMatch(
-        {ok, _},
-        snabbkaffe:receive_events(SRef)
-    ),
+            %% Trigger storage operation and wait the replica to be restored.
+            _ = add_generation(Node, ?DB),
+            ?assertMatch(
+                {ok, _},
+                snabbkaffe:receive_events(SRef)
+            ),
 
-    %% Wait until any pending replication activities are finished (e.g. Raft log entries).
-    ok = timer:sleep(3_000),
+            %% Wait until any pending replication activities are finished (e.g. Raft log entries).
+            ok = timer:sleep(3_000),
 
-    %% Check that the DB has been restored.
-    Shard = hd(shards(NodeOffline, ?DB)),
-    MessagesOffline = lists:keysort(
-        #message.timestamp,
-        consume_shard(NodeOffline, ?DB, Shard, ['#'], 0)
-    ),
-    ?assertEqual(
-        sample(40, Messages),
-        sample(40, MessagesOffline)
-    ),
-    ?assertEqual(
-        Messages,
-        MessagesOffline
+            %% Check that the DB has been restored:
+            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
+        end,
+        []
     ).
 
 t_rebalance(init, Config) ->
@@ -159,50 +154,13 @@ t_rebalance('end', Config) ->
 t_rebalance(Config) ->
     NMsgs = 50,
     NClients = 5,
-    %% List of fake client IDs:
-    Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
-    %% List of streams that generate messages for each "client" in its own topic:
-    TopicStreams = [
-        {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(?FUNCTION_NAME, ClientId))}
-     || ClientId <- Clients
-    ],
-    %% Interleaved list of events:
-    Stream0 = emqx_utils_stream:interleave(
-        [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
-    ),
-    Stream = emqx_utils_stream:interleave(
-        [
-            {50, Stream0},
-            emqx_utils_stream:const(add_generation)
-        ],
-        false
-    ),
-    Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
+    {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
+    Nodes = [N1, N2 | _] = ?config(nodes, Config),
     ?check_trace(
         #{timetrap => 30_000},
         begin
-            %% 0. Inject schedulings to make sure the messages are
-            %% written to the storage before, during, and after
-            %% rebalance:
-            ?force_ordering(
-                #{?snk_kind := test_push_message, n := 10},
-                #{?snk_kind := test_start_rebalance}
-            ),
-            ?force_ordering(
-                #{?snk_kind := test_start_rebalance1},
-                #{?snk_kind := test_push_message, n := 20}
-            ),
-            ?force_ordering(
-                #{?snk_kind := test_push_message, n := 30},
-                #{?snk_kind := test_start_rebalance2}
-            ),
-            ?force_ordering(
-                #{?snk_kind := test_end_rebalance},
-                #{?snk_kind := test_push_message, n := 40}
-            ),
             %% 1. Initialize DB on the first node.
             Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
-
             ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))),
             ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)),
 
@@ -215,6 +173,22 @@ t_rebalance(Config) ->
             Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
             ct:pal("Sites: ~p~n", [Sites]),
 
+            Sequence = [
+                %% Join the second site to the DB replication sites:
+                {N1, join_db_site, S2},
+                %% Should be a no-op:
+                {N2, join_db_site, S2},
+                %% Now join the rest of the sites:
+                {N2, assign_db_sites, Sites}
+            ],
+            Stream = emqx_utils_stream:interleave(
+                [
+                    {50, Stream0},
+                    emqx_utils_stream:list(Sequence)
+                ],
+                true
+            ),
+
             %% 1.2 Verify that all nodes have the same view of metadata storage:
             [
                 ?defer_assert(
@@ -231,31 +205,9 @@ t_rebalance(Config) ->
             ],
 
             %% 2. Start filling the storage:
-            spawn_link(
-                fun() ->
-                    NodeStream = emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
-                    apply_stream(?DB, NodeStream, Stream, 0)
-                end
-            ),
-
-            %% 3. Start rebalance in the meanwhile:
-            ?tp(test_start_rebalance1, #{}),
-            %% 3.1 Join the second site to the DB replication sites.
-            ?assertEqual(ok, ?ON(N1, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))),
-            %% Should be no-op.
-            ?assertEqual(ok, ?ON(N2, emqx_ds_replication_layer_meta:join_db_site(?DB, S2))),
-            ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
-
-            ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
-
-            ?tp(test_start_rebalance2, #{}),
-            %% Now join the rest of the sites.
-            ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
-            ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
-
-            ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
-
-            ?tp(test_end_rebalance, #{}),
+            apply_stream(?DB, Nodes, Stream),
+            timer:sleep(5000),
+            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams),
             [
                 ?defer_assert(
                     ?assertEqual(
@@ -279,17 +231,12 @@ t_rebalance(Config) ->
                 ShardServers
             ),
 
-            %% Verify that the messages are preserved after the rebalance:
-            ?block_until(#{?snk_kind := all_done}),
-            timer:sleep(5000),
-            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams),
-
             %% Scale down the cluster by removing the first node.
             ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
             ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
             ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
 
-            %% Verify that each node is now responsible for each shard.
+            %% Verify that at the end each node is now responsible for each shard.
             ?defer_assert(
                 ?assertEqual(
                     [0, 16, 16, 16],
@@ -387,81 +334,77 @@ t_rebalance_chaotic_converges(Config) ->
     NMsgs = 500,
     Nodes = [N1, N2, N3] = ?config(nodes, Config),
 
-    %% Initialize DB on first two nodes.
-    Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
-    ?assertEqual(
-        [{ok, ok}, {ok, ok}],
-        erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
-    ),
+    NClients = 5,
+    {Stream0, TopicStreams} = interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
 
-    %% Open DB on the last node.
-    ?assertEqual(
-        ok,
-        erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
-    ),
+    ?check_trace(
+        #{},
+        begin
+            %% Initialize DB on first two nodes.
+            Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
 
-    %% Find out which sites there are.
-    Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
-    ct:pal("Sites: ~p~n", [Sites]),
+            ?assertEqual(
+                [{ok, ok}, {ok, ok}],
+                erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
+            ),
 
-    %% Initially, the DB is assigned to [S1, S2].
-    ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
-    ?assertEqual(
-        lists:sort([S1, S2]),
-        ds_repl_meta(N1, db_sites, [?DB])
-    ),
+            %% Open DB on the last node.
+            ?assertEqual(
+                ok,
+                erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
+            ),
 
-    %% Fill the storage with messages and few additional generations.
-    Messages0 = lists:append([
-        fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}),
-        fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}),
-        fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>})
-    ]),
-
-    %% Construct a chaotic transition sequence that changes assignment to [S2, S3].
-    Sequence = [
-        {N1, join_db_site, S3},
-        {N2, leave_db_site, S2},
-        {N3, leave_db_site, S1},
-        {N1, join_db_site, S2},
-        {N2, join_db_site, S1},
-        {N3, leave_db_site, S3},
-        {N1, leave_db_site, S1},
-        {N2, join_db_site, S3}
-    ],
+            %% Find out which sites there are.
+            Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
+            ct:pal("Sites: ~p~n", [Sites]),
 
-    %% Apply the sequence while also filling the storage with messages.
-    TransitionMessages = lists:map(
-        fun({N, Operation, Site}) ->
-            %% Apply the transition.
-            ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])),
-            %% Give some time for at least one transition to complete.
-            Transitions = transitions(N, ?DB),
-            ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
-            ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
-            %% Fill the storage with messages.
-            CID = integer_to_binary(erlang:system_time()),
-            fill_storage(N, ?DB, NMsgs, #{client_id => CID})
-        end,
-        Sequence
-    ),
+            Sequence = [
+                {N1, join_db_site, S3},
+                {N2, leave_db_site, S2},
+                {N3, leave_db_site, S1},
+                {N1, join_db_site, S2},
+                {N2, join_db_site, S1},
+                {N3, leave_db_site, S3},
+                {N1, leave_db_site, S1},
+                {N2, join_db_site, S3}
+            ],
 
-    %% Wait for the last transition to complete.
-    ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
+            %% Interleaved list of events:
+            Stream = emqx_utils_stream:interleave(
+                [
+                    {50, Stream0},
+                    emqx_utils_stream:list(Sequence)
+                ],
+                true
+            ),
 
-    ?assertEqual(
-        lists:sort([S2, S3]),
-        ds_repl_meta(N1, db_sites, [?DB])
-    ),
+            ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
+            ?assertEqual(
+                lists:sort([S1, S2]),
+                ds_repl_meta(N1, db_sites, [?DB]),
+                "Initially, the DB is assigned to [S1, S2]"
+            ),
 
-    %% Wait until the LTS timestamp is updated
-    timer:sleep(5000),
+            apply_stream(?DB, Nodes, Stream),
 
-    %% Check that all messages are still there.
-    Messages = lists:append(TransitionMessages) ++ Messages0,
-    MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)),
-    ?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
-    ?assertEqual(Messages, MessagesDB).
+            %% Wait for the last transition to complete.
+            ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
+
+            ?defer_assert(
+                ?assertEqual(
+                    lists:sort([S2, S3]),
+                    ds_repl_meta(N1, db_sites, [?DB])
+                )
+            ),
+
+            %% Wait until the LTS timestamp is updated:
+            timer:sleep(5000),
+
+            %% Check that all messages are still there.
+            verify_stream_effects(?FUNCTION_NAME, Nodes, TopicStreams)
+        end,
+        []
+    ).
 
 t_rebalance_offline_restarts(init, Config) ->
     Apps = [appspec(emqx_durable_storage)],
@@ -535,7 +478,15 @@ ds_repl_meta(Node, Fun) ->
     ds_repl_meta(Node, Fun, []).
 
 ds_repl_meta(Node, Fun, Args) ->
-    erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args).
+    try
+        erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args)
+    catch
+        EC:Err:Stack ->
+            ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [
+                Fun, Args, Node, EC, Err, Stack
+            ]),
+            error(meta_op_failed)
+    end.
 
 ds_repl_shard(Node, Fun, Args) ->
     erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
@@ -553,27 +504,6 @@ shards_online(Node, DB) ->
 n_shards_online(Node, DB) ->
     length(shards_online(Node, DB)).
 
-fill_storage(Node, DB, NMsgs, Opts) ->
-    fill_storage(Node, DB, NMsgs, 0, Opts).
-
-fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs ->
-    PAddGen = maps:get(p_addgen, Opts, 0.001),
-    R1 = push_message(Node, DB, I, Opts),
-    %probably(PAddGen, fun() -> add_generation(Node, DB) end),
-    R2 = [],
-    R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
-fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
-    [].
-
-push_message(Node, DB, I, Opts) ->
-    Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
-    %% {Bytes, _} = rand:bytes_s(5, rand:seed_s(default, I)),
-    Bytes = integer_to_binary(I),
-    ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
-    Message = message(ClientId, Topic, Bytes, I * 100),
-    ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
-    [Message].
-
 add_generation(Node, DB) ->
     ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
     [].
@@ -674,7 +604,7 @@ do_ds_topic_generation_stream(Node, Shard, It0) ->
                 end
             )
         of
-            {ok, It, []} ->
+            {ok, _It, []} ->
                 [];
             {ok, end_of_stream} ->
                 [];
@@ -685,11 +615,19 @@ do_ds_topic_generation_stream(Node, Shard, It0) ->
 
 %% Payload generation:
 
+apply_stream(DB, Nodes, Stream) ->
+    apply_stream(
+        DB,
+        emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
+        Stream,
+        0
+    ).
+
 apply_stream(DB, NodeStream0, Stream0, N) ->
     case emqx_utils_stream:next(Stream0) of
         [] ->
             ?tp(all_done, #{});
-        [Msg = #message{from = From} | Stream] ->
+        [Msg = #message{} | Stream] ->
             [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
             ?tp(
                 test_push_message,
@@ -701,12 +639,40 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
             ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
             apply_stream(DB, NodeStream, Stream, N + 1);
         [add_generation | Stream] ->
-            [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
+            %% FIXME:
+            [_Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
             %% add_generation(Node, DB),
-            apply_stream(DB, NodeStream, Stream, N)
+            apply_stream(DB, NodeStream, Stream, N);
+        [{Node, Operation, Arg} | Stream] when
+            Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
+        ->
+            ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
+            %% Apply the transition.
+            ?assertEqual(ok, ds_repl_meta(Node, Operation, [DB, Arg])),
+            %% Give some time for at least one transition to complete.
+            Transitions = transitions(Node, ?DB),
+            ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
+            ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
+            apply_stream(DB, NodeStream0, Stream, N);
+        [Fun | Stream] when is_function(Fun) ->
+            Fun(),
+            apply_stream(DB, NodeStream0, Stream, N)
     end.
 
 %% @doc Create an infinite list of messages from a given client:
+interleaved_topic_messages(TestCase, NClients, NMsgs) ->
+    %% List of fake client IDs:
+    Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
+    TopicStreams = [
+        {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))}
+     || ClientId <- Clients
+    ],
+    %% Interleaved stream of messages:
+    Stream = emqx_utils_stream:interleave(
+        [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
+    ),
+    {Stream, TopicStreams}.
+
 topic_messages(TestCase, ClientId) ->
     topic_messages(TestCase, ClientId, 0).
 
@@ -726,7 +692,7 @@ client_topic(TestCase, ClientId) when is_atom(TestCase) ->
 client_topic(TestCase, ClientId) when is_binary(TestCase) ->
     <<TestCase/binary, "/", ClientId/binary>>.
 
-message_eq(Msg1, {Key, Msg2}) ->
+message_eq(Msg1, {_Key, Msg2}) ->
     %% Timestamps can be modified by the replication layer, ignore them:
     Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
 
@@ -734,7 +700,7 @@ message_eq(Msg1, {Key, Msg2}) ->
 
 -spec verify_stream_effects(binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> ok.
 verify_stream_effects(TestCase, Nodes0, L) ->
-    lists:foreach(
+    Checked = lists:flatmap(
         fun({ClientId, Stream}) ->
             Nodes = nodes_of_clientid(ClientId, Nodes0),
             ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
@@ -744,7 +710,8 @@ verify_stream_effects(TestCase, Nodes0, L) ->
             [verify_stream_effects(TestCase, Node, ClientId, Stream) || Node <- Nodes]
         end,
         L
-    ).
+    ),
+    ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")).
 
 -spec verify_stream_effects(binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
 verify_stream_effects(TestCase, Node, ClientId, ExpectedStream) ->

+ 1 - 1
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -162,7 +162,7 @@ interleave_test() ->
     S2 = emqx_utils_stream:list([a, b, c, d]),
     ?assertEqual(
         [1, 2, a, b, 3, c, d],
-        emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}]))
+        emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true))
     ).
 
 csv_test() ->