Browse Source

feat(ds): Pass current time to the storage layer via argument

ieQu1 1 year atrás
parent
commit
1ff2e02fd9

+ 4 - 2
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -491,7 +491,7 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
     ?IF_STORAGE_RUNNING(
         ShardId,
-        emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
+        emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us())
     ).
 
 -spec do_delete_next_v4(
@@ -503,7 +503,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 ) ->
     emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
 do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
-    emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize).
+    emqx_ds_storage_layer:delete_next(
+        {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us()
+    ).
 
 -spec do_add_generation_v2(emqx_ds:db()) -> no_return().
 do_add_generation_v2(_DB) ->

+ 6 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -34,8 +34,8 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/4,
-    delete_next/5,
+    next/5,
+    delete_next/6,
     post_creation_actions/1
 ]).
 
@@ -354,8 +354,9 @@ update_iterator(
 next(
     Shard,
     Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
-    It = #{?topic_filter := TF, ?storage_key := Stream},
-    BatchSize
+    It = #{?storage_key := Stream},
+    BatchSize,
+    Now
 ) ->
     init_counters(),
     %% Compute safe cutoff time. It's the point in time where the last
@@ -370,7 +371,6 @@ next(
     SafeCutoffTime =
         case IsWildcard of
             true ->
-                Now = emqx_ds:timestamp_us(),
                 (Now bsr TSOffset) bsl TSOffset;
             false ->
                 %% Iterators scanning streams without varying topic
@@ -415,12 +415,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
         rocksdb:iterator_close(ITHandle)
     end.
 
-delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) ->
+delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
     init_counters(),
-    Now = emqx_message:timestamp_now(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     try
         delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)

+ 17 - 9
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -31,8 +31,8 @@
     make_iterator/4,
     make_delete_iterator/4,
     update_iterator/3,
-    next/3,
-    delete_next/4,
+    next/4,
+    delete_next/5,
 
     %% Generations
     update_config/3,
@@ -223,9 +223,14 @@
 ) ->
     emqx_ds:make_delete_iterator_result(_Iterator).
 
--callback next(shard_id(), _Data, Iter, pos_integer()) ->
+-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) ->
     {ok, Iter, [emqx_types:message()]} | {error, _}.
 
+-callback delete_next(
+    shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
+) ->
+    {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
+
 -callback post_creation_actions(post_creation_context()) -> _Data.
 
 -optional_callbacks([post_creation_actions/1]).
@@ -377,13 +382,13 @@ update_iterator(
             {error, unrecoverable, generation_not_found}
     end.
 
--spec next(shard_id(), iterator(), pos_integer()) ->
+-spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) ->
     emqx_ds:next_result(iterator()).
-next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->
+next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->
             Current = generation_current(Shard),
-            case Mod:next(Shard, GenData, GenIter0, BatchSize) of
+            case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of
                 {ok, _GenIter, []} when GenId < Current ->
                     %% This is a past generation. Storage layer won't write
                     %% any more messages here. The iterator reached the end:
@@ -399,18 +404,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
             {error, unrecoverable, generation_not_found}
     end.
 
--spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
+-spec delete_next(
+    shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
+) ->
     emqx_ds:delete_next_result(delete_iterator()).
 delete_next(
     Shard,
     Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0},
     Selector,
-    BatchSize
+    BatchSize,
+    Now
 ) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->
             Current = generation_current(Shard),
-            case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of
+            case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of
                 {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
                     %% This is a past generation. Storage layer won't write
                     %% any more messages here. The iterator reached the end:

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -37,8 +37,8 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/4,
-    delete_next/5
+    next/5,
+    delete_next/6
 ]).
 
 %% internal exports:
@@ -154,7 +154,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
         last_seen_message_key = DSKey
     }}.
 
-next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
+next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
     #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
     {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
     Action =
@@ -170,7 +170,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
     It = It0#it{last_seen_message_key = Key},
     {ok, It, lists:reverse(Messages)}.
 
-delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) ->
+delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
     #delete_it{
         topic_filter = TopicFilter,
         start_time = StartTime,

+ 11 - 5
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -73,13 +73,15 @@ t_iterate(_Config) ->
         begin
             [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
             {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
-            {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100),
+            {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(
+                ?SHARD, It, 100, emqx_ds:timestamp_us()
+            ),
             Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys],
             ?assertEqual(
                 lists:map(fun integer_to_binary/1, Timestamps),
                 payloads(Messages)
             ),
-            {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100)
+            {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us())
         end
      || Topic <- Topics
     ],
@@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
         F(It, 0) ->
             error({too_many_iterations, It});
         F(It, N) ->
-            case emqx_ds_storage_layer:next(Shard, It, BatchSize) of
+            case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of
                 end_of_stream ->
                     [];
                 {ok, _NextIt, []} ->
@@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) ->
 delete(Shard, Iterators, Selector) ->
     {NewIterators0, N} = lists:foldl(
         fun(Iterator0, {AccIterators, NAcc}) ->
-            case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of
+            case
+                emqx_ds_storage_layer:delete_next(
+                    Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us()
+                )
+            of
                 {ok, end_of_stream} ->
                     {AccIterators, NAcc};
                 {ok, _Iterator1, 0} ->
@@ -573,7 +579,7 @@ replay(_Shard, []) ->
 replay(Shard, Iterators) ->
     {NewIterators0, Messages0} = lists:foldl(
         fun(Iterator0, {AccIterators, AccMessages}) ->
-            case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of
+            case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of
                 {ok, end_of_stream} ->
                     {AccIterators, AccMessages};
                 {ok, _Iterator1, []} ->

+ 19 - 7
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -85,8 +85,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) ->
 consume_iter(DB, It) ->
     consume_iter(DB, It, #{}).
 
-consume_iter(DB, It, Opts) ->
-    consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts).
+consume_iter(DB, It0, Opts) ->
+    consume_iter_with(
+        fun(It, BatchSize) ->
+            emqx_ds:next(DB, It, BatchSize)
+        end,
+        It0,
+        Opts
+    ).
 
 storage_consume(ShardId, TopicFilter) ->
     storage_consume(ShardId, TopicFilter, 0).
@@ -108,16 +114,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) ->
 storage_consume_iter(ShardId, It) ->
     storage_consume_iter(ShardId, It, #{}).
 
-storage_consume_iter(ShardId, It, Opts) ->
-    consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts).
+storage_consume_iter(ShardId, It0, Opts) ->
+    consume_iter_with(
+        fun(It, BatchSize) ->
+            emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us())
+        end,
+        It0,
+        Opts
+    ).
 
-consume_iter_with(NextFun, Args, It0, Opts) ->
+consume_iter_with(NextFun, It0, Opts) ->
     BatchSize = maps:get(batch_size, Opts, 5),
-    case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of
+    case NextFun(It0, BatchSize) of
         {ok, It, _Msgs = []} ->
             {ok, It, []};
         {ok, It1, Batch} ->
-            {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts),
+            {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts),
             {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs};
         {ok, Eos = end_of_stream} ->
             {ok, Eos, []};