瀏覽代碼

feat(ds): implement `get_delete_streams`, `make_delete_iterator` and `delete_next` callbacks for builtin storage

Part of https://emqx.atlassian.net/browse/EMQX-11841
Thales Macedo Garitezi 1 年之前
父節點
當前提交
6af01b916e

+ 102 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -30,9 +30,12 @@
     drop_db/1,
     store_batch/3,
     get_streams/3,
+    get_delete_streams/3,
     make_iterator/4,
+    make_delete_iterator/4,
     update_iterator/3,
     next/3,
+    delete_next/4,
     node_of_shard/2,
     shard_of_message/3,
     maybe_set_myself_as_leader/2
@@ -50,11 +53,22 @@
     do_next_v1/4,
     do_add_generation_v2/1,
     do_list_generations_with_lifetimes_v3/2,
-    do_drop_generation_v3/3
+    do_drop_generation_v3/3,
+    do_get_delete_streams_v4/4,
+    do_make_delete_iterator_v4/5,
+    do_delete_next_v4/5
 ]).
 
 -export_type([
-    shard_id/0, builtin_db_opts/0, stream_v1/0, stream/0, iterator/0, message_id/0, batch/0
+    shard_id/0,
+    builtin_db_opts/0,
+    stream_v1/0,
+    stream/0,
+    delete_stream/0,
+    iterator/0,
+    delete_iterator/0,
+    message_id/0,
+    batch/0
 ]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
@@ -86,9 +100,12 @@
     }.
 
 -define(stream_v2(SHARD, INNER), [2, SHARD | INNER]).
+-define(delete_stream(SHARD, INNER), [3, SHARD | INNER]).
 
 -opaque stream() :: nonempty_maybe_improper_list().
 
+-opaque delete_stream() :: nonempty_maybe_improper_list().
+
 -opaque iterator() ::
     #{
         ?tag := ?IT,
@@ -96,6 +113,13 @@
         ?enc := emqx_ds_storage_layer:iterator()
     }.
 
+-opaque delete_iterator() ::
+    #{
+        ?tag := ?DELETE_IT,
+        ?shard := emqx_ds_replication_layer:shard_id(),
+        ?enc := emqx_ds_storage_layer:delete_iterator()
+    }.
+
 -type message_id() :: emqx_ds:message_id().
 
 -type batch() :: #{
@@ -193,6 +217,24 @@ get_streams(DB, TopicFilter, StartTime) ->
         Shards
     ).
 
+%% @doc List the delete streams matching `TopicFilter' across all shards of `DB'.
+%% For every shard, the storage-layer streams are fetched over RPC from the node
+%% returned by `node_of_shard/2' and tagged with the shard they came from.
+-spec get_delete_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [delete_stream()].
+get_delete_streams(DB, TopicFilter, StartTime) ->
+    StreamsOfShard = fun(Shard) ->
+        LeaderNode = node_of_shard(DB, Shard),
+        InnerStreams = emqx_ds_proto_v4:get_delete_streams(
+            LeaderNode, DB, Shard, TopicFilter, StartTime
+        ),
+        lists:map(fun(Inner) -> ?delete_stream(Shard, Inner) end, InnerStreams)
+    end,
+    lists:flatmap(StreamsOfShard, list_shards(DB)).
+
 -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
@@ -205,6 +247,22 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
             Err
     end.
 
+%% @doc Create a delete iterator for a stream obtained from `get_delete_streams/3'.
+%% The storage-layer iterator is created via RPC on the shard's node and wrapped
+%% into a map that also records the shard it belongs to.
+-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    emqx_ds:make_delete_iterator_result(delete_iterator()).
+make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
+    ?delete_stream(Shard, StorageStream) = Stream,
+    LeaderNode = node_of_shard(DB, Shard),
+    Reply = emqx_ds_proto_v4:make_delete_iterator(
+        LeaderNode, DB, Shard, StorageStream, TopicFilter, StartTime
+    ),
+    case Reply of
+        {ok, StorageIter} ->
+            {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => StorageIter}};
+        {error, _} = Error ->
+            Error
+    end.
+
 -spec update_iterator(
     emqx_ds:db(),
     iterator(),
@@ -249,6 +307,19 @@ next(DB, Iter0, BatchSize) ->
             Other
     end.
 
+%% @doc Run one deletion batch for the given delete iterator.
+%% The request is forwarded to the shard's node; on success the wrapped
+%% storage-layer iterator is replaced with the updated one. Any other reply
+%% is returned to the caller unchanged.
+-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
+    emqx_ds:delete_next_result(delete_iterator()).
+delete_next(DB, Iter0, Selector, BatchSize) ->
+    #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
+    LeaderNode = node_of_shard(DB, Shard),
+    Reply = emqx_ds_proto_v4:delete_next(
+        LeaderNode, DB, Shard, StorageIter0, Selector, BatchSize
+    ),
+    case Reply of
+        {ok, StorageIter, NumDeleted} ->
+            {ok, Iter0#{?enc := StorageIter}, NumDeleted};
+        Passthrough ->
+            Passthrough
+    end.
+
 -spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
 node_of_shard(DB, Shard) ->
     case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
@@ -352,6 +423,17 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
 
+%% RPC target: create a storage-layer delete iterator for shard `{DB, Shard}'.
+-spec do_make_delete_iterator_v4(
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator()} | {error, _}.
+do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) ->
+    ShardId = {DB, Shard},
+    emqx_ds_storage_layer:make_delete_iterator(ShardId, Stream, TopicFilter, StartTime).
+
 -spec do_update_iterator_v2(
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
@@ -374,6 +456,17 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
 
+%% RPC target: run one storage-layer deletion batch on shard `{DB, Shard}'.
+-spec do_delete_next_v4(
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_iterator(),
+    emqx_ds:delete_selector(),
+    pos_integer()
+) ->
+    emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
+do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
+    ShardId = {DB, Shard},
+    emqx_ds_storage_layer:delete_next(ShardId, Iter, Selector, BatchSize).
+
 -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
 do_add_generation_v2(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
@@ -394,6 +487,13 @@ do_list_generations_with_lifetimes_v3(DB, ShardId) ->
 do_drop_generation_v3(DB, ShardId, GenId) ->
     emqx_ds_storage_layer:drop_generation({DB, ShardId}, GenId).
 
+%% RPC target: list the storage-layer delete streams of shard `{DB, Shard}'.
+-spec do_get_delete_streams_v4(
+    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
+    [emqx_ds_storage_layer:delete_stream()].
+do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
+    ShardId = {DB, Shard},
+    emqx_ds_storage_layer:get_delete_streams(ShardId, TopicFilter, StartTime).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -23,6 +23,7 @@
 -define(STREAM, 1).
 -define(IT, 2).
 -define(BATCH, 3).
+-define(DELETE_IT, 4).
 
 %% keys:
 -define(tag, 1).

+ 228 - 12
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -30,9 +30,12 @@
     drop/5,
     store_batch/4,
     get_streams/4,
+    get_delete_streams/4,
     make_iterator/5,
+    make_delete_iterator/5,
     update_iterator/4,
     next/4,
+    delete_next/5,
     post_creation_actions/1
 ]).
 
@@ -54,6 +57,7 @@
 %% tags:
 -define(STREAM, 1).
 -define(IT, 2).
+-define(DELETE_IT, 3).
 
 %% keys:
 -define(tag, 1).
@@ -91,6 +95,8 @@
 
 -type stream() :: emqx_ds_lts:msg_storage_key().
 
+-type delete_stream() :: emqx_ds_lts:msg_storage_key().
+
 -type iterator() ::
     #{
         ?tag := ?IT,
@@ -100,6 +106,15 @@
         ?last_seen_key := binary()
     }.
 
+-type delete_iterator() ::
+    #{
+        ?tag := ?DELETE_IT,
+        ?topic_filter := emqx_ds:topic_filter(),
+        ?start_time := emqx_ds:time(),
+        ?storage_key := emqx_ds_lts:msg_storage_key(),
+        ?last_seen_key := binary()
+    }.
+
 -define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
 
 %% Limit on the number of wildcard levels in the learned topic trie:
@@ -262,6 +277,15 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
 get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     emqx_ds_lts:match_topics(Trie, TopicFilter).
 
+%% Delete streams are computed exactly like regular streams in this layout:
+%% the call is delegated to `get_streams/4' with the same arguments.
+-spec get_delete_streams(
+    emqx_ds_storage_layer:shard_id(), s(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
+    [delete_stream()].
+get_delete_streams(Shard, State, TopicFilter, StartTime) ->
+    DeleteStreams = get_streams(Shard, State, TopicFilter, StartTime),
+    DeleteStreams.
+
 -spec make_iterator(
     emqx_ds_storage_layer:shard_id(),
     s(),
@@ -283,6 +307,27 @@ make_iterator(
         ?last_seen_key => <<>>
     }}.
 
+%% Build a fresh delete iterator for the stream identified by `StorageKey'.
+%% The iterator starts with an empty last-seen key.
+-spec make_delete_iterator(
+    emqx_ds_storage_layer:shard_id(),
+    s(),
+    delete_stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) -> {ok, delete_iterator()}.
+make_delete_iterator(_Shard, _Data, StorageKey, TopicFilter, StartTime) ->
+    %% Note: the iterator is deliberately kept small. It may be stored on a
+    %% remote node whose code can be upgraded independently of ours.
+    DeleteIterator = #{
+        ?tag => ?DELETE_IT,
+        ?storage_key => StorageKey,
+        ?topic_filter => TopicFilter,
+        ?start_time => StartTime,
+        ?last_seen_key => <<>>
+    },
+    {ok, DeleteIterator}.
+
 -spec update_iterator(
     emqx_ds_storage_layer:shard_id(),
     s(),
@@ -319,6 +364,76 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
         ?start_time := StartTime,
         ?storage_key := {TopicIndex, Varying}
     } = It,
+    #{
+        it_handle := ITHandle,
+        keymapper := Keymapper,
+        filter := Filter
+    } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers),
+    try
+        put(?COUNTER, 0),
+        next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize)
+    after
+        rocksdb:iterator_close(ITHandle),
+        erase(?COUNTER)
+    end.
+
+%% Run one deletion batch, limited to data older than the safe cutoff time.
+delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
+    %% The safe cutoff is the end of the last complete epoch: the current
+    %% time with its lowest `TSOffset' bits cleared.
+    SafeCutoffTime = (emqx_message:timestamp_now() bsr TSOffset) bsl TSOffset,
+    delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize).
+
+%% Delete up to `BatchSize' selected messages with timestamps below
+%% `SafeCutoffTime'. Returns `{ok, Iterator, NumDeleted, NumIteratedOver}'.
+delete_next_until(
+    _Schema,
+    It = #{?tag := ?DELETE_IT, ?start_time := StartTime},
+    SafeCutoffTime,
+    _Selector,
+    _BatchSize
+) when StartTime >= SafeCutoffTime ->
+    %% The iterator points into the current (incomplete) epoch. Scanning it
+    %% would be unsafe: messages may be stored into the current epoch while we
+    %% iterate, and because of the keymapping they can land earlier in the
+    %% iteration order, so we could miss them. Report no progress instead.
+    {ok, It, 0, 0};
+delete_next_until(
+    #s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, Selector, BatchSize
+) ->
+    #{
+        ?tag := ?DELETE_IT,
+        ?start_time := StartTime,
+        ?storage_key := {TopicIndex, Varying}
+    } = It,
+    BaseContext = prepare_loop_context(
+        DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers
+    ),
+    #{it_handle := ITHandle} = BaseContext,
+    %% Everything the deletion loop needs travels in a single context map.
+    LoopContext = BaseContext#{
+        db => DB,
+        cf => CF,
+        safe_cutoff_time => SafeCutoffTime,
+        storage_iter => It,
+        selector => Selector,
+        remaining => BatchSize,
+        deleted => 0,
+        iterated_over => 0
+    },
+    try
+        put(?COUNTER, 0),
+        delete_next_loop(LoopContext)
+    after
+        %% Always release the RocksDB iterator and the per-call counter.
+        rocksdb:iterator_close(ITHandle),
+        erase(?COUNTER)
+    end.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers) ->
     %% Make filter:
     Inequations = [
         {'=', TopicIndex},
@@ -350,17 +465,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
         {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)},
         {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound + 1)}
     ]),
-    try
-        put(?COUNTER, 0),
-        next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize)
-    after
-        rocksdb:iterator_close(ITHandle),
-        erase(?COUNTER)
-    end.
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
+    #{
+        it_handle => ITHandle,
+        keymapper => Keymapper,
+        filter => Filter
+    }.
 
 next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
@@ -412,7 +521,108 @@ traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) ->
             {0, It, Acc}
     end.
 
--spec check_message(emqx_ds:time(), iterator(), emqx_types:message()) ->
+%% Outer deletion loop: seek to the next key that can satisfy the filter and
+%% hand over to `delete_traverse_interval/1' until the batch is exhausted or
+%% the key space is used up.
+delete_next_loop(
+    #{remaining := 0, storage_iter := It, deleted := AccDel, iterated_over := AccIter}
+) ->
+    {ok, It, AccDel, AccIter};
+delete_next_loop(Ctx) ->
+    #{
+        it_handle := ITHandle,
+        filter := Filter,
+        storage_iter := It0,
+        deleted := Deleted0,
+        iterated_over := Iterated0
+    } = Ctx,
+    inc_counter(),
+    #{?tag := ?DELETE_IT, ?last_seen_key := LastKey} = It0,
+    %% Result used whenever there is nothing left to scan.
+    Finished = {ok, It0, Deleted0, Iterated0},
+    case emqx_ds_bitmask_keymapper:bin_increment(Filter, LastKey) of
+        overflow ->
+            Finished;
+        SeekKey ->
+            %% assert: the seek target must be strictly after the last seen key
+            true = SeekKey > LastKey,
+            case rocksdb:iterator_move(ITHandle, {seek, SeekKey}) of
+                {error, invalid_iterator} ->
+                    Finished;
+                {ok, Key, Val} ->
+                    {Remaining, It, Deleted, Iterated} =
+                        delete_traverse_interval(Ctx#{
+                            iterated_over := Iterated0 + 1,
+                            current_key => Key,
+                            current_val => Val
+                        }),
+                    delete_next_loop(Ctx#{
+                        storage_iter := It,
+                        remaining := Remaining,
+                        deleted := Deleted,
+                        iterated_over := Iterated
+                    })
+            end
+    end.
+
+%% Process the key/value pair the RocksDB iterator currently points at
+%% (`current_key' / `current_val' in the context) and keep walking the
+%% contiguous interval via `delete_traverse_interval1/1'.
+%%
+%% Returns `{Remaining, Iterator, NumDeleted, NumIteratedOver}'.
+%%
+%% The iterator with the advanced `last_seen_key' is written back into the
+%% context on every path that continues the traversal. Otherwise the iterator
+%% returned to the caller would still point at the old key, and the next
+%% `delete_next' call would re-scan the same messages; with a small batch of
+%% non-selected messages it would never make progress.
+delete_traverse_interval(LoopContext0) ->
+    #{
+        storage_iter := It0,
+        current_key := Key,
+        current_val := Val,
+        filter := Filter,
+        safe_cutoff_time := Cutoff,
+        selector := Selector,
+        db := DB,
+        cf := CF,
+        deleted := AccDel0,
+        iterated_over := AccIter0,
+        remaining := Remaining0
+    } = LoopContext0,
+    It = It0#{?last_seen_key := Key},
+    case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
+        true ->
+            Msg = deserialize(Val),
+            case check_message(Cutoff, It, Msg) of
+                true ->
+                    case Selector(Msg) of
+                        true ->
+                            ok = rocksdb:delete(DB, CF, Key, _WriteOpts = []),
+                            delete_traverse_interval1(LoopContext0#{
+                                storage_iter := It,
+                                deleted := AccDel0 + 1,
+                                remaining := Remaining0 - 1
+                            });
+                        false ->
+                            %% Matched but not selected: still counts towards the batch.
+                            delete_traverse_interval1(LoopContext0#{
+                                storage_iter := It,
+                                remaining := Remaining0 - 1
+                            })
+                    end;
+                false ->
+                    delete_traverse_interval1(LoopContext0#{storage_iter := It});
+                overflow ->
+                    %% Stop the batch and keep the previous iterator position.
+                    {0, It0, AccDel0, AccIter0}
+            end;
+        false ->
+            %% The key left the filtered interval; the outer loop will seek again.
+            {Remaining0, It, AccDel0, AccIter0}
+    end.
+
+%% Step the RocksDB iterator to the next key and continue the traversal,
+%% unless the batch is already exhausted or the iterator ran out of keys.
+delete_traverse_interval1(#{
+    remaining := 0, storage_iter := It, deleted := AccDel, iterated_over := AccIter
+}) ->
+    {0, It, AccDel, AccIter};
+delete_traverse_interval1(Ctx) ->
+    #{
+        storage_iter := It,
+        it_handle := ITHandle,
+        deleted := Deleted,
+        iterated_over := Iterated
+    } = Ctx,
+    inc_counter(),
+    case rocksdb:iterator_move(ITHandle, next) of
+        {error, invalid_iterator} ->
+            {0, It, Deleted, Iterated};
+        {ok, Key, Val} ->
+            delete_traverse_interval(Ctx#{
+                current_key := Key,
+                current_val := Val,
+                iterated_over := Iterated + 1
+            })
+    end.
+
+-spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) ->
     true | false | overflow.
 check_message(
     Cutoff,
@@ -430,6 +640,12 @@ check_message(
     #message{timestamp = Timestamp, topic = Topic}
 ) when Timestamp >= StartTime ->
     emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
+check_message(
+    _Cutoff,
+    #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
+    #message{timestamp = Timestamp, topic = Topic}
+) when Timestamp >= StartTime ->
+    emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
 check_message(_Cutoff, _It, _Msg) ->
     false.
 

+ 95 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -23,9 +23,12 @@
     drop_shard/1,
     store_batch/3,
     get_streams/3,
+    get_delete_streams/3,
     make_iterator/4,
+    make_delete_iterator/4,
     update_iterator/3,
     next/3,
+    delete_next/4,
     update_config/2,
     add_generation/1,
     list_generations_with_lifetimes/1,
@@ -43,8 +46,10 @@
     generation/0,
     cf_refs/0,
     stream/0,
+    delete_stream/0,
     stream_v1/0,
     iterator/0,
+    delete_iterator/0,
     shard_id/0,
     options/0,
     prototype/0,
@@ -56,6 +61,7 @@
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
 
 -define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
+-define(delete_stream(GENERATION, INNER), [GENERATION | INNER]).
 
 %%================================================================================
 %% Type declarations
@@ -67,6 +73,7 @@
 %% tags:
 -define(STREAM, 1).
 -define(IT, 2).
+-define(DELETE_IT, 3).
 
 %% keys:
 -define(tag, 1).
@@ -94,7 +101,10 @@
 %% Note: this might be stored permanently on a remote node.
 -opaque stream() :: nonempty_maybe_improper_list(gen_id(), term()).
 
-%% Note: this might be stred permanently on a remote node.
+%% Note: this might be stored permanently on a remote node.
+-opaque delete_stream() :: stream().
+
+%% Note: this might be stored permanently on a remote node.
 -opaque iterator() ::
     #{
         ?tag := ?IT,
@@ -102,6 +112,14 @@
         ?enc := term()
     }.
 
+%% Note: this might be stored permanently on a remote node.
+-opaque delete_iterator() ::
+    #{
+        ?tag := ?DELETE_IT,
+        ?generation := gen_id(),
+        ?enc := term()
+    }.
+
 %%%% Generation:
 
 -define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
@@ -185,6 +203,11 @@
 -callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(_Iterator).
 
+-callback make_delete_iterator(
+    shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
+) ->
+    emqx_ds:make_delete_iterator_result(_Iterator).
+
 -callback next(shard_id(), _Data, Iter, pos_integer()) ->
     {ok, Iter, [emqx_types:message()]} | {error, _}.
 
@@ -238,6 +261,29 @@ get_streams(Shard, TopicFilter, StartTime) ->
         Gens
     ).
 
+%% @doc List delete streams of all generations selected by
+%% `generations_since/2' for `StartTime'. Each stream returned by a
+%% generation's callback module is tagged with that generation's id.
+-spec get_delete_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [delete_stream()].
+get_delete_streams(Shard, TopicFilter, StartTime) ->
+    Gens = generations_since(Shard, StartTime),
+    ?tp(get_streams_all_gens, #{gens => Gens}),
+    StreamsOfGen = fun(GenId) ->
+        ?tp(get_streams_get_gen, #{gen_id => GenId}),
+        case generation_get_safe(Shard, GenId) of
+            {error, not_found} ->
+                %% race condition: generation was dropped before getting its streams?
+                [];
+            {ok, #{module := Mod, data := GenData}} ->
+                InnerStreams = Mod:get_delete_streams(Shard, GenData, TopicFilter, StartTime),
+                [?delete_stream(GenId, Inner) || Inner <- InnerStreams]
+        end
+    end,
+    lists:flatmap(StreamsOfGen, Gens).
+
 -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(
@@ -259,6 +305,27 @@ make_iterator(
             {error, end_of_stream}
     end.
 
+%% @doc Create a delete iterator for a stream of a particular generation.
+%% Returns `{error, end_of_stream}' when the generation no longer exists.
+-spec make_delete_iterator(shard_id(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    emqx_ds:make_delete_iterator_result(delete_iterator()).
+make_delete_iterator(Shard, ?delete_stream(GenId, Stream), TopicFilter, StartTime) ->
+    case generation_get_safe(Shard, GenId) of
+        {error, not_found} ->
+            {error, end_of_stream};
+        {ok, #{module := Mod, data := GenData}} ->
+            Result = Mod:make_delete_iterator(Shard, GenData, Stream, TopicFilter, StartTime),
+            case Result of
+                {error, _} = Err ->
+                    Err;
+                {ok, GenIter} ->
+                    %% Wrap the generation-specific iterator with its generation id.
+                    {ok, #{?tag => ?DELETE_IT, ?generation => GenId, ?enc => GenIter}}
+            end
+    end.
+
 -spec update_iterator(
     shard_id(), iterator(), emqx_ds:message_key()
 ) ->
@@ -306,6 +373,33 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
             {ok, end_of_stream}
     end.
 
+%% @doc Run one deletion batch through the callback module of the iterator's
+%% generation. Returns `{ok, end_of_stream}' when the generation is gone, or
+%% when a generation older than the current one reports neither deleted nor
+%% iterated-over messages.
+-spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
+    emqx_ds:delete_next_result(delete_iterator()).
+delete_next(
+    Shard,
+    Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0},
+    Selector,
+    BatchSize
+) ->
+    case generation_get_safe(Shard, GenId) of
+        {error, not_found} ->
+            %% generation was possibly dropped by GC
+            {ok, end_of_stream};
+        {ok, #{module := Mod, data := GenData}} ->
+            Current = generation_current(Shard),
+            Result = Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize),
+            case Result of
+                {error, _} = Error ->
+                    Error;
+                {ok, _GenIter, 0, 0} when GenId < Current ->
+                    %% Past generation: the storage layer won't write any more
+                    %% messages into it, and nothing was scanned, so the
+                    %% iterator has reached the end of its stream.
+                    {ok, end_of_stream};
+                {ok, GenIter, NumDeleted, _IteratedOver} ->
+                    {ok, Iter#{?enc := GenIter}, NumDeleted}
+            end
+    end.
+
 -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
 update_config(ShardId, Options) ->
     gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity).

+ 111 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -33,9 +33,12 @@
     drop/5,
     store_batch/4,
     get_streams/4,
+    get_delete_streams/4,
     make_iterator/5,
+    make_delete_iterator/5,
     update_iterator/4,
-    next/4
+    next/4,
+    delete_next/5
 ]).
 
 %% internal exports:
@@ -62,12 +65,20 @@
 
 -record(stream, {}).
 
+-record(delete_stream, {}).
+
 -record(it, {
     topic_filter :: emqx_ds:topic_filter(),
     start_time :: emqx_ds:time(),
     last_seen_message_key = first :: binary() | first
 }).
 
+-record(delete_it, {
+    topic_filter :: emqx_ds:topic_filter(),
+    start_time :: emqx_ds:time(),
+    last_seen_message_key = first :: binary() | first
+}).
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -118,12 +129,21 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
 get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
     [#stream{}].
 
+%% The reference layout always exposes exactly one delete stream.
+get_delete_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
+    OnlyStream = #delete_stream{},
+    [OnlyStream].
+
 make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) ->
     {ok, #it{
         topic_filter = TopicFilter,
         start_time = StartTime
     }}.
 
+%% Create a delete iterator that remembers the topic filter and start time.
+%% The last-seen key keeps its record default (`first').
+make_delete_iterator(_Shard, _Data, #delete_stream{}, TopicFilter, StartTime) ->
+    Iterator = #delete_it{start_time = StartTime, topic_filter = TopicFilter},
+    {ok, Iterator}.
+
 update_iterator(_Shard, _Data, OldIter, DSKey) ->
     #it{
         topic_filter = TopicFilter,
@@ -151,6 +171,37 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
     It = It0#it{last_seen_message_key = Key},
     {ok, It, lists:reverse(Messages)}.
 
+%% Run one deletion batch over the column family, resuming after the
+%% iterator's last seen key. Returns
+%% `{ok, Iterator, NumDeleted, NumIteratedOver}'.
+%%
+%% The RocksDB iterator is closed in an `after' section so the handle is
+%% released even if the selector fun or the delete operation raises.
+delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) ->
+    #delete_it{
+        topic_filter = TopicFilter,
+        start_time = StartTime,
+        last_seen_message_key = Key0
+    } = It0,
+    {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
+    try
+        Action =
+            case Key0 of
+                first ->
+                    first;
+                _ ->
+                    %% Reposition on the last seen key, then continue after it.
+                    _ = rocksdb:iterator_move(ITHandle, Key0),
+                    next
+            end,
+        {Key, {NumDeleted, NumIterated}} = do_delete_next(
+            TopicFilter,
+            StartTime,
+            DB,
+            CF,
+            ITHandle,
+            Action,
+            Selector,
+            BatchSize,
+            Key0,
+            {0, 0}
+        ),
+        It = It0#delete_it{last_seen_message_key = Key},
+        {ok, It, NumDeleted, NumIterated}
+    after
+        rocksdb:iterator_close(ITHandle)
+    end.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -172,6 +223,65 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
             {Key0, Acc}
     end.
 
+%% Deletion loop of the reference layout.
+%%
+%% Moves the RocksDB iterator by `Action' and, for each message, checks topic
+%% filter and start time, then asks `Selector' whether to delete it. Matching
+%% messages count towards `NLeft' whether or not they are deleted; every
+%% visited message counts as iterated over. Returns
+%% `{LastSeenKey, {NumDeleted, NumIteratedOver}}'.
+%% TODO: use a context map...
+do_delete_next(_, _, _, _, _, _, _, 0, Key, Acc) ->
+    {Key, Acc};
+do_delete_next(
+    TopicFilter, StartTime, DB, CF, IT, Action, Selector, NLeft, Key0, {AccDel, AccIter}
+) ->
+    case rocksdb:iterator_move(IT, Action) of
+        {error, invalid_iterator} ->
+            {Key0, {AccDel, AccIter}};
+        {ok, Key, Blob} ->
+            Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
+            TopicWords = emqx_topic:words(Topic),
+            %% Work out the new budget and deletion count first, so a single
+            %% recursive call covers all three outcomes.
+            {NLeft1, AccDel1} =
+                case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
+                    false ->
+                        {NLeft, AccDel};
+                    true ->
+                        case Selector(Msg) of
+                            false ->
+                                {NLeft - 1, AccDel};
+                            true ->
+                                ok = rocksdb:delete(DB, CF, Key, _WriteOpts = []),
+                                {NLeft - 1, AccDel + 1}
+                        end
+                end,
+            do_delete_next(
+                TopicFilter,
+                StartTime,
+                DB,
+                CF,
+                IT,
+                next,
+                Selector,
+                NLeft1,
+                Key,
+                {AccDel1, AccIter + 1}
+            )
+    end.
+
 %% @doc Generate a column family ID for the MQTT messages
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
 data_cf(GenId) ->

+ 58 - 8
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl

@@ -27,10 +27,13 @@
     next/5,
     update_iterator/5,
     add_generation/2,
-
-    %% introduced in v3
     list_generations_with_lifetimes/3,
-    drop_generation/4
+    drop_generation/4,
+
+    %% introduced in v4
+    get_delete_streams/5,
+    make_delete_iterator/6,
+    delete_next/6
 ]).
 
 %% behavior callbacks:
@@ -114,10 +117,6 @@ update_iterator(Node, DB, Shard, OldIter, DSKey) ->
 add_generation(Node, DB) ->
     erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
 
-%%--------------------------------------------------------------------------------
-%% Introduced in V3
-%%--------------------------------------------------------------------------------
-
 -spec list_generations_with_lifetimes(
     node(),
     emqx_ds:db(),
@@ -139,9 +138,60 @@ list_generations_with_lifetimes(Node, DB, Shard) ->
 drop_generation(Node, DB, Shard, GenId) ->
     erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
 
+%%--------------------------------------------------------------------------------
+%% Introduced in V4
+%%--------------------------------------------------------------------------------
+
+%% Remote call: list the storage-layer delete streams of a shard on `Node'.
+-spec get_delete_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [emqx_ds_storage_layer:delete_stream()].
+get_delete_streams(Node, DB, Shard, TopicFilter, Time) ->
+    Args = [DB, Shard, TopicFilter, Time],
+    erpc:call(Node, emqx_ds_replication_layer, do_get_delete_streams_v4, Args).
+
+%% Remote call: create a storage-layer delete iterator for a shard on `Node'.
+-spec make_delete_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator()} | {error, _}.
+make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    Args = [DB, Shard, Stream, TopicFilter, StartTime],
+    erpc:call(Node, emqx_ds_replication_layer, do_make_delete_iterator_v4, Args).
+
+%% Remote call: run one deletion batch for a shard on `Node'.
+%% Goes through `emqx_rpc:call/5' with the shard as its first argument.
+-spec delete_next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_iterator(),
+    emqx_ds:delete_selector(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator(), non_neg_integer()}
+    | {ok, end_of_stream}
+    | {error, _}.
+delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
+    Args = [DB, Shard, Iter, Selector, BatchSize],
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_delete_next_v4, Args).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
 
 introduced_in() ->
-    "5.5.1".
+    "5.6.0".

+ 57 - 0
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -372,6 +372,48 @@ t_10_non_atomic_store_batch(_Config) ->
     ),
     ok.
 
+%% Smoke test for the delete-iterator API of `emqx_ds'.
+%%
+%% Stores three messages, deletes the one published to topic `foo' through a
+%% delete iterator created for `foo/#', checks that the other two messages
+%% are still readable, and finally checks that the delete iterator reports
+%% `end_of_stream' once a new generation has been added.
+t_smoke_delete_next(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            T0 = 0,
+            FooFilter = [<<"foo">>, '#'],
+            Kept1 = message(<<"foo/bar">>, <<"1">>, 0),
+            Doomed = message(<<"foo">>, <<"2">>, 1),
+            Kept2 = message(<<"bar/bar">>, <<"3">>, 2),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, [Kept1, Doomed, Kept2])),
+
+            [DelStream] = emqx_ds:get_delete_streams(DB, FooFilter, T0),
+            {ok, DelIt0} = emqx_ds:make_delete_iterator(DB, DelStream, FooFilter, T0),
+
+            %% Select only messages whose topic is exactly `foo'.
+            OnlyFoo = fun
+                (#message{topic = <<"foo">>}) -> true;
+                (#message{}) -> false
+            end,
+            %% First pass deletes nothing, second pass deletes one message.
+            {ok, DelIt1, Deleted1} = delete(DB, DelIt0, OnlyFoo, 1),
+            ?assertEqual(0, Deleted1),
+            {ok, DelIt2, Deleted2} = delete(DB, DelIt1, OnlyFoo, 1),
+            ?assertEqual(1, Deleted2),
+
+            %% Read everything back: only the two untouched messages remain.
+            AllTopics = ['#'],
+            [{_Rank, Stream}] = emqx_ds:get_streams(DB, AllTopics, T0),
+            {ok, It0} = emqx_ds:make_iterator(DB, Stream, AllTopics, T0),
+            {ok, _It, Survivors} = iterate(DB, It0, 1),
+            ?assertEqual([Kept1, Kept2], [Survivor || {_DSKey, Survivor} <- Survivors]),
+
+            ok = emqx_ds:add_generation(DB),
+
+            ?assertMatch({ok, end_of_stream}, emqx_ds:delete_next(DB, DelIt2, OnlyFoo, 1)),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_drop_generation_with_never_used_iterator(_Config) ->
     %% This test checks how the iterator behaves when:
     %%   1) it's created at generation 1 and not consumed from.
@@ -609,6 +651,21 @@ iterate(DB, It0, BatchSize, Acc) ->
             Ret
     end.
 
+%% Drives `emqx_ds:delete_next/4' in a loop, starting from iterator `It'.
+%%
+%% Keeps requesting batches of `BatchSize' while each call deletes at least
+%% one message, and returns:
+%%   * `{ok, Iterator, TotalDeleted}' once a call deletes nothing;
+%%   * `{ok, end_of_stream, TotalDeleted}' when the stream is exhausted;
+%%   * any other result of `emqx_ds:delete_next/4' unchanged.
+delete(DB, It, Selector, BatchSize) ->
+    delete(DB, It, Selector, BatchSize, 0).
+
+%% Worker for `delete/4'; `Acc' is the number of messages deleted so far.
+delete(DB, It0, Selector, BatchSize, Acc) ->
+    case emqx_ds:delete_next(DB, It0, Selector, BatchSize) of
+        {ok, It, 0} ->
+            {ok, It, Acc};
+        {ok, It, NumDeleted} ->
+            %% Argument order must match the function head: selector first,
+            %% then batch size.
+            delete(DB, It, Selector, BatchSize, Acc + NumDeleted);
+        {ok, end_of_stream} ->
+            {ok, end_of_stream, Acc};
+        Ret ->
+            Ret
+    end.
+
 %% CT callbacks
 
 %% Common Test `all/0' callback: delegates to
 %% `emqx_common_test_helpers:all/1' for this module.
 all() -> emqx_common_test_helpers:all(?MODULE).

+ 97 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -83,6 +83,35 @@ t_iterate(_Config) ->
     ],
     ok.
 
+%% Smoke test for deleting messages.
+%%
+%% Stores 10 messages on each of three topics, deletes everything published
+%% to `TopicToDelete' via the `delete/4' helper, and verifies that exactly
+%% those 10 messages are gone while the other 20 are still replayable.
+t_delete(_Config) ->
+    %% Prepare data:
+    TopicToDelete = <<"foo/bar/baz">>,
+    Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
+    Timestamps = lists:seq(1, 10),
+    Batch = [
+        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+
+    %% Delete every message published to `TopicToDelete':
+    StartTime = 0,
+    TopicFilter = parse_topic(<<"#">>),
+    Selector = fun(#message{topic = T}) ->
+        T == TopicToDelete
+    end,
+    NumDeleted = delete(?SHARD, TopicFilter, StartTime, Selector),
+    ?assertEqual(10, NumDeleted),
+
+    %% Read surviving messages.
+    Messages = [Msg || {_DSKey, Msg} <- replay(?SHARD, TopicFilter, StartTime)],
+    MessagesByTopic = maps:groups_from_list(fun emqx_message:topic/1, Messages),
+    ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
+    ?assertEqual(20, length(Messages)),
+
+    ok.
+
 %% Asserts that lists A and B are equal once both are sorted, i.e. that they
 %% hold the same elements regardless of order (duplicates are significant).
 -define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
 
 %% Smoke test that verifies that concrete topics are mapped to
@@ -509,3 +538,71 @@ keyspace(TC) ->
 
 %% Sets the `keyspace_config' key of the `emqx_ds' application environment
 %% to the single-entry map `#{Keyspace => Config}' and asserts that the call
 %% returns `ok'.
 set_keyspace_config(Keyspace, Config) ->
     ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).
+
+%% Deletes from `Shard' every message matching `TopicFilter' (from `Time'
+%% onwards) for which `Selector' returns true, and returns the number of
+%% deleted messages.
+%%
+%% Builds one delete iterator per delete stream reported by the storage
+%% layer, then hands them to `delete/3'.
+delete(Shard, TopicFilter, Time, Selector) ->
+    ToIterator = fun(Stream) ->
+        {ok, It} = emqx_ds_storage_layer:make_delete_iterator(
+            Shard, Stream, TopicFilter, Time
+        ),
+        It
+    end,
+    DeleteStreams = emqx_ds_storage_layer:get_delete_streams(Shard, TopicFilter, Time),
+    delete(Shard, [ToIterator(Stream) || Stream <- DeleteStreams], Selector).
+
+%% Runs rounds of `emqx_ds_storage_layer:delete_next/4' (batch size 10) over
+%% the given delete iterators and returns the total number of deletions.
+%%
+%% In each round every iterator is advanced once. An iterator is dropped when
+%% it reports `end_of_stream' or deletes nothing; the others are carried over,
+%% in the same order, to the next round.
+delete(_Shard, [], _Selector) ->
+    0;
+delete(Shard, Iterators, Selector) ->
+    Advance = fun(It0) ->
+        case emqx_ds_storage_layer:delete_next(Shard, It0, Selector, 10) of
+            {ok, end_of_stream} ->
+                done;
+            {ok, _It, 0} ->
+                done;
+            {ok, It, NDeleted} ->
+                {more, It, NDeleted}
+        end
+    end,
+    Outcomes = lists:map(Advance, Iterators),
+    Pending = [It || {more, It, _NDeleted} <- Outcomes],
+    DeletedThisRound = lists:sum([NDeleted || {more, _It, NDeleted} <- Outcomes]),
+    DeletedThisRound + delete(Shard, Pending, Selector).
+
+%% Reads back from `Shard' all messages matching `TopicFilter' from `Time'
+%% onwards.
+%%
+%% Builds one iterator per stream reported by the storage layer (stream ranks
+%% are ignored), then hands them to `replay/2'.
+replay(Shard, TopicFilter, Time) ->
+    ToIterator = fun({_Rank, Stream}) ->
+        {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, Time),
+        It
+    end,
+    RankedStreams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, Time),
+    replay(Shard, [ToIterator(Ranked) || Ranked <- RankedStreams]).
+
+%% Runs rounds of `emqx_ds_storage_layer:next/3' (batch size 10) over the
+%% given iterators and returns all fetched entries as one flat list.
+%%
+%% In each round every iterator is advanced once. An iterator is dropped when
+%% it reports `end_of_stream' or yields an empty batch; the others are carried
+%% over, in the same order, to the next round.
+replay(_Shard, []) ->
+    [];
+replay(Shard, Iterators) ->
+    Advance = fun(It0) ->
+        case emqx_ds_storage_layer:next(Shard, It0, 10) of
+            {ok, end_of_stream} ->
+                done;
+            {ok, _It, []} ->
+                done;
+            {ok, It, Batch} ->
+                {more, It, Batch}
+        end
+    end,
+    Outcomes = lists:map(Advance, Iterators),
+    Pending = [It || {more, It, _Batch} <- Outcomes],
+    Fetched = lists:flatten([Batch || {more, _It, Batch} <- Outcomes]),
+    Fetched ++ replay(Shard, Pending).