Просмотр исходного кода

Merge pull request #12377 from ieQu1/ds-publish-batching

feat(ds): Add egress proxies for the builtin backend
ieQu1 2 лет назад
Родитель
Сommit
c388a5441d

+ 10 - 1
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 -module(emqx_persistent_session_ds_SUITE).
 
@@ -18,6 +18,9 @@
 %% CT boilerplate
 %%------------------------------------------------------------------------------
 
+suite() ->
+    [{timetrap, {seconds, 60}}].
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -191,6 +194,7 @@ t_non_persistent_session_subscription(_Config) ->
     ClientId = atom_to_binary(?FUNCTION_NAME),
     SubTopicFilter = <<"t/#">>,
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             ?tp(notice, "starting", #{}),
             Client = start_client(#{
@@ -220,6 +224,7 @@ t_session_subscription_idempotency(Config) ->
     SubTopicFilter = <<"t/+">>,
     ClientId = <<"myclientid">>,
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             ?force_ordering(
                 #{?snk_kind := persistent_session_ds_subscription_added},
@@ -281,6 +286,7 @@ t_session_unsubscription_idempotency(Config) ->
     SubTopicFilter = <<"t/+">>,
     ClientId = <<"myclientid">>,
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             ?force_ordering(
                 #{
@@ -385,6 +391,7 @@ do_t_session_discard(Params) ->
     ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
     SubTopicFilter = <<"t/+">>,
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             ?tp(notice, "starting", #{}),
             Client0 = start_client(#{
@@ -472,6 +479,7 @@ do_t_session_expiration(_Config, Opts) ->
     } = Opts,
     CommonParams = #{proto_ver => v5, clientid => ClientId},
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             Topic = <<"some/topic">>,
             Params0 = maps:merge(CommonParams, FirstConn),
@@ -539,6 +547,7 @@ t_session_gc(Config) ->
     end,
 
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             ClientId0 = <<"session_gc0">>,
             Client0 = StartClient(ClientId0, Port1, 30),

+ 2 - 2
apps/emqx/src/emqx_persistent_message.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -99,7 +99,7 @@ needs_persistence(Msg) ->
 
 -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
 store_message(Msg) ->
-    emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]).
+    emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
 
 has_subscribers(#message{topic = Topic}) ->
     emqx_persistent_session_ds_router:has_any_route(Topic).

+ 19 - 1
apps/emqx/src/emqx_schema.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -1915,6 +1915,24 @@ fields("session_storage_backend_builtin") ->
                     default => 3,
                     importance => ?IMPORTANCE_HIDDEN
                 }
+            )},
+        {"egress_batch_size",
+            sc(
+                pos_integer(),
+                #{
+                    default => 1000,
+                    mapping => "emqx_durable_storage.egress_batch_size",
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"egress_flush_interval",
+            sc(
+                timeout_duration_ms(),
+                #{
+                    default => 100,
+                    mapping => "emqx_durable_storage.egress_flush_interval",
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )}
     ].
 

+ 3 - 3
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -376,7 +376,7 @@ t_publish_empty_topic_levels(_Config) ->
             {<<"t/3/bar">>, <<"6">>}
         ],
         [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
-        Received = receive_messages(length(Messages), 1_500),
+        Received = receive_messages(length(Messages)),
         ?assertMatch(
             [
                 #{topic := <<"t//1/">>, payload := <<"2">>},
@@ -475,7 +475,7 @@ t_metrics_not_dropped(_Config) ->
 
     {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
     emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
-    ?assertMatch([_], receive_messages(1, 1_500)),
+    ?assertMatch([_], receive_messages(1)),
 
     DroppedAfter = emqx_metrics:val('messages.dropped'),
     DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 6 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -111,11 +111,15 @@
 %% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
--type message_store_opts() :: #{}.
+-type message_store_opts() ::
+    #{
+        sync => boolean()
+    }.
 
 -type generic_db_opts() ::
     #{
         backend := atom(),
+        serialize_by => clientid | topic,
         _ => _
     }.
 

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
 -module(emqx_ds_app).

+ 152 - 0
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -0,0 +1,152 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc Supervisor that contains all the processes that belong to a
+%% given builtin DS database.
+-module(emqx_ds_builtin_db_sup).
+
+-behaviour(supervisor).
+
+%% API:
+-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
+
+%% behaviour callbacks:
+-export([init/1]).
+
+%% internal exports:
+-export([start_link_sup/2]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(via(REC), {via, gproc, {n, l, REC}}).
+
+-define(db_sup, ?MODULE).
+-define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(egress_sup, emqx_ds_builtin_db_egress_sup).
+
+-record(?db_sup, {db}).
+-record(?shard_sup, {db}).
+-record(?egress_sup, {db}).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> {ok, pid()}.
+start_db(DB, Opts) ->
+    start_link_sup(#?db_sup{db = DB}, Opts).
+
+-spec start_shard(emqx_ds_storage_layer:shard_id()) ->
+    supervisor:startchild_ret().
+start_shard(Shard = {DB, _}) ->
+    supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).
+
+-spec start_egress(emqx_ds_storage_layer:shard_id()) ->
+    supervisor:startchild_ret().
+start_egress({DB, Shard}) ->
+    supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
+
+-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
+stop_shard(Shard = {DB, _}) ->
+    Sup = ?via(#?shard_sup{db = DB}),
+    ok = supervisor:terminate_child(Sup, Shard),
+    ok = supervisor:delete_child(Sup, Shard).
+
+-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
+    ok | {error, _Reason}.
+ensure_shard(Shard) ->
+    case start_shard(Shard) of
+        {ok, _Pid} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%%================================================================================
+%% behaviour callbacks
+%%================================================================================
+
+init({#?db_sup{db = DB}, DefaultOpts}) ->
+    %% Spec for the top-level supervisor for the database:
+    logger:notice("Starting DS DB ~p", [DB]),
+    _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
+    %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    lists:foreach(
+        fun(Shard) ->
+            emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
+        end,
+        MyShards
+    ),
+    Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 0,
+        period => 1
+    },
+    {ok, {SupFlags, Children}};
+init({#?shard_sup{db = DB}, _}) ->
+    %% Spec for the supervisor that manages the worker processes for
+    %% each local shard of the DB:
+    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    Children = [shard_spec(DB, Shard) || Shard <- MyShards],
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 1
+    },
+    {ok, {SupFlags, Children}};
+init({#?egress_sup{db = DB}, _}) ->
+    %% Spec for the supervisor that manages the egress proxy processes
+    %% managing traffic towards each of the shards of the DB:
+    Shards = emqx_ds_replication_layer_meta:shards(DB),
+    Children = [egress_spec(DB, Shard) || Shard <- Shards],
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 0,
+        period => 1
+    },
+    {ok, {SupFlags, Children}}.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+start_link_sup(Id, Options) ->
+    supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+sup_spec(Id, Options) ->
+    #{
+        id => element(1, Id),
+        start => {?MODULE, start_link_sup, [Id, Options]},
+        type => supervisor,
+        shutdown => infinity
+    }.
+
+shard_spec(DB, Shard) ->
+    Options = emqx_ds_replication_layer_meta:get_options(DB),
+    #{
+        id => Shard,
+        start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
+        shutdown => 5_000,
+        restart => permanent,
+        type => worker
+    }.
+
+egress_spec(DB, Shard) ->
+    #{
+        id => Shard,
+        start => {emqx_ds_replication_layer_egress, start_link, [DB, Shard]},
+        shutdown => 5_000,
+        restart => permanent,
+        type => worker
+    }.

+ 132 - 0
apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl

@@ -0,0 +1,132 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc This supervisor manages the global worker processes needed for
+%% the functioning of builtin databases, and all builtin database
+%% attach to it.
+-module(emqx_ds_builtin_sup).
+
+-behaviour(supervisor).
+
+%% API:
+-export([start_db/2, stop_db/1]).
+
+%% behavior callbacks:
+-export([init/1]).
+
+%% internal exports:
+-export([start_top/0, start_databases_sup/0]).
+
+-export_type([]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(top, ?MODULE).
+-define(databases, emqx_ds_builtin_databases_sup).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    supervisor:startchild_ret().
+start_db(DB, Opts) ->
+    ensure_top(),
+    ChildSpec = #{
+        id => DB,
+        start => {emqx_ds_builtin_db_sup, start_db, [DB, Opts]},
+        type => supervisor,
+        shutdown => infinity
+    },
+    supervisor:start_child(?databases, ChildSpec).
+
+-spec stop_db(emqx_ds:db()) -> ok.
+stop_db(DB) ->
+    case whereis(?databases) of
+        Pid when is_pid(Pid) ->
+            _ = supervisor:terminate_child(?databases, DB),
+            _ = supervisor:delete_child(?databases, DB),
+            ok;
+        undefined ->
+            ok
+    end.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%% There are two layers of supervision:
+%%
+%% 1. top supervisor for the builtin backend. It contains the global
+%% worker processes (like the metadata server), and `?databases'
+%% supervisior.
+%%
+%% 2. `?databases': a `one_for_one' supervisor where each child is a
+%% `db' supervisor that contains processes that represent the DB.
+%% Chidren are attached dynamically to this one.
+init(?top) ->
+    %% Children:
+    MetadataServer = #{
+        id => metadata_server,
+        start => {emqx_ds_replication_layer_meta, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5000
+    },
+    DBsSup = #{
+        id => ?databases,
+        start => {?MODULE, start_databases_sup, []},
+        restart => permanent,
+        type => supervisor,
+        shutdown => infinity
+    },
+    %%
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 1,
+        period => 1,
+        auto_shutdown => never
+    },
+    {ok, {SupFlags, [MetadataServer, DBsSup]}};
+init(?databases) ->
+    %% Children are added dynamically:
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 1
+    },
+    {ok, {SupFlags, []}}.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+-spec start_top() -> {ok, pid()}.
+start_top() ->
+    supervisor:start_link({local, ?top}, ?MODULE, ?top).
+
+start_databases_sup() ->
+    supervisor:start_link({local, ?databases}, ?MODULE, ?databases).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+ensure_top() ->
+    {ok, _} = emqx_ds_sup:attach_backend(builtin, {?MODULE, start_top, []}),
+    ok.

+ 57 - 65
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -32,7 +32,10 @@
     get_streams/3,
     make_iterator/4,
     update_iterator/3,
-    next/3
+    next/3,
+    node_of_shard/2,
+    shard_of_message/3,
+    maybe_set_myself_as_leader/2
 ]).
 
 %% internal exports:
@@ -51,24 +54,12 @@
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
+-include("emqx_ds_replication_layer.hrl").
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
-%% # "Record" integer keys.  We use maps with integer keys to avoid persisting and sending
-%% records over the wire.
-
-%% tags:
--define(STREAM, 1).
--define(IT, 2).
--define(BATCH, 3).
-
-%% keys:
--define(tag, 1).
--define(shard, 2).
--define(enc, 3).
-
 -type shard_id() :: binary().
 
 -type builtin_db_opts() ::
@@ -101,8 +92,6 @@
 
 -type message_id() :: emqx_ds:message_id().
 
--define(batch_messages, 2).
-
 -type batch() :: #{
     ?tag := ?BATCH,
     ?batch_messages := [emqx_types:message()]
@@ -120,21 +109,19 @@ list_shards(DB) ->
 
 -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 open_db(DB, CreateOpts) ->
-    ok = emqx_ds_sup:ensure_workers(),
-    Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    lists:foreach(
-        fun(Shard) ->
-            emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
-            maybe_set_myself_as_leader(DB, Shard)
-        end,
-        MyShards
-    ).
+    case emqx_ds_builtin_sup:start_db(DB, CreateOpts) of
+        {ok, _} ->
+            ok;
+        {error, {already_started, _}} ->
+            ok;
+        {error, Err} ->
+            {error, Err}
+    end.
 
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
     Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
-    _ = emqx_ds_proto_v2:add_generation(Nodes, DB),
+    _ = emqx_ds_proto_v3:add_generation(Nodes, DB),
     ok.
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
@@ -170,17 +157,15 @@ drop_generation(DB, {Shard, GenId}) ->
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
-    _ = emqx_ds_proto_v1:drop_db(Nodes, DB),
+    _ = emqx_ds_proto_v3:drop_db(Nodes, DB),
     _ = emqx_ds_replication_layer_meta:drop_db(DB),
+    emqx_ds_builtin_sup:stop_db(DB),
     ok.
 
 -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Messages, Opts) ->
-    Shard = shard_of_messages(DB, Messages),
-    Node = node_of_shard(DB, Shard),
-    Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
-    emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
+    emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts).
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{emqx_ds:stream_rank(), stream()}].
@@ -189,7 +174,7 @@ get_streams(DB, TopicFilter, StartTime) ->
     lists:flatmap(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
-            Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime),
+            Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime),
             lists:map(
                 fun({RankY, Stream}) ->
                     RankX = Shard,
@@ -211,7 +196,7 @@ get_streams(DB, TopicFilter, StartTime) ->
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
     Node = node_of_shard(DB, Shard),
-    case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
+    case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
@@ -228,7 +213,7 @@ update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
     Node = node_of_shard(DB, Shard),
     case
-        emqx_ds_proto_v2:update_iterator(
+        emqx_ds_proto_v3:update_iterator(
             Node,
             DB,
             Shard,
@@ -254,7 +239,7 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of
+    case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -262,6 +247,41 @@ next(DB, Iter0, BatchSize) ->
             Other
     end.
 
+-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
+node_of_shard(DB, Shard) ->
+    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
+        {ok, Leader} ->
+            Leader;
+        {error, no_leader_for_shard} ->
+            %% TODO: use optvar
+            timer:sleep(500),
+            node_of_shard(DB, Shard)
+    end.
+
+-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
+    emqx_ds_replication_layer:shard_id().
+shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
+    N = emqx_ds_replication_layer_meta:n_shards(DB),
+    Hash =
+        case SerializeBy of
+            clientid -> erlang:phash2(From, N);
+            topic -> erlang:phash2(Topic, N)
+        end,
+    integer_to_binary(Hash).
+
+%% TODO: there's no real leader election right now
+-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
+maybe_set_myself_as_leader(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
+        [Site | _] ->
+            %% Currently the first in-sync replica always becomes the
+            %% leader
+            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
+        _Sites ->
+            ok
+    end.
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
@@ -273,6 +293,7 @@ next(DB, Iter0, BatchSize) ->
 -spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
 do_drop_db_v1(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
+    emqx_ds_builtin_sup:stop_db(DB),
     lists:foreach(
         fun(Shard) ->
             emqx_ds_storage_layer:drop_shard({DB, Shard})
@@ -354,34 +375,5 @@ do_drop_generation_v3(DB, ShardId, GenId) ->
 %% Internal functions
 %%================================================================================
 
-%% TODO: there's no real leader election right now
--spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
-maybe_set_myself_as_leader(DB, Shard) ->
-    Site = emqx_ds_replication_layer_meta:this_site(),
-    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
-        [Site | _] ->
-            %% Currently the first in-sync replica always becomes the
-            %% leader
-            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
-        _Sites ->
-            ok
-    end.
-
--spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(DB, Shard) ->
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            %% TODO: use optvar
-            timer:sleep(500),
-            node_of_shard(DB, Shard)
-    end.
-
-%% Here we assume that all messages in the batch come from the same client
-shard_of_messages(DB, [#message{from = From} | _]) ->
-    N = emqx_ds_replication_layer_meta:n_shards(DB),
-    integer_to_binary(erlang:phash2(From, N)).
-
 list_nodes() ->
     mria:running_nodes().

+ 33 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022, 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_REPLICATION_LAYER_HRL).
+-define(EMQX_DS_REPLICATION_LAYER_HRL, true).
+
+%% # "Record" integer keys.  We use maps with integer keys to avoid persisting and sending
+%% records over the wire.
+
+%% tags:
+-define(STREAM, 1).
+-define(IT, 2).
+-define(BATCH, 3).
+
+%% keys:
+-define(tag, 1).
+-define(shard, 2).
+-define(enc, 3).
+-define(batch_messages, 2).
+
+-endif.

+ 175 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -0,0 +1,175 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Egress servers are responsible for proxing the outcoming
+%% `store_batch' requests towards EMQX DS shards.
+%%
+%% They re-assemble messages from different local processes into
+%% fixed-sized batches, and introduce centralized channels between the
+%% nodes. They are also responsible for maintaining backpressure
+%% towards the local publishers.
+%%
+%% There is (currently) one egress process for each shard running on
+%% each node, but it should be possible to have a pool of egress
+%% servers, if needed.
+-module(emqx_ds_replication_layer_egress).
+
+-behaviour(gen_server).
+
+%% API:
+-export([start_link/2, store_batch/3]).
+
+%% behavior callbacks:
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+%% internal exports:
+-export([]).
+
+-export_type([]).
+
+-include("emqx_ds_replication_layer.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
+-define(flush, flush).
+
+-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
+start_link(DB, Shard) ->
+    gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
+
+-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+    ok.
+store_batch(DB, Messages, Opts) ->
+    Sync = maps:get(sync, Opts, true),
+    lists:foreach(
+        fun(Message) ->
+            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+            gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
+        end,
+        Messages
+    ).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+-record(s, {
+    db :: emqx_ds:db(),
+    shard :: emqx_ds_replication_layer:shard_id(),
+    leader :: node(),
+    n = 0 :: non_neg_integer(),
+    tref :: reference(),
+    batch = [] :: [emqx_types:message()],
+    pending_replies = [] :: [gen_server:from()]
+}).
+
+init([DB, Shard]) ->
+    process_flag(trap_exit, true),
+    process_flag(message_queue_data, off_heap),
+    %% TODO: adjust leader dynamically
+    {ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
+    S = #s{
+        db = DB,
+        shard = Shard,
+        leader = Leader,
+        tref = start_timer()
+    },
+    {ok, S}.
+
+handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
+    do_enqueue(From, Sync, Msg, S);
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+handle_info(?flush, S) ->
+    {noreply, do_flush(S)};
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, _S) ->
+    ok.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+do_flush(S = #s{batch = []}) ->
+    S#s{tref = start_timer()};
+do_flush(
+    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader}
+) ->
+    Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
+    ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
+    [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
+    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
+    erlang:garbage_collect(),
+    S#s{
+        n = 0,
+        batch = [],
+        pending_replies = [],
+        tref = start_timer()
+    }.
+
+do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
+    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
+    S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
+    S2 =
+        case N >= NMax of
+            true ->
+                _ = erlang:cancel_timer(S0#s.tref),
+                do_flush(S1);
+            false ->
+                S1
+        end,
+    %% TODO: later we may want to delay the reply until the message is
+    %% replicated, but it requies changes to the PUBACK/PUBREC flow to
+    %% allow for async replies. For now, we ack when the message is
+    %% _buffered_ rather than stored.
+    %%
+    %% Otherwise, the client would freeze for at least flush interval,
+    %% or until the buffer is filled.
+    S =
+        case Sync of
+            true ->
+                S2#s{pending_replies = [From | Replies]};
+            false ->
+                gen_server:reply(From, ok),
+                S2
+        end,
+    %% TODO: add a backpressure mechanism for the server to avoid
+    %% building a long message queue.
+    {noreply, S}.
+
+start_timer() ->
+    Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
+    erlang:send_after(Interval, self(), ?flush).

+ 9 - 3
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
     in_sync_replicas/2,
     sites/0,
     open_db/2,
+    get_options/1,
     update_db_config/2,
     drop_db/1,
     shard_leader/2,
@@ -230,6 +231,11 @@ is_leader(Node) ->
     {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
     Result.
 
+-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
+get_options(DB) ->
+    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
+    Opts.
+
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
@@ -293,11 +299,11 @@ terminate(_Reason, #s{}) ->
 %% Internal exports
 %%================================================================================
 
--spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db_trans(DB, CreateOpts) ->
     case mnesia:wread({?META_TAB, DB}) of
-        [] ->
+        [] when is_map(CreateOpts) ->
             NShards = maps:get(n_shards, CreateOpts),
             ReplicationFactor = maps:get(replication_factor, CreateOpts),
             mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),

+ 3 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -199,7 +199,6 @@ open_shard(Shard, Options) ->
 
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
-    catch emqx_ds_storage_layer_sup:stop_shard(Shard),
     case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
         undefined ->
             ok;
@@ -586,7 +585,8 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
 rocksdb_open(Shard, Options) ->
     DBOptions = [
         {create_if_missing, true},
-        {create_missing_column_families, true}
+        {create_missing_column_families, true},
+        {enable_write_thread_adaptive_yield, false}
         | maps:get(db_options, Options, [])
     ],
     DataDir = maps:get(data_dir, Options, emqx:data_dir()),

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 -module(emqx_ds_storage_layer_sup).
 
@@ -23,7 +23,7 @@
 
 -spec start_link() -> {ok, pid()}.
 start_link() ->
-    supervisor:start_link({local, ?SUP}, ?MODULE, []).
+    supervisor:start_link(?MODULE, []).
 
 -spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
     supervisor:startchild_ret().

+ 21 - 45
apps/emqx_durable_storage/src/emqx_ds_sup.erl

@@ -6,7 +6,7 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_link/0, ensure_workers/0]).
+-export([start_link/0, attach_backend/2]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -25,64 +25,40 @@
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, top).
 
--spec ensure_workers() -> ok.
-ensure_workers() ->
-    ChildSpec = #{
-        id => workers_sup,
-        restart => temporary,
-        type => supervisor,
-        start => {supervisor, start_link, [?MODULE, workers]}
+%% @doc Attach a child backend-specific supervisor to the top
+%% application supervisor, if not yet present
+-spec attach_backend(_BackendId, {module(), atom(), list()}) ->
+    {ok, pid()} | {error, _}.
+attach_backend(Backend, Start) ->
+    Spec = #{
+        id => Backend,
+        start => Start,
+        significant => false,
+        shutdown => infinity,
+        type => supervisor
     },
-    case supervisor:start_child(?SUP, ChildSpec) of
-        {ok, _} ->
-            ok;
-        {error, already_present} ->
-            ok;
-        {error, {already_started, _}} ->
-            ok
+    case supervisor:start_child(?SUP, Spec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {error, {already_started, Pid}} ->
+            {ok, Pid};
+        {error, Err} ->
+            {error, Err}
     end.
 
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================
 
--dialyzer({nowarn_function, init/1}).
 init(top) ->
+    Children = [],
     SupFlags = #{
-        strategy => one_for_all,
+        strategy => one_for_one,
         intensity => 10,
         period => 1
     },
-    {ok, {SupFlags, []}};
-init(workers) ->
-    %% TODO: technically, we don't need rocksDB for the alternative
-    %% backends. But right now we have any:
-    Children = [meta(), storage_layer_sup()],
-    SupFlags = #{
-        strategy => one_for_all,
-        intensity => 0,
-        period => 1
-    },
     {ok, {SupFlags, Children}}.
 
 %%================================================================================
 %% Internal functions
 %%================================================================================
-
-meta() ->
-    #{
-        id => emqx_ds_replication_layer_meta,
-        start => {emqx_ds_replication_layer_meta, start_link, []},
-        restart => permanent,
-        type => worker,
-        shutdown => 5000
-    }.
-
-storage_layer_sup() ->
-    #{
-        id => local_store_shard_sup,
-        start => {emqx_ds_storage_layer_sup, start_link, []},
-        restart => permanent,
-        type => supervisor,
-        shutdown => infinity
-    }.

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 13 - 18
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -16,8 +16,8 @@
 -define(DEFAULT_CONFIG, #{
     backend => builtin,
     storage => {emqx_ds_storage_bitfield_lts, #{}},
-    n_shards => 16,
-    replication_factor => 3
+    n_shards => 1,
+    replication_factor => 1
 }).
 
 -define(COMPACT_CONFIG, #{
@@ -26,15 +26,10 @@
         {emqx_ds_storage_bitfield_lts, #{
             bits_per_wildcard_level => 8
         }},
-    n_shards => 16,
-    replication_factor => 3
+    n_shards => 1,
+    replication_factor => 1
 }).
 
-%% Smoke test for opening and reopening the database
-t_open(_Config) ->
-    ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
-
 %% Smoke test of store function
 t_store(_Config) ->
     MessageID = emqx_guid:gen(),
@@ -98,8 +93,8 @@ t_get_streams(_Config) ->
     [FooBarBaz] = GetStream(<<"foo/bar/baz">>),
     [A] = GetStream(<<"a">>),
     %% Restart shard to make sure trie is persisted and restored:
-    ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
+    ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
+    {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
     %% Verify that there are no "ghost streams" for topics that don't
     %% have any messages:
     [] = GetStream(<<"bar/foo">>),
@@ -196,9 +191,9 @@ t_replay(_Config) ->
     ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
     ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
     ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
-    %% Restart shard to make sure trie is persisted and restored:
-    ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
+    %% Restart the DB to make sure trie is persisted and restored:
+    ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
+    {ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
     %% Learned wildcard topics:
     ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
     ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@@ -412,21 +407,21 @@ suite() -> [{timetrap, {seconds, 20}}].
 
 init_per_suite(Config) ->
     {ok, _} = application:ensure_all_started(emqx_durable_storage),
-    emqx_ds_sup:ensure_workers(),
     Config.
 
 end_per_suite(_Config) ->
     ok = application:stop(emqx_durable_storage).
 
 init_per_testcase(TC, Config) ->
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG),
+    ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG),
     Config.
 
 end_per_testcase(TC, _Config) ->
-    ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
+    emqx_ds:drop_db(TC),
+    ok.
 
 shard(TC) ->
-    {?MODULE, atom_to_binary(TC)}.
+    {TC, <<"0">>}.
 
 keyspace(TC) ->
     TC.