Sfoglia il codice sorgente

Merge pull request #11613 from thalesmg/ds-keyspace-m-20230914

feat(ds): introduce keyspace concept
Thales Macedo Garitezi 2 anni fa
parent
commit
2965fa6fcb

+ 3 - 1
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -11,7 +11,9 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
--define(DS_SHARD, <<"local">>).
+-define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD_ID, <<"local">>).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
 
 -import(emqx_common_test_helpers, [on_exit/1]).

+ 18 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -45,7 +45,9 @@
 ]).
 
 %% FIXME
--define(DS_SHARD, <<"local">>).
+-define(DS_SHARD_ID, <<"local">>).
+-define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 
 -define(WHEN_ENABLED(DO),
     case is_store_enabled() of
@@ -58,9 +60,18 @@
 
 init() ->
     ?WHEN_ENABLED(begin
-        ok = emqx_ds:ensure_shard(?DS_SHARD, #{
-            dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
-        }),
+        ok = emqx_ds:ensure_shard(
+            ?DS_SHARD,
+            #{
+                dir => filename:join([
+                    emqx:data_dir(),
+                    ds,
+                    messages,
+                    ?DEFAULT_KEYSPACE,
+                    ?DS_SHARD_ID
+                ])
+            }
+        ),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok
     end).
@@ -86,7 +97,9 @@ store_message(Msg) ->
     ID = emqx_message:id(Msg),
     Timestamp = emqx_guid:timestamp(ID),
     Topic = emqx_topic:words(emqx_message:topic(Msg)),
-    emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
+    emqx_ds_storage_layer:store(
+        ?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)
+    ).
 
 has_subscribers(#message{topic = Topic}) ->
     emqx_persistent_session_ds_router:has_any_route(Topic).

+ 3 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -26,7 +26,9 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(DS_SHARD, <<"local">>).
+-define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD_ID, <<"local">>).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).

+ 9 - 5
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -39,6 +39,7 @@
 -export([]).
 
 -export_type([
+    keyspace/0,
     message_id/0,
     message_stats/0,
     message_store_opts/0,
@@ -48,6 +49,7 @@
     iterator_id/0,
     iterator/0,
     shard/0,
+    shard_id/0,
     topic/0,
     time/0
 ]).
@@ -77,7 +79,9 @@
 %% Parsed topic:
 -type topic() :: list(binary()).
 
--type shard() :: binary().
+-type keyspace() :: atom().
+-type shard_id() :: binary().
+-type shard() :: {keyspace(), shard_id()}.
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
@@ -140,7 +144,7 @@ message_stats() ->
 -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}.
 session_open(ClientID) ->
     {atomic, Res} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             case mnesia:read(?SESSION_TAB, ClientID, write) of
                 [#session{}] ->
                     {false, ClientID};
@@ -157,7 +161,7 @@ session_open(ClientID) ->
 -spec session_drop(emqx_types:clientid()) -> ok.
 session_drop(ClientID) ->
     {atomic, ok} = mria:transaction(
-        ?DS_SHARD,
+        ?DS_MRIA_SHARD,
         fun() ->
             %% TODO: ensure all iterators from this clientid are closed?
             mnesia:delete({?SESSION_TAB, ClientID})
@@ -178,7 +182,7 @@ session_suspend(_SessionId) ->
 session_add_iterator(DSSessionId, TopicFilter) ->
     IteratorRefId = {DSSessionId, TopicFilter},
     {atomic, Res} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
                 [] ->
                     {IteratorId, StartMS} = new_iterator_id(DSSessionId),
@@ -221,7 +225,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) ->
 session_del_iterator(DSSessionId, TopicFilter) ->
     IteratorRefId = {DSSessionId, TopicFilter},
     {atomic, ok} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write)
         end),
     ok.

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -18,7 +18,7 @@ init_mnesia() ->
     ok = mria:create_table(
         ?SESSION_TAB,
         [
-            {rlog_shard, ?DS_SHARD},
+            {rlog_shard, ?DS_MRIA_SHARD},
             {type, set},
             {storage, storage()},
             {record_name, session},
@@ -28,7 +28,7 @@ init_mnesia() ->
     ok = mria:create_table(
         ?ITERATOR_REF_TAB,
         [
-            {rlog_shard, ?DS_SHARD},
+            {rlog_shard, ?DS_MRIA_SHARD},
             {type, ordered_set},
             {storage, storage()},
             {record_name, iterator_ref},

+ 23 - 16
apps/emqx_durable_storage/src/emqx_ds_conf.erl

@@ -6,9 +6,9 @@
 %% TODO: make a proper HOCON schema and all...
 
 %% API:
--export([shard_config/1, db_options/0]).
+-export([keyspace_config/1, db_options/1]).
 
--export([shard_iteration_options/1]).
+-export([iteration_options/1]).
 -export([default_iteration_options/0]).
 
 -type backend_config() ::
@@ -23,16 +23,20 @@
 
 -define(APP, emqx_ds).
 
--spec shard_config(emqx_ds:shard()) -> backend_config().
-shard_config(Shard) ->
-    DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
-    Shards = application:get_env(?APP, shard_config, #{}),
-    maps:get(Shard, Shards, DefaultShardConfig).
+-spec keyspace_config(emqx_ds:keyspace()) -> backend_config().
+keyspace_config(Keyspace) ->
+    DefaultKeyspaceConfig = application:get_env(
+        ?APP,
+        default_keyspace_config,
+        default_keyspace_config()
+    ),
+    Keyspaces = application:get_env(?APP, keyspace_config, #{}),
+    maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig).
 
--spec shard_iteration_options(emqx_ds:shard()) ->
+-spec iteration_options(emqx_ds:keyspace()) ->
     emqx_ds_message_storage_bitmask:iteration_options().
-shard_iteration_options(Shard) ->
-    case shard_config(Shard) of
+iteration_options(Keyspace) ->
+    case keyspace_config(Keyspace) of
         {emqx_ds_message_storage_bitmask, Config} ->
             maps:get(iteration, Config, default_iteration_options());
         {_Module, _} ->
@@ -41,12 +45,13 @@ shard_iteration_options(Shard) ->
 
 -spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options().
 default_iteration_options() ->
-    {emqx_ds_message_storage_bitmask, Config} = default_shard_config(),
+    {emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(),
     maps:get(iteration, Config).
 
--spec default_shard_config() -> backend_config().
-default_shard_config() ->
+-spec default_keyspace_config() -> backend_config().
+default_keyspace_config() ->
     {emqx_ds_message_storage_bitmask, #{
+        db_options => [],
         timestamp_bits => 64,
         topic_bits_per_level => [8, 8, 8, 32, 16],
         epoch => 5,
@@ -55,6 +60,8 @@ default_shard_config() ->
         }
     }}.
 
--spec db_options() -> emqx_ds_storage_layer:db_options().
-db_options() ->
-    application:get_env(?APP, db_options, []).
+-spec db_options(emqx_ds:keyspace()) -> emqx_ds_storage_layer:db_options().
+db_options(Keyspace) ->
+    DefaultDBOptions = application:get_env(?APP, default_db_options, []),
+    Keyspaces = application:get_env(?APP, keyspace_config, #{}),
+    emqx_utils_maps:deep_get([Keyspace, db_options], Keyspaces, DefaultDBOptions).

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_int.hrl

@@ -18,7 +18,7 @@
 
 -define(SESSION_TAB, emqx_ds_session).
 -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
--define(DS_SHARD, emqx_ds_shard).
+-define(DS_MRIA_SHARD, emqx_ds_shard).
 
 -record(session, {
     %% same as clientid

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -289,7 +289,8 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->
-    Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
+    {Keyspace, _ShardId} = DB#db.shard,
+    Options = emqx_ds_conf:iteration_options(Keyspace),
     make_iterator(DB, Replay, Options).
 
 -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->

+ 23 - 13
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -98,7 +98,7 @@
 %% 3. `inplace_update_support`?
 -define(ITERATOR_CF_OPTS, []).
 
--define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
+-define(REF(Keyspace, ShardId), {via, gproc, {n, l, {?MODULE, Keyspace, ShardId}}}).
 
 %%================================================================================
 %% Callbacks
@@ -107,7 +107,13 @@
 -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
     {_Schema, cf_refs()}.
 
--callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
+-callback open(
+    emqx_ds:shard(),
+    rocksdb:db_handle(),
+    gen_id(),
+    cf_refs(),
+    _Schema
+) ->
     term().
 
 -callback store(
@@ -135,14 +141,17 @@
 %% API funcions
 %%================================================================================
 
--spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}.
-start_link(Shard, Options) ->
-    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
+-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+    {ok, pid()}.
+start_link(Shard = {Keyspace, ShardId}, Options) ->
+    gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []).
 
--spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
+-spec create_generation(
+    emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
+) ->
     {ok, gen_id()} | {error, nonmonotonic}.
-create_generation(Shard, Since, Config = {_Module, _Options}) ->
-    gen_server:call(?REF(Shard), {create_generation, Since, Config}).
+create_generation({Keyspace, ShardId}, Since, Config = {_Module, _Options}) ->
+    gen_server:call(?REF(Keyspace, ShardId), {create_generation, Since, Config}).
 
 -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
     ok | {error, _}.
@@ -294,10 +303,10 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
     meta_register_gen(Shard, GenId, Gen).
 
 -spec ensure_current_generation(state()) -> state().
-ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
+ensure_current_generation(S = #s{shard = {Keyspace, _ShardId}, db = DBHandle}) ->
     case schema_get_current(DBHandle) of
         undefined ->
-            Config = emqx_ds_conf:shard_config(Shard),
+            Config = emqx_ds_conf:keyspace_config(Keyspace),
             {ok, _, NS} = create_new_gen(0, Config, S),
             NS;
         _GenId ->
@@ -334,12 +343,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
 
 -spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
-open_db(Shard, Options) ->
-    DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
+open_db(Shard = {Keyspace, ShardId}, Options) ->
+    DefaultDir = filename:join([atom_to_binary(Keyspace), ShardId]),
+    DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true}
-        | emqx_ds_conf:db_options()
+        | emqx_ds_conf:db_options(Keyspace)
     ],
     _ = filelib:ensure_dir(DBDir),
     ExistingCFs =

+ 10 - 4
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -262,15 +262,21 @@ end_per_suite(_Config) ->
     ok = application:stop(emqx_durable_storage).
 
 init_per_testcase(TC, Config) ->
-    ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
+    ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG),
     {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
     Config.
 
 end_per_testcase(TC, _Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
 
+keyspace(TC) ->
+    list_to_atom(lists:concat([?MODULE, "_", TC])).
+
+shard_id(_TC) ->
+    <<"shard">>.
+
 shard(TC) ->
-    list_to_binary(lists:concat([?MODULE, "_", TC])).
+    {keyspace(TC), shard_id(TC)}.
 
-set_shard_config(Shard, Config) ->
-    ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}).
+set_keyspace_config(Keyspace, Config) ->
+    ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).

+ 4 - 2
apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl

@@ -10,7 +10,9 @@
 -define(WORK_DIR, ["_build", "test"]).
 -define(RUN_ID, {?MODULE, testrun_id}).
 
--define(ZONE, ?MODULE).
+-define(KEYSPACE, ?MODULE).
+-define(SHARD_ID, <<"shard">>).
+-define(SHARD, {?KEYSPACE, ?SHARD_ID}).
 -define(GEN_ID, 42).
 
 %%--------------------------------------------------------------------
@@ -255,7 +257,7 @@ iterate_shim(Shim, Iteration) ->
 open_db(Filepath, Options) ->
     {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
     {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
-    DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
+    DB = emqx_ds_message_storage_bitmask:open(?SHARD, Handle, ?GEN_ID, CFRefs, Schema),
     {DB, Handle}.
 
 close_db(Handle) ->