Selaa lähdekoodia

feat(ds): Metadata storage for the replication layer

ieQu1 2 vuotta sitten
vanhempi
commit
62542e5844

+ 3 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -57,7 +57,9 @@ storage_backend() ->
 storage_backend(#{builtin := #{enable := true}}) ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        n_shards => 16,
+        replication_factor => 3
     }.
 
 %%--------------------------------------------------------------------

+ 3 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -442,7 +442,9 @@ del_subscription(TopicFilter, DSSessionId) ->
 create_tables() ->
     ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
         backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}}
+        storage => {emqx_ds_storage_bitfield_lts, #{}},
+        n_shards => 255,
+        replication_factor => 3
     }),
     ok = mria:create_table(
         ?SESSION_TAB,

+ 1 - 8
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -35,7 +35,6 @@
 
 -export_type([
     create_db_opts/0,
-    builtin_db_opts/0,
     db/0,
     time/0,
     topic_filter/0,
@@ -87,14 +86,8 @@
 
 -type message_store_opts() :: #{}.
 
--type builtin_db_opts() ::
-    #{
-        backend := builtin,
-        storage := emqx_ds_storage_layer:prototype()
-    }.
-
 -type create_db_opts() ::
-    builtin_db_opts().
+    emqx_ds_replication_layer:builtin_db_opts().
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -7,4 +7,5 @@
 -export([start/2]).
 
 start(_Type, _Args) ->
+    emqx_ds_replication_layer_meta:init(),
     emqx_ds_sup:start_link().

+ 18 - 8
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -40,7 +40,7 @@
     do_next_v1/4
 ]).
 
--export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
+-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]).
 
 %%================================================================================
 %% Type declarations
@@ -58,7 +58,15 @@
 -define(shard, 2).
 -define(enc, 3).
 
--type shard_id() :: atom().
+-type shard_id() :: binary().
+
+-type builtin_db_opts() ::
+    #{
+        backend := builtin,
+        storage := emqx_ds_storage_layer:prototype(),
+        n_shards => pos_integer(),
+        replication_factor => pos_integer()
+    }.
 
 %% This encapsulates the stream entity from the replication level.
 %%
@@ -89,11 +97,12 @@
 -spec list_shards(emqx_ds:db()) -> [shard_id()].
 list_shards(_DB) ->
     %% TODO: milestone 5
-    list_nodes().
+    lists:map(fun atom_to_binary/1, list_nodes()).
 
--spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
-open_db(DB, Opts) ->
+-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+open_db(DB, CreateOpts) ->
     %% TODO: improve error reporting, don't just crash
+    Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
     lists:foreach(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
@@ -104,6 +113,7 @@ open_db(DB, Opts) ->
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
+    _ = emqx_ds_replication_layer_meta:drop_db(DB),
     lists:foreach(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
@@ -116,7 +126,7 @@ drop_db(DB) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Batch, Opts) ->
     %% TODO: Currently we store messages locally.
-    Shard = node(),
+    Shard = atom_to_binary(node()),
     Node = node_of_shard(DB, Shard),
     emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
 
@@ -238,8 +248,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 %%================================================================================
 
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(_DB, Node) ->
-    Node.
+node_of_shard(_DB, Shard) ->
+    binary_to_atom(Shard).
 
 list_nodes() ->
     mria:running_nodes().

+ 217 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -0,0 +1,217 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Metadata storage for the builtin sharded database.
+%%
+%% Currently metadata is stored in mria; that's not ideal, but
+%% eventually we'll replace it, so it's important not to leak
+%% implementation details from this module.
+-module(emqx_ds_replication_layer_meta).
+
+%% API:
+-export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]).
+
+%% internal exports:
+-export([open_db_trans/2, drop_db_trans/1, claim_site/2]).
+
+-export_type([site/0]).
+
+-include_lib("stdlib/include/qlc.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% Mria rlog shard that all three metadata tables below are assigned to
+%% (see `ensure_tables/0'); also passed as the shard argument to every
+%% mria transaction in this module.
+-define(SHARD, emqx_ds_builtin_metadata_shard).
+%% DS database metadata:
+-define(META_TAB, emqx_ds_builtin_metadata_tab).
+%% Mapping from Site to the actual Erlang node:
+-define(NODE_TAB, emqx_ds_builtin_node_tab).
+%% Shard metadata:
+-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+
+%% One row per DS database, keyed by the DB name. `db_props' holds the
+%% options the database was first opened with (see `open_db_trans/2').
+-record(?META_TAB, {
+    db :: emqx_ds:db(),
+    db_props :: emqx_ds_replication_layer:builtin_db_opts()
+}).
+
+%% One row per site, keyed by the site ID; written by `claim_site/2'.
+-record(?NODE_TAB, {
+    site :: site(),
+    node :: node(),
+    %% Not read or written anywhere in this module; presumably reserved
+    %% for future extensions -- TODO confirm.
+    misc = #{} :: map()
+}).
+
+%% One row per shard, keyed by `{DB, ShardId}'; written by
+%% `create_shards/3'.
+-record(?SHARD_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    %% Sites chosen to hold a replica of the shard.
+    replica_set :: [site()],
+    %% Never assigned by this module, so it stays `undefined'.
+    leader :: node() | undefined,
+    %% Not read or written anywhere in this module; presumably reserved
+    %% for future extensions -- TODO confirm.
+    misc = #{} :: map()
+}).
+
+%% Persistent ID of the node (independent from the IP/FQDN):
+%% `ensure_site/0' generates it as 8 random bytes and stores it in a file
+%% under the EMQX data directory.
+-type site() :: binary().
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% @doc Prepare the metadata storage on the local node: make sure the
+%% metadata tables exist and are loaded, then register the local site.
+-spec init() -> ok.
+init() ->
+    ok = ensure_tables(),
+    ok = ensure_site().
+
+%% @doc Return the IDs of all shards recorded for `DB' in the shard
+%% table. Safe to call both inside and outside of a transaction (see
+%% `eval_qlc/1').
+-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+shards(DB) ->
+    Query = qlc:q([
+        ShardId
+     || #?SHARD_TAB{shard = {Owner, ShardId}} <- mnesia:table(?SHARD_TAB),
+        Owner =:= DB
+    ]),
+    eval_qlc(Query).
+
+%% @doc Look up the sites that hold replicas of shard `Shard' of `DB'.
+%%
+%% Uses a dirty read of the shard table. Returns `{error, no_shard}'
+%% when there is no record for `{DB, Shard}'.
+-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    {ok, [site()]} | {error, _}.
+replica_set(DB, Shard) ->
+    Key = {DB, Shard},
+    case mnesia:dirty_read(?SHARD_TAB, Key) of
+        [] ->
+            {error, no_shard};
+        [#?SHARD_TAB{replica_set = Replicas}] ->
+            {ok, Replicas}
+    end.
+
+%% @doc Return the IDs of all sites present in the node table.
+-spec sites() -> [site()].
+sites() ->
+    Query = qlc:q([Id || #?NODE_TAB{site = Id} <- mnesia:table(?NODE_TAB)]),
+    eval_qlc(Query).
+
+%% @doc Register `DB' in the metadata storage, or look it up if it is
+%% already registered.
+%%
+%% Returns the options in effect for the database: `DefaultOpts' when
+%% the database is created by this call, otherwise the options stored
+%% when it was first created (see `open_db_trans/2'). Crashes with
+%% `badmatch' if the transaction returns anything but `{atomic, Opts}'.
+-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db(DB, DefaultOpts) ->
+    TxResult = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
+    {atomic, EffectiveOpts} = TxResult,
+    EffectiveOpts.
+
+%% @doc Remove the metadata of `DB'.
+%%
+%% The outcome of the transaction is deliberately discarded: this
+%% function returns `ok' whether the transaction commits or aborts.
+-spec drop_db(emqx_ds:db()) -> ok.
+drop_db(DB) ->
+    _TxResult = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
+    ok.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%% Transaction body of `open_db/2'; must run inside a transaction.
+%%
+%% When `DB' already has a metadata record, the stored options are
+%% returned and `CreateOpts' is ignored. Otherwise the record and the
+%% shard records are written and `CreateOpts' is returned.
+%%
+%% NOTE(review): `n_shards' and `replication_factor' are read with
+%% `maps:get/2', so a missing key raises `{badkey, Key}' even though
+%% `builtin_db_opts()' declares both keys as optional -- confirm which
+%% of the two is intended.
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    emqx_ds_replication_layer:builtin_db_opts().
+open_db_trans(DB, CreateOpts) ->
+    case mnesia:wread({?META_TAB, DB}) of
+        [#?META_TAB{db_props = StoredOpts}] ->
+            %% Already registered: the stored options win.
+            StoredOpts;
+        [] ->
+            NShards = maps:get(n_shards, CreateOpts),
+            ReplicationFactor = maps:get(replication_factor, CreateOpts),
+            MetaRecord = #?META_TAB{db = DB, db_props = CreateOpts},
+            mnesia:write(MetaRecord),
+            create_shards(DB, NShards, ReplicationFactor),
+            CreateOpts
+    end.
+
+%% Transaction body of `drop_db/1'; must run inside a transaction.
+%%
+%% Deletes the metadata record of `DB' and every shard record that
+%% belongs to it.
+-spec drop_db_trans(emqx_ds:db()) -> ok.
+drop_db_trans(DB) ->
+    mnesia:delete({?META_TAB, DB}),
+    lists:foreach(
+        fun(Shard) ->
+            %% The shard table is keyed by `{DB, ShardId}', while
+            %% `shards/1' returns bare shard IDs, so the key has to be
+            %% rebuilt here. Deleting by the bare ID would match no record
+            %% and leave the shard rows behind.
+            mnesia:delete({?SHARD_TAB, {DB, Shard}})
+        end,
+        shards(DB)
+    ).
+
+%% Transaction body used by `ensure_site/0': record that site `Site'
+%% currently lives on Erlang node `Node'.
+-spec claim_site(site(), node()) -> ok.
+claim_site(Site, Node) ->
+    Entry = #?NODE_TAB{site = Site, node = Node},
+    mnesia:write(Entry).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Create the three metadata tables and wait until they are loaded.
+%% All tables share the same rlog shard, majority setting and table
+%% type; they differ only in name, storage backend and record layout.
+%%
+%% NOTE(review): the shard table uses `ram_copies' while the DB metadata
+%% table uses `rocksdb_copies'. Presumably shard records can therefore be
+%% lost while the DB record survives, in which case `open_db_trans/2'
+%% would not recreate them -- confirm.
+ensure_tables() ->
+    %% TODO: seems like it may introduce flakiness
+    Majority = false,
+    CreateTable = fun(Tab, Storage, Fields) ->
+        ok = mria:create_table(Tab, [
+            {rlog_shard, ?SHARD},
+            {majority, Majority},
+            {type, ordered_set},
+            {storage, Storage},
+            {record_name, Tab},
+            {attributes, Fields}
+        ])
+    end,
+    CreateTable(?META_TAB, rocksdb_copies, record_info(fields, ?META_TAB)),
+    CreateTable(?NODE_TAB, rocksdb_copies, record_info(fields, ?NODE_TAB)),
+    CreateTable(?SHARD_TAB, ram_copies, record_info(fields, ?SHARD_TAB)),
+    ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
+
+%% Make sure the local node has a persistent site ID and that the ID is
+%% mapped to the current node name in the node table.
+%%
+%% The ID is read from `emqx_ds_builtin_site.eterm' in the EMQX data
+%% directory. If the file cannot be consulted, or does not contain
+%% exactly one binary term, a fresh 8-byte random ID is generated and
+%% written to the file.
+ensure_site() ->
+    Filename = filename:join(emqx:data_dir(), "emqx_ds_builtin_site.eterm"),
+    Site =
+        case file:consult(Filename) of
+            %% `site()' is a binary; any other stored term is treated as
+            %% a damaged file and replaced.
+            {ok, [Stored]} when is_binary(Stored) ->
+                Stored;
+            _ ->
+                Fresh = crypto:strong_rand_bytes(8),
+                ok = filelib:ensure_dir(Filename),
+                %% write_file/2 opens, writes and closes in one call, and
+                %% the match makes a failed write crash here instead of
+                %% leaving the ID unpersisted.
+                ok = file:write_file(Filename, io_lib:format("~p.", [Fresh])),
+                Fresh
+        end,
+    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
+    ok.
+
+%% Write the shard records of a newly created database; must run inside
+%% a transaction.
+%%
+%% Shard IDs are the decimal strings `<<"0">>' .. `<<"NShards - 1">>'.
+%% For every shard, all known sites are ordered by `hash(Shard, Site)'
+%% and the first `ReplicationFactor' of them become the replica set.
+%% When fewer sites are known, the replica set contains all of them.
+-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
+create_shards(DB, NShards, ReplicationFactor) ->
+    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
+    AllSites = sites(),
+    lists:foreach(
+        fun(Shard) ->
+            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
+            Hashes = lists:sort(Hashes0),
+            %% This needs a fresh variable. Reusing the name of the
+            %% outer site list would turn the line into a match against
+            %% that list, failing with `badmatch' whenever the hash order
+            %% differs from the original order.
+            {_, SortedSites} = lists:unzip(Hashes),
+            ReplicaSet = lists:sublist(SortedSites, 1, ReplicationFactor),
+            Record = #?SHARD_TAB{
+                shard = {DB, Shard},
+                replica_set = ReplicaSet
+            },
+            mnesia:write(Record)
+        end,
+        Shards
+    ).
+
+%% Hash of a (shard, site) pair; `create_shards/3' sorts sites by this
+%% value to pick the replica set of a shard.
+-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
+hash(Shard, Site) ->
+    Pair = {Shard, Site},
+    erlang:phash2(Pair).
+
+%% Evaluate the QLC query handle `Q' and return its answers.
+%%
+%% Inside a transaction the query is evaluated directly; otherwise it is
+%% wrapped in a read-only mria transaction of its own.
+eval_qlc(Q) ->
+    case mnesia:is_transaction() of
+        false ->
+            Eval = fun() -> qlc:eval(Q) end,
+            {atomic, Answers} = mria:ro_transaction(?SHARD, Eval),
+            Answers;
+        true ->
+            qlc:eval(Q)
+    end.

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -384,7 +384,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]).
+    filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
 %%--------------------------------------------------------------------------------
 %% Schema access

+ 19 - 2
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -23,10 +23,14 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-define(N_SHARDS, 8).
+
 opts() ->
     #{
         backend => builtin,
-        storage => {emqx_ds_storage_reference, #{}}
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => ?N_SHARDS,
+        replication_factor => 3
     }.
 
 %% A simple smoke test that verifies that opening/closing the DB
@@ -34,6 +38,17 @@ opts() ->
 t_00_smoke_open_drop(_Config) ->
     DB = 'DB',
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [Site] = emqx_ds_replication_layer_meta:sites(),
+    Shards = emqx_ds_replication_layer_meta:shards(DB),
+    ?assertEqual(?N_SHARDS, length(Shards)),
+    lists:foreach(
+        fun(Shard) ->
+            ?assertEqual(
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard}
+            )
+        end,
+        Shards
+    ),
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     ?assertMatch(ok, emqx_ds:drop_db(DB)).
 
@@ -143,4 +158,6 @@ init_per_testcase(_TC, Config) ->
     Config.
 
 end_per_testcase(_TC, _Config) ->
-    ok = application:stop(emqx_durable_storage).
+    ok = application:stop(emqx_durable_storage),
+    mnesia:delete_schema([node()]),
+    mria:stop().

+ 6 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -15,7 +15,9 @@
 
 -define(DEFAULT_CONFIG, #{
     backend => builtin,
-    storage => {emqx_ds_storage_bitfield_lts, #{}}
+    storage => {emqx_ds_storage_bitfield_lts, #{}},
+    n_shards => 255,
+    replication_factor => 3
 }).
 
 -define(COMPACT_CONFIG, #{
@@ -23,7 +25,9 @@
     storage =>
         {emqx_ds_storage_bitfield_lts, #{
             bits_per_wildcard_level => 8
-        }}
+        }},
+    n_shards => 255,
+    replication_factor => 3
 }).
 
 %% Smoke test for opening and reopening the database