Browse Source

feat(dsrepl): allocate shards once predefined number of sites online

Before this commit the most likely shard allocation outcome was
that all shard are allocated to just one node.
Andrew Mayorov 2 years ago
parent
commit
5e94bdb932

+ 10 - 0
apps/emqx/src/emqx_ds_schema.erl

@@ -39,6 +39,7 @@
 translate_builtin(#{
 translate_builtin(#{
     backend := builtin,
     backend := builtin,
     n_shards := NShards,
     n_shards := NShards,
+    n_sites := NSites,
     replication_factor := ReplFactor,
     replication_factor := ReplFactor,
     layout := Layout
     layout := Layout
 }) ->
 }) ->
@@ -61,6 +62,7 @@ translate_builtin(#{
     #{
     #{
         backend => builtin,
         backend => builtin,
         n_shards => NShards,
         n_shards => NShards,
+        n_sites => NSites,
         replication_factor => ReplFactor,
         replication_factor => ReplFactor,
         storage => Storage
         storage => Storage
     }.
     }.
@@ -126,6 +128,14 @@ fields(builtin) ->
                     desc => ?DESC(builtin_n_shards)
                     desc => ?DESC(builtin_n_shards)
                 }
                 }
             )},
             )},
+        %% TODO: Minimum number of sites that will be responsible for the shards
+        {"n_sites",
+            sc(
+                pos_integer(),
+                #{
+                    default => 1
+                }
+            )},
         {replication_factor,
         {replication_factor,
             sc(
             sc(
                 pos_integer(),
                 pos_integer(),

+ 2 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -520,7 +520,8 @@ app_specs(Opts) ->
     ].
     ].
 
 
 cluster() ->
 cluster() ->
-    Spec = #{role => core, apps => app_specs()},
+    ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2",
+    Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
     [
     [
         {persistent_messages_SUITE1, Spec},
         {persistent_messages_SUITE1, Spec},
         {persistent_messages_SUITE2, Spec}
         {persistent_messages_SUITE2, Spec}

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -94,6 +94,7 @@
         backend := builtin,
         backend := builtin,
         storage := emqx_ds_storage_layer:prototype(),
         storage := emqx_ds_storage_layer:prototype(),
         n_shards => pos_integer(),
         n_shards => pos_integer(),
+        n_sites => pos_integer(),
         replication_factor => pos_integer()
         replication_factor => pos_integer()
     }.
     }.
 
 

+ 14 - 13
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -251,11 +251,12 @@ open_db(DB, DefaultOpts) ->
     case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
     case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
         {atomic, Opts} ->
         {atomic, Opts} ->
             Opts;
             Opts;
-        {aborted, {siteless_nodes, Nodes}} ->
-            %% TODO
-            %% This is ugly. We need a good story of how to fairly allocate shards in a
-            %% fresh cluster.
-            logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]),
+        {aborted, {insufficient_sites_online, NNeeded, Sites}} ->
+            %% TODO: Still ugly, it blocks the whole node startup.
+            logger:notice(
+                "Shard allocation still in progress, not enough sites: ~p, need: ~p",
+                [Sites, NNeeded]
+            ),
             ok = timer:sleep(1000),
             ok = timer:sleep(1000),
             open_db(DB, DefaultOpts)
             open_db(DB, DefaultOpts)
     end.
     end.
@@ -337,9 +338,10 @@ open_db_trans(DB, CreateOpts) ->
     case mnesia:wread({?META_TAB, DB}) of
     case mnesia:wread({?META_TAB, DB}) of
         [] when is_map(CreateOpts) ->
         [] when is_map(CreateOpts) ->
             NShards = maps:get(n_shards, CreateOpts),
             NShards = maps:get(n_shards, CreateOpts),
+            NSites = maps:get(n_sites, CreateOpts),
             ReplicationFactor = maps:get(replication_factor, CreateOpts),
             ReplicationFactor = maps:get(replication_factor, CreateOpts),
             mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
             mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
-            create_shards(DB, NShards, ReplicationFactor),
+            create_shards(DB, NSites, NShards, ReplicationFactor),
             CreateOpts;
             CreateOpts;
         [#?META_TAB{db_props = Opts}] ->
         [#?META_TAB{db_props = Opts}] ->
             Opts
             Opts
@@ -465,16 +467,15 @@ ensure_site() ->
     persistent_term:put(?emqx_ds_builtin_site, Site),
     persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
     ok.
 
 
--spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
-create_shards(DB, NShards, ReplicationFactor) ->
+-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok.
+create_shards(DB, NSites, NShards, ReplicationFactor) ->
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
     AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
     AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
-    Nodes = mria_mnesia:running_nodes(),
-    case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of
-        [] ->
+    case length(AllSites) of
+        N when N >= NSites ->
             ok;
             ok;
-        NodesSiteless ->
-            mnesia:abort({siteless_nodes, NodesSiteless})
+        _ ->
+            mnesia:abort({insufficient_sites_online, NSites, AllSites})
     end,
     end,
     lists:foreach(
     lists:foreach(
         fun(Shard) ->
         fun(Shard) ->