Merge pull request #13596 from keynslug/feat/EMQX-12307/leader-store

feat(dssubs): employ leader store to manage leader state
Ilia Averianov 1 year ago
Parent commit ca6cc7b126

+ 29 - 24
apps/emqx/src/emqx_ds_schema.erl

@@ -18,7 +18,8 @@
 -module(emqx_ds_schema).
 
 %% API:
--export([schema/0, translate_builtin_raft/1, translate_builtin_local/1]).
+-export([schema/0, storage_schema/1, translate_builtin_raft/1, translate_builtin_local/1]).
+-export([db_config/1]).
 
 %% Behavior callbacks:
 -export([fields/1, desc/1, namespace/0]).
@@ -48,6 +49,11 @@
 %% API
 %%================================================================================
 
+-spec db_config(emqx_config:runtime_config_key_path()) -> emqx_ds:create_db_opts().
+db_config(Path) ->
+    ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
+    apply(Module, Function, [ConfigTree]).
+
 translate_builtin_raft(
     Backend = #{
         backend := builtin_raft,
@@ -89,15 +95,22 @@ namespace() ->
 schema() ->
     [
         {messages,
-            ds_schema(#{
-                default =>
-                    #{
-                        <<"backend">> => ?DEFAULT_BACKEND
-                    },
+            storage_schema(#{
                 importance => ?IMPORTANCE_MEDIUM,
                 desc => ?DESC(messages)
             })}
-    ].
+    ] ++ emqx_schema_hooks:injection_point('durable_storage', []).
+
+storage_schema(ExtraOptions) ->
+    Options = #{
+        default => #{<<"backend">> => ?DEFAULT_BACKEND}
+    },
+    sc(
+        hoconsc:union(
+            ?BUILTIN_BACKENDS ++ emqx_schema_hooks:injection_point('durable_storage.backends', [])
+        ),
+        maps:merge(Options, ExtraOptions)
+    ).
 
 fields(builtin_local) ->
     %% Schema for the builtin_local backend:
@@ -145,26 +158,26 @@ fields(builtin_raft) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )},
-        %% TODO: Deprecate once cluster management and rebalancing is implemented.
-        {"n_sites",
+        {replication_factor,
             sc(
                 pos_integer(),
                 #{
-                    default => 1,
-                    importance => ?IMPORTANCE_HIDDEN,
-                    desc => ?DESC(builtin_n_sites)
+                    default => 3,
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(builtin_raft_replication_factor)
                 }
             )},
-        {replication_factor,
+        {n_sites,
             sc(
                 pos_integer(),
                 #{
-                    default => 3,
-                    importance => ?IMPORTANCE_HIDDEN
+                    default => 1,
+                    importance => ?IMPORTANCE_LOW,
+                    desc => ?DESC(builtin_raft_n_sites)
                 }
             )},
         %% TODO: Elaborate.
-        {"replication_options",
+        {replication_options,
             sc(
                 hoconsc:map(name, any()),
                 #{
@@ -375,14 +388,6 @@ translate_layout(
 translate_layout(#{type := reference}) ->
     {emqx_ds_storage_reference, #{}}.
 
-ds_schema(Options) ->
-    sc(
-        hoconsc:union(
-            ?BUILTIN_BACKENDS ++ emqx_schema_hooks:injection_point('durable_storage.backends', [])
-        ),
-        Options
-    ).
-
 builtin_layouts() ->
     %% Reference layout stores everything in one stream, so it's not
     %% suitable for production use. However, it's very simple and

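How the new `db_config/1` resolves options: the union schema members carry a
`'_config_handler'` annotation naming their translation callback, and
`db_config/1` simply dispatches to it. A minimal sketch of the call flow (the
concrete handler tuple is an assumption for illustration; see the next file
for the real call site):

    %% The checked config subtree carries its backend-specific translator,
    %% e.g. {emqx_ds_schema, translate_builtin_raft} when the `builtin_raft`
    %% union member matched (assumed here for illustration):
    ConfigTree = #{'_config_handler' := {Module, Function}} =
        emqx_config:get([durable_storage, messages]),
    DbOpts = apply(Module, Function, [ConfigTree]),
    ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, DbOpts).
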
+ 4 - 9
apps/emqx/src/emqx_persistent_message.erl

@@ -45,8 +45,7 @@ init() ->
     persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
     ?WITH_DURABILITY_ENABLED(begin
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
-        Backend = storage_backend(),
-        ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
+        ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok = emqx_persistent_session_ds:create_tables(),
         ok
@@ -60,9 +59,9 @@ is_persistence_enabled() ->
 is_persistence_enabled(Zone) ->
     emqx_config:get_zone_conf(Zone, [durable_sessions, enable]).
 
--spec storage_backend() -> emqx_ds:create_db_opts().
-storage_backend() ->
-    storage_backend([durable_storage, messages]).
+-spec get_db_config() -> emqx_ds:create_db_opts().
+get_db_config() ->
+    emqx_ds_schema:db_config([durable_storage, messages]).
 
 %% Dev-only option: force all messages to go through
 %% `emqx_persistent_session_ds':
@@ -70,10 +69,6 @@ storage_backend() ->
 force_ds(Zone) ->
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
 
-storage_backend(Path) ->
-    ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
-    apply(Module, Function, [ConfigTree]).
-
 %%--------------------------------------------------------------------
 
 -spec add_handler() -> ok.

+ 6 - 0
apps/emqx_conf/src/emqx_conf_schema_inject.erl

@@ -27,6 +27,7 @@ schemas(Edition) ->
         cluster_linking(Edition) ++
         authn(Edition) ++
         authz() ++
+        shared_subs(Edition) ++
         customized(Edition).
 
 mria(ce) ->
@@ -81,6 +82,11 @@ authz_mods() ->
         emqx_authz_ldap_schema
     ].
 
+shared_subs(ee) ->
+    [emqx_ds_shared_sub_schema];
+shared_subs(ce) ->
+    [].
+
 %% Add more schemas here.
 customized(_Edition) ->
     [].

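With `schema/0` above appending `emqx_schema_hooks:injection_point('durable_storage', [])`,
the injected `emqx_ds_shared_sub_schema` module can contribute extra durable
storage roots; presumably it adds the `durable_storage.queues` subtree that the
new leader store reads via `emqx_ds_schema:db_config([durable_storage, queues])`
further below. A hedged sketch of such an injection (the `injected_fields/0`
callback shape and the field contents are assumptions, not part of this diff):

    -module(emqx_ds_shared_sub_schema).
    -behaviour(emqx_schema_hooks).
    -export([injected_fields/0]).

    injected_fields() ->
        #{
            'durable_storage' => [
                {queues,
                    emqx_ds_schema:storage_schema(#{
                        desc => "Storage for shared subscription states."
                    })}
            ]
        }.
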
+ 124 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_elector.erl

@@ -0,0 +1,124 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc Shared subscription group elector process.
+%% Hosted under the _shared subscription registry_ supervisor.
+%% Responsible for running the leader election process, which eventually
+%% finishes with one of two outcomes:
+%% 1. The elector wins the leadership.
+%%    In this case the elector _becomes_ the leader by entering the
+%%    `emqx_ds_shared_sub_leader` process loop.
+%% 2. The elector finds an active leader.
+%%    In this case the elector idles for as long as the leader is considered
+%%    active, redirecting any connect requests to it.
+-module(emqx_ds_shared_sub_elector).
+
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+%% Internal API
+-export([
+    start_link/2
+]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%%--------------------------------------------------------------------
+%% Internal API
+%%--------------------------------------------------------------------
+
+start_link(ShareTopic, StartTime) ->
+    gen_server:start_link(?MODULE, {elect, ShareTopic, StartTime}, []).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+-record(follower, {
+    topic :: emqx_persistent_session_ds:share_topic_filter(),
+    leader :: pid(),
+    alive_until :: non_neg_integer()
+}).
+
+init(Elect = {elect, _ShareTopic, _StartTime}) ->
+    %% NOTE
+    %% Important to have it here, because this process can become
+    %% `emqx_ds_shared_sub_leader`, which has `terminate/2` logic.
+    _ = erlang:process_flag(trap_exit, true),
+    {ok, #{}, {continue, Elect}}.
+
+handle_continue({elect, ShareTopic, StartTime}, _State) ->
+    elect(ShareTopic, StartTime).
+
+handle_call(_Request, _From, State) ->
+    {reply, {error, unknown_request}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(?agent_connect_leader_match(Agent, AgentMetadata, _ShareTopic), State) ->
+    %% NOTE: Redirecting to the known leader.
+    ok = connect_leader(Agent, AgentMetadata, State),
+    {noreply, State};
+handle_info({timeout, _TRef, invalidate}, State) ->
+    {stop, {shutdown, invalidate}, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+elect(ShareTopic, TS) ->
+    Group = emqx_ds_shared_sub_leader:group_name(ShareTopic),
+    case emqx_ds_shared_sub_leader_store:claim_leadership(Group, _Leader = self(), TS) of
+        {ok, LeaderClaim} ->
+            %% Become the leader.
+            ?tp(debug, shared_sub_elector_becomes_leader, #{
+                id => ShareTopic,
+                group => Group,
+                leader => LeaderClaim
+            }),
+            emqx_ds_shared_sub_leader:become(ShareTopic, TS, LeaderClaim);
+        {exists, LeaderClaim} ->
+            %% Turn into the follower that redirects connect requests to the leader
+            %% while it's considered alive. Note that the leader may in theory decide
+            %% to let go of leadership earlier than that.
+            AliveUntil = emqx_ds_shared_sub_leader_store:alive_until(LeaderClaim),
+            ?tp(debug, shared_sub_elector_becomes_follower, #{
+                id => ShareTopic,
+                group => Group,
+                leader => LeaderClaim,
+                until => AliveUntil
+            }),
+            TTL = AliveUntil - TS,
+            _TRef = erlang:start_timer(max(0, TTL), self(), invalidate),
+            St = #follower{
+                topic = ShareTopic,
+                leader = emqx_ds_shared_sub_leader_store:leader_id(LeaderClaim),
+                alive_until = AliveUntil
+            },
+            {noreply, St};
+        {error, Class, Reason} = Error ->
+            ?tp(warning, "Shared subscription leader election failed", #{
+                id => ShareTopic,
+                group => Group,
+                error => Error
+            }),
+            case Class of
+                recoverable -> StopReason = {shutdown, Reason};
+                unrecoverable -> StopReason = Error
+            end,
+            {stop, StopReason, ShareTopic}
+    end.
+
+connect_leader(Agent, AgentMetadata, #follower{topic = ShareTopic, leader = Pid}) ->
+    emqx_ds_shared_sub_proto:agent_connect_leader(Pid, Agent, AgentMetadata, ShareTopic).

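For intuition about the follower branch: `alive_until/1` and the heartbeat
interval come from the TTL constants defined in `emqx_ds_shared_sub_leader_store`
further below (30 s TTL, 10 s heartbeat by default). A worked example with
made-up timestamps:

    %% The incumbent leader last renewed its claim at T = 1_000_000 ms, so
    %%   AliveUntil = 1_000_000 + 30_000 = 1_030_000  (alive_until/1).
    %% An elector starting at TS = 1_012_000 ms finds that claim alive and
    %% becomes a follower with
    %%   TTL = AliveUntil - TS = 18_000 ms,
    %% redirecting agents to the leader for 18 s before stopping with
    %% {shutdown, invalidate}. The next leader_wanted/3 call spawns a fresh
    %% elector, which can win the claim if the leader failed to renew it.
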
+ 2 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -180,7 +180,7 @@ handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GS
         agent => Agent,
         share_topic_filter => ShareTopicFilter
     }),
-    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
+    ok = emqx_ds_shared_sub_registry:leader_wanted(Agent, agent_metadata(GSM), ShareTopicFilter),
     ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)).
 
 handle_leader_lease_streams(
@@ -211,7 +211,7 @@ handle_find_leader_timeout(#{agent := Agent, share_topic_filter := ShareTopicFil
         agent => Agent,
         share_topic_filter => ShareTopicFilter
     }),
-    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), ShareTopicFilter),
+    ok = emqx_ds_shared_sub_registry:leader_wanted(Agent, agent_metadata(GSM0), ShareTopicFilter),
     GSM1 = ensure_state_timeout(
         GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)
     ),

+ 227 - 133
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -4,8 +4,6 @@
 
 -module(emqx_ds_shared_sub_leader).
 
--behaviour(gen_statem).
-
 -include("emqx_ds_shared_sub_proto.hrl").
 -include("emqx_ds_shared_sub_config.hrl").
 
@@ -15,12 +13,16 @@
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
-    register/2,
-
     start_link/1,
-    child_spec/1,
-    id/1,
+    become/3
+]).
 
+-export([
+    group_name/1
+]).
+
+-behaviour(gen_statem).
+-export([
     callback_mode/0,
     init/1,
     handle_event/4,
@@ -53,14 +55,6 @@
     revoked_streams := list(emqx_ds:stream())
 }.
 
--type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-
--type stream_state() :: #{
-    progress => progress(),
-    rank => emqx_ds:stream_rank()
-}.
-
-%% TODO https://emqx.atlassian.net/browse/EMQX-12307
 %% Some data should be persisted
 -type data() :: #{
     %%
@@ -68,12 +62,8 @@
     %%
     group_id := group_id(),
     topic := emqx_types:topic(),
-    %% TODO https://emqx.atlassian.net/browse/EMQX-12575
     %% Implement some stats to assign evenly?
-    stream_states := #{
-        emqx_ds:stream() => stream_state()
-    },
-    rank_progress := emqx_ds_shared_sub_leader_rank_progress:t(),
+    store := emqx_ds_shared_sub_leader_store:t(),
 
     %%
     %% Ephemeral data, should not be persisted
@@ -88,49 +78,44 @@
 
 -export_type([
     options/0,
-    data/0,
-    progress/0
+    data/0
 ]).
 
 %% States
 
--define(leader_waiting_registration, leader_waiting_registration).
 -define(leader_active, leader_active).
 
 %% Events
 
--record(register, {
-    register_fun :: fun(() -> pid())
-}).
 -record(renew_streams, {}).
 -record(renew_leases, {}).
 -record(drop_timeout, {}).
+-record(renew_leader_claim, {}).
 
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
-register(Pid, Fun) ->
-    gen_statem:call(Pid, #register{register_fun = Fun}).
-
-%%--------------------------------------------------------------------
-%% Internal API
-%%--------------------------------------------------------------------
-
-child_spec(#{share_topic_filter := ShareTopicFilter} = Options) ->
-    #{
-        id => id(ShareTopicFilter),
-        start => {?MODULE, start_link, [Options]},
-        restart => temporary,
-        shutdown => 5000,
-        type => worker
-    }.
-
 start_link(Options) ->
     gen_statem:start_link(?MODULE, [Options], []).
 
-id(ShareTopicFilter) ->
-    {?MODULE, ShareTopicFilter}.
+become(ShareTopicFilter, StartTime, Claim) ->
+    Data0 = init_data(ShareTopicFilter, StartTime),
+    Data1 = attach_claim(Claim, Data0),
+    case store_is_dirty(Data1) of
+        true ->
+            Actions = force_claim_renewal(Data1);
+        false ->
+            Actions = init_claim_renewal(Data1)
+    end,
+    gen_statem:enter_loop(?MODULE, [], ?leader_active, Data1, Actions).
+
+%%--------------------------------------------------------------------
+
+group_name(#share{group = ShareGroup, topic = Topic}) ->
+    %% NOTE: Should not contain `/`s.
+    %% TODO: More observable encoding.
+    iolist_to_binary([ShareGroup, $:, binary:encode_hex(Topic)]).
 
 %%--------------------------------------------------------------------
 %% gen_statem callbacks
@@ -138,29 +123,42 @@ id(ShareTopicFilter) ->
 
 callback_mode() -> [handle_event_function, state_enter].
 
-init([#{share_topic_filter := #share{topic = Topic} = ShareTopicFilter} = _Options]) ->
-    Data = #{
+init([#{share_topic_filter := ShareTopicFilter} = _Options]) ->
+    _ = erlang:process_flag(trap_exit, true),
+    Data = init_data(ShareTopicFilter, now_ms()),
+    {ok, ?leader_active, Data}.
+
+init_data(#share{topic = Topic} = ShareTopicFilter, StartTime) ->
+    Group = group_name(ShareTopicFilter),
+    case emqx_ds_shared_sub_leader_store:open(Group) of
+        Store when Store =/= false ->
+            ?tp(warning, shared_sub_leader_store_open, #{topic => ShareTopicFilter, store => Store}),
+            ok;
+        false ->
+            ?tp(warning, shared_sub_leader_store_init, #{topic => ShareTopicFilter}),
+            RankProgress = emqx_ds_shared_sub_leader_rank_progress:init(),
+            Store0 = emqx_ds_shared_sub_leader_store:init(Group),
+            Store1 = emqx_ds_shared_sub_leader_store:set(start_time, StartTime, Store0),
+            Store = emqx_ds_shared_sub_leader_store:set(rank_progress, RankProgress, Store1)
+    end,
+    #{
         group_id => ShareTopicFilter,
         topic => Topic,
-        start_time => now_ms(),
-        stream_states => #{},
+        store => Store,
         stream_owners => #{},
-        agents => #{},
-        rank_progress => emqx_ds_shared_sub_leader_rank_progress:init()
-    },
-    {ok, ?leader_waiting_registration, Data}.
+        agents => #{}
+    }.
+
+attach_claim(Claim, Data) ->
+    Data#{leader_claim => Claim}.
+
+force_claim_renewal(_Data = #{}) ->
+    [{{timeout, #renew_leader_claim{}}, 0, #renew_leader_claim{}}].
+
+init_claim_renewal(_Data = #{leader_claim := Claim}) ->
+    Interval = emqx_ds_shared_sub_leader_store:heartbeat_interval(Claim),
+    [{{timeout, #renew_leader_claim{}}, Interval, #renew_leader_claim{}}].
 
-%%--------------------------------------------------------------------
-%% waiting_registration state
-
-handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_registration, Data) ->
-    Self = self(),
-    case Fun() of
-        Self ->
-            {next_state, ?leader_active, Data, {reply, From, {ok, Self}}};
-        OtherPid ->
-            {stop_and_reply, normal, {reply, From, {ok, OtherPid}}}
-    end;
 %%--------------------------------------------------------------------
 %% replaying state
 handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
@@ -193,6 +191,14 @@ handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0)
     Data1 = drop_timeout_agents(Data0),
     {keep_state, Data1,
         {{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}};
+handle_event({timeout, #renew_leader_claim{}}, #renew_leader_claim{}, ?leader_active, Data0) ->
+    case renew_leader_claim(Data0) of
+        Data1 = #{} ->
+            Actions = init_claim_renewal(Data1),
+            {keep_state, Data1, Actions};
+        Error ->
+            {stop, Error}
+    end;
 %%--------------------------------------------------------------------
 %% agent events
 handle_event(
@@ -243,13 +249,57 @@ handle_event(Event, Content, State, _Data) ->
     }),
     keep_state_and_data.
 
-terminate(_Reason, _State, _Data) ->
-    ok.
+terminate(
+    _Reason,
+    _State,
+    #{group_id := ShareTopicFilter, leader_claim := Claim, store := Store}
+) ->
+    %% NOTE
+    %% The call to `commit_dirty/2` currently blocks.
+    %% The call to `disown_leadership/2`, on the other hand, should be non-blocking.
+    Group = group_name(ShareTopicFilter),
+    Result = emqx_ds_shared_sub_leader_store:commit_dirty(Claim, Store),
+    ok = emqx_ds_shared_sub_leader_store:disown_leadership(Group, Claim),
+    ?tp(shared_sub_leader_store_committed_dirty, #{
+        id => ShareTopicFilter,
+        group => Group,
+        claim => Claim,
+        result => Result
+    }).
 
 %%--------------------------------------------------------------------
 %% Event handlers
 %%--------------------------------------------------------------------
 
+%%--------------------------------------------------------------------
+
+renew_leader_claim(Data = #{group_id := ShareTopicFilter, store := Store0, leader_claim := Claim}) ->
+    TS = emqx_message:timestamp_now(),
+    Group = group_name(ShareTopicFilter),
+    case emqx_ds_shared_sub_leader_store:commit_renew(Claim, TS, Store0) of
+        {ok, RenewedClaim, CommittedStore} ->
+            ?tp(shared_sub_leader_store_committed, #{
+                id => ShareTopicFilter,
+                group => Group,
+                claim => Claim,
+                renewed => RenewedClaim
+            }),
+            attach_claim(RenewedClaim, Data#{store := CommittedStore});
+        {error, Class, Reason} = Error ->
+            ?tp(warning, "Shared subscription leader store commit failed", #{
+                id => ShareTopicFilter,
+                group => Group,
+                claim => Claim,
+                reason => Reason
+            }),
+            case Class of
+                %% Will retry.
+                recoverable -> Data;
+                %% Will have to crash.
+                unrecoverable -> Error
+            end
+    end.
+
 %%--------------------------------------------------------------------
 %% Renew streams
 
@@ -257,60 +307,50 @@ terminate(_Reason, _State, _Data) ->
 %% * Revoke streams from agents having too many streams
 %% * Assign streams to agents having too few streams
 
-renew_streams(
-    #{
-        start_time := StartTime,
-        stream_states := StreamStates,
-        topic := Topic,
-        rank_progress := RankProgress0
-    } = Data0
-) ->
+renew_streams(#{topic := Topic} = Data0) ->
     TopicFilter = emqx_topic:words(Topic),
-    StreamsWRanks = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
+    StartTime = store_get_start_time(Data0),
+    StreamsWRanks = get_streams(TopicFilter, StartTime),
 
     %% Discard streams that are already replayed and init new
-    {NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams(
-        StreamsWRanks, RankProgress0
-    ),
-    {NewStreamStates, VanishedStreamStates} = update_progresses(
-        StreamStates, NewStreamsWRanks, TopicFilter, StartTime
+    {NewStreamsWRanks, RankProgress} = emqx_ds_shared_sub_leader_rank_progress:add_streams(
+        StreamsWRanks,
+        store_get_rank_progress(Data0)
     ),
-    Data1 = removed_vanished_streams(Data0, VanishedStreamStates),
-    Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1},
-    Data3 = revoke_streams(Data2),
-    Data4 = assign_streams(Data3),
+    {Data1, VanishedStreams} = update_progresses(Data0, NewStreamsWRanks, TopicFilter, StartTime),
+    Data2 = store_put_rank_progress(Data1, RankProgress),
+    Data3 = removed_vanished_streams(Data2, VanishedStreams),
+    Data4 = revoke_streams(Data3),
+    Data5 = assign_streams(Data4),
     ?SLOG(info, #{
         msg => leader_renew_streams,
         topic_filter => TopicFilter,
         new_streams => length(NewStreamsWRanks)
     }),
-    Data4.
+    Data5.
 
-update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) ->
-    lists:foldl(
-        fun({Rank, Stream}, {NewStreamStatesAcc, OldStreamStatesAcc}) ->
-            case OldStreamStatesAcc of
-                #{Stream := StreamData} ->
-                    {
-                        NewStreamStatesAcc#{Stream => StreamData},
-                        maps:remove(Stream, OldStreamStatesAcc)
-                    };
-                _ ->
-                    {ok, It} = emqx_ds:make_iterator(
-                        ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
-                    ),
-                    Progress = #{
-                        iterator => It
-                    },
-                    {
-                        NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}},
-                        OldStreamStatesAcc
-                    }
+update_progresses(Data0, NewStreamsWRanks, TopicFilter, StartTime) ->
+    ExistingStreams = store_setof_streams(Data0),
+    Data = lists:foldl(
+        fun({Rank, Stream}, DataAcc) ->
+            case sets:is_element(Stream, ExistingStreams) of
+                true ->
+                    DataAcc;
+                false ->
+                    {ok, It} = make_iterator(Stream, TopicFilter, StartTime),
+                    StreamData = #{progress => #{iterator => It}, rank => Rank},
+                    store_put_stream(DataAcc, Stream, StreamData)
             end
         end,
-        {#{}, StreamStates},
+        Data0,
         NewStreamsWRanks
-    ).
+    ),
+    VanishedStreams = lists:foldl(
+        fun({_Rank, Stream}, Acc) -> sets:del_element(Stream, Acc) end,
+        ExistingStreams,
+        NewStreamsWRanks
+    ),
+    {Data, sets:to_list(VanishedStreams)}.
 
 %% We just remove disappeared streams from everywhere.
 %%
@@ -320,8 +360,7 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) ->
 %%
 %% If streams disappear after a long leader sleep, it is a normal situation.
 %% This removal will be part of the initialization before any agents connect.
-removed_vanished_streams(Data0, VanishedStreamStates) ->
-    VanishedStreams = maps:keys(VanishedStreamStates),
+removed_vanished_streams(Data0, VanishedStreams) ->
     Data1 = lists:foldl(
         fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) ->
             case StreamOwners0 of
@@ -601,47 +640,61 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
     end.
 
 update_stream_progresses(
-    #{stream_states := StreamStates0, stream_owners := StreamOwners} = Data0,
+    #{stream_owners := StreamOwners} = Data0,
     Agent,
     AgentState0,
     ReceivedStreamProgresses
 ) ->
-    {StreamStates1, ReplayedStreams} = lists:foldl(
-        fun(#{stream := Stream, progress := Progress}, {StreamStatesAcc, ReplayedStreamsAcc}) ->
+    ReplayedStreams = lists:foldl(
+        fun(#{stream := Stream, progress := Progress}, Acc) ->
+            case StreamOwners of
+                #{Stream := Agent} ->
+                    case Progress of
+                        #{iterator := end_of_stream} ->
+                            #{rank := Rank} = store_get_stream(Data0, Stream),
+                            Acc#{Stream => Rank};
+                        _ ->
+                            Acc
+                    end;
+                _ ->
+                    Acc
+            end
+        end,
+        #{},
+        ReceivedStreamProgresses
+    ),
+    Data1 = lists:foldl(
+        fun(#{stream := Stream, progress := Progress}, DataAcc) ->
             case StreamOwners of
                 #{Stream := Agent} ->
-                    StreamData0 = maps:get(Stream, StreamStatesAcc),
+                    StreamData0 = store_get_stream(DataAcc, Stream),
                     case Progress of
                         #{iterator := end_of_stream} ->
-                            Rank = maps:get(rank, StreamData0),
-                            {maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{
-                                Stream => Rank
-                            }};
+                            store_delete_stream(DataAcc, Stream);
                         _ ->
-                            StreamData1 = StreamData0#{progress => Progress},
-                            {StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc}
+                            StreamData = StreamData0#{progress => Progress},
+                            store_put_stream(DataAcc, Stream, StreamData)
                     end;
                 _ ->
-                    {StreamStatesAcc, ReplayedStreamsAcc}
+                    DataAcc
             end
         end,
-        {StreamStates0, #{}},
+        Data0,
         ReceivedStreamProgresses
     ),
-    Data1 = update_rank_progress(Data0, ReplayedStreams),
-    Data2 = Data1#{stream_states => StreamStates1},
+    Data2 = update_rank_progress(Data1, ReplayedStreams),
     AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams),
     {Data2, AgentState1}.
 
-update_rank_progress(#{rank_progress := RankProgress0} = Data, ReplayedStreams) ->
-    RankProgress1 = maps:fold(
+update_rank_progress(Data, ReplayedStreams) ->
+    RankProgress = maps:fold(
         fun(Stream, Rank, RankProgressAcc) ->
             emqx_ds_shared_sub_leader_rank_progress:set_replayed({Rank, Stream}, RankProgressAcc)
         end,
-        RankProgress0,
+        store_get_rank_progress(Data),
         ReplayedStreams
     ),
-    Data#{rank_progress => RankProgress1}.
+    store_put_rank_progress(Data, RankProgress).
 
 %% No need to revoke fully replayed streams. We do not assign them anymore.
 %% The agent's session will also drop replayed streams itself.
@@ -899,10 +952,9 @@ renew_no_replaying_deadline(#{} = AgentState) ->
             ?dq_config(leader_session_not_replaying_timeout_ms)
     }.
 
-unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) ->
-    Streams = maps:keys(StreamStates),
-    AssignedStreams = maps:keys(StreamOwners),
-    Streams -- AssignedStreams.
+unassigned_streams(#{stream_owners := StreamOwners} = Data) ->
+    Streams = store_setof_streams(Data),
+    sets:to_list(sets:subtract(Streams, StreamOwners)).
 
 %% Those who are not connecting or updating, i.e. not in a transient state.
 replaying_agents(#{agents := AgentStates}) ->
@@ -922,12 +974,12 @@ desired_stream_count_per_agent(#{agents := AgentStates} = Data) ->
 desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) ->
     desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1).
 
-desired_stream_count_per_agent(#{stream_states := StreamStates}, AgentCount) ->
+desired_stream_count_per_agent(Data, AgentCount) ->
     case AgentCount of
         0 ->
             0;
         _ ->
-            StreamCount = maps:size(StreamStates),
+            StreamCount = store_num_streams(Data),
             case StreamCount rem AgentCount of
                 0 ->
                     StreamCount div AgentCount;
@@ -936,10 +988,10 @@ desired_stream_count_per_agent(#{stream_states := StreamStates}, AgentCount) ->
             end
     end.
 
-stream_progresses(#{stream_states := StreamStates} = _Data, Streams) ->
+stream_progresses(Data, Streams) ->
     lists:map(
         fun(Stream) ->
-            StreamData = maps:get(Stream, StreamStates),
+            StreamData = store_get_stream(Data, Stream),
             #{
                 stream => Stream,
                 progress => maps:get(progress, StreamData)
@@ -1013,3 +1065,45 @@ with_agent(#{agents := Agents} = Data, Agent, Fun) ->
         _ ->
             Data
     end.
+
+%%
+
+get_streams(TopicFilter, StartTime) ->
+    emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime).
+
+make_iterator(Stream, TopicFilter, StartTime) ->
+    emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime).
+
+%% Leader store
+
+store_is_dirty(#{store := Store}) ->
+    emqx_ds_shared_sub_leader_store:dirty(Store).
+
+store_get_stream(#{store := Store}, ID) ->
+    emqx_ds_shared_sub_leader_store:get(stream, ID, Store).
+
+store_put_stream(Data = #{store := Store0}, ID, StreamData) ->
+    Store = emqx_ds_shared_sub_leader_store:put(stream, ID, StreamData, Store0),
+    Data#{store := Store}.
+
+store_delete_stream(Data = #{store := Store0}, ID) ->
+    Store = emqx_ds_shared_sub_leader_store:delete(stream, ID, Store0),
+    Data#{store := Store}.
+
+store_get_rank_progress(#{store := Store}) ->
+    emqx_ds_shared_sub_leader_store:get(rank_progress, Store).
+
+store_put_rank_progress(Data = #{store := Store0}, RankProgress) ->
+    Store = emqx_ds_shared_sub_leader_store:set(rank_progress, RankProgress, Store0),
+    Data#{store := Store}.
+
+store_get_start_time(#{store := Store}) ->
+    emqx_ds_shared_sub_leader_store:get(start_time, Store).
+
+store_num_streams(#{store := Store}) ->
+    emqx_ds_shared_sub_leader_store:size(stream, Store).
+
+store_setof_streams(#{store := Store}) ->
+    Acc0 = sets:new([{version, 2}]),
+    FoldFun = fun(Stream, _StreamData, Acc) -> sets:add_element(Stream, Acc) end,
+    emqx_ds_shared_sub_leader_store:fold(stream, FoldFun, Acc0, Store).

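Two details of the reworked leader worth illustrating: the group name encoding
and how state changes reach durable storage. A sketch (the stream, iterator,
and rank values are placeholders):

    %% group_name/1 hex-encodes the topic so the result contains no `/`:
    %% binary:encode_hex(<<"t/1">>) =:= <<"742F31">>, hence
    %% group_name(#share{group = <<"g1">>, topic = <<"t/1">>}) =:= <<"g1:742F31">>.
    %%
    %% Mutations stage in the store and only hit the DB on claim renewal:
    Data1 = store_put_stream(Data0, Stream, #{progress => #{iterator => It}, rank => Rank}),
    %% ... later, on the #renew_leader_claim{} timeout, renew_leader_claim/1
    %% commits the staged changes together with the heartbeat:
    {ok, RenewedClaim, CommittedStore} =
        emqx_ds_shared_sub_leader_store:commit_renew(Claim, emqx_message:timestamp_now(), Store).
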
+ 656 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_store.erl

@@ -0,0 +1,656 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_leader_store).
+
+-include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-export([
+    open/0,
+    close/0
+]).
+
+%% Leadership API
+-export([
+    %% Leadership claims
+    claim_leadership/3,
+    renew_leadership/3,
+    disown_leadership/2,
+    %% Accessors
+    leader_id/1,
+    alive_until/1,
+    heartbeat_interval/1
+]).
+
+%% Store API
+-export([
+    %% Lifecycle
+    init/1,
+    open/1,
+    %% TODO
+    %% destroy/1,
+    %% Managing records
+    get/3,
+    get/4,
+    fold/4,
+    size/2,
+    put/4,
+    get/2,
+    set/3,
+    delete/3,
+    dirty/1,
+    commit_dirty/2,
+    commit_renew/3
+]).
+
+-export_type([
+    t/0,
+    leader_claim/1
+]).
+
+-type group() :: binary().
+-type leader_claim(ID) :: {ID, _Heartbeat :: emqx_message:timestamp()}.
+
+-define(DS_DB, dqleader).
+
+-define(LEADER_TTL, 30_000).
+-define(LEADER_HEARTBEAT_INTERVAL, 10_000).
+
+-define(LEADER_TOPIC_PREFIX, <<"$leader">>).
+-define(LEADER_HEADER_HEARTBEAT, <<"$leader.ts">>).
+
+-define(STORE_TOPIC_PREFIX, <<"$s">>).
+
+-define(STORE_SK(SPACE, KEY), [SPACE | KEY]).
+-define(STORE_STAGE_ENTRY(SEQNUM, VALUE), {SEQNUM, VALUE}).
+-define(STORE_TOMBSTONE, '$tombstone').
+-define(STORE_PAYLOAD(ID, VALUE), [ID, VALUE]).
+-define(STORE_HEADER_CHANGESEQNUM, '$store.seqnum').
+
+-define(STORE_BATCH_SIZE, 500).
+-define(STORE_SLURP_RETRIES, 2).
+-define(STORE_SLURP_RETRY_TIMEOUT, 1000).
+
+-define(STORE_IS_ROOTSET(VAR), (VAR == seqnum)).
+
+-ifdef(TEST).
+-undef(LEADER_TTL).
+-undef(LEADER_HEARTBEAT_INTERVAL).
+-define(LEADER_TTL, 3_000).
+-define(LEADER_HEARTBEAT_INTERVAL, 1_000).
+-endif.
+
+%%
+
+open() ->
+    emqx_ds:open_db(?DS_DB, db_config()).
+
+close() ->
+    emqx_ds:close_db(?DS_DB).
+
+db_config() ->
+    Config = emqx_ds_schema:db_config([durable_storage, queues]),
+    tune_db_config(Config).
+
+tune_db_config(Config0 = #{backend := Backend}) ->
+    Config = Config0#{
+        %% We need total control over timestamp assignment.
+        force_monotonic_timestamps => false
+    },
+    case Backend of
+        B when B == builtin_raft; B == builtin_local ->
+            tune_db_storage_layout(Config);
+        _ ->
+            Config
+    end.
+
+tune_db_storage_layout(Config = #{storage := {Layout, Opts0}}) when
+    Layout == emqx_ds_storage_skipstream_lts;
+    Layout == emqx_ds_storage_bitfield_lts
+->
+    Opts = Opts0#{
+        %% Since these layouts impose somewhat strict requirements on message
+        %% timestamp uniqueness, we need to additionally ensure that LTS always
+        %% keeps different groups under separate indices.
+        lts_threshold_spec => {simple, {inf, inf, inf, 0}}
+    },
+    Config#{storage := {Layout, Opts}};
+tune_db_storage_layout(Config = #{storage := _}) ->
+    Config.
+
+%%
+
+-spec claim_leadership(group(), ID, emqx_message:timestamp()) ->
+    {ok | exists, leader_claim(ID)} | emqx_ds:error(_).
+claim_leadership(Group, LeaderID, TS) ->
+    LeaderClaim = {LeaderID, TS},
+    case try_replace_leader(Group, LeaderClaim, undefined) of
+        ok ->
+            {ok, LeaderClaim};
+        {exists, ExistingClaim = {_, LastHeartbeat}} when LastHeartbeat > TS - ?LEADER_TTL ->
+            {exists, ExistingClaim};
+        {exists, ExistingClaim = {_LeaderDead, _}} ->
+            case try_replace_leader(Group, LeaderClaim, ExistingClaim) of
+                ok ->
+                    {ok, LeaderClaim};
+                {exists, ConcurrentClaim} ->
+                    {exists, ConcurrentClaim};
+                Error ->
+                    Error
+            end;
+        Error ->
+            Error
+    end.
+
+-spec renew_leadership(group(), leader_claim(ID), emqx_message:timestamp()) ->
+    {ok | exists, leader_claim(ID)} | emqx_ds:error(_).
+renew_leadership(Group, LeaderClaim, TS) ->
+    RenewedClaim = renew_claim(LeaderClaim, TS),
+    case RenewedClaim =/= false andalso try_replace_leader(Group, RenewedClaim, LeaderClaim) of
+        ok ->
+            {ok, RenewedClaim};
+        {exists, NewestClaim} ->
+            {exists, NewestClaim};
+        false ->
+            {error, unrecoverable, leader_claim_outdated};
+        Error ->
+            Error
+    end.
+
+-spec renew_claim(leader_claim(ID), emqx_message:timestamp()) -> leader_claim(ID) | false.
+renew_claim({LeaderID, LastHeartbeat}, TS) ->
+    RenewedClaim = {LeaderID, TS},
+    IsRenewable = (LastHeartbeat > TS - ?LEADER_TTL),
+    IsRenewable andalso RenewedClaim.
+
+-spec disown_leadership(group(), leader_claim(_ID)) ->
+    ok | emqx_ds:error(_).
+disown_leadership(Group, LeaderClaim) ->
+    try_delete_leader(Group, LeaderClaim).
+
+-spec leader_id(leader_claim(ID)) ->
+    ID.
+leader_id({LeaderID, _}) ->
+    LeaderID.
+
+-spec alive_until(leader_claim(_)) ->
+    emqx_message:timestamp().
+alive_until({_LeaderID, LastHeartbeatTS}) ->
+    LastHeartbeatTS + ?LEADER_TTL.
+
+-spec heartbeat_interval(leader_claim(_)) ->
+    _Milliseconds :: pos_integer().
+heartbeat_interval(_) ->
+    ?LEADER_HEARTBEAT_INTERVAL.
+
+try_replace_leader(Group, LeaderClaim, ExistingClaim) ->
+    Batch = #dsbatch{
+        preconditions = [mk_precondition(Group, ExistingClaim)],
+        operations = [encode_leader_claim(Group, LeaderClaim)]
+    },
+    case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
+        ok ->
+            ok;
+        {error, unrecoverable, {precondition_failed, Mismatch}} ->
+            {exists, decode_leader_msg(Mismatch)};
+        Error ->
+            Error
+    end.
+
+try_delete_leader(Group, LeaderClaim) ->
+    {_Cond, Matcher} = mk_precondition(Group, LeaderClaim),
+    emqx_ds:store_batch(?DS_DB, #dsbatch{operations = [{delete, Matcher}]}, #{sync => false}).
+
+mk_precondition(Group, undefined) ->
+    {unless_exists, #message_matcher{
+        from = Group,
+        topic = mk_leader_topic(Group),
+        timestamp = 0,
+        payload = '_'
+    }};
+mk_precondition(Group, {Leader, HeartbeatTS}) ->
+    {if_exists, #message_matcher{
+        from = Group,
+        topic = mk_leader_topic(Group),
+        timestamp = 0,
+        payload = encode_leader(Leader),
+        headers = #{?LEADER_HEADER_HEARTBEAT => HeartbeatTS}
+    }}.
+
+encode_leader_claim(Group, {Leader, HeartbeatTS}) ->
+    #message{
+        id = <<>>,
+        qos = 0,
+        from = Group,
+        topic = mk_leader_topic(Group),
+        timestamp = 0,
+        payload = encode_leader(Leader),
+        headers = #{?LEADER_HEADER_HEARTBEAT => HeartbeatTS}
+    }.
+
+decode_leader_msg(#message{from = _Group, payload = Payload, headers = Headers}) ->
+    Leader = decode_leader(Payload),
+    Heartbeat = maps:get(?LEADER_HEADER_HEARTBEAT, Headers, 0),
+    {Leader, Heartbeat}.
+
+encode_leader(Leader) ->
+    %% NOTE: Lists are compact but easy to extend later.
+    term_to_binary([Leader]).
+
+decode_leader(Payload) ->
+    [Leader | _Extra] = binary_to_term(Payload),
+    Leader.
+
+mk_leader_topic(GroupName) ->
+    emqx_topic:join([?LEADER_TOPIC_PREFIX, GroupName]).
+
+%%
+
+-type space_name() :: stream.
+-type var_name() :: start_time | rank_progress.
+-type space_key() :: nonempty_improper_list(space_name(), _Key).
+
+%% NOTE
+%% Instances of the `emqx_ds:stream()` type are persisted in durable storage.
+%% Given that streams are opaque and the identity of a stream is the stream term
+%% itself (i.e. if S1 =:= S2 then both are the same stream), it's critical to keep
+%% the "shape" of the term intact between releases. Otherwise, if it changes, we
+%% will need an additional API to deal with that (e.g. `emqx_ds:term_to_stream/2`).
+%% Instances of `emqx_ds:iterator()` are also persisted in durable storage, but
+%% those already have a similar requirement because in some backends they travel
+%% in RPCs between nodes of potentially different releases.
+-type t() :: #{
+    %% General.
+    group := group(),
+    %% Spaces and variables: most up-to-date in-memory state.
+    stream := #{emqx_ds:stream() => stream_state()},
+    start_time => _SubscriptionStartTime :: emqx_message:timestamp(),
+    rank_progress => _RankProgress,
+    %% Internal _sequence number_ that tracks every change.
+    seqnum := integer(),
+    %% Mapping between complex keys and seqnums.
+    seqmap := #{space_key() => _SeqNum :: integer()},
+    %% Stage: uncommitted changes.
+    stage := #{space_key() | var_name() => _Value},
+    dirty => true
+}.
+
+-type stream_state() :: #{
+    progress => emqx_persistent_session_ds_shared_subs:progress(),
+    rank => emqx_ds:stream_rank()
+}.
+
+-spec init(group()) -> t().
+init(Group) ->
+    %% NOTE: An empty store is dirty because the rootset needs to be persisted.
+    mark_dirty(mk_store(Group)).
+
+-spec open(group()) -> t() | false.
+open(Group) ->
+    open_store(mk_store(Group)).
+
+mk_store(Group) ->
+    #{
+        group => Group,
+        stream => #{},
+        seqnum => 0,
+        seqmap => #{},
+        stage => #{}
+    }.
+
+open_store(Store = #{group := Group}) ->
+    ReadRootset = mk_read_root_batch(Group),
+    case emqx_ds:store_batch(?DS_DB, ReadRootset, #{sync => true}) of
+        ok ->
+            false;
+        {error, unrecoverable, {precondition_failed, RootMessage}} ->
+            Rootset = open_root_message(RootMessage),
+            slurp_store(Rootset, Store)
+    end.
+
+slurp_store(Rootset, Acc) ->
+    slurp_store(Rootset, #{}, ?STORE_SLURP_RETRIES, ?STORE_SLURP_RETRY_TIMEOUT, Acc).
+
+slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{group := Group}) ->
+    TopicFilter = mk_store_wildcard(Group),
+    StreamIts1 = ds_refresh_streams(TopicFilter, _StartTime = 0, StreamIts0),
+    {StreamIts, Store} = ds_streams_fold(
+        fun(Message, StoreAcc) -> open_message(Message, StoreAcc) end,
+        Acc,
+        StreamIts1
+    ),
+    case map_get(seqnum, Store) of
+        SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
+            maps:merge(Store, Rootset);
+        _Mismatch when Retries > 0 ->
+            ok = timer:sleep(RetryTimeout),
+            slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);
+        _Mismatch ->
+            {error, unrecoverable, {leader_store_inconsistent, Store, Rootset}}
+    end.
+
+-spec get(space_name(), _ID, t()) -> _Value.
+get(SpaceName, ID, Store) ->
+    Space = maps:get(SpaceName, Store),
+    maps:get(ID, Space).
+
+-spec get(space_name(), _ID, Default, t()) -> _Value | Default.
+get(SpaceName, ID, Default, Store) ->
+    Space = maps:get(SpaceName, Store),
+    maps:get(ID, Space, Default).
+
+-spec fold(space_name(), fun((_ID, _Value, Acc) -> Acc), Acc, t()) -> Acc.
+fold(SpaceName, Fun, Acc, Store) ->
+    Space = maps:get(SpaceName, Store),
+    maps:fold(Fun, Acc, Space).
+
+-spec size(space_name(), t()) -> non_neg_integer().
+size(SpaceName, Store) ->
+    map_size(maps:get(SpaceName, Store)).
+
+-spec put(space_name(), _ID, _Value, t()) -> t().
+put(SpaceName, ID, Value, Store0 = #{stage := Stage, seqnum := SeqNum0, seqmap := SeqMap}) ->
+    Space0 = maps:get(SpaceName, Store0),
+    Space1 = maps:put(ID, Value, Space0),
+    SeqNum = SeqNum0 + 1,
+    SK = ?STORE_SK(SpaceName, ID),
+    Store = Store0#{
+        SpaceName := Space1,
+        seqnum := SeqNum,
+        stage := Stage#{SK => ?STORE_STAGE_ENTRY(SeqNum, Value)}
+    },
+    case map_size(Space1) of
+        S when S > map_size(Space0) ->
+            Store#{seqmap := maps:put(SK, SeqNum, SeqMap)};
+        _ ->
+            Store
+    end.
+
+get_seqnum(?STORE_SK(_SpaceName, _) = SK, SeqMap) ->
+    maps:get(SK, SeqMap);
+get_seqnum(_VarName, _SeqMap) ->
+    0.
+
+-spec get(var_name(), t()) -> _Value.
+get(VarName, Store) ->
+    maps:get(VarName, Store).
+
+-spec set(var_name(), _Value, t()) -> t().
+set(VarName, Value, Store = #{stage := Stage, seqnum := SeqNum0}) ->
+    SeqNum = SeqNum0 + 1,
+    Store#{
+        VarName => Value,
+        seqnum := SeqNum,
+        stage := Stage#{VarName => ?STORE_STAGE_ENTRY(SeqNum, Value)}
+    }.
+
+-spec delete(space_name(), _ID, t()) -> t().
+delete(SpaceName, ID, Store = #{stage := Stage, seqmap := SeqMap}) ->
+    Space0 = maps:get(SpaceName, Store),
+    Space1 = maps:remove(ID, Space0),
+    case map_size(Space1) of
+        S when S < map_size(Space0) ->
+            %% NOTE
+            %% We do not bump seqnum on deletions because tracking them does
+            %% not make a lot of sense, assuming batches are atomic.
+            SK = ?STORE_SK(SpaceName, ID),
+            Store#{
+                SpaceName := Space1,
+                stage := Stage#{SK => ?STORE_TOMBSTONE},
+                seqmap := maps:remove(SK, SeqMap)
+            };
+        _ ->
+            Store
+    end.
+
+mark_dirty(Store) ->
+    Store#{dirty => true}.
+
+mark_clean(Store) ->
+    maps:remove(dirty, Store).
+
+-spec dirty(t()) -> boolean().
+dirty(#{dirty := Dirty}) ->
+    Dirty;
+dirty(#{stage := Stage}) ->
+    map_size(Stage) > 0.
+
+%% @doc Commit staged changes to the storage.
+%% Does nothing if there are no staged changes.
+-spec commit_dirty(leader_claim(_), t()) ->
+    {ok, t()} | emqx_ds:error(_).
+commit_dirty(LeaderClaim, Store = #{dirty := true}) ->
+    commit(LeaderClaim, Store);
+commit_dirty(LeaderClaim, Store = #{stage := Stage}) when map_size(Stage) > 0 ->
+    commit(LeaderClaim, Store).
+
+commit(LeaderClaim, Store = #{group := Group}) ->
+    Operations = mk_store_operations(Store),
+    Batch = mk_store_batch(Group, LeaderClaim, Operations),
+    case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
+        ok ->
+            {ok, mark_clean(Store#{stage := #{}})};
+        {error, unrecoverable, {precondition_failed, Mismatch}} ->
+            {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
+        Error ->
+            Error
+    end.
+
+%% @doc Commit staged changes and renew leadership at the same time.
+%% Goes to the storage even if there are no staged changes.
+-spec commit_renew(leader_claim(ID), emqx_message:timestamp(), t()) ->
+    {ok, leader_claim(ID), t()} | emqx_ds:error(_).
+commit_renew(LeaderClaim, TS, Store = #{group := Group}) ->
+    case renew_claim(LeaderClaim, TS) of
+        RenewedClaim when RenewedClaim =/= false ->
+            Operations = mk_store_operations(Store),
+            Batch = mk_store_batch(Group, LeaderClaim, RenewedClaim, Operations),
+            case emqx_ds:store_batch(?DS_DB, Batch, #{sync => true}) of
+                ok ->
+                    {ok, RenewedClaim, mark_clean(Store#{stage := #{}})};
+                {error, unrecoverable, {precondition_failed, Mismatch}} ->
+                    {error, unrecoverable, {leadership_lost, decode_leader_msg(Mismatch)}};
+                Error ->
+                    Error
+            end;
+        false ->
+            {error, unrecoverable, leader_claim_outdated}
+    end.
+
+mk_store_batch(Group, LeaderClaim, Operations) ->
+    #dsbatch{
+        preconditions = [mk_precondition(Group, LeaderClaim)],
+        operations = Operations
+    }.
+
+mk_store_batch(Group, ExistingClaim, RenewedClaim, Operations) ->
+    #dsbatch{
+        preconditions = [mk_precondition(Group, ExistingClaim)],
+        operations = [encode_leader_claim(Group, RenewedClaim) | Operations]
+    }.
+
+mk_store_operations(Store = #{group := Group, stage := Stage, seqmap := SeqMap}) ->
+    %% NOTE: Always persist rootset.
+    RootOperation = mk_store_root(Store),
+    maps:fold(
+        fun(SK, Value, Acc) ->
+            [mk_store_operation(Group, SK, Value, SeqMap) | Acc]
+        end,
+        [RootOperation],
+        Stage
+    ).
+
+mk_store_root(Store = #{group := Group}) ->
+    Payload = maps:filter(fun(V, _) -> ?STORE_IS_ROOTSET(V) end, Store),
+    #message{
+        id = <<>>,
+        qos = 0,
+        from = Group,
+        topic = mk_store_root_topic(Group),
+        payload = term_to_binary(Payload),
+        timestamp = 0
+    }.
+
+mk_store_operation(Group, SK, ?STORE_TOMBSTONE, SeqMap) ->
+    {delete, #message_matcher{
+        from = Group,
+        topic = mk_store_topic(Group, SK, SeqMap),
+        payload = '_',
+        timestamp = get_seqnum(SK, SeqMap)
+    }};
+mk_store_operation(Group, SK, ?STORE_STAGE_ENTRY(ChangeSeqNum, Value), SeqMap) ->
+    %% NOTE
+    %% Using `SeqNum` as timestamp to further disambiguate one record (message) from
+    %% another in the DS DB keyspace. For example, the Skipstream-LTS storage layout
+    %% _requires_ messages in the same stream to have unique timestamps.
+    %% TODO
+    %% Do we need to have wall-clock timestamp here?
+    Payload = mk_store_payload(SK, Value),
+    #message{
+        id = <<>>,
+        qos = 0,
+        from = Group,
+        topic = mk_store_topic(Group, SK, SeqMap),
+        payload = term_to_binary(Payload),
+        timestamp = get_seqnum(SK, SeqMap),
+        %% NOTE: Preserving the seqnum when this change has happened.
+        headers = #{?STORE_HEADER_CHANGESEQNUM => ChangeSeqNum}
+    }.
+
+open_root_message(#message{payload = Payload, timestamp = 0}) ->
+    #{} = binary_to_term(Payload).
+
+open_message(
+    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}, Store
+) ->
+    Entry =
+        try
+            ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
+            case emqx_topic:tokens(Topic) of
+                [_Prefix, _Group, SpaceTok, _SeqTok] ->
+                    SpaceName = token_to_space(SpaceTok),
+                    ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
+                    %% TODO: Records.
+                    Record = {SpaceName, ID, Value, SeqNum};
+                [_Prefix, _Group, VarTok] ->
+                    VarName = token_to_varname(VarTok),
+                    Value = binary_to_term(Payload),
+                    Record = {VarName, Value}
+            end,
+            {ChangeSeqNum, Record}
+        catch
+            error:_ ->
+                ?tp(warning, "dssubs_leader_store_unrecognized_message", #{
+                    group => maps:get(group, Store),
+                    message => Msg
+                }),
+                unrecognized
+        end,
+    open_entry(Entry, Store).
+
+open_entry({ChangeSeqNum, Record}, Store = #{seqnum := SeqNum}) ->
+    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)}).
+
+open_record({SpaceName, ID, Value, SeqNum}, Store = #{seqmap := SeqMap}) ->
+    Space0 = maps:get(SpaceName, Store),
+    Space1 = maps:put(ID, Value, Space0),
+    SK = ?STORE_SK(SpaceName, ID),
+    Store#{
+        SpaceName := Space1,
+        seqmap := SeqMap#{SK => SeqNum}
+    };
+open_record({VarName, Value}, Store) ->
+    Store#{VarName => Value}.
+
+mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
+    ?STORE_PAYLOAD(ID, Value);
+mk_store_payload(_VarName, Value) ->
+    Value.
+
+mk_store_root_topic(GroupName) ->
+    emqx_topic:join([?STORE_TOPIC_PREFIX, GroupName]).
+
+mk_store_topic(GroupName, ?STORE_SK(SpaceName, _) = SK, SeqMap) ->
+    SeqNum = get_seqnum(SK, SeqMap),
+    SeqTok = integer_to_binary(SeqNum),
+    emqx_topic:join([?STORE_TOPIC_PREFIX, GroupName, space_to_token(SpaceName), SeqTok]);
+mk_store_topic(GroupName, VarName, _SeqMap) ->
+    emqx_topic:join([?STORE_TOPIC_PREFIX, GroupName, varname_to_token(VarName)]).
+
+mk_store_wildcard(GroupName) ->
+    [?STORE_TOPIC_PREFIX, GroupName, '+', '#'].
+
+mk_read_root_batch(Group) ->
+    %% NOTE
+    %% Construct a batch that essentially does nothing but read the rootset
+    %% in a consistent manner.
+    Matcher = #message_matcher{
+        from = Group,
+        topic = mk_store_root_topic(Group),
+        payload = '_',
+        timestamp = 0
+    },
+    #dsbatch{
+        preconditions = [{unless_exists, Matcher}],
+        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
+    }.
+
+ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
+    Streams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
+    lists:foldl(
+        fun({_Rank, Stream}, Acc) ->
+            case StreamIts of
+                #{Stream := _It} ->
+                    Acc;
+                #{} ->
+                    %% TODO: Gracefully handle `emqx_ds:error(_)`?
+                    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
+                    Acc#{Stream => It}
+            end
+        end,
+        StreamIts,
+        Streams
+    ).
+
+ds_streams_fold(Fun, AccIn, StreamItsIn) ->
+    maps:fold(
+        fun(Stream, It0, {StreamIts, Acc0}) ->
+            {It, Acc} = ds_stream_fold(Fun, Acc0, It0),
+            {StreamIts#{Stream := It}, Acc}
+        end,
+        {StreamItsIn, AccIn},
+        StreamItsIn
+    ).
+
+ds_stream_fold(_Fun, Acc0, end_of_stream) ->
+    %% NOTE: Assuming atom `end_of_stream` is not a valid `emqx_ds:iterator()`.
+    {end_of_stream, Acc0};
+ds_stream_fold(Fun, Acc0, It0) ->
+    %% TODO: Gracefully handle `emqx_ds:error(_)`?
+    case emqx_ds:next(?DS_DB, It0, ?STORE_BATCH_SIZE) of
+        {ok, It, Messages = [_ | _]} ->
+            Acc1 = lists:foldl(fun({_Key, Msg}, Acc) -> Fun(Msg, Acc) end, Acc0, Messages),
+            ds_stream_fold(Fun, Acc1, It);
+        {ok, It, []} ->
+            {It, Acc0};
+        {ok, end_of_stream} ->
+            {end_of_stream, Acc0}
+    end.
+
+%%
+
+space_to_token(stream) -> <<"s">>;
+space_to_token(progress) -> <<"prog">>;
+space_to_token(sequence) -> <<"seq">>.
+
+token_to_space(<<"s">>) -> stream;
+token_to_space(<<"prog">>) -> progress;
+token_to_space(<<"seq">>) -> sequence.
+
+varname_to_token(rank_progress) -> <<"rankp">>;
+varname_to_token(start_time) -> <<"stime">>.
+
+token_to_varname(<<"rankp">>) -> rank_progress;
+token_to_varname(<<"stime">>) -> start_time.

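Putting the leadership and store APIs together, a leader's lifecycle looks
roughly like this (StreamId and Rank are placeholders; error handling elided):

    TS0 = emqx_message:timestamp_now(),
    {ok, Claim0} = emqx_ds_shared_sub_leader_store:claim_leadership(Group, self(), TS0),
    %% Fresh group: initialize an empty (dirty) store and stage some state.
    Store0 = emqx_ds_shared_sub_leader_store:init(Group),
    Store1 = emqx_ds_shared_sub_leader_store:set(start_time, TS0, Store0),
    Store2 = emqx_ds_shared_sub_leader_store:put(stream, StreamId, #{rank => Rank}, Store1),
    %% Heartbeat: atomically persist staged changes and extend the claim,
    %% preconditioned on the leader record still matching Claim0.
    {ok, Claim1, Store3} =
        emqx_ds_shared_sub_leader_store:commit_renew(Claim0, emqx_message:timestamp_now(), Store2),
    %% ... normal operation: more put/4, set/3, delete/3 calls ...
    Store4 = emqx_ds_shared_sub_leader_store:delete(stream, StreamId, Store3),
    %% Graceful shutdown: flush leftover staged changes, release leadership.
    {ok, _} = emqx_ds_shared_sub_leader_store:commit_dirty(Claim1, Store4),
    ok = emqx_ds_shared_sub_leader_store:disown_leadership(Group, Claim1).
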
+ 0 - 59
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl

@@ -1,59 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_ds_shared_sub_leader_sup).
-
--behaviour(supervisor).
-
-%% API
--export([
-    start_link/0,
-    child_spec/0,
-
-    start_leader/1,
-    stop_leader/1
-]).
-
-%% supervisor behaviour callbacks
--export([init/1]).
-
-%%------------------------------------------------------------------------------
-%% API
-%%------------------------------------------------------------------------------
-
--spec start_link() -> supervisor:startlink_ret().
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
--spec child_spec() -> supervisor:child_spec().
-child_spec() ->
-    #{
-        id => ?MODULE,
-        start => {?MODULE, start_link, []},
-        restart => permanent,
-        shutdown => 5000,
-        type => supervisor
-    }.
-
--spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret().
-start_leader(Options) ->
-    ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options),
-    supervisor:start_child(?MODULE, ChildSpec).
-
--spec stop_leader(emqx_persistent_session_ds:share_topic_filter()) -> ok | {error, term()}.
-stop_leader(TopicFilter) ->
-    supervisor:terminate_child(?MODULE, emqx_ds_shared_sub_leader:id(TopicFilter)).
-
-%%------------------------------------------------------------------------------
-%% supervisor behaviour callbacks
-%%------------------------------------------------------------------------------
-
-init([]) ->
-    SupFlags = #{
-        strategy => one_for_one,
-        intensity => 10,
-        period => 10
-    },
-    ChildSpecs = [],
-    {ok, {SupFlags, ChildSpecs}}.

+ 52 - 97
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl

@@ -4,122 +4,77 @@
 
 -module(emqx_ds_shared_sub_registry).
 
--behaviour(gen_server).
-
--include_lib("emqx/include/logger.hrl").
-
+%% API
 -export([
     start_link/0,
-    child_spec/0,
-
-    init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2
+    child_spec/0
 ]).
 
 -export([
-    lookup_leader/3
+    leader_wanted/3,
+    start_elector/2
 ]).
 
--record(lookup_leader, {
-    agent :: emqx_ds_shared_sub_proto:agent(),
-    agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(),
-    share_topic_filter :: emqx_persistent_session_ds:share_topic_filter()
-}).
+-behaviour(supervisor).
+-export([init/1]).
 
--define(gproc_id(ID), {n, l, ID}).
-
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% API
-%%--------------------------------------------------------------------
-
--spec lookup_leader(
-    emqx_ds_shared_sub_proto:agent(),
-    emqx_ds_shared_sub_proto:agent_metadata(),
-    emqx_persistent_session_ds:share_topic_filter()
-) -> ok.
-lookup_leader(Agent, AgentMetadata, ShareTopicFilter) ->
-    gen_server:cast(?MODULE, #lookup_leader{
-        agent = Agent, agent_metadata = AgentMetadata, share_topic_filter = ShareTopicFilter
-    }).
-
-%%--------------------------------------------------------------------
-%% Internal API
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
+-spec start_link() -> supervisor:startlink_ret().
 start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+-spec child_spec() -> supervisor:child_spec().
 child_spec() ->
     #{
         id => ?MODULE,
         start => {?MODULE, start_link, []},
         restart => permanent,
-        shutdown => 5000,
-        type => worker
+        type => supervisor
     }.
 
-%%--------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------
-
-init([]) ->
-    {ok, #{}}.
+-spec leader_wanted(
+    emqx_ds_shared_sub_proto:agent(),
+    emqx_ds_shared_sub_proto:agent_metadata(),
+    emqx_persistent_session_ds:share_topic_filter()
+) -> ok.
+leader_wanted(Agent, AgentMetadata, ShareTopic) ->
+    {ok, Pid} = ensure_elector_started(ShareTopic),
+    emqx_ds_shared_sub_proto:agent_connect_leader(Pid, Agent, AgentMetadata, ShareTopic).
+
+-spec ensure_elector_started(emqx_persistent_session_ds:share_topic_filter()) ->
+    {ok, pid()}.
+ensure_elector_started(ShareTopic) ->
+    case start_elector(ShareTopic, _StartTime = emqx_message:timestamp_now()) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {error, {already_started, Pid}} when is_pid(Pid) ->
+            {ok, Pid}
+    end.
+
+-spec start_elector(emqx_persistent_session_ds:share_topic_filter(), emqx_message:timestamp()) ->
+    supervisor:startchild_ret().
+start_elector(ShareTopic, StartTime) ->
+    supervisor:start_child(?MODULE, #{
+        id => ShareTopic,
+        start => {emqx_ds_shared_sub_elector, start_link, [ShareTopic, StartTime]},
+        restart => temporary,
+        type => worker,
+        shutdown => 5000
+    }).
 
-handle_call(_Request, _From, State) ->
-    {reply, {error, unknown_request}, State}.
+%%------------------------------------------------------------------------------
+%% supervisor behaviour callbacks
+%%------------------------------------------------------------------------------
 
-handle_cast(
-    #lookup_leader{
-        agent = Agent,
-        agent_metadata = AgentMetadata,
-        share_topic_filter = ShareTopicFilter
+init([]) ->
+    ok = emqx_ds_shared_sub_leader_store:open(),
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 1
     },
-    State
-) ->
-    State1 = do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State),
-    {noreply, State1}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) ->
-    %% TODO https://emqx.atlassian.net/browse/EMQX-12309
-    %% Cluster-wide unique leader election should be implemented
-    Id = emqx_ds_shared_sub_leader:id(ShareTopicFilter),
-    LeaderPid =
-        case gproc:where(?gproc_id(Id)) of
-            undefined ->
-                {ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{
-                    share_topic_filter => ShareTopicFilter
-                }),
-                {ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register(
-                    Pid,
-                    fun() ->
-                        {LPid, _} = gproc:reg_or_locate(?gproc_id(Id)),
-                        LPid
-                    end
-                ),
-                NewLeaderPid;
-            Pid ->
-                Pid
-        end,
-    ?SLOG(info, #{
-        msg => lookup_leader,
-        agent => Agent,
-        share_topic_filter => ShareTopicFilter,
-        leader => LeaderPid
-    }),
-    ok = emqx_ds_shared_sub_proto:agent_connect_leader(
-        LeaderPid, Agent, AgentMetadata, ShareTopicFilter
-    ),
-    State.
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
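
The elector child is keyed by the share topic filter, so concurrent
leader_wanted/3 calls for the same group converge on a single elector
process. A minimal sketch of the idiom (bindings are hypothetical):

    %% A second start attempt for the same share topic reports the running
    %% pid, which ensure_elector_started/1 above treats as success:
    {ok, Pid} = emqx_ds_shared_sub_registry:start_elector(ShareTopic, T0),
    {error, {already_started, Pid}} =
        emqx_ds_shared_sub_registry:start_elector(ShareTopic, T1).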

+ 15 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl

@@ -13,6 +13,10 @@
     desc/1
 ]).
 
+-export([
+    injected_fields/0
+]).
+
 namespace() -> emqx_shared_subs.
 
 roots() ->
@@ -42,6 +46,17 @@ fields(durable_queues) ->
         duration(leader_session_not_replaying_timeout_ms, 5000)
     ].
 
+injected_fields() ->
+    #{
+        'durable_storage' => [
+            {queues,
+                emqx_ds_schema:storage_schema(#{
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(durable_queues_storage)
+                })}
+        ]
+    }.
+
 duration(MsFieldName, Default) ->
     {MsFieldName,
         ?HOCON(
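
With this field injected under the 'durable_storage' root, the shared-sub
queue storage becomes configurable next to message storage. An illustrative
HOCON snippet, mirroring the test configs further below (values are examples
only):

    durable_storage {
      messages { backend = builtin_raft }
      queues { backend = builtin_raft }
    }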

+ 1 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl

@@ -30,7 +30,6 @@ init([]) ->
         period => 10
     },
     ChildSpecs = [
-        emqx_ds_shared_sub_registry:child_spec(),
-        emqx_ds_shared_sub_leader_sup:child_spec()
+        emqx_ds_shared_sub_registry:child_spec()
     ],
     {ok, {SupFlags, ChildSpecs}}.

+ 54 - 8
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -18,7 +18,7 @@ all() ->
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         [
-            {emqx, #{
+            {emqx_conf, #{
                 config => #{
                     <<"durable_sessions">> => #{
                         <<"enable">> => true,
@@ -27,10 +27,17 @@ init_per_suite(Config) ->
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
                             <<"backend">> => <<"builtin_raft">>
+                        },
+                        <<"queues">> => #{
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"local_write_buffer">> => #{
+                                <<"flush_interval">> => <<"10ms">>
+                            }
                         }
                     }
                 }
             }},
+            emqx,
             emqx_ds_shared_sub
         ],
         #{work_dir => ?config(priv_dir, Config)}
@@ -183,6 +190,44 @@ t_graceful_disconnect(_Config) ->
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnPub).
 
+t_leader_state_preserved(_Config) ->
+    ?check_trace(
+        begin
+            ConnShared1 = emqtt_connect_sub(<<"client1">>),
+            {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/lsp/topic42/#">>, 1),
+
+            ConnShared2 = emqtt_connect_sub(<<"client2">>),
+            {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/lsp/topic42/#">>, 1),
+
+            ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+            {ok, _} = emqtt:publish(ConnPub, <<"topic42/1/2">>, <<"hello1">>, 1),
+            {ok, _} = emqtt:publish(ConnPub, <<"topic42/3/4">>, <<"hello2">>, 1),
+            ?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000),
+            ?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000),
+
+            ok = emqtt:disconnect(ConnShared1),
+            ok = emqtt:disconnect(ConnShared2),
+
+            %% Equivalent to node restart.
+            ok = terminate_leaders(),
+            ok = timer:sleep(1_000),
+
+            {ok, _} = emqtt:publish(ConnPub, <<"topic42/1/2">>, <<"hello3">>, 1),
+            {ok, _} = emqtt:publish(ConnPub, <<"topic42/3/4">>, <<"hello4">>, 1),
+
+            ConnShared3 = emqtt_connect_sub(<<"client3">>),
+            {ok, _, _} = emqtt:subscribe(ConnShared3, <<"$share/lsp/topic42/#">>, 1),
+
+            ?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000),
+            ?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000),
+
+            ok = emqtt:disconnect(ConnShared3),
+            ok = emqtt:disconnect(ConnPub)
+        end,
+        []
+    ).
+
 t_intensive_reassign(_Config) ->
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
 
@@ -405,15 +450,16 @@ t_disconnect_no_double_replay2(_Config) ->
     %     3000
     % ),
 
-    ok = emqtt:disconnect(ConnShared12).
+    ok = emqtt:disconnect(ConnShared12),
+    ok = emqtt:disconnect(ConnPub).
 
 t_lease_reconnect(_Config) ->
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
 
     ConnShared = emqtt_connect_sub(<<"client_shared">>),
 
-    %% Stop registry to simulate unability to find leader.
-    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+    %% Simulate inability to find the leader.
+    ok = emqx_ds_shared_sub_leader_store:close(),
 
     ?assertWaitEvent(
         {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
@@ -421,9 +467,9 @@ t_lease_reconnect(_Config) ->
         5_000
     ),
 
-    %% Start registry, agent should retry after some time and find the leader.
+    %% Agent should retry after some time and find the leader.
     ?assertWaitEvent(
-        {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+        ok = emqx_ds_shared_sub_leader_store:open(),
         #{?snk_kind := leader_lease_streams},
         5_000
     ),
@@ -490,8 +536,8 @@ emqtt_connect_pub(ClientId) ->
     C.
 
 terminate_leaders() ->
-    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
-    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
     ok.
 
 publish_n(_Conn, _Topics, From, To) when From > To ->

+ 7 - 3
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -25,7 +25,7 @@ all() ->
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         [
-            {emqx, #{
+            {emqx_conf, #{
                 config => #{
                     <<"durable_sessions">> => #{
                         <<"enable">> => true,
@@ -34,10 +34,14 @@ init_per_suite(Config) ->
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
                             <<"backend">> => <<"builtin_raft">>
+                        },
+                        <<"queues">> => #{
+                            <<"backend">> => <<"builtin_raft">>
                         }
                     }
                 }
             }},
+            emqx,
             emqx_ds_shared_sub,
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard()
@@ -135,6 +139,6 @@ api(Method, Path, Data) ->
     end.
 
 terminate_leaders() ->
-    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
-    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
     ok.

+ 6 - 3
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_config_SUITE.erl

@@ -18,8 +18,7 @@ all() ->
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         [
-            emqx_conf,
-            {emqx, #{
+            {emqx_conf, #{
                 config => #{
                     <<"durable_sessions">> => #{
                         <<"enable">> => true,
@@ -28,6 +27,9 @@ init_per_suite(Config) ->
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
                             <<"backend">> => <<"builtin_raft">>
+                        },
+                        <<"queues">> => #{
+                            <<"backend">> => <<"builtin_raft">>
                         }
                     }
                 }
@@ -39,7 +41,8 @@ init_per_suite(Config) ->
                         <<"session_find_leader_timeout_ms">> => "1200ms"
                     }
                 }
-            }}
+            }},
+            emqx
         ],
         #{work_dir => ?config(priv_dir, Config)}
     ),

+ 6 - 5
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_mgmt_api_subscription_SUITE.erl

@@ -18,11 +18,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         [
-            {emqx,
-                "durable_sessions {\n"
-                "    enable = true\n"
-                "    renew_streams_interval = 10ms\n"
-                "}"},
+            {emqx_conf,
+                "durable_sessions {"
+                "\n     enable = true"
+                "\n     renew_streams_interval = 10ms"
+                "\n }"},
             {emqx_ds_shared_sub, #{
                 config => #{
                     <<"durable_queues">> => #{
@@ -31,6 +31,7 @@ init_per_suite(Config) ->
                     }
                 }
             }},
+            emqx,
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard()
         ],

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -192,7 +192,8 @@ handle_info(?flush, S) ->
 handle_info(_Info, S) ->
     {noreply, S}.
 
-terminate(_Reason, #s{db = DB}) ->
+terminate(_Reason, S = #s{db = DB}) ->
+    _ = flush(S),
     persistent_term:erase(?cbm(DB)),
     ok.
 

+ 18 - 1
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -31,6 +31,8 @@
     info/2,
     info/1,
 
+    threshold_fun/1,
+
     compress_topic/3,
     decompress_topic/2
 ]).
@@ -44,7 +46,9 @@
     static_key/0,
     trie/0,
     msg_storage_key/0,
-    learned_structure/0
+    learned_structure/0,
+    threshold_spec/0,
+    threshold_fun/0
 ]).
 
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -83,6 +87,12 @@
 
 -type msg_storage_key() :: {static_key(), varying()}.
 
+-type threshold_spec() ::
+    %% Simple spec that maps level (depth) to a threshold.
+    %% For example, `{simple, {inf, 20}}` means that 0th level has infinite
+    %% threshold while all other levels' threshold is 20.
+    {simple, tuple()}.
+
 -type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()).
 
 -type persist_callback() :: fun((_Key, _Val) -> ok).
@@ -313,6 +323,13 @@ info(Trie) ->
         {topics, info(Trie, topics)}
     ].
 
+-spec threshold_fun(threshold_spec()) -> threshold_fun().
+threshold_fun({simple, Thresholds}) ->
+    S = tuple_size(Thresholds),
+    fun(Depth) ->
+        element(min(Depth + 1, S), Thresholds)
+    end.
+
 %%%%%%%% Topic compression %%%%%%%%%%
 
 %% @doc Given topic structure for the static LTS index (as returned by
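
Since the lookup depth is clamped to the tuple size, the last element of a
{simple, ...} spec applies to all deeper levels. A quick sketch of the
resulting function, using the default bitfield thresholds:

    TFun = emqx_ds_lts:threshold_fun({simple, {100, 20}}),
    100 = TFun(0),  %% element(min(0 + 1, 2), {100, 20})
    20 = TFun(1),   %% element(min(1 + 1, 2), {100, 20})
    20 = TFun(9).   %% any deeper level falls back to the last element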

+ 15 - 11
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -81,7 +81,8 @@
     #{
         bits_per_wildcard_level => pos_integer(),
         topic_index_bytes => pos_integer(),
-        epoch_bits => non_neg_integer()
+        epoch_bits => non_neg_integer(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
 %% Permanent state:
@@ -90,7 +91,8 @@
         bits_per_wildcard_level := pos_integer(),
         topic_index_bytes := pos_integer(),
         ts_bits := non_neg_integer(),
-        ts_offset_bits := non_neg_integer()
+        ts_offset_bits := non_neg_integer(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
 %% Runtime state:
@@ -102,6 +104,7 @@
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
+    threshold_fun :: emqx_ds_lts:threshold_fun(),
     gvars :: ets:table()
 }).
 
@@ -141,6 +144,9 @@
 %% Limit on the number of wildcard levels in the learned topic trie:
 -define(WILDCARD_LIMIT, 10).
 
+%% Default LTS thresholds: 0th level = 100 entries max, other levels = 20 entries.
+-define(DEFAULT_LTS_THRESHOLD, {simple, {100, 20}}).
+
 %% Persistent (durable) term representing `#message{}' record. Must
 %% not change.
 -type value_v1() ::
@@ -195,6 +201,7 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
     %% 20 bits -> 1048576 us -> ~1 sec
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
+    ThresholdSpec = maps:get(lts_threshold_spec, Options, ?DEFAULT_LTS_THRESHOLD),
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -213,7 +220,8 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
         ts_bits => 64,
-        ts_offset_bits => TSOffsetBits
+        ts_offset_bits => TSOffsetBits,
+        lts_threshold_spec => ThresholdSpec
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
 
@@ -245,6 +253,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
          || N <- lists:seq(0, MaxWildcardLevels)
         ]
     ),
+    ThresholdSpec = maps:get(lts_threshold_spec, Schema, ?DEFAULT_LTS_THRESHOLD),
     #s{
         db = DBHandle,
         data = DataCF,
@@ -253,6 +262,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
+        threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
 
@@ -841,9 +851,9 @@ format_key(KeyMapper, Key) ->
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
 -spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
+make_key(#s{keymappers = KeyMappers, trie = Trie, threshold_fun = TFun}, Timestamp, Topic) ->
     Tokens = emqx_topic:words(Topic),
-    {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
+    {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),
     KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes),
@@ -861,12 +871,6 @@ make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
         ])
     ).
 
-%% TODO: don't hardcode the thresholds
-threshold_fun(0) ->
-    100;
-threshold_fun(_) ->
-    20.
-
 hash_topic_level('') ->
     hash_topic_level(<<>>);
 hash_topic_level(TopicLevel) ->

+ 24 - 16
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -87,9 +87,13 @@
         topic_index_bytes := pos_integer(),
         keep_message_id := boolean(),
         serialization_schema := emqx_ds_msg_serializer:schema(),
-        with_guid := boolean()
+        with_guid := boolean(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
+%% Default LTS thresholds: 0th level = 100 entries max, other levels = 10 entries.
+-define(DEFAULT_LTS_THRESHOLD, {simple, {100, 10}}).
+
 %% Runtime state:
 -record(s, {
     db :: rocksdb:db_handle(),
@@ -98,6 +102,7 @@
     trie_cf :: rocksdb:cf_handle(),
     serialization_schema :: emqx_ds_msg_serializer:schema(),
     hash_bytes :: pos_integer(),
+    threshold_fun :: emqx_ds_lts:threshold_fun(),
     with_guid :: boolean()
 }).
 
@@ -136,7 +141,8 @@ create(_ShardId, DBHandle, GenId, Schema0, SPrev) ->
         wildcard_hash_bytes => 8,
         topic_index_bytes => 8,
         serialization_schema => asn1,
-        with_guid => false
+        with_guid => false,
+        lts_threshold_spec => ?DEFAULT_LTS_THRESHOLD
     },
     Schema = maps:merge(Defaults, Schema0),
     ok = emqx_ds_msg_serializer:check_schema(maps:get(serialization_schema, Schema)),
@@ -154,15 +160,22 @@ create(_ShardId, DBHandle, GenId, Schema0, SPrev) ->
     end,
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
 
-open(_Shard, DBHandle, GenId, CFRefs, #{
-    topic_index_bytes := TIBytes,
-    wildcard_hash_bytes := WCBytes,
-    serialization_schema := SSchema,
-    with_guid := WithGuid
-}) ->
+open(
+    _Shard,
+    DBHandle,
+    GenId,
+    CFRefs,
+    Schema = #{
+        topic_index_bytes := TIBytes,
+        wildcard_hash_bytes := WCBytes,
+        serialization_schema := SSchema,
+        with_guid := WithGuid
+    }
+) ->
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     Trie = restore_trie(TIBytes, DBHandle, TrieCF),
+    ThresholdSpec = maps:get(lts_threshold_spec, Schema, ?DEFAULT_LTS_THRESHOLD),
     #s{
         db = DBHandle,
         data_cf = DataCF,
@@ -170,6 +183,7 @@ open(_Shard, DBHandle, GenId, CFRefs, #{
         trie = Trie,
         hash_bytes = WCBytes,
         serialization_schema = SSchema,
+        threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         with_guid = WithGuid
     }.
 
@@ -181,7 +195,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
 
 prepare_batch(
     _ShardId,
-    S = #s{trie = Trie},
+    S = #s{trie = Trie, threshold_fun = TFun},
     Operations,
     _Options
 ) ->
@@ -190,7 +204,7 @@ prepare_batch(
         fun
             ({Timestamp, Msg = #message{topic = Topic}}) ->
                 Tokens = words(Topic),
-                {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
+                {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
                 ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
             ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
                 case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
@@ -692,12 +706,6 @@ hash(HashBytes, TopicLevel) ->
 
 %%%%%%%% LTS %%%%%%%%%%
 
-%% TODO: don't hardcode the thresholds
-threshold_fun(0) ->
-    100;
-threshold_fun(_) ->
-    10.
-
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(StaticIdxBytes, DB, CF) ->
     PersistCallback = fun(Key, Val) ->

+ 12 - 3
rel/i18n/emqx_ds_schema.hocon

@@ -36,11 +36,20 @@ builtin_n_shards.desc:
   Please note that it takes effect only during the initialization of the durable storage database.
  Changing this configuration parameter after the database has already been created won't have any effect.~"""
 
-builtin_n_sites.label: "Initial number of sites"
-builtin_n_sites.desc:
+builtin_raft_replication_factor.label: "Replication factor"
+builtin_raft_replication_factor.desc:
+  """~
+  Number of identical replicas each shard should have.
+  Increasing this number improves durability and availability at the expense of greater resource consumption.
+  A quorum of replicas must be healthy for replication to work, so an odd number of replicas is generally a good choice.
+  Please note that it takes effect only during the initialization of the durable storage database.
+  Changing this configuration parameter after the database has already been created won't have any effect.~"""
+
+builtin_raft_n_sites.label: "Initial number of sites"
+builtin_raft_n_sites.desc:
   """~
   Number of storage sites that need to share responsibility over the set of storage shards.
-  In this context, sites are essentially EMQX nodes that have message durability enabled.
+  In this context, sites are EMQX nodes with message durability enabled.
   Please note that it takes effect only during the initialization of the durable storage database.
  During this phase, at least that many sites should come online so that shards can be distributed between them; otherwise, message storage will remain unavailable until they do.
   After the initialization is complete, sites may be offline, which will affect availability depending on the number of offline sites and replication factor.~"""
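
For reference, an illustrative builtin_raft configuration exercising both
fields (values are examples only):

    durable_storage.messages {
      backend = builtin_raft
      replication_factor = 3
      n_sites = 3
    }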