Przeglądaj źródła

feat(dsraft): Support async poll in Raft backend

ieQu1 1 rok temu
rodzic
commit
a65d22cc38

+ 2 - 21
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -398,11 +398,7 @@ poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
     end,
     %% Spawn a helper process that will notify the caller when the
     %% poll times out:
-    _Completion = spawn_link(
-        fun() ->
-            send_poll_timeout(ReplyTo, Timeout)
-        end
-    ),
+    emqx_ds_lib:send_poll_timeout(ReplyTo, Timeout),
     %% Submit poll jobs:
     lists:foreach(
         fun({ItKey, It = #{?tag := ?IT, ?shard := Shard}}) ->
@@ -415,15 +411,7 @@ poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
     {ok, ReplyTo}.
 
 unpack_iterator(Shard, #{?tag := ?IT, ?enc := Iterator}) ->
-    {Stream, TopicFilter, DSKey, TS} = emqx_ds_storage_layer:unpack_iterator(Shard, Iterator),
-    MsgMatcher = emqx_ds_storage_layer:message_matcher(Shard, Iterator),
-    #{
-        stream => Stream,
-        topic_filter => TopicFilter,
-        last_seen_key => DSKey,
-        timestamp => TS,
-        message_matcher => MsgMatcher
-    }.
+    emqx_ds_storage_layer:unpack_iterator(Shard, Iterator).
 
 scan_stream(ShardId, Stream, TopicFilter, StartMsg, BatchSize) ->
     {DB, _} = ShardId,
@@ -532,10 +520,3 @@ timeus_to_timestamp(undefined) ->
     undefined;
 timeus_to_timestamp(TimestampUs) ->
     TimestampUs div 1000.
-
-send_poll_timeout(ReplyTo, Timeout) ->
-    receive
-    after Timeout + 10 ->
-        logger:debug("Timeout for poll ~p", [ReplyTo]),
-        ReplyTo ! #poll_reply{ref = ReplyTo, payload = poll_timeout}
-    end.

+ 17 - 1
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl

@@ -180,7 +180,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Opts = emqx_ds_replication_layer_meta:db_config(DB),
     Children = [
         shard_storage_spec(DB, Shard, Opts),
-        shard_replication_spec(DB, Shard, Opts)
+        shard_replication_spec(DB, Shard, Opts),
+        shard_beamformers_spec(DB, Shard)
     ],
     {ok, {SupFlags, Children}}.
 
@@ -276,6 +277,21 @@ egress_spec(DB, Shard) ->
         type => worker
     }.
 
+shard_beamformers_spec(DB, Shard) ->
+    %% TODO: don't hardcode value
+    BeamformerOpts = #{
+        n_workers => 5
+    },
+    #{
+        id => {Shard, beamformers},
+        type => supervisor,
+        shutdown => infinity,
+        start =>
+            {emqx_ds_beamformer_sup, start_link, [
+                emqx_ds_replication_layer, {DB, Shard}, BeamformerOpts
+            ]}
+    }.
+
 ensure_started(Res) ->
     case Res of
         {ok, _Pid} ->

+ 98 - 21
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -23,7 +23,6 @@
     get_delete_streams/3,
     make_iterator/4,
     make_delete_iterator/4,
-    update_iterator/3,
     next/3,
     poll/3,
     delete_next/4,
@@ -48,6 +47,7 @@
     do_get_delete_streams_v4/4,
     do_make_delete_iterator_v4/5,
     do_delete_next_v4/5,
+    do_poll_v1/5,
     %% Obsolete:
     do_store_batch_v1/4,
     do_make_iterator_v1/5,
@@ -69,6 +69,15 @@
     snapshot_module/0
 ]).
 
+-behaviour(emqx_ds_beamformer).
+-export(
+    [
+        unpack_iterator/2,
+        scan_stream/5,
+        update_iterator/3
+    ]
+).
+
 -export_type([
     shard_id/0,
     builtin_db_opts/0,
@@ -349,17 +358,6 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
             Error
     end.
 
--spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
-    emqx_ds:make_iterator_result(iterator()).
-update_iterator(DB, OldIter, DSKey) ->
-    #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
-    case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
-        {ok, Iter} ->
-            {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
-        Error = {error, _, _} ->
-            Error
-    end.
-
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(DB, Iter0, BatchSize) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
@@ -385,8 +383,34 @@ next(DB, Iter0, BatchSize) ->
 
 -spec poll(emqx_ds:db(), emqx_ds:poll_iterators(), emqx_ds:poll_opts()) ->
     {ok, reference()}.
-poll(_DB, _Iterators, _PollOpts) ->
-    error(not_implemented).
+poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
+    %% Create a new alias, if not already provided:
+    case PollOpts of
+        #{reply_to := ReplyTo} ->
+            ok;
+        _ ->
+            ReplyTo = alias([explicit_unalias])
+    end,
+    %% Spawn a helper process that will notify the caller when the
+    %% poll times out:
+    _Completion = emqx_ds_lib:send_poll_timeout(ReplyTo, Timeout),
+    %% Submit poll jobs:
+    Groups = maps:groups_from_list(
+        fun({_Token, #{?tag := ?IT, ?shard := Shard}}) -> Shard end,
+        Iterators
+    ),
+    maps:foreach(
+        fun(Shard, ShardIts) ->
+            ok = ra_poll(
+                DB,
+                Shard,
+                [{{ReplyTo, Token}, It} || {Token, It} <- ShardIts],
+                PollOpts
+            )
+        end,
+        Groups
+    ),
+    {ok, ReplyTo}.
 
 -spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
     emqx_ds:delete_next_result(delete_iterator()).
@@ -647,6 +671,24 @@ do_drop_generation_v3(_DB, _ShardId, _GenId) ->
 do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:get_delete_streams({DB, Shard}, TopicFilter, StartTime).
 
+-spec do_poll_v1(
+    node(),
+    emqx_ds:db(),
+    shard_id(),
+    [{emqx_ds_beamformer:return_addr(_), emqx_ds_storage_layer:iterator()}],
+    emqx_ds:poll_opts()
+) ->
+    ok.
+do_poll_v1(SourceNode, DB, Shard, Iterators, PollOpts) ->
+    ShardId = {DB, Shard},
+    ?tp(ds_raft_do_poll, #{shard => ShardId, iterators => Iterators}),
+    lists:foreach(
+        fun({RAddr, It}) ->
+            emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
+        end,
+        Iterators
+    ).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -794,13 +836,13 @@ ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
         )
     ).
 
-ra_update_iterator(DB, Shard, Iter, DSKey) ->
-    ?SHARD_RPC(
-        DB,
-        Shard,
-        Node,
-        ?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey))
-    ).
+%% ra_update_iterator(DB, Shard, Iter, DSKey) ->
+%%     ?SHARD_RPC(
+%%         DB,
+%%         Shard,
+%%         Node,
+%%         ?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey))
+%%     ).
 
 ra_next(DB, Shard, Iter, BatchSize) ->
     ?SHARD_RPC(
@@ -815,6 +857,14 @@ ra_next(DB, Shard, Iter, BatchSize) ->
         end
     ).
 
+ra_poll(DB, Shard, Iterators, PollOpts) ->
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        DestNode,
+        ?SAFE_ERPC(emqx_ds_proto_v5:poll(DestNode, node(), DB, Shard, Iterators, PollOpts))
+    ).
+
 ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
     ?SHARD_RPC(
         DB,
@@ -1076,6 +1126,33 @@ timeus_to_timestamp(TimestampUs) ->
 snapshot_module() ->
     emqx_ds_replication_snapshot.
 
+unpack_iterator(Shard, #{?tag := ?IT, ?enc := Iterator}) ->
+    emqx_ds_storage_layer:unpack_iterator(Shard, Iterator).
+
+scan_stream(ShardId, Stream, TopicFilter, StartMsg, BatchSize) ->
+    {DB, Shard} = ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
+        begin
+            Now = current_timestamp(DB, Shard),
+            emqx_ds_storage_layer:scan_stream(
+                ShardId, Stream, TopicFilter, Now, StartMsg, BatchSize
+            )
+        end
+    ).
+
+-spec update_iterator(emqx_ds_storage_layer:shard_id(), iterator(), emqx_ds:message_key()) ->
+    emqx_ds:make_iterator_result(iterator()).
+update_iterator(ShardId, OldIter, DSKey) ->
+    #{?tag := ?IT, ?enc := Inner0} = OldIter,
+    case emqx_ds_storage_layer:update_iterator(ShardId, Inner0, DSKey) of
+        {ok, Inner} ->
+            {ok, OldIter#{?enc => Inner}};
+        Err = {error, _, _} ->
+            Err
+    end.
+
 handle_custom_event(DBShard, Latest, Event) ->
     try
         Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event),

+ 0 - 88
apps/emqx_ds_builtin_raft/src/proto/emqx_ds_proto_v1.erl

@@ -1,88 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_ds_proto_v1).
-
--behavior(emqx_bpapi).
-
--include_lib("emqx_utils/include/bpapi.hrl").
-%% API:
--export([
-    drop_db/2,
-    store_batch/5,
-    get_streams/5,
-    make_iterator/6,
-    next/5
-]).
-
-%% behavior callbacks:
--export([introduced_in/0, deprecated_since/0]).
-
-%%================================================================================
-%% API functions
-%%================================================================================
-
--spec drop_db([node()], emqx_ds:db()) -> [emqx_rpc:erpc(ok)].
-drop_db(Node, DB) ->
-    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
-
--spec get_streams(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds:topic_filter(),
-    emqx_ds:time()
-) ->
-    [{integer(), emqx_ds_storage_layer:stream_v1()}].
-get_streams(Node, DB, Shard, TopicFilter, Time) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
-
--spec make_iterator(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream_v1(),
-    emqx_ds:topic_filter(),
-    emqx_ds:time()
-) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
-        DB, Shard, Stream, TopicFilter, StartTime
-    ]).
-
--spec next(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:iterator(),
-    pos_integer()
-) ->
-    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
-    | {ok, end_of_stream}
-    | {error, _}.
-next(Node, DB, Shard, Iter, BatchSize) ->
-    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
-
--spec store_batch(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_replication_layer:batch(),
-    emqx_ds:message_store_opts()
-) ->
-    emqx_ds:store_batch_result().
-store_batch(Node, DB, Shard, Batch, Options) ->
-    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
-        DB, Shard, Batch, Options
-    ]).
-
-%%================================================================================
-%% behavior callbacks
-%%================================================================================
-
-introduced_in() ->
-    "5.4.0".
-
-deprecated_since() ->
-    "5.5.0".

+ 0 - 115
apps/emqx_ds_builtin_raft/src/proto/emqx_ds_proto_v2.erl

@@ -1,115 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_ds_proto_v2).
-
--behavior(emqx_bpapi).
-
--include_lib("emqx_utils/include/bpapi.hrl").
-%% API:
--export([
-    drop_db/2,
-    store_batch/5,
-    get_streams/5,
-    make_iterator/6,
-    next/5,
-
-    %% introduced in v2
-    update_iterator/5,
-    add_generation/2
-]).
-
-%% behavior callbacks:
--export([introduced_in/0, deprecated_since/0]).
-
-%%================================================================================
-%% API functions
-%%================================================================================
-
--spec drop_db([node()], emqx_ds:db()) ->
-    [{ok, ok} | {error, _}].
-drop_db(Node, DB) ->
-    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
-
--spec get_streams(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds:topic_filter(),
-    emqx_ds:time()
-) ->
-    [{integer(), emqx_ds_storage_layer:stream_v1()}].
-get_streams(Node, DB, Shard, TopicFilter, Time) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
-
--spec make_iterator(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream_v1(),
-    emqx_ds:topic_filter(),
-    emqx_ds:time()
-) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
-        DB, Shard, Stream, TopicFilter, StartTime
-    ]).
-
--spec next(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:iterator(),
-    pos_integer()
-) ->
-    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
-    | {ok, end_of_stream}
-    | {error, _}.
-next(Node, DB, Shard, Iter, BatchSize) ->
-    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
-
--spec store_batch(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_replication_layer:batch(),
-    emqx_ds:message_store_opts()
-) ->
-    emqx_ds:store_batch_result().
-store_batch(Node, DB, Shard, Batch, Options) ->
-    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
-        DB, Shard, Batch, Options
-    ]).
-
-%%--------------------------------------------------------------------------------
-%% Introduced in V2
-%%--------------------------------------------------------------------------------
-
--spec update_iterator(
-    node(),
-    emqx_ds:db(),
-    emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:iterator(),
-    emqx_ds:message_key()
-) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-update_iterator(Node, DB, Shard, OldIter, DSKey) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
-        DB, Shard, OldIter, DSKey
-    ]).
-
--spec add_generation([node()], emqx_ds:db()) ->
-    [{ok, ok} | {error, _}].
-add_generation(Node, DB) ->
-    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
-
-%%================================================================================
-%% behavior callbacks
-%%================================================================================
-
-introduced_in() ->
-    "5.5.0".
-
-deprecated_since() ->
-    "5.5.0".

+ 4 - 1
apps/emqx_ds_builtin_raft/src/proto/emqx_ds_proto_v4.erl

@@ -25,7 +25,7 @@
 ]).
 
 %% behavior callbacks:
--export([introduced_in/0]).
+-export([introduced_in/0, deprecated_since/0]).
 
 %%================================================================================
 %% API functions
@@ -180,3 +180,6 @@ delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
 
 introduced_in() ->
     "5.6.0".
+
+deprecated_since() ->
+    "5.8.0".

+ 82 - 22
apps/emqx_ds_builtin_raft/src/proto/emqx_ds_proto_v3.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ds_proto_v3).
+-module(emqx_ds_proto_v5).
 
 -behavior(emqx_bpapi).
 
@@ -13,16 +13,20 @@
     get_streams/5,
     make_iterator/6,
     next/5,
+    poll/6,
     update_iterator/5,
     add_generation/2,
-
-    %% introduced in v3
     list_generations_with_lifetimes/3,
-    drop_generation/4
+    drop_generation/4,
+
+    %% introduced in v4
+    get_delete_streams/5,
+    make_delete_iterator/6,
+    delete_next/6
 ]).
 
 %% behavior callbacks:
--export([introduced_in/0, deprecated_since/0]).
+-export([introduced_in/0]).
 
 %%================================================================================
 %% API functions
@@ -40,21 +44,21 @@ drop_db(Node, DB) ->
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_storage_layer:stream_v1()}].
+    [{integer(), emqx_ds_storage_layer:stream()}].
 get_streams(Node, DB, Shard, TopicFilter, Time) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v2, [DB, Shard, TopicFilter, Time]).
 
 -spec make_iterator(
     node(),
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream_v1(),
+    emqx_ds_storage_layer:stream(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+    emqx_ds:make_iterator_result().
 make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
+    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [
         DB, Shard, Stream, TopicFilter, StartTime
     ]).
 
@@ -65,9 +69,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:iterator(),
     pos_integer()
 ) ->
-    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
-    | {ok, end_of_stream}
-    | {error, _}.
+    emqx_rpc:call_result(emqx_ds:next_result()).
 next(Node, DB, Shard, Iter, BatchSize) ->
     emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
 
@@ -84,6 +86,7 @@ store_batch(Node, DB, Shard, Batch, Options) ->
         DB, Shard, Batch, Options
     ]).
 
+%% Candidate for removal:
 -spec update_iterator(
     node(),
     emqx_ds:db(),
@@ -91,21 +94,31 @@ store_batch(Node, DB, Shard, Batch, Options) ->
     emqx_ds_storage_layer:iterator(),
     emqx_ds:message_key()
 ) ->
-    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+    emqx_ds:make_iterator_result().
 update_iterator(Node, DB, Shard, OldIter, DSKey) ->
     erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
         DB, Shard, OldIter, DSKey
     ]).
 
+-spec poll(
+    node(),
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    [{emqx_ds_beamformer:return_addr(_), emqx_ds_storage_layer:iterator()}],
+    emqx_ds:poll_opts()
+) ->
+    ok.
+poll(DestNode, SourceNode, DB, Shard, Iterators, PollOpts) ->
+    erpc:call(DestNode, emqx_ds_replication_layer, do_poll_v1, [
+        SourceNode, DB, Shard, Iterators, PollOpts
+    ]).
+
 -spec add_generation([node()], emqx_ds:db()) ->
     [{ok, ok} | {error, _}].
 add_generation(Node, DB) ->
     erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
 
-%%--------------------------------------------------------------------------------
-%% Introduced in V3
-%%--------------------------------------------------------------------------------
-
 -spec list_generations_with_lifetimes(
     node(),
     emqx_ds:db(),
@@ -127,12 +140,59 @@ list_generations_with_lifetimes(Node, DB, Shard) ->
 drop_generation(Node, DB, Shard, GenId) ->
     erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
 
+%%--------------------------------------------------------------------------------
+%% Introduced in V4
+%%--------------------------------------------------------------------------------
+
+-spec get_delete_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [emqx_ds_storage_layer:delete_stream()].
+get_delete_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_delete_streams_v4, [
+        DB, Shard, TopicFilter, Time
+    ]).
+
+-spec make_delete_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator()} | {error, _}.
+make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_make_delete_iterator_v4, [
+        DB, Shard, Stream, TopicFilter, StartTime
+    ]).
+
+-spec delete_next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:delete_iterator(),
+    emqx_ds:delete_selector(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:delete_iterator(), non_neg_integer()}
+    | {ok, end_of_stream}
+    | {error, _}.
+delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
+    erpc:call(
+        Node,
+        emqx_ds_replication_layer,
+        do_delete_next_v4,
+        [DB, Shard, Iter, Selector, BatchSize]
+    ).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
 
 introduced_in() ->
-    "5.5.0".
-
-deprecated_since() ->
-    "5.5.1".
+    "5.8.0".

+ 67 - 1
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -895,6 +895,72 @@ t_crash_restart_recover(Config) ->
         []
     ).
 
+t_poll(init, Config) ->
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
+    Nodes = emqx_cth_cluster:start(
+        [
+            {t_poll1, #{apps => Apps}},
+            {t_poll2, #{apps => Apps}}
+        ],
+        #{work_dir => ?config(work_dir, Config)}
+    ),
+    [{nodes, Nodes} | Config];
+t_poll('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).
+
+t_poll(Config) ->
+    DB = ?FUNCTION_NAME,
+    Nodes = [N1 | _] = ?config(nodes, Config),
+    ?check_trace(
+        #{timetrap => 15_000},
+        begin
+            %% Initialize DB on all nodes and wait for it to come online.
+            Opts = opts(Config, #{n_shards => 1}),
+            assert_db_open(Nodes, ?DB, Opts),
+
+            %% Insert data:
+            Batch1 = [
+                message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
+                message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1)
+            ],
+            ?ON(N1, ok = emqx_ds:store_batch(?DB, Batch1, #{sync => true})),
+            %% Create initial iterators:
+            Its0 = ?ON(
+                N1,
+                begin
+                    TF = [<<"foo">>, <<"bar">>],
+                    [
+                        begin
+                            {ok, It} = emqx_ds:make_iterator(?DB, Stream, TF, 0),
+                            {make_ref(), It}
+                        end
+                     || {_Rank, Stream} <- emqx_ds:get_streams(?DB, TF, 0)
+                    ]
+                end
+            ),
+            [{Token1, _}] = Its0,
+            %% Check poll funtionality:
+            CheckF =
+                fun() ->
+                    {ok, Alias} = emqx_ds:poll(?DB, Its0, #{timeout => 1000}),
+                    [{Token1, {ok, NextIt, Batch}}] = emqx_ds_test_helpers:collect_poll_replies(
+                        Alias, 1000
+                    ),
+                    ?assertMatch(
+                        [{_, #message{payload = <<"1">>}}, {_, #message{payload = <<"2">>}}],
+                        Batch
+                    ),
+                    ?assertMatch(
+                        {ok, _It, []},
+                        emqx_ds:next(?DB, NextIt, 100),
+                        "The returned iterator is correct"
+                    )
+                end,
+            [?defer_assert(emqx_ds_test_helpers:on(N, CheckF)) || N <- Nodes]
+        end,
+        []
+    ).
+
 nodes_of_clientid(ClientId, Nodes) ->
     emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes).
 
@@ -1062,7 +1128,7 @@ all() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {bitfield_lts, TCs},
+        {bitfield_lts, TCs -- [t_poll]},
         {skipstream_lts, TCs}
     ].
 

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_beamformer.erl

@@ -25,7 +25,8 @@
 %% By "coherent" we mean requests to scan overlapping key ranges of
 %% the same DS stream. Grouping requests helps to reduce the number of
 %% storage queries and conserve throughput of the EMQX backplane
-%% network.
+%% network. This should help in the situations when the majority of
+%% clients are up to date and simply wait for the new data.
 %%
 %% Beamformer works as following:
 %%
@@ -55,9 +56,8 @@
 %% WARNING: beamformer makes some implicit assumptions about the
 %% storage layout:
 %%
-%% - There's a bijection between iterator position and the message key
-%%
-%% - Message keys in the stream are monotonic
+%% - For each topic filter and stream, there's a bijection between
+%% iterator and the message key
 %%
 %% - Quering a stream with non-wildcard topic-filter is equivalent to
 %% quering it with a wildcard topic filter and dropping messages in

+ 14 - 1
apps/emqx_durable_storage/src/emqx_ds_lib.erl

@@ -18,7 +18,7 @@
 -include("emqx_ds.hrl").
 
 %% API:
--export([with_worker/4]).
+-export([with_worker/4, send_poll_timeout/2]).
 
 %% internal exports:
 -export([]).
@@ -55,6 +55,19 @@ with_worker(UserData, Mod, Function, Args) ->
     ),
     {ok, ReplyTo}.
 
+-spec send_poll_timeout(reference(), timeout()) -> ok.
+send_poll_timeout(ReplyTo, Timeout) ->
+    _ = spawn_link(
+        fun() ->
+            receive
+            after Timeout + 10 ->
+                logger:debug("Timeout for poll ~p", [ReplyTo]),
+                ReplyTo ! #poll_reply{ref = ReplyTo, payload = poll_timeout}
+            end
+        end
+    ),
+    ok.
+
 %%================================================================================
 %% Internal exports
 %%================================================================================

+ 7 - 11
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -41,7 +41,6 @@
     generation/1,
     unpack_iterator/2,
     scan_stream/6,
-    message_matcher/2,
 
     delete_next/5,
 
@@ -606,7 +605,13 @@ unpack_iterator(Shard, #{?tag := ?IT, ?generation := GenId, ?enc := Inner}) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->
             {InnerStream, TopicFilter, Key, TS} = Mod:unpack_iterator(Shard, GenData, Inner),
-            {?stream_v2(GenId, InnerStream), TopicFilter, Key, TS};
+            #{
+                stream => ?stream_v2(GenId, InnerStream),
+                topic_filter => TopicFilter,
+                last_seen_key => Key,
+                timestamp => TS,
+                message_matcher => Mod:message_matcher(Shard, GenData, Inner)
+            };
         not_found ->
             %% generation was possibly dropped by GC
             ?ERR_GEN_GONE
@@ -628,15 +633,6 @@ scan_stream(
             ?ERR_GEN_GONE
     end.
 
-message_matcher(Shard, #{?tag := ?IT, ?generation := GenId, ?enc := Inner}) ->
-    %% logger:warning(?MODULE_STRING ++ ":match_message(~p, ~p, ~p)", [Shard, GenId, Inner]),
-    case generation_get(Shard, GenId) of
-        #{module := Mod, data := GenData} ->
-            Mod:message_matcher(Shard, GenData, Inner);
-        not_found ->
-            false
-    end.
-
 -spec delete_next(
     shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
 ) ->

+ 2 - 12
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -331,7 +331,7 @@ t_poll(Config) ->
             %% the iterator as tags):
             {ok, Alias1} = emqx_ds:poll(?FUNCTION_NAME, Iterators0, PollOpts),
             %% Collect the replies:
-            Got1 = collect_poll_replies(Alias1, Timeout),
+            Got1 = emqx_ds_test_helpers:collect_poll_replies(Alias1, Timeout),
             unalias(Alias1),
             %% 4. Compare data. Everything (batch contents and iterators) should be the same:
             compare_poll_with_reference(Reference1, Got1),
@@ -356,7 +356,7 @@ t_poll(Config) ->
                 _ ->
                     ?assertMatch(
                         [{_, {ok, _, [_ | _]}} | _],
-                        collect_poll_replies(Alias2, Timeout),
+                        emqx_ds_test_helpers:collect_poll_replies(Alias2, Timeout),
                         "Poll reply with non-empty batch should be received after "
                         "data was published to the topic."
                     )
@@ -387,16 +387,6 @@ compare_poll_reply({ok, ReferenceIterator, BatchRef}, {ok, ReplyIterator, Batch}
 compare_poll_reply(A, B) ->
     ?defer_assert(?assertEqual(A, B)).
 
-collect_poll_replies(Alias, Timeout) ->
-    receive
-        #poll_reply{payload = poll_timeout, ref = Alias} ->
-            [];
-        #poll_reply{userdata = ItRef, payload = Reply, ref = Alias} ->
-            [{ItRef, Reply} | collect_poll_replies(Alias, Timeout)]
-    after Timeout ->
-        []
-    end.
-
 t_atomic_store_batch(_Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(

+ 11 - 0
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -18,6 +18,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/assert.hrl").
@@ -459,3 +460,13 @@ consume_iter_with(NextFun, It0, Opts) ->
         {error, Class, Reason} ->
             error({error, Class, Reason})
     end.
+
+collect_poll_replies(Alias, Timeout) ->
+    receive
+        #poll_reply{payload = poll_timeout, ref = Alias} ->
+            [];
+        #poll_reply{userdata = ItRef, payload = Reply, ref = Alias} ->
+            [{ItRef, Reply} | collect_poll_replies(Alias, Timeout)]
+    after Timeout ->
+        []
+    end.