Просмотр исходного кода

feat(ds): update delete/count interface

Ilya Averyanov 2 лет назад
Родитель
Сommit
d5ae0e5c53
1 измененных файлов с 16 добавлено и 7 удалено
  1. 16 7
      apps/emqx_durable_storage/src/emqx_ds.erl

+ 16 - 7
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -42,7 +42,7 @@
 -export([get_delete_streams/3, make_delete_iterator/4, delete_next/4]).
 
 %% Misc. API:
--export([]).
+-export([count/1]).
 
 -export_type([
     create_db_opts/0,
@@ -52,6 +52,7 @@
     topic/0,
     stream/0,
     delete_stream/0,
+    delete_selector/0,
     rank_x/0,
     rank_y/0,
     stream_rank/0,
@@ -105,6 +106,8 @@
 
 -opaque delete_stream() :: ds_specific_delete_stream().
 
+-type delete_selector() :: fun((emqx_types:message()) -> boolean()).
+
 -type ds_specific_iterator() :: term().
 
 -type ds_specific_stream() :: term().
@@ -128,9 +131,6 @@
 
 -type ds_specific_delete_stream() :: term().
 
--type ds_delete_selector() ::
-    fun((message_key()) -> boolean()) | fun((message_key(), emqx_types:message()) -> boolean()).
-
 -type make_delete_iterator_result(DeleteIterator) :: {ok, DeleteIterator} | {error, term()}.
 
 -type make_delete_iterator_result() :: make_delete_iterator_result(delete_iterator()).
@@ -212,16 +212,20 @@
 -callback make_delete_iterator(db(), ds_specific_delete_stream(), topic_filter(), time()) ->
     make_delete_iterator_result(ds_specific_delete_iterator()).
 
--callback delete_next(db(), DeleteIterator, ds_delete_selector(), pos_integer()) ->
+-callback delete_next(db(), DeleteIterator, delete_selector(), pos_integer()) ->
     delete_next_result(DeleteIterator).
 
+-callback count(db()) -> non_neg_integer().
+
 -optional_callbacks([
     list_generations_with_lifetimes/1,
     drop_generation/2,
 
     get_delete_streams/3,
     make_delete_iterator/4,
-    delete_next/4
+    delete_next/4,
+
+    count/1
 ]).
 
 %%================================================================================
@@ -358,7 +362,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
         Mod, make_delete_iterator, [DB, Stream, TopicFilter, StartTime], {error, not_implemented}
     ).
 
--spec delete_next(db(), delete_iterator(), ds_delete_selector(), pos_integer()) ->
+-spec delete_next(db(), delete_iterator(), delete_selector(), pos_integer()) ->
     delete_next_result().
 delete_next(DB, Iter, Selector, BatchSize) ->
     Mod = ?module(DB),
@@ -366,6 +370,11 @@ delete_next(DB, Iter, Selector, BatchSize) ->
         Mod, delete_next, [DB, Iter, Selector, BatchSize], {error, not_implemented}
     ).
 
+-spec count(db()) -> non_neg_integer() | {error, not_implemented}.
+count(DB) ->
+    Mod = ?module(DB),
+    call_if_implemented(Mod, count, [DB], {error, not_implemented}).
+
 %%================================================================================
 %% Internal exports
 %%================================================================================