
feat(ds): Add `emqx_ds_backends` application

ieQu1, 1 year ago
Commit ef09cfcd71

+ 2 - 0
apps/emqx/rebar.config

@@ -24,6 +24,8 @@
 {deps, [
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_durable_storage, {path, "../emqx_durable_storage"}},
+    {emqx_ds_builtin_local, {path, "../emqx_ds_builtin_local"}},
+    {emqx_ds_backends, {path, "../emqx_ds_backends"}},
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},

+ 32 - 0
apps/emqx_ds_backends/README.md

@@ -0,0 +1,32 @@
+# EMQX Durable Storage Backends
+
+This is a placeholder OTP application that depends on all durable storage backends available in the release.
+Starting it will ensure that all backends are properly loaded and registered.
+
+Consumers of `emqx_durable_storage` API should depend on this application instead of the parent `emqx_durable_storage`.
+
+# Features
+
+N/A
+
+# Limitation
+
+N/A
+
+# Documentation links
+
+N/A
+
+# Usage
+
+Any business application that creates DS databases should add this application as a dependency.
+
+# Configurations
+
+None
+
+# Other
+N/A
+
+# Contributing
+Please see our [contributing.md](../../CONTRIBUTING.md).
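
Per the Usage section above, a business application that creates DS databases declares `emqx_ds_backends` as a dependency rather than `emqx_durable_storage` itself. A minimal sketch of what that could look like in a consumer's `.app.src`, assuming a hypothetical application named `my_business_app` (not part of this commit):

    %% -*- mode: erlang -*-
    %% Hypothetical consumer application; `my_business_app` is illustrative only.
    {application, my_business_app, [
        {description, "Example application that creates DS databases"},
        {vsn, "0.1.0"},
        {registered, []},
        %% Depending on emqx_ds_backends ensures every backend shipped in the
        %% release is loaded and registered before emqx_ds:open_db/2 is called.
        {applications, [kernel, stdlib, emqx_ds_backends]},
        {env, []}
    ]}.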

+ 5 - 0
apps/emqx_ds_backends/rebar.config

@@ -0,0 +1,5 @@
+%% -*- mode:erlang -*-
+{deps, [
+    {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_durable_storage, {path, "../emqx_durable_storage"}}
+]}.

+ 11 - 0
apps/emqx_ds_backends/src/emqx_ds_backends.app.src

@@ -0,0 +1,11 @@
+%% -*- mode: erlang -*-
+{application, emqx_ds_backends, [
+    {description, "A placeholder application that depends on all available DS backends"},
+    % strict semver, bump manually!
+    {vsn, "0.1.0"},
+    {modules, []},
+    {registered, []},
+    {applications, [kernel, stdlib, emqx_durable_storage, emqx_ds_builtin_local]},
+    {optional_applications, [emqx_ds_builtin_raft]},
+    {env, []}
+]}.
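
Listing `emqx_ds_builtin_raft` under `optional_applications` means it is started before `emqx_ds_backends` when it is included in the release, while its absence (e.g. in a build without the Raft backend) does not prevent `emqx_ds_backends` from booting. A hedged sketch of probing at runtime which backend applications actually ended up loaded; the module and function names are illustrative, not part of this commit:

    %% Illustrative helper, not part of this commit.
    -module(ds_backend_probe).
    -export([loaded_backends/0]).

    %% Return the DS backend applications currently loaded on this node.
    loaded_backends() ->
        Candidates = [emqx_ds_builtin_local, emqx_ds_builtin_raft],
        [App || App <- Candidates,
                lists:keymember(App, 1, application:loaded_applications())].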

+ 98 - 315
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -13,7 +13,7 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
--module(emqx_ds_SUITE).
+-module(emqx_ds_backends_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -26,52 +26,27 @@
 
 -define(N_SHARDS, 1).
 
-opts() ->
-    #{
-        backend => builtin,
-        storage => {emqx_ds_storage_reference, #{}},
-        n_shards => ?N_SHARDS,
-        n_sites => 1,
-        replication_factor => 3,
-        replication_options => #{}
-    }.
+opts(Config) ->
+    proplists:get_value(ds_conf, Config).
 
 %% A simple smoke test that verifies that opening/closing the DB
 %% doesn't crash, and not much else
-t_00_smoke_open_drop(_Config) ->
+t_00_smoke_open_drop(Config) ->
     DB = 'DB',
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
-    %% Check metadata:
-    %%    We have only one site:
-    [Site] = emqx_ds_replication_layer_meta:sites(),
-    %%    Check all shards:
-    Shards = emqx_ds_replication_layer_meta:shards(DB),
-    %%    Since there is only one site all shards should be allocated
-    %%    to this site:
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    ?assertEqual(?N_SHARDS, length(Shards)),
-    lists:foreach(
-        fun(Shard) ->
-            ?assertEqual(
-                [Site], emqx_ds_replication_layer_meta:replica_set(DB, Shard)
-            )
-        end,
-        Shards
-    ),
-    ?assertEqual(lists:sort(Shards), lists:sort(MyShards)),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     %% Reopen the DB and make sure the operation is idempotent:
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     %% Close the DB:
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
 
 %% A simple smoke test that verifies that storing the messages doesn't
 %% crash
-t_01_smoke_store(_Config) ->
+t_01_smoke_store(Config) ->
     ?check_trace(
         #{timetrap => 10_000},
         begin
             DB = default,
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
             Msg = message(<<"foo/bar">>, <<"foo">>, 0),
             ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg]))
         end,
@@ -80,9 +55,9 @@ t_01_smoke_store(_Config) ->
 
 %% A simple smoke test that verifies that getting the list of streams
 %% doesn't crash and that iterators can be opened.
-t_02_smoke_get_streams_start_iter(_Config) ->
+t_02_smoke_get_streams_start_iter(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     StartTime = 0,
     TopicFilter = ['#'],
     [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
@@ -91,9 +66,9 @@ t_02_smoke_get_streams_start_iter(_Config) ->
 
 %% A simple smoke test that verifies that it's possible to iterate
 %% over messages.
-t_03_smoke_iterate(_Config) ->
+t_03_smoke_iterate(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     StartTime = 0,
     TopicFilter = ['#'],
     Msgs = [
@@ -101,7 +76,7 @@ t_03_smoke_iterate(_Config) ->
         message(<<"foo">>, <<"2">>, 1),
         message(<<"bar/bar">>, <<"3">>, 2)
     ],
-    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
@@ -112,9 +87,9 @@ t_03_smoke_iterate(_Config) ->
 %% to the external resources, such as clients' sessions, and they
 %% should always be able to continue replaying the topics from where
 %% they are left off.
-t_04_restart(_Config) ->
+t_04_restart(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     TopicFilter = ['#'],
     StartTime = 0,
     Msgs = [
@@ -122,22 +97,22 @@ t_04_restart(_Config) ->
         message(<<"foo">>, <<"2">>, 1),
         message(<<"bar/bar">>, <<"3">>, 2)
     ],
-    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     %% Restart the application:
     ?tp(warning, emqx_ds_SUITE_restart_app, #{}),
     ok = application:stop(emqx_durable_storage),
     {ok, _} = application:ensure_all_started(emqx_durable_storage),
-    ok = emqx_ds:open_db(DB, opts()),
+    ok = emqx_ds:open_db(DB, opts(Config)),
     %% The old iterator should be still operational:
     {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
     ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 %% Check that we can create iterators directly from DS keys.
-t_05_update_iterator(_Config) ->
+t_05_update_iterator(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     TopicFilter = ['#'],
     StartTime = 0,
     Msgs = [
@@ -158,104 +133,42 @@ t_05_update_iterator(_Config) ->
     ?assertEqual(Msgs, [Msg0 | Batch], #{from_key => Iter1, final_iter => Iter}),
     ok.
 
-t_06_update_config(_Config) ->
+t_06_smoke_add_generation(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
-    TopicFilter = ['#'],
-
-    DataSet = update_data_set(),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
+    ?assertMatch(
+        [{_, _}],
+        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
+    ),
+    ?assertMatch(ok, emqx_ds:add_generation(DB)),
+    ?assertMatch(
+        [{_, _}, {_, _}],
+        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
+    ).
 
-    ToMsgs = fun(Datas) ->
-        lists:map(
-            fun({Topic, Payload}) ->
-                message(Topic, Payload, emqx_message:timestamp_now())
-            end,
-            Datas
-        )
-    end,
-
-    {_, StartTimes, MsgsList} =
-        lists:foldl(
-            fun
-                (Datas, {true, TimeAcc, MsgAcc}) ->
-                    Msgs = ToMsgs(Datas),
-                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
-                    {false, TimeAcc, [Msgs | MsgAcc]};
-                (Datas, {Any, TimeAcc, MsgAcc}) ->
-                    timer:sleep(500),
-                    ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())),
-                    timer:sleep(500),
-                    StartTime = emqx_message:timestamp_now(),
-                    Msgs = ToMsgs(Datas),
-                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
-                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
-            end,
-            {true, [emqx_message:timestamp_now()], []},
-            DataSet
-        ),
-
-    Checker = fun({StartTime, Msgs0}, Acc) ->
-        Msgs = Acc ++ Msgs0,
-        Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
-        ?assertEqual(Msgs, Batch, StartTime),
-        Msgs
-    end,
-    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
-
-t_07_add_generation(_Config) ->
+t_07_smoke_update_config(Config) ->
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
-    TopicFilter = ['#'],
-
-    DataSet = update_data_set(),
-
-    ToMsgs = fun(Datas) ->
-        lists:map(
-            fun({Topic, Payload}) ->
-                message(Topic, Payload, emqx_message:timestamp_now())
-            end,
-            Datas
-        )
-    end,
-
-    {_, StartTimes, MsgsList} =
-        lists:foldl(
-            fun
-                (Datas, {true, TimeAcc, MsgAcc}) ->
-                    Msgs = ToMsgs(Datas),
-                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
-                    {false, TimeAcc, [Msgs | MsgAcc]};
-                (Datas, {Any, TimeAcc, MsgAcc}) ->
-                    timer:sleep(500),
-                    ?assertMatch(ok, emqx_ds:add_generation(DB)),
-                    timer:sleep(500),
-                    StartTime = emqx_message:timestamp_now(),
-                    Msgs = ToMsgs(Datas),
-                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
-                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
-            end,
-            {true, [emqx_message:timestamp_now()], []},
-            DataSet
-        ),
-
-    Checker = fun({StartTime, Msgs0}, Acc) ->
-        Msgs = Acc ++ Msgs0,
-        Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
-        ?assertEqual(Msgs, Batch, StartTime),
-        Msgs
-    end,
-    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
+    ?assertMatch(
+        [{_, _}],
+        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
+    ),
+    ?assertMatch(ok, emqx_ds:update_db_config(DB, opts(Config))),
+    ?assertMatch(
+        [{_, _}, {_, _}],
+        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
+    ).
 
 %% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'...
 %%   1) Cannot drop current generation.
 %%   2) All existing generations are returned by `list_generation_with_lifetimes'.
 %%   3) Dropping a generation removes it from the list.
 %%   4) Dropped generations stay dropped even after restarting the application.
-t_08_smoke_list_drop_generation(_Config) ->
+t_08_smoke_list_drop_generation(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
             %% Exactly one generation at first.
             Generations0 = emqx_ds:list_generations_with_lifetimes(DB),
             ?assertMatch(
@@ -295,7 +208,7 @@ t_08_smoke_list_drop_generation(_Config) ->
             %% Should persist surviving generation list
             ok = application:stop(emqx_durable_storage),
             {ok, _} = application:ensure_all_started(emqx_durable_storage),
-            ok = emqx_ds:open_db(DB, opts()),
+            ok = emqx_ds:open_db(DB, opts(Config)),
 
             Generations3 = emqx_ds:list_generations_with_lifetimes(DB),
             ?assertMatch(
@@ -310,12 +223,12 @@ t_08_smoke_list_drop_generation(_Config) ->
     ),
     ok.
 
-t_09_atomic_store_batch(_Config) ->
+t_09_atomic_store_batch(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
             application:set_env(emqx_durable_storage, egress_batch_size, 1),
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
             Msgs = [
                 message(<<"1">>, <<"1">>, 0),
                 message(<<"2">>, <<"2">>, 1),
@@ -335,12 +248,12 @@ t_09_atomic_store_batch(_Config) ->
     ),
     ok.
 
-t_10_non_atomic_store_batch(_Config) ->
+t_10_non_atomic_store_batch(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
             application:set_env(emqx_durable_storage, egress_batch_size, 1),
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
             Msgs = [
                 message(<<"1">>, <<"1">>, 0),
                 message(<<"2">>, <<"2">>, 1),
@@ -369,11 +282,11 @@ t_10_non_atomic_store_batch(_Config) ->
     ),
     ok.
 
-t_smoke_delete_next(_Config) ->
+t_smoke_delete_next(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
             StartTime = 0,
             TopicFilter = [<<"foo">>, '#'],
             Msgs =
@@ -410,7 +323,7 @@ t_smoke_delete_next(_Config) ->
     ),
     ok.
 
-t_drop_generation_with_never_used_iterator(_Config) ->
+t_drop_generation_with_never_used_iterator(Config) ->
     %% This test checks how the iterator behaves when:
     %%   1) it's created at generation 1 and not consumed from.
     %%   2) generation 2 is created and 1 dropped.
@@ -418,7 +331,7 @@ t_drop_generation_with_never_used_iterator(_Config) ->
     %% In this case, the iterator won't see any messages and the stream will end.
 
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
 
     TopicFilter = emqx_topic:words(<<"foo/+">>),
@@ -458,7 +371,7 @@ t_drop_generation_with_never_used_iterator(_Config) ->
 
     ok.
 
-t_drop_generation_with_used_once_iterator(_Config) ->
+t_drop_generation_with_used_once_iterator(Config) ->
     %% This test checks how the iterator behaves when:
     %%   1) it's created at generation 1 and consumes at least 1 message.
     %%   2) generation 2 is created and 1 dropped.
@@ -466,7 +379,7 @@ t_drop_generation_with_used_once_iterator(_Config) ->
     %% In this case, the iterator should see no more messages and the stream will end.
 
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
 
     TopicFilter = emqx_topic:words(<<"foo/+">>),
@@ -499,12 +412,12 @@ t_drop_generation_with_used_once_iterator(_Config) ->
         emqx_ds_test_helpers:consume_iter(DB, Iter1)
     ).
 
-t_drop_generation_update_iterator(_Config) ->
+t_drop_generation_update_iterator(Config) ->
     %% This checks the behavior of `emqx_ds:update_iterator' after the generation
     %% underlying the iterator has been dropped.
 
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
 
     TopicFilter = emqx_topic:words(<<"foo/+">>),
@@ -528,12 +441,12 @@ t_drop_generation_update_iterator(_Config) ->
         emqx_ds:update_iterator(DB, Iter1, Key2)
     ).
 
-t_make_iterator_stale_stream(_Config) ->
+t_make_iterator_stale_stream(Config) ->
     %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
     %% the stream has been dropped.
 
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
     [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
 
     TopicFilter = emqx_topic:words(<<"foo/+">>),
@@ -556,7 +469,7 @@ t_make_iterator_stale_stream(_Config) ->
 
     ok.
 
-t_get_streams_concurrently_with_drop_generation(_Config) ->
+t_get_streams_concurrently_with_drop_generation(Config) ->
     %% This checks that we can get all streams while a generation is dropped
     %% mid-iteration.
 
@@ -564,7 +477,7 @@ t_get_streams_concurrently_with_drop_generation(_Config) ->
     ?check_trace(
         #{timetrap => 5_000},
         begin
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
 
             [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
             ok = emqx_ds:add_generation(DB),
@@ -593,171 +506,6 @@ t_get_streams_concurrently_with_drop_generation(_Config) ->
         []
     ).
 
-t_error_mapping_replication_layer(_Config) ->
-    %% This checks that the replication layer maps recoverable errors correctly.
-
-    ok = emqx_ds_test_helpers:mock_rpc(),
-    ok = snabbkaffe:start_trace(),
-
-    DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
-    [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
-
-    TopicFilter = emqx_topic:words(<<"foo/#">>),
-    Msgs = [
-        message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0),
-        message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1),
-        message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2),
-        message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3),
-        message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4),
-        message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5)
-    ],
-
-    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
-
-    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}),
-    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}),
-
-    Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0),
-    Iterators0 = lists:map(
-        fun({_Rank, S}) ->
-            {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0),
-            Iter
-        end,
-        Streams0
-    ),
-
-    %% Disrupt the link to the second shard.
-    ok = emqx_ds_test_helpers:mock_rpc_result(
-        fun(_Node, emqx_ds_replication_layer, _Function, Args) ->
-            case Args of
-                [DB, Shard1 | _] -> passthrough;
-                [DB, Shard2 | _] -> unavailable
-            end
-        end
-    ),
-
-    %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error.
-    Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0),
-    ?assert(
-        length(Streams1) > 0 andalso length(Streams1) =< length(Streams0),
-        Streams1
-    ),
-
-    %% At least one of `emqx_ds:make_iterator/4` will end in an error.
-    Results1 = lists:map(
-        fun({_Rank, S}) ->
-            case emqx_ds:make_iterator(DB, S, TopicFilter, 0) of
-                Ok = {ok, _Iter} ->
-                    Ok;
-                Error = {error, recoverable, {erpc, _}} ->
-                    Error;
-                Other ->
-                    ct:fail({unexpected_result, Other})
-            end
-        end,
-        Streams0
-    ),
-    ?assert(
-        length([error || {error, _, _} <- Results1]) > 0,
-        Results1
-    ),
-
-    %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error.
-    Results2 = lists:map(
-        fun(Iter) ->
-            case emqx_ds:next(DB, Iter, _BatchSize = 42) of
-                Ok = {ok, _Iter, [_ | _]} ->
-                    Ok;
-                Error = {error, recoverable, {badrpc, _}} ->
-                    Error;
-                Other ->
-                    ct:fail({unexpected_result, Other})
-            end
-        end,
-        Iterators0
-    ),
-    ?assert(
-        length([error || {error, _, _} <- Results2]) > 0,
-        Results2
-    ),
-    meck:unload().
-
-%% This testcase verifies the behavior of `store_batch' operation
-%% when the underlying code experiences recoverable or unrecoverable
-%% problems.
-t_store_batch_fail(_Config) ->
-    ?check_trace(
-        #{timetrap => 15_000},
-        try
-            meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
-            DB = ?FUNCTION_NAME,
-            ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
-            %% Success:
-            Batch1 = [
-                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
-                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
-            ],
-            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
-            %% Inject unrecoverable error:
-            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
-                {error, unrecoverable, mock}
-            end),
-            Batch2 = [
-                message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
-                message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
-            ],
-            ?assertMatch(
-                {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
-            ),
-            meck:unload(emqx_ds_storage_layer),
-            %% Inject a recoveralbe error:
-            meck:new(ra, [passthrough, no_history]),
-            meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
-                ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
-                {timeout, mock}
-            end),
-            Batch3 = [
-                message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
-                message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
-                message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
-                message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
-            ],
-            %% Note: due to idempotency issues the number of retries
-            %% is currently set to 0:
-            ?assertMatch(
-                {error, recoverable, {timeout, mock}},
-                emqx_ds:store_batch(DB, Batch3, #{sync => true})
-            ),
-            meck:unload(ra),
-            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
-            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
-        after
-            meck:unload()
-        end,
-        [
-            {"message ordering", fun(StoredMessages, _Trace) ->
-                [{_, Stream1}, {_, Stream2}] = StoredMessages,
-                ?assertMatch(
-                    [
-                        #message{payload = <<"1">>},
-                        #message{payload = <<"2">>},
-                        #message{payload = <<"5">>},
-                        #message{payload = <<"7">>}
-                    ],
-                    Stream1
-                ),
-                ?assertMatch(
-                    [
-                        #message{payload = <<"6">>},
-                        #message{payload = <<"8">>}
-                    ],
-                    Stream2
-                )
-            end}
-        ]
-    ).
-
 update_data_set() ->
     [
         [
@@ -802,12 +550,46 @@ delete(DB, It0, Selector, BatchSize, Acc) ->
 
 %% CT callbacks
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+-if(?EMQX_RELEASE_EDITION == ee).
+all() ->
+    [{group, builtin_local}, {group, builtin_raft}].
+-else.
+all() ->
+    [{group, builtin_local}].
+-endif.
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {builtin_local, TCs},
+        {builtin_raft, TCs}
+    ].
+
+init_per_group(builtin_local, Config) ->
+    Conf = #{
+        backend => builtin_local,
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ?N_SHARDS
+    },
+    [{ds_conf, Conf} | Config];
+init_per_group(builtin_raft, Config) ->
+    Conf = #{
+        backend => builtin_raft,
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ?N_SHARDS,
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
+    },
+    [{ds_conf, Conf} | Config].
+
+end_per_group(_Group, Config) ->
+    Config.
 
 init_per_suite(Config) ->
     emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
-        [mria, emqx_durable_storage],
+        [mria, emqx_ds_backends],
         #{work_dir => ?config(priv_dir, Config)}
     ),
     [{apps, Apps} | Config].
@@ -820,7 +602,8 @@ init_per_testcase(_TC, Config) ->
     application:ensure_all_started(emqx_durable_storage),
     Config.
 
-end_per_testcase(_TC, _Config) ->
+end_per_testcase(TC, _Config) ->
+    ok = emqx_ds:drop_db(TC),
     snabbkaffe:stop(),
     ok = application:stop(emqx_durable_storage),
     mria:stop(),
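
In the rewritten suite, `all/0` selects the backend groups (the `builtin_raft` group only on the Enterprise edition), `init_per_group/2` stores the backend-specific DB options under the `ds_conf` key, and every test case retrieves them through `opts(Config)`. A sketch of the resulting call for a case running in the `builtin_local` group, spelled out with the values installed by `init_per_group/2` (`?N_SHARDS` is 1):

    %% What emqx_ds:open_db/2 effectively receives in the builtin_local group:
    ok = emqx_ds:open_db('DB', #{
        backend => builtin_local,
        storage => {emqx_ds_storage_reference, #{}},
        n_shards => 1
    }).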

+ 75 - 0
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -698,6 +698,81 @@ t_error_mapping_replication_layer(_Config) ->
     ),
     meck:unload().
 
+%% This testcase verifies the behavior of `store_batch' operation
+%% when the underlying code experiences recoverable or unrecoverable
+%% problems.
+t_store_batch_fail(_Config) ->
+    ?check_trace(
+        #{timetrap => 15_000},
+        try
+            meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
+            DB = ?FUNCTION_NAME,
+            ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+            %% Success:
+            Batch1 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})),
+            %% Inject unrecoverable error:
+            meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) ->
+                {error, unrecoverable, mock}
+            end),
+            Batch2 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1)
+            ],
+            ?assertMatch(
+                {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true})
+            ),
+            meck:unload(emqx_ds_storage_layer),
+            %% Inject a recoveralbe error:
+            meck:new(ra, [passthrough, no_history]),
+            meck:expect(ra, process_command, fun(Servers, Shard, Command) ->
+                ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}),
+                {timeout, mock}
+            end),
+            Batch3 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2),
+                message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2),
+                message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3),
+                message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3)
+            ],
+            %% Note: due to idempotency issues the number of retries
+            %% is currently set to 0:
+            ?assertMatch(
+                {error, recoverable, {timeout, mock}},
+                emqx_ds:store_batch(DB, Batch3, #{sync => true})
+            ),
+            meck:unload(ra),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
+            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
+        after
+            meck:unload()
+        end,
+        [
+            {"message ordering", fun(StoredMessages, _Trace) ->
+                [{_, Stream1}, {_, Stream2}] = StoredMessages,
+                ?assertMatch(
+                    [
+                        #message{payload = <<"1">>},
+                        #message{payload = <<"2">>},
+                        #message{payload = <<"5">>},
+                        #message{payload = <<"7">>}
+                    ],
+                    Stream1
+                ),
+                ?assertMatch(
+                    [
+                        #message{payload = <<"6">>},
+                        #message{payload = <<"8">>}
+                    ],
+                    Stream2
+                )
+            end}
+        ]
+    ).
+
 %%
 
 shard_server_info(Node, DB, Shard, Site, Info) ->

+ 2 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -87,6 +87,8 @@
 %% Type declarations
 %%================================================================================
 
+-define(APP, emqx_durable_storage).
+
 %% # "Record" integer keys.  We use maps with integer keys to avoid persisting and sending
 %% records over the wire.
 %% tags:

+ 2 - 2
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,10 +2,10 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.2.1"},
+    {vsn, "0.3.0"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
     {mod, {emqx_ds_app, []}},
     {env, []}
 ]}.

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -42,6 +42,7 @@
             esasl,
             emqx_utils,
             emqx_durable_storage,
+            emqx_ds_backends,
             emqx_http_lib,
             emqx_resource,
             emqx_connector,