Просмотр исходного кода

Merge pull request #12825 from keynslug/feat/EMQX-12110/repl-meta-api

feat(dsrepl): add APIs to manage DB replication sites
Andrew Mayorov 1 год назад
Родитель
Сommit
70396e9766

+ 1 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -476,7 +476,7 @@ t_replication_options(_Config) ->
                 resend_window := 60
             }
         },
-        emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB)
+        emqx_ds_replication_layer_meta:db_config(?PERSISTENT_MESSAGE_DB)
     ),
     ?assertMatch(
         #{

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -124,7 +124,7 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
     Children = [
         sup_spec(#?shards_sup{db = DB}, []),
         sup_spec(#?egress_sup{db = DB}, []),
-        shard_allocator_spec(DB, Opts)
+        shard_allocator_spec(DB)
     ],
     SupFlags = #{
         strategy => one_for_all,
@@ -156,7 +156,7 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
         intensity => 10,
         period => 100
     },
-    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    Opts = emqx_ds_replication_layer_meta:db_config(DB),
     Children = [
         shard_storage_spec(DB, Shard, Opts),
         shard_replication_spec(DB, Shard, Opts)
@@ -236,10 +236,10 @@ shard_replication_spec(DB, Shard, Opts) ->
         type => worker
     }.
 
-shard_allocator_spec(DB, Opts) ->
+shard_allocator_spec(DB) ->
     #{
         id => shard_allocator,
-        start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]},
+        start => {emqx_ds_replication_shard_allocator, start_link, [DB]},
         restart => permanent,
         type => worker
     }.

+ 1 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -189,8 +189,7 @@ add_generation(DB) ->
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 update_db_config(DB, CreateOpts) ->
-    ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
-    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    Opts = #{} = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
     foreach_shard(
         DB,
         fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end

+ 286 - 102
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -29,25 +29,43 @@
 -export([
     shards/1,
     my_shards/1,
-    allocate_shards/2,
-    replica_set/2,
+    allocate_shards/1,
     sites/0,
     node/1,
-    open_db/2,
-    get_options/1,
-    update_db_config/2,
-    drop_db/1,
     this_site/0,
     print_status/0
 ]).
 
+%% DB API:
+-export([
+    open_db/2,
+    db_config/1,
+    update_db_config/2,
+    drop_db/1
+]).
+
+%% Site / shard allocation:
+-export([
+    join_db_site/2,
+    leave_db_site/2,
+    assign_db_sites/2,
+    replica_set_transitions/2,
+    update_replica_set/3,
+    db_sites/1,
+    replica_set/2,
+    target_set/2
+]).
+
 %% gen_server
 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([
     open_db_trans/2,
-    allocate_shards_trans/2,
+    allocate_shards_trans/1,
+    assign_db_sites_trans/2,
+    modify_db_sites_trans/2,
+    update_replica_set_trans/3,
     update_db_config_trans/2,
     drop_db_trans/1,
     claim_site/2,
@@ -86,15 +104,20 @@
 
 -record(?SHARD_TAB, {
     shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    %% Sites that currently contain the data:
+    replica_set :: [site()],
     %% Sites that should contain the data when the cluster is in the
     %% stable state (no nodes are being added or removed from it):
-    replica_set :: [site()],
+    target_set :: [site()] | undefined,
     misc = #{} :: map()
 }).
 
 %% Persistent ID of the node (independent from the IP/FQDN):
 -type site() :: binary().
 
+%% Membership transition of shard's replica set:
+-type transition() :: {add | del, site()}.
+
 %% Peristent term key:
 -define(emqx_ds_builtin_site, emqx_ds_builtin_site).
 
@@ -156,17 +179,17 @@ start_link() ->
 
 -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 shards(DB) ->
-    filter_shards(DB).
+    Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
+    [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
 
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
-    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
-        lists:member(Site, ReplicaSet)
-    end).
+    Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
+    [Shard || #?SHARD_TAB{shard = {_, Shard}, replica_set = RS} <- Recs, lists:member(Site, RS)].
 
-allocate_shards(DB, Opts) ->
-    case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of
+allocate_shards(DB) ->
+    case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of
         {atomic, Shards} ->
             {ok, Shards};
         {aborted, {shards_already_allocated, Shards}} ->
@@ -175,16 +198,6 @@ allocate_shards(DB, Opts) ->
             {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
     end.
 
--spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    {ok, [site()]} | {error, _}.
-replica_set(DB, Shard) ->
-    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
-            {ok, ReplicaSet};
-        [] ->
-            {error, no_shard}
-    end.
-
 -spec sites() -> [site()].
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
@@ -198,8 +211,12 @@ node(Site) ->
             undefined
     end.
 
--spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
-get_options(DB) ->
+%%===============================================================================
+%% DB API
+%%===============================================================================
+
+-spec db_config(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
+db_config(DB) ->
     case mnesia:dirty_read(?META_TAB, DB) of
         [#?META_TAB{db_props = Opts}] ->
             Opts;
@@ -210,21 +227,91 @@ get_options(DB) ->
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
-    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
-    Opts.
+    transaction(fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]).
 
 -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
-    ok | {error, _}.
+    emqx_ds_replication_layer:builtin_db_opts() | {error, nonexistent_db}.
 update_db_config(DB, DefaultOpts) ->
-    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [
-        DB, DefaultOpts
-    ]),
-    Opts.
+    transaction(fun ?MODULE:update_db_config_trans/2, [DB, DefaultOpts]).
 
 -spec drop_db(emqx_ds:db()) -> ok.
 drop_db(DB) ->
-    _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
-    ok.
+    transaction(fun ?MODULE:drop_db_trans/1, [DB]).
+
+%%===============================================================================
+%% Site / shard allocation API
+%%===============================================================================
+
+%% @doc Join a site to the set of sites the DB is replicated across.
+-spec join_db_site(emqx_ds:db(), site()) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
+join_db_site(DB, Site) ->
+    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{add, Site}]]).
+
+%% @doc Make a site leave the set of sites the DB is replicated across.
+-spec leave_db_site(emqx_ds:db(), site()) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
+leave_db_site(DB, Site) ->
+    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{del, Site}]]).
+
+%% @doc Assign a set of sites to the DB for replication.
+-spec assign_db_sites(emqx_ds:db(), [site()]) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
+assign_db_sites(DB, Sites) ->
+    transaction(fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]).
+
+%% @doc List the sites the DB is replicated across.
+-spec db_sites(emqx_ds:db()) -> [site()].
+db_sites(DB) ->
+    Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
+    list_db_sites(Recs).
+
+%% @doc List the sequence of transitions that should be conducted in order to
+%% bring the set of replicas for a DB shard in line with the target set.
+-spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [transition()] | undefined.
+replica_set_transitions(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}] ->
+            compute_transitions(TargetSet, ReplicaSet);
+        [] ->
+            undefined
+    end.
+
+%% @doc Update the set of replication sites for a shard.
+%% To be called after a `transition()` has been conducted successfully.
+-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
+update_replica_set(DB, Shard, Trans) ->
+    case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
+        {atomic, ok} ->
+            ok;
+        {aborted, Reason} ->
+            {error, Reason}
+    end.
+
+%% @doc Get the current set of replication sites for a shard.
+-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [site()] | undefined.
+replica_set(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
+            ReplicaSet;
+        [] ->
+            undefined
+    end.
+
+%% @doc Get the target set of replication sites for a DB shard.
+%% Target set is updated every time the set of replication sites for the DB changes.
+%% See `join_db_site/2`, `leave_db_site/2`, `assign_db_sites/2`.
+-spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    [site()] | undefined.
+target_set(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [#?SHARD_TAB{target_set = TargetSet}] ->
+            TargetSet;
+        [] ->
+            undefined
+    end.
 
 %%================================================================================
 %% behavior callbacks
@@ -268,19 +355,15 @@ open_db_trans(DB, CreateOpts) ->
             Opts
     end.
 
--spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard].
-allocate_shards_trans(DB, Opts) ->
-    NShards = maps:get(n_shards, Opts),
-    NSites = maps:get(n_sites, Opts),
-    ReplicationFactor = maps:get(replication_factor, Opts),
-    NReplicas = min(NSites, ReplicationFactor),
-    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
-    AllSites = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
-    case length(AllSites) of
+-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+allocate_shards_trans(DB) ->
+    Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
+    Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
+    case length(Nodes) of
         N when N >= NSites ->
             ok;
         _ ->
-            mnesia:abort({insufficient_sites_online, NSites, AllSites})
+            mnesia:abort({insufficient_sites_online, NSites, Nodes})
     end,
     case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
         [] ->
@@ -289,18 +372,11 @@ allocate_shards_trans(DB, Opts) ->
             ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
             mnesia:abort({shards_already_allocated, ShardsAllocated})
     end,
-    {Allocation, _} = lists:mapfoldl(
-        fun(Shard, SSites) ->
-            {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
-            {_, SRest} = emqx_utils_stream:consume(1, SSites),
-            {{Shard, Sites}, SRest}
-        end,
-        emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
-        Shards
-    ),
+    Shards = gen_shards(NShards),
+    Sites = [S || #?NODE_TAB{site = S} <- Nodes],
+    Allocation = compute_allocation(Shards, Sites, Opts),
     lists:map(
-        fun({Shard, Sites}) ->
-            ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
+        fun({Shard, ReplicaSet}) ->
             Record = #?SHARD_TAB{
                 shard = {DB, Shard},
                 replica_set = ReplicaSet
@@ -311,29 +387,80 @@ allocate_shards_trans(DB, Opts) ->
         Allocation
     ).
 
--spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
-    ok | {error, database}.
-update_db_config_trans(DB, CreateOpts) ->
-    case mnesia:wread({?META_TAB, DB}) of
-        [#?META_TAB{db_props = Opts}] ->
-            %% Since this is an update and not a reopen,
-            %% we should keep the shard number and replication factor
-            %% and not create a new shard server
-            #{
-                n_shards := NShards,
-                replication_factor := ReplicationFactor
-            } = Opts,
-
-            mnesia:write(#?META_TAB{
-                db = DB,
-                db_props = CreateOpts#{
-                    n_shards := NShards,
-                    replication_factor := ReplicationFactor
-                }
-            }),
+-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
+assign_db_sites_trans(DB, Sites) ->
+    Opts = db_config_trans(DB),
+    case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
+        [] ->
+            ok;
+        NonexistentSites ->
+            mnesia:abort({nonexistent_sites, NonexistentSites})
+    end,
+    %% TODO
+    %% Optimize reallocation. The goals are:
+    %% 1. Minimize the number of membership transitions.
+    %% 2. Ensure that sites are responsible for roughly the same number of shards.
+    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
+    Reallocation = compute_allocation(Shards, Sites, Opts),
+    lists:foreach(
+        fun({Record, ReplicaSet}) ->
+            ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet})
+        end,
+        Reallocation
+    ).
+
+-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
+modify_db_sites_trans(DB, Modifications) ->
+    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
+    Sites0 = list_db_target_sites(Shards),
+    Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
+    case Sites of
+        Sites0 ->
             ok;
+        _Chagned ->
+            assign_db_sites_trans(DB, Sites)
+    end.
+
+update_replica_set_trans(DB, Shard, Trans) ->
+    case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
+        [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
+            ReplicaSet = apply_transition(Trans, ReplicaSet0),
+            case lists:usort(TargetSet0) of
+                ReplicaSet ->
+                    TargetSet = undefined;
+                TS ->
+                    TargetSet = TS
+            end,
+            mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
+        [] ->
+            mnesia:abort({nonexistent_shard, {DB, Shard}})
+    end.
+
+-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+update_db_config_trans(DB, UpdateOpts) ->
+    Opts = db_config_trans(DB, write),
+    %% Since this is an update and not a reopen,
+    %% we should keep the shard number and replication factor
+    %% and not create a new shard server
+    ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts),
+    EffectiveOpts = maps:merge(Opts, ChangeableOpts),
+    ok = mnesia:write(#?META_TAB{
+        db = DB,
+        db_props = EffectiveOpts
+    }),
+    EffectiveOpts.
+
+-spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
+db_config_trans(DB) ->
+    db_config_trans(DB, read).
+
+db_config_trans(DB, LockType) ->
+    case mnesia:read(?META_TAB, DB, LockType) of
+        [#?META_TAB{db_props = Config}] ->
+            Config;
         [] ->
-            {error, no_database}
+            mnesia:abort({nonexistent_db, DB})
     end.
 
 -spec drop_db_trans(emqx_ds:db()) -> ok.
@@ -391,6 +518,61 @@ ensure_site() ->
     persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
 
+%% @doc Returns sorted list of sites shards are replicated across.
+-spec list_db_sites([_Shard]) -> [site()].
+list_db_sites(Shards) ->
+    flatmap_sorted_set(fun get_shard_sites/1, Shards).
+
+-spec list_db_target_sites([_Shard]) -> [site()].
+list_db_target_sites(Shards) ->
+    flatmap_sorted_set(fun get_shard_target_sites/1, Shards).
+
+-spec get_shard_sites(_Shard) -> [site()].
+get_shard_sites(#?SHARD_TAB{replica_set = ReplicaSet}) ->
+    ReplicaSet.
+
+-spec get_shard_target_sites(_Shard) -> [site()].
+get_shard_target_sites(#?SHARD_TAB{target_set = Sites}) when is_list(Sites) ->
+    Sites;
+get_shard_target_sites(#?SHARD_TAB{target_set = undefined} = Shard) ->
+    get_shard_sites(Shard).
+
+-spec compute_allocation([Shard], [Site], emqx_ds_replication_layer:builtin_db_opts()) ->
+    [{Shard, [Site, ...]}].
+compute_allocation(Shards, Sites, Opts) ->
+    NSites = length(Sites),
+    ReplicationFactor = maps:get(replication_factor, Opts),
+    NReplicas = min(NSites, ReplicationFactor),
+    ShardsSorted = lists:sort(Shards),
+    SitesSorted = lists:sort(Sites),
+    {Allocation, _} = lists:mapfoldl(
+        fun(Shard, SSites) ->
+            {ReplicaSet, _} = emqx_utils_stream:consume(NReplicas, SSites),
+            {_, SRest} = emqx_utils_stream:consume(1, SSites),
+            {{Shard, ReplicaSet}, SRest}
+        end,
+        emqx_utils_stream:repeat(emqx_utils_stream:list(SitesSorted)),
+        ShardsSorted
+    ),
+    Allocation.
+
+compute_transitions(undefined, _ReplicaSet) ->
+    [];
+compute_transitions(TargetSet, ReplicaSet) ->
+    Additions = TargetSet -- ReplicaSet,
+    Deletions = ReplicaSet -- TargetSet,
+    intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]).
+
+%% @doc Apply a transition to a list of sites, preserving sort order.
+-spec apply_transition(transition(), [site()]) -> [site()].
+apply_transition({add, S}, Sites) ->
+    lists:usort([S | Sites]);
+apply_transition({del, S}, Sites) ->
+    lists:delete(S, Sites).
+
+gen_shards(NShards) ->
+    [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)].
+
 eval_qlc(Q) ->
     case mnesia:is_transaction() of
         true ->
@@ -400,29 +582,31 @@ eval_qlc(Q) ->
             Result
     end.
 
-filter_shards(DB) ->
-    filter_shards(DB, const(true)).
-
--spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) ->
-    [emqx_ds_replication_layer:shard_id()].
-filter_shards(DB, Predicte) ->
-    filter_shards(DB, Predicte, fun(#?SHARD_TAB{shard = {_, ShardId}}) ->
-        ShardId
-    end).
-
-filter_shards(DB, Predicate, Mapper) ->
-    eval_qlc(
-        qlc:q([
-            Mapper(Shard)
-         || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table(
-                ?SHARD_TAB
-            ),
-            D =:= DB,
-            Predicate(Shard)
-        ])
-    ).
-
-const(Result) ->
-    fun(_) ->
-        Result
+transaction(Fun, Args) ->
+    case mria:transaction(?SHARD, Fun, Args) of
+        {atomic, Result} ->
+            Result;
+        {aborted, Reason} ->
+            {error, Reason}
     end.
+
+%% @doc Intersperse elements of two lists.
+%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
+-spec intersperse([X], [Y]) -> [X | Y].
+intersperse(L1, []) ->
+    L1;
+intersperse([], L2) ->
+    L2;
+intersperse([H1 | T1], L2) ->
+    [H1 | intersperse(L2, T1)].
+
+%% @doc Map list into a list of sets and return union, as a sorted list.
+-spec flatmap_sorted_set(fun((X) -> [Y]), [X]) -> [Y].
+flatmap_sorted_set(Fun, L) ->
+    ordsets:to_list(
+        lists:foldl(
+            fun(X, Acc) -> ordsets:union(ordsets:from_list(Fun(X)), Acc) end,
+            ordsets:new(),
+            L
+        )
+    ).

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -44,7 +44,7 @@ start_link(DB, Shard, Opts) ->
     gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
 
 shard_servers(DB, Shard) ->
-    {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
+    ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
     [
         {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
      || Site <- ReplicaSet

+ 24 - 30
apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_ds_replication_shard_allocator).
 
--export([start_link/2]).
+-export([start_link/1]).
 
 -export([n_shards/1]).
 -export([shard_meta/2]).
@@ -35,8 +35,8 @@
 
 %%
 
-start_link(DB, Opts) ->
-    gen_server:start_link(?MODULE, {DB, Opts}, []).
+start_link(DB) ->
+    gen_server:start_link(?MODULE, DB, []).
 
 n_shards(DB) ->
     Meta = persistent_term:get(?db_meta(DB)),
@@ -49,22 +49,11 @@ shard_meta(DB, Shard) ->
 
 -define(ALLOCATE_RETRY_TIMEOUT, 1_000).
 
-init({DB, Opts}) ->
+init(DB) ->
     _ = erlang:process_flag(trap_exit, true),
     _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
-    State = #{db => DB, opts => Opts, status => allocating},
-    case allocate_shards(State) of
-        {ok, NState} ->
-            {ok, NState};
-        {error, Data} ->
-            _ = logger:notice(
-                Data#{
-                    msg => "Shard allocation still in progress",
-                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
-                }
-            ),
-            {ok, State, ?ALLOCATE_RETRY_TIMEOUT}
-    end.
+    State = #{db => DB, status => allocating},
+    handle_allocate_shards(State, ok).
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -73,9 +62,22 @@ handle_cast(_Cast, State) ->
     {noreply, State}.
 
 handle_info(timeout, State) ->
+    handle_allocate_shards(State, noreply);
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #{db := DB, shards := Shards}) ->
+    erase_db_meta(DB),
+    erase_shards_meta(DB, Shards);
+terminate(_Reason, #{}) ->
+    ok.
+
+%%
+
+handle_allocate_shards(State, Ret) ->
     case allocate_shards(State) of
         {ok, NState} ->
-            {noreply, NState};
+            {Ret, NState};
         {error, Data} ->
             _ = logger:notice(
                 Data#{
@@ -83,21 +85,13 @@ handle_info(timeout, State) ->
                     retry_in => ?ALLOCATE_RETRY_TIMEOUT
                 }
             ),
-            {noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
-    end;
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, #{db := DB, shards := Shards}) ->
-    erase_db_meta(DB),
-    erase_shards_meta(DB, Shards);
-terminate(_Reason, #{}) ->
-    ok.
+            {Ret, State, ?ALLOCATE_RETRY_TIMEOUT}
+    end.
 
 %%
 
-allocate_shards(State = #{db := DB, opts := Opts}) ->
-    case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
+allocate_shards(State = #{db := DB}) ->
+    case emqx_ds_replication_layer_meta:allocate_shards(DB) of
         {ok, Shards} ->
             logger:notice(#{msg => "Shards allocated", shards => Shards}),
             ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -53,7 +53,7 @@ t_00_smoke_open_drop(_Config) ->
     lists:foreach(
         fun(Shard) ->
             ?assertEqual(
-                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+                [Site], emqx_ds_replication_layer_meta:replica_set(DB, Shard)
             )
         end,
         Shards