Jelajahi Sumber

refactor(ds): Implement create_generation gen_rpc storage layer call

ieQu1 2 tahun lalu
induk
melakukan
2972bf14ee

+ 5 - 2
apps/emqx/src/emqx_persistent_message.erl

@@ -26,6 +26,8 @@
     persist/1
     persist/1
 ]).
 ]).
 
 
+-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+
 %% FIXME
 %% FIXME
 -define(WHEN_ENABLED(DO),
 -define(WHEN_ENABLED(DO),
     case is_store_enabled() of
     case is_store_enabled() of
@@ -38,7 +40,7 @@
 
 
 init() ->
 init() ->
     ?WHEN_ENABLED(begin
     ?WHEN_ENABLED(begin
-        ok = emqx_ds:open_db(<<"default">>, #{}),
+        ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok = emqx_persistent_session_ds_router:init_tables(),
         %ok = emqx_persistent_session_ds:create_tables(),
         %ok = emqx_persistent_session_ds:create_tables(),
         ok
         ok
@@ -65,8 +67,9 @@ persist(Msg) ->
 needs_persistence(Msg) ->
 needs_persistence(Msg) ->
     not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
     not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
 
 
+-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
 store_message(Msg) ->
 store_message(Msg) ->
-    emqx_ds:store_batch([Msg]).
+    emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]).
 
 
 has_subscribers(#message{topic = Topic}) ->
 has_subscribers(#message{topic = Topic}) ->
     emqx_persistent_session_ds_router:has_any_route(Topic).
     emqx_persistent_session_ds_router:has_any_route(Topic).

+ 1 - 7
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -25,7 +25,7 @@
 -export([open_db/2, drop_db/1]).
 -export([open_db/2, drop_db/1]).
 
 
 %% Message storage API:
 %% Message storage API:
--export([store_batch/1, store_batch/2, store_batch/3]).
+-export([store_batch/2, store_batch/3]).
 
 
 %% Message replay API:
 %% Message replay API:
 -export([get_streams/3, make_iterator/2, next/2]).
 -export([get_streams/3, make_iterator/2, next/2]).
@@ -89,8 +89,6 @@
 
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
 
--define(DEFAULT_DB, <<"default">>).
-
 %%================================================================================
 %%================================================================================
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
@@ -107,10 +105,6 @@ open_db(DB, Opts) ->
 drop_db(DB) ->
 drop_db(DB) ->
     emqx_ds_replication_layer:drop_db(DB).
     emqx_ds_replication_layer:drop_db(DB).
 
 
--spec store_batch([emqx_types:message()]) -> store_batch_result().
-store_batch(Msgs) ->
-    store_batch(?DEFAULT_DB, Msgs, #{}).
-
 -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
 -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
 store_batch(DB, Msgs, Opts) ->
 store_batch(DB, Msgs, Opts) ->
     emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).
     emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).

+ 0 - 4
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -196,10 +196,6 @@ do_next_v1(Shard, Iter, BatchSize) ->
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
 
 
-add_shard_to_rank(Shard, RankY) ->
-    RankX = erlang:phash2(Shard, 255),
-    {RankX, RankY}.
-
 shard_id(DB, Node) ->
 shard_id(DB, Node) ->
     %% TODO: don't bake node name into the schema, don't repeat the
     %% TODO: don't bake node name into the schema, don't repeat the
     %% Mnesia's 1M$ mistake.
     %% Mnesia's 1M$ mistake.

+ 49 - 23
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -24,10 +24,10 @@
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 
 %% internal exports:
 %% internal exports:
--export([drop_shard/1]).
-
 -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
 -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
 
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
@@ -79,9 +79,11 @@
 %%%% Shard:
 %%%% Shard:
 
 
 -type shard(GenData) :: #{
 -type shard(GenData) :: #{
+    %% ID of the current generation (where the new data is written:)
     current_generation := gen_id(),
     current_generation := gen_id(),
-    default_generation_module := module(),
-    default_generation_config := term(),
+    %% This data is used to create new generation:
+    prototype := {module(), term()},
+    %% Generations:
     {generation, gen_id()} => GenData
     {generation, gen_id()} => GenData
 }.
 }.
 
 
@@ -206,6 +208,9 @@ start_link(Shard, Options) ->
     shard :: shard()
     shard :: shard()
 }).
 }).
 
 
+%% Note: we specify gen_server requests as records to make use of Dialyzer:
+-record(call_create_generation, {since :: emqx_ds:time()}).
+
 -type server_state() :: #s{}.
 -type server_state() :: #s{}.
 
 
 -define(DEFAULT_CF, "default").
 -define(DEFAULT_CF, "default").
@@ -213,6 +218,7 @@ start_link(Shard, Options) ->
 
 
 init({ShardId, Options}) ->
 init({ShardId, Options}) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
+    logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
     erase_schema_runtime(ShardId),
     erase_schema_runtime(ShardId),
     {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
     {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
     {Schema, CFRefs} =
     {Schema, CFRefs} =
@@ -233,13 +239,10 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     commit_metadata(S),
     {ok, S}.
     {ok, S}.
 
 
-%% handle_call({create_generation, Since, Config}, _From, S) ->
-%%     case create_new_gen(Since, Config, S) of
-%%         {ok, GenId, NS} ->
-%%             {reply, {ok, GenId}, NS};
-%%         {error, _} = Error ->
-%%             {reply, Error, S}
-%%     end;
+handle_call(#call_create_generation{since = Since}, _From, S0) ->
+    S = add_generation(S0, Since),
+    commit_metadata(S),
+    {reply, ok, S};
 handle_call(_Call, _From, S) ->
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
     {reply, {error, unknown_call}, S}.
 
 
@@ -275,29 +278,52 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
         ShardSchema
         ShardSchema
     ).
     ).
 
 
+-spec add_generation(server_state(), emqx_ds:time()) -> server_state().
+add_generation(S0, Since) ->
+    #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
+    {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, Since),
+    CFRefs = NewCFRefs ++ CFRefs0,
+    Key = {generation, GenId},
+    Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+    Shard = Shard0#{Key => Generation},
+    S0#s{
+        cf_refs = CFRefs,
+        schema = Schema,
+        shard = Shard
+    }.
+
 -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
 -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
     generation().
     generation().
 open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
 open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
+    ?tp(debug, ds_open_generation, #{gen_id => GenId, schema => GenSchema}),
     #{module := Mod, data := Schema} = GenSchema,
     #{module := Mod, data := Schema} = GenSchema,
     RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema),
     RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema),
     GenSchema#{data => RuntimeData}.
     GenSchema#{data => RuntimeData}.
 
 
 -spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) ->
 -spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) ->
     {shard_schema(), cf_refs()}.
     {shard_schema(), cf_refs()}.
-create_new_shard_schema(ShardId, DB, CFRefs, _Options) ->
-    GenId = 1,
-    %% TODO: read from options/config
-    Mod = emqx_ds_storage_reference,
-    ModConfig = #{},
-    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConfig),
-    GenSchema = #{module => Mod, data => GenData, since => 0, until => undefined},
-    ShardSchema = #{
+create_new_shard_schema(ShardId, DB, CFRefs, Options) ->
+    ?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, options => Options}),
+    %% TODO: read prototype from options/config
+    Schema0 = #{
+        current_generation => 0,
+        prototype => {emqx_ds_storage_reference, #{}}
+    },
+    {_NewGenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, _Since = 0),
+    {Schema, NewCFRefs ++ CFRefs}.
+
+-spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) ->
+    {gen_id(), shard_schema(), cf_refs()}.
+new_generation(ShardId, DB, Schema0, Since) ->
+    #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
+    GenId = PrevGenId + 1,
+    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
+    GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined},
+    Schema = Schema0#{
         current_generation => GenId,
         current_generation => GenId,
-        default_generation_module => Mod,
-        default_generation_confg => ModConfig,
         {generation, GenId} => GenSchema
         {generation, GenId} => GenSchema
     },
     },
-    {ShardSchema, NewCFRefs ++ CFRefs}.
+    {GenId, Schema, NewCFRefs}.
 
 
 %% @doc Commit current state of the server to both rocksdb and the persistent term
 %% @doc Commit current state of the server to both rocksdb and the persistent term
 -spec commit_metadata(server_state()) -> ok.
 -spec commit_metadata(server_state()) -> ok.
@@ -393,7 +419,7 @@ get_schema_persistent(DB) ->
         {ok, Blob} ->
         {ok, Blob} ->
             Schema = binary_to_term(Blob),
             Schema = binary_to_term(Blob),
             %% Sanity check:
             %% Sanity check:
-            #{current_generation := _, default_generation_module := _} = Schema,
+            #{current_generation := _, prototype := _} = Schema,
             Schema;
             Schema;
         not_found ->
         not_found ->
             not_found
             not_found

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -103,7 +103,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
             first ->
             first ->
                 first;
                 first;
             _ ->
             _ ->
-                rocksdb:iterator_move(ITHandle, Key0),
+                _ = rocksdb:iterator_move(ITHandle, Key0),
                 next
                 next
         end,
         end,
     {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
     {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),

+ 29 - 2
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -21,9 +21,10 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
 %% A simple smoke test that verifies that opening/closing the DB
 %% A simple smoke test that verifies that opening/closing the DB
-%% doesn't crash
+%% doesn't crash, and not much else
 t_00_smoke_open_drop(_Config) ->
 t_00_smoke_open_drop(_Config) ->
     DB = 'DB',
     DB = 'DB',
     ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
     ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
@@ -65,6 +66,32 @@ t_03_smoke_iterate(_Config) ->
     {ok, Iter, Batch} = iterate(Iter0, 1),
     {ok, Iter, Batch} = iterate(Iter0, 1),
     ?assertEqual(Msgs, Batch, {Iter0, Iter}).
     ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 
+%% Verify that iterators survive restart of the application. This is
+%% an important property, since the lifetime of the iterators is tied
+%% to the external resources, such as clients' sessions, and they
+%% should always be able to continue replaying the topics from where
+%% they are left off.
+t_04_restart(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
+    StartTime = 0,
+    Msgs = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo">>, <<"2">>, 1),
+        message(<<"bar/bar">>, <<"3">>, 2)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+    [{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime),
+    %% Restart the application:
+    ?tp(warning, emqx_ds_SUITE_restart_app, #{}),
+    ok = application:stop(emqx_durable_storage),
+    {ok, _} = application:ensure_all_started(emqx_durable_storage),
+    ok = emqx_ds:open_db(DB, #{}),
+    %% The old iterator should be still operational:
+    {ok, Iter, Batch} = iterate(Iter0, 1),
+    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
+
 message(Topic, Payload, PublishedAt) ->
 message(Topic, Payload, PublishedAt) ->
     #message{
     #message{
         topic = Topic,
         topic = Topic,
@@ -102,7 +129,7 @@ end_per_suite(Config) ->
     ok.
     ok.
 
 
 init_per_testcase(_TC, Config) ->
 init_per_testcase(_TC, Config) ->
-    snabbkaffe:fix_ct_logging(),
+    %% snabbkaffe:fix_ct_logging(),
     application:ensure_all_started(emqx_durable_storage),
     application:ensure_all_started(emqx_durable_storage),
     Config.
     Config.