Просмотр исходного кода

refactor(ds): Create a prototype of replication layer

ieQu1 2 лет назад
Родитель
Сommit
c91df2f5cd

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -74,7 +74,7 @@
 ]).
 
 %% FIXME
--define(DS_SHARD_ID, <<"local">>).
+-define(DS_SHARD_ID, atom_to_binary(node())).
 -define(DEFAULT_KEYSPACE, default).
 -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 

+ 16 - 1
apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl

@@ -23,7 +23,8 @@
 
     open_iterator/4,
     close_iterator/2,
-    close_all_iterators/2
+    close_all_iterators/2,
+    get_streams/5
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -50,6 +51,20 @@ open_iterator(Nodes, TopicFilter, StartMS, IteratorID) ->
         ?TIMEOUT
     ).
 
+%% @doc Ask `Node' for the streams of one shard that match `TopicFilter',
+%% starting from `StartTime'.
+%%
+%% `emqx_ds_storage_layer' exports `get_streams/3', so the remote call must
+%% carry exactly three arguments.
+%% NOTE(review): assumes the shard is addressed as `{Keyspace, ShardId}',
+%% the same shape `emqx_ds_storage_layer:start_link/2' takes -- TODO confirm.
+-spec get_streams(
+    node(),
+    emqx_ds:keyspace(),
+    emqx_ds:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [emqx_ds_storage_layer:stream()].
+get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime) ->
+    erpc:call(
+        Node,
+        emqx_ds_storage_layer,
+        get_streams,
+        [{Keyspace, ShardId}, TopicFilter, StartTime]
+    ).
+
 -spec close_iterator(
     [node()],
     emqx_ds:iterator_id()

+ 0 - 42
apps/emqx_durable_storage/IMPLEMENTATION.md

@@ -31,48 +31,6 @@ Read pattern: pseudoserial
 
 Number of records: O(total write throughput * retention time)
 
-## Session storage
-
-Data there is updated when:
-
-- A new client connects with clean session = false
-- Client subscribes to a topic
-- Client unsubscribes to a topic
-- Garbage collection is performed
-
-Write throughput: low
-
-Data is read when a client connects and replay agents are started
-
-Read throughput: low
-
-Data format:
-
-`#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}`
-
-Number of records: O(N clients)
-
-Size of record: O(N subscriptions per clients)
-
-## Iterator storage
-
-Data is written every time a client acks a message.
-
-Data is read when a client reconnects and we restart replay agents.
-
-`#iterator{key = IterKey, data = Blob}`
-
-Number of records: O(N clients * N subscriptions per client)
-
-Size of record: O(1)
-
-Write throughput: high, lots of small updates
-
-Write pattern: mostly key overwrite
-
-Read throughput: low
-
-Read pattern: random
 
 # Push vs. Pull model
 

+ 5 - 4
apps/emqx_durable_storage/README.md

@@ -1,9 +1,10 @@
 # EMQX Replay
 
-`emqx_ds` is a durable storage for MQTT messages within EMQX.
-It implements the following scenarios:
-- Persisting messages published by clients
--
+`emqx_ds` is a generic durable storage for MQTT messages within EMQX.
+
+Concepts:
+
+
 
 > 0. App overview introduction
 > 1. let people know what your project can do specifically. Is it a base

+ 112 - 91
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -15,48 +15,29 @@
 %%--------------------------------------------------------------------
 -module(emqx_ds).
 
--include_lib("stdlib/include/ms_transform.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
+%% Management API:
+-export([create_db/2]).
 
-%% API:
--export([ensure_shard/2]).
-%%   Messages:
--export([message_store/2, message_store/1, message_stats/0]).
-%%   Iterator:
--export([iterator_update/2, iterator_next/1, iterator_stats/0]).
+%% Message storage API:
+-export([message_store/3, message_store/2]).
+
+%% Message replay API:
+-export([get_streams/3, open_iterator/2, next/2]).
 
 %% internal exports:
 -export([]).
 
--export_type([
-    keyspace/0,
-    message_id/0,
-    message_stats/0,
-    message_store_opts/0,
-    replay/0,
-    replay_id/0,
-    iterator_id/0,
-    iterator/0,
-    shard/0,
-    shard_id/0,
-    topic/0,
-    topic_filter/0,
-    time/0
-]).
+%% `stream/0', `iterator/0' and `message_id/0' appear in the specs of the
+%% exported API functions; `stream_rank/0', `message_store_opts/0' and
+%% `create_db_opts/0' are referenced as `emqx_ds:Type()' by
+%% `emqx_ds_replication_layer'.
+-export_type([
+    db/0,
+    time/0,
+    topic_filter/0,
+    topic/0,
+    stream/0,
+    stream_rank/0,
+    iterator/0,
+    message_id/0,
+    message_store_opts/0,
+    create_db_opts/0
+]).
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
--type iterator() :: term().
-
--type iterator_id() :: binary().
-
--type message_store_opts() :: #{}.
-
--type message_stats() :: #{}.
-
--type message_id() :: binary().
+%% Different DBs are completely independent from each other. They
+%% could represent something like different tenants.
+%%
+%% Topics stored in different DBs aren't necessarily disjoint.
+-type db() :: binary().
 
 %% Parsed topic.
 -type topic() :: list(binary()).
@@ -64,9 +45,30 @@
 %% Parsed topic filter.
 -type topic_filter() :: list(binary() | '+' | '#' | '').
 
--type keyspace() :: atom().
--type shard_id() :: binary().
--type shard() :: {keyspace(), shard_id()}.
+%% This record encapsulates the stream entity from the replication
+%% level.
+%%
+%% TODO: currently the stream is hardwired to only support the
+%% internal rocksdb storage. In the future we want to add other
+%% implementations for emqx_ds, so this type has to take this into
+%% account.
+%%
+%% `shard' is the replication-layer shard that owns the stream; `enc' is
+%% the replication-layer stream wrapped by this record.
+-record(stream,
+        { shard :: emqx_ds_replication_layer:shard()
+        , enc :: emqx_ds_replication_layer:stream()
+        }).
+
+-type stream_rank() :: {integer(), integer()}.
+
+-opaque stream() :: #stream{}.
+
+%% This record encapsulates the iterator entity from the replication
+%% level.
+%%
+%% `shard' is the replication-layer shard the iterator reads from; `enc'
+%% is the replication-layer iterator wrapped by this record.
+-record(iterator,
+        { shard :: emqx_ds_replication_layer:shard()
+        , enc :: emqx_ds_replication_layer:iterator()
+        }).
+
+-opaque iterator() :: #iterator{}.
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
@@ -74,70 +76,89 @@
 %% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
--type replay_id() :: binary().
+-type message_store_opts() :: #{}.
+
+-type create_db_opts() :: #{}.
 
--type replay() :: {
-    _TopicFilter :: topic_filter(),
-    _StartTime :: time()
-}.
+-type message_id() :: binary().
 
 %%================================================================================
 %% API funcions
 %%================================================================================
 
--spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
-    ok | {error, _Reason}.
-ensure_shard(Shard, Options) ->
-    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
-        {ok, _Pid} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
+%% @doc Create the database `DB' with options `Opts'.
+%%
+%% All the work is delegated to `emqx_ds_replication_layer:create_db/2'.
+-spec create_db(db(), create_db_opts()) -> ok.
+create_db(DB, Opts) ->
+    emqx_ds_replication_layer:create_db(DB, Opts).
 
-%%--------------------------------------------------------------------------------
-%% Message
-%%--------------------------------------------------------------------------------
--spec message_store([emqx_types:message()], message_store_opts()) ->
+-spec message_store(db(), [emqx_types:message()], message_store_opts()) ->
     {ok, [message_id()]} | {error, _}.
-message_store(_Msg, _Opts) ->
-    %% TODO
-    {error, not_implemented}.
-
--spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
-message_store(Msg) ->
-    %% TODO
-    message_store(Msg, #{}).
-
--spec message_stats() -> message_stats().
-message_stats() ->
-    #{}.
-
-%%--------------------------------------------------------------------------------
-%% Session
-%%--------------------------------------------------------------------------------
-
-%%--------------------------------------------------------------------------------
-%% Iterator (pull API)
-%%--------------------------------------------------------------------------------
-
-%% @doc Called when a client acks a message
--spec iterator_update(iterator_id(), iterator()) -> ok.
-iterator_update(_IterId, _Iter) ->
-    %% TODO
-    ok.
-
-%% @doc Called when a client acks a message
--spec iterator_next(iterator()) -> {value, emqx_types:message(), iterator()} | none | {error, _}.
-iterator_next(_Iter) ->
-    %% TODO
-    none.
-
--spec iterator_stats() -> #{}.
-iterator_stats() ->
-    #{}.
+%% Store a batch of messages in `DB'; the call is forwarded unchanged to
+%% `emqx_ds_replication_layer:message_store/3'.
+message_store(DB, Msgs, Opts) ->
+    emqx_ds_replication_layer:message_store(DB, Msgs, Opts).
+
+%% @doc Same as `message_store/3' with an empty options map.
+-spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
+message_store(DB, Msgs) ->
+    message_store(DB, Msgs, #{}).
+
+%% @doc Get a list of streams needed for replaying a topic filter.
+%%
+%% Motivation: under the hood, EMQX may store different topics at
+%% different locations or even in different databases. A wildcard
+%% topic filter may require pulling data from any number of locations.
+%%
+%% Stream is an abstraction exposed by `emqx_ds' that reflects the
+%% notion that different topics can be stored differently, but hides
+%% the implementation details.
+%%
+%% Rules:
+%%
+%% 1. New streams matching the topic filter can appear without notice,
+%% so the replayer must periodically call this function to get the
+%% updated list of streams.
+%%
+%% 2. Streams may depend on one another. Therefore, care should be
+%% taken while replaying them in parallel to avoid out-of-order
+%% replay. This function returns each stream together with its rank
+%% ("coordinates"): `{{X, T}, Stream}'. If the X coordinates of two
+%% streams differ, they can be replayed in parallel. If they are the
+%% same, the stream with the smaller T coordinate should be replayed
+%% first.
+-spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
+get_streams(DB, TopicFilter, StartTime) ->
+    %% Walk the shards of the DB in order and tag every stream reported by
+    %% the replication layer with the shard it came from.
+    [
+        {Rank, #stream{shard = Shard, enc = Enc}}
+     || Shard <- emqx_ds_replication_layer:list_shards(DB),
+        {Rank, Enc} <- emqx_ds_replication_layer:get_streams(Shard, TopicFilter, StartTime)
+    ].
+
+%% @doc Open an iterator over a stream, positioned at `StartTime'.
+%%
+%% The replication-layer iterator is wrapped together with its shard so
+%% that `next/2' knows where to send the request. Errors are returned
+%% unchanged.
+-spec open_iterator(stream(), time()) -> {ok, iterator()} | {error, _}.
+open_iterator(#stream{shard = ShardId, enc = EncStream}, StartTime) ->
+    Result = emqx_ds_replication_layer:open_iterator(ShardId, EncStream, StartTime),
+    case Result of
+        {error, _} = Error ->
+            Error;
+        {ok, EncIter} ->
+            {ok, #iterator{shard = ShardId, enc = EncIter}}
+    end.
+
+%% @doc Fetch the next batch of messages from an iterator.
+%%
+%% `BatchSize' is passed through to the replication layer. On success the
+%% advanced iterator is returned together with the batch; `end_of_stream'
+%% is passed through as is.
+-spec next(iterator(), non_neg_integer()) ->
+    {ok, iterator(), [emqx_types:message()]} | end_of_stream.
+next(#iterator{shard = ShardId, enc = EncIter0}, BatchSize) ->
+    Result = emqx_ds_replication_layer:next(ShardId, EncIter0, BatchSize),
+    case Result of
+        end_of_stream ->
+            end_of_stream;
+        {ok, EncIter, Messages} ->
+            {ok, #iterator{shard = ShardId, enc = EncIter}, Messages}
+    end.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
 
 %%================================================================================
 %% Internal functions

+ 189 - 0
apps/emqx_durable_storage/src/emqx_ds.erl_

@@ -0,0 +1,189 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% API:
+-export([ensure_shard/2]).
+%%   Messages:
+-export([message_store/2, message_store/1, message_stats/0]).
+%%   Iterator:
+-export([get_streams/3, open_iterator/1, next/2]).
+
+%% internal exports:
+-export([]).
+
+%% `shard/0' and `shard_id/0' are exported because the `#stream{}' record
+%% below refers to the shard type as `emqx_ds:shard()'.
+-export_type([
+    stream/0,
+    keyspace/0,
+    message_id/0,
+    message_stats/0,
+    message_store_opts/0,
+    replay/0,
+    replay_id/0,
+    iterator/0,
+    shard/0,
+    shard_id/0,
+    topic/0,
+    topic_filter/0,
+    time/0
+]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% This record encapsulates the stream entity from the storage level.
+%%
+%% TODO: currently the stream is hardwired to only support the
+%% internal rocksdb storage. In the future we want to add other
+%% implementations for emqx_ds, so this type has to take this into
+%% account.
+%%
+%% The second field is named `stream' because that is the name used where
+%% the record is built (`get_streams/3') and matched (`open_iterator/1').
+-record(stream,
+        { shard :: emqx_ds:shard()
+        , stream :: emqx_ds_storage_layer:stream()
+        }).
+
+-opaque stream() :: #stream{}.
+
+-type iterator() :: term().
+
+%-type iterator_id() :: binary().
+
+-type message_store_opts() :: #{}.
+
+-type message_stats() :: #{}.
+
+-type message_id() :: binary().
+
+%% Parsed topic.
+-type topic() :: list(binary()).
+
+%% Parsed topic filter.
+-type topic_filter() :: list(binary() | '+' | '#' | '').
+
+-type keyspace() :: atom().
+-type shard_id() :: binary().
+-type shard() :: {keyspace(), shard_id()}.
+
+%% Timestamp
+%% Earliest possible timestamp is 0.
+%% TODO granularity?  Currently, we should always use micro second, as that's the unit we
+%% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
+-type time() :: non_neg_integer().
+
+-type replay_id() :: binary().
+
+-type replay() :: {
+    _TopicFilter :: topic_filter(),
+    _StartTime :: time()
+}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% @doc Get a list of streams needed for replaying a topic filter.
+%%
+%% Motivation: under the hood, EMQX may store different topics at
+%% different locations or even in different databases. A wildcard
+%% topic filter may require pulling data from any number of locations.
+%%
+%% Stream is an abstraction exposed by `emqx_ds' that reflects the
+%% notion that different topics can be stored differently, but hides
+%% the implementation details.
+%%
+%% Rules:
+%%
+%% 1. New streams matching the topic filter can appear without notice,
+%% so the replayer must periodically call this function to get the
+%% updated list of streams.
+%%
+%% 2. Streams may depend on one another. Therefore, care should be
+%% taken while replaying them in parallel to avoid out-of-order
+%% replay.
+%%
+%% TODO: the stream "coordinates" needed to order dependent streams are
+%% not returned yet; this function currently yields bare streams, one
+%% list per shard, concatenated in shard order.
+-spec get_streams(keyspace(), topic_filter(), time()) -> [stream()].
+get_streams(Keyspace, TopicFilter, StartTime) ->
+    ShardIds = emqx_ds_replication_layer:get_all_shards(Keyspace),
+    lists:flatmap(
+      fun(ShardId) ->
+              Node = emqx_ds_replication_layer:shard_to_node(ShardId),
+              try
+                  Streams = emqx_persistent_session_ds_proto_v1:get_streams(
+                              Node, Keyspace, ShardId, TopicFilter, StartTime),
+                  [#stream{ shard = {Keyspace, ShardId}
+                          , stream = Stream
+                          } || Stream <- Streams]
+              catch
+                  error:{erpc, _} ->
+                      %% The caller has to periodically refresh the
+                      %% list of streams anyway, so it's ok to ignore
+                      %% transient errors.
+                      []
+              end
+      end,
+      ShardIds).
+
+%% @doc Start the storage process of `Shard' unless it is already running.
+%%
+%% Both a fresh start and `already_started' count as success; any other
+%% error from the supervisor is returned to the caller unchanged.
+-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
+    ok | {error, _Reason}.
+ensure_shard(Shard, Options) ->
+    StartResult = emqx_ds_storage_layer_sup:start_shard(Shard, Options),
+    case StartResult of
+        {error, {already_started, _}} ->
+            ok;
+        {error, _} = Error ->
+            Error;
+        {ok, _} ->
+            ok
+    end.
+
+%%--------------------------------------------------------------------------------
+%% Message
+%%--------------------------------------------------------------------------------
+
+%% @doc Store a batch of messages.
+%%
+%% Not implemented in this module yet. The previous body called itself
+%% with the same arguments and therefore never returned; report the
+%% missing implementation to the caller instead.
+-spec message_store([emqx_types:message()], message_store_opts()) ->
+    {ok, [message_id()]} | {error, _}.
+message_store(_Msg, _Opts) ->
+    %% TODO
+    {error, not_implemented}.
+
+%% @doc Same as `message_store/2' with an empty options map.
+-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
+message_store(Msg) ->
+    message_store(Msg, #{}).
+
+%% @doc Message storage statistics. Currently always an empty map.
+-spec message_stats() -> message_stats().
+message_stats() ->
+    #{}.
+
+%%--------------------------------------------------------------------------------
+%% Iterator (pull API)
+%%--------------------------------------------------------------------------------
+
+%% @doc Open an iterator over a stream.
+%%
+%% Not implemented yet: always raises `error(todo)'.
+-spec open_iterator(stream()) -> {ok, iterator()}.
+open_iterator(#stream{shard = {_Keyspace, _ShardId}, stream = _StorageSpecificStream}) ->
+    error(todo).
+
+%% @doc Fetch up to `BatchSize' messages from an iterator.
+%%
+%% Not implemented yet: always raises `error(todo)'.
+-spec next(iterator(), non_neg_integer()) ->
+          {ok, iterator(), [emqx_types:message()]}
+        | end_of_stream.
+next(_Iterator, _BatchSize) ->
+    error(todo).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -458,7 +458,8 @@ topic_match_test() ->
                             [{S2_12, [<<"1">>]},
                              {S2_1_, [<<"1">>, <<"2">>]}]),
         assert_match_topics(T, [2, '#'],
-                            [{S21, []}, {S22, []}, {S2_, ['+']},
+                            [{S21, []}, {S22, []},
+                             {S2_, ['+']},
                              {S2_11, ['+']}, {S2_12, ['+']},
                              {S2_1_, ['+', '+']}]),
         ok

+ 11 - 4
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -85,9 +85,9 @@
 
 -export([store/5]).
 -export([delete/4]).
--export([make_iterator/2]).
--export([make_iterator/3]).
--export([next/1]).
+
+-export([get_streams/2]).
+-export([make_iterator/2, make_iterator/3, next/1]).
 
 -export([preserve_iterator/1]).
 -export([restore_iterator/2]).
@@ -112,7 +112,7 @@
     compute_topic_seek/4
 ]).
 
--export_type([db/0, iterator/0, schema/0]).
+-export_type([db/0, stream/0, iterator/0, schema/0]).
 
 -export_type([options/0]).
 -export_type([iteration_options/0]).
@@ -131,6 +131,8 @@
 %% Type declarations
 %%================================================================================
 
+-opaque stream() :: singleton_stream.
+
 -type topic() :: emqx_ds:topic().
 -type topic_filter() :: emqx_ds:topic_filter().
 -type time() :: emqx_ds:time().
@@ -288,6 +290,11 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
     Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
     rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
 
+%% @doc List the streams of this storage backend.
+%%
+%% Both arguments are ignored: the result is always the single
+%% `singleton_stream'.
+-spec get_streams(db(), emqx_ds:replay()) ->
+          [stream()].
+get_streams(_, _) ->
+    [singleton_stream].
+
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->

+ 128 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -0,0 +1,128 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_replication_layer).
+
+-export([
+          list_shards/1,
+          create_db/2,
+          message_store/3,
+          get_streams/3,
+          open_iterator/3,
+          next/3
+        ]).
+
+
+%% internal exports (targets of the RPC calls made by `emqx_ds_proto_v1'):
+-export([ do_create_shard_v1/2,
+          do_get_streams_v1/3,
+          do_open_iterator_v1/3,
+          do_next_v1/3
+        ]).
+
+-export_type([shard/0, stream/0, iterator/0]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-opaque stream() :: emqx_ds_storage_layer:stream().
+
+-type shard() :: binary().
+
+-opaque iterator() :: emqx_ds_storage_layer:iterator().
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% @doc List the shards of `DB': one shard per node returned by
+%% `list_nodes/0', identified by the binary encoding of `{DB, Node}'.
+-spec list_shards(emqx_ds:db()) -> [shard()].
+list_shards(DB) ->
+    %% TODO: milestone 5
+    [term_to_binary({DB, Node}) || Node <- list_nodes()].
+
+%% @doc Create `DB' by asking every node returned by `list_nodes/0' to
+%% create its own shard, identified by the binary encoding of `{DB, Node}'.
+-spec create_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok.
+create_db(DB, Opts) ->
+    CreateShard =
+        fun(Node) ->
+            ShardId = term_to_binary({DB, Node}),
+            emqx_ds_proto_v1:create_shard(Node, ShardId, Opts)
+        end,
+    lists:foreach(CreateShard, list_nodes()).
+
+%% @doc Store a batch of messages in the shard of `DB' that belongs to the
+%% local node.
+%%
+%% The message id type lives in `emqx_ds'; this module defines no
+%% `message_id()' of its own, so the spec refers to it remotely.
+-spec message_store(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+    {ok, [emqx_ds:message_id()]} | {error, _}.
+message_store(DB, Msg, Opts) ->
+    %% TODO: milestone 5. Currently we store messages locally.
+    Shard = term_to_binary({DB, node()}),
+    emqx_ds_storage_layer:message_store(Shard, Msg, Opts).
+
+%% @doc Get the ranked streams of `Shard' matching `TopicFilter', by
+%% calling the node encoded in the shard id.
+-spec get_streams(shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [{emqx_ds:stream_rank(), stream()}].
+get_streams(Shard, TopicFilter, StartTime) ->
+    emqx_ds_proto_v1:get_streams(node_of_shard(Shard), Shard, TopicFilter, StartTime).
+
+%% @doc Open an iterator over `Stream' on the node encoded in the shard id.
+-spec open_iterator(shard(), stream(), emqx_ds:time()) -> {ok, iterator()} | {error, _}.
+open_iterator(Shard, Stream, StartTime) ->
+    emqx_ds_proto_v1:open_iterator(node_of_shard(Shard), Shard, Stream, StartTime).
+
+%% @doc Fetch the next batch from `Iter' on the node encoded in the shard
+%% id; `BatchSize' is passed through unchanged.
+-spec next(shard(), iterator(), non_neg_integer()) ->
+    {ok, iterator(), [emqx_types:message()]} | end_of_stream.
+next(Shard, Iter, BatchSize) ->
+    emqx_ds_proto_v1:next(node_of_shard(Shard), Shard, Iter, BatchSize).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports (RPC targets)
+%%================================================================================
+
+%% RPC target of `emqx_ds_proto_v1:create_shard/3'.
+%% Not implemented yet: always raises `error({todo, Shard, Opts})'.
+-spec do_create_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok.
+do_create_shard_v1(Shard, Opts) ->
+    error({todo, Shard, Opts}).
+
+%% RPC target of `emqx_ds_proto_v1:get_streams'.
+%% Not implemented yet: always raises
+%% `error({todo, Shard, TopicFilter, StartTime})'.
+-spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+          [{emqx_ds:stream_rank(), stream()}].
+do_get_streams_v1(Shard, TopicFilter, StartTime) ->
+    error({todo, Shard, TopicFilter, StartTime}).
+
+%% RPC target of `emqx_ds_proto_v1:open_iterator/4'.
+%% Not implemented yet: always raises
+%% `error({todo, Shard, Stream, StartTime})'.
+%%
+%% The return type mirrors what `open_iterator/3' in this module promises
+%% to its callers, since the RPC result is handed back unchanged.
+-spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) ->
+          {ok, iterator()} | {error, _}.
+do_open_iterator_v1(Shard, Stream, StartTime) ->
+    error({todo, Shard, Stream, StartTime}).
+
+%% RPC target of `emqx_ds_proto_v1:next/4'.
+%% Not implemented yet: always raises `error({todo, Shard, Iter, BatchSize})'.
+-spec do_next_v1(shard(), iterator(), non_neg_integer()) ->
+          {ok, iterator(), [emqx_types:message()]} | end_of_stream.
+do_next_v1(Shard, Iter, BatchSize) ->
+    error({todo, Shard, Iter, BatchSize}).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Extract the node from a shard id.
+%%
+%% Shard ids are built in this module as `term_to_binary({DB, Node})', so
+%% decoding the binary gives the node back. Any other term shape fails
+%% with `badmatch'.
+%% NOTE(review): `binary_to_term/1' is called without the `safe' option;
+%% this presumes shard ids never come from untrusted input -- confirm.
+-spec node_of_shard(shard()) -> node().
+node_of_shard(ShardId) ->
+    {_DB, Node} = binary_to_term(ShardId),
+    Node.
+
+%% Nodes that currently take part in the DB: whatever
+%% `mria:running_nodes/0' reports at the time of the call.
+list_nodes() ->
+    mria:running_nodes().

+ 21 - 6
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -9,6 +9,7 @@
 -export([start_link/2]).
 -export([create_generation/3]).
 
+-export([get_streams/3]).
 -export([store/5]).
 -export([delete/4]).
 
@@ -27,7 +28,7 @@
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
--export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
+-export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
 -export_type([db_options/0, db_write_options/0, db_read_options/0]).
 
 -compile({inline, [meta_lookup/2]}).
@@ -36,6 +37,8 @@
 %% Type declarations
 %%================================================================================
 
+-opaque stream() :: {term()}.
+
 -type options() :: #{
     dir => file:filename()
 }.
@@ -114,10 +117,10 @@
     cf_refs(),
     _Schema
 ) ->
-    term().
+    _DB.
 
 -callback store(
-    _Schema,
+    _DB,
     _MessageID :: binary(),
     emqx_ds:time(),
     emqx_ds:topic(),
@@ -125,13 +128,16 @@
 ) ->
     ok | {error, _}.
 
--callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
+-callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
     ok | {error, _}.
 
--callback make_iterator(_Schema, emqx_ds:replay()) ->
+-callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [_Stream].
+
+-callback make_iterator(_DB, emqx_ds:replay()) ->
     {ok, _It} | {error, _}.
 
--callback restore_iterator(_Schema, _Serialized :: binary()) -> {ok, _It} | {error, _}.
+-callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}.
 
 -callback preserve_iterator(_It) -> term().
 
@@ -146,6 +152,15 @@
 start_link(Shard = {Keyspace, ShardId}, Options) ->
     gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []).
 
+%% @doc Return the streams of `Shard' that match `TopicFilter', as reported
+%% by the storage backend of the generation found for `StartTime'.
+%%
+%% The arity is 3, matching the `-export([get_streams/3])' declaration of
+%% this module and the `get_streams' callback.
+-spec get_streams(emqx_ds:shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [stream()].
+get_streams(Shard, TopicFilter, StartTime) ->
+    %% FIXME: messages can be potentially stored in multiple
+    %% generations. This function should return the results from all
+    %% of them!
+    %% Otherwise we could LOSE messages when generations are switched.
+    %%
+    %% NOTE(review): assumes the generation metadata keeps the backend
+    %% state under the `data' key -- TODO confirm against meta_lookup_gen/2.
+    {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, StartTime),
+    Mod:get_streams(Data, TopicFilter, StartTime).
+
+
 -spec create_generation(
     emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
 ) ->

+ 56 - 0
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v1).
+
+-behavior(emqx_bpapi).
+
+%% API (called by `emqx_ds_replication_layer'):
+-export([create_shard/3, get_streams/4, open_iterator/4, next/4]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% Run `emqx_ds_replication_layer:do_create_shard_v1(Shard, Opts)' on
+%% `Node' via `erpc:call/4'.
+-spec create_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) ->
+          ok.
+create_shard(Node, Shard, Opts) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_create_shard_v1, [Shard, Opts]).
+
+%% Run `emqx_ds_replication_layer:do_get_streams_v1(Shard, TopicFilter, Time)'
+%% on `Node' via `erpc:call/4'.
+%%
+%% The result is the list of `{Rank, Stream}' pairs declared by
+%% `do_get_streams_v1/3'.
+-spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+          [{emqx_ds:stream_rank(), emqx_ds_replication_layer:stream()}].
+get_streams(Node, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
+
+%% Run `emqx_ds_replication_layer:do_open_iterator_v1(Shard, Stream, StartTime)'
+%% on `Node' via `erpc:call/4'.
+-spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) ->
+          {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
+open_iterator(Node, Shard, Stream, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]).
+
+%% Run `emqx_ds_replication_layer:do_next_v1(Shard, Iter, BatchSize)' on
+%% `Node' via `erpc:call/4'.
+%%
+%% The batch is a list of `emqx_types:message()', as in the spec of
+%% `do_next_v1/3'.
+-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), non_neg_integer()) ->
+          {ok, emqx_ds_replication_layer:iterator(), [emqx_types:message()]} | end_of_stream.
+next(Node, Shard, Iter, BatchSize) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%% `emqx_bpapi' callback.
+%% NOTE(review): presumably the release in which this protocol version
+%% first shipped; the value is marked FIXME below -- confirm before merging.
+introduced_in() ->
+    %% FIXME
+    "5.3.0".