Parcourir la source

Merge pull request #11924 from ieQu1/dev/emqx-ds-behavior

feat(ds): Introduce emqx_ds behavior
ieQu1 il y a 2 ans
Parent
commit
5170267fef

+ 1 - 1
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -161,7 +161,7 @@ fetch(_SessionId, Inflight, _Streams, 0, Acc) ->
 fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
     #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
     ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
-    {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
+    {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
     {NMessages, Publishes, Inflight1} =
         lists:foldl(
             fun(Msg, {N0, PubAcc0, InflightAcc0}) ->

+ 3 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -105,8 +105,6 @@
 
 -export_type([id/0]).
 
--define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-
 %%
 
 -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
@@ -497,8 +495,6 @@ storage() ->
 %% @doc Called when a client connects. This function looks up a
 %% session or returns `false` if previous one couldn't be found.
 %%
-%% This function also spawns replay agents for each iterator.
-%%
 %% Note: session API doesn't handle session takeovers, it's the job of
 %% the broker.
 -spec session_open(id()) ->
@@ -670,7 +666,9 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
                             ok;
                         false ->
                             mnesia:write(?SESSION_STREAM_TAB, Rec, write),
-                            {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
+                            {ok, Iterator} = emqx_ds:make_iterator(
+                                ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
+                            ),
                             IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
                             mnesia:write(?SESSION_ITER_TAB, IterRec, write)
                     end

+ 2 - 0
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -16,6 +16,8 @@
 -ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
 -define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
 
+-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).

+ 2 - 2
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -256,14 +256,14 @@ consume(TopicFilter, StartMS) ->
     Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS),
     lists:flatmap(
         fun({_Rank, Stream}) ->
-            {ok, It} = emqx_ds:make_iterator(Stream, TopicFilter, StartMS),
+            {ok, It} = emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartMS),
             consume(It)
         end,
         Streams
     ).
 
 consume(It) ->
-    case emqx_ds:next(It, 100) of
+    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
         {ok, _NIt, _Msgs = []} ->
             [];
         {ok, NIt, Msgs} ->

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -133,7 +133,7 @@ get_listener_port(Type, Name) ->
 end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
     ok = emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(_, _Config) ->
-    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
+    catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     ok.
 
 init_per_testcase(TestCase, Config) ->

+ 42 - 12
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -28,7 +28,7 @@
 -export([store_batch/2, store_batch/3]).
 
 %% Message replay API:
--export([get_streams/3, make_iterator/3, next/2]).
+-export([get_streams/3, make_iterator/4, next/3]).
 
 %% Misc. API:
 -export([]).
@@ -100,6 +100,26 @@
 
 -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
 
+-define(persistent_term(DB), {emqx_ds_db_backend, DB}).
+
+-define(module(DB), (persistent_term:get(?persistent_term(DB)))).
+
+%%================================================================================
+%% Behavior callbacks
+%%================================================================================
+
+-callback open_db(db(), create_db_opts()) -> ok | {error, _}.
+
+-callback drop_db(db()) -> ok | {error, _}.
+
+-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
+
+-callback get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
+
+-callback make_iterator(db(), _Stream, topic_filter(), time()) -> make_iterator_result(_Iterator).
+
+-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -107,19 +127,29 @@
 %% @doc Different DBs are completely independent from each other. They
 %% could represent something like different tenants.
 -spec open_db(db(), create_db_opts()) -> ok.
-open_db(DB, Opts = #{backend := builtin}) ->
-    emqx_ds_replication_layer:open_db(DB, Opts).
+open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin ->
+    Module =
+        case Backend of
+            builtin -> emqx_ds_replication_layer
+        end,
+    persistent_term:put(?persistent_term(DB), Module),
+    ?module(DB):open_db(DB, Opts).
 
 %% @doc TODO: currently if one or a few shards are down, they won't be
 
 %% deleted.
 -spec drop_db(db()) -> ok.
 drop_db(DB) ->
-    emqx_ds_replication_layer:drop_db(DB).
+    case persistent_term:get(?persistent_term(DB), undefined) of
+        undefined ->
+            ok;
+        Module ->
+            Module:drop_db(DB)
+    end.
 
 -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
 store_batch(DB, Msgs, Opts) ->
-    emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).
+    ?module(DB):store_batch(DB, Msgs, Opts).
 
 -spec store_batch(db(), [emqx_types:message()]) -> store_batch_result().
 store_batch(DB, Msgs) ->
@@ -168,15 +198,15 @@ store_batch(DB, Msgs) ->
 %% replaying streams that depend on the given one.
 -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
 get_streams(DB, TopicFilter, StartTime) ->
-    emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime).
+    ?module(DB):get_streams(DB, TopicFilter, StartTime).
 
--spec make_iterator(stream(), topic_filter(), time()) -> make_iterator_result().
-make_iterator(Stream, TopicFilter, StartTime) ->
-    emqx_ds_replication_layer:make_iterator(Stream, TopicFilter, StartTime).
+-spec make_iterator(db(), stream(), topic_filter(), time()) -> make_iterator_result().
+make_iterator(DB, Stream, TopicFilter, StartTime) ->
+    ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime).
 
--spec next(iterator(), pos_integer()) -> next_result().
-next(Iter, BatchSize) ->
-    emqx_ds_replication_layer:next(Iter, BatchSize).
+-spec next(db(), iterator(), pos_integer()) -> next_result().
+next(DB, Iter, BatchSize) ->
+    ?module(DB):next(DB, Iter, BatchSize).
 
 %%================================================================================
 %% Internal exports

+ 83 - 67
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -18,23 +18,26 @@
 %% replication on their own.
 -module(emqx_ds_replication_layer).
 
+-behaviour(emqx_ds).
+
 -export([
     list_shards/1,
     open_db/2,
     drop_db/1,
     store_batch/3,
     get_streams/3,
-    make_iterator/3,
-    next/2
+    make_iterator/4,
+    next/3
 ]).
 
 %% internal exports:
 -export([
-    do_open_shard_v1/2,
-    do_drop_shard_v1/1,
-    do_get_streams_v1/3,
-    do_make_iterator_v1/4,
-    do_next_v1/3
+    do_open_shard_v1/3,
+    do_drop_shard_v1/2,
+    do_store_batch_v1/4,
+    do_get_streams_v1/4,
+    do_make_iterator_v1/5,
+    do_next_v1/4
 ]).
 
 -export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
@@ -55,9 +58,7 @@
 -define(shard, 2).
 -define(enc, 3).
 
--type db() :: emqx_ds:db().
-
--type shard_id() :: {db(), atom()}.
+-type shard_id() :: atom().
 
 %% This enapsulates the stream entity from the replication level.
 %%
@@ -85,52 +86,48 @@
 %% API functions
 %%================================================================================
 
--spec list_shards(db()) -> [shard_id()].
-list_shards(DB) ->
+-spec list_shards(emqx_ds:db()) -> [shard_id()].
+list_shards(_DB) ->
     %% TODO: milestone 5
-    lists:map(
-        fun(Node) ->
-            shard_id(DB, Node)
-        end,
-        list_nodes()
-    ).
+    list_nodes().
 
--spec open_db(db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
+-spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
 open_db(DB, Opts) ->
     %% TODO: improve error reporting, don't just crash
     lists:foreach(
-        fun(Node) ->
-            Shard = shard_id(DB, Node),
-            ok = emqx_ds_proto_v1:open_shard(Node, Shard, Opts)
+        fun(Shard) ->
+            Node = node_of_shard(DB, Shard),
+            ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts)
         end,
-        list_nodes()
+        list_shards(DB)
     ).
 
--spec drop_db(db()) -> ok | {error, _}.
+-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     lists:foreach(
-        fun(Node) ->
-            Shard = shard_id(DB, Node),
-            ok = emqx_ds_proto_v1:drop_shard(Node, Shard)
+        fun(Shard) ->
+            Node = node_of_shard(DB, Shard),
+            ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard)
         end,
-        list_nodes()
+        list_shards(DB)
     ).
 
--spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Msg, Opts) ->
+store_batch(DB, Batch, Opts) ->
     %% TODO: Currently we store messages locally.
-    Shard = shard_id(DB, node()),
-    emqx_ds_storage_layer:store_batch(Shard, Msg, Opts).
+    Shard = node(),
+    Node = node_of_shard(DB, Shard),
+    emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
 
--spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{emqx_ds:stream_rank(), stream()}].
 get_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(Shard),
-            Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime),
+            Node = node_of_shard(DB, Shard),
+            Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime),
             lists:map(
                 fun({RankY, Stream}) ->
                     RankX = Shard,
@@ -147,22 +144,22 @@ get_streams(DB, TopicFilter, StartTime) ->
         Shards
     ).
 
--spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
-make_iterator(Stream, TopicFilter, StartTime) ->
+make_iterator(DB, Stream, TopicFilter, StartTime) ->
     #{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream,
-    Node = node_of_shard(Shard),
-    case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of
+    Node = node_of_shard(DB, Shard),
+    case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
             Err
     end.
 
--spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
-next(Iter0, BatchSize) ->
+-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
+next(DB, Iter0, BatchSize) ->
     #{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(Shard),
+    Node = node_of_shard(DB, Shard),
     %% TODO: iterator can contain information that is useful for
     %% reconstructing messages sent over the network. For example,
     %% when we send messages with the learned topic index, we could
@@ -171,7 +168,7 @@ next(Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
+    case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -187,42 +184,61 @@ next(Iter0, BatchSize) ->
 %% Internal exports (RPC targets)
 %%================================================================================
 
--spec do_open_shard_v1(shard_id(), emqx_ds:create_db_opts()) -> ok.
-do_open_shard_v1(Shard, Opts) ->
-    emqx_ds_storage_layer:open_shard(Shard, Opts).
-
--spec do_drop_shard_v1(shard_id()) -> ok.
-do_drop_shard_v1(Shard) ->
-    emqx_ds_storage_layer:drop_shard(Shard).
+-spec do_open_shard_v1(
+    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()
+) ->
+    ok | {error, _}.
+do_open_shard_v1(DB, Shard, Opts) ->
+    emqx_ds_storage_layer:open_shard({DB, Shard}, Opts).
+
+-spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}.
+do_drop_shard_v1(DB, Shard) ->
+    emqx_ds_storage_layer:drop_shard({DB, Shard}).
+
+-spec do_store_batch_v1(
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    [emqx_types:message()],
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+do_store_batch_v1(DB, Shard, Batch, Options) ->
+    emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
 
--spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+-spec do_get_streams_v1(
+    emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     [{integer(), emqx_ds_storage_layer:stream()}].
-do_get_streams_v1(Shard, TopicFilter, StartTime) ->
-    emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
+do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
+    emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
 
 -spec do_make_iterator_v1(
-    shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time()
+    emqx_ds:db(),
+    emqx_ds_storage_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
 ) ->
     {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
-    emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
-
--spec do_next_v1(shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()) ->
+do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
+    emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
+
+-spec do_next_v1(
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
+) ->
     emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
-do_next_v1(Shard, Iter, BatchSize) ->
-    emqx_ds_storage_layer:next(Shard, Iter, BatchSize).
+do_next_v1(DB, Shard, Iter, BatchSize) ->
+    emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
 
 %%================================================================================
 %% Internal functions
 %%================================================================================
 
-shard_id(DB, Node) ->
-    %% TODO: don't bake node name into the schema, don't repeat the
-    %% Mnesia's 1M$ mistake.
-    {DB, Node}.
-
--spec node_of_shard(shard_id()) -> node().
-node_of_shard({_DB, Node}) ->
+-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
+node_of_shard(_DB, Node) ->
     Node.
 
 list_nodes() ->

+ 5 - 5
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -110,7 +110,7 @@
 %%================================================================================
 
 -spec create(
-    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
     options()
@@ -137,7 +137,7 @@ create(_ShardId, DBHandle, GenId, Options) ->
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
 
 -spec open(
-    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
     emqx_ds_storage_layer:cf_refs(),
@@ -173,7 +173,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     }.
 
 -spec store_batch(
-    emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
+    emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
@@ -187,7 +187,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     ).
 
 -spec get_streams(
-    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:shard_id(),
     s(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
@@ -197,7 +197,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     [#{?tag => ?stream, ?storage_key => I} || I <- Indexes].
 
 -spec make_iterator(
-    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:shard_id(),
     s(),
     stream(),
     emqx_ds:topic_filter(),

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -50,7 +50,7 @@
     {emqx_ds_storage_reference, emqx_ds_storage_reference:options()}
     | {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}.
 
--type shard_id() :: emqx_ds_replication_layer:shard_id().
+-type shard_id() :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}.
 
 -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
 
@@ -217,7 +217,7 @@ next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, Batch
 
 -spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
     {ok, pid()}.
-start_link(Shard, Options) ->
+start_link(Shard = {_, _}, Options) ->
     gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
 
 -record(s, {
@@ -417,11 +417,11 @@ generations_since(Shard, Since) ->
 -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
 
 -spec get_schema_runtime(shard_id()) -> shard().
-get_schema_runtime(Shard) ->
+get_schema_runtime(Shard = {_, _}) ->
     persistent_term:get(?PERSISTENT_TERM(Shard)).
 
 -spec put_schema_runtime(shard_id(), shard()) -> ok.
-put_schema_runtime(Shard, RuntimeSchema) ->
+put_schema_runtime(Shard = {_, _}, RuntimeSchema) ->
     persistent_term:put(?PERSISTENT_TERM(Shard), RuntimeSchema),
     ok.
 

+ 4 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -25,7 +25,7 @@
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
--spec start_shard(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
+-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
     supervisor:startchild_ret().
 start_shard(Shard, Options) ->
     supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
@@ -35,7 +35,8 @@ stop_shard(Shard) ->
     ok = supervisor:terminate_child(?SUP, Shard),
     ok = supervisor:delete_child(?SUP, Shard).
 
--spec ensure_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}.
+-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
+    ok | {error, _Reason}.
 ensure_shard(Shard, Options) ->
     case start_shard(Shard, Options) of
         {ok, _Pid} ->
@@ -63,7 +64,7 @@ init([]) ->
 %% Internal functions
 %%================================================================================
 
--spec shard_child_spec(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
+-spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
     supervisor:child_spec().
 shard_child_spec(Shard, Options) ->
     #{

+ 40 - 15
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -19,7 +19,7 @@
 
 -include_lib("emqx_utils/include/bpapi.hrl").
 %% API:
--export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/5, next/4]).
+-export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]).
 
 %% behavior callbacks:
 -export([introduced_in/0]).
@@ -28,44 +28,69 @@
 %% API funcions
 %%================================================================================
 
--spec open_shard(node(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
+-spec open_shard(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:create_db_opts()
+) ->
     ok.
-open_shard(Node, Shard, Opts) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).
+open_shard(Node, DB, Shard, Opts) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]).
 
--spec drop_shard(node(), emqx_ds_replication_layer:shard_id()) ->
+-spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     ok.
-drop_shard(Node, Shard) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]).
+drop_shard(Node, DB, Shard) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]).
 
 -spec get_streams(
-    node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
 ) ->
     [{integer(), emqx_ds_storage_layer:stream()}].
-get_streams(Node, Shard, TopicFilter, Time) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
+get_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
 
 -spec make_iterator(
     node(),
+    emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
     emqx_ds_storage_layer:stream(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
     {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
     erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
-        Shard, Stream, TopicFilter, StartTime
+        DB, Shard, Stream, TopicFilter, StartTime
     ]).
 
 -spec next(
-    node(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
 ) ->
     {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]}
     | {ok, end_of_stream}
     | {error, _}.
-next(Node, Shard, Iter, BatchSize) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).
+next(Node, DB, Shard, Iter, BatchSize) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+
+-spec store_batch(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    [emqx_types:message()],
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+store_batch(Node, DB, Shard, Batch, Options) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]).
 
 %%================================================================================
 %% behavior callbacks

+ 10 - 10
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -54,7 +54,7 @@ t_02_smoke_get_streams_start_iter(_Config) ->
     TopicFilter = ['#'],
     [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     ?assertMatch({_, _}, Rank),
-    ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, TopicFilter, StartTime)).
+    ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)).
 
 %% A simple smoke test that verifies that it's possible to iterate
 %% over messages.
@@ -70,8 +70,8 @@ t_03_smoke_iterate(_Config) ->
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-    {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
-    {ok, Iter, Batch} = iterate(Iter0, 1),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+    {ok, Iter, Batch} = iterate(DB, Iter0, 1),
     ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 %% Verify that iterators survive restart of the application. This is
@@ -91,14 +91,14 @@ t_04_restart(_Config) ->
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-    {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
+    {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     %% Restart the application:
     ?tp(warning, emqx_ds_SUITE_restart_app, #{}),
     ok = application:stop(emqx_durable_storage),
     {ok, _} = application:ensure_all_started(emqx_durable_storage),
     ok = emqx_ds:open_db(DB, opts()),
     %% The old iterator should be still operational:
-    {ok, Iter, Batch} = iterate(Iter0, 1),
+    {ok, Iter, Batch} = iterate(DB, Iter0, 1),
     ?assertEqual(Msgs, Batch, {Iter0, Iter}).
 
 message(Topic, Payload, PublishedAt) ->
@@ -109,15 +109,15 @@ message(Topic, Payload, PublishedAt) ->
         id = emqx_guid:gen()
     }.
 
-iterate(It, BatchSize) ->
-    iterate(It, BatchSize, []).
+iterate(DB, It, BatchSize) ->
+    iterate(DB, It, BatchSize, []).
 
-iterate(It0, BatchSize, Acc) ->
-    case emqx_ds:next(It0, BatchSize) of
+iterate(DB, It0, BatchSize, Acc) ->
+    case emqx_ds:next(DB, It0, BatchSize) of
         {ok, It, []} ->
             {ok, It, Acc};
         {ok, It, Msgs} ->
-            iterate(It, BatchSize, Acc ++ Msgs);
+            iterate(DB, It, BatchSize, Acc ++ Msgs);
         Ret ->
             Ret
     end.