Prechádzať zdrojové kódy

Merge pull request #12338 from thalesmg/ds-message-gc-20240115

feat(ds): add message GC
Thales Macedo Garitezi 2 rokov pred
rodič
commit
d122340c13

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -22,6 +22,7 @@
 {emqx_delayed,3}.
 {emqx_ds,1}.
 {emqx_ds,2}.
+{emqx_ds,3}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,2}.
 {emqx_exhook,1}.

+ 157 - 0
apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl

@@ -0,0 +1,157 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_message_ds_gc_worker).
+
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-include("emqx_persistent_session_ds.hrl").
+
+%% API
+-export([
+    start_link/0,
+    gc/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% call/cast/info records
+-record(gc, {}).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% For testing or manual ops
+gc() ->
+    gen_server:call(?MODULE, #gc{}, infinity).
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+init(_Opts) ->
+    ensure_gc_timer(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#gc{}, _From, State) ->
+    maybe_gc(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, error, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#gc{}, State) ->
+    try_gc(),
+    ensure_gc_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+ensure_gc_timer() ->
+    Timeout = emqx_config:get([session_persistence, message_retention_period]),
+    _ = erlang:send_after(Timeout, self(), #gc{}),
+    ok.
+
+try_gc() ->
+    %% Only cores should run GC.
+    CoreNodes = mria_membership:running_core_nodelist(),
+    Res = global:trans(
+        {?MODULE, self()},
+        fun maybe_gc/0,
+        CoreNodes,
+        %% Note: we set retries to 1 here because, in rare occasions, GC might start at the
+        %% same time in more than one node, and each one will abort the other.  By allowing
+        %% one retry, at least one node will (hopefully) get to enter the transaction and
+        %% the other will abort.  If GC runs too fast, both nodes might run in sequence.
+        %% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
+        %% resource-wise.
+        _Retries = 1
+    ),
+    case Res of
+        aborted ->
+            ?tp(ds_message_gc_lock_taken, #{}),
+            ok;
+        ok ->
+            ok
+    end.
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+maybe_gc() ->
+    AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
+    NowMS = now_ms(),
+    RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
+    TimeThreshold = NowMS - RetentionPeriod,
+    maybe_create_new_generation(AllGens, TimeThreshold),
+    ?tp_span(
+        ps_message_gc,
+        #{},
+        begin
+            ExpiredGens =
+                maps:filter(
+                    fun(_GenId, #{until := Until}) ->
+                        is_number(Until) andalso Until =< TimeThreshold
+                    end,
+                    AllGens
+                ),
+            ExpiredGenIds = maps:keys(ExpiredGens),
+            lists:foreach(
+                fun(GenId) ->
+                    ok = emqx_ds:drop_generation(?PERSISTENT_MESSAGE_DB, GenId),
+                    ?tp(message_gc_generation_dropped, #{gen_id => GenId})
+                end,
+                ExpiredGenIds
+            )
+        end
+    ).
+
+maybe_create_new_generation(AllGens, TimeThreshold) ->
+    NeedNewGen =
+        lists:all(
+            fun({_GenId, #{created_at := CreatedAt}}) ->
+                CreatedAt =< TimeThreshold
+            end,
+            maps:to_list(AllGens)
+        ),
+    case NeedNewGen of
+        false ->
+            ?tp(ps_message_gc_too_early, #{}),
+            ok;
+        true ->
+            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
+            ?tp(ps_message_gc_added_gen, #{})
+    end.

+ 3 - 2
apps/emqx/src/emqx_persistent_session_ds_sup.erl

@@ -48,13 +48,14 @@ init(Opts) ->
 
 do_init(_Opts) ->
     SupFlags = #{
-        strategy => rest_for_one,
+        strategy => one_for_one,
         intensity => 10,
         period => 2,
         auto_shutdown => never
     },
     CoreChildren = [
-        worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])
+        worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []),
+        worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, [])
     ],
     Children =
         case mria_rlog:role() of

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -1855,6 +1855,14 @@ fields("session_persistence") ->
                     desc => ?DESC(session_ds_session_gc_batch_size)
                 }
             )},
+        {"message_retention_period",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"1d">>,
+                    desc => ?DESC(session_ds_message_retention_period)
+                }
+            )},
         {"force_persistence",
             sc(
                 boolean(),

+ 85 - 2
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -19,6 +19,7 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
 -compile(export_all).
@@ -45,10 +46,20 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
+init_per_testcase(t_message_gc = TestCase, Config) ->
+    Opts = #{
+        extra_emqx_conf =>
+            "\n  session_persistence.message_retention_period = 1s"
+            "\n  session_persistence.storage.builtin.n_shards = 3"
+    },
+    common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
 init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config, _Opts = #{}).
+
+common_init_per_testcase(TestCase, Config, Opts) ->
     ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     Apps = emqx_cth_suite:start(
-        app_specs(),
+        app_specs(Opts),
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
     [{apps, Apps} | Config].
@@ -379,6 +390,66 @@ t_publish_empty_topic_levels(_Config) ->
         emqtt:stop(Pub)
     end.
 
+t_message_gc_too_young(_Config) ->
+    %% Check that GC doesn't attempt to create a new generation if there are fresh enough
+    %% generations around.  The stability of this test relies on the default value for
+    %% message retention being long enough.  Currently, the default is 1 hour.
+    ?check_trace(
+        ok = emqx_persistent_message_ds_gc_worker:gc(),
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_message_gc(Config) ->
+    %% Check that, after GC runs, a new generation is created, retaining messages, and
+    %% older messages no longer are accessible.
+    NShards = ?config(n_shards, Config),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            %% ensure some messages are in the first generation
+            ?force_ordering(
+                #{?snk_kind := inserted_batch},
+                #{?snk_kind := ps_message_gc_added_gen}
+            ),
+            Msgs0 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/baz">>, <<"2">>, 1)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
+            ?tp(inserted_batch, #{}),
+            {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
+
+            Now = emqx_message:timestamp_now(),
+            Msgs1 = [
+                message(<<"foo/bar">>, <<"3">>, Now + 100),
+                message(<<"foo/baz">>, <<"4">>, Now + 101)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
+
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
+                infinity
+            ),
+
+            TopicFilter = emqx_topic:words(<<"#">>),
+            StartTime = 0,
+            Msgs = consume(TopicFilter, StartTime),
+            %% only "1" and "2" should have been GC'ed
+            ?assertEqual(
+                sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
+                sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -438,9 +509,13 @@ publish(Node, Message) ->
     erpc:call(Node, emqx, publish, [Message]).
 
 app_specs() ->
+    app_specs(_Opts = #{}).
+
+app_specs(Opts) ->
+    ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
     [
         emqx_durable_storage,
-        {emqx, "session_persistence {enable = true}"}
+        {emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
     ].
 
 cluster() ->
@@ -459,3 +534,11 @@ clear_db() ->
     mria:stop(),
     ok = mnesia:delete_schema([node()]),
     ok.
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.

+ 55 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -22,7 +22,14 @@
 -module(emqx_ds).
 
 %% Management API:
--export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]).
+-export([
+    open_db/2,
+    update_db_config/2,
+    add_generation/1,
+    list_generations_with_lifetimes/1,
+    drop_generation/2,
+    drop_db/1
+]).
 
 %% Message storage API:
 -export([store_batch/2, store_batch/3]).
@@ -52,7 +59,10 @@
     get_iterator_result/1,
 
     ds_specific_stream/0,
-    ds_specific_iterator/0
+    ds_specific_iterator/0,
+    ds_specific_generation_rank/0,
+    generation_rank/0,
+    generation_info/0
 ]).
 
 %%================================================================================
@@ -80,6 +90,8 @@
 
 -type ds_specific_stream() :: term().
 
+-type ds_specific_generation_rank() :: term().
+
 -type message_key() :: binary().
 
 -type store_batch_result() :: ok | {error, _}.
@@ -114,6 +126,17 @@
 
 -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
 
+%% An opaque term identifying a generation.  Each implementation will possibly add
+%% information to this term to match its inner structure (e.g.: by embedding the shard id,
+%% in the case of `emqx_ds_replication_layer').
+-opaque generation_rank() :: ds_specific_generation_rank().
+
+-type generation_info() :: #{
+    created_at := time(),
+    since := time(),
+    until := time() | undefined
+}.
+
 -define(persistent_term(DB), {emqx_ds_db_backend, DB}).
 
 -define(module(DB), (persistent_term:get(?persistent_term(DB)))).
@@ -128,6 +151,11 @@
 
 -callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
 
+-callback list_generations_with_lifetimes(db()) ->
+    #{generation_rank() => generation_info()}.
+
+-callback drop_generation(db(), generation_rank()) -> ok | {error, _}.
+
 -callback drop_db(db()) -> ok | {error, _}.
 
 -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
@@ -142,6 +170,11 @@
 
 -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
 
+-optional_callbacks([
+    list_generations_with_lifetimes/1,
+    drop_generation/2
+]).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -166,6 +199,26 @@ add_generation(DB) ->
 update_db_config(DB, Opts) ->
     ?module(DB):update_db_config(DB, Opts).
 
+-spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
+list_generations_with_lifetimes(DB) ->
+    Mod = ?module(DB),
+    case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of
+        true ->
+            Mod:list_generations_with_lifetimes(DB);
+        false ->
+            #{}
+    end.
+
+-spec drop_generation(db(), generation_rank()) -> ok | {error, _}.
+drop_generation(DB, GenId) ->
+    Mod = ?module(DB),
+    case erlang:function_exported(Mod, drop_generation, 2) of
+        true ->
+            Mod:drop_generation(DB, GenId);
+        false ->
+            {error, not_implemented}
+    end.
+
 %% @doc TODO: currently if one or a few shards are down, they won't be
 
 %% deleted.

+ 10 - 1
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -18,7 +18,12 @@
 
 %% API:
 -export([
-    trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2
+    trie_create/1, trie_create/0,
+    trie_restore/2,
+    trie_restore_existing/2,
+    topic_key/3,
+    match_topics/2,
+    lookup_topic_key/2
 ]).
 
 %% Debug:
@@ -115,6 +120,10 @@ trie_create() ->
 -spec trie_restore(options(), [{_Key, _Val}]) -> trie().
 trie_restore(Options, Dump) ->
     Trie = trie_create(Options),
+    trie_restore_existing(Trie, Dump).
+
+-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
+trie_restore_existing(Trie, Dump) ->
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
             trie_insert(Trie, StateFrom, Token, StateTo)

+ 43 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -25,6 +25,8 @@
     open_db/2,
     add_generation/1,
     update_db_config/2,
+    list_generations_with_lifetimes/1,
+    drop_generation/2,
     drop_db/1,
     store_batch/3,
     get_streams/3,
@@ -41,7 +43,9 @@
     do_make_iterator_v1/5,
     do_update_iterator_v2/4,
     do_next_v1/4,
-    do_add_generation_v2/1
+    do_add_generation_v2/1,
+    do_list_generations_with_lifetimes_v3/2,
+    do_drop_generation_v3/3
 ]).
 
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
@@ -104,6 +108,8 @@
     ?batch_messages := [emqx_types:message()]
 }.
 
+-type generation_rank() :: {shard_id(), term()}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -135,6 +141,32 @@ add_generation(DB) ->
 update_db_config(DB, CreateOpts) ->
     emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
 
+-spec list_generations_with_lifetimes(emqx_ds:db()) ->
+    #{generation_rank() => emqx_ds:generation_info()}.
+list_generations_with_lifetimes(DB) ->
+    Shards = list_shards(DB),
+    lists:foldl(
+        fun(Shard, GensAcc) ->
+            Node = node_of_shard(DB, Shard),
+            maps:fold(
+                fun(GenId, Data, AccInner) ->
+                    AccInner#{{Shard, GenId} => Data}
+                end,
+                GensAcc,
+                emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard)
+            )
+        end,
+        #{},
+        Shards
+    ).
+
+-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
+drop_generation(DB, {Shard, GenId}) ->
+    %% TODO: drop generation in all nodes in the replica set, not only in the leader,
+    %% after we have proper replication in place.
+    Node = node_of_shard(DB, Shard),
+    emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId).
+
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
@@ -301,7 +333,6 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
 do_add_generation_v2(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
-
     lists:foreach(
         fun(ShardId) ->
             emqx_ds_storage_layer:add_generation({DB, ShardId})
@@ -309,6 +340,16 @@ do_add_generation_v2(DB) ->
         MyShards
     ).
 
+-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
+    #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
+do_list_generations_with_lifetimes_v3(DB, ShardId) ->
+    emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
+
+-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
+    ok | {error, _}.
+do_drop_generation_v3(DB, ShardId, GenId) ->
+    emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 45 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -27,11 +27,13 @@
 -export([
     create/4,
     open/5,
+    drop/5,
     store_batch/4,
     get_streams/4,
     make_iterator/5,
     update_iterator/4,
-    next/4
+    next/4,
+    post_creation_actions/1
 ]).
 
 %% internal exports:
@@ -199,6 +201,37 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         ts_offset = TSOffsetBits
     }.
 
+-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
+    s().
+post_creation_actions(
+    #{
+        db := DBHandle,
+        old_gen_id := OldGenId,
+        old_cf_refs := OldCFRefs,
+        new_gen_runtime_data := NewGenData0
+    }
+) ->
+    {_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
+    #s{trie = NewTrie0} = NewGenData0,
+    NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
+    ?tp(bitfield_lts_inherited_trie, #{}),
+    NewGenData0#s{trie = NewTrie}.
+
+-spec drop(
+    emqx_ds_storage_layer:shard_id(),
+    rocksdb:db_handle(),
+    emqx_ds_storage_layer:gen_id(),
+    emqx_ds_storage_layer:cf_refs(),
+    s()
+) ->
+    ok.
+drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
+    {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
+    {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
+    ok = rocksdb:drop_column_family(DBHandle, DataCF),
+    ok = rocksdb:drop_column_family(DBHandle, TrieCF),
+    ok.
+
 -spec store_batch(
     emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->
@@ -500,6 +533,17 @@ restore_trie(TopicIndexBytes, DB, CF) ->
         rocksdb:iterator_close(IT)
     end.
 
+-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
+    emqx_ds_lts:trie().
+copy_previous_trie(DBHandle, NewTrie, OldCF) ->
+    {ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
+    try
+        OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
+        emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
+    after
+        rocksdb:iterator_close(IT)
+    end.
+
 read_persisted_trie(IT, {ok, KeyB, ValB}) ->
     [
         {binary_to_term(KeyB), binary_to_term(ValB)}

+ 239 - 54
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -27,7 +27,9 @@
     update_iterator/3,
     next/3,
     update_config/2,
-    add_generation/1
+    add_generation/1,
+    list_generations_with_lifetimes/1,
+    drop_generation/2
 ]).
 
 %% gen_server
@@ -44,7 +46,8 @@
     iterator/0,
     shard_id/0,
     options/0,
-    prototype/0
+    prototype/0,
+    post_creation_context/0
 ]).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -95,11 +98,18 @@
 
 %%%% Generation:
 
+-define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
+
 -type generation(Data) :: #{
     %% Module that handles data for the generation:
     module := module(),
     %% Module-specific data defined at generation creation time:
     data := Data,
+    %% Column families used by this generation
+    cf_refs := cf_refs(),
+    %% Time at which this was created.  Might differ from `since', in particular for the
+    %% first generation.
+    created_at := emqx_ds:time(),
     %% When should this generation become active?
     %% This generation should only contain messages timestamped no earlier than that.
     %% The very first generation will have `since` equal 0.
@@ -121,7 +131,7 @@
     %% This data is used to create new generation:
     prototype := prototype(),
     %% Generations:
-    {generation, gen_id()} => GenData
+    ?GEN_KEY(gen_id()) => GenData
 }.
 
 %% Shard schema (persistent):
@@ -132,6 +142,18 @@
 
 -type options() :: map().
 
+-type post_creation_context() ::
+    #{
+        shard_id := emqx_ds_storage_layer:shard_id(),
+        db := rocksdb:db_handle(),
+        new_gen_id := emqx_ds_storage_layer:gen_id(),
+        old_gen_id := emqx_ds_storage_layer:gen_id(),
+        new_cf_refs := cf_refs(),
+        old_cf_refs := cf_refs(),
+        new_gen_runtime_data := _NewData,
+        old_gen_runtime_data := _OldData
+    }.
+
 %%================================================================================
 %% Generation callbacks
 %%================================================================================
@@ -145,6 +167,9 @@
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
     _Data.
 
+-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
+    ok | {error, _Reason}.
+
 -callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 
@@ -157,10 +182,17 @@
 -callback next(shard_id(), _Data, Iter, pos_integer()) ->
     {ok, Iter, [emqx_types:message()]} | {error, _}.
 
+-callback post_creation_actions(post_creation_context()) -> _Data.
+
+-optional_callbacks([post_creation_actions/1]).
+
 %%================================================================================
 %% API for the replication layer
 %%================================================================================
 
+-record(call_list_generations_with_lifetimes, {}).
+-record(call_drop_generation, {gen_id :: gen_id()}).
+
 -spec open_shard(shard_id(), options()) -> ok.
 open_shard(Shard, Options) ->
     emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
@@ -188,18 +220,25 @@ store_batch(Shard, Messages, Options) ->
     [{integer(), stream()}].
 get_streams(Shard, TopicFilter, StartTime) ->
     Gens = generations_since(Shard, StartTime),
+    ?tp(get_streams_all_gens, #{gens => Gens}),
     lists:flatmap(
         fun(GenId) ->
-            #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-            Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
-            [
-                {GenId, #{
-                    ?tag => ?STREAM,
-                    ?generation => GenId,
-                    ?enc => Stream
-                }}
-             || Stream <- Streams
-            ]
+            ?tp(get_streams_get_gen, #{gen_id => GenId}),
+            case generation_get_safe(Shard, GenId) of
+                {ok, #{module := Mod, data := GenData}} ->
+                    Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
+                    [
+                        {GenId, #{
+                            ?tag => ?STREAM,
+                            ?generation => GenId,
+                            ?enc => Stream
+                        }}
+                     || Stream <- Streams
+                    ];
+                {error, not_found} ->
+                    %% race condition: generation was dropped before getting its streams?
+                    []
+            end
         end,
         Gens
     ).
@@ -209,16 +248,20 @@ get_streams(Shard, TopicFilter, StartTime) ->
 make_iterator(
     Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
 ) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
-        {ok, Iter} ->
-            {ok, #{
-                ?tag => ?IT,
-                ?generation => GenId,
-                ?enc => Iter
-            }};
-        {error, _} = Err ->
-            Err
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
+                {ok, Iter} ->
+                    {ok, #{
+                        ?tag => ?IT,
+                        ?generation => GenId,
+                        ?enc => Iter
+                    }};
+                {error, _} = Err ->
+                    Err
+            end;
+        {error, not_found} ->
+            {error, end_of_stream}
     end.
 
 -spec update_iterator(
@@ -230,33 +273,42 @@ update_iterator(
     #{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
     DSKey
 ) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
-        {ok, Iter} ->
-            {ok, #{
-                ?tag => ?IT,
-                ?generation => GenId,
-                ?enc => Iter
-            }};
-        {error, _} = Err ->
-            Err
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
+                {ok, Iter} ->
+                    {ok, #{
+                        ?tag => ?IT,
+                        ?generation => GenId,
+                        ?enc => Iter
+                    }};
+                {error, _} = Err ->
+                    Err
+            end;
+        {error, not_found} ->
+            {error, end_of_stream}
     end.
 
 -spec next(shard_id(), iterator(), pos_integer()) ->
     emqx_ds:next_result(iterator()).
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    Current = generation_current(Shard),
-    case Mod:next(Shard, GenData, GenIter0, BatchSize) of
-        {ok, _GenIter, []} when GenId < Current ->
-            %% This is a past generation. Storage layer won't write
-            %% any more messages here. The iterator reached the end:
-            %% the stream has been fully replayed.
-            {ok, end_of_stream};
-        {ok, GenIter, Batch} ->
-            {ok, Iter#{?enc := GenIter}, Batch};
-        Error = {error, _} ->
-            Error
+    case generation_get_safe(Shard, GenId) of
+        {ok, #{module := Mod, data := GenData}} ->
+            Current = generation_current(Shard),
+            case Mod:next(Shard, GenData, GenIter0, BatchSize) of
+                {ok, _GenIter, []} when GenId < Current ->
+                    %% This is a past generation. Storage layer won't write
+                    %% any more messages here. The iterator reached the end:
+                    %% the stream has been fully replayed.
+                    {ok, end_of_stream};
+                {ok, GenIter, Batch} ->
+                    {ok, Iter#{?enc := GenIter}, Batch};
+                Error = {error, _} ->
+                    Error
+            end;
+        {error, not_found} ->
+            %% generation was possibly dropped by GC
+            {ok, end_of_stream}
     end.
 
 -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
@@ -267,6 +319,21 @@ update_config(ShardId, Options) ->
 add_generation(ShardId) ->
     gen_server:call(?REF(ShardId), add_generation, infinity).
 
+-spec list_generations_with_lifetimes(shard_id()) ->
+    #{
+        gen_id() => #{
+            created_at := emqx_ds:time(),
+            since := emqx_ds:time(),
+            until := undefined | emqx_ds:time()
+        }
+    }.
+list_generations_with_lifetimes(ShardId) ->
+    gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
+
+-spec drop_generation(shard_id(), gen_id()) -> ok.
+drop_generation(ShardId, GenId) ->
+    gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
@@ -328,6 +395,13 @@ handle_call(add_generation, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
     {reply, ok, S};
+handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
+    Generations = handle_list_generations_with_lifetimes(S),
+    {reply, Generations, S};
+handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
+    {Reply, S} = handle_drop_generation(S0, GenId),
+    commit_metadata(S),
+    {reply, Reply, S};
 handle_call(#call_create_generation{since = Since}, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
@@ -359,7 +433,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     %% Transform generation schemas to generation runtime data:
     maps:map(
         fun
-            ({generation, GenId}, GenSchema) ->
+            (?GEN_KEY(GenId), GenSchema) ->
                 open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
             (_K, Val) ->
                 Val
@@ -372,10 +446,40 @@ add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
     Schema1 = update_last_until(Schema0, Since),
     Shard1 = update_last_until(Shard0, Since),
+
+    #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
+    OldKey = ?GEN_KEY(OldGenId),
+    #{OldKey := OldGenSchema} = Schema0,
+    #{cf_refs := OldCFRefs} = OldGenSchema,
+    #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
+
     {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+
     CFRefs = NewCFRefs ++ CFRefs0,
-    Key = {generation, GenId},
-    Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+    Key = ?GEN_KEY(GenId),
+    Generation0 =
+        #{data := NewGenData0} =
+        open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+
+    %% When the new generation's module is the same as the last one, we might want to
+    %% perform actions like inheriting some of the previous (meta)data.
+    NewGenData =
+        run_post_creation_actions(
+            #{
+                shard_id => ShardId,
+                db => DB,
+                new_gen_id => GenId,
+                old_gen_id => OldGenId,
+                new_cf_refs => NewCFRefs,
+                old_cf_refs => OldCFRefs,
+                new_gen_runtime_data => NewGenData0,
+                old_gen_runtime_data => OldGenData,
+                new_module => CurrentMod,
+                old_module => OldMod
+            }
+        ),
+    Generation = Generation0#{data := NewGenData},
+
     Shard = Shard1#{current_generation := GenId, Key => Generation},
     S0#s{
         cf_refs = CFRefs,
@@ -383,6 +487,54 @@ add_generation(S0, Since) ->
         shard = Shard
     }.
 
+-spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}.
+handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
+    maps:fold(
+        fun
+            (?GEN_KEY(GenId), GenSchema, Acc) ->
+                Acc#{GenId => export_generation(GenSchema)};
+            (_Key, _Value, Acc) ->
+                Acc
+        end,
+        #{},
+        ShardSchema
+    ).
+
+-spec export_generation(generation_schema()) -> map().
+export_generation(GenSchema) ->
+    maps:with([created_at, since, until], GenSchema).
+
+-spec handle_drop_generation(server_state(), gen_id()) ->
+    {ok | {error, current_generation}, server_state()}.
+handle_drop_generation(#s{schema = #{current_generation := GenId}} = S0, GenId) ->
+    {{error, current_generation}, S0};
+handle_drop_generation(#s{schema = Schema} = S0, GenId) when
+    not is_map_key(?GEN_KEY(GenId), Schema)
+->
+    {{error, not_found}, S0};
+handle_drop_generation(S0, GenId) ->
+    #s{
+        shard_id = ShardId,
+        db = DB,
+        schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema,
+        shard = OldShard,
+        cf_refs = OldCFRefs
+    } = S0,
+    #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
+    #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
+    case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of
+        ok ->
+            CFRefs = OldCFRefs -- GenCFRefs,
+            Shard = maps:remove(?GEN_KEY(GenId), OldShard),
+            Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
+            S = S0#s{
+                cf_refs = CFRefs,
+                shard = Shard,
+                schema = Schema
+            },
+            {ok, S}
+    end.
+
 -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
     generation().
 open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
@@ -409,10 +561,17 @@ new_generation(ShardId, DB, Schema0, Since) ->
     #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
     GenId = PrevGenId + 1,
     {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
-    GenSchema = #{module => Mod, data => GenData, since => Since, until => undefined},
+    GenSchema = #{
+        module => Mod,
+        data => GenData,
+        cf_refs => NewCFRefs,
+        created_at => emqx_message:timestamp_now(),
+        since => Since,
+        until => undefined
+    },
     Schema = Schema0#{
         current_generation => GenId,
-        {generation, GenId} => GenSchema
+        ?GEN_KEY(GenId) => GenSchema
     },
     {GenId, Schema, NewCFRefs}.
 
@@ -461,9 +620,26 @@ db_dir(BaseDir, {DB, ShardId}) ->
 -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
 update_last_until(Schema, Until) ->
     #{current_generation := GenId} = Schema,
-    GenData0 = maps:get({generation, GenId}, Schema),
+    GenData0 = maps:get(?GEN_KEY(GenId), Schema),
     GenData = GenData0#{until := Until},
-    Schema#{{generation, GenId} := GenData}.
+    Schema#{?GEN_KEY(GenId) := GenData}.
+
+run_post_creation_actions(
+    #{
+        new_module := Mod,
+        old_module := Mod,
+        new_gen_runtime_data := NewGenData
+    } = Context
+) ->
+    case erlang:function_exported(Mod, post_creation_actions, 1) of
+        true ->
+            Mod:post_creation_actions(Context);
+        false ->
+            NewGenData
+    end;
+run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
+    %% Different implementation modules
+    NewGenData.
 
 %%--------------------------------------------------------------------------------
 %% Schema access
@@ -476,15 +652,24 @@ generation_current(Shard) ->
 
 -spec generation_get(shard_id(), gen_id()) -> generation().
 generation_get(Shard, GenId) ->
-    #{{generation, GenId} := GenData} = get_schema_runtime(Shard),
+    {ok, GenData} = generation_get_safe(Shard, GenId),
     GenData.
 
+-spec generation_get_safe(shard_id(), gen_id()) -> {ok, generation()} | {error, not_found}.
+generation_get_safe(Shard, GenId) ->
+    case get_schema_runtime(Shard) of
+        #{?GEN_KEY(GenId) := GenData} ->
+            {ok, GenData};
+        #{} ->
+            {error, not_found}
+    end.
+
 -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
 generations_since(Shard, Since) ->
     Schema = get_schema_runtime(Shard),
     maps:fold(
         fun
-            ({generation, GenId}, #{until := Until}, Acc) when Until >= Since ->
+            (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since ->
                 [GenId | Acc];
             (_K, _V, Acc) ->
                 Acc

+ 7 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -30,6 +30,7 @@
 -export([
     create/4,
     open/5,
+    drop/5,
     store_batch/4,
     get_streams/4,
     make_iterator/5,
@@ -85,6 +86,10 @@ open(_Shard, DBHandle, GenId, CFRefs, #schema{}) ->
     {_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     #s{db = DBHandle, cf = CF}.
 
+drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
+    ok = rocksdb:drop_column_family(DBHandle, CFHandle),
+    ok.
+
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->
@@ -142,7 +147,8 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
     case rocksdb:iterator_move(IT, Action) of
         {ok, Key, Blob} ->
             Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
-            case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
+            TopicWords = emqx_topic:words(Topic),
+            case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
                 true ->
                     do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
                 false ->

+ 147 - 0
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl

@@ -0,0 +1,147 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v3).
+
+-behavior(emqx_bpapi).
+
+-include_lib("emqx_utils/include/bpapi.hrl").
+%% API:
+-export([
+    drop_db/2,
+    store_batch/5,
+    get_streams/5,
+    make_iterator/6,
+    next/5,
+    update_iterator/5,
+    add_generation/2,
+
+    %% introduced in v3
+    list_generations_with_lifetimes/3,
+    drop_generation/4
+]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
+
+-spec get_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [{integer(), emqx_ds_storage_layer:stream()}].
+get_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
+
+-spec make_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
+        DB, Shard, Stream, TopicFilter, StartTime
+    ]).
+
+-spec next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
+    | {ok, end_of_stream}
+    | {error, _}.
+next(Node, DB, Shard, Iter, BatchSize) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+
+-spec store_batch(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_replication_layer:batch(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+store_batch(Node, DB, Shard, Batch, Options) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
+        DB, Shard, Batch, Options
+    ]).
+
+-spec update_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    emqx_ds:message_key()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+update_iterator(Node, DB, Shard, OldIter, DSKey) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
+        DB, Shard, OldIter, DSKey
+    ]).
+
+-spec add_generation([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+add_generation(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
+
+%%--------------------------------------------------------------------------------
+%% Introduced in V3
+%%--------------------------------------------------------------------------------
+
+-spec list_generations_with_lifetimes(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id()
+) ->
+    #{
+        emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()
+    }.
+list_generations_with_lifetimes(Node, DB, Shard) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]).
+
+-spec drop_generation(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:gen_id()
+) ->
+    ok | {error, _}.
+drop_generation(Node, DB, Shard, GenId) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.6.0".

+ 247 - 3
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -155,7 +155,7 @@ t_05_update_iterator(_Config) ->
     ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
     ok.
 
-t_05_update_config(_Config) ->
+t_06_update_config(_Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     TopicFilter = ['#'],
@@ -199,7 +199,7 @@ t_05_update_config(_Config) ->
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
 
-t_06_add_generation(_Config) ->
+t_07_add_generation(_Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     TopicFilter = ['#'],
@@ -243,6 +243,250 @@ t_06_add_generation(_Config) ->
     end,
     lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
 
+%% Verifies the basic usage of `list_generations_with_lifetimes' and `drop_generation'...
+%%   1) Cannot drop current generation.
+%%   2) All existing generations are returned by `list_generation_with_lifetimes'.
+%%   3) Dropping a generation removes it from the list.
+%%   4) Dropped generations stay dropped even after restarting the application.
+t_08_smoke_list_drop_generation(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            %% Exactly one generation at first.
+            Generations0 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{_GenId, #{since := _, until := _}}],
+                maps:to_list(Generations0),
+                #{gens => Generations0}
+            ),
+            [{GenId0, _}] = maps:to_list(Generations0),
+            %% Cannot delete current generation
+            ?assertEqual({error, current_generation}, emqx_ds:drop_generation(DB, GenId0)),
+
+            %% New gen
+            ok = emqx_ds:add_generation(DB),
+            Generations1 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [
+                    {GenId0, #{since := _, until := _}},
+                    {_GenId1, #{since := _, until := _}}
+                ],
+                lists:sort(maps:to_list(Generations1)),
+                #{gens => Generations1}
+            ),
+            [GenId0, GenId1] = lists:sort(maps:keys(Generations1)),
+
+            %% Drop the older one
+            ?assertEqual(ok, emqx_ds:drop_generation(DB, GenId0)),
+            Generations2 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{GenId1, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Generations2)),
+                #{gens => Generations2}
+            ),
+
+            %% Unknown gen_id, as it was already dropped
+            ?assertEqual({error, not_found}, emqx_ds:drop_generation(DB, GenId0)),
+
+            %% Should persist surviving generation list
+            ok = application:stop(emqx_durable_storage),
+            {ok, _} = application:ensure_all_started(emqx_durable_storage),
+            ok = emqx_ds:open_db(DB, opts()),
+
+            Generations3 = emqx_ds:list_generations_with_lifetimes(DB),
+            ?assertMatch(
+                [{GenId1, #{since := _, until := _}}],
+                lists:sort(maps:to_list(Generations3)),
+                #{gens => Generations3}
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_drop_generation_with_never_used_iterator(_Config) ->
+    %% This test checks how the iterator behaves when:
+    %%   1) it's created at generation 1 and not consumed from.
+    %%   2) generation 2 is created and 1 dropped.
+    %%   3) iteration begins.
+    %% In this case, the iterator won't see any messages and the stream will end.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    Now = emqx_message:timestamp_now(),
+    Msgs1 = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)),
+
+    %% New iterator for the new stream will only see the later messages.
+    [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    ?assertNotEqual(Stream0, Stream1),
+    {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime),
+
+    {ok, Iter, Batch} = iterate(DB, Iter1, 1),
+    ?assertNotEqual(end_of_stream, Iter),
+    ?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]),
+
+    ok.
+
+t_drop_generation_with_used_once_iterator(_Config) ->
+    %% This test checks how the iterator behaves when:
+    %%   1) it's created at generation 1 and consumes at least 1 message.
+    %%   2) generation 2 is created and 1 dropped.
+    %%   3) iteration continues.
+    %% In this case, the iterator should see no more messages and the stream will end.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 =
+        [Msg0 | _] = [
+            message(<<"foo/bar">>, <<"1">>, 0),
+            message(<<"foo/baz">>, <<"2">>, 1)
+        ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+    {ok, Iter1, Batch1} = emqx_ds:next(DB, Iter0, 1),
+    ?assertNotEqual(end_of_stream, Iter1),
+    ?assertEqual([Msg0], [Msg || {_Key, Msg} <- Batch1]),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    Now = emqx_message:timestamp_now(),
+    Msgs1 = [
+        message(<<"foo/bar">>, <<"3">>, Now + 100),
+        message(<<"foo/baz">>, <<"4">>, Now + 101)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
+
+    ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)),
+
+    ok.
+
+t_drop_generation_update_iterator(_Config) ->
+    %% This checks the behavior of `emqx_ds:update_iterator' after the generation
+    %% underlying the iterator has been dropped.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
+    {ok, Iter1, _Batch1} = emqx_ds:next(DB, Iter0, 1),
+    {ok, _Iter2, [{Key2, _Msg}]} = emqx_ds:next(DB, Iter1, 1),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    ?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)),
+
+    ok.
+
+t_make_iterator_stale_stream(_Config) ->
+    %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
+    %% the stream has been dropped.
+
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+
+    TopicFilter = emqx_topic:words(<<"foo/+">>),
+    StartTime = 0,
+    Msgs0 = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo/baz">>, <<"2">>, 1)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
+
+    [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+
+    ok = emqx_ds:add_generation(DB),
+    ok = emqx_ds:drop_generation(DB, GenId0),
+
+    ?assertEqual(
+        {error, end_of_stream},
+        emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime)
+    ),
+
+    ok.
+
+t_get_streams_concurrently_with_drop_generation(_Config) ->
+    %% This checks that we can get all streams while a generation is dropped
+    %% mid-iteration.
+
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+
+            [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+            ok = emqx_ds:add_generation(DB),
+            ok = emqx_ds:add_generation(DB),
+
+            %% All streams
+            TopicFilter = emqx_topic:words(<<"foo/+">>),
+            StartTime = 0,
+            ?assertMatch([_, _, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            ?force_ordering(
+                #{?snk_kind := dropped_gen},
+                #{?snk_kind := get_streams_get_gen}
+            ),
+
+            spawn_link(fun() ->
+                {ok, _} = ?block_until(#{?snk_kind := get_streams_all_gens}),
+                ok = emqx_ds:drop_generation(DB, GenId0),
+                ?tp(dropped_gen, #{})
+            end),
+
+            ?assertMatch([_, _], emqx_ds:get_streams(DB, TopicFilter, StartTime)),
+
+            ok
+        end,
+        []
+    ),
+
+    ok.
+
 update_data_set() ->
     [
         [
@@ -295,7 +539,7 @@ iterate(DB, It0, BatchSize, Acc) ->
         {ok, It, Msgs} ->
             iterate(DB, It, BatchSize, Acc ++ Msgs);
         {ok, end_of_stream} ->
-            {ok, It0, Acc};
+            {ok, end_of_stream, Acc};
         Ret ->
             Ret
     end.

+ 33 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -131,6 +131,39 @@ t_get_streams(_Config) ->
     ?assert(lists:member(A, AllStreams)),
     ok.
 
+t_new_generation_inherit_trie(_Config) ->
+    %% This test checks that we inherit the previous generation's LTS when creating a new
+    %% generation.
+    ?check_trace(
+        begin
+            %% Create a bunch of topics to be learned in the first generation
+            Timestamps = lists:seq(1, 10_000, 100),
+            Batch = [
+                begin
+                    B = integer_to_binary(I),
+                    make_message(
+                        TS,
+                        <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>,
+                        integer_to_binary(TS)
+                    )
+                end
+             || I <- lists:seq(1, 200),
+                TS <- Timestamps,
+                Suffix <- [<<"foo">>, <<"bar">>]
+            ],
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+            %% Now we create a new generation with the same LTS module.  It should inherit the
+            %% learned trie.
+            ok = emqx_ds_storage_layer:add_generation(?SHARD),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)),
+            ok
+        end
+    ),
+    ok.
+
 t_replay(_Config) ->
     %% Create concrete topics:
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],

+ 1 - 0
changes/ce/feat-12338.en.md

@@ -0,0 +1 @@
+Added time-based message garbage collection to the RocksDB-based persistent session backend.

+ 3 - 0
rel/i18n/emqx_schema.hocon

@@ -1608,5 +1608,8 @@ The session will query the DB for the new messages when the value of `FreeSpace`
 
 `FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
 
+session_ds_message_retention_period.desc:
+"""The minimum amount of time that messages should be retained for.  After messages have been in storage for at least this period of time, they'll be dropped."""
+
 
 }