Просмотр исходного кода

feat(dsrepl): add more primitive operations to modify DB sites

Andrew Mayorov 1 год назад
Родитель
Сommit
df6c5b35fe
1 измененных файлов с 102 добавлено и 34 удалено
  1. 102 34
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

+ 102 - 34
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -46,6 +46,8 @@
 
 %% Site / shard allocation:
 -export([
+    join_db_site/2,
+    leave_db_site/2,
     assign_db_sites/2,
     replica_set_transitions/2,
     update_replica_set/3,
@@ -62,6 +64,7 @@
     open_db_trans/2,
     allocate_shards_trans/1,
     assign_db_sites_trans/2,
+    modify_db_sites_trans/2,
     update_replica_set_trans/3,
     update_db_config_trans/2,
     drop_db_trans/1,
@@ -227,7 +230,7 @@ open_db(DB, DefaultOpts) ->
     transaction(fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]).
 
 -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
-    emqx_ds_replication_layer:builtin_db_opts() | {error, _}.
+    emqx_ds_replication_layer:builtin_db_opts() | {error, nonexistent_db}.
 update_db_config(DB, DefaultOpts) ->
     transaction(fun ?MODULE:update_db_config_trans/2, [DB, DefaultOpts]).
 
@@ -235,26 +238,36 @@ update_db_config(DB, DefaultOpts) ->
 drop_db(DB) ->
     transaction(fun ?MODULE:drop_db_trans/1, [DB]).
 
--spec assign_db_sites(emqx_ds:db(), [site()]) -> ok.
+%%===============================================================================
+%% Site / shard allocation API
+%%===============================================================================
+
+%% @doc Join a site to the set of sites the DB is replicated across.
+-spec join_db_site(emqx_ds:db(), site()) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
+join_db_site(DB, Site) ->
+    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{add, Site}]]).
+
+%% @doc Make a site leave the set of sites the DB is replicated across.
+-spec leave_db_site(emqx_ds:db(), site()) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
+leave_db_site(DB, Site) ->
+    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{del, Site}]]).
+
+%% @doc Assign a set of sites to the DB for replication.
+-spec assign_db_sites(emqx_ds:db(), [site()]) ->
+    ok | {error, nonexistent_db | nonexistent_sites}.
 assign_db_sites(DB, Sites) ->
-    case mria:transaction(?SHARD, fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]) of
-        {atomic, ok} ->
-            ok;
-        {aborted, Reason} ->
-            {error, Reason}
-    end.
+    transaction(fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]).
 
+%% @doc List the sites the DB is replicated across.
 -spec db_sites(emqx_ds:db()) -> [site()].
 db_sites(DB) ->
     Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
-    lists:foldl(
-        fun(#?SHARD_TAB{replica_set = RS}, Acc) ->
-            ordsets:union(ordsets:from_list(RS), Acc)
-        end,
-        ordsets:new(),
-        Recs
-    ).
+    list_db_sites(Recs).
 
+%% @doc List the sequence of transitions that should be conducted in order to
+%% bring the set of replicas for a DB shard in line with the target set.
 -spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     [transition()] | undefined.
 replica_set_transitions(DB, Shard) ->
@@ -265,6 +278,8 @@ replica_set_transitions(DB, Shard) ->
             undefined
     end.
 
+%% @doc Update the set of replication sites for a shard.
+%% To be called after a `transition()` has been conducted successfully.
 -spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
 update_replica_set(DB, Shard, Trans) ->
     case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
@@ -274,6 +289,7 @@ update_replica_set(DB, Shard, Trans) ->
             {error, Reason}
     end.
 
+%% @doc Get the current set of replication sites for a shard.
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     [site()] | undefined.
 replica_set(DB, Shard) ->
@@ -284,6 +300,9 @@ replica_set(DB, Shard) ->
             undefined
     end.
 
+%% @doc Get the target set of replication sites for a DB shard.
+%% Target set is updated every time the set of replication sites for the DB changes.
+%% See `join_db_site/2`, `leave_db_site/2`, `assign_db_sites/2`.
 -spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     [site()] | undefined.
 target_set(DB, Shard) ->
@@ -390,6 +409,18 @@ assign_db_sites_trans(DB, Sites) ->
         Reallocation
     ).
 
+-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
+modify_db_sites_trans(DB, Modifications) ->
+    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
+    Sites0 = list_db_target_sites(Shards),
+    Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
+    case Sites of
+        Sites0 ->
+            ok;
+        _Chagned ->
+            assign_db_sites_trans(DB, Sites)
+    end.
+
 update_replica_set_trans(DB, Shard, Trans) ->
     case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
         [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
@@ -406,27 +437,26 @@ update_replica_set_trans(DB, Shard, Trans) ->
     end.
 
 -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
-    ok | {error, _}.
+    emqx_ds_replication_layer:builtin_db_opts().
 update_db_config_trans(DB, UpdateOpts) ->
-    case mnesia:wread({?META_TAB, DB}) of
-        [#?META_TAB{db_props = Opts}] ->
-            %% Since this is an update and not a reopen,
-            %% we should keep the shard number and replication factor
-            %% and not create a new shard server
-            ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts),
-            EffectiveOpts = maps:merge(Opts, ChangeableOpts),
-            mnesia:write(#?META_TAB{
-                db = DB,
-                db_props = EffectiveOpts
-            }),
-            EffectiveOpts;
-        [] ->
-            {error, no_database}
-    end.
+    Opts = db_config_trans(DB, write),
+    %% Since this is an update and not a reopen,
+    %% we should keep the shard number and replication factor
+    %% and not create a new shard server
+    ChangeableOpts = maps:without([n_shards, n_sites, replication_factor], UpdateOpts),
+    EffectiveOpts = maps:merge(Opts, ChangeableOpts),
+    ok = mnesia:write(#?META_TAB{
+        db = DB,
+        db_props = EffectiveOpts
+    }),
+    EffectiveOpts.
 
 -spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
 db_config_trans(DB) ->
-    case mnesia:read(?META_TAB, DB, read) of
+    db_config_trans(DB, read).
+
+db_config_trans(DB, LockType) ->
+    case mnesia:read(?META_TAB, DB, LockType) of
         [#?META_TAB{db_props = Config}] ->
             Config;
         [] ->
@@ -488,6 +518,27 @@ ensure_site() ->
     persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
 
+%% @doc Returns sorted list of sites shards are replicated across.
+-spec list_db_sites([_Shard]) -> [site()].
+list_db_sites(Shards) ->
+    flatmap_sorted_set(fun get_shard_sites/1, Shards).
+
+-spec list_db_target_sites([_Shard]) -> [site()].
+list_db_target_sites(Shards) ->
+    flatmap_sorted_set(fun get_shard_target_sites/1, Shards).
+
+-spec get_shard_sites(_Shard) -> [site()].
+get_shard_sites(#?SHARD_TAB{replica_set = ReplicaSet}) ->
+    ReplicaSet.
+
+-spec get_shard_target_sites(_Shard) -> [site()].
+get_shard_target_sites(#?SHARD_TAB{target_set = Sites}) when is_list(Sites) ->
+    Sites;
+get_shard_target_sites(#?SHARD_TAB{target_set = undefined} = Shard) ->
+    get_shard_sites(Shard).
+
+-spec compute_allocation([Shard], [Site], emqx_ds_replication_layer:builtin_db_opts()) ->
+    [{Shard, [Site, ...]}].
 compute_allocation(Shards, Sites, Opts) ->
     NSites = length(Sites),
     ReplicationFactor = maps:get(replication_factor, Opts),
@@ -512,6 +563,8 @@ compute_transitions(TargetSet, ReplicaSet) ->
     Deletions = ReplicaSet -- TargetSet,
     intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]).
 
+%% @doc Apply a transition to a list of sites, preserving sort order.
+-spec apply_transition(transition(), [site()]) -> [site()].
 apply_transition({add, S}, Sites) ->
     lists:usort([S | Sites]);
 apply_transition({del, S}, Sites) ->
@@ -530,8 +583,12 @@ eval_qlc(Q) ->
     end.
 
 transaction(Fun, Args) ->
-    {atomic, Result} = mria:transaction(?SHARD, Fun, Args),
-    Result.
+    case mria:transaction(?SHARD, Fun, Args) of
+        {atomic, Result} ->
+            Result;
+        {aborted, Reason} ->
+            {error, Reason}
+    end.
 
 %% @doc Intersperse elements of two lists.
 %% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
@@ -542,3 +599,14 @@ intersperse([], L2) ->
     L2;
 intersperse([H1 | T1], L2) ->
     [H1 | intersperse(L2, T1)].
+
+%% @doc Map list into a list of sets and return union, as a sorted list.
+-spec flatmap_sorted_set(fun((X) -> [Y]), [X]) -> [Y].
+flatmap_sorted_set(Fun, L) ->
+    ordsets:to_list(
+        lists:foldl(
+            fun(X, Acc) -> ordsets:union(ordsets:from_list(Fun(X)), Acc) end,
+            ordsets:new(),
+            L
+        )
+    ).