Преглед изворни кода

feat(dsrepl): manage generations / db config through ra machine

Andrew Mayorov пре 2 година
родитељ
комит
e6c2c2fb07

+ 87 - 51
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -36,9 +36,7 @@
     update_iterator/3,
     next/3,
     delete_next/4,
-    node_of_shard/2,
-    shard_of_message/3,
-    maybe_set_myself_as_leader/2
+    shard_of_message/3
 ]).
 
 %% internal exports:
@@ -160,13 +158,19 @@ open_db(DB, CreateOpts) ->
 
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
-    Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
-    _ = emqx_ds_proto_v4:add_generation(Nodes, DB),
-    ok.
+    foreach_shard(
+        DB,
+        fun(Shard) -> ok = ra_add_generation(DB, Shard) end
+    ).
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 update_db_config(DB, CreateOpts) ->
-    emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
+    ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
+    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    foreach_shard(
+        DB,
+        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
+    ).
 
 -spec list_generations_with_lifetimes(emqx_ds:db()) ->
     #{generation_rank() => emqx_ds:generation_info()}.
@@ -174,13 +178,12 @@ list_generations_with_lifetimes(DB) ->
     Shards = list_shards(DB),
     lists:foldl(
         fun(Shard, GensAcc) ->
-            Node = node_of_shard(DB, Shard),
             maps:fold(
                 fun(GenId, Data, AccInner) ->
                     AccInner#{{Shard, GenId} => Data}
                 end,
                 GensAcc,
-                emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)
+                ra_list_generations_with_lifetimes(DB, Shard)
             )
         end,
         #{},
@@ -189,10 +192,7 @@ list_generations_with_lifetimes(DB) ->
 
 -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
 drop_generation(DB, {Shard, GenId}) ->
-    %% TODO: drop generation in all nodes in the replica set, not only in the leader,
-    %% after we have proper replication in place.
-    Node = node_of_shard(DB, Shard),
-    emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
+    ra_drop_generation(DB, Shard, GenId).
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
@@ -244,8 +244,7 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            Streams = emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, StartTime),
+            Streams = ra_get_delete_streams(DB, Shard, TopicFilter, StartTime),
             lists:map(
                 fun(StorageLayerStream) ->
                     ?delete_stream(Shard, StorageLayerStream)
@@ -274,12 +273,7 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
     emqx_ds:make_delete_iterator_result(delete_iterator()).
 make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?delete_stream(Shard, StorageStream) = Stream,
-    Node = node_of_shard(DB, Shard),
-    case
-        emqx_ds_proto_v4:make_delete_iterator(
-            Node, DB, Shard, StorageStream, TopicFilter, StartTime
-        )
-    of
+    case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
@@ -327,8 +321,7 @@ next(DB, Iter0, BatchSize) ->
     emqx_ds:delete_next_result(delete_iterator()).
 delete_next(DB, Iter0, Selector, BatchSize) ->
     #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(DB, Shard),
-    case emqx_ds_proto_v4:delete_next(Node, DB, Shard, StorageIter0, Selector, BatchSize) of
+    case ra_delete_next(DB, Shard, StorageIter0, Selector, BatchSize) of
         {ok, StorageIter, NumDeleted} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, NumDeleted};
@@ -336,17 +329,6 @@ delete_next(DB, Iter0, Selector, BatchSize) ->
             Other
     end.
 
--spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(DB, Shard) ->
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            %% TODO: use optvar
-            timer:sleep(500),
-            node_of_shard(DB, Shard)
-    end.
-
 -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
     emqx_ds_replication_layer:shard_id().
 shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
@@ -358,18 +340,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
         end,
     integer_to_binary(Hash).
 
-%% TODO: there's no real leader election right now
--spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
-maybe_set_myself_as_leader(DB, Shard) ->
-    Site = emqx_ds_replication_layer_meta:this_site(),
-    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
-        [Site | _] ->
-            %% Currently the first in-sync replica always becomes the
-            %% leader
-            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
-        _Sites ->
-            ok
-    end.
+foreach_shard(DB, Fun) ->
+    lists:foreach(Fun, list_shards(DB)).
 
 %%================================================================================
 %% behavior callbacks
@@ -486,7 +458,8 @@ do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
 
 -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
 do_add_generation_v2(DB) ->
-    MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
+    %% FIXME
+    MyShards = [],
     lists:foreach(
         fun(ShardId) ->
             emqx_ds_storage_layer:add_generation({DB, ShardId})
@@ -551,7 +524,7 @@ ra_start_shard(DB, Shard) ->
         _ ->
             ok
     end,
-    ignore.
+    emqx_ds_replication_layer_shard:start_link(LocalServer).
 
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
@@ -565,14 +538,49 @@ ra_store_batch(DB, Shard, Messages) ->
             error(Error, [DB, Shard])
     end.
 
+ra_add_generation(DB, Shard) ->
+    Command = #{?tag => add_generation},
+    case ra:process_command(ra_leader_servers(DB, Shard), Command) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_update_config(DB, Shard, Opts) ->
+    Command = #{?tag => update_config, ?config => Opts},
+    case ra:process_command(ra_leader_servers(DB, Shard), Command) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_drop_generation(DB, Shard, GenId) ->
+    Command = #{?tag => drop_generation, ?generation => GenId},
+    case ra:process_command(ra_leader_servers(DB, Shard), Command) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
 ra_get_streams(DB, Shard, TopicFilter, Time) ->
     {_Name, Node} = ra_random_replica(DB, Shard),
     emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
 
+ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
+
 ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
     {_Name, Node} = ra_random_replica(DB, Shard),
     emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
 
+ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+
 ra_update_iterator(DB, Shard, Iter, DSKey) ->
     {_Name, Node} = ra_random_replica(DB, Shard),
     emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
@@ -581,9 +589,16 @@ ra_next(DB, Shard, Iter, BatchSize) ->
     {_Name, Node} = ra_random_replica(DB, Shard),
     emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
 
+ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
+
+ra_list_generations_with_lifetimes(DB, Shard) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
+
 ra_drop_shard(DB, Shard) ->
-    %% TODO: clean dsrepl state
-    ra:stop_server(_System = default, ra_local_server(DB, Shard)).
+    ra:force_delete_server(_System = default, ra_local_server(DB, Shard)).
 
 ra_shard_servers(DB, Shard) ->
     {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
@@ -672,7 +687,28 @@ apply(
     NState = State#{latest := NLatest},
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     Effect = {release_cursor, RaftIdx, NState},
-    {NState, Result, Effect}.
+    {NState, Result, Effect};
+apply(
+    _RaftMeta,
+    #{?tag := add_generation},
+    State
+) ->
+    Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)),
+    {State, Result};
+apply(
+    _RaftMeta,
+    #{?tag := update_config, ?config := Opts},
+    State
+) ->
+    Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts),
+    {State, Result};
+apply(
+    _RaftMeta,
+    #{?tag := drop_generation, ?generation := GenId},
+    State
+) ->
+    Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId),
+    {State, Result}.
 
 assign_timestamps(Latest, Messages) ->
     assign_timestamps(Latest, Messages, []).

+ 9 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -28,8 +28,16 @@
 %% keys:
 -define(tag, 1).
 -define(shard, 2).
--define(timestamp, 3).
 -define(enc, 3).
+
+%% ?BATCH
 -define(batch_messages, 2).
+-define(timestamp, 3).
+
+%% update_config
+-define(config, 2).
+
+%% drop_generation
+-define(generation, 2).
 
 -endif.

+ 0 - 113
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -29,20 +29,14 @@
 -export([
     shards/1,
     my_shards/1,
-    my_owned_shards/1,
-    leader_nodes/1,
     replica_set/2,
-    in_sync_replicas/2,
     sites/0,
     node/1,
     open_db/2,
     get_options/1,
     update_db_config/2,
     drop_db/1,
-    shard_leader/2,
     this_site/0,
-    set_leader/3,
-    is_leader/1,
     print_status/0
 ]).
 
@@ -55,9 +49,6 @@
     update_db_config_trans/2,
     drop_db_trans/1,
     claim_site/2,
-    in_sync_replicas_trans/2,
-    set_leader_trans/3,
-    is_leader_trans/1,
     n_shards/1
 ]).
 
@@ -96,9 +87,6 @@
     %% Sites that should contain the data when the cluster is in the
     %% stable state (no nodes are being added or removed from it):
     replica_set :: [site()],
-    %% Sites that contain the actual data:
-    in_sync_replicas :: [site()],
-    leader :: node() | undefined,
     misc = #{} :: map()
 }).
 
@@ -164,27 +152,6 @@ my_shards(DB) ->
         lists:member(Site, ReplicaSet)
     end).
 
--spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
-my_owned_shards(DB) ->
-    Self = node(),
-    filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) ->
-        Self =:= Leader
-    end).
-
--spec leader_nodes(emqx_ds:db()) -> [node()].
-leader_nodes(DB) ->
-    lists:uniq(
-        filter_shards(
-            DB,
-            fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader =/= undefined
-            end,
-            fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader
-            end
-        )
-    ).
-
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     {ok, [site()]} | {error, _}.
 replica_set(DB, Shard) ->
@@ -195,17 +162,6 @@ replica_set(DB, Shard) ->
             {error, no_shard}
     end.
 
--spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    [site()].
-in_sync_replicas(DB, ShardId) ->
-    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]),
-    case Result of
-        {ok, InSync} ->
-            InSync;
-        {error, _} ->
-            []
-    end.
-
 -spec sites() -> [site()].
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
@@ -219,27 +175,6 @@ node(Site) ->
             undefined
     end.
 
--spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    {ok, node()} | {error, no_leader_for_shard}.
-shard_leader(DB, Shard) ->
-    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{leader = Leader}] when Leader =/= undefined ->
-            {ok, Leader};
-        _ ->
-            {error, no_leader_for_shard}
-    end.
-
--spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
-    ok.
-set_leader(DB, Shard, Node) ->
-    {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
-    ok.
-
--spec is_leader(node()) -> boolean().
-is_leader(Node) ->
-    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
-    Result.
-
 -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
 get_options(DB) ->
     {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
@@ -286,7 +221,6 @@ init([]) ->
     init_ra(),
     ensure_tables(),
     ensure_site(),
-    {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.
 
@@ -309,18 +243,6 @@ handle_call(_Call, _From, S) ->
 handle_cast(_Cast, S) ->
     {noreply, S}.
 
-handle_info(
-    {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
-) ->
-    MyShards = my_owned_shards(DB),
-
-    lists:foreach(
-        fun(ShardId) ->
-            emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
-        end,
-        MyShards
-    ),
-    {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -382,41 +304,6 @@ drop_db_trans(DB) ->
 claim_site(Site, Node) ->
     mnesia:write(#?NODE_TAB{site = Site, node = Node}).
 
--spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    {ok, [site()]} | {error, no_shard}.
-in_sync_replicas_trans(DB, Shard) ->
-    case mnesia:read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{in_sync_replicas = InSync}] ->
-            {ok, InSync};
-        [] ->
-            {error, no_shard}
-    end.
-
--spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
-    ok.
-set_leader_trans(DB, Shard, Node) ->
-    [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
-    Record = Record0#?SHARD_TAB{leader = Node},
-    mnesia:write(Record).
-
--spec is_leader_trans(node) -> boolean().
-is_leader_trans(Node) ->
-    case
-        mnesia:select(
-            ?SHARD_TAB,
-            ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader =:= Node
-            end),
-            1,
-            read
-        )
-    of
-        {[_ | _], _Cont} ->
-            true;
-        _ ->
-            false
-    end.
-
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 47 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -0,0 +1,47 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_layer_shard).
+
+-export([start_link/1]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    terminate/2
+]).
+
+%%
+
+start_link(ServerId) ->
+    gen_server:start_link(?MODULE, ServerId, []).
+
+%%
+
+init(ServerId) ->
+    process_flag(trap_exit, true),
+    {ok, ServerId}.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, ServerId) ->
+    ok = ra:stop_server(ServerId).

+ 2 - 7
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -51,13 +51,8 @@ t_00_smoke_open_drop(_Config) ->
     lists:foreach(
         fun(Shard) ->
             ?assertEqual(
-                {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
-            ),
-            ?assertEqual(
-                [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
-            ),
-            %%  Check that the leader is eleected;
-            ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+            )
         end,
         Shards
     ),