Просмотр исходного кода

Merge pull request #12157 from lafirest/feat/dsgen

feat(ds): add an API for making new generations
lafirest 2 лет назад
Родитель
Commit
de02dd21ca

+ 14 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -22,7 +22,7 @@
 -module(emqx_ds).
 
 %% Management API:
--export([open_db/2, drop_db/1]).
+-export([open_db/2, add_generation/2, add_generation/1, drop_db/1]).
 
 %% Message storage API:
 -export([store_batch/2, store_batch/3]).
@@ -95,7 +95,7 @@
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
-%% TODO granularity?  Currently, we should always use micro second, as that's the unit we
+%% TODO granularity?  Currently, we should always use milliseconds, as that's the unit we
 %% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
@@ -124,6 +124,10 @@
 
 -callback open_db(db(), create_db_opts()) -> ok | {error, _}.
 
+-callback add_generation(db()) -> ok | {error, _}.
+
+-callback add_generation(db(), create_db_opts()) -> ok | {error, _}.
+
 -callback drop_db(db()) -> ok | {error, _}.
 
 -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
@@ -154,6 +158,14 @@ open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backen
     persistent_term:put(?persistent_term(DB), Module),
     ?module(DB):open_db(DB, Opts).
 
+-spec add_generation(db()) -> ok.
+add_generation(DB) ->
+    ?module(DB):add_generation(DB).
+
+-spec add_generation(db(), create_db_opts()) -> ok.
+add_generation(DB, Opts) ->
+    ?module(DB):add_generation(DB, Opts).
+
 %% @doc TODO: currently if one or a few shards are down, they won't be
 
 %% deleted.

+ 25 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -23,6 +23,8 @@
 -export([
     list_shards/1,
     open_db/2,
+    add_generation/1,
+    add_generation/2,
     drop_db/1,
     store_batch/3,
     get_streams/3,
@@ -38,7 +40,8 @@
     do_get_streams_v1/4,
     do_make_iterator_v1/5,
     do_update_iterator_v2/4,
-    do_next_v1/4
+    do_next_v1/4,
+    do_add_generation_v2/1
 ]).
 
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
@@ -121,6 +124,16 @@ open_db(DB, CreateOpts) ->
         MyShards
     ).
 
+-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
+add_generation(DB) ->
+    Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
+    _ = emqx_ds_proto_v2:add_generation(Nodes, DB),
+    ok.
+
+-spec add_generation(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+add_generation(DB, CreateOpts) ->
+    emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
+
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
@@ -284,6 +297,17 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
 
+-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
+do_add_generation_v2(DB) ->
+    MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
+
+    lists:foreach(
+        fun(ShardId) ->
+            emqx_ds_storage_layer:add_generation({DB, ShardId})
+        end,
+        MyShards
+    ).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 130 - 13
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -21,20 +21,26 @@
 %% implementation details from this module.
 -module(emqx_ds_replication_layer_meta).
 
+-compile(inline).
+
 -behaviour(gen_server).
 
 %% API:
 -export([
     shards/1,
     my_shards/1,
+    my_owned_shards/1,
+    leader_nodes/1,
     replica_set/2,
     in_sync_replicas/2,
     sites/0,
     open_db/2,
+    update_db_config/2,
     drop_db/1,
     shard_leader/2,
     this_site/0,
     set_leader/3,
+    is_leader/1,
     print_status/0
 ]).
 
@@ -44,16 +50,19 @@
 %% internal exports:
 -export([
     open_db_trans/2,
+    update_db_config_trans/2,
     drop_db_trans/1,
     claim_site/2,
     in_sync_replicas_trans/2,
     set_leader_trans/3,
+    is_leader_trans/1,
     n_shards/1
 ]).
 
 -export_type([site/0]).
 
 -include_lib("stdlib/include/qlc.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
 %%================================================================================
 %% Type declarations
@@ -145,22 +154,34 @@ start_link() ->
 
 -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 shards(DB) ->
-    eval_qlc(
-        qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB])
-    ).
+    filter_shards(DB).
 
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
-    eval_qlc(
-        qlc:q([
-            Shard
-         || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table(
-                ?SHARD_TAB
-            ),
-            D =:= DB,
-            lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
-        ])
+    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) ->
+        lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+    end).
+
+-spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
+my_owned_shards(DB) ->
+    Self = node(),
+    filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) ->
+        Self =:= Leader
+    end).
+
+-spec leader_nodes(emqx_ds:db()) -> [node()].
+leader_nodes(DB) ->
+    lists:uniq(
+        filter_shards(
+            DB,
+            fun(#?SHARD_TAB{leader = Leader}) ->
+                Leader =/= undefined
+            end,
+            fun(#?SHARD_TAB{leader = Leader}) ->
+                Leader
+            end
+        )
     ).
 
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
@@ -204,12 +225,25 @@ set_leader(DB, Shard, Node) ->
     {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
     ok.
 
+-spec is_leader(node()) -> boolean().
+is_leader(Node) ->
+    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
+    Result.
+
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
     {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
     Opts.
 
+-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    ok | {error, _}.
+update_db_config(DB, DefaultOpts) ->
+    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [
+        DB, DefaultOpts
+    ]),
+    Opts.
+
 -spec drop_db(emqx_ds:db()) -> ok.
 drop_db(DB) ->
     _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
@@ -226,6 +260,7 @@ init([]) ->
     logger:set_process_metadata(#{domain => [ds, meta]}),
     ensure_tables(),
     ensure_site(),
+    {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.
 
@@ -235,6 +270,18 @@ handle_call(_Call, _From, S) ->
 handle_cast(_Cast, S) ->
     {noreply, S}.
 
+handle_info(
+    {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
+) ->
+    MyShards = my_owned_shards(DB),
+
+    lists:foreach(
+        fun(ShardId) ->
+            emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
+        end,
+        MyShards
+    ),
+    {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -260,6 +307,31 @@ open_db_trans(DB, CreateOpts) ->
             Opts
     end.
 
+-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    ok | {error, no_database}.
+update_db_config_trans(DB, CreateOpts) ->
+    case mnesia:wread({?META_TAB, DB}) of
+        [#?META_TAB{db_props = Opts}] ->
+            %% Since this is an update and not a reopen,
+            %% we should keep the shard number and replication factor
+            %% and not create a new shard server
+            #{
+                n_shards := NShards,
+                replication_factor := ReplicationFactor
+            } = Opts,
+
+            mnesia:write(#?META_TAB{
+                db = DB,
+                db_props = CreateOpts#{
+                    n_shards := NShards,
+                    replication_factor := ReplicationFactor
+                }
+            }),
+            ok;
+        [] ->
+            {error, no_database}
+    end.
+
 -spec drop_db_trans(emqx_ds:db()) -> ok.
 drop_db_trans(DB) ->
     mnesia:delete({?META_TAB, DB}),
@@ -287,6 +359,24 @@ set_leader_trans(DB, Shard, Node) ->
     Record = Record0#?SHARD_TAB{leader = Node},
     mnesia:write(Record).
 
+-spec is_leader_trans(node()) -> boolean().
+is_leader_trans(Node) ->
+    case
+        mnesia:select(
+            ?SHARD_TAB,
+            ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) ->
+                Leader =:= Node
+            end),
+            1,
+            read
+        )
+    of
+        {[_ | _], _Cont} ->
+            true;
+        _ ->
+            false
+    end.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -346,7 +436,7 @@ create_shards(DB, NShards, ReplicationFactor) ->
             Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
             Hashes = lists:sort(Hashes0),
             {_, Sites} = lists:unzip(Hashes),
-            [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
+            [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
             Record = #?SHARD_TAB{
                 shard = {DB, Shard},
                 replica_set = ReplicaSet,
@@ -369,3 +459,30 @@ eval_qlc(Q) ->
             {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end),
             Result
     end.
+
+filter_shards(DB) ->
+    filter_shards(DB, const(true)).
+
+-spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) ->
+    [emqx_ds_replication_layer:shard_id()].
+filter_shards(DB, Predicate) ->
+    filter_shards(DB, Predicate, fun(#?SHARD_TAB{shard = {_, ShardId}}) ->
+        ShardId
+    end).
+
+filter_shards(DB, Predicate, Mapper) ->
+    eval_qlc(
+        qlc:q([
+            Mapper(Shard)
+         || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table(
+                ?SHARD_TAB
+            ),
+            D =:= DB,
+            Predicate(Shard)
+        ])
+    ).
+
+const(Result) ->
+    fun(_) ->
+        Result
+    end.

+ 37 - 6
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -25,7 +25,9 @@
     get_streams/3,
     make_iterator/4,
     update_iterator/3,
-    next/3
+    next/3,
+    update_config/2,
+    add_generation/1
 ]).
 
 %% gen_server
@@ -47,6 +49,8 @@
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -249,13 +253,19 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
             Error
     end.
 
+-spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
+update_config(ShardId, Options) ->
+    gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity).
+
+-spec add_generation(shard_id()) -> ok.
+add_generation(ShardId) ->
+    gen_server:call(?REF(ShardId), add_generation, infinity).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
 
--define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-
--spec start_link(shard_id(), options()) ->
+-spec start_link(shard_id(), emqx_ds:create_db_opts()) ->
     {ok, pid()}.
 start_link(Shard = {_, _}, Options) ->
     gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
@@ -300,6 +310,18 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     {ok, S}.
 
+handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) ->
+    Prototype = maps:get(storage, Options),
+    S1 = S0#s{schema = Schema#{prototype := Prototype}},
+    Since = emqx_message:timestamp_now(),
+    S = add_generation(S1, Since),
+    commit_metadata(S),
+    {reply, ok, S};
+handle_call(add_generation, _From, S0) ->
+    Since = emqx_message:timestamp_now(),
+    S = add_generation(S0, Since),
+    commit_metadata(S),
+    {reply, ok, S};
 handle_call(#call_create_generation{since = Since}, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
@@ -342,11 +364,13 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
 -spec add_generation(server_state(), emqx_ds:time()) -> server_state().
 add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
-    {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, Since),
+    Schema1 = update_last_until(Schema0, Since),
+    Shard1 = update_last_until(Shard0, Since),
+    {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
     CFRefs = NewCFRefs ++ CFRefs0,
     Key = {generation, GenId},
     Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
-    Shard = Shard0#{Key => Generation},
+    Shard = Shard1#{current_generation := GenId, Key => Generation},
     S0#s{
         cf_refs = CFRefs,
         schema = Schema,
@@ -426,6 +450,13 @@ rocksdb_open(Shard, Options) ->
 db_dir({DB, ShardId}) ->
     filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
+-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
+update_last_until(Schema, Until) ->
+    #{current_generation := GenId} = Schema,
+    GenData0 = maps:get({generation, GenId}, Schema),
+    GenData = GenData0#{until := Until},
+    Schema#{{generation, GenId} := GenData}.
+
 %%--------------------------------------------------------------------------------
 %% Schema access
 %%--------------------------------------------------------------------------------

+ 7 - 1
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl

@@ -27,7 +27,8 @@
     next/5,
 
     %% introduced in v2
-    update_iterator/5
+    update_iterator/5,
+    add_generation/2
 ]).
 
 %% behavior callbacks:
@@ -110,6 +111,11 @@ update_iterator(Node, DB, Shard, OldIter, DSKey) ->
         DB, Shard, OldIter, DSKey
     ]).
 
+-spec add_generation([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+add_generation(Nodes, DB) ->
+    erpc:multicall(Nodes, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================

+ 125 - 1
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -50,7 +50,7 @@ t_00_smoke_open_drop(_Config) ->
     lists:foreach(
         fun(Shard) ->
             ?assertEqual(
-                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+                {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
             ),
             ?assertEqual(
                 [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
@@ -155,6 +155,128 @@ t_05_update_iterator(_Config) ->
     ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
     ok.
 
+t_05_update_config(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    TopicFilter = ['#'],
+
+    DataSet = update_data_set(),
+
+    ToMsgs = fun(Datas) ->
+        lists:map(
+            fun({Topic, Payload}) ->
+                message(Topic, Payload, emqx_message:timestamp_now())
+            end,
+            Datas
+        )
+    end,
+
+    {_, StartTimes, MsgsList} =
+        lists:foldl(
+            fun
+                (Datas, {true, TimeAcc, MsgAcc}) ->
+                    Msgs = ToMsgs(Datas),
+                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+                    {false, TimeAcc, [Msgs | MsgAcc]};
+                (Datas, {Any, TimeAcc, MsgAcc}) ->
+                    timer:sleep(500),
+                    ?assertMatch(ok, emqx_ds:add_generation(DB, opts())),
+                    timer:sleep(500),
+                    StartTime = emqx_message:timestamp_now(),
+                    Msgs = ToMsgs(Datas),
+                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
+            end,
+            {true, [emqx_message:timestamp_now()], []},
+            DataSet
+        ),
+
+    Checker = fun({StartTime, Msgs0}, Acc) ->
+        Msgs = Msgs0 ++ Acc,
+        Batch = fetch_all(DB, TopicFilter, StartTime),
+        ?assertEqual(Msgs, Batch, {StartTime}),
+        Msgs
+    end,
+    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
+
+t_06_add_generation(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    TopicFilter = ['#'],
+
+    DataSet = update_data_set(),
+
+    ToMsgs = fun(Datas) ->
+        lists:map(
+            fun({Topic, Payload}) ->
+                message(Topic, Payload, emqx_message:timestamp_now())
+            end,
+            Datas
+        )
+    end,
+
+    {_, StartTimes, MsgsList} =
+        lists:foldl(
+            fun
+                (Datas, {true, TimeAcc, MsgAcc}) ->
+                    Msgs = ToMsgs(Datas),
+                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+                    {false, TimeAcc, [Msgs | MsgAcc]};
+                (Datas, {Any, TimeAcc, MsgAcc}) ->
+                    timer:sleep(500),
+                    ?assertMatch(ok, emqx_ds:add_generation(DB)),
+                    timer:sleep(500),
+                    StartTime = emqx_message:timestamp_now(),
+                    Msgs = ToMsgs(Datas),
+                    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+                    {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]}
+            end,
+            {true, [emqx_message:timestamp_now()], []},
+            DataSet
+        ),
+
+    Checker = fun({StartTime, Msgs0}, Acc) ->
+        Msgs = Msgs0 ++ Acc,
+        Batch = fetch_all(DB, TopicFilter, StartTime),
+        ?assertEqual(Msgs, Batch, {StartTime}),
+        Msgs
+    end,
+    lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
+
+update_data_set() ->
+    [
+        [
+            {<<"foo/bar">>, <<"1">>}
+        ],
+
+        [
+            {<<"foo">>, <<"2">>}
+        ],
+
+        [
+            {<<"bar/bar">>, <<"3">>}
+        ]
+    ].
+
+fetch_all(DB, TopicFilter, StartTime) ->
+    Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    Streams = lists:sort(
+        fun({{_, A}, _}, {{_, B}, _}) ->
+            A < B
+        end,
+        Streams0
+    ),
+    lists:foldl(
+        fun({_, Stream}, Acc) ->
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+            {ok, _, Msgs0} = iterate(DB, Iter0, StartTime),
+            Msgs = lists:map(fun({_, Msg}) -> Msg end, Msgs0),
+            Acc ++ Msgs
+        end,
+        [],
+        Streams
+    ).
+
 message(Topic, Payload, PublishedAt) ->
     #message{
         topic = Topic,
@@ -172,6 +294,8 @@ iterate(DB, It0, BatchSize, Acc) ->
             {ok, It, Acc};
         {ok, It, Msgs} ->
             iterate(DB, It, BatchSize, Acc ++ Msgs);
+        {ok, end_of_stream} ->
+            {ok, It0, Acc};
         Ret ->
             Ret
     end.