Selaa lähdekoodia

chore(ds): add delete callbacks

Ilya Averyanov 1 vuosi sitten
vanhempi
commit
b010d34640
1 muutettua tiedostoa jossa 70 lisäystä ja 10 poistoa
  1. 70 10
      apps/emqx_durable_storage/src/emqx_ds.erl

+ 70 - 10
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -38,6 +38,9 @@
 %% Message replay API:
 -export([get_streams/3, make_iterator/4, update_iterator/3, next/3]).
 
+%% Message delete API:
+-export([get_delete_streams/3, make_delete_iterator/4, delete_next/4]).
+
 %% Misc. API:
 -export([]).
 
@@ -48,10 +51,12 @@
     topic_filter/0,
     topic/0,
     stream/0,
+    delete_stream/0,
     rank_x/0,
     rank_y/0,
     stream_rank/0,
     iterator/0,
+    delete_iterator/0,
     iterator_id/0,
     message_id/0,
     message_key/0,
@@ -59,11 +64,12 @@
     next_result/1, next_result/0,
     store_batch_result/0,
     make_iterator_result/1, make_iterator_result/0,
-    get_iterator_result/1,
 
     ds_specific_stream/0,
     ds_specific_iterator/0,
     ds_specific_generation_rank/0,
+    ds_specific_delete_stream/0,
+    ds_specific_delete_iterator/0,
     generation_rank/0,
     generation_info/0
 ]).
@@ -93,8 +99,12 @@
 
 -opaque iterator() :: ds_specific_iterator().
 
+-opaque delete_iterator() :: ds_specific_delete_iterator().
+
 -opaque stream() :: ds_specific_stream().
 
+-opaque delete_stream() :: ds_specific_delete_stream().
+
 -type ds_specific_iterator() :: term().
 
 -type ds_specific_stream() :: term().
@@ -114,6 +124,22 @@
 
 -type next_result() :: next_result(iterator()).
 
+-type ds_specific_delete_iterator() :: term().
+
+-type ds_specific_delete_stream() :: term().
+
+-type ds_delete_selector() ::
+    fun((message_key()) -> boolean()) | fun((message_key(), emqx_types:message()) -> boolean()).
+
+-type make_delete_iterator_result(DeleteIterator) :: {ok, DeleteIterator} | {error, term()}.
+
+-type make_delete_iterator_result() :: make_delete_iterator_result(delete_iterator()).
+
+-type delete_next_result(DeleteIterator) ::
+    {ok, DeleteIterator, non_neg_integer()} | {ok, end_of_stream} | {error, term()}.
+
+-type delete_next_result() :: delete_next_result(delete_iterator()).
+
 %% Timestamp
 %% Earliest possible timestamp is 0.
 %% TODO granularity?  Currently, we should always use milliseconds, as that's the unit we
@@ -137,8 +163,6 @@
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
--type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
-
 %% An opaque term identifying a generation.  Each implementation will possibly add
 %% information to this term to match its inner structure (e.g.: by embedding the shard id,
 %% in the case of `emqx_ds_replication_layer').
@@ -183,9 +207,21 @@
 
 -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
 
+-callback get_delete_streams(db(), topic_filter(), time()) -> [ds_specific_delete_stream()].
+
+-callback make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) ->
+    make_delete_iterator_result(ds_specific_delete_iterator()).
+
+-callback delete_next(db(), DeleteIterator, ds_delete_selector(), pos_integer()) ->
+    delete_next_result(DeleteIterator).
+
 -optional_callbacks([
     list_generations_with_lifetimes/1,
-    drop_generation/2
+    drop_generation/2,
+
+    get_delete_streams/3,
+    make_delete_iterator/4,
+    delete_next/4
 ]).
 
 %%================================================================================
@@ -219,12 +255,7 @@ update_db_config(DB, Opts) ->
 -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}.
 list_generations_with_lifetimes(DB) ->
     Mod = ?module(DB),
-    case erlang:function_exported(Mod, list_generations_with_lifetimes, 1) of
-        true ->
-            Mod:list_generations_with_lifetimes(DB);
-        false ->
-            #{}
-    end.
+    call_if_implemented(Mod, list_generations_with_lifetimes, [DB], #{}).
 
 -spec drop_generation(db(), generation_rank()) -> ok | {error, _}.
 drop_generation(DB, GenId) ->
@@ -314,6 +345,27 @@ update_iterator(DB, OldIter, DSKey) ->
 next(DB, Iter, BatchSize) ->
     ?module(DB):next(DB, Iter, BatchSize).
 
+-spec get_delete_streams(db(), topic_filter(), time()) -> [delete_stream()].
+get_delete_streams(DB, TopicFilter, StartTime) ->
+    Mod = ?module(DB),
+    call_if_implemented(Mod, get_delete_streams, [DB, TopicFilter, StartTime], []).
+
+-spec make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) ->
+    make_delete_iterator_result().
+make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
+    Mod = ?module(DB),
+    call_if_implemented(
+        Mod, make_delete_iterator, [DB, Stream, TopicFilter, StartTime], {error, not_implemented}
+    ).
+
+-spec delete_next(db(), delete_iterator(), ds_delete_selector(), pos_integer()) ->
+    delete_next_result().
+delete_next(DB, Iter, Selector, BatchSize) ->
+    Mod = ?module(DB),
+    call_if_implemented(
+        Mod, delete_next, [DB, Iter, Selector, BatchSize], {error, not_implemented}
+    ).
+
 %%================================================================================
 %% Internal exports
 %%================================================================================
@@ -321,3 +373,11 @@ next(DB, Iter, BatchSize) ->
 %%================================================================================
 %% Internal functions
 %%================================================================================
+
+call_if_implemented(Mod, Fun, Args, Default) ->
+    case erlang:function_exported(Mod, Fun, length(Args)) of
+        true ->
+            apply(Mod, Fun, Args);
+        false ->
+            Default
+    end.