Przeglądaj źródła

chore(ds): change return type of `storage_layer:next/{1,2}`

Part of https://emqx.atlassian.net/browse/EMQX-10942

The goal is to help make it clear to the caller of `next` what to do next: if the iterator
should still be used or if no new messages will ever come out of it.

From:

```erlang
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
```

To:

```erlang
-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream.

-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream.
```
Thales Macedo Garitezi 2 lat temu
rodzic
commit
7ab57824dc

+ 2 - 2
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -272,9 +272,9 @@ consume(Shard, IteratorId) when is_binary(IteratorId) ->
 
 consume(It) ->
     case emqx_ds_storage_layer:next(It) of
-        {value, Msg, NIt} ->
+        {ok, NIt, [Msg]} ->
             [emqx_persistent_message:deserialize(Msg) | consume(NIt)];
-        none ->
+        end_of_stream ->
             []
     end.
 

+ 18 - 9
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -90,7 +90,7 @@
 -export([next/1]).
 
 -export([preserve_iterator/1]).
--export([restore_iterator/3]).
+-export([restore_iterator/2]).
 -export([refresh_iterator/1]).
 
 %% Debug/troubleshooting:
@@ -217,6 +217,7 @@
 
 -opaque db() :: #db{}.
 -opaque iterator() :: #it{}.
+-type serialized_iterator() :: binary().
 -type keymapper() :: #keymapper{}.
 -type keyspace_filter() :: #filter{}.
 
@@ -340,22 +341,30 @@ next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
             {error, closed}
     end.
 
--spec preserve_iterator(iterator()) -> binary().
-preserve_iterator(#it{cursor = Cursor}) ->
+-spec preserve_iterator(iterator()) -> serialized_iterator().
+preserve_iterator(#it{
+    cursor = Cursor,
+    filter = #filter{
+        topic_filter = TopicFilter,
+        start_time = StartTime
+    }
+}) ->
     State = #{
         v => 1,
-        cursor => Cursor
+        cursor => Cursor,
+        replay => {TopicFilter, StartTime}
     },
     term_to_binary(State).
 
--spec restore_iterator(db(), emqx_ds:replay(), binary()) ->
+-spec restore_iterator(db(), serialized_iterator()) ->
     {ok, iterator()} | {error, _TODO}.
-restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
+restore_iterator(DB, Serial) when is_binary(Serial) ->
     State = binary_to_term(Serial),
-    restore_iterator(DB, Replay, State);
-restore_iterator(DB, Replay, #{
+    restore_iterator(DB, State);
+restore_iterator(DB, #{
     v := 1,
-    cursor := Cursor
+    cursor := Cursor,
+    replay := Replay = {_TopicFilter, _StartTime}
 }) ->
     case make_iterator(DB, Replay) of
         {ok, It} when Cursor == undefined ->

+ 44 - 13
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -12,7 +12,7 @@
 -export([store/5]).
 -export([delete/4]).
 
--export([make_iterator/2, next/1]).
+-export([make_iterator/2, next/1, next/2]).
 
 -export([
     preserve_iterator/2,
@@ -131,7 +131,7 @@
 -callback make_iterator(_Schema, emqx_ds:replay()) ->
     {ok, _It} | {error, _}.
 
--callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
+-callback restore_iterator(_Schema, _Serialized :: binary()) -> {ok, _It} | {error, _}.
 
 -callback preserve_iterator(_It) -> term().
 
@@ -175,21 +175,52 @@ make_iterator(Shard, Replay = {_, StartTime}) ->
         replay = Replay
     }).
 
--spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
-next(It = #it{module = Mod, data = ItData}) ->
+-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream.
+next(It = #it{}) ->
+    next(It, _BatchSize = 1).
+
+-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream.
+next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) ->
+    end_of_stream;
+next(
+    It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize
+) ->
+    #{data := DBData} = meta_get_gen(Shard, Gen),
+    {ok, ItData} = Mod:restore_iterator(DBData, Serialized),
+    next(It#it{data = ItData}, BatchSize);
+next(It = #it{}, BatchSize) ->
+    do_next(It, BatchSize, _Acc = []).
+
+-spec do_next(iterator(), non_neg_integer(), [binary()]) ->
+    {ok, iterator(), [binary()]} | end_of_stream.
+do_next(It, N, Acc) when N =< 0 ->
+    {ok, It, lists:reverse(Acc)};
+do_next(It = #it{module = Mod, data = ItData}, N, Acc) ->
     case Mod:next(ItData) of
         {value, Val, ItDataNext} ->
-            {value, Val, It#it{data = ItDataNext}};
-        {error, _} = Error ->
-            Error;
+            do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]);
+        {error, _} = _Error ->
+            %% todo: log?
+            %% iterator might be invalid now; will need to re-open it.
+            Serialized = Mod:preserve_iterator(ItData),
+            {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
         none ->
             case open_next_iterator(It) of
                 {ok, ItNext} ->
-                    next(ItNext);
-                {error, _} = Error ->
-                    Error;
+                    do_next(ItNext, N, Acc);
+                {error, _} = _Error ->
+                    %% todo: log?
+                    %% fixme: only bad options may lead to this?
+                    %% return an "empty" iterator to be re-opened when retrying?
+                    Serialized = Mod:preserve_iterator(ItData),
+                    {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
                 none ->
-                    none
+                    case Acc of
+                        [] ->
+                            end_of_stream;
+                        _ ->
+                            {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)}
+                    end
             end
     end.
 
@@ -407,8 +438,8 @@ open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
 
 -spec open_restore_iterator(generation(), iterator(), binary()) ->
     {ok, iterator()} | {error, _Reason}.
-open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
-    case Mod:restore_iterator(Data, Replay, Serial) of
+open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) ->
+    case Mod:restore_iterator(Data, Serial) of
         {ok, ItData} ->
             {ok, It#it{module = Mod, data = ItData}};
         Err ->

+ 10 - 11
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -201,7 +201,7 @@ t_iterate_multigen_preserve_restore(_Config) ->
     ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
     {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
     {It5, Res200} = iterate(It4, 1000),
-    ?assertEqual(none, It5),
+    ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
     ?assertEqual(
         lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
         lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
@@ -224,21 +224,20 @@ iterate(DB, TopicFilter, StartTime) ->
 
 iterate(It) ->
     case emqx_ds_storage_layer:next(It) of
-        {value, Payload, ItNext} ->
+        {ok, ItNext, [Payload]} ->
             [Payload | iterate(ItNext)];
-        none ->
+        end_of_stream ->
             []
     end.
 
-iterate(It, 0) ->
-    {It, []};
+iterate(end_of_stream, _N) ->
+    {end_of_stream, []};
 iterate(It, N) ->
-    case emqx_ds_storage_layer:next(It) of
-        {value, Payload, ItNext} ->
-            {ItFinal, Ps} = iterate(ItNext, N - 1),
-            {ItFinal, [Payload | Ps]};
-        none ->
-            {none, []}
+    case emqx_ds_storage_layer:next(It, N) of
+        {ok, ItFinal, Payloads} ->
+            {ItFinal, Payloads};
+        end_of_stream ->
+            {end_of_stream, []}
     end.
 
 iterator(DB, TopicFilter, StartTime) ->

+ 2 - 5
apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl

@@ -225,12 +225,9 @@ run_iterator_commands([iterate | Rest], It, Ctx) ->
             []
     end;
 run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
-    #{
-        db := DB,
-        replay := Replay
-    } = Ctx,
+    #{db := DB} = Ctx,
     Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It),
-    {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial),
+    {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Serial),
     run_iterator_commands(Rest, ItNext, Ctx);
 run_iterator_commands([], It, _Ctx) ->
     iterate_db(It).