Przeglądaj źródła

refactor: combine shard id and keyspace into a single value

Thales Macedo Garitezi 2 lat temu
rodzic
commit
5ed5ac48ee

+ 3 - 1
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -11,7 +11,9 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
--define(DS_SHARD, <<"local">>).
+-define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD_ID, <<"local">>).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
 
 -import(emqx_common_test_helpers, [on_exit/1]).

+ 7 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -45,8 +45,9 @@
 ]).
 
 %% FIXME
--define(DS_SHARD, <<"local">>).
+-define(DS_SHARD_ID, <<"local">>).
 -define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 
 -define(WHEN_ENABLED(DO),
     case is_store_enabled() of
@@ -60,15 +61,14 @@
 init() ->
     ?WHEN_ENABLED(begin
         ok = emqx_ds:ensure_shard(
-            ?DEFAULT_KEYSPACE,
             ?DS_SHARD,
             #{
                 dir => filename:join([
                     emqx:data_dir(),
                     ds,
                     messages,
-                    atom_to_binary(?DEFAULT_KEYSPACE),
-                    ?DS_SHARD
+                    ?DEFAULT_KEYSPACE,
+                    ?DS_SHARD_ID
                 ])
             }
         ),
@@ -97,7 +97,9 @@ store_message(Msg) ->
     ID = emqx_message:id(Msg),
     Timestamp = emqx_guid:timestamp(ID),
     Topic = emqx_topic:words(emqx_message:topic(Msg)),
-    emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
+    emqx_ds_storage_layer:store(
+        ?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)
+    ).
 
 has_subscribers(#message{topic = Topic}) ->
     emqx_persistent_session_ds_router:has_any_route(Topic).

+ 3 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -26,7 +26,9 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(DS_SHARD, <<"local">>).
+-define(DEFAULT_KEYSPACE, default).
+-define(DS_SHARD_ID, <<"local">>).
+-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).

+ 11 - 9
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -19,7 +19,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% API:
--export([ensure_shard/3]).
+-export([ensure_shard/2]).
 %%   Messages:
 -export([message_store/2, message_store/1, message_stats/0]).
 %%   Iterator:
@@ -49,6 +49,7 @@
     iterator_id/0,
     iterator/0,
     shard/0,
+    shard_id/0,
     topic/0,
     time/0
 ]).
@@ -79,7 +80,8 @@
 -type topic() :: list(binary()).
 
 -type keyspace() :: atom().
--type shard() :: binary().
+-type shard_id() :: binary().
+-type shard() :: {keyspace(), shard_id()}.
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
@@ -98,10 +100,10 @@
 %% API funcions
 %%================================================================================
 
--spec ensure_shard(keyspace(), shard(), emqx_ds_storage_layer:options()) ->
+-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
     ok | {error, _Reason}.
-ensure_shard(Keyspace, Shard, Options) ->
-    case emqx_ds_storage_layer_sup:start_shard(Keyspace, Shard, Options) of
+ensure_shard(Shard, Options) ->
+    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
         {ok, _Pid} ->
             ok;
         {error, {already_started, _Pid}} ->
@@ -142,7 +144,7 @@ message_stats() ->
 -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}.
 session_open(ClientID) ->
     {atomic, Res} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             case mnesia:read(?SESSION_TAB, ClientID, write) of
                 [#session{}] ->
                     {false, ClientID};
@@ -159,7 +161,7 @@ session_open(ClientID) ->
 -spec session_drop(emqx_types:clientid()) -> ok.
 session_drop(ClientID) ->
     {atomic, ok} = mria:transaction(
-        ?DS_SHARD,
+        ?DS_MRIA_SHARD,
         fun() ->
             %% TODO: ensure all iterators from this clientid are closed?
             mnesia:delete({?SESSION_TAB, ClientID})
@@ -180,7 +182,7 @@ session_suspend(_SessionId) ->
 session_add_iterator(DSSessionId, TopicFilter) ->
     IteratorRefId = {DSSessionId, TopicFilter},
     {atomic, Res} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
                 [] ->
                     {IteratorId, StartMS} = new_iterator_id(DSSessionId),
@@ -223,7 +225,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) ->
 session_del_iterator(DSSessionId, TopicFilter) ->
     IteratorRefId = {DSSessionId, TopicFilter},
     {atomic, ok} =
-        mria:transaction(?DS_SHARD, fun() ->
+        mria:transaction(?DS_MRIA_SHARD, fun() ->
             mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write)
         end),
     ok.

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -18,7 +18,7 @@ init_mnesia() ->
     ok = mria:create_table(
         ?SESSION_TAB,
         [
-            {rlog_shard, ?DS_SHARD},
+            {rlog_shard, ?DS_MRIA_SHARD},
             {type, set},
             {storage, storage()},
             {record_name, session},
@@ -28,7 +28,7 @@ init_mnesia() ->
     ok = mria:create_table(
         ?ITERATOR_REF_TAB,
         [
-            {rlog_shard, ?DS_SHARD},
+            {rlog_shard, ?DS_MRIA_SHARD},
             {type, ordered_set},
             {storage, storage()},
             {record_name, iterator_ref},

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_int.hrl

@@ -18,7 +18,7 @@
 
 -define(SESSION_TAB, emqx_ds_session).
 -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
--define(DS_SHARD, emqx_ds_shard).
+-define(DS_MRIA_SHARD, emqx_ds_shard).
 
 -record(session, {
     %% same as clientid

+ 4 - 6
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -80,7 +80,7 @@
 -behaviour(emqx_ds_storage_layer).
 
 %% API:
--export([create_new/3, open/6]).
+-export([create_new/3, open/5]).
 -export([make_keymapper/1]).
 
 -export([store/5]).
@@ -173,7 +173,6 @@
 -opaque schema() :: #schema{}.
 
 -record(db, {
-    keyspace :: emqx_ds:keyspace(),
     shard :: emqx_ds:shard(),
     handle :: rocksdb:db_handle(),
     cf :: rocksdb:cf_handle(),
@@ -236,7 +235,6 @@ create_new(DBHandle, GenId, Options) ->
 
 %% Reopen the database
 -spec open(
-    emqx_ds:keyspace(),
     emqx_ds:shard(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
@@ -244,10 +242,9 @@ create_new(DBHandle, GenId, Options) ->
     schema()
 ) ->
     db().
-open(Keyspace, Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
+open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
     {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
     #db{
-        keyspace = Keyspace,
         shard = Shard,
         handle = DBHandle,
         cf = CFHandle,
@@ -292,7 +289,8 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->
-    Options = emqx_ds_conf:iteration_options(DB#db.keyspace),
+    {Keyspace, _ShardId} = DB#db.shard,
+    Options = emqx_ds_conf:iteration_options(Keyspace),
     make_iterator(DB, Replay, Options).
 
 -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->

+ 17 - 20
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -6,8 +6,8 @@
 -behaviour(gen_server).
 
 %% API:
--export([start_link/3]).
--export([create_generation/4]).
+-export([start_link/2]).
+-export([create_generation/3]).
 
 -export([store/5]).
 -export([delete/4]).
@@ -64,7 +64,6 @@
 
 -record(s, {
     shard :: emqx_ds:shard(),
-    keyspace :: emqx_ds:keyspace(),
     db :: rocksdb:db_handle(),
     cf_iterator :: rocksdb:cf_handle(),
     cf_generations :: cf_refs()
@@ -99,7 +98,7 @@
 %% 3. `inplace_update_support`?
 -define(ITERATOR_CF_OPTS, []).
 
--define(REF(Keyspace, Shard), {via, gproc, {n, l, {?MODULE, Keyspace, Shard}}}).
+-define(REF(Keyspace, ShardId), {via, gproc, {n, l, {?MODULE, Keyspace, ShardId}}}).
 
 %%================================================================================
 %% Callbacks
@@ -109,7 +108,6 @@
     {_Schema, cf_refs()}.
 
 -callback open(
-    emqx_ds:keyspace(),
     emqx_ds:shard(),
     rocksdb:db_handle(),
     gen_id(),
@@ -143,17 +141,17 @@
 %% API funcions
 %%================================================================================
 
--spec start_link(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
     {ok, pid()}.
-start_link(Keyspace, Shard, Options) ->
-    gen_server:start_link(?REF(Keyspace, Shard), ?MODULE, {Keyspace, Shard, Options}, []).
+start_link(Shard = {Keyspace, ShardId}, Options) ->
+    gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []).
 
 -spec create_generation(
-    emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
+    emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
 ) ->
     {ok, gen_id()} | {error, nonmonotonic}.
-create_generation(Keyspace, Shard, Since, Config = {_Module, _Options}) ->
-    gen_server:call(?REF(Keyspace, Shard), {create_generation, Since, Config}).
+create_generation({Keyspace, ShardId}, Since, Config = {_Module, _Options}) ->
+    gen_server:call(?REF(Keyspace, ShardId), {create_generation, Since, Config}).
 
 -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
     ok | {error, _}.
@@ -260,9 +258,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
 %% behaviour callbacks
 %%================================================================================
 
-init({Keyspace, Shard, Options}) ->
+init({Shard, Options}) ->
     process_flag(trap_exit, true),
-    {ok, S0} = open_db(Keyspace, Shard, Options),
+    {ok, S0} = open_db(Shard, Options),
     S = ensure_current_generation(S0),
     ok = populate_metadata(S),
     {ok, S}.
@@ -305,7 +303,7 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
     meta_register_gen(Shard, GenId, Gen).
 
 -spec ensure_current_generation(state()) -> state().
-ensure_current_generation(S = #s{keyspace = Keyspace, db = DBHandle}) ->
+ensure_current_generation(S = #s{shard = {Keyspace, _ShardId}, db = DBHandle}) ->
     case schema_get_current(DBHandle) of
         undefined ->
             Config = emqx_ds_conf:keyspace_config(Keyspace),
@@ -344,9 +342,9 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
     },
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
 
--spec open_db(emqx_ds:keyspace(), emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
-open_db(Keyspace, Shard, Options) ->
-    DefaultDir = filename:join([atom_to_binary(Keyspace), Shard]),
+-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
+open_db(Shard = {Keyspace, ShardId}, Options) ->
+    DefaultDir = filename:join([atom_to_binary(Keyspace), ShardId]),
     DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
     DBOptions = [
         {create_if_missing, true},
@@ -372,7 +370,6 @@ open_db(Keyspace, Shard, Options) ->
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, #s{
                 shard = Shard,
-                keyspace = Keyspace,
                 db = DBHandle,
                 cf_iterator = CFIterator,
                 cf_generations = lists:zip(CFNames, CFRefs)
@@ -385,9 +382,9 @@ open_db(Keyspace, Shard, Options) ->
 open_gen(
     GenId,
     Gen = #{module := Mod, data := Data},
-    #s{keyspace = Keyspace, shard = Shard, db = DBHandle, cf_generations = CFs}
+    #s{shard = Shard, db = DBHandle, cf_generations = CFs}
 ) ->
-    DB = Mod:open(Keyspace, Shard, DBHandle, GenId, CFs, Data),
+    DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
     Gen#{data := DB}.
 
 -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.

+ 7 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -6,7 +6,7 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_link/0, start_shard/3, stop_shard/1]).
+-export([start_link/0, start_shard/2, stop_shard/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -25,10 +25,10 @@
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
--spec start_shard(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
     supervisor:startchild_ret().
-start_shard(Keyspace, Shard, Options) ->
-    supervisor:start_child(?SUP, shard_child_spec(Keyspace, Shard, Options)).
+start_shard(Shard, Options) ->
+    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
 
 -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
 stop_shard(Shard) ->
@@ -52,12 +52,12 @@ init([]) ->
 %% Internal functions
 %%================================================================================
 
--spec shard_child_spec(emqx_ds:keyspace(), emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
     supervisor:child_spec().
-shard_child_spec(Keyspace, Shard, Options) ->
+shard_child_spec(Shard, Options) ->
     #{
         id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Keyspace, Shard, Options]},
+        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
         shutdown => 5_000,
         restart => permanent,
         type => worker

+ 16 - 14
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -10,7 +10,6 @@
 -include_lib("stdlib/include/assert.hrl").
 
 -define(SHARD, shard(?FUNCTION_NAME)).
--define(KEYSPACE, keyspace(?FUNCTION_NAME)).
 
 -define(DEFAULT_CONFIG,
     {emqx_ds_message_storage_bitmask, #{
@@ -34,7 +33,7 @@
 %% Smoke test for opening and reopening the database
 t_open(_Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?KEYSPACE, ?SHARD, #{}).
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
 
 %% Smoke test of store function
 t_store(_Config) ->
@@ -137,16 +136,16 @@ t_iterate_long_tail_wildcard(_Config) ->
     ).
 
 t_create_gen(_Config) ->
-    {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG),
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
     ?assertEqual(
         {error, nonmonotonic},
-        emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1, ?DEFAULT_CONFIG)
+        emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
     ),
     ?assertEqual(
         {error, nonmonotonic},
-        emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 5, ?DEFAULT_CONFIG)
+        emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
     ),
-    {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
     Topics = ["foo/bar", "foo/bar/baz"],
     Timestamps = lists:seq(1, 100),
     [
@@ -155,9 +154,9 @@ t_create_gen(_Config) ->
     ].
 
 t_iterate_multigen(_Config) ->
-    {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
-    {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG),
-    {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 1000, ?DEFAULT_CONFIG),
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
+    {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
     Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
     Timestamps = lists:seq(1, 100),
     _ = [
@@ -181,9 +180,9 @@ t_iterate_multigen(_Config) ->
 
 t_iterate_multigen_preserve_restore(_Config) ->
     ReplayID = atom_to_binary(?FUNCTION_NAME),
-    {ok, 1} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 10, ?COMPACT_CONFIG),
-    {ok, 2} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 50, ?DEFAULT_CONFIG),
-    {ok, 3} = emqx_ds_storage_layer:create_generation(?KEYSPACE, ?SHARD, 100, ?DEFAULT_CONFIG),
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
+    {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
     Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
     Timestamps = lists:seq(1, 100),
     TopicFilter = "foo/#",
@@ -264,7 +263,7 @@ end_per_suite(_Config) ->
 
 init_per_testcase(TC, Config) ->
     ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(keyspace(TC), shard(TC), #{}),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
     Config.
 
 end_per_testcase(TC, _Config) ->
@@ -273,8 +272,11 @@ end_per_testcase(TC, _Config) ->
 keyspace(TC) ->
     list_to_atom(lists:concat([?MODULE, "_", TC])).
 
+shard_id(_TC) ->
+    <<"shard">>.
+
 shard(TC) ->
-    <<(atom_to_binary(keyspace(TC)))/binary, "_shard">>.
+    {keyspace(TC), shard_id(TC)}.
 
 set_keyspace_config(Keyspace, Config) ->
     ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).

+ 3 - 2
apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl

@@ -11,7 +11,8 @@
 -define(RUN_ID, {?MODULE, testrun_id}).
 
 -define(KEYSPACE, ?MODULE).
--define(SHARD, <<(atom_to_binary(?KEYSPACE))/binary, "_shard">>).
+-define(SHARD_ID, <<"shard">>).
+-define(SHARD, {?KEYSPACE, ?SHARD_ID}).
 -define(GEN_ID, 42).
 
 %%--------------------------------------------------------------------
@@ -256,7 +257,7 @@ iterate_shim(Shim, Iteration) ->
 open_db(Filepath, Options) ->
     {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
     {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
-    DB = emqx_ds_message_storage_bitmask:open(?SHARD, ?KEYSPACE, Handle, ?GEN_ID, CFRefs, Schema),
+    DB = emqx_ds_message_storage_bitmask:open(?SHARD, Handle, ?GEN_ID, CFRefs, Schema),
     {DB, Handle}.
 
 close_db(Handle) ->