|
|
@@ -35,7 +35,7 @@ opts() ->
|
|
|
opts(Overrides) ->
|
|
|
maps:merge(
|
|
|
#{
|
|
|
- backend => builtin,
|
|
|
+ backend => builtin_raft,
|
|
|
%% storage => {emqx_ds_storage_reference, #{}},
|
|
|
storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
|
|
|
n_shards => 16,
|
|
|
@@ -56,8 +56,52 @@ appspec(emqx_durable_storage) ->
|
|
|
override_env => [{egress_flush_interval, 1}]
|
|
|
}}.
|
|
|
|
|
|
+%% Setup/teardown clauses paired with the `t_metadata/1` testcase
+%% (presumably dispatched by the suite's per-testcase helpers -- confirm).
+%%
+%% `init`  -- starts `emqx_ds_builtin_raft` through `emqx_cth_suite:start/2`,
+%%            using a work dir derived from this function's name and `Config`.
+%% `'end'` -- stops the same app list through `emqx_cth_suite:stop/1`.
+%%
+%% Both clauses hand `Config` back untouched.
+t_metadata(init, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+    _ = emqx_cth_suite:start([emqx_ds_builtin_raft], #{work_dir => WorkDir}),
+    Config;
+t_metadata('end', Config) ->
+    _ = emqx_cth_suite:stop([emqx_ds_builtin_raft]),
+    Config.
|
|
|
+
|
|
|
+%% Opens a single-shard, single-site `builtin_raft` DB named after this
+%% testcase and checks the replication-layer metadata for it:
+%%   * exactly one site is known;
+%%   * the DB has the requested number of shards;
+%%   * every shard's replica set is that one site;
+%%   * the shards reported by `my_shards/1` equal the full shard list.
+%% The DB is dropped in the `after` section whether or not the checks pass.
+t_metadata(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ExpectedShards = 1,
+    DBOpts = #{
+        backend => builtin_raft,
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ExpectedShards,
+        n_sites => 1,
+        replication_factor => 1,
+        replication_options => #{}
+    },
+    try
+        ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
+        %% The match itself asserts that there is one and only one site.
+        [OnlySite] = emqx_ds_replication_layer_meta:sites(),
+        AllShards = emqx_ds_replication_layer_meta:shards(DB),
+        %% With a single site, every shard is expected to be allocated to it.
+        LocalShards = emqx_ds_replication_layer_meta:my_shards(DB),
+        ?assertEqual(ExpectedShards, length(AllShards)),
+        _ = [
+            ?assertEqual(
+                [OnlySite], emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+            )
+         || Shard <- AllShards
+        ],
+        ?assertEqual(lists:sort(AllShards), lists:sort(LocalShards))
+    after
+        ?assertMatch(ok, emqx_ds:drop_db(DB))
+    end.
|
|
|
+
|
|
|
t_replication_transfers_snapshots(init, Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
|
|
|
[
|
|
|
{t_replication_transfers_snapshots1, #{apps => Apps}},
|
|
|
@@ -130,7 +174,7 @@ t_replication_transfers_snapshots(Config) ->
|
|
|
).
|
|
|
|
|
|
t_rebalance(init, Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
Nodes = emqx_cth_cluster:start(
|
|
|
[
|
|
|
{t_rebalance1, #{apps => Apps}},
|
|
|
@@ -260,7 +304,7 @@ t_rebalance(Config) ->
|
|
|
).
|
|
|
|
|
|
t_join_leave_errors(init, Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
Nodes = emqx_cth_cluster:start(
|
|
|
[
|
|
|
{t_join_leave_errors1, #{apps => Apps}},
|
|
|
@@ -322,7 +366,7 @@ t_join_leave_errors(Config) ->
|
|
|
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
|
|
|
|
|
|
t_rebalance_chaotic_converges(init, Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
Nodes = emqx_cth_cluster:start(
|
|
|
[
|
|
|
{t_rebalance_chaotic_converges1, #{apps => Apps}},
|
|
|
@@ -418,7 +462,7 @@ t_rebalance_chaotic_converges(Config) ->
|
|
|
).
|
|
|
|
|
|
t_rebalance_offline_restarts(init, Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
Specs = emqx_cth_cluster:mk_nodespecs(
|
|
|
[
|
|
|
{t_rebalance_offline_restarts1, #{apps => Apps}},
|
|
|
@@ -435,6 +479,7 @@ t_rebalance_offline_restarts('end', Config) ->
|
|
|
t_rebalance_offline_restarts(Config) ->
|
|
|
%% This testcase verifies that rebalancing progresses if nodes restart or
|
|
|
%% go offline and never come back.
|
|
|
+ ok = snabbkaffe:start_trace(),
|
|
|
|
|
|
Nodes = [N1, N2, N3] = ?config(nodes, Config),
|
|
|
_Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
|
|
|
@@ -477,7 +522,7 @@ t_rebalance_offline_restarts(Config) ->
|
|
|
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
|
|
|
|
|
|
t_drop_generation(Config) ->
|
|
|
- Apps = [appspec(emqx_durable_storage)],
|
|
|
+ Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
|
|
|
[_, _, NS3] =
|
|
|
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
|
|
|
[
|
|
|
@@ -554,6 +599,105 @@ t_drop_generation(Config) ->
|
|
|
end
|
|
|
).
|
|
|
|
|
|
+%% Setup/teardown clauses paired with `t_error_mapping_replication_layer/1`
+%% (presumably dispatched by the suite's per-testcase helpers -- confirm).
+%%
+%% `init`  -- starts `emqx_ds_builtin_raft` via `emqx_cth_suite:start/2` with a
+%%            work dir computed from this function's name and `Config`.
+%% `'end'` -- stops the same app list via `emqx_cth_suite:stop/1`.
+%%
+%% `Config` is returned as received in both cases.
+t_error_mapping_replication_layer(init, Config) ->
+    StartOpts = #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)},
+    _ = emqx_cth_suite:start([emqx_ds_builtin_raft], StartOpts),
+    Config;
+t_error_mapping_replication_layer('end', Config) ->
+    _ = emqx_cth_suite:stop([emqx_ds_builtin_raft]),
+    Config.
|
|
|
+
|
|
|
+%% Checks how RPC failures towards one shard surface through the `emqx_ds` API.
+%%
+%% Steps:
+%%   1. Install the RPC mocks, open a 2-shard DB and store six messages.
+%%   2. Wait for an egress flush event from each shard, then create streams and
+%%      iterators while everything is still healthy.
+%%   3. Make the mock answer `unavailable` for calls addressed to the second
+%%      shard and `passthrough` for the first one.
+%%   4. Assert that `get_streams/3` still yields a non-empty (possibly partial)
+%%      result, and that at least one `make_iterator/4` and at least one
+%%      `next/3` call ends in `{error, recoverable, _}`.
+%%
+%% Everything after `mock_rpc/0` runs inside `try ... after` so that
+%% `meck:unload/0` is executed even when an assertion or match fails; otherwise
+%% the mocks would stay loaded for whatever runs next.
+t_error_mapping_replication_layer(_Config) ->
+    %% This checks that the replication layer maps recoverable errors correctly.
+
+    ok = emqx_ds_test_helpers:mock_rpc(),
+    try
+        ok = snabbkaffe:start_trace(),
+
+        DB = ?FUNCTION_NAME,
+        ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+        [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
+
+        TopicFilter = emqx_topic:words(<<"foo/#">>),
+        Msgs = [
+            message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0),
+            message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1),
+            message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2),
+            message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3),
+            message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4),
+            message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5)
+        ],
+
+        ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+
+        %% Do not read before a flush event has been traced for each shard.
+        ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}),
+        ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}),
+
+        %% Baseline taken while both shards are reachable.
+        Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0),
+        Iterators0 = lists:map(
+            fun({_Rank, S}) ->
+                {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0),
+                Iter
+            end,
+            Streams0
+        ),
+
+        %% Disrupt the link to the second shard.
+        %% NOTE: the `case` is deliberate. `DB`, `Shard1` and `Shard2` are already
+        %% bound here, so the patterns compare against them; moving the patterns
+        %% into the fun head would shadow the variables and match anything.
+        ok = emqx_ds_test_helpers:mock_rpc_result(
+            fun(_Node, emqx_ds_replication_layer, _Function, Args) ->
+                case Args of
+                    [DB, Shard1 | _] -> passthrough;
+                    [DB, Shard2 | _] -> unavailable
+                end
+            end
+        ),
+
+        %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error.
+        Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0),
+        ?assert(
+            Streams1 =/= [] andalso length(Streams1) =< length(Streams0),
+            Streams1
+        ),
+
+        %% At least one of the `emqx_ds:make_iterator/4` calls will end in an error.
+        Results1 = lists:map(
+            fun({_Rank, S}) ->
+                case emqx_ds:make_iterator(DB, S, TopicFilter, 0) of
+                    Ok = {ok, _Iter} ->
+                        Ok;
+                    Error = {error, recoverable, {erpc, _}} ->
+                        Error;
+                    Other ->
+                        ct:fail({unexpected_result, Other})
+                end
+            end,
+            Streams0
+        ),
+        ?assert(
+            [error || {error, _, _} <- Results1] =/= [],
+            Results1
+        ),
+
+        %% At least one of the `emqx_ds:next/3` calls over the initial set of
+        %% iterators will end in an error.
+        %% NOTE(review): the expected reason here is `{badrpc, _}` while
+        %% `make_iterator/4` above expects `{erpc, _}` -- presumably the two go
+        %% through different RPC paths; confirm.
+        Results2 = lists:map(
+            fun(Iter) ->
+                case emqx_ds:next(DB, Iter, _BatchSize = 42) of
+                    Ok = {ok, _Iter, [_ | _]} ->
+                        Ok;
+                    Error = {error, recoverable, {badrpc, _}} ->
+                        Error;
+                    Other ->
+                        ct:fail({unexpected_result, Other})
+                end
+            end,
+            Iterators0
+        ),
+        ?assert(
+            [error || {error, _, _} <- Results2] =/= [],
+            Results2
+        )
+    after
+        %% Runs on success and on failure alike.
+        meck:unload()
+    end.
|
|
|
+
|
|
|
%%
|
|
|
|
|
|
shard_server_info(Node, DB, Shard, Site, Info) ->
|
|
|
@@ -583,7 +727,7 @@ shards(Node, DB) ->
|
|
|
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
|
|
|
|
|
|
%% Remote helper: runs `which_shards(DB)` on `Node` through `erpc:call/4` and
%% returns whatever that call returns (presumably the shards currently running
%% on that node -- confirm against the callee).
%% The `-`/`+` pair below records that the target module changed from
%% `emqx_ds_builtin_db_sup` to `emqx_ds_builtin_raft_db_sup`.
shards_online(Node, DB) ->
|
|
|
- erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
|
|
|
+ erpc:call(Node, emqx_ds_builtin_raft_db_sup, which_shards, [DB]).
|
|
|
|
|
|
%% Number of entries that `shards_online/2` reports for `DB` on `Node`.
n_shards_online(Node, DB) ->
    Online = shards_online(Node, DB),
    length(Online).
|
|
|
@@ -635,7 +779,6 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
|
|
|
|
|
%% Common Test per-testcase setup. Delegates to
%% `emqx_common_test_helpers:init_per_testcase/3` and returns the config that
%% call produces.
init_per_testcase(TCName, Config0) ->
|
|
|
Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
|
|
|
%% The removed line below shows that the snabbkaffe trace is no longer started
%% here for every testcase; testcases that need it call
%% `snabbkaffe:start_trace/0` themselves.
- ok = snabbkaffe:start_trace(),
|
|
|
Config.
|
|
|
|
|
|
end_per_testcase(TCName, Config) ->
|