Просмотр исходного кода

feat(ds): introduce keyspace concept

Fixes https://emqx.atlassian.net/browse/EMQX-10579

This introduces the concept of "keyspaces" to our durable storage (DS) implementation, and
also refactors some places where "shard" and "keyspace" would be mixed up.

We might want to tune the storage options differently for distinct sets of topics, the
keyspaces.  The keyspace is composed by one or more shards.

- Keyspaces are identified simply by binary strings.
- DS configuration is scoped by keyspaces instead of shards.
- Starting a new DS shard requires definining to which keyspace the shard belongs.
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
b30bcf32bd

+ 8 - 3
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -46,6 +46,7 @@
 
 %% FIXME
 -define(DS_SHARD, <<"local">>).
+-define(DEFAULT_KEYSPACE, <<"#">>).
 
 -define(WHEN_ENABLED(DO),
     case is_store_enabled() of
@@ -58,9 +59,13 @@
 
 init() ->
     ?WHEN_ENABLED(begin
-        ok = emqx_ds:ensure_shard(?DS_SHARD, #{
-            dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
-        }),
+        ok = emqx_ds:ensure_shard(
+            ?DS_SHARD,
+            ?DEFAULT_KEYSPACE,
+            #{
+                dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
+            }
+        ),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok
     end).

+ 6 - 4
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -19,7 +19,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% API:
--export([ensure_shard/2]).
+-export([ensure_shard/3]).
 %%   Messages:
 -export([message_store/2, message_store/1, message_stats/0]).
 %%   Iterator:
@@ -39,6 +39,7 @@
 -export([]).
 
 -export_type([
+    keyspace/0,
     message_id/0,
     message_stats/0,
     message_store_opts/0,
@@ -77,6 +78,7 @@
 %% Parsed topic:
 -type topic() :: list(binary()).
 
+-type keyspace() :: binary().
 -type shard() :: binary().
 
 %% Timestamp
@@ -96,10 +98,10 @@
 %% API funcions
 %%================================================================================
 
--spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
+-spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) ->
     ok | {error, _Reason}.
-ensure_shard(Shard, Options) ->
-    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
+ensure_shard(Shard, Keyspace, Options) ->
+    case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of
         {ok, _Pid} ->
             ok;
         {error, {already_started, _Pid}} ->

+ 23 - 16
apps/emqx_durable_storage/src/emqx_ds_conf.erl

@@ -6,9 +6,9 @@
 %% TODO: make a proper HOCON schema and all...
 
 %% API:
--export([shard_config/1, db_options/0]).
+-export([keyspace_config/1, db_options/1]).
 
--export([shard_iteration_options/1]).
+-export([iteration_options/1]).
 -export([default_iteration_options/0]).
 
 -type backend_config() ::
@@ -23,16 +23,20 @@
 
 -define(APP, emqx_ds).
 
--spec shard_config(emqx_ds:shard()) -> backend_config().
-shard_config(Shard) ->
-    DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
-    Shards = application:get_env(?APP, shard_config, #{}),
-    maps:get(Shard, Shards, DefaultShardConfig).
+-spec keyspace_config(emqx_ds:keyspace()) -> backend_config().
+keyspace_config(Keyspace) ->
+    DefaultKeyspaceConfig = application:get_env(
+        ?APP,
+        default_keyspace_config,
+        default_keyspace_config()
+    ),
+    Keyspaces = application:get_env(?APP, keyspace_config, #{}),
+    maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig).
 
--spec shard_iteration_options(emqx_ds:shard()) ->
+-spec iteration_options(emqx_ds:keyspace()) ->
     emqx_ds_message_storage_bitmask:iteration_options().
-shard_iteration_options(Shard) ->
-    case shard_config(Shard) of
+iteration_options(Keyspace) ->
+    case keyspace_config(Keyspace) of
         {emqx_ds_message_storage_bitmask, Config} ->
             maps:get(iteration, Config, default_iteration_options());
         {_Module, _} ->
@@ -41,12 +45,13 @@ shard_iteration_options(Shard) ->
 
 -spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options().
 default_iteration_options() ->
-    {emqx_ds_message_storage_bitmask, Config} = default_shard_config(),
+    {emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(),
     maps:get(iteration, Config).
 
--spec default_shard_config() -> backend_config().
-default_shard_config() ->
+-spec default_keyspace_config() -> backend_config().
+default_keyspace_config() ->
     {emqx_ds_message_storage_bitmask, #{
+        db_options => [],
         timestamp_bits => 64,
         topic_bits_per_level => [8, 8, 8, 32, 16],
         epoch => 5,
@@ -55,6 +60,8 @@ default_shard_config() ->
         }
     }}.
 
--spec db_options() -> emqx_ds_storage_layer:db_options().
-db_options() ->
-    application:get_env(?APP, db_options, []).
+-spec db_options(emqx_ds:keyspace()) -> emqx_ds_storage_layer:db_options().
+db_options(Keyspace) ->
+    DefaultDBOptions = application:get_env(?APP, default_db_options, []),
+    Keyspaces = application:get_env(?APP, keyspace_config, #{}),
+    emqx_utils_maps:deep_get([Keyspace, db_options], Keyspaces, DefaultDBOptions).

+ 6 - 3
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -80,7 +80,7 @@
 -behaviour(emqx_ds_storage_layer).
 
 %% API:
--export([create_new/3, open/5]).
+-export([create_new/3, open/6]).
 -export([make_keymapper/1]).
 
 -export([store/5]).
@@ -174,6 +174,7 @@
 
 -record(db, {
     shard :: emqx_ds:shard(),
+    keyspace :: emqx_ds:keyspace(),
     handle :: rocksdb:db_handle(),
     cf :: rocksdb:cf_handle(),
     keymapper :: keymapper(),
@@ -236,16 +237,18 @@ create_new(DBHandle, GenId, Options) ->
 %% Reopen the database
 -spec open(
     emqx_ds:shard(),
+    emqx_ds:keyspace(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
     emqx_ds_storage_layer:cf_refs(),
     schema()
 ) ->
     db().
-open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
+open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
     {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
     #db{
         shard = Shard,
+        keyspace = Keyspace,
         handle = DBHandle,
         cf = CFHandle,
         keymapper = Keymapper
@@ -289,7 +292,7 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->
-    Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
+    Options = emqx_ds_conf:iteration_options(DB#db.keyspace),
     make_iterator(DB, Replay, Options).
 
 -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->

+ 24 - 14
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -6,7 +6,7 @@
 -behaviour(gen_server).
 
 %% API:
--export([start_link/2]).
+-export([start_link/3]).
 -export([create_generation/3]).
 
 -export([store/5]).
@@ -64,6 +64,7 @@
 
 -record(s, {
     shard :: emqx_ds:shard(),
+    keyspace :: emqx_ds:keyspace(),
     db :: rocksdb:db_handle(),
     cf_iterator :: rocksdb:cf_handle(),
     cf_generations :: cf_refs()
@@ -107,7 +108,14 @@
 -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
     {_Schema, cf_refs()}.
 
--callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
+-callback open(
+    emqx_ds:shard(),
+    emqx_ds:keyspace(),
+    rocksdb:db_handle(),
+    gen_id(),
+    cf_refs(),
+    _Schema
+) ->
     term().
 
 -callback store(
@@ -135,9 +143,10 @@
 %% API funcions
 %%================================================================================
 
--spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}.
-start_link(Shard, Options) ->
-    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
+-spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
+    {ok, pid()}.
+start_link(Shard, Keyspace, Options) ->
+    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []).
 
 -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
     {ok, gen_id()} | {error, nonmonotonic}.
@@ -249,9 +258,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
 %% behaviour callbacks
 %%================================================================================
 
-init({Shard, Options}) ->
+init({Shard, Keyspace, Options}) ->
     process_flag(trap_exit, true),
-    {ok, S0} = open_db(Shard, Options),
+    {ok, S0} = open_db(Shard, Keyspace, Options),
     S = ensure_current_generation(S0),
     ok = populate_metadata(S),
     {ok, S}.
@@ -294,10 +303,10 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
     meta_register_gen(Shard, GenId, Gen).
 
 -spec ensure_current_generation(state()) -> state().
-ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
+ensure_current_generation(S = #s{keyspace = Keyspace, db = DBHandle}) ->
     case schema_get_current(DBHandle) of
         undefined ->
-            Config = emqx_ds_conf:shard_config(Shard),
+            Config = emqx_ds_conf:keyspace_config(Keyspace),
             {ok, _, NS} = create_new_gen(0, Config, S),
             NS;
         _GenId ->
@@ -333,13 +342,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
     },
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
 
--spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
-open_db(Shard, Options) ->
+-spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}.
+open_db(Shard, Keyspace, Options) ->
     DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true}
-        | emqx_ds_conf:db_options()
+        | emqx_ds_conf:db_options(Keyspace)
     ],
     _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
@@ -360,6 +369,7 @@ open_db(Shard, Options) ->
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, #s{
                 shard = Shard,
+                keyspace = Keyspace,
                 db = DBHandle,
                 cf_iterator = CFIterator,
                 cf_generations = lists:zip(CFNames, CFRefs)
@@ -372,9 +382,9 @@ open_db(Shard, Options) ->
 open_gen(
     GenId,
     Gen = #{module := Mod, data := Data},
-    #s{shard = Shard, db = DBHandle, cf_generations = CFs}
+    #s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs}
 ) ->
-    DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
+    DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data),
     Gen#{data := DB}.
 
 -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.

+ 7 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -6,7 +6,7 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_link/0, start_shard/2, stop_shard/1]).
+-export([start_link/0, start_shard/3, stop_shard/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -25,10 +25,10 @@
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
--spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+-spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
     supervisor:startchild_ret().
-start_shard(Shard, Options) ->
-    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
+start_shard(Shard, Keyspace, Options) ->
+    supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)).
 
 -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
 stop_shard(Shard) ->
@@ -52,12 +52,12 @@ init([]) ->
 %% Internal functions
 %%================================================================================
 
--spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+-spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) ->
     supervisor:child_spec().
-shard_child_spec(Shard, Options) ->
+shard_child_spec(Shard, Keyspace, Options) ->
     #{
         id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
+        start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]},
         shutdown => 5_000,
         restart => permanent,
         type => worker

+ 10 - 6
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -10,6 +10,7 @@
 -include_lib("stdlib/include/assert.hrl").
 
 -define(SHARD, shard(?FUNCTION_NAME)).
+-define(KEYSPACE, keyspace(?FUNCTION_NAME)).
 
 -define(DEFAULT_CONFIG,
     {emqx_ds_message_storage_bitmask, #{
@@ -33,7 +34,7 @@
 %% Smoke test for opening and reopening the database
 t_open(_Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}).
 
 %% Smoke test of store function
 t_store(_Config) ->
@@ -262,15 +263,18 @@ end_per_suite(_Config) ->
     ok = application:stop(emqx_durable_storage).
 
 init_per_testcase(TC, Config) ->
-    ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
+    ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}),
     Config.
 
 end_per_testcase(TC, _Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
 
-shard(TC) ->
+keyspace(TC) ->
     list_to_binary(lists:concat([?MODULE, "_", TC])).
 
-set_shard_config(Shard, Config) ->
-    ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}).
+shard(TC) ->
+    <<(keyspace(TC))/binary, "_shard">>.
+
+set_keyspace_config(Keyspace, Config) ->
+    ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).

+ 3 - 2
apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl

@@ -10,7 +10,8 @@
 -define(WORK_DIR, ["_build", "test"]).
 -define(RUN_ID, {?MODULE, testrun_id}).
 
--define(ZONE, ?MODULE).
+-define(KEYSPACE, atom_to_binary(?MODULE)).
+-define(SHARD, <<(?KEYSPACE)/binary, "_shard">>).
 -define(GEN_ID, 42).
 
 %%--------------------------------------------------------------------
@@ -255,7 +256,7 @@ iterate_shim(Shim, Iteration) ->
 open_db(Filepath, Options) ->
     {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
     {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
-    DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
+    DB = emqx_ds_message_storage_bitmask:open(?SHARD, ?KEYSPACE, Handle, ?GEN_ID, CFRefs, Schema),
     {DB, Handle}.
 
 close_db(Handle) ->