Просмотр исходного кода

Merge pull request #12361 from keynslug/ft/EMQX-11756/emqx-ds-replication

feat(ds): implement raft-based replication
Andrew Mayorov 1 год назад
Родитель
Сommit
e10d43cdce
27 измененных файлов с 1283 добавлено и 521 удалено
  1. 2 1
      apps/emqx/rebar.config
  2. 31 7
      apps/emqx/src/emqx_ds_schema.erl
  3. 18 5
      apps/emqx/src/emqx_message.erl
  4. 0 1
      apps/emqx/src/emqx_rpc.erl
  5. 1 1
      apps/emqx/test/emqx_bpapi_static_checks.erl
  6. 56 20
      apps/emqx/test/emqx_persistent_messages_SUITE.erl
  7. 7 2
      apps/emqx_durable_storage/src/emqx_ds.erl
  8. 22 7
      apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
  9. 108 35
      apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
  10. 236 75
      apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
  11. 10 0
      apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl
  12. 22 21
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
  13. 91 166
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
  14. 207 0
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
  15. 154 0
      apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
  16. 58 57
      apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
  17. 106 74
      apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
  18. 4 5
      apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
  19. 1 1
      apps/emqx_durable_storage/src/emqx_durable_storage.app.src
  20. 5 8
      apps/emqx_durable_storage/test/emqx_ds_SUITE.erl
  21. 15 31
      apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
  22. 1 1
      apps/emqx_machine/src/emqx_machine_boot.erl
  23. 41 1
      apps/emqx_utils/src/emqx_utils_stream.erl
  24. 74 0
      apps/emqx_utils/test/emqx_utils_stream_tests.erl
  25. 2 1
      mix.exs
  26. 2 1
      rebar.config
  27. 9 0
      rel/i18n/emqx_ds_schema.hocon

+ 2 - 1
apps/emqx/rebar.config

@@ -34,7 +34,8 @@
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
-    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
+    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}},
+    {ra, "2.7.3"}
 ]}.
 
 {plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.

+ 31 - 7
apps/emqx/src/emqx_ds_schema.erl

@@ -36,12 +36,15 @@
 %% API
 %%================================================================================
 
-translate_builtin(#{
-    backend := builtin,
-    n_shards := NShards,
-    replication_factor := ReplFactor,
-    layout := Layout
-}) ->
+translate_builtin(
+    Backend = #{
+        backend := builtin,
+        n_shards := NShards,
+        n_sites := NSites,
+        replication_factor := ReplFactor,
+        layout := Layout
+    }
+) ->
     Storage =
         case Layout of
             #{
@@ -61,7 +64,9 @@ translate_builtin(#{
     #{
         backend => builtin,
         n_shards => NShards,
+        n_sites => NSites,
         replication_factor => ReplFactor,
+        replication_options => maps:get(replication_options, Backend, #{}),
         storage => Storage
     }.
 
@@ -126,6 +131,16 @@ fields(builtin) ->
                     desc => ?DESC(builtin_n_shards)
                 }
             )},
+        %% TODO: Deprecate once cluster management and rebalancing is implemented.
+        {"n_sites",
+            sc(
+                pos_integer(),
+                #{
+                    default => 1,
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(builtin_n_sites)
+                }
+            )},
         {replication_factor,
             sc(
                 pos_integer(),
@@ -134,6 +149,15 @@ fields(builtin) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )},
+        %% TODO: Elaborate.
+        {"replication_options",
+            sc(
+                hoconsc:map(name, any()),
+                #{
+                    default => #{},
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {local_write_buffer,
             sc(
                 ref(builtin_local_write_buffer),
@@ -201,7 +225,7 @@ fields(layout_builtin_wildcard_optimized) ->
             sc(
                 range(0, 64),
                 #{
-                    default => 10,
+                    default => 20,
                     importance => ?IMPORTANCE_HIDDEN,
                     desc => ?DESC(wildcard_optimized_epoch_bits)
                 }

+ 18 - 5
apps/emqx/src/emqx_message.erl

@@ -38,7 +38,8 @@
     from/1,
     topic/1,
     payload/1,
-    timestamp/1
+    timestamp/1,
+    timestamp/2
 ]).
 
 %% Flags
@@ -79,7 +80,10 @@
     estimate_size/1
 ]).
 
--export_type([message_map/0]).
+-export_type([
+    timestamp/0,
+    message_map/0
+]).
 
 -type message_map() :: #{
     id := binary(),
@@ -89,10 +93,14 @@
     headers := emqx_types:headers(),
     topic := emqx_types:topic(),
     payload := emqx_types:payload(),
-    timestamp := integer(),
+    timestamp := timestamp(),
     extra := _
 }.
 
+%% Message timestamp
+%% Granularity: milliseconds.
+-type timestamp() :: non_neg_integer().
+
 -elvis([{elvis_style, god_modules, disable}]).
 
 -spec make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message().
@@ -201,9 +209,14 @@ topic(#message{topic = Topic}) -> Topic.
 -spec payload(emqx_types:message()) -> emqx_types:payload().
 payload(#message{payload = Payload}) -> Payload.
 
--spec timestamp(emqx_types:message()) -> integer().
+-spec timestamp(emqx_types:message()) -> timestamp().
 timestamp(#message{timestamp = TS}) -> TS.
 
+-spec timestamp(emqx_types:message(), second | millisecond | microsecond) -> non_neg_integer().
+timestamp(#message{timestamp = TS}, second) -> TS div 1000;
+timestamp(#message{timestamp = TS}, millisecond) -> TS;
+timestamp(#message{timestamp = TS}, microsecond) -> TS * 1000.
+
 -spec is_sys(emqx_types:message()) -> boolean().
 is_sys(#message{flags = #{sys := true}}) ->
     true;
@@ -416,7 +429,7 @@ from_map(#{
     }.
 
 %% @doc Get current timestamp in milliseconds.
--spec timestamp_now() -> integer().
+-spec timestamp_now() -> timestamp().
 timestamp_now() ->
     erlang:system_time(millisecond).
 

+ 0 - 1
apps/emqx/src/emqx_rpc.erl

@@ -37,7 +37,6 @@
     badrpc/0,
     call_result/1,
     call_result/0,
-    cast_result/0,
     multicall_result/1,
     multicall_result/0,
     erpc/1,

+ 1 - 1
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -48,7 +48,7 @@
 
 %% Applications and modules we wish to ignore in the analysis:
 -define(IGNORED_APPS,
-    "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common, esaml"
+    "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common, esaml, ra"
 ).
 -define(IGNORED_MODULES, "emqx_rpc").
 -define(FORCE_DELETED_MODULES, [

+ 56 - 20
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -33,10 +33,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    %% avoid inter-suite flakiness...
-    %% TODO: remove after other suites start to use `emx_cth_suite'
-    application:stop(emqx),
-    application:stop(emqx_durable_storage),
     Config.
 
 end_per_suite(_Config) ->
@@ -45,19 +41,33 @@ end_per_suite(_Config) ->
 init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
+    _ = wait_shards_online(Nodes),
     [{nodes, Nodes} | Config];
 init_per_testcase(t_message_gc = TestCase, Config) ->
     Opts = #{
         extra_emqx_conf =>
-            "\n  session_persistence.message_retention_period = 1s"
+            "\n  session_persistence.message_retention_period = 3s"
             "\n  durable_storage.messages.n_shards = 3"
     },
     common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
+init_per_testcase(t_replication_options = TestCase, Config) ->
+    Opts = #{
+        extra_emqx_conf =>
+            "\n durable_storage.messages.replication_options {"
+            "\n  wal_max_size_bytes = 16000000"
+            "\n  wal_max_batch_size = 1024"
+            "\n  wal_write_strategy = o_sync"
+            "\n  wal_sync_method = datasync"
+            "\n  wal_compute_checksums = false"
+            "\n  snapshot_interval = 64"
+            "\n  resend_window = 60"
+            "\n}"
+    },
+    common_init_per_testcase(TestCase, Config, Opts);
 init_per_testcase(TestCase, Config) ->
     common_init_per_testcase(TestCase, Config, _Opts = #{}).
 
 common_init_per_testcase(TestCase, Config, Opts) ->
-    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     Apps = emqx_cth_suite:start(
         app_specs(Opts),
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
@@ -67,14 +77,11 @@ common_init_per_testcase(TestCase, Config, Opts) ->
 end_per_testcase(t_session_subscription_iterators, Config) ->
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
-    ok = emqx_cth_cluster:stop(Nodes),
-    end_per_testcase(common, Config);
+    ok = emqx_cth_cluster:stop(Nodes);
 end_per_testcase(_TestCase, Config) ->
     Apps = proplists:get_value(apps, Config, []),
     emqx_common_test_helpers:call_janitor(60_000),
-    clear_db(),
-    emqx_cth_suite:stop(Apps),
-    ok.
+    ok = emqx_cth_suite:stop(Apps).
 
 t_messages_persisted(_Config) ->
     C1 = connect(<<?MODULE_STRING "1">>, true, 30),
@@ -390,7 +397,7 @@ t_message_gc(Config) ->
                 message(<<"foo/bar">>, <<"1">>, 0),
                 message(<<"foo/baz">>, <<"2">>, 1)
             ],
-            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0, #{sync => true}),
             ?tp(inserted_batch, #{}),
             {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
 
@@ -399,7 +406,7 @@ t_message_gc(Config) ->
                 message(<<"foo/bar">>, <<"3">>, Now + 100),
                 message(<<"foo/baz">>, <<"4">>, Now + 101)
             ],
-            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1, #{sync => true}),
 
             {ok, _} = snabbkaffe:block_until(
                 ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
@@ -455,6 +462,33 @@ t_metrics_not_dropped(_Config) ->
 
     ok.
 
+t_replication_options(_Config) ->
+    ?assertMatch(
+        #{
+            backend := builtin,
+            replication_options := #{
+                wal_max_size_bytes := 16000000,
+                wal_max_batch_size := 1024,
+                wal_write_strategy := o_sync,
+                wal_sync_method := datasync,
+                wal_compute_checksums := false,
+                snapshot_interval := 64,
+                resend_window := 60
+            }
+        },
+        emqx_ds_replication_layer_meta:get_options(?PERSISTENT_MESSAGE_DB)
+    ),
+    ?assertMatch(
+        #{
+            wal_max_size_bytes := 16000000,
+            wal_max_batch_size := 1024,
+            wal_write_strategy := o_sync,
+            wal_compute_checksums := false,
+            wal_sync_method := datasync
+        },
+        ra_system:fetch(?PERSISTENT_MESSAGE_DB)
+    ).
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -524,22 +558,24 @@ app_specs(Opts) ->
     ].
 
 cluster() ->
-    Spec = #{role => core, apps => app_specs()},
+    ExtraConf = "\n durable_storage.messages.n_sites = 2",
+    Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
     [
         {persistent_messages_SUITE1, Spec},
         {persistent_messages_SUITE2, Spec}
     ].
 
+wait_shards_online(Nodes = [Node | _]) ->
+    NShards = erpc:call(Node, emqx_ds_replication_layer_meta, n_shards, [?PERSISTENT_MESSAGE_DB]),
+    ?retry(500, 10, [?assertEqual(NShards, shards_online(N)) || N <- Nodes]).
+
+shards_online(Node) ->
+    length(erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB])).
+
 get_mqtt_port(Node, Type) ->
     {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
     Port.
 
-clear_db() ->
-    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
-    mria:stop(),
-    ok = mnesia:delete_schema([node()]),
-    ok.
-
 message(Topic, Payload, PublishedAt) ->
     #message{
         topic = Topic,

+ 7 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -43,6 +43,7 @@
 
 %% Misc. API:
 -export([count/1]).
+-export([timestamp_us/0]).
 
 -export_type([
     create_db_opts/0,
@@ -147,9 +148,8 @@
 -type error(Reason) :: {error, recoverable | unrecoverable, Reason}.
 
 %% Timestamp
+%% Each message must have unique timestamp.
 %% Earliest possible timestamp is 0.
-%% TODO granularity?  Currently, we should always use milliseconds, as that's the unit we
-%% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
 -type message_store_opts() ::
@@ -295,6 +295,7 @@ drop_db(DB) ->
         undefined ->
             ok;
         Module ->
+            _ = persistent_term:erase(?persistent_term(DB)),
             Module:drop_db(DB)
     end.
 
@@ -394,6 +395,10 @@ count(DB) ->
 %% Internal exports
 %%================================================================================
 
+-spec timestamp_us() -> time().
+timestamp_us() ->
+    erlang:system_time(microsecond).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 22 - 7
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -112,7 +112,9 @@
     vector_to_key/2,
     bin_vector_to_key/2,
     key_to_vector/2,
+    key_to_coord/3,
     bin_key_to_vector/2,
+    bin_key_to_coord/3,
     key_to_bitstring/2,
     bitstring_to_key/2,
     make_filter/2,
@@ -297,13 +299,7 @@ bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
 key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
     lists:map(
         fun(Actions) ->
-            lists:foldl(
-                fun(Action, Acc) ->
-                    Acc bor extract_inv(Key, Action)
-                end,
-                0,
-                Actions
-            )
+            extract_coord(Actions, Key)
         end,
         Scanner
     ).
@@ -324,6 +320,16 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
         DimSizeof
     ).
 
+-spec key_to_coord(keymapper(), scalar(), dimension()) -> coord().
+key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) ->
+    Actions = lists:nth(Dim, Scanner),
+    extract_coord(Actions, Key).
+
+-spec bin_key_to_coord(keymapper(), key(), dimension()) -> coord().
+bin_key_to_coord(Keymapper = #keymapper{key_size = Size}, BinKey, Dim) ->
+    <<Key:Size>> = BinKey,
+    key_to_coord(Keymapper, Key, Dim).
+
 %% @doc Transform a bitstring to a key
 -spec bitstring_to_key(keymapper(), bitstring()) -> scalar().
 bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
@@ -680,6 +686,15 @@ extract_inv(Dest, #scan_action{
 }) ->
     ((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset.
 
+extract_coord(Actions, Key) ->
+    lists:foldl(
+        fun(Action, Acc) ->
+            Acc bor extract_inv(Key, Action)
+        end,
+        0,
+        Actions
+    ).
+
 ones(Bits) ->
     1 bsl Bits - 1.
 

+ 108 - 35
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -21,7 +21,8 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
+-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
+-export([which_shards/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -36,12 +37,14 @@
 -define(via(REC), {via, gproc, {n, l, REC}}).
 
 -define(db_sup, ?MODULE).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_db_shards_sup).
 -define(egress_sup, emqx_ds_builtin_db_egress_sup).
+-define(shard_sup, emqx_ds_builtin_db_shard_sup).
 
 -record(?db_sup, {db}).
--record(?shard_sup, {db}).
+-record(?shards_sup, {db}).
 -record(?egress_sup, {db}).
+-record(?shard_sup, {db, shard}).
 
 %%================================================================================
 %% API funcions
@@ -53,8 +56,8 @@ start_db(DB, Opts) ->
 
 -spec start_shard(emqx_ds_storage_layer:shard_id()) ->
     supervisor:startchild_ret().
-start_shard(Shard = {DB, _}) ->
-    supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).
+start_shard({DB, Shard}) ->
+    supervisor:start_child(?via(#?shards_sup{db = DB}), shard_spec(DB, Shard)).
 
 -spec start_egress(emqx_ds_storage_layer:shard_id()) ->
     supervisor:startchild_ret().
@@ -63,21 +66,24 @@ start_egress({DB, Shard}) ->
 
 -spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
 stop_shard(Shard = {DB, _}) ->
-    Sup = ?via(#?shard_sup{db = DB}),
+    Sup = ?via(#?shards_sup{db = DB}),
     ok = supervisor:terminate_child(Sup, Shard),
     ok = supervisor:delete_child(Sup, Shard).
 
 -spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
     ok | {error, _Reason}.
 ensure_shard(Shard) ->
-    case start_shard(Shard) of
-        {ok, _Pid} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    ensure_started(start_shard(Shard)).
+
+-spec ensure_egress(emqx_ds_storage_layer:shard_id()) ->
+    ok | {error, _Reason}.
+ensure_egress(Shard) ->
+    ensure_started(start_egress(Shard)).
+
+-spec which_shards(emqx_ds:db()) ->
+    [_Child].
+which_shards(DB) ->
+    supervisor:which_children(?via(#?shards_sup{db = DB})).
 
 %%================================================================================
 %% behaviour callbacks
@@ -86,45 +92,78 @@ ensure_shard(Shard) ->
 init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
-    _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
-    %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    lists:foreach(
-        fun(Shard) ->
-            emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
-        end,
-        MyShards
-    ),
-    Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
+    Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
+    ok = start_ra_system(DB, Opts),
+    Children = [
+        sup_spec(#?shards_sup{db = DB}, []),
+        sup_spec(#?egress_sup{db = DB}, []),
+        shard_allocator_spec(DB, Opts)
+    ],
     SupFlags = #{
         strategy => one_for_all,
         intensity => 0,
         period => 1
     },
     {ok, {SupFlags, Children}};
-init({#?shard_sup{db = DB}, _}) ->
-    %% Spec for the supervisor that manages the worker processes for
+init({#?shards_sup{db = _DB}, _}) ->
+    %% Spec for the supervisor that manages the supervisors for
     %% each local shard of the DB:
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    Children = [shard_spec(DB, Shard) || Shard <- MyShards],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 10,
         period => 1
     },
-    {ok, {SupFlags, Children}};
-init({#?egress_sup{db = DB}, _}) ->
+    {ok, {SupFlags, []}};
+init({#?egress_sup{db = _DB}, _}) ->
     %% Spec for the supervisor that manages the egress proxy processes
     %% managing traffic towards each of the shards of the DB:
-    Shards = emqx_ds_replication_layer_meta:shards(DB),
-    Children = [egress_spec(DB, Shard) || Shard <- Shards],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 0,
         period => 1
     },
+    {ok, {SupFlags, []}};
+init({#?shard_sup{db = DB, shard = Shard}, _}) ->
+    SupFlags = #{
+        strategy => rest_for_one,
+        intensity => 10,
+        period => 100
+    },
+    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    Children = [
+        shard_storage_spec(DB, Shard, Opts),
+        shard_replication_spec(DB, Shard, Opts)
+    ],
     {ok, {SupFlags, Children}}.
 
+start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
+    DataDir = filename:join([emqx_ds:base_dir(), DB, dsrepl]),
+    Config = lists:foldr(fun maps:merge/2, #{}, [
+        ra_system:default_config(),
+        #{
+            name => DB,
+            data_dir => DataDir,
+            wal_data_dir => DataDir,
+            names => ra_system:derive_names(DB)
+        },
+        maps:with(
+            [
+                wal_max_size_bytes,
+                wal_max_batch_size,
+                wal_write_strategy,
+                wal_sync_method,
+                wal_compute_checksums
+            ],
+            ReplicationOpts
+        )
+    ]),
+    case ra_system:start(Config) of
+        {ok, _System} ->
+            ok;
+        {error, {already_started, _System}} ->
+            ok
+    end.
+
 %%================================================================================
 %% Internal exports
 %%================================================================================
@@ -145,15 +184,39 @@ sup_spec(Id, Options) ->
     }.
 
 shard_spec(DB, Shard) ->
-    Options = emqx_ds_replication_layer_meta:get_options(DB),
     #{
-        id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
+        id => {shard, Shard},
+        start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]},
+        shutdown => infinity,
+        restart => permanent,
+        type => supervisor
+    }.
+
+shard_storage_spec(DB, Shard, Opts) ->
+    #{
+        id => {Shard, storage},
+        start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Opts]},
         shutdown => 5_000,
         restart => permanent,
         type => worker
     }.
 
+shard_replication_spec(DB, Shard, Opts) ->
+    #{
+        id => {Shard, replication},
+        start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
+        restart => transient,
+        type => worker
+    }.
+
+shard_allocator_spec(DB, Opts) ->
+    #{
+        id => shard_allocator,
+        start => {emqx_ds_replication_shard_allocator, start_link, [DB, Opts]},
+        restart => permanent,
+        type => worker
+    }.
+
 egress_spec(DB, Shard) ->
     #{
         id => Shard,
@@ -162,3 +225,13 @@ egress_spec(DB, Shard) ->
         restart => permanent,
         type => worker
     }.
+
+ensure_started(Res) ->
+    case Res of
+        {ok, _Pid} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.

+ 236 - 75
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -36,27 +36,36 @@
     update_iterator/3,
     next/3,
     delete_next/4,
-    node_of_shard/2,
-    shard_of_message/3,
-    maybe_set_myself_as_leader/2
+    shard_of_message/3
 ]).
 
 %% internal exports:
 -export([
+    %% RPC Targets:
     do_drop_db_v1/1,
     do_store_batch_v1/4,
     do_get_streams_v1/4,
     do_get_streams_v2/4,
-    do_make_iterator_v1/5,
     do_make_iterator_v2/5,
     do_update_iterator_v2/4,
     do_next_v1/4,
-    do_add_generation_v2/1,
     do_list_generations_with_lifetimes_v3/2,
-    do_drop_generation_v3/3,
     do_get_delete_streams_v4/4,
     do_make_delete_iterator_v4/5,
-    do_delete_next_v4/5
+    do_delete_next_v4/5,
+    %% Unused:
+    do_drop_generation_v3/3,
+    %% Obsolete:
+    do_make_iterator_v1/5,
+    do_add_generation_v2/1,
+
+    %% Egress API:
+    ra_store_batch/3
+]).
+
+-export([
+    init/1,
+    apply/3
 ]).
 
 -export_type([
@@ -85,7 +94,9 @@
         backend := builtin,
         storage := emqx_ds_storage_layer:prototype(),
         n_shards => pos_integer(),
-        replication_factor => pos_integer()
+        n_sites => pos_integer(),
+        replication_factor => pos_integer(),
+        replication_options => _TODO :: #{}
     }.
 
 %% This enapsulates the stream entity from the replication level.
@@ -150,13 +161,19 @@ open_db(DB, CreateOpts) ->
 
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
-    Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
-    _ = emqx_ds_proto_v4:add_generation(Nodes, DB),
-    ok.
+    foreach_shard(
+        DB,
+        fun(Shard) -> ok = ra_add_generation(DB, Shard) end
+    ).
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 update_db_config(DB, CreateOpts) ->
-    emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
+    ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
+    Opts = emqx_ds_replication_layer_meta:get_options(DB),
+    foreach_shard(
+        DB,
+        fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
+    ).
 
 -spec list_generations_with_lifetimes(emqx_ds:db()) ->
     #{generation_rank() => emqx_ds:generation_info()}.
@@ -164,13 +181,12 @@ list_generations_with_lifetimes(DB) ->
     Shards = list_shards(DB),
     lists:foldl(
         fun(Shard, GensAcc) ->
-            Node = node_of_shard(DB, Shard),
             maps:fold(
                 fun(GenId, Data, AccInner) ->
                     AccInner#{{Shard, GenId} => Data}
                 end,
                 GensAcc,
-                emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)
+                ra_list_generations_with_lifetimes(DB, Shard)
             )
         end,
         #{},
@@ -179,18 +195,15 @@ list_generations_with_lifetimes(DB) ->
 
 -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
 drop_generation(DB, {Shard, GenId}) ->
-    %% TODO: drop generation in all nodes in the replica set, not only in the leader,
-    %% after we have proper replication in place.
-    Node = node_of_shard(DB, Shard),
-    emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
+    ra_drop_generation(DB, Shard, GenId).
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
-    Nodes = list_nodes(),
-    _ = emqx_ds_proto_v4:drop_db(Nodes, DB),
-    _ = emqx_ds_replication_layer_meta:drop_db(DB),
-    emqx_ds_builtin_sup:stop_db(DB),
-    ok.
+    foreach_shard(DB, fun(Shard) ->
+        {ok, _} = ra_drop_shard(DB, Shard)
+    end),
+    _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB),
+    emqx_ds_replication_layer_meta:drop_db(DB).
 
 -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
@@ -208,10 +221,9 @@ get_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
             Streams =
                 try
-                    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime)
+                    ra_get_streams(DB, Shard, TopicFilter, StartTime)
                 catch
                     error:{erpc, _} ->
                         %% TODO: log?
@@ -235,8 +247,7 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
-            Streams = emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, StartTime),
+            Streams = ra_get_delete_streams(DB, Shard, TopicFilter, StartTime),
             lists:map(
                 fun(StorageLayerStream) ->
                     ?delete_stream(Shard, StorageLayerStream)
@@ -251,8 +262,7 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?stream_v2(Shard, StorageStream) = Stream,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
+    try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -266,12 +276,7 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
     emqx_ds:make_delete_iterator_result(delete_iterator()).
 make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?delete_stream(Shard, StorageStream) = Stream,
-    Node = node_of_shard(DB, Shard),
-    case
-        emqx_ds_proto_v4:make_delete_iterator(
-            Node, DB, Shard, StorageStream, TopicFilter, StartTime
-        )
-    of
+    case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
@@ -282,8 +287,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of
+    try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -296,7 +300,6 @@ update_iterator(DB, OldIter, DSKey) ->
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(DB, Iter0, BatchSize) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(DB, Shard),
     %% TODO: iterator can contain information that is useful for
     %% reconstructing messages sent over the network. For example,
     %% when we send messages with the learned topic index, we could
@@ -305,7 +308,7 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of
+    case ra_next(DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -321,8 +324,7 @@ next(DB, Iter0, BatchSize) ->
     emqx_ds:delete_next_result(delete_iterator()).
 delete_next(DB, Iter0, Selector, BatchSize) ->
     #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(DB, Shard),
-    case emqx_ds_proto_v4:delete_next(Node, DB, Shard, StorageIter0, Selector, BatchSize) of
+    case ra_delete_next(DB, Shard, StorageIter0, Selector, BatchSize) of
         {ok, StorageIter, NumDeleted} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, NumDeleted};
@@ -330,21 +332,10 @@ delete_next(DB, Iter0, Selector, BatchSize) ->
             Other
     end.
 
--spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
-node_of_shard(DB, Shard) ->
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            %% TODO: use optvar
-            timer:sleep(500),
-            node_of_shard(DB, Shard)
-    end.
-
 -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
     emqx_ds_replication_layer:shard_id().
 shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
-    N = emqx_ds_replication_layer_meta:n_shards(DB),
+    N = emqx_ds_replication_shard_allocator:n_shards(DB),
     Hash =
         case SerializeBy of
             clientid -> erlang:phash2(From, N);
@@ -352,18 +343,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
         end,
     integer_to_binary(Hash).
 
-%% TODO: there's no real leader election right now
--spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
-maybe_set_myself_as_leader(DB, Shard) ->
-    Site = emqx_ds_replication_layer_meta:this_site(),
-    case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
-        [Site | _] ->
-            %% Currently the first in-sync replica always becomes the
-            %% leader
-            ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
-        _Sites ->
-            ok
-    end.
+foreach_shard(DB, Fun) ->
+    lists:foreach(Fun, list_shards(DB)).
 
 %%================================================================================
 %% behavior callbacks
@@ -392,7 +373,8 @@ do_drop_db_v1(DB) ->
 ) ->
     emqx_ds:store_batch_result().
 do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
-    emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
+    Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages],
+    emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
 
 %% Remove me in EMQX 5.6
 -dialyzer({nowarn_function, do_get_streams_v1/4}).
@@ -477,15 +459,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
 do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
     emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize).
 
--spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
-do_add_generation_v2(DB) ->
-    MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
-    lists:foreach(
-        fun(ShardId) ->
-            emqx_ds_storage_layer:add_generation({DB, ShardId})
-        end,
-        MyShards
-    ).
+-spec do_add_generation_v2(emqx_ds:db()) -> no_return().
+do_add_generation_v2(_DB) ->
+    error(obsolete_api).
 
 -spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
     #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
@@ -510,3 +486,188 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
 
 list_nodes() ->
     mria:running_nodes().
+
+%% TODO
+%% Too large for normal operation, need better backpressure mechanism.
+-define(RA_TIMEOUT, 60 * 1000).
+
+ra_store_batch(DB, Shard, Messages) ->
+    Command = #{
+        ?tag => ?BATCH,
+        ?batch_messages => Messages
+    },
+    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            Error
+    end.
+
+ra_add_generation(DB, Shard) ->
+    Command = #{
+        ?tag => add_generation,
+        ?since => emqx_ds:timestamp_us()
+    },
+    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_update_config(DB, Shard, Opts) ->
+    Command = #{
+        ?tag => update_config,
+        ?config => Opts,
+        ?since => emqx_ds:timestamp_us()
+    },
+    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_drop_generation(DB, Shard, GenId) ->
+    Command = #{?tag => drop_generation, ?generation => GenId},
+    Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
+    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_get_streams(DB, Shard, TopicFilter, Time) ->
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(Time),
+    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
+
+ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
+
+ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(StartTime),
+    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
+
+ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+
+ra_update_iterator(DB, Shard, Iter, DSKey) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+
+ra_next(DB, Shard, Iter, BatchSize) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+
+ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
+
+ra_list_generations_with_lifetimes(DB, Shard) ->
+    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
+    maps:map(
+        fun(_GenId, Data = #{since := Since, until := Until}) ->
+            Data#{
+                since := timeus_to_timestamp(Since),
+                until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
+            }
+        end,
+        Gens
+    ).
+
+ra_drop_shard(DB, Shard) ->
+    ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
+
+%%
+
+init(#{db := DB, shard := Shard}) ->
+    #{db_shard => {DB, Shard}, latest => 0}.
+
+apply(
+    #{index := RaftIdx},
+    #{
+        ?tag := ?BATCH,
+        ?batch_messages := MessagesIn
+    },
+    #{db_shard := DBShard, latest := Latest} = State
+) ->
+    %% NOTE
+    %% Unique timestamp tracking real time closely.
+    %% With microsecond granularity it should be nearly impossible for it to run
+    %% too far ahead than the real time clock.
+    {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
+    %% TODO
+    %% Batch is now reversed, but it should not make a lot of difference.
+    %% Even if it would be in order, it's still possible to write messages far away
+    %% in the past, i.e. when replica catches up with the leader. Storage layer
+    %% currently relies on wall clock time to decide if it's safe to iterate over
+    %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
+    %% time instead.
+    Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
+    NState = State#{latest := NLatest},
+    %% TODO: Need to measure effects of changing frequency of `release_cursor`.
+    Effect = {release_cursor, RaftIdx, NState},
+    {NState, Result, Effect};
+apply(
+    _RaftMeta,
+    #{?tag := add_generation, ?since := Since},
+    #{db_shard := DBShard, latest := Latest} = State
+) ->
+    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
+    NState = State#{latest := NLatest},
+    {NState, Result};
+apply(
+    _RaftMeta,
+    #{?tag := update_config, ?since := Since, ?config := Opts},
+    #{db_shard := DBShard, latest := Latest} = State
+) ->
+    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
+    NState = State#{latest := NLatest},
+    {NState, Result};
+apply(
+    _RaftMeta,
+    #{?tag := drop_generation, ?generation := GenId},
+    #{db_shard := DBShard} = State
+) ->
+    Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
+    {State, Result}.
+
+assign_timestamps(Latest, Messages) ->
+    assign_timestamps(Latest, Messages, []).
+
+assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
+    case emqx_message:timestamp(MessageIn, microsecond) of
+        TimestampUs when TimestampUs > Latest ->
+            Message = assign_timestamp(TimestampUs, MessageIn),
+            assign_timestamps(TimestampUs, Rest, [Message | Acc]);
+        _Earlier ->
+            Message = assign_timestamp(Latest + 1, MessageIn),
+            assign_timestamps(Latest + 1, Rest, [Message | Acc])
+    end;
+assign_timestamps(Latest, [], Acc) ->
+    {Latest, Acc}.
+
+assign_timestamp(TimestampUs, Message) ->
+    {TimestampUs, Message}.
+
+ensure_monotonic_timestamp(TimestampUs, Latest) when TimestampUs > Latest ->
+    {TimestampUs, TimestampUs + 1};
+ensure_monotonic_timestamp(_TimestampUs, Latest) ->
+    {Latest, Latest + 1}.
+
+timestamp_to_timeus(TimestampMs) ->
+    TimestampMs * 1000.
+
+timeus_to_timestamp(TimestampUs) ->
+    TimestampUs div 1000.

+ 10 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -29,6 +29,16 @@
 -define(tag, 1).
 -define(shard, 2).
 -define(enc, 3).
+
+%% ?BATCH
 -define(batch_messages, 2).
+-define(timestamp, 3).
+
+%% add_generation / update_config
+-define(config, 2).
+-define(since, 3).
+
+%% drop_generation
+-define(generation, 2).
 
 -endif.

+ 22 - 21
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -40,7 +40,6 @@
 
 -export_type([]).
 
--include("emqx_ds_replication_layer.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 %%================================================================================
@@ -109,7 +108,6 @@ store_batch(DB, Messages, Opts) ->
 -record(s, {
     db :: emqx_ds:db(),
     shard :: emqx_ds_replication_layer:shard_id(),
-    leader :: node(),
     n = 0 :: non_neg_integer(),
     tref :: reference(),
     batch = [] :: [emqx_types:message()],
@@ -119,12 +117,9 @@ store_batch(DB, Messages, Opts) ->
 init([DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
-    %% TODO: adjust leader dynamically
-    Leader = shard_leader(DB, Shard),
     S = #s{
         db = DB,
         shard = Shard,
-        leader = Leader,
         tref = start_timer()
     },
     {ok, S}.
@@ -156,16 +151,32 @@ terminate(_Reason, _S) ->
 %% Internal functions
 %%================================================================================
 
+-define(COOLDOWN_MIN, 1000).
+-define(COOLDOWN_MAX, 5000).
+
 do_flush(S = #s{batch = []}) ->
     S#s{tref = start_timer()};
 do_flush(
-    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader}
+    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
 ) ->
-    Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
-    ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
-    [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
-    erlang:garbage_collect(),
+    case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
+        ok ->
+            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
+            true = erlang:garbage_collect(),
+            ?tp(
+                emqx_ds_replication_layer_egress_flush,
+                #{db => DB, shard => Shard, batch => Messages}
+            );
+        Error ->
+            true = erlang:garbage_collect(),
+            ?tp(
+                warning,
+                emqx_ds_replication_layer_egress_flush_failed,
+                #{db => DB, shard => Shard, reason => Error}
+            ),
+            Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            ok = timer:sleep(Cooldown)
+    end,
     S#s{
         n = 0,
         batch = [],
@@ -212,13 +223,3 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
 start_timer() ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
     erlang:send_after(Interval, self(), ?flush).
-
-shard_leader(DB, Shard) ->
-    %% TODO: use optvar
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            timer:sleep(500),
-            shard_leader(DB, Shard)
-    end.

+ 91 - 166
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -29,19 +29,15 @@
 -export([
     shards/1,
     my_shards/1,
-    my_owned_shards/1,
-    leader_nodes/1,
+    allocate_shards/2,
     replica_set/2,
-    in_sync_replicas/2,
     sites/0,
+    node/1,
     open_db/2,
     get_options/1,
     update_db_config/2,
     drop_db/1,
-    shard_leader/2,
     this_site/0,
-    set_leader/3,
-    is_leader/1,
     print_status/0
 ]).
 
@@ -51,12 +47,10 @@
 %% internal exports:
 -export([
     open_db_trans/2,
+    allocate_shards_trans/2,
     update_db_config_trans/2,
     drop_db_trans/1,
     claim_site/2,
-    in_sync_replicas_trans/2,
-    set_leader_trans/3,
-    is_leader_trans/1,
     n_shards/1
 ]).
 
@@ -95,9 +89,6 @@
     %% Sites that should contain the data when the cluster is in the
     %% stable state (no nodes are being added or removed from it):
     replica_set :: [site()],
-    %% Sites that contain the actual data:
-    in_sync_replicas :: [site()],
-    leader :: node() | undefined,
     misc = #{} :: map()
 }).
 
@@ -107,13 +98,24 @@
 %% Peristent term key:
 -define(emqx_ds_builtin_site, emqx_ds_builtin_site).
 
+%% Make Dialyzer happy
+-define(NODE_PAT(),
+    %% Equivalent of `#?NODE_TAB{_ = '_'}`:
+    erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
+).
+
+-define(SHARD_PAT(SHARD),
+    %% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}`
+    erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
+).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
 
 -spec print_status() -> ok.
 print_status() ->
-    io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]),
+    io:format("THIS SITE:~n~s~n", [this_site()]),
     io:format("~nSITES:~n", []),
     Nodes = [node() | nodes()],
     lists:foreach(
@@ -123,28 +125,18 @@ print_status() ->
                     true -> up;
                     false -> down
                 end,
-            io:format("~s    ~p    ~p~n", [base64:encode(Site), Node, Status])
+            io:format("~s    ~p    ~p~n", [Site, Node, Status])
         end,
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
     io:format(
-        "~nSHARDS:~nId                             Leader                            Status~n", []
+        "~nSHARDS:~nId                             Replicas~n", []
     ),
     lists:foreach(
-        fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) ->
+        fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
             ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
-            LeaderStr = string:pad(atom_to_list(Leader), 33),
-            Status =
-                case lists:member(Leader, Nodes) of
-                    true ->
-                        case node() of
-                            Leader -> "up *";
-                            _ -> "up"
-                        end;
-                    false ->
-                        "down"
-                end,
-            io:format("~s ~s ~s~n", [ShardStr, LeaderStr, Status])
+            ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
+            io:format("~s ~s~n", [ShardStr, ReplicasStr])
         end,
         eval_qlc(mnesia:table(?SHARD_TAB))
     ).
@@ -169,30 +161,19 @@ shards(DB) ->
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
-    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) ->
-        lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
+        lists:member(Site, ReplicaSet)
     end).
 
--spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
-my_owned_shards(DB) ->
-    Self = node(),
-    filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) ->
-        Self =:= Leader
-    end).
-
--spec leader_nodes(emqx_ds:db()) -> [node()].
-leader_nodes(DB) ->
-    lists:uniq(
-        filter_shards(
-            DB,
-            fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader =/= undefined
-            end,
-            fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader
-            end
-        )
-    ).
+allocate_shards(DB, Opts) ->
+    case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/2, [DB, Opts]) of
+        {atomic, Shards} ->
+            {ok, Shards};
+        {aborted, {shards_already_allocated, Shards}} ->
+            {ok, Shards};
+        {aborted, {insufficient_sites_online, Needed, Sites}} ->
+            {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
+    end.
 
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     {ok, [site()]} | {error, _}.
@@ -204,46 +185,27 @@ replica_set(DB, Shard) ->
             {error, no_shard}
     end.
 
--spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    [site()].
-in_sync_replicas(DB, ShardId) ->
-    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]),
-    case Result of
-        {ok, InSync} ->
-            InSync;
-        {error, _} ->
-            []
-    end.
-
 -spec sites() -> [site()].
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
 
--spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    {ok, node()} | {error, no_leader_for_shard}.
-shard_leader(DB, Shard) ->
-    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{leader = Leader}] when Leader =/= undefined ->
-            {ok, Leader};
-        _ ->
-            {error, no_leader_for_shard}
+-spec node(site()) -> node() | undefined.
+node(Site) ->
+    case mnesia:dirty_read(?NODE_TAB, Site) of
+        [#?NODE_TAB{node = Node}] ->
+            Node;
+        [] ->
+            undefined
     end.
 
--spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
-    ok.
-set_leader(DB, Shard, Node) ->
-    {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
-    ok.
-
--spec is_leader(node()) -> boolean().
-is_leader(Node) ->
-    {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
-    Result.
-
 -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
 get_options(DB) ->
-    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
-    Opts.
+    case mnesia:dirty_read(?META_TAB, DB) of
+        [#?META_TAB{db_props = Opts}] ->
+            Opts;
+        [] ->
+            #{}
+    end.
 
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
@@ -275,7 +237,6 @@ init([]) ->
     logger:set_process_metadata(#{domain => [ds, meta]}),
     ensure_tables(),
     ensure_site(),
-    {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.
 
@@ -285,18 +246,6 @@ handle_call(_Call, _From, S) ->
 handle_cast(_Cast, S) ->
     {noreply, S}.
 
-handle_info(
-    {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
-) ->
-    MyShards = my_owned_shards(DB),
-
-    lists:foreach(
-        fun(ShardId) ->
-            emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
-        end,
-        MyShards
-    ),
-    {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -308,20 +257,60 @@ terminate(_Reason, #s{}) ->
 %% Internal exports
 %%================================================================================
 
--spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) ->
+-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db_trans(DB, CreateOpts) ->
     case mnesia:wread({?META_TAB, DB}) of
-        [] when is_map(CreateOpts) ->
-            NShards = maps:get(n_shards, CreateOpts),
-            ReplicationFactor = maps:get(replication_factor, CreateOpts),
+        [] ->
             mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
-            create_shards(DB, NShards, ReplicationFactor),
             CreateOpts;
         [#?META_TAB{db_props = Opts}] ->
             Opts
     end.
 
+-spec allocate_shards_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> [_Shard].
+allocate_shards_trans(DB, Opts) ->
+    NShards = maps:get(n_shards, Opts),
+    NSites = maps:get(n_sites, Opts),
+    ReplicationFactor = maps:get(replication_factor, Opts),
+    NReplicas = min(NSites, ReplicationFactor),
+    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
+    AllSites = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
+    case length(AllSites) of
+        N when N >= NSites ->
+            ok;
+        _ ->
+            mnesia:abort({insufficient_sites_online, NSites, AllSites})
+    end,
+    case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
+        [] ->
+            ok;
+        Records ->
+            ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
+            mnesia:abort({shards_already_allocated, ShardsAllocated})
+    end,
+    {Allocation, _} = lists:mapfoldl(
+        fun(Shard, SSites) ->
+            {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
+            {_, SRest} = emqx_utils_stream:consume(1, SSites),
+            {{Shard, Sites}, SRest}
+        end,
+        emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
+        Shards
+    ),
+    lists:map(
+        fun({Shard, Sites}) ->
+            ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
+            Record = #?SHARD_TAB{
+                shard = {DB, Shard},
+                replica_set = ReplicaSet
+            },
+            ok = mnesia:write(Record),
+            Shard
+        end,
+        Allocation
+    ).
+
 -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     ok | {error, database}.
 update_db_config_trans(DB, CreateOpts) ->
@@ -357,51 +346,13 @@ drop_db_trans(DB) ->
 claim_site(Site, Node) ->
     mnesia:write(#?NODE_TAB{site = Site, node = Node}).
 
--spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    {ok, [site()]} | {error, no_shard}.
-in_sync_replicas_trans(DB, Shard) ->
-    case mnesia:read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{in_sync_replicas = InSync}] ->
-            {ok, InSync};
-        [] ->
-            {error, no_shard}
-    end.
-
--spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
-    ok.
-set_leader_trans(DB, Shard, Node) ->
-    [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
-    Record = Record0#?SHARD_TAB{leader = Node},
-    mnesia:write(Record).
-
--spec is_leader_trans(node) -> boolean().
-is_leader_trans(Node) ->
-    case
-        mnesia:select(
-            ?SHARD_TAB,
-            ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) ->
-                Leader =:= Node
-            end),
-            1,
-            read
-        )
-    of
-        {[_ | _], _Cont} ->
-            true;
-        _ ->
-            false
-    end.
-
 %%================================================================================
 %% Internal functions
 %%================================================================================
 
 ensure_tables() ->
-    %% TODO: seems like it may introduce flakiness
-    Majority = false,
     ok = mria:create_table(?META_TAB, [
         {rlog_shard, ?SHARD},
-        {majority, Majority},
         {type, ordered_set},
         {storage, disc_copies},
         {record_name, ?META_TAB},
@@ -409,7 +360,6 @@ ensure_tables() ->
     ]),
     ok = mria:create_table(?NODE_TAB, [
         {rlog_shard, ?SHARD},
-        {majority, Majority},
         {type, ordered_set},
         {storage, disc_copies},
         {record_name, ?NODE_TAB},
@@ -417,7 +367,6 @@ ensure_tables() ->
     ]),
     ok = mria:create_table(?SHARD_TAB, [
         {rlog_shard, ?SHARD},
-        {majority, Majority},
         {type, ordered_set},
         {storage, disc_copies},
         {record_name, ?SHARD_TAB},
@@ -431,8 +380,8 @@ ensure_site() ->
         {ok, [Site]} ->
             ok;
         _ ->
-            Site = crypto:strong_rand_bytes(8),
-            logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
+            Site = binary:encode_hex(crypto:strong_rand_bytes(8)),
+            logger:notice("Creating a new site with ID=~s", [Site]),
             ok = filelib:ensure_dir(Filename),
             {ok, FD} = file:open(Filename, [write]),
             io:format(FD, "~p.", [Site]),
@@ -442,30 +391,6 @@ ensure_site() ->
     persistent_term:put(?emqx_ds_builtin_site, Site),
     ok.
 
--spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
-create_shards(DB, NShards, ReplicationFactor) ->
-    Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
-    AllSites = sites(),
-    lists:foreach(
-        fun(Shard) ->
-            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
-            Hashes = lists:sort(Hashes0),
-            {_, Sites} = lists:unzip(Hashes),
-            [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
-            Record = #?SHARD_TAB{
-                shard = {DB, Shard},
-                replica_set = ReplicaSet,
-                in_sync_replicas = [First]
-            },
-            mnesia:write(Record)
-        end,
-        Shards
-    ).
-
--spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
-hash(Shard, Site) ->
-    erlang:phash2({Shard, Site}).
-
 eval_qlc(Q) ->
     case mnesia:is_transaction() of
         true ->

+ 207 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -0,0 +1,207 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_layer_shard).
+
+-export([start_link/3]).
+
+%% Static server configuration
+-export([
+    shard_servers/2,
+    local_server/2
+]).
+
+%% Dynamic server location API
+-export([
+    servers/3,
+    server/3
+]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    terminate/2
+]).
+
+%%
+
+start_link(DB, Shard, Opts) ->
+    gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
+
+shard_servers(DB, Shard) ->
+    {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
+    [
+        {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
+     || Site <- ReplicaSet
+    ].
+
+local_server(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    {server_name(DB, Shard, Site), node()}.
+
+cluster_name(DB, Shard) ->
+    iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
+
+server_name(DB, Shard, Site) ->
+    DBBin = atom_to_binary(DB),
+    binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
+
+%%
+
+servers(DB, Shard, _Order = leader_preferred) ->
+    get_servers_leader_preferred(DB, Shard);
+servers(DB, Shard, _Order = undefined) ->
+    get_shard_servers(DB, Shard).
+
+server(DB, Shard, _Which = local_preferred) ->
+    get_server_local_preferred(DB, Shard).
+
+get_servers_leader_preferred(DB, Shard) ->
+    %% NOTE: Contact last known leader first, then rest of shard servers.
+    ClusterName = get_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_leader(ClusterName) of
+        Leader when Leader /= undefined ->
+            Servers = ra_leaderboard:lookup_members(ClusterName),
+            [Leader | lists:delete(Leader, Servers)];
+        undefined ->
+            %% TODO: Dynamic membership.
+            get_shard_servers(DB, Shard)
+    end.
+
+get_server_local_preferred(DB, Shard) ->
+    %% NOTE: Contact random replica that is not a known leader.
+    %% TODO: Replica may be down, so we may need to retry.
+    ClusterName = get_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_members(ClusterName) of
+        Servers when is_list(Servers) ->
+            pick_local(Servers);
+        undefined ->
+            %% TODO
+            %% Leader is unkonwn if there are no servers of this group on the
+            %% local node. We want to pick a replica in that case as well.
+            %% TODO: Dynamic membership.
+            pick_random(get_shard_servers(DB, Shard))
+    end.
+
+pick_local(Servers) ->
+    case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
+        [Local | _] ->
+            Local;
+        [] ->
+            pick_random(Servers)
+    end.
+
+pick_random(Servers) ->
+    lists:nth(rand:uniform(length(Servers)), Servers).
+
+get_cluster_name(DB, Shard) ->
+    memoize(fun cluster_name/2, [DB, Shard]).
+
+get_local_server(DB, Shard) ->
+    memoize(fun local_server/2, [DB, Shard]).
+
+get_shard_servers(DB, Shard) ->
+    maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
+
+%%
+
+init({DB, Shard, Opts}) ->
+    _ = process_flag(trap_exit, true),
+    _Meta = start_shard(DB, Shard, Opts),
+    {ok, {DB, Shard}}.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, {DB, Shard}) ->
+    LocalServer = get_local_server(DB, Shard),
+    ok = ra:stop_server(DB, LocalServer).
+
+%%
+
+start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    ClusterName = cluster_name(DB, Shard),
+    LocalServer = local_server(DB, Shard),
+    Servers = shard_servers(DB, Shard),
+    case ra:restart_server(DB, LocalServer) of
+        ok ->
+            Bootstrap = false;
+        {error, name_not_registered} ->
+            Bootstrap = true,
+            ok = ra:start_server(DB, #{
+                id => LocalServer,
+                uid => <<ClusterName/binary, "_", Site/binary>>,
+                cluster_name => ClusterName,
+                initial_members => Servers,
+                machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
+                log_init_args => maps:with(
+                    [
+                        snapshot_interval,
+                        resend_window
+                    ],
+                    ReplicationOpts
+                )
+            })
+    end,
+    case Servers of
+        [LocalServer | _] ->
+            %% TODO
+            %% Not super robust, but we probably don't expect nodes to be down
+            %% when we bring up a fresh consensus group. Triggering election
+            %% is not really required otherwise.
+            %% TODO
+            %% Ensure that doing that on node restart does not disrupt consensus.
+            %% Edit: looks like it doesn't, this could actually be quite useful
+            %% to "steal" leadership from nodes that have too much leader load.
+            %% TODO
+            %% It doesn't really work that way. There's `ra:transfer_leadership/2`
+            %% for that.
+            try
+                ra:trigger_election(LocalServer, _Timeout = 1_000)
+            catch
+                %% TODO
+                %% Tolerating exceptions because server might be occupied with log
+                %% replay for a while.
+                exit:{timeout, _} when not Bootstrap ->
+                    ok
+            end;
+        _ ->
+            ok
+    end,
+    #{
+        cluster_name => ClusterName,
+        servers => Servers,
+        local_server => LocalServer
+    }.
+
+%%
+
+memoize(Fun, Args) ->
+    %% NOTE: Assuming that the function is pure and never returns `undefined`.
+    case persistent_term:get([Fun | Args], undefined) of
+        undefined ->
+            Result = erlang:apply(Fun, Args),
+            _ = persistent_term:put([Fun | Args], Result),
+            Result;
+        Result ->
+            Result
+    end.

+ 154 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl

@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_replication_shard_allocator).
+
+-export([start_link/2]).
+
+-export([n_shards/1]).
+-export([shard_meta/2]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-define(db_meta(DB), {?MODULE, DB}).
+-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
+
+%%
+
+start_link(DB, Opts) ->
+    gen_server:start_link(?MODULE, {DB, Opts}, []).
+
+n_shards(DB) ->
+    Meta = persistent_term:get(?db_meta(DB)),
+    maps:get(n_shards, Meta).
+
+shard_meta(DB, Shard) ->
+    persistent_term:get(?shard_meta(DB, Shard)).
+
+%%
+
+-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
+
+init({DB, Opts}) ->
+    _ = erlang:process_flag(trap_exit, true),
+    _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
+    State = #{db => DB, opts => Opts, status => allocating},
+    case allocate_shards(State) of
+        {ok, NState} ->
+            {ok, NState};
+        {error, Data} ->
+            _ = logger:notice(
+                Data#{
+                    msg => "Shard allocation still in progress",
+                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
+                }
+            ),
+            {ok, State, ?ALLOCATE_RETRY_TIMEOUT}
+    end.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(timeout, State) ->
+    case allocate_shards(State) of
+        {ok, NState} ->
+            {noreply, NState};
+        {error, Data} ->
+            _ = logger:notice(
+                Data#{
+                    msg => "Shard allocation still in progress",
+                    retry_in => ?ALLOCATE_RETRY_TIMEOUT
+                }
+            ),
+            {noreply, State, ?ALLOCATE_RETRY_TIMEOUT}
+    end;
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #{db := DB, shards := Shards}) ->
+    erase_db_meta(DB),
+    erase_shards_meta(DB, Shards);
+terminate(_Reason, #{}) ->
+    ok.
+
+%%
+
+allocate_shards(State = #{db := DB, opts := Opts}) ->
+    case emqx_ds_replication_layer_meta:allocate_shards(DB, Opts) of
+        {ok, Shards} ->
+            logger:notice(#{msg => "Shards allocated", shards => Shards}),
+            ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
+            ok = start_egresses(DB, Shards),
+            ok = save_db_meta(DB, Shards),
+            ok = save_shards_meta(DB, Shards),
+            {ok, State#{shards => Shards, status := ready}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start_shards(DB, Shards) ->
+    ok = lists:foreach(
+        fun(Shard) ->
+            ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard})
+        end,
+        Shards
+    ),
+    ok = logger:info(#{msg => "Shards started", shards => Shards}),
+    ok.
+
+start_egresses(DB, Shards) ->
+    ok = lists:foreach(
+        fun(Shard) ->
+            ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard})
+        end,
+        Shards
+    ),
+    logger:info(#{msg => "Egresses started", shards => Shards}),
+    ok.
+
+save_db_meta(DB, Shards) ->
+    persistent_term:put(?db_meta(DB), #{
+        shards => Shards,
+        n_shards => length(Shards)
+    }).
+
+save_shards_meta(DB, Shards) ->
+    lists:foreach(fun(Shard) -> save_shard_meta(DB, Shard) end, Shards).
+
+save_shard_meta(DB, Shard) ->
+    Servers = emqx_ds_replication_layer_shard:shard_servers(DB, Shard),
+    persistent_term:put(?shard_meta(DB, Shard), #{
+        servers => Servers
+    }).
+
+erase_db_meta(DB) ->
+    persistent_term:erase(?db_meta(DB)).
+
+erase_shards_meta(DB, Shards) ->
+    lists:foreach(fun(Shard) -> erase_shard_meta(DB, Shard) end, Shards).
+
+erase_shard_meta(DB, Shard) ->
+    persistent_term:erase(?shard_meta(DB, Shard)).

+ 58 - 57
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -137,6 +137,9 @@
 
 -include("emqx_ds_bitmask.hrl").
 
+-define(DIM_TOPIC, 1).
+-define(DIM_TS, 2).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -endif.
@@ -160,8 +163,8 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    %% 10 bits -> 1024 ms -> ~1 sec
-    TSOffsetBits = maps:get(epoch_bits, Options, 10),
+    %% 20 bits -> 1048576 us -> ~1 sec
+    TSOffsetBits = maps:get(epoch_bits, Options, 20),
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -242,16 +245,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
     ok.
 
 -spec store_batch(
-    emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
+    emqx_ds_storage_layer:shard_id(),
+    s(),
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     {ok, Batch} = rocksdb:batch(),
     lists:foreach(
-        fun(Msg) ->
-            {Key, _} = make_key(S, Msg),
+        fun({Timestamp, Msg}) ->
+            {Key, _} = make_key(S, Timestamp, Msg),
             Val = serialize(Msg),
-            rocksdb:batch_put(Batch, Data, Key, Val)
+            rocksdb:put(DB, Data, Key, Val, [])
         end,
         Messages
     ),
@@ -345,7 +351,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
-    Now = emqx_message:timestamp_now(),
+    Now = emqx_ds:timestamp_us(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     next_until(Schema, It, SafeCutoffTime, BatchSize).
 
@@ -436,9 +442,7 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
     %% Make filter:
     Inequations = [
         {'=', TopicIndex},
-        {StartTime, '..', SafeCutoffTime - 1},
-        %% Unique integer:
-        any
+        {StartTime, '..', SafeCutoffTime - 1}
         %% Varying topic levels:
         | lists:map(
             fun
@@ -483,39 +487,44 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
             true = Key1 > Key0,
             case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
                 {ok, Key, Val} ->
-                    {N, It, Acc} =
-                        traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0),
+                    {N, It, Acc} = traverse_interval(
+                        ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N0
+                    ),
                     next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N);
                 {error, invalid_iterator} ->
                     {ok, It0, lists:reverse(Acc0)}
             end
     end.
 
-traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
+traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
     It = It0#{?last_seen_key := Key},
-    case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
+    Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS),
+    case
+        emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso
+            check_timestamp(Cutoff, It, Timestamp)
+    of
         true ->
             Msg = deserialize(Val),
-            case check_message(Cutoff, It, Msg) of
+            case check_message(It, Msg) of
                 true ->
                     Acc = [{Key, Msg} | Acc0],
-                    traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1);
+                    traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
                 false ->
-                    traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N);
-                overflow ->
-                    {0, It0, Acc0}
+                    traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
             end;
+        overflow ->
+            {0, It0, Acc0};
         false ->
             {N, It, Acc0}
     end.
 
-traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) ->
+traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
     {0, It, Acc};
-traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) ->
+traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
     inc_counter(),
     case rocksdb:iterator_move(ITHandle, next) of
         {ok, Key, Val} ->
-            traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N);
+            traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
         {error, invalid_iterator} ->
             {0, It, Acc}
     end.
@@ -564,6 +573,7 @@ delete_traverse_interval(LoopContext0) ->
         storage_iter := It0,
         current_key := Key,
         current_val := Val,
+        keymapper := KeyMapper,
         filter := Filter,
         safe_cutoff_time := Cutoff,
         selector := Selector,
@@ -574,10 +584,14 @@ delete_traverse_interval(LoopContext0) ->
         remaining := Remaining0
     } = LoopContext0,
     It = It0#{?last_seen_key := Key},
-    case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
+    Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS),
+    case
+        emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso
+            check_timestamp(Cutoff, It, Timestamp)
+    of
         true ->
             Msg = deserialize(Val),
-            case check_message(Cutoff, It, Msg) of
+            case check_message(It, Msg) of
                 true ->
                     case Selector(Msg) of
                         true ->
@@ -590,10 +604,10 @@ delete_traverse_interval(LoopContext0) ->
                             delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1})
                     end;
                 false ->
-                    delete_traverse_interval1(LoopContext0);
-                overflow ->
-                    {0, It0, AccDel0, AccIter0}
+                    delete_traverse_interval1(LoopContext0)
             end;
+        overflow ->
+            {0, It0, AccDel0, AccIter0};
         false ->
             {Remaining0, It, AccDel0, AccIter0}
     end.
@@ -621,39 +635,28 @@ delete_traverse_interval1(LoopContext0) ->
             {0, It, AccDel, AccIter}
     end.
 
--spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) ->
+-spec check_timestamp(emqx_ds:time(), iterator() | delete_iterator(), emqx_ds:time()) ->
     true | false | overflow.
-check_message(
-    Cutoff,
-    _It,
-    #message{timestamp = Timestamp}
-) when Timestamp >= Cutoff ->
+check_timestamp(Cutoff, _It, Timestamp) when Timestamp >= Cutoff ->
     %% We hit the current epoch, we can't continue iterating over it yet.
     %% It would be unsafe otherwise: messages can be stored in the current epoch
     %% concurrently with iterating over it. They can end up earlier (in the iteration
     %% order) due to the nature of keymapping, potentially causing us to miss them.
     overflow;
-check_message(
-    _Cutoff,
-    #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
-    #message{timestamp = Timestamp, topic = Topic}
-) when Timestamp >= StartTime ->
-    emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
-check_message(
-    _Cutoff,
-    #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
-    #message{timestamp = Timestamp, topic = Topic}
-) when Timestamp >= StartTime ->
-    emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
-check_message(_Cutoff, _It, _Msg) ->
-    false.
+check_timestamp(_Cutoff, #{?start_time := StartTime}, Timestamp) ->
+    Timestamp >= StartTime.
+
+-spec check_message(iterator() | delete_iterator(), emqx_types:message()) ->
+    true | false.
+check_message(#{?topic_filter := TopicFilter}, #message{topic = Topic}) ->
+    emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter).
 
 format_key(KeyMapper, Key) ->
     Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
--spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
+-spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}.
+make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) ->
     Tokens = emqx_topic:words(TopicBin),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
@@ -666,11 +669,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
 ]) ->
     binary().
 make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
-    UniqueInteger = erlang:unique_integer([monotonic, positive]),
     emqx_ds_bitmask_keymapper:key_to_bitstring(
         KeyMapper,
         emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
-            TopicIndex, Timestamp, UniqueInteger | Varying
+            TopicIndex, Timestamp | Varying
         ])
     ).
 
@@ -723,13 +725,12 @@ deserialize(Blob) ->
 %% erlfmt-ignore
 make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     Bitsources =
-    %% Dimension Offset        Bitsize
-        [{1,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
-         {2,     TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
-        [{3 + I, 0,            BitsPerTopicLevel           }       %% Varying topic levels
+    %% Dimension Offset   Bitsize
+        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
+         {?DIM_TS,        TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
+        [{?DIM_TS + I,    0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
-        [{2,     0,            TSOffsetBits                },      %% Timestamp offset
-         {3,     0,            64                          }],     %% Unique integer
+        [{?DIM_TS,        0,            TSOffsetBits                }],     %% Timestamp offset
     Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
     %% Assert:
     case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of

+ 106 - 74
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -29,8 +29,8 @@
     update_iterator/3,
     next/3,
     delete_next/4,
-    update_config/2,
-    add_generation/1,
+    update_config/3,
+    add_generation/2,
     list_generations_with_lifetimes/1,
     drop_generation/2
 ]).
@@ -133,7 +133,7 @@
     cf_refs := cf_refs(),
     %% Time at which this was created.  Might differ from `since', in particular for the
     %% first generation.
-    created_at := emqx_ds:time(),
+    created_at := emqx_message:timestamp(),
     %% When should this generation become active?
     %% This generation should only contain messages timestamped no earlier than that.
     %% The very first generation will have `since` equal 0.
@@ -194,7 +194,12 @@
 -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
     ok | {error, _Reason}.
 
--callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-callback store_batch(
+    shard_id(),
+    _Data,
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
+) ->
     emqx_ds:store_batch_result().
 
 -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
@@ -219,6 +224,9 @@
 %% API for the replication layer
 %%================================================================================
 
+%% Note: we specify gen_server requests as records to make use of Dialyzer:
+-record(call_add_generation, {since :: emqx_ds:time()}).
+-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
 
@@ -230,7 +238,11 @@ open_shard(Shard, Options) ->
 drop_shard(Shard) ->
     ok = rocksdb:destroy(db_dir(Shard), []).
 
--spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(
+    shard_id(),
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
+) ->
     emqx_ds:store_batch_result().
 store_batch(Shard, Messages, Options) ->
     %% We always store messages in the current generation:
@@ -398,13 +410,16 @@ delete_next(
             {ok, end_of_stream}
     end.
 
--spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
-update_config(ShardId, Options) ->
-    gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity).
+-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
+    ok | {error, overlaps_existing_generations}.
+update_config(ShardId, Since, Options) ->
+    Call = #call_update_config{since = Since, options = Options},
+    gen_server:call(?REF(ShardId), Call, infinity).
 
--spec add_generation(shard_id()) -> ok.
-add_generation(ShardId) ->
-    gen_server:call(?REF(ShardId), add_generation, infinity).
+-spec add_generation(shard_id(), emqx_ds:time()) ->
+    ok | {error, overlaps_existing_generations}.
+add_generation(ShardId, Since) ->
+    gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
 
 -spec list_generations_with_lifetimes(shard_id()) ->
     #{
@@ -438,9 +453,6 @@ start_link(Shard = {_, _}, Options) ->
     shard :: shard()
 }).
 
-%% Note: we specify gen_server requests as records to make use of Dialyzer:
--record(call_create_generation, {since :: emqx_ds:time()}).
-
 -type server_state() :: #s{}.
 
 -define(DEFAULT_CF, "default").
@@ -470,18 +482,22 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     {ok, S}.
 
-handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) ->
-    Prototype = maps:get(storage, Options),
-    S1 = S0#s{schema = Schema#{prototype := Prototype}},
-    Since = emqx_message:timestamp_now(),
-    S = add_generation(S1, Since),
-    commit_metadata(S),
-    {reply, ok, S};
-handle_call(add_generation, _From, S0) ->
-    Since = emqx_message:timestamp_now(),
-    S = add_generation(S0, Since),
-    commit_metadata(S),
-    {reply, ok, S};
+handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
+    case handle_update_config(S0, Since, Options) of
+        S = #s{} ->
+            commit_metadata(S),
+            {reply, ok, S};
+        Error = {error, _} ->
+            {reply, Error, S0}
+    end;
+handle_call(#call_add_generation{since = Since}, _From, S0) ->
+    case handle_add_generation(S0, Since) of
+        S = #s{} ->
+            commit_metadata(S),
+            {reply, ok, S};
+        Error = {error, _} ->
+            {reply, Error, S0}
+    end;
 handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
     Generations = handle_list_generations_with_lifetimes(S),
     {reply, Generations, S};
@@ -489,10 +505,6 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     commit_metadata(S),
     {reply, Reply, S};
-handle_call(#call_create_generation{since = Since}, _From, S0) ->
-    S = add_generation(S0, Since),
-    commit_metadata(S),
-    {reply, ok, S};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -528,11 +540,10 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
         ShardSchema
     ).
 
--spec add_generation(server_state(), emqx_ds:time()) -> server_state().
-add_generation(S0, Since) ->
+-spec handle_add_generation(server_state(), emqx_ds:time()) ->
+    server_state() | {error, overlaps_existing_generations}.
+handle_add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
-    Schema1 = update_last_until(Schema0, Since),
-    Shard1 = update_last_until(Shard0, Since),
 
     #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
     OldKey = ?GEN_KEY(OldGenId),
@@ -540,39 +551,53 @@ add_generation(S0, Since) ->
     #{cf_refs := OldCFRefs} = OldGenSchema,
     #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
 
-    {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
-
-    CFRefs = NewCFRefs ++ CFRefs0,
-    Key = ?GEN_KEY(GenId),
-    Generation0 =
-        #{data := NewGenData0} =
-        open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
-
-    %% When the new generation's module is the same as the last one, we might want to
-    %% perform actions like inheriting some of the previous (meta)data.
-    NewGenData =
-        run_post_creation_actions(
-            #{
-                shard_id => ShardId,
-                db => DB,
-                new_gen_id => GenId,
-                old_gen_id => OldGenId,
-                new_cf_refs => NewCFRefs,
-                old_cf_refs => OldCFRefs,
-                new_gen_runtime_data => NewGenData0,
-                old_gen_runtime_data => OldGenData,
-                new_module => CurrentMod,
-                old_module => OldMod
-            }
-        ),
-    Generation = Generation0#{data := NewGenData},
-
-    Shard = Shard1#{current_generation := GenId, Key => Generation},
-    S0#s{
-        cf_refs = CFRefs,
-        schema = Schema,
-        shard = Shard
-    }.
+    Schema1 = update_last_until(Schema0, Since),
+    Shard1 = update_last_until(Shard0, Since),
+
+    case Schema1 of
+        _Updated = #{} ->
+            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+            CFRefs = NewCFRefs ++ CFRefs0,
+            Key = ?GEN_KEY(GenId),
+            Generation0 =
+                #{data := NewGenData0} =
+                open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+            %% When the new generation's module is the same as the last one, we might want to
+            %% perform actions like inheriting some of the previous (meta)data.
+            NewGenData =
+                run_post_creation_actions(
+                    #{
+                        shard_id => ShardId,
+                        db => DB,
+                        new_gen_id => GenId,
+                        old_gen_id => OldGenId,
+                        new_cf_refs => NewCFRefs,
+                        old_cf_refs => OldCFRefs,
+                        new_gen_runtime_data => NewGenData0,
+                        old_gen_runtime_data => OldGenData,
+                        new_module => CurrentMod,
+                        old_module => OldMod
+                    }
+                ),
+            Generation = Generation0#{data := NewGenData},
+            Shard = Shard1#{current_generation := GenId, Key => Generation},
+            S0#s{
+                cf_refs = CFRefs,
+                schema = Schema,
+                shard = Shard
+            };
+        {error, exists} ->
+            S0;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
+    server_state() | {error, overlaps_existing_generations}.
+handle_update_config(S0 = #s{schema = Schema}, Since, Options) ->
+    Prototype = maps:get(storage, Options),
+    S = S0#s{schema = Schema#{prototype := Prototype}},
+    handle_add_generation(S, Since).
 
 -spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}.
 handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
@@ -652,7 +677,7 @@ new_generation(ShardId, DB, Schema0, Since) ->
         module => Mod,
         data => GenData,
         cf_refs => NewCFRefs,
-        created_at => emqx_message:timestamp_now(),
+        created_at => erlang:system_time(millisecond),
         since => Since,
         until => undefined
     },
@@ -703,12 +728,19 @@ rocksdb_open(Shard, Options) ->
 db_dir({DB, ShardId}) ->
     filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
--spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
-update_last_until(Schema, Until) ->
-    #{current_generation := GenId} = Schema,
-    GenData0 = maps:get(?GEN_KEY(GenId), Schema),
-    GenData = GenData0#{until := Until},
-    Schema#{?GEN_KEY(GenId) := GenData}.
+-spec update_last_until(Schema, emqx_ds:time()) ->
+    Schema | {error, exists | overlaps_existing_generations}
+when
+    Schema :: shard_schema() | shard().
+update_last_until(Schema = #{current_generation := GenId}, Until) ->
+    case maps:get(?GEN_KEY(GenId), Schema) of
+        GenData = #{since := CurrentSince} when CurrentSince < Until ->
+            Schema#{?GEN_KEY(GenId) := GenData#{until := Until}};
+        #{since := Until} ->
+            {error, exists};
+        #{since := CurrentSince} when CurrentSince > Until ->
+            {error, overlaps_existing_generations}
+    end.
 
 run_post_creation_actions(
     #{

+ 4 - 5
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -117,9 +117,8 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
     Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
-        fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+        fun({Timestamp, Msg}) ->
+            Key = <<Timestamp:64>>,
             Val = term_to_binary(Msg),
             rocksdb:put(DB, CF, Key, Val, [])
         end,
@@ -210,8 +209,8 @@ do_next(_, _, _, _, 0, Key, Acc) ->
     {Key, Acc};
 do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
     case rocksdb:iterator_move(IT, Action) of
-        {ok, Key, Blob} ->
-            Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
+        {ok, Key = <<TS:64>>, Blob} ->
+            Msg = #message{topic = Topic} = binary_to_term(Blob),
             TopicWords = emqx_topic:words(Topic),
             case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
                 true ->

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -5,7 +5,7 @@
     {vsn, "0.1.12"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
     {mod, {emqx_ds_app, []}},
     {env, []}
 ]}.

+ 5 - 8
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -31,7 +31,9 @@ opts() ->
         backend => builtin,
         storage => {emqx_ds_storage_reference, #{}},
         n_shards => ?N_SHARDS,
-        replication_factor => 3
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
     }.
 
 %% A simple smoke test that verifies that opening/closing the DB
@@ -51,13 +53,8 @@ t_00_smoke_open_drop(_Config) ->
     lists:foreach(
         fun(Shard) ->
             ?assertEqual(
-                {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
-            ),
-            ?assertEqual(
-                [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
-            ),
-            %%  Check that the leader is eleected;
-            ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
+                {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
+            )
         end,
         Shards
     ),

+ 15 - 31
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -29,7 +29,9 @@
     backend => builtin,
     storage => {emqx_ds_storage_bitfield_lts, #{}},
     n_shards => 1,
-    replication_factor => 1
+    n_sites => 1,
+    replication_factor => 1,
+    replication_options => #{}
 }).
 
 -define(COMPACT_CONFIG, #{
@@ -54,7 +56,7 @@ t_store(_Config) ->
         payload = Payload,
         timestamp = PublishedAt
     },
-    ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})).
+    ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})).
 
 %% Smoke test for iteration through a concrete topic
 t_iterate(_Config) ->
@@ -62,7 +64,7 @@ t_iterate(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -90,7 +92,7 @@ t_delete(_Config) ->
     Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -121,7 +123,7 @@ t_get_streams(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -147,7 +149,7 @@ t_get_streams(_Config) ->
     NewBatch = [
         begin
             B = integer_to_binary(I),
-            make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)
+            {100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)}
         end
      || I <- lists:seq(1, 200)
     ],
@@ -176,12 +178,8 @@ t_new_generation_inherit_trie(_Config) ->
             Timestamps = lists:seq(1, 10_000, 100),
             Batch = [
                 begin
-                    B = integer_to_binary(I),
-                    make_message(
-                        TS,
-                        <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>,
-                        integer_to_binary(TS)
-                    )
+                    Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]),
+                    {TS, make_message(TS, Topic, integer_to_binary(TS))}
                 end
              || I <- lists:seq(1, 200),
                 TS <- Timestamps,
@@ -190,7 +188,7 @@ t_new_generation_inherit_trie(_Config) ->
             ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
             %% Now we create a new generation with the same LTS module.  It should inherit the
             %% learned trie.
-            ok = emqx_ds_storage_layer:add_generation(?SHARD),
+            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000),
             ok
         end,
         fun(Trace) ->
@@ -205,23 +203,21 @@ t_replay(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
     Timestamps = lists:seq(1, 10_000, 100),
     Batch1 = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
     %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
     Batch2 = [
         begin
-            B = integer_to_binary(I),
-            make_message(
-                TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
-            )
+            Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]),
+            {TS, make_message(TS, Topic, integer_to_binary(TS))}
         end
      || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
     %% Check various topic filters:
-    Messages = Batch1 ++ Batch2,
+    Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
     %% Missing topics (no ghost messages):
     ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
     %% Regular topics:
@@ -479,18 +475,6 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
         payload = Payload
     }.
 
-store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) ->
-    store(Shard, PublishedAt, list_to_binary(TopicL), Payload);
-store(Shard, PublishedAt, Topic, Payload) ->
-    ID = emqx_guid:gen(),
-    Msg = #message{
-        id = ID,
-        topic = Topic,
-        timestamp = PublishedAt,
-        payload = Payload
-    },
-    emqx_ds_storage_layer:message_store(Shard, [Msg], #{}).
-
 payloads(Messages) ->
     lists:map(
         fun(#message{payload = P}) ->

+ 1 - 1
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -31,7 +31,7 @@
 -endif.
 
 %% These apps are always (re)started by emqx_machine:
--define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx]).
+-define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
 
 %% If any of these applications crash, the entire EMQX node shuts down:
 -define(BASIC_PERMANENT_APPS, [mria, ekka, esockd, emqx]).

+ 41 - 1
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -22,7 +22,9 @@
     list/1,
     mqueue/1,
     map/2,
-    chain/2
+    transpose/1,
+    chain/2,
+    repeat/1
 ]).
 
 %% Evaluating
@@ -91,6 +93,31 @@ map(F, S) ->
         end
     end.
 
+%% @doc Transpose a list of streams into a stream producing lists of their respective values.
+%% The resulting stream is as long as the shortest of the input streams.
+-spec transpose([stream(X)]) -> stream([X]).
+transpose([S]) ->
+    map(fun(X) -> [X] end, S);
+transpose([S | Streams]) ->
+    transpose_tail(S, transpose(Streams));
+transpose([]) ->
+    empty().
+
+transpose_tail(S, Tail) ->
+    fun() ->
+        case next(S) of
+            [X | SRest] ->
+                case next(Tail) of
+                    [Xs | TailRest] ->
+                        [[X | Xs] | transpose_tail(SRest, TailRest)];
+                    [] ->
+                        []
+                end;
+            [] ->
+                []
+        end
+    end.
+
 %% @doc Make a stream by chaining (concatenating) two streams.
 %% The second stream begins to produce values only after the first one is exhausted.
 -spec chain(stream(X), stream(Y)) -> stream(X | Y).
@@ -104,6 +131,19 @@ chain(SFirst, SThen) ->
         end
     end.
 
+%% @doc Make an infinite stream out of repeats of given stream.
+%% If the given stream is empty, the resulting stream is also empty.
+-spec repeat(stream(X)) -> stream(X).
+repeat(S) ->
+    fun() ->
+        case next(S) of
+            [X | SRest] ->
+                [X | chain(SRest, repeat(S))];
+            [] ->
+                []
+        end
+    end.
+
 %%
 
 %% @doc Produce the next value from the stream.

+ 74 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -74,6 +74,80 @@ chain_list_map_test() ->
         emqx_utils_stream:consume(S)
     ).
 
+transpose_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7])
+    ]),
+    ?assertEqual(
+        [[1, 4], [2, 5], [3, 6]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_none_test() ->
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(emqx_utils_stream:transpose([]))
+    ).
+
+transpose_one_test() ->
+    S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]),
+    ?assertEqual(
+        [[1], [2], [3]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_many_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7]),
+        emqx_utils_stream:list([8, 9])
+    ]),
+    ?assertEqual(
+        [[1, 4, 8], [2, 5, 9]],
+        emqx_utils_stream:consume(S)
+    ).
+
+transpose_many_empty_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7]),
+        emqx_utils_stream:empty()
+    ]),
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(S)
+    ).
+
+repeat_test() ->
+    S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
+    ?assertMatch(
+        {[1, 2, 3, 1, 2, 3, 1, 2], _},
+        emqx_utils_stream:consume(8, S)
+    ),
+    {_, SRest} = emqx_utils_stream:consume(8, S),
+    ?assertMatch(
+        {[3, 1, 2, 3, 1, 2, 3, 1], _},
+        emqx_utils_stream:consume(8, SRest)
+    ).
+
+repeat_empty_test() ->
+    S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
+    ?assertEqual(
+        [],
+        emqx_utils_stream:consume(8, S)
+    ).
+
+transpose_repeat_test() ->
+    S = emqx_utils_stream:transpose([
+        emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
+        emqx_utils_stream:list([4, 5, 6, 7, 8])
+    ]),
+    ?assertEqual(
+        [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
+        emqx_utils_stream:consume(S)
+    ).
+
 mqueue_test() ->
     _ = erlang:send_after(1, self(), 1),
     _ = erlang:send_after(100, self(), 2),

+ 2 - 1
mix.exs

@@ -100,7 +100,8 @@ defmodule EMQXUmbrella.MixProject do
       {:rfc3339, github: "emqx/rfc3339", tag: "0.2.3", override: true},
       {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true},
       {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
-      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}
+      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
+      {:ra, "2.7.3", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()

+ 2 - 1
rebar.config

@@ -110,7 +110,8 @@
     {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}},
     {ssl_verify_fun, "1.1.7"},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
-    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}}
+    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
+    {ra, "2.7.3"}
 ]}.
 
 {xref_ignores,

+ 9 - 0
rel/i18n/emqx_ds_schema.hocon

@@ -30,6 +30,15 @@ builtin_n_shards.desc:
   Please note that it takes effect only during the initialization of the durable storage database.
   Changing this configuration parameter after the database has been already created won't take any effect.~"""
 
+builtin_n_sites.label: "Initial number of sites"
+builtin_n_sites.desc:
+  """~
+  Number of storage sites that need to share responsibility over the set of storage shards.
+  In this context, sites are essentially EMQX nodes that have message durability enabled.
+  Please note that it takes effect only during the initialization of the durable storage database.
+  During this phase at least that many sites should come online to distribute shards between them, otherwise message storage will be unavailable until then.
+  After the initialization is complete, sites may be offline, which will affect availability depending on the number of offline sites and replication factor.~"""
+
 builtin_local_write_buffer.label: "Local write buffer"
 builtin_local_write_buffer.desc:
   """~