소스 검색

feat(ds): add `list_generations` and `drop_generation` APIs

Thales Macedo Garitezi 2 년 전
부모
커밋
4a0fd756ae

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -22,6 +22,7 @@
 {emqx_delayed,3}.
 {emqx_ds,1}.
 {emqx_ds,2}.
+{emqx_ds,3}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,2}.
 {emqx_exhook,1}.

+ 55 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -22,7 +22,14 @@
 -module(emqx_ds).
 
 %% Management API:
--export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]).
+-export([
+    open_db/2,
+    update_db_config/2,
+    add_generation/1,
+    list_generations_with_lifetimes/1,
+    drop_generation/2,
+    drop_db/1
+]).
 
 %% Message storage API:
 -export([store_batch/2, store_batch/3]).
@@ -52,7 +59,10 @@
     get_iterator_result/1,
 
     ds_specific_stream/0,
-    ds_specific_iterator/0
+    ds_specific_iterator/0,
+    ds_specific_generation_rank/0,
+    generation_rank/0,
+    generation_info/0
 ]).
 
 %%================================================================================
@@ -80,6 +90,8 @@
 
 -type ds_specific_stream() :: term().
 
+-type ds_specific_generation_rank() :: term().
+
 -type message_key() :: binary().
 
 -type store_batch_result() :: ok | {error, _}.
@@ -114,6 +126,17 @@
 
 -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
 
+%% An opaque term identifying a generation.  Each implementation will possibly add
+%% information to this term to match its inner structure (e.g.: by embedding the shard id,
+%% in the case of `emqx_ds_replication_layer').
+-opaque generation_rank() :: ds_specific_generation_rank().
+
+-type generation_info() :: #{
+    created_at := time(),
+    since := time(),
+    until := time() | undefined
+}.
+
 -define(persistent_term(DB), {emqx_ds_db_backend, DB}).
 
 -define(module(DB), (persistent_term:get(?persistent_term(DB)))).
@@ -128,6 +151,11 @@
 
 -callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
 
+-callback list_generations_with_lifetimes(db()) ->
+    #{generation_rank() => generation_info()}.
+
+-callback drop_generation(db(), generation_rank()) -> ok | {error, _}.
+
 -callback drop_db(db()) -> ok | {error, _}.
 
 -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
@@ -142,6 +170,11 @@
 
 -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
 
+-optional_callbacks([
+    list_generations_with_lifetimes/1,
+    drop_generation/2
+]).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -166,6 +199,26 @@ add_generation(DB) ->
 update_db_config(DB, Opts) ->
     ?module(DB):update_db_config(DB, Opts).
 
+-spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
+list_generations_with_lifetimes(DB) ->
+    Mod = ?module(DB),
+    case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of
+        true ->
+            Mod:list_generations_with_lifetimes(DB);
+        false ->
+            #{}
+    end.
+
+-spec drop_generation(db(), generation_rank()) -> ok | {error, _}.
+drop_generation(DB, GenId) ->
+    Mod = ?module(DB),
+    case erlang:function_exported(Mod, drop_generation, 2) of
+        true ->
+            Mod:drop_generation(DB, GenId);
+        false ->
+            {error, not_implemented}
+    end.
+
 %% @doc TODO: currently if one or a few shards are down, they won't be
 
 %% deleted.

+ 43 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -25,6 +25,8 @@
     open_db/2,
     add_generation/1,
     update_db_config/2,
+    list_generations_with_lifetimes/1,
+    drop_generation/2,
     drop_db/1,
     store_batch/3,
     get_streams/3,
@@ -41,7 +43,9 @@
     do_make_iterator_v1/5,
     do_update_iterator_v2/4,
     do_next_v1/4,
-    do_add_generation_v2/1
+    do_add_generation_v2/1,
+    do_list_generations_with_lifetimes_v3/2,
+    do_drop_generation_v3/3
 ]).
 
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
@@ -104,6 +108,8 @@
     ?batch_messages := [emqx_types:message()]
 }.
 
+-type generation_rank() :: {shard_id(), term()}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -135,6 +141,32 @@ add_generation(DB) ->
 update_db_config(DB, CreateOpts) ->
     emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
 
+-spec list_generations_with_lifetimes(emqx_ds:db()) ->
+    #{generation_rank() => emqx_ds:generation_info()}.
+list_generations_with_lifetimes(DB) ->
+    Shards = list_shards(DB),
+    lists:foldl(
+        fun(Shard, GensAcc) ->
+            Node = node_of_shard(DB, Shard),
+            maps:fold(
+                fun(GenId, Data, AccInner) ->
+                    AccInner#{{Shard, GenId} => Data}
+                end,
+                GensAcc,
+                emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard)
+            )
+        end,
+        #{},
+        Shards
+    ).
+
+-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
+drop_generation(DB, {Shard, GenId}) ->
+    %% TODO: drop generation in all nodes in the replica set, not only in the leader,
+    %% after we have proper replication in place.
+    Node = node_of_shard(DB, Shard),
+    emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId).
+
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
@@ -301,7 +333,6 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
 do_add_generation_v2(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
-
     lists:foreach(
         fun(ShardId) ->
             emqx_ds_storage_layer:add_generation({DB, ShardId})
@@ -309,6 +340,16 @@ do_add_generation_v2(DB) ->
         MyShards
     ).
 
+-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
+    #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
+do_list_generations_with_lifetimes_v3(DB, ShardId) ->
+    emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
+
+-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
+    ok | {error, _}.
+do_drop_generation_v3(DB, ShardId, GenId) ->
+    emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 16 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -27,6 +27,7 @@
 -export([
     create/4,
     open/5,
+    drop/5,
     store_batch/4,
     get_streams/4,
     make_iterator/5,
@@ -199,6 +200,21 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         ts_offset = TSOffsetBits
     }.
 
+-spec drop(
+    emqx_ds_storage_layer:shard_id(),
+    rocksdb:db_handle(),
+    emqx_ds_storage_layer:gen_id(),
+    emqx_ds_storage_layer:cf_refs(),
+    s()
+) ->
+    ok.
+drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
+    {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
+    {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
+    ok = rocksdb:drop_column_family(DBHandle, DataCF),
+    ok = rocksdb:drop_column_family(DBHandle, TrieCF),
+    ok.
+
 -spec store_batch(
     emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->

+ 239 - 54
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -27,7 +27,9 @@
     update_iterator/3,
     next/3,
     update_config/2,
-    add_generation/1
+    add_generation/1,
+    list_generations_with_lifetimes/1,
+    drop_generation/2
 ]).
 
 %% gen_server
@@ -44,7 +46,8 @@
     iterator/0,
     shard_id/0,
     options/0,
-    prototype/0
+    prototype/0,
+    post_creation_context/0
 ]).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -95,11 +98,18 @@
 
 %%%% Generation:
 
+-define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
+
 -type generation(Data) :: #{
     %% Module that handles data for the generation:
     module := module(),
     %% Module-specific data defined at generation creation time:
     data := Data,
+    %% Column families used by this generation
+    cf_refs := cf_refs(),
+    %% Time at which this was created.  Might differ from `since', in particular for the
+    %% first generation.
+    created_at := emqx_ds:time(),
     %% When should this generation become active?
     %% This generation should only contain messages timestamped no earlier than that.
     %% The very first generation will have `since` equal 0.
@@ -121,7 +131,7 @@
     %% This data is used to create new generation:
     prototype := prototype(),
     %% Generations:
-    {generation, gen_id()} => GenData
+    ?GEN_KEY(gen_id()) => GenData
 }.
 
 %% Shard schema (persistent):
@@ -132,6 +142,18 @@
 
 -type options() :: map().
 
+-type post_creation_context() ::
+    #{
+        shard_id := emqx_ds_storage_layer:shard_id(),
+        db := rocksdb:db_handle(),
+        new_gen_id := emqx_ds_storage_layer:gen_id(),
+        old_gen_id := emqx_ds_storage_layer:gen_id(),
+        new_cf_refs := cf_refs(),
+        old_cf_refs := cf_refs(),
+        new_gen_runtime_data := _NewData,
+        old_gen_runtime_data := _OldData
+    }.
+
 %%================================================================================
 %% Generation callbacks
 %%================================================================================
@@ -145,6 +167,9 @@
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
     _Data.
 
+-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
+    ok | {error, _Reason}.
+
 -callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 
@@ -157,10 +182,17 @@
 -callback next(shard_id(), _Data, Iter, pos_integer()) ->
     {ok, Iter, [emqx_types:message()]} | {error, _}.
 
+-callback post_creation_actions(post_creation_context()) -> _Data.
+
+-optional_callbacks([post_creation_actions/1]).
+
 %%================================================================================
 %% API for the replication layer
 %%================================================================================
 
+-record(call_list_generations_with_lifetimes, {}).
+-record(call_drop_generation, {gen_id :: gen_id()}).
+
 -spec open_shard(shard_id(), options()) -> ok.
 open_shard(Shard, Options) ->
     emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
@@ -188,18 +220,25 @@ store_batch(Shard, Messages, Options) ->
     [{integer(), stream()}].
 get_streams(Shard, TopicFilter, StartTime) ->
     Gens = generations_since(Shard, StartTime),
+    ?tp(get_streams_all_gens, #{gens => Gens}),
     lists:flatmap(
         fun(GenId) ->
-            #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-            Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
-            [
-                {GenId, #{
-                    ?tag => ?STREAM,
-                    ?generation => GenId,
-                    ?enc => Stream
-                }}
-             || Stream <- Streams
-            ]
+            ?tp(get_streams_get_gen, #{gen_id => GenId}),
+            case generation_get_safe(Shard, GenId) of
+                {ok, #{module := Mod, data := GenData}} ->
+                    Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
+                    [
+                        {GenId, #{
+                            ?tag => ?STREAM,
+                            ?generation => GenId,
+                            ?enc => Stream
+                        }}
+                     || Stream <- Streams
+                    ];
+                {error, not_found} ->
+                    %% race condition: generation was dropped before getting its streams?
+                    []
+            end
         end,
         Gens
     ).
@@ -209,16 +248,20 @@ get_streams(Shard, TopicFilter, StartTime) ->
 make_iterator(
     Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
 ) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
-        {ok, Iter} ->
-            {ok, #{
-                ?tag => ?IT,
-                ?generation => GenId,
-                ?enc => Iter
-            }};
-        {error, _} = Err ->
-            Err
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
+                {ok, Iter} ->
+                    {ok, #{
+                        ?tag => ?IT,
+                        ?generation => GenId,
+                        ?enc => Iter
+                    }};
+                {error, _} = Err ->
+                    Err
+            end;
+        {error, not_found} ->
+            {error, end_of_stream}
     end.
 
 -spec update_iterator(
@@ -230,33 +273,42 @@ update_iterator(
     #{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
     DSKey
 ) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
-        {ok, Iter} ->
-            {ok, #{
-                ?tag => ?IT,
-                ?generation => GenId,
-                ?enc => Iter
-            }};
-        {error, _} = Err ->
-            Err
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
+                {ok, Iter} ->
+                    {ok, #{
+                        ?tag => ?IT,
+                        ?generation => GenId,
+                        ?enc => Iter
+                    }};
+                {error, _} = Err ->
+                    Err
+            end;
+        {error, not_found} ->
+            {error, end_of_stream}
     end.
 
 -spec next(shard_id(), iterator(), pos_integer()) ->
     emqx_ds:next_result(iterator()).
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    Current = generation_current(Shard),
-    case Mod:next(Shard, GenData, GenIter0, BatchSize) of
-        {ok, _GenIter, []} when GenId < Current ->
-            %% This is a past generation. Storage layer won't write
-            %% any more messages here. The iterator reached the end:
-            %% the stream has been fully replayed.
-            {ok, end_of_stream};
-        {ok, GenIter, Batch} ->
-            {ok, Iter#{?enc := GenIter}, Batch};
-        Error = {error, _} ->
-            Error
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            Current = generation_current(Shard),
+            case Mod:next(Shard, GenData, GenIter0, BatchSize) of
+                {ok, _GenIter, []} when GenId < Current ->
+                    %% This is a past generation. Storage layer won't write
+                    %% any more messages here. The iterator reached the end:
+                    %% the stream has been fully replayed.
+                    {ok, end_of_stream};
+                {ok, GenIter, Batch} ->
+                    {ok, Iter#{?enc := GenIter}, Batch};
+                Error = {error, _} ->
+                    Error
+            end;
+        {error, not_found} ->
+            %% generation was possibly dropped by GC
+            {ok, end_of_stream}
     end.
 
 -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
@@ -267,6 +319,21 @@ update_config(ShardId, Options) ->
 add_generation(ShardId) ->
     gen_server:call(?REF(ShardId), add_generation, infinity).
 
+-spec list_generations_with_lifetimes(shard_id()) ->
+    #{
+        gen_id() => #{
+            created_at := emqx_ds:time(),
+            since := emqx_ds:time(),
+            until := undefined | emqx_ds:time()
+        }
+    }.
+list_generations_with_lifetimes(ShardId) ->
+    gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
+
+-spec drop_generation(shard_id(), gen_id()) -> ok.
+drop_generation(ShardId, GenId) ->
+    gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
@@ -328,6 +395,13 @@ handle_call(add_generation, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
     {reply, ok, S};
+handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
+    Generations = handle_list_generations_with_lifetimes(S),
+    {reply, Generations, S};
+handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
+    {Reply, S} = handle_drop_generation(S0, GenId),
+    commit_metadata(S),
+    {reply, Reply, S};
 handle_call(#call_create_generation{since = Since}, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
@@ -359,7 +433,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     %% Transform generation schemas to generation runtime data:
     maps:map(
         fun
-            ({generation, GenId}, GenSchema) ->
+            (?GEN_KEY(GenId), GenSchema) ->
                 open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
             (_K, Val) ->
                 Val
@@ -372,10 +446,40 @@ add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
     Schema1 = update_last_until(Schema0, Since),
     Shard1 = update_last_until(Shard0, Since),
+
+    #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
+    OldKey = ?GEN_KEY(OldGenId),
+    #{OldKey := OldGenSchema} = Schema0,
+    #{cf_refs := OldCFRefs} = OldGenSchema,
+    #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
+
     {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+
     CFRefs = NewCFRefs ++ CFRefs0,
-    Key = {generation, GenId},
-    Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+    Key = ?GEN_KEY(GenId),
+    Generation0 =
+        #{data := NewGenData0} =
+        open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+
+    %% When the new generation's module is the same as the last one, we might want to
+    %% perform actions like inheriting some of the previous (meta)data.
+    NewGenData =
+        run_post_creation_actions(
+            #{
+                shard_id => ShardId,
+                db => DB,
+                new_gen_id => GenId,
+                old_gen_id => OldGenId,
+                new_cf_refs => NewCFRefs,
+                old_cf_refs => OldCFRefs,
+                new_gen_runtime_data => NewGenData0,
+                old_gen_runtime_data => OldGenData,
+                new_module => CurrentMod,
+                old_module => OldMod
+            }
+        ),
+    Generation = Generation0#{data := NewGenData},
+
     Shard = Shard1#{current_generation := GenId, Key => Generation},
     S0#s{
         cf_refs = CFRefs,
@@ -383,6 +487,54 @@ add_generation(S0, Since) ->
         shard = Shard
     }.
 
+-spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}.
+handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
+    maps:fold(
+        fun
+            (?GEN_KEY(GenId), GenSchema, Acc) ->
+                Acc#{GenId => export_generation(GenSchema)};
+            (_Key, _Value, Acc) ->
+                Acc
+        end,
+        #{},
+        ShardSchema
+    ).
+
+-spec export_generation(generation_schema()) -> map().
+export_generation(GenSchema) ->
+    maps:with([created_at, since, until], GenSchema).
+
+-spec handle_drop_generation(server_state(), gen_id()) ->
+    {ok | {error, current_generation}, server_state()}.
+handle_drop_generation(#s{schema = #{current_generation := GenId}} = S0, GenId) ->
+    {{error, current_generation}, S0};
+handle_drop_generation(#s{schema = Schema} = S0, GenId) when
+    not is_map_key(?GEN_KEY(GenId), Schema)
+->
+    {{error, not_found}, S0};
+handle_drop_generation(S0, GenId) ->
+    #s{
+        shard_id = ShardId,
+        db = DB,
+        schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema,
+        shard = OldShard,
+        cf_refs = OldCFRefs
+    } = S0,
+    #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
+    #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
+    case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of
+        ok ->
+            CFRefs = OldCFRefs -- GenCFRefs,
+            Shard = maps:remove(?GEN_KEY(GenId), OldShard),
+            Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
+            S = S0#s{
+                cf_refs = CFRefs,
+                shard = Shard,
+                schema = Schema
+            },
+            {ok, S}
+    end.
+
 -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
     generation().
 open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
@@ -409,10 +561,17 @@ new_generation(ShardId, DB, Schema0, Since) ->
     #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
     GenId = PrevGenId + 1,
     {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
-    GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined},
+    GenSchema = #{
+        module => Mod,
+        data => GenData,
+        cf_refs => NewCFRefs,
+        created_at => emqx_message:timestamp_now(),
+        since => Since,
+        until => undefined
+    },
     Schema = Schema0#{
         current_generation => GenId,
-        {generation, GenId} => GenSchema
+        ?GEN_KEY(GenId) => GenSchema
     },
     {GenId, Schema, NewCFRefs}.
 
@@ -461,9 +620,26 @@ db_dir(BaseDir, {DB, ShardId}) ->
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 update_last_until(Schema, Until) ->
     #{current_generation := GenId} = Schema,
-    GenData0 = maps:get({generation, GenId}, Schema),
+    GenData0 = maps:get(?GEN_KEY(GenId), Schema),
     GenData = GenData0#{until := Until},
-    Schema#{{generation, GenId} := GenData}.
+    Schema#{?GEN_KEY(GenId) := GenData}.
+
+run_post_creation_actions(
+    #{
+        new_module := Mod,
+        old_module := Mod,
+        new_gen_runtime_data := NewGenData
+    } = Context
+) ->
+    case erlang:function_exported(Mod, post_creation_actions, 1) of
+        true ->
+            Mod:post_creation_actions(Context);
+        false ->
+            NewGenData
+    end;
+run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
+    %% Different implementation modules
+    NewGenData.
 
 %%--------------------------------------------------------------------------------
 %% Schema access
@@ -476,15 +652,24 @@ generation_current(Shard) ->
 
 -spec generation_get(shard_id(), gen_id()) -> generation().
 generation_get(Shard, GenId) ->
-    #{{generation, GenId} := GenData} = get_schema_runtime(Shard),
+    {ok, GenData} = generation_get_safe(Shard, GenId),
     GenData.
 
+-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
+generation_get_safe(Shard, GenId) ->
+    case get_schema_runtime(Shard) of
+        #{?GEN_KEY(GenId) := GenData} ->
+            {ok, GenData};
+        #{} ->
+            {error, not_found}
+    end.
+
 -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
 generations_since(Shard, Since) ->
     Schema = get_schema_runtime(Shard),
     maps:fold(
         fun
-            ({generation, GenId}, #{until := Until}, Acc) when Until >= Since ->
+            (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since ->
                 [GenId | Acc];
             (_K, _V, Acc) ->
                 Acc

+ 7 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -30,6 +30,7 @@
 -export([
     create/4,
     open/5,
+    drop/5,
     store_batch/4,
     get_streams/4,
     make_iterator/5,
@@ -85,6 +86,10 @@ open(_Shard, DBHandle, GenId, CFRefs, #schema{}) ->
     {_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     #s{db = DBHandle, cf = CF}.
 
+drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
+    ok = rocksdb:drop_column_family(DBHandle, CFHandle),
+    ok.
+
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->
@@ -142,7 +147,8 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
     case rocksdb:iterator_move(IT, Action) of
         {ok, Key, Blob} ->
             Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
-            case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
+            TopicWords = emqx_topic:words(Topic),
+            case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
                 true ->
                     do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
                 false ->

+ 147 - 0
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl

@@ -0,0 +1,147 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v3).
+
+-behavior(emqx_bpapi).
+
+-include_lib("emqx_utils/include/bpapi.hrl").
+%% API:
+-export([
+    drop_db/2,
+    store_batch/5,
+    get_streams/5,
+    make_iterator/6,
+    next/5,
+    update_iterator/5,
+    add_generation/2,
+
+    %% introduced in v3
+    list_generations_with_lifetimes/3,
+    drop_generation/4
+]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
+
+-spec get_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [{integer(), emqx_ds_storage_layer:stream()}].
+get_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
+
+-spec make_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
+        DB, Shard, Stream, TopicFilter, StartTime
+    ]).
+
+-spec next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
+    | {ok, end_of_stream}
+    | {error, _}.
+next(Node, DB, Shard, Iter, BatchSize) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+
+-spec store_batch(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_replication_layer:batch(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+store_batch(Node, DB, Shard, Batch, Options) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
+        DB, Shard, Batch, Options
+    ]).
+
+-spec update_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    emqx_ds:message_key()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+update_iterator(Node, DB, Shard, OldIter, DSKey) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
+        DB, Shard, OldIter, DSKey
+    ]).
+
+-spec add_generation([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+add_generation(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
+
+%%--------------------------------------------------------------------------------
+%% Introduced in V3
+%%--------------------------------------------------------------------------------
+
+-spec list_generations_with_lifetimes(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id()
+) ->
+    #{
+        emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()
+    }.
+list_generations_with_lifetimes(Node, DB, Shard) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]).
+
+-spec drop_generation(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:gen_id()
+) ->
+    ok | {error, _}.
+drop_generation(Node, DB, Shard, GenId) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.6.0".

+ 247 - 3
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -155,7 +155,7 @@ t_05_update_iterator(_Config) ->
     ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
     ok.
 
-t_05_update_config(_Config) ->
+t_06_update_config(_Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     TopicFilter = ['#'],
@@ -199,7 +199,7 @@ t_05_update_config(_Config) ->
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
 
-t_06_add_generation(_Config) ->
+t_07_add_generation(_Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     TopicFilter = ['#'],
@@ -243,6 +243,250 @@ t_06_add_generation(_Config) ->
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
 
+%% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'...
+%%   1) Cannot drop current generation.
+%%   2) All existing generations are returned by `list_generation_with_lifetimes'.
+%%   3) Dropping a generation removes it from the list.
+%%   4) Dropped generations stay dropped even after restarting the application.
+t_08_smoke_list_drop_generation(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            %% Exactly one generation at first.
+            Generations0 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{_GenId, #{since := _, until := _}}],
+                maps:to_list(Generations0),
+                #{gens => Generations0}
+            ),
+            [{GenId0, _}] = maps:to_list(Generations0),
+            %% Cannot delete current generation
+            ?assertEqual({error, current_generation}, emqx_ds:drop_generation(DB, GenId0)),
+
+            %% New gen
+            ok = emqx_ds:add_generation(DB),
+            Generations1 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [
+                    {GenId0, #{since := _, until := _}},
+                    {_GenId1, #{since := _, until := _}}
+                ],
+                lists:sort(maps:to_list(Generations1)),
+                #{gens => Generations1}
+            ),
+            [GenId0, GenId1] = lists:sort(maps:keys(Generations1)),
+
+            %% Drop the older one
+            ?assertEqual(ok, emqx_ds:drop_generation(DB, GenId0)),
+            Generations2 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{GenId1, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Generations2)),
+                #{gens => Generations2}
+            ),
+
+            %% Unknown gen_id, as it was already dropped
+            ?assertEqual({error, not_found}, emqx_ds:drop_generation(DB, GenId0)),
+
+            %% Should persist surviving generation list
+            ok = application:stop(emqx_durable_storage),
+            {ok, _} = application:ensure_all_started(emqx_durable_storage),
+            ok = emqx_ds:open_db(DB, opts()),
+
+            Generations3 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{GenId1, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Generations3)),
+                #{gens => Generations3}
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_drop_generation_with_never_used_iterator(_Config) ->
+    %% This test checks how the iterator behaves when:
+    %%   1) it's created at generation 1 and not consumed from.
+    %%   2) generation 2 is created and 1 dropped.
+    %%   3) iteration begins.
+    %% In this case, the iterator won't see any messages and the stream will end.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    Now = emqx_message:timestamp_now(),
+    Msgs1 = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)),
+
+    %% New iterator for the new stream will only see the later messages.
+    [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    ?assertNotEqual(Stream0, Stream1),
+    {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime),
+
+    {ok, Iter, Batch} = iterate(DB, Iter1, 1),
+    ?assertNotEqual(end_of_stream, Iter),
+    ?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]),
+
+    ok.
+
+t_drop_generation_with_used_once_iterator(_Config) ->
+    %% This test checks how the iterator behaves when:
+    %%   1) it's created at generation 1 and consumes at least 1 message.
+    %%   2) generation 2 is created and 1 dropped.
+    %%   3) iteration continues.
+    %% In this case, the iterator should see no more messages and the stream will end.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 =
+        [Msg0 | _] = [
+            message(<<"foo/bar">>, <<"1">>, 0),
+            message(<<"foo/baz">>, <<"2">>, 1)
+        ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+    {ok, Iter1, Batch1} = emqx_ds:next(DB, Iter0, 1),
+    ?assertNotEqual(end_of_stream, Iter1),
+    ?assertEqual([Msg0], [Msg || {_Key, Msg} <- Batch1]),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    Now = emqx_message:timestamp_now(),
+    Msgs1 = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)),
+
+    ok.
+
+t_drop_generation_update_iterator(_Config) ->
+    %% This checks the behavior of `emqx_ds:update_iterator' after the generation
+    %% underlying the iterator has been dropped.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+    {ok, Iter1, _Batch1} = emqx_ds:next(DB, Iter0, 1),
+    {ok, _Iter2, [{Key2, _Msg}]} = emqx_ds:next(DB, Iter1, 1),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    ?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)),
+
+    ok.
+
+t_make_iterator_stale_stream(_Config) ->
+    %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
+    %% the stream has been dropped.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    ?assertEqual(
+        {error, end_of_stream},
+        emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime)
+    ),
+
+    ok.
+
+t_get_streams_concurrently_with_drop_generation(_Config) ->
+    %% This checks that we can get all streams while a generation is dropped
+    %% mid-iteration.
+
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+
+            [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+            ok = emqx_ds:add_generation(DB),
+            ok = emqx_ds:add_generation(DB),
+
+            %% All streams
+            TopicFilter = emqx_topic:words(<<"foo/+">>),
+            StartTime = 0,
+            ?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            ?force_ordering(
+                #{?snk_kind := dropped_gen},
+                #{?snk_kind := get_streams_get_gen}
+            ),
+
+            spawn_link(fun() ->
+                {ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}),
+                ok = emqx_ds:drop_generation(DB, GenId0),
+                ?tp(dropped_gen, #{})
+            end),
+
+            ?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            ok
+        end,
+        []
+    ),
+
+    ok.
+
 update_data_set() ->
     [
         [
@@ -295,7 +539,7 @@ iterate(DB, It0, BatchSize, Acc) ->
         {ok, It, Msgs} ->
             iterate(DB, It, BatchSize, Acc ++ Msgs);
         {ok, end_of_stream} ->
-            {ok, It0, Acc};
+            {ok, end_of_stream, Acc};
         Ret ->
             Ret
     end.