@@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
     %% Initialize DB on all nodes and wait for it to be online.
     Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Stop the DB on the "offline" node.
     ?wait_async_action(
@@ -207,7 +206,6 @@ t_rebalance(Config) ->
     %% 1. Initialize DB on the first node.
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% 1.1 Kick all sites except S1 from the replica set as
     %% the initial condition:
@@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
 
     %% Open DB:
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Kick N3 from the replica set as the initial condition:
     ?assertMatch(
@@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
     %% Initialize DB on all 3 nodes.
     Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     ?retry(
         1000,
@@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
     ?check_trace(
         begin
             %% Initialize DB on all nodes.
-            ?assertEqual(
-                [{ok, ok} || _ <- Nodes],
-                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
-            ),
+            assert_db_open(Nodes, ?DB, DBOpts),
 
             %% Apply the test events, including simulated node crashes.
             NodeStream = emqx_utils_stream:const(N1),
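+            %% Remember when the workload starts, so we can later look back
+            %% for shard bootstrap events emitted while it was running.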
+            StartedAt = erlang:monotonic_time(millisecond),
             emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
 
             %% It's expected to lose few messages when leaders are abruptly killed.
@@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
             ct:pal("Some messages were lost: ~p", [LostMessages]),
             ?assert(length(LostMessages) < NMsgs div 20),
 
+            %% Wait until crashed nodes are ready.
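+            %% Look back over the whole run (`SinceStarted') so bootstrap
+            %% events emitted before this point are still matched.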
+            SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
+            wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
+
             %% Verify that all the successfully persisted messages are there.
             VerifyClient = fun({ClientId, ExpectedStream}) ->
                 Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
     ?assertEqual(
         [{ok, ok} || _ <- Nodes],
         erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
-    ).
+    ),
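+    %% Besides opening the DB, wait for the local shard replicas on every
+    %% node to report that they have bootstrapped.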
+    wait_db_bootstrapped(Nodes, ?DB).
 
 assert_db_stable([Node | _], DB) ->
     Shards = ds_repl_meta(Node, shards, [DB]),
@@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
         db_leadership(Node, DB, Shards)
     ).
 
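+%% Wait until every shard hosted by the given nodes emits the
+%% `emqx_ds_replshard_bootstrapped' trace event, i.e. reports that its
+%% replica has finished bootstrapping.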
+wait_db_bootstrapped(Nodes, DB) ->
+    wait_db_bootstrapped(Nodes, DB, infinity, infinity).
+
+wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
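+    %% Subscribe to one bootstrap event per {node, shard} pair. `BackInTime'
+    %% also matches events already emitted up to that many milliseconds ago.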
+    SRefs = [
+        snabbkaffe:subscribe(
+            ?match_event(#{
+                ?snk_kind := emqx_ds_replshard_bootstrapped,
+                ?snk_meta := #{node := Node},
+                db := DB,
+                shard := Shard
+            }),
+            1,
+            Timeout,
+            BackInTime
+        )
+     || Node <- Nodes,
+        Shard <- ds_repl_meta(Node, my_shards, [DB])
+    ],
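+    %% Each subscription must deliver its event, otherwise fail the test.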
+    lists:foreach(
+        fun({ok, SRef}) ->
+            ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
+        end,
+        SRefs
+    ).
+
 %%
 
 db_leadership(Node, DB, Shards) ->