Просмотр исходного кода

Merge pull request #12125 from thalesmg/ds-cache-m-20231206

chore(ds): return DS key from `next` and add `update_iterator` callback
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
dbc8141930

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -19,6 +19,7 @@
 {emqx_delayed,1}.
 {emqx_delayed,2}.
 {emqx_ds,1}.
+{emqx_ds,2}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,2}.
 {emqx_exhook,1}.

+ 2 - 2
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -431,7 +431,7 @@ get_commit_next(comp, #inflight{commits = Commits}) ->
 
 publish_fetch(PreprocFun, FirstSeqno, Messages) ->
     flatmapfoldl(
-        fun(MessageIn, Acc) ->
+        fun({_DSKey, MessageIn}, Acc) ->
             Message = PreprocFun(MessageIn),
             publish_fetch(Message, Acc)
         end,
@@ -450,7 +450,7 @@ publish_fetch(Messages, Seqno) ->
 publish_replay(PreprocFun, Commits, FirstSeqno, Messages) ->
     #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits,
     flatmapfoldl(
-        fun(MessageIn, Acc) ->
+        fun({_DSKey, MessageIn}, Acc) ->
             Message = PreprocFun(MessageIn),
             publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
         end,

+ 2 - 2
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -412,8 +412,8 @@ consume(It) ->
     case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
         {ok, _NIt, _Msgs = []} ->
             [];
-        {ok, NIt, Msgs} ->
-            Msgs ++ consume(NIt);
+        {ok, NIt, MsgsAndKeys} ->
+            [Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt);
         {ok, end_of_stream} ->
             []
     end.

+ 13 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -28,7 +28,7 @@
 -export([store_batch/2, store_batch/3]).
 
 %% Message replay API:
--export([get_streams/3, make_iterator/4, next/3]).
+-export([get_streams/3, make_iterator/4, update_iterator/3, next/3]).
 
 %% Misc. API:
 -export([]).
@@ -43,6 +43,7 @@
     stream_rank/0,
     iterator/0,
     message_id/0,
+    message_key/0,
     next_result/1, next_result/0,
     store_batch_result/0,
     make_iterator_result/1, make_iterator_result/0,
@@ -74,6 +75,8 @@
 
 -type ds_specific_stream() :: term().
 
+-type message_key() :: binary().
+
 -type store_batch_result() :: ok | {error, _}.
 
 -type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}.
@@ -81,7 +84,7 @@
 -type make_iterator_result() :: make_iterator_result(iterator()).
 
 -type next_result(Iterator) ::
-    {ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
+    {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}.
 
 -type next_result() :: next_result(iterator()).
 
@@ -125,6 +128,9 @@
 -callback make_iterator(db(), ds_specific_stream(), topic_filter(), time()) ->
     make_iterator_result(ds_specific_iterator()).
 
+-callback update_iterator(db(), ds_specific_iterator(), message_key()) ->
+    make_iterator_result(ds_specific_iterator()).
+
 -callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
 
 %%================================================================================
@@ -212,6 +218,11 @@ get_streams(DB, TopicFilter, StartTime) ->
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime).
 
+-spec update_iterator(db(), iterator(), message_key()) ->
+    make_iterator_result().
+update_iterator(DB, OldIter, DSKey) ->
+    ?module(DB):update_iterator(DB, OldIter, DSKey).
+
 -spec next(db(), iterator(), pos_integer()) -> next_result().
 next(DB, Iter, BatchSize) ->
     ?module(DB):next(DB, Iter, BatchSize).

+ 38 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -27,6 +27,7 @@
     store_batch/3,
     get_streams/3,
     make_iterator/4,
+    update_iterator/3,
     next/3
 ]).
 
@@ -36,6 +37,7 @@
     do_store_batch_v1/4,
     do_get_streams_v1/4,
     do_make_iterator_v1/5,
+    do_update_iterator_v2/4,
     do_next_v1/4
 ]).
 
@@ -170,6 +172,30 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
             Err
     end.
 
+-spec update_iterator(
+    emqx_ds:db(),
+    iterator(),
+    emqx_ds:message_key()
+) ->
+    emqx_ds:make_iterator_result(iterator()).
+update_iterator(DB, OldIter, DSKey) ->
+    #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
+    Node = node_of_shard(DB, Shard),
+    case
+        emqx_ds_proto_v2:update_iterator(
+            Node,
+            DB,
+            Shard,
+            StorageIter,
+            DSKey
+        )
+    of
+        {ok, Iter} ->
+            {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
+        Err = {error, _} ->
+            Err
+    end.
+
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(DB, Iter0, BatchSize) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
@@ -236,6 +262,18 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
 do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
 
+-spec do_update_iterator_v2(
+    emqx_ds:db(),
+    emqx_ds_storage_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    emqx_ds:message_key()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
+    emqx_ds_storage_layer:update_iterator(
+        {DB, Shard}, OldIter, DSKey
+    ).
+
 -spec do_next_v1(
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),

+ 24 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -24,7 +24,15 @@
 -export([]).
 
 %% behavior callbacks:
--export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
+-export([
+    create/4,
+    open/5,
+    store_batch/4,
+    get_streams/4,
+    make_iterator/5,
+    update_iterator/4,
+    next/4
+]).
 
 %% internal exports:
 -export([format_key/2]).
@@ -236,6 +244,20 @@ make_iterator(
         ?last_seen_key => <<>>
     }}.
 
+-spec update_iterator(
+    emqx_ds_storage_layer:shard_id(),
+    s(),
+    iterator(),
+    emqx_ds:message_key()
+) -> {ok, iterator()}.
+update_iterator(
+    _Shard,
+    _Data,
+    #{?tag := ?IT} = OldIter,
+    DSKey
+) ->
+    {ok, OldIter#{?last_seen_key => DSKey}}.
+
 next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
@@ -329,7 +351,7 @@ traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
             Msg = deserialize(Val),
             case check_message(Cutoff, It, Msg) of
                 true ->
-                    Acc = [Msg | Acc0],
+                    Acc = [{Key, Msg} | Acc0],
                     traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1);
                 false ->
                     traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N);

+ 30 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -18,7 +18,15 @@
 -behaviour(gen_server).
 
 %% Replication layer API:
--export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]).
+-export([
+    open_shard/2,
+    drop_shard/1,
+    store_batch/3,
+    get_streams/3,
+    make_iterator/4,
+    update_iterator/3,
+    next/3
+]).
 
 %% gen_server
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@@ -192,6 +200,27 @@ make_iterator(
             Err
     end.
 
+-spec update_iterator(
+    shard_id(), iterator(), emqx_ds:message_key()
+) ->
+    emqx_ds:make_iterator_result(iterator()).
+update_iterator(
+    Shard,
+    #{?tag := ?IT, ?generation := GenId, ?enc := OldIter},
+    DSKey
+) ->
+    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
+    case Mod:update_iterator(Shard, GenData, OldIter, DSKey) of
+        {ok, Iter} ->
+            {ok, #{
+                ?tag => ?IT,
+                ?generation => GenId,
+                ?enc => Iter
+            }};
+        {error, _} = Err ->
+            Err
+    end.
+
 -spec next(shard_id(), iterator(), pos_integer()) ->
     emqx_ds:next_result(iterator()).
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) ->

+ 21 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -27,7 +27,15 @@
 -export([]).
 
 %% behavior callbacks:
--export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
+-export([
+    create/4,
+    open/5,
+    store_batch/4,
+    get_streams/4,
+    make_iterator/5,
+    update_iterator/4,
+    next/4
+]).
 
 %% internal exports:
 -export([]).
@@ -97,6 +105,17 @@ make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) ->
         start_time = StartTime
     }}.
 
+update_iterator(_Shard, _Data, OldIter, DSKey) ->
+    #it{
+        topic_filter = TopicFilter,
+        start_time = StartTime
+    } = OldIter,
+    {ok, #it{
+        topic_filter = TopicFilter,
+        start_time = StartTime,
+        last_seen_message_key = DSKey
+    }}.
+
 next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
     #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
     {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
@@ -125,7 +144,7 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
             Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
             case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
                 true ->
-                    do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]);
+                    do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
                 false ->
                     do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc)
             end;

+ 8 - 2
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -19,7 +19,13 @@
 
 -include_lib("emqx_utils/include/bpapi.hrl").
 %% API:
--export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]).
+-export([
+    drop_db/2,
+    store_batch/5,
+    get_streams/5,
+    make_iterator/6,
+    next/5
+]).
 
 %% behavior callbacks:
 -export([introduced_in/0]).
@@ -65,7 +71,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:iterator(),
     pos_integer()
 ) ->
-    {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]}
+    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]}
     | {ok, end_of_stream}
     | {error, _}.
 next(Node, DB, Shard, Iter, BatchSize) ->

+ 118 - 0
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl

@@ -0,0 +1,118 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v2).
+
+-behavior(emqx_bpapi).
+
+-include_lib("emqx_utils/include/bpapi.hrl").
+%% API:
+-export([
+    drop_db/2,
+    store_batch/5,
+    get_streams/5,
+    make_iterator/6,
+    next/5,
+
+    %% introduced in v2
+    update_iterator/5
+]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | erpc:caught_call_exception()].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
+
+-spec get_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [{integer(), emqx_ds_storage_layer:stream()}].
+get_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
+
+-spec make_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
+        DB, Shard, Stream, TopicFilter, StartTime
+    ]).
+
+-spec next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]}
+    | {ok, end_of_stream}
+    | {error, _}.
+next(Node, DB, Shard, Iter, BatchSize) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+
+-spec store_batch(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_replication_layer:batch(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+store_batch(Node, DB, Shard, Batch, Options) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
+        DB, Shard, Batch, Options
+    ]).
+
+%%--------------------------------------------------------------------------------
+%% Introduced in V2
+%%--------------------------------------------------------------------------------
+
+-spec update_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    emqx_ds:message_key()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+update_iterator(Node, DB, Shard, OldIter, DSKey) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
+        DB, Shard, OldIter, DSKey
+    ]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.5.0".

+ 27 - 2
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -101,7 +101,7 @@ t_03_smoke_iterate(_Config) ->
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     {ok, Iter, Batch} = iterate(DB, Iter0, 1),
-    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
+    ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
 
 %% Verify that iterators survive restart of the application. This is
 %% an important property, since the lifetime of the iterators is tied
@@ -128,7 +128,32 @@ t_04_restart(_Config) ->
     ok = emqx_ds:open_db(DB, opts()),
     %% The old iterator should be still operational:
     {ok, Iter, Batch} = iterate(DB, Iter0, 1),
-    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
+    ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
+
+%% Check that we can create iterators directly from DS keys.
+t_05_update_iterator(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+    TopicFilter = ['#'],
+    StartTime = 0,
+    Msgs = [
+        message(<<"foo/bar">>, <<"1">>, 0),
+        message(<<"foo">>, <<"2">>, 1),
+        message(<<"bar/bar">>, <<"3">>, 2)
+    ],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+    [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+    Res0 = emqx_ds:next(DB, Iter0, 1),
+    ?assertMatch({ok, _OldIter, [{_Key0, _Msg0}]}, Res0),
+    {ok, OldIter, [{Key0, Msg0}]} = Res0,
+    Res1 = emqx_ds:update_iterator(DB, OldIter, Key0),
+    ?assertMatch({ok, _Iter1}, Res1),
+    {ok, Iter1} = Res1,
+    {ok, FinalIter, Batch} = iterate(DB, Iter1, 1),
+    AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]],
+    ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
+    ok.
 
 message(Topic, Payload, PublishedAt) ->
     #message{

+ 3 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -64,7 +64,8 @@ t_iterate(_Config) ->
         begin
             [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
             {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
-            {ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100),
+            {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100),
+            Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys],
             ?assertEqual(
                 lists:map(fun integer_to_binary/1, Timestamps),
                 payloads(Messages)
@@ -249,7 +250,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
                 {ok, _NextIt, []} ->
                     [];
                 {ok, NextIt, Batch} ->
-                    Batch ++ F(NextIt, N - 1)
+                    [Msg || {_DSKey, Msg} <- Batch] ++ F(NextIt, N - 1)
             end
     end,
     MaxIterations = 1000000,