فهرست منبع

feat(ds): Shard messages by publisher client ID

ieQu1 2 سال پیش
والد
کامیت
2a1f7d946a

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -443,7 +443,7 @@ create_tables() ->
     ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
     ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
         backend => builtin,
         backend => builtin,
         storage => {emqx_ds_storage_bitfield_lts, #{}},
         storage => {emqx_ds_storage_bitfield_lts, #{}},
-        n_shards => 255,
+        n_shards => 16,
         replication_factor => 3
         replication_factor => 3
     }),
     }),
     ok = mria:create_table(
     ok = mria:create_table(

+ 16 - 0
apps/emqx/src/emqx_schema.erl

@@ -341,6 +341,22 @@ fields("persistent_session_store") ->
                     importance => ?IMPORTANCE_HIDDEN
                     importance => ?IMPORTANCE_HIDDEN
                 }
                 }
             )},
             )},
+        {"n_shards",
+            sc(
+                pos_integer(),
+                #{
+                    default => 16,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"replication_factor",
+            sc(
+                pos_integer(),
+                #{
+                    default => 3,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {"on_disc",
         {"on_disc",
             sc(
             sc(
                 boolean(),
                 boolean(),

+ 0 - 1
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -7,5 +7,4 @@
 -export([start/2]).
 -export([start/2]).
 
 
 start(_Type, _Args) ->
 start(_Type, _Args) ->
-    emqx_ds_replication_layer_meta:init(),
     emqx_ds_sup:start_link().
     emqx_ds_sup:start_link().

+ 52 - 33
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -32,8 +32,7 @@
 
 
 %% internal exports:
 %% internal exports:
 -export([
 -export([
-    do_open_shard_v1/3,
-    do_drop_shard_v1/2,
+    do_drop_db_v1/1,
     do_store_batch_v1/4,
     do_store_batch_v1/4,
     do_get_streams_v1/4,
     do_get_streams_v1/4,
     do_make_iterator_v1/5,
     do_make_iterator_v1/5,
@@ -42,6 +41,8 @@
 
 
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]).
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]).
 
 
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
@@ -95,40 +96,34 @@
 %%================================================================================
 %%================================================================================
 
 
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
-list_shards(_DB) ->
-    %% TODO: milestone 5
-    lists:map(fun atom_to_binary/1, list_nodes()).
+list_shards(DB) ->
+    emqx_ds_replication_layer_meta:shards(DB).
 
 
 -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 open_db(DB, CreateOpts) ->
 open_db(DB, CreateOpts) ->
-    %% TODO: improve error reporting, don't just crash
     Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
     Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
     lists:foreach(
     lists:foreach(
         fun(Shard) ->
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts)
+            emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
+            maybe_set_myself_as_leader(DB, Shard)
         end,
         end,
-        list_shards(DB)
+        MyShards
     ).
     ).
 
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
 drop_db(DB) ->
+    Nodes = list_nodes(),
+    _ = emqx_ds_proto_v1:drop_db(Nodes, DB),
     _ = emqx_ds_replication_layer_meta:drop_db(DB),
     _ = emqx_ds_replication_layer_meta:drop_db(DB),
-    lists:foreach(
-        fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard)
-        end,
-        list_shards(DB)
-    ).
+    ok.
 
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
     emqx_ds:store_batch_result().
-store_batch(DB, Batch, Opts) ->
-    %% TODO: Currently we store messages locally.
-    Shard = atom_to_binary(node()),
+store_batch(DB, Messages, Opts) ->
+    Shard = shard_of_messages(DB, Messages),
     Node = node_of_shard(DB, Shard),
     Node = node_of_shard(DB, Shard),
-    emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
+    emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts).
 
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{emqx_ds:stream_rank(), stream()}].
     [{emqx_ds:stream_rank(), stream()}].
@@ -194,16 +189,15 @@ next(DB, Iter0, BatchSize) ->
 %% Internal exports (RPC targets)
 %% Internal exports (RPC targets)
 %%================================================================================
 %%================================================================================
 
 
--spec do_open_shard_v1(
-    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()
-) ->
-    ok | {error, _}.
-do_open_shard_v1(DB, Shard, Opts) ->
-    emqx_ds_storage_layer:open_shard({DB, Shard}, Opts).
-
--spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}.
-do_drop_shard_v1(DB, Shard) ->
-    emqx_ds_storage_layer:drop_shard({DB, Shard}).
+-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
+do_drop_db_v1(DB) ->
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    lists:foreach(
+        fun(Shard) ->
+            emqx_ds_storage_layer:drop_shard({DB, Shard})
+        end,
+        MyShards
+    ).
 
 
 -spec do_store_batch_v1(
 -spec do_store_batch_v1(
     emqx_ds:db(),
     emqx_ds:db(),
@@ -247,9 +241,34 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
 
 
+%% TODO: there's no real leader election right now
+-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
+maybe_set_myself_as_leader(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
+        [Site | _] ->
+            %% Currently the first in-sync replica always becomes the
+            %% leader
+            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
+        _Sites ->
+            ok
+    end.
+
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(_DB, Shard) ->
-    binary_to_atom(Shard).
+node_of_shard(DB, Shard) ->
+    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
+        {ok, Leader} ->
+            Leader;
+        {error, no_leader_for_shard} ->
+            %% TODO: use optvar
+            timer:sleep(500),
+            node_of_shard(DB, Shard)
+    end.
+
+%% Here we assume that all messages in the batch come from the same client
+shard_of_messages(DB, [#message{from = From} | _]) ->
+    N = emqx_ds_replication_layer_meta:n_shards(DB),
+    integer_to_binary(erlang:phash2(From, N)).
 
 
 list_nodes() ->
 list_nodes() ->
     mria:running_nodes().
     mria:running_nodes().

+ 130 - 8
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -21,11 +21,34 @@
 %% implementation details from this module.
 %% implementation details from this module.
 -module(emqx_ds_replication_layer_meta).
 -module(emqx_ds_replication_layer_meta).
 
 
+-behaviour(gen_server).
+
 %% API:
 %% API:
--export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]).
+-export([
+    shards/1,
+    my_shards/1,
+    replica_set/2,
+    in_sync_replicas/2,
+    sites/0,
+    open_db/2,
+    drop_db/1,
+    shard_leader/2,
+    this_site/0,
+    set_leader/3
+]).
+
+%% gen_server
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 
 %% internal exports:
 %% internal exports:
--export([open_db_trans/2, drop_db_trans/1, claim_site/2]).
+-export([
+    open_db_trans/2,
+    drop_db_trans/1,
+    claim_site/2,
+    in_sync_replicas_trans/2,
+    set_leader_trans/3,
+    n_shards/1
+]).
 
 
 -export_type([site/0]).
 -export_type([site/0]).
 
 
@@ -35,6 +58,8 @@
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
 
 
+-define(SERVER, ?MODULE).
+
 -define(SHARD, emqx_ds_builtin_metadata_shard).
 -define(SHARD, emqx_ds_builtin_metadata_shard).
 %% DS database metadata:
 %% DS database metadata:
 -define(META_TAB, emqx_ds_builtin_metadata_tab).
 -define(META_TAB, emqx_ds_builtin_metadata_tab).
@@ -56,7 +81,10 @@
 
 
 -record(?SHARD_TAB, {
 -record(?SHARD_TAB, {
     shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
     shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    %% Sites that the
     replica_set :: [site()],
     replica_set :: [site()],
+    %% Sites that contain the actual data:
+    in_sync_replicas :: [site()],
     leader :: node() | undefined,
     leader :: node() | undefined,
     misc = #{} :: map()
     misc = #{} :: map()
 }).
 }).
@@ -64,14 +92,21 @@
 %% Persistent ID of the node (independent from the IP/FQDN):
 %% Persistent ID of the node (independent from the IP/FQDN):
 -type site() :: binary().
 -type site() :: binary().
 
 
+%% Peristent term key:
+-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
+
 %%================================================================================
 %%================================================================================
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
 
 
--spec init() -> ok.
-init() ->
-    ensure_tables(),
-    ensure_site().
+-spec n_shards(emqx_ds:db()) -> pos_integer().
+n_shards(DB) ->
+    [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB),
+    NShards.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
 
 -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 shards(DB) ->
 shards(DB) ->
@@ -79,6 +114,20 @@ shards(DB) ->
         qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
         qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
     ).
     ).
 
 
+-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+my_shards(DB) ->
+    Site = this_site(),
+    eval_qlc(
+        qlc:q([
+            Shard
+         || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table(
+                ?SHARD_TAB
+            ),
+            D =:= DB,
+            lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+        ])
+    ).
+
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     {ok, [site()]} | {error, _}.
     {ok, [site()]} | {error, _}.
 replica_set(DB, Shard) ->
 replica_set(DB, Shard) ->
@@ -89,10 +138,37 @@ replica_set(DB, Shard) ->
             {error, no_shard}
             {error, no_shard}
     end.
     end.
 
 
+-spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [site()].
+in_sync_replicas(DB, ShardId) ->
+    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]),
+    case Result of
+        {ok, InSync} ->
+            InSync;
+        {error, _} ->
+            []
+    end.
+
 -spec sites() -> [site()].
 -spec sites() -> [site()].
 sites() ->
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
 
 
+-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, node()} | {error, no_leader_for_shard}.
+shard_leader(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{leader = Leader}] ->
+            {ok, Leader};
+        [] ->
+            {error, no_leader_for_shard}
+    end.
+
+-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader(DB, Shard, Node) ->
+    {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
+    ok.
+
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
 open_db(DB, DefaultOpts) ->
@@ -108,6 +184,29 @@ drop_db(DB) ->
 %% behavior callbacks
 %% behavior callbacks
 %%================================================================================
 %%================================================================================
 
 
+-record(s, {}).
+
+init([]) ->
+    process_flag(trap_exit, true),
+    logger:set_process_metadata(#{domain => [ds, meta]}),
+    ensure_tables(),
+    ensure_site(),
+    S = #s{},
+    {ok, S}.
+
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, #s{}) ->
+    persistent_term:erase(?emqx_ds_builtin_site),
+    ok.
+
 %%================================================================================
 %%================================================================================
 %% Internal exports
 %% Internal exports
 %%================================================================================
 %%================================================================================
@@ -136,6 +235,23 @@ drop_db_trans(DB) ->
 claim_site(Site, Node) ->
 claim_site(Site, Node) ->
     mnesia:write(#?NODE_TAB{site = Site, node = Node}).
     mnesia:write(#?NODE_TAB{site = Site, node = Node}).
 
 
+-spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, no_shard}.
+in_sync_replicas_trans(DB, Shard) ->
+    case mnesia:read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{in_sync_replicas = InSync}] ->
+            {ok, InSync};
+        [] ->
+            {error, no_shard}
+    end.
+
+-spec set_leader_trans(emqx_ds:ds(), emqx_ds_replication_layer:shard_id(), node()) ->
+    ok.
+set_leader_trans(DB, Shard, Node) ->
+    [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
+    Record = Record0#?SHARD_TAB{leader = Node},
+    mnesia:write(Record).
+
 %%================================================================================
 %%================================================================================
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
@@ -182,8 +298,13 @@ ensure_site() ->
             file:close(FD)
             file:close(FD)
     end,
     end,
     {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
     {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
+    persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
     ok.
 
 
+-spec this_site() -> site().
+this_site() ->
+    persistent_term:get(?emqx_ds_builtin_site).
+
 -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
 -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
 create_shards(DB, NShards, ReplicationFactor) ->
 create_shards(DB, NShards, ReplicationFactor) ->
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
@@ -193,10 +314,11 @@ create_shards(DB, NShards, ReplicationFactor) ->
             Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites],
             Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites],
             Hashes = lists:sort(Hashes0),
             Hashes = lists:sort(Hashes0),
             {_, Sites} = lists:unzip(Hashes),
             {_, Sites} = lists:unzip(Hashes),
-            ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
+            [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
             Record = #?SHARD_TAB{
             Record = #?SHARD_TAB{
                 shard = {DB, Shard},
                 shard = {DB, Shard},
-                replica_set = ReplicaSet
+                replica_set = ReplicaSet,
+                in_sync_replicas = [First]
             },
             },
             mnesia:write(Record)
             mnesia:write(Record)
         end,
         end,

+ 10 - 1
apps/emqx_durable_storage/src/emqx_ds_sup.erl

@@ -30,7 +30,7 @@ start_link() ->
 %%================================================================================
 %%================================================================================
 
 
 init([]) ->
 init([]) ->
-    Children = [storage_layer_sup()],
+    Children = [meta(), storage_layer_sup()],
     SupFlags = #{
     SupFlags = #{
         strategy => one_for_all,
         strategy => one_for_all,
         intensity => 0,
         intensity => 0,
@@ -42,6 +42,15 @@ init([]) ->
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
 
 
+meta() ->
+    #{
+        id => emqx_ds_replication_layer_meta,
+        start => {emqx_ds_replication_layer_meta, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5000
+    }.
+
 storage_layer_sup() ->
 storage_layer_sup() ->
     #{
     #{
         id => local_store_shard_sup,
         id => local_store_shard_sup,

+ 5 - 15
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -19,7 +19,7 @@
 
 
 -include_lib("emqx_utils/include/bpapi.hrl").
 -include_lib("emqx_utils/include/bpapi.hrl").
 %% API:
 %% API:
--export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]).
+-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]).
 
 
 %% behavior callbacks:
 %% behavior callbacks:
 -export([introduced_in/0]).
 -export([introduced_in/0]).
@@ -28,20 +28,10 @@
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
 
 
--spec open_shard(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds:create_db_opts()
-) ->
-    ok.
-open_shard(Node, DB, Shard, Opts) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]).
-
--spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    ok.
-drop_shard(Node, DB, Shard) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]).
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | erpc:caught_call_exception()].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
 
 
 -spec get_streams(
 -spec get_streams(
     node(),
     node(),

+ 20 - 6
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -23,7 +23,7 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
--define(N_SHARDS, 8).
+-define(N_SHARDS, 1).
 
 
 opts() ->
 opts() ->
     #{
     #{
@@ -38,18 +38,32 @@ opts() ->
 t_00_smoke_open_drop(_Config) ->
 t_00_smoke_open_drop(_Config) ->
     DB = 'DB',
     DB = 'DB',
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    %% Check metadata:
+    %%    We have only one site:
     [Site] = emqx_ds_replication_layer_meta:sites(),
     [Site] = emqx_ds_replication_layer_meta:sites(),
+    %%    Check all shards:
     Shards = emqx_ds_replication_layer_meta:shards(DB),
     Shards = emqx_ds_replication_layer_meta:shards(DB),
+    %%    Since there is only one site all shards should be allocated
+    %%    to this site:
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
     ?assertEqual(?N_SHARDS, length(Shards)),
     ?assertEqual(?N_SHARDS, length(Shards)),
     lists:foreach(
     lists:foreach(
         fun(Shard) ->
         fun(Shard) ->
             ?assertEqual(
             ?assertEqual(
-                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard}
-            )
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+            ),
+            ?assertEqual(
+                [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
+            ),
+            %%  Check that the leader is eleected;
+            ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
         end,
         end,
         Shards
         Shards
     ),
     ),
+    ?assertEqual(lists:sort(Shards), lists:sort(MyShards)),
+    %% Reopen the DB and make sure the operation is idempotent:
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    %% Close the DB:
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
 
 
 %% A simple smoke test that verifies that storing the messages doesn't
 %% A simple smoke test that verifies that storing the messages doesn't
@@ -153,11 +167,11 @@ end_per_suite(Config) ->
     ok.
     ok.
 
 
 init_per_testcase(_TC, Config) ->
 init_per_testcase(_TC, Config) ->
-    %% snabbkaffe:fix_ct_logging(),
     application:ensure_all_started(emqx_durable_storage),
     application:ensure_all_started(emqx_durable_storage),
     Config.
     Config.
 
 
 end_per_testcase(_TC, _Config) ->
 end_per_testcase(_TC, _Config) ->
     ok = application:stop(emqx_durable_storage),
     ok = application:stop(emqx_durable_storage),
-    mnesia:delete_schema([node()]),
-    mria:stop().
+    mria:stop(),
+    _ = mnesia:delete_schema([node()]),
+    ok.

+ 3 - 3
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -16,7 +16,7 @@
 -define(DEFAULT_CONFIG, #{
 -define(DEFAULT_CONFIG, #{
     backend => builtin,
     backend => builtin,
     storage => {emqx_ds_storage_bitfield_lts, #{}},
     storage => {emqx_ds_storage_bitfield_lts, #{}},
-    n_shards => 255,
+    n_shards => 16,
     replication_factor => 3
     replication_factor => 3
 }).
 }).
 
 
@@ -26,7 +26,7 @@
         {emqx_ds_storage_bitfield_lts, #{
         {emqx_ds_storage_bitfield_lts, #{
             bits_per_wildcard_level => 8
             bits_per_wildcard_level => 8
         }},
         }},
-    n_shards => 255,
+    n_shards => 16,
     replication_factor => 3
     replication_factor => 3
 }).
 }).
 
 
@@ -391,7 +391,7 @@ end_per_testcase(TC, _Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
     ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
 
 
 shard(TC) ->
 shard(TC) ->
-    {?MODULE, TC}.
+    {?MODULE, atom_to_binary(TC)}.
 
 
 keyspace(TC) ->
 keyspace(TC) ->
     TC.
     TC.