Просмотр исходного кода

fix(ds): Move responsibility of returning end_of_stream to the CBM

ieQu1 1 год назад
Родитель
Сommit
60edf5e9b8

+ 27 - 12
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -35,7 +35,7 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/5,
+    next/6,
     delete_next/6,
     post_creation_actions/1,
 
@@ -424,23 +424,21 @@ next(
     Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
     It = #{?storage_key := Stream},
     BatchSize,
-    Now
+    Now,
+    IsCurrent
 ) ->
     init_counters(),
     %% Compute safe cutoff time. It's the point in time where the last
     %% complete epoch ends, so we need to know the current time to
     %% compute it. This is needed because new keys can be added before
     %% the iterator.
-    IsWildcard =
+    %%
+    %% This is needed to avoid situations when the iterator advances
+    %% to position k1, and then a new message with k2, such that k2 <
+    %% k1 is inserted. k2 would be missed.
+    HasCutoff =
         case Stream of
-            {_StaticKey, []} -> false;
-            _ -> true
-        end,
-    SafeCutoffTime =
-        case IsWildcard of
-            true ->
-                (Now bsr TSOffset) bsl TSOffset;
-            false ->
+            {_StaticKey, []} ->
                 %% Iterators scanning streams without varying topic
                 %% levels can operate on incomplete epochs, since new
                 %% matching keys for the single topic are added in
@@ -450,10 +448,27 @@ next(
                 %% filters operating on streams with varying parts:
                 %% iterator can jump to the next topic and then it
                 %% won't backtrack.
+                false;
+            _ ->
+                %% New batches are only added to the current
+                %% generation. We can ignore cutoff time for old
+                %% generations:
+                IsCurrent
+        end,
+    SafeCutoffTime =
+        case HasCutoff of
+            true ->
+                (Now bsr TSOffset) bsl TSOffset;
+            false ->
                 1 bsl TSBits - 1
         end,
     try
-        next_until(Schema, It, SafeCutoffTime, BatchSize)
+        case next_until(Schema, It, SafeCutoffTime, BatchSize) of
+            {ok, _, []} when not IsCurrent ->
+                {ok, end_of_stream};
+            Result ->
+                Result
+        end
     after
         report_counters(Shard)
     end.

+ 6 - 9
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -248,8 +248,8 @@
 ) ->
     emqx_ds:make_delete_iterator_result(_Iterator).
 
--callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) ->
-    {ok, Iter, [emqx_types:message()]} | {error, _}.
+-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) ->
+    {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
 
 -callback delete_next(
     shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
@@ -449,15 +449,12 @@ update_iterator(
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->
-            Current = generation_current(Shard),
-            case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of
-                {ok, _GenIter, []} when GenId < Current ->
-                    %% This is a past generation. Storage layer won't write
-                    %% any more messages here. The iterator reached the end:
-                    %% the stream has been fully replayed.
-                    {ok, end_of_stream};
+            IsCurrent = GenId =:= generation_current(Shard),
+            case Mod:next(Shard, GenData, GenIter0, BatchSize, Now, IsCurrent) of
                 {ok, GenIter, Batch} ->
                     {ok, Iter#{?enc := GenIter}, Batch};
+                {ok, end_of_stream} ->
+                    {ok, end_of_stream};
                 Error = {error, _, _} ->
                     Error
             end;

+ 8 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -38,7 +38,7 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/5,
+    next/6,
     delete_next/6
 ]).
 
@@ -148,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
         last_seen_message_key = DSKey
     }}.
 
-next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
+next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) ->
     #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
     {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
     Action =
@@ -162,7 +162,12 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
     {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
     rocksdb:iterator_close(ITHandle),
     It = It0#it{last_seen_message_key = Key},
-    {ok, It, lists:reverse(Messages)}.
+    case Messages of
+        [] when not IsCurrent ->
+            {ok, end_of_stream};
+        _ ->
+            {ok, It, lists:reverse(Messages)}
+    end.
 
 delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
     #delete_it{