Parcourir la source

Merge pull request #13299 from savonarola/0617-shared-sub-leader-poc

feat(queue): implement PoC version of session ↔️ shared group leader interaction
Ilia Averianov il y a 1 an
Parent
commit
f1b8c356a6

+ 1 - 24
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -660,30 +660,7 @@ handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := Share
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 shared_sub_opts(SessionId) ->
 shared_sub_opts(SessionId) ->
-    #{
-        session_id => SessionId,
-        send_funs => #{
-            send => fun send_message/2,
-            send_after => fun send_message_after/3
-        }
-    }.
-
-send_message(Dest, Msg) ->
-    case Dest =:= self() of
-        true ->
-            erlang:send(Dest, ?session_message(?shared_sub_message(Msg))),
-            Msg;
-        false ->
-            erlang:send(Dest, Msg)
-    end.
-
-send_message_after(Time, Dest, Msg) ->
-    case Dest =:= self() of
-        true ->
-            erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg)));
-        false ->
-            erlang:send_after(Time, Dest, Msg)
-    end.
+    #{session_id => SessionId}.
 
 
 bump_last_alive(S0) ->
 bump_last_alive(S0) ->
     %% Note: we take a pessimistic approach here and assume that the client will be alive
     %% Note: we take a pessimistic approach here and assume that the client will be alive

+ 16 - 57
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -23,23 +23,14 @@
     to_map/2
     to_map/2
 ]).
 ]).
 
 
--record(agent_message, {
-    message :: term()
-}).
-
 -type t() :: #{
 -type t() :: #{
     agent := emqx_persistent_session_ds_shared_subs_agent:t()
     agent := emqx_persistent_session_ds_shared_subs_agent:t()
 }.
 }.
 -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
 -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
 -type opts() :: #{
 -type opts() :: #{
-    session_id := emqx_persistent_session_ds:id(),
-    send_funs := #{
-        send := fun((pid(), term()) -> term()),
-        send_after := fun((non_neg_integer(), pid(), term()) -> reference())
-    }
+    session_id := emqx_persistent_session_ds:id()
 }.
 }.
 
 
--define(agent_message(Msg), #agent_message{message = Msg}).
 -define(rank_x, rank_shared).
 -define(rank_x, rank_shared).
 -define(rank_y, 0).
 -define(rank_y, 0).
 
 
@@ -107,17 +98,20 @@ on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
 -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
 -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
     {emqx_persistent_session_ds_state:t(), t()}.
     {emqx_persistent_session_ds_state:t(), t()}.
 renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
 renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
-    {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
+    {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
         Agent0
         Agent0
     ),
     ),
-    NewLeasedStreams =/= [] andalso
-        ?SLOG(
-            info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams}
-        ),
-    S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams),
-    S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams),
+    ?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
+    S1 = lists:foldl(
+        fun
+            (#{type := lease} = Event, S) -> accept_stream(Event, S);
+            (#{type := revoke} = Event, S) -> revoke_stream(Event, S)
+        end,
+        S0,
+        StreamLeaseEvents
+    ),
     SharedSubS1 = SharedSubS0#{agent => Agent1},
     SharedSubS1 = SharedSubS0#{agent => Agent1},
-    {S2, SharedSubS1}.
+    {S1, SharedSubS1}.
 
 
 -spec on_streams_replayed(
 -spec on_streams_replayed(
     emqx_persistent_session_ds_state:t(),
     emqx_persistent_session_ds_state:t(),
@@ -147,14 +141,10 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
 
 
 -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
 -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
     {emqx_persistent_session_ds_state:t(), t()}.
     {emqx_persistent_session_ds_state:t(), t()}.
-on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) ->
+on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
     Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
     Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
     SharedSubS1 = SharedSubS0#{agent => Agent1},
     SharedSubS1 = SharedSubS0#{agent => Agent1},
-    {S, SharedSubS1};
-on_info(S, SharedSubS, _Info) ->
-    %% TODO
-    %% Log warning
-    {S, SharedSubS}.
+    {S, SharedSubS1}.
 
 
 -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
 -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
 to_map(_S, _SharedSubS) ->
 to_map(_S, _SharedSubS) ->
@@ -340,39 +330,8 @@ to_agent_subscription(_S, Subscription) ->
     maps:with([start_time], Subscription).
     maps:with([start_time], Subscription).
 
 
 -spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
 -spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
-agent_opts(#{session_id := SessionId, send_funs := SendFuns}) ->
-    #{
-        session_id => SessionId,
-        send_funs => agent_send_funs(SendFuns)
-    }.
-
-agent_send_funs(#{
-    send := Send,
-    send_after := SendAfter
-}) ->
-    #{
-        send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end,
-        send_after => fun(Time, Pid, Msg) ->
-            send_after_from_agent(SendAfter, Time, Pid, Msg)
-        end
-    }.
-
-send_from_agent(Send, Dest, Msg) ->
-    case Dest =:= self() of
-        true ->
-            Send(Dest, ?agent_message(Msg)),
-            Msg;
-        false ->
-            Send(Dest, Msg)
-    end.
-
-send_after_from_agent(SendAfter, Time, Dest, Msg) ->
-    case Dest =:= self() of
-        true ->
-            SendAfter(Time, Dest, ?agent_message(Msg));
-        false ->
-            SendAfter(Time, Dest, Msg)
-    end.
+agent_opts(#{session_id := SessionId}) ->
+    #{session_id => SessionId}.
 
 
 -dialyzer({nowarn_function, now_ms/0}).
 -dialyzer({nowarn_function, now_ms/0}).
 now_ms() ->
 now_ms() ->

+ 23 - 8
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl

@@ -5,6 +5,8 @@
 -module(emqx_persistent_session_ds_shared_subs_agent).
 -module(emqx_persistent_session_ds_shared_subs_agent).
 
 
 -include("shared_subs_agent.hrl").
 -include("shared_subs_agent.hrl").
+-include("emqx_session.hrl").
+-include("session_internals.hrl").
 
 
 -type session_id() :: emqx_persistent_session_ds:id().
 -type session_id() :: emqx_persistent_session_ds:id().
 
 
@@ -16,18 +18,15 @@
 -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
 -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
 
 
 -type opts() :: #{
 -type opts() :: #{
-    session_id := session_id(),
-    send_funs := #{
-        send := fun((pid(), term()) -> term()),
-        send_after := fun((non_neg_integer(), pid(), term()) -> reference())
-    }
+    session_id := session_id()
 }.
 }.
 
 
 %% TODO
 %% TODO
-%% This records goe through network, we better shrink them
+%% These records go through the network, so we had better shrink them
 %% * use integer keys
 %% * use integer keys
 %% * somehow avoid passing stream and topic_filter — they both are part of the iterator
 %% * somehow avoid passing stream and topic_filter — they both are part of the iterator
 -type stream_lease() :: #{
 -type stream_lease() :: #{
+    type => lease,
     %% Used as "external" subscription_id
     %% Used as "external" subscription_id
     topic_filter := topic_filter(),
     topic_filter := topic_filter(),
     stream := emqx_ds:stream(),
     stream := emqx_ds:stream(),
@@ -35,10 +34,13 @@
 }.
 }.
 
 
 -type stream_revoke() :: #{
 -type stream_revoke() :: #{
+    type => revoke,
     topic_filter := topic_filter(),
     topic_filter := topic_filter(),
     stream := emqx_ds:stream()
     stream := emqx_ds:stream()
 }.
 }.
 
 
+-type stream_lease_event() :: stream_lease() | stream_revoke().
+
 -type stream_progress() :: #{
 -type stream_progress() :: #{
     topic_filter := topic_filter(),
     topic_filter := topic_filter(),
     stream := emqx_ds:stream(),
     stream := emqx_ds:stream(),
@@ -65,6 +67,11 @@
     renew_streams/1
     renew_streams/1
 ]).
 ]).
 
 
+-export([
+    send/2,
+    send_after/3
+]).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Behaviour
 %% Behaviour
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -74,7 +81,7 @@
 -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
 -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
     {ok, t()} | {error, term()}.
     {ok, t()} | {error, term()}.
 -callback on_unsubscribe(t(), topic_filter()) -> t().
 -callback on_unsubscribe(t(), topic_filter()) -> t().
--callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
+-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
 -callback on_stream_progress(t(), [stream_progress()]) -> t().
 -callback on_stream_progress(t(), [stream_progress()]) -> t().
 -callback on_info(t(), term()) -> t().
 -callback on_info(t(), term()) -> t().
 
 
@@ -99,7 +106,7 @@ on_subscribe(Agent, TopicFilter, SubOpts) ->
 on_unsubscribe(Agent, TopicFilter) ->
 on_unsubscribe(Agent, TopicFilter) ->
     ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
     ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
 
 
--spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
+-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
 renew_streams(Agent) ->
 renew_streams(Agent) ->
     ?shared_subs_agent:renew_streams(Agent).
     ?shared_subs_agent:renew_streams(Agent).
 
 
@@ -110,3 +117,11 @@ on_stream_progress(Agent, StreamProgress) ->
 -spec on_info(t(), term()) -> t().
 -spec on_info(t(), term()) -> t().
 on_info(Agent, Info) ->
 on_info(Agent, Info) ->
     ?shared_subs_agent:on_info(Agent, Info).
     ?shared_subs_agent:on_info(Agent, Info).
+
+-spec send(pid(), term()) -> term().
+send(Dest, Msg) ->
+    erlang:send(Dest, ?session_message(?shared_sub_message(Msg))).
+
+-spec send_after(non_neg_integer(), pid(), term()) -> reference().
+send_after(Time, Dest, Msg) ->
+    erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))).

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl

@@ -37,7 +37,7 @@ on_unsubscribe(Agent, _TopicFilter) ->
     Agent.
     Agent.
 
 
 renew_streams(Agent) ->
 renew_streams(Agent) ->
-    {[], [], Agent}.
+    {[], Agent}.
 
 
 on_stream_progress(Agent, _StreamProgress) ->
 on_stream_progress(Agent, _StreamProgress) ->
     Agent.
     Agent.

+ 15 - 5
apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl

@@ -10,25 +10,35 @@
 -if(?EMQX_RELEASE_EDITION == ee).
 -if(?EMQX_RELEASE_EDITION == ee).
 
 
 %% agent from BSL app
 %% agent from BSL app
-% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
+
+-ifdef(TEST).
+
+-define(shared_subs_agent, emqx_ds_shared_sub_agent).
+
+%% clause of -ifdef(TEST).
+-else.
+
 %% Till full implementation we need to dispatch to the null agent.
 %% Till full implementation we need to dispatch to the null agent.
 %% It will report "not implemented" error for attempts to use shared subscriptions.
 %% It will report "not implemented" error for attempts to use shared subscriptions.
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 
 
-%% -if(?EMQX_RELEASE_EDITION == ee).
+%% end of -ifdef(TEST).
+-endif.
+
+%% clause of -if(?EMQX_RELEASE_EDITION == ee).
 -else.
 -else.
 
 
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 
 
-%% -if(?EMQX_RELEASE_EDITION == ee).
+%% end of -if(?EMQX_RELEASE_EDITION == ee).
 -endif.
 -endif.
 
 
-%% -ifdef(EMQX_RELEASE_EDITION).
+%% clause of -ifdef(EMQX_RELEASE_EDITION).
 -else.
 -else.
 
 
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
 
 
-%% -ifdef(EMQX_RELEASE_EDITION).
+%% end of -ifdef(EMQX_RELEASE_EDITION).
 -endif.
 -endif.
 
 
 -endif.
 -endif.

+ 9 - 0
apps/emqx_ds_shared_sub/README.md

@@ -1,5 +1,14 @@
 # EMQX Durable Shared Subscriptions
 # EMQX Durable Shared Subscriptions
 
 
+This application enables durable sessions to cooperatively replay messages from a topic.
+
+# General layout and interaction with session
+
+![General layout](docs/images/ds_shared_subs.png)
+
+* The nesting reflects nesting/ownership of entity states.
+* The bold arrow represents the [most complex interaction](https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md#shared-subscription-session-handler), between the session-side group subscription state machine and the shared subscription leader.
+
 # Contributing
 # Contributing
 
 
 Please see our [contributing.md](../../CONTRIBUTING.md).
 Please see our [contributing.md](../../CONTRIBUTING.md).

BIN
apps/emqx_ds_shared_sub/docs/images/ds_shared_subs.png


+ 5 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl

@@ -0,0 +1,5 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub).

+ 95 - 93
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl

@@ -4,10 +4,11 @@
 
 
 -module(emqx_ds_shared_sub_agent).
 -module(emqx_ds_shared_sub_agent).
 
 
--include_lib("emqx/include/emqx_persistent_message.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 
+-include("emqx_ds_shared_sub_proto.hrl").
+
 -export([
 -export([
     new/1,
     new/1,
     open/2,
     open/2,
@@ -22,6 +23,11 @@
 
 
 -behaviour(emqx_persistent_session_ds_shared_subs_agent).
 -behaviour(emqx_persistent_session_ds_shared_subs_agent).
 
 
+-record(message_to_group_sm, {
+    group :: emqx_types:group(),
+    message :: term()
+}).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -32,8 +38,8 @@ new(Opts) ->
 open(TopicSubscriptions, Opts) ->
 open(TopicSubscriptions, Opts) ->
     State0 = init_state(Opts),
     State0 = init_state(Opts),
     State1 = lists:foldl(
     State1 = lists:foldl(
-        fun({ShareTopicFilter, #{start_time := StartTime}}, State) ->
-            add_subscription(State, ShareTopicFilter, StartTime)
+        fun({ShareTopicFilter, #{}}, State) ->
+            add_group_subscription(State, ShareTopicFilter)
         end,
         end,
         State0,
         State0,
         TopicSubscriptions
         TopicSubscriptions
@@ -41,23 +47,44 @@ open(TopicSubscriptions, Opts) ->
     State1.
     State1.
 
 
 on_subscribe(State0, TopicFilter, _SubOpts) ->
 on_subscribe(State0, TopicFilter, _SubOpts) ->
-    StartTime = now_ms(),
-    State1 = add_subscription(State0, TopicFilter, StartTime),
+    State1 = add_group_subscription(State0, TopicFilter),
     {ok, State1}.
     {ok, State1}.
 
 
 on_unsubscribe(State, TopicFilter) ->
 on_unsubscribe(State, TopicFilter) ->
-    delete_subscription(State, TopicFilter).
+    delete_group_subscription(State, TopicFilter).
 
 
-renew_streams(State0) ->
-    State1 = do_renew_streams(State0),
-    {State2, StreamLeases} = stream_leases(State1),
-    {StreamLeases, [], State2}.
+renew_streams(#{} = State) ->
+    fetch_stream_events(State).
 
 
 on_stream_progress(State, _StreamProgress) ->
 on_stream_progress(State, _StreamProgress) ->
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    %% Send to leader
     State.
     State.
 
 
-on_info(State, _Info) ->
-    State.
+on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) ->
+    ?SLOG(info, #{
+        msg => leader_lease_streams,
+        group => Group,
+        streams => StreamProgresses,
+        version => Version
+    }),
+    with_group_sm(State, Group, fun(GSM) ->
+        emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version)
+    end);
+on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
+    ?SLOG(info, #{
+        msg => leader_renew_stream_lease,
+        group => Group,
+        version => Version
+    }),
+    with_group_sm(State, Group, fun(GSM) ->
+        emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
+    end);
+%% Generic messages sent by group_sm's to themselves (timeouts).
+on_info(State, #message_to_group_sm{group = Group, message = Message}) ->
+    with_group_sm(State, Group, fun(GSM) ->
+        emqx_ds_shared_sub_group_sm:handle_info(GSM, Message)
+    end).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
@@ -65,92 +92,67 @@ on_info(State, _Info) ->
 
 
 init_state(Opts) ->
 init_state(Opts) ->
     SessionId = maps:get(session_id, Opts),
     SessionId = maps:get(session_id, Opts),
-    SendFuns = maps:get(send_funs, Opts),
-    Send = maps:get(send, SendFuns),
-    SendAfter = maps:get(send_after, SendFuns),
     #{
     #{
         session_id => SessionId,
         session_id => SessionId,
-        send => Send,
-        end_after => SendAfter,
-        subscriptions => #{}
+        groups => #{}
     }.
     }.
 
 
-% send(State, Pid, Msg) ->
-%     Send = maps:get(send, State),
-%     Send(Pid, Msg).
-
-% send_after(State, Time, Pid, Msg) ->
-%     SendAfter = maps:get(send_after, State),
-%     SendAfter(Time, Pid, Msg).
-
-do_renew_streams(#{subscriptions := Subs0} = State0) ->
-    Subs1 = maps:map(
-        fun(
-            ShareTopicFilter,
-            #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub
-        ) ->
-            #share{topic = TopicFilterRaw} = ShareTopicFilter,
-            TopicFilter = emqx_topic:words(TopicFilterRaw),
-            {_, NewStreams} = lists:unzip(
-                emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
-            ),
-            {Streams1, NewLeases} = lists:foldl(
-                fun(Stream, {StreamsAcc, LeasesAcc}) ->
-                    case StreamsAcc of
-                        #{Stream := _} ->
-                            {StreamsAcc, LeasesAcc};
-                        _ ->
-                            {ok, It} = emqx_ds:make_iterator(
-                                ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
-                            ),
-                            StreamLease = #{
-                                topic_filter => ShareTopicFilter,
-                                stream => Stream,
-                                iterator => It
-                            },
-                            {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]}
-                    end
-                end,
-                {Streams0, []},
-                NewStreams
-            ),
-            Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases}
-        end,
-        Subs0
-    ),
-    State0#{subscriptions => Subs1}.
-
-delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) ->
-    #share{topic = TopicFilter} = ShareTopicFilter,
-    ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId),
-    Subs1 = maps:remove(ShareTopicFilter, Subs0),
-    State0#{subscriptions => Subs1}.
-
-stream_leases(#{subscriptions := Subs0} = State0) ->
-    {Subs1, StreamLeases} = lists:foldl(
-        fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) ->
-            {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]}
-        end,
-        {Subs0, []},
-        maps:to_list(Subs0)
-    ),
-    State1 = State0#{subscriptions => Subs1},
-    {State1, lists:concat(StreamLeases)}.
-
-now_ms() ->
-    erlang:system_time(millisecond).
+delete_group_subscription(State, _ShareTopicFilter) ->
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    State.
 
 
-add_subscription(
-    #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime
+add_group_subscription(
+    #{groups := Groups0} = State0, ShareTopicFilter
 ) ->
 ) ->
-    #share{topic = TopicFilter} = ShareTopicFilter,
-    ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
-    Subs1 = Subs0#{
-        ShareTopicFilter => #{
-            start_time => StartTime,
-            streams => #{},
-            stream_leases => []
-        }
+    ?SLOG(info, #{
+        msg => agent_add_group_subscription,
+        topic_filter => ShareTopicFilter
+    }),
+    #share{group = Group} = ShareTopicFilter,
+    Groups1 = Groups0#{
+        Group => emqx_ds_shared_sub_group_sm:new(#{
+            topic_filter => ShareTopicFilter,
+            agent => this_agent(),
+            send_after => send_to_subscription_after(Group)
+        })
     },
     },
-    State1 = State0#{subscriptions => Subs1},
+    State1 = State0#{groups => Groups1},
     State1.
     State1.
+
+fetch_stream_events(#{groups := Groups0} = State0) ->
+    {Groups1, Events} = maps:fold(
+        fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) ->
+            {GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0),
+            {GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]}
+        end,
+        {#{}, []},
+        Groups0
+    ),
+    State1 = State0#{groups => Groups1},
+    {lists:concat(Events), State1}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+this_agent() -> self().
+
+send_to_subscription_after(Group) ->
+    fun(Time, Msg) ->
+        emqx_persistent_session_ds_shared_subs_agent:send_after(
+            Time,
+            self(),
+            #message_to_group_sm{group = Group, message = Msg}
+        )
+    end.
+
+with_group_sm(State, Group, Fun) ->
+    case State of
+        #{groups := #{Group := GSM0} = Groups} ->
+            GSM1 = Fun(GSM0),
+            State#{groups => Groups#{Group => GSM1}};
+        _ ->
+            %% TODO
+            %% Error?
+            State
+    end.

+ 282 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -0,0 +1,282 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc State machine for a single subscription of a shared subscription agent.
+%% Implements GSFSM described in
+%% https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md
+
+%% `group_sm` stands for "group state machine".
+-module(emqx_ds_shared_sub_group_sm).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([
+    new/1,
+
+    %% Leader messages
+    handle_leader_lease_streams/3,
+    handle_leader_renew_stream_lease/2,
+
+    %% Self-initiated messages
+    handle_info/2,
+
+    %% API
+    fetch_stream_events/1
+]).
+
+-type options() :: #{
+    agent := emqx_ds_shared_sub_proto:agent(),
+    topic_filter := emqx_persistent_session_ds:share_topic_filter(),
+    send_after := fun((non_neg_integer(), term()) -> reference())
+}.
+
+%% Subscription states
+
+-define(connecting, connecting).
+-define(replaying, replaying).
+-define(updating, updating).
+
+-type state() :: ?connecting | ?replaying | ?updating.
+
+-type group_sm() :: #{
+    topic_filter => emqx_persistent_session_ds:share_topic_filter(),
+    agent => emqx_ds_shared_sub_proto:agent(),
+    send_after => fun((non_neg_integer(), term()) -> reference()),
+
+    state => state(),
+    state_data => map(),
+    state_timers => map()
+}.
+
+-record(state_timeout, {
+    id :: reference(),
+    name :: atom(),
+    message :: term()
+}).
+-record(timer, {
+    ref :: reference(),
+    id :: reference()
+}).
+
+%%-----------------------------------------------------------------------
+%% Constants
+%%-----------------------------------------------------------------------
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12574
+%% Move to settings
+-define(FIND_LEADER_TIMEOUT, 1000).
+-define(RENEW_LEASE_TIMEOUT, 2000).
+
+%%-----------------------------------------------------------------------
+%% API
+%%-----------------------------------------------------------------------
+
+-spec new(options()) -> group_sm().
+new(#{
+    agent := Agent,
+    topic_filter := ShareTopicFilter,
+    send_after := SendAfter
+}) ->
+    ?SLOG(
+        info,
+        #{
+            msg => group_sm_new,
+            agent => Agent,
+            topic_filter => ShareTopicFilter
+        }
+    ),
+    GSM0 = #{
+        topic_filter => ShareTopicFilter,
+        agent => Agent,
+        send_after => SendAfter
+    },
+    transition(GSM0, ?connecting, #{}).
+
+fetch_stream_events(
+    #{
+        state := ?replaying,
+        topic_filter := TopicFilter,
+        state_data := #{stream_lease_events := Events0} = Data
+    } = GSM
+) ->
+    Events1 = lists:map(
+        fun(Event) ->
+            Event#{topic_filter => TopicFilter}
+        end,
+        Events0
+    ),
+    {
+        GSM#{
+            state_data => Data#{stream_lease_events => []}
+        },
+        Events1
+    };
+fetch_stream_events(GSM) ->
+    {GSM, []}.
+
+%%-----------------------------------------------------------------------
+%% Event Handlers
+%%-----------------------------------------------------------------------
+
+%%-----------------------------------------------------------------------
+%% Connecting state
+
+handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
+    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
+    ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
+
+handle_leader_lease_streams(
+    #{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version
+) ->
+    ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
+    Streams = lists:foldl(
+        fun(#{stream := Stream, iterator := It}, Acc) ->
+            Acc#{Stream => It}
+        end,
+        #{},
+        StreamProgresses
+    ),
+    StreamLeaseEvents = lists:map(
+        fun(#{stream := Stream, iterator := It}) ->
+            #{
+                type => lease,
+                stream => Stream,
+                iterator => It
+            }
+        end,
+        StreamProgresses
+    ),
+    transition(
+        GSM0,
+        ?replaying,
+        #{
+            streams => Streams,
+            stream_lease_events => StreamLeaseEvents,
+            prev_version => undefined,
+            version => Version
+        }
+    );
+handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
+    GSM.
+
+handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
+    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
+    GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
+    GSM1.
+
+%%-----------------------------------------------------------------------
+%% Replaying state
+
+handle_replaying(GSM) ->
+    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
+
+handle_leader_renew_stream_lease(
+    #{state := ?replaying, state_data := #{version := Version}} = GSM, Version
+) ->
+    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
+handle_leader_renew_stream_lease(GSM, _Version) ->
+    GSM.
+
+handle_renew_lease_timeout(GSM) ->
+    ?tp(debug, renew_lease_timeout, #{}),
+    transition(GSM, ?connecting, #{}).
+
+%%-----------------------------------------------------------------------
+%% Updating state
+
+% handle_updating(GSM) ->
+%     GSM.
+
+%%-----------------------------------------------------------------------
+%% Internal API
+%%-----------------------------------------------------------------------
+
+handle_state_timeout(
+    #{state := ?connecting, topic_filter := TopicFilter} = GSM,
+    find_leader_timeout,
+    _Message
+) ->
+    ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
+    handle_find_leader_timeout(GSM);
+handle_state_timeout(
+    #{state := ?replaying} = GSM,
+    renew_lease_timeout,
+    _Message
+) ->
+    handle_renew_lease_timeout(GSM).
+
+handle_info(
+    #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info
+) ->
+    case Timers of
+        #{Name := #timer{id = Id}} ->
+            handle_state_timeout(GSM, Name, Message);
+        _ ->
+            %% Stale timer
+            GSM
+    end;
+handle_info(GSM, _Info) ->
+    GSM.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+transition(GSM0, NewState, NewStateData) ->
+    Timers = maps:get(state_timers, GSM0, #{}),
+    TimerNames = maps:keys(Timers),
+    GSM1 = lists:foldl(
+        fun(Name, Acc) ->
+            cancel_timer(Acc, Name)
+        end,
+        GSM0,
+        TimerNames
+    ),
+    GSM2 = GSM1#{
+        state => NewState,
+        state_data => NewStateData,
+        state_timers => #{}
+    },
+    run_enter_callback(GSM2).
+
+ensure_state_timeout(GSM0, Name, Delay) ->
+    ensure_state_timeout(GSM0, Name, Delay, Name).
+
+ensure_state_timeout(GSM0, Name, Delay, Message) ->
+    Id = make_ref(),
+    GSM1 = cancel_timer(GSM0, Name),
+    Timers = maps:get(state_timers, GSM1),
+    TimerMessage = #state_timeout{
+        id = Id,
+        name = Name,
+        message = Message
+    },
+    TimerRef = send_after(GSM1, Delay, TimerMessage),
+    GSM2 = GSM1#{
+        state_timers := Timers#{Name => #timer{ref = TimerRef, id = Id}}
+    },
+    GSM2.
+
+send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
+    SendAfter(Delay, Message).
+
+cancel_timer(GSM, Name) ->
+    Timers = maps:get(state_timers, GSM, #{}),
+    case Timers of
+        #{Name := #timer{ref = TimerRef}} ->
+            _ = erlang:cancel_timer(TimerRef),
+            GSM#{
+                state_timers := maps:remove(Name, Timers)
+            };
+        _ ->
+            GSM
+    end.
+
+run_enter_callback(#{state := ?connecting} = GSM) ->
+    handle_connecting(GSM);
+run_enter_callback(#{state := ?replaying} = GSM) ->
+    handle_replaying(GSM).
+% run_enter_callback(#{state := ?updating} = GSM) ->
+%     handle_updating(GSM).

+ 326 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -0,0 +1,326 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_leader).
+
+-behaviour(gen_statem).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_persistent_message.hrl").
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([
+    register/2,
+
+    start_link/1,
+    child_spec/1,
+    id/1,
+
+    callback_mode/0,
+    init/1,
+    handle_event/4,
+    terminate/3
+]).
+
+-type options() :: #{
+    topic_filter := emqx_persistent_session_ds:share_topic_filter()
+}.
+
+-type stream_assignment() :: #{
+    prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
+    version := emqx_ds_shared_sub_proto:version(),
+    streams := list(emqx_ds:stream())
+}.
+
+-type data() :: #{
+    group := emqx_types:group(),
+    topic := emqx_types:topic(),
+    %% For ds router, not an actual session_id
+    router_id := binary(),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12307
+    %% Persist progress
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12575
+    %% Implement some stats to assign evenly?
+    stream_progresses := #{
+        emqx_ds:stream() => emqx_ds:iterator()
+    },
+    agent_stream_assignments := #{
+        emqx_ds_shared_sub_proto:agent() => stream_assignment()
+    },
+    stream_assignments := #{
+        emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
+    }
+}.
+
+-export_type([
+    options/0,
+    data/0
+]).
+
+%% States
+
+-define(waiting_registration, waiting_registration).
+-define(replaying, replaying).
+
+%% Events
+
+-record(register, {
+    register_fun :: fun(() -> pid())
+}).
+-record(renew_streams, {}).
+-record(renew_leases, {}).
+
+%% Constants
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12574
+%% Move to settings
+-define(RENEW_LEASE_INTERVAL, 5000).
+-define(RENEW_STREAMS_INTERVAL, 5000).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Synchronously ask the leader process `Pid' to claim leadership.
+%% `Fun' is run inside the leader process and must return the pid that holds
+%% the registration. A leader that is still waiting for registration replies
+%% `{ok, LeaderPid}' with that pid.
+register(Pid, Fun) ->
+    Request = #register{register_fun = Fun},
+    gen_statem:call(Pid, Request).
+
+%%--------------------------------------------------------------------
+%% Internal API
+%%--------------------------------------------------------------------
+
+%% Supervisor child specification for the leader serving `TopicFilter'.
+%% Leaders are `temporary' children, i.e. never restarted by the supervisor.
+child_spec(#{topic_filter := TopicFilter} = Options) ->
+    ChildId = id(TopicFilter),
+    StartMFA = {?MODULE, start_link, [Options]},
+    #{
+        id => ChildId,
+        start => StartMFA,
+        restart => temporary,
+        shutdown => 5000,
+        type => worker
+    }.
+
+%% Start a leader state machine linked to the caller. `Options' reaches
+%% init/1 wrapped in a one-element list.
+start_link(Options) ->
+    InitArgs = [Options],
+    gen_statem:start_link(?MODULE, InitArgs, []).
+
+%% Identifier of the leader responsible for a shared subscription. It serves
+%% as the supervisor child id and as the key under which the registry looks
+%% the leader up.
+%%
+%% The id is derived from the whole share topic filter (group AND topic):
+%% `$share/g/t1' and `$share/g/t2' are distinct shared subscriptions and a
+%% leader only serves the single topic it was started with, so keying by the
+%% group alone would hand the leader of `t1' to subscribers of `t2'.
+id(#share{} = TopicFilter) ->
+    {?MODULE, TopicFilter}.
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+%% All events are dispatched to handle_event/4, with state enter calls enabled.
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% Build the initial data of a fresh leader: no known streams and no
+%% assignments yet. The process starts in the `waiting_registration' state.
+init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
+    NoStreams = #{},
+    NoAssignments = #{},
+    {ok, ?waiting_registration, #{
+        group => Group,
+        topic => Topic,
+        router_id => router_id(),
+        stream_progresses => NoStreams,
+        stream_assignments => NoAssignments,
+        agent_stream_assignments => NoAssignments
+    }}.
+
+%%--------------------------------------------------------------------
+%% waiting_registration state
+
+%% Single gen_statem callback for all states (`handle_event_function' mode).
+%%
+%% * `waiting_registration': a `#register{}' call runs the supplied fun; when
+%%   it returns our own pid we become the leader and enter `replaying',
+%%   otherwise we reply with the other pid and stop normally.
+%% * `replaying': periodically renews the stream list and the agents' leases
+%%   and handles messages coming from agents.
+handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) ->
+    Self = self(),
+    case Fun() of
+        Self ->
+            {next_state, ?replaying, Data, {reply, From, {ok, Self}}};
+        OtherPid ->
+            {stop_and_reply, normal, {reply, From, {ok, OtherPid}}}
+    end;
+%%--------------------------------------------------------------------
+%% replaying state
+handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
+    ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
+    %% NOTE
+    %% gen_statem has a single `state_timeout' timer: starting a second one
+    %% replaces the first. The two periodic jobs must tick independently,
+    %% so each of them gets its own named generic timeout.
+    {keep_state_and_data, [
+        {{timeout, renew_leases}, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
+        {{timeout, renew_streams}, 0, #renew_streams{}}
+    ]};
+handle_event({timeout, renew_streams}, #renew_streams{}, ?replaying, Data0) ->
+    Data1 = renew_streams(Data0),
+    {keep_state, Data1, {{timeout, renew_streams}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
+handle_event({timeout, renew_leases}, #renew_leases{}, ?replaying, Data0) ->
+    Data1 = renew_leases(Data0),
+    {keep_state, Data1, {{timeout, renew_leases}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
+handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) ->
+    Data1 = connect_agent(Data0, Agent),
+    {keep_state, Data1};
+handle_event(
+    info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0
+) ->
+    Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version),
+    {keep_state, Data1};
+%%--------------------------------------------------------------------
+%% fallback
+handle_event(enter, _OldState, _State, _Data) ->
+    keep_state_and_data;
+handle_event(Event, _Content, State, _Data) ->
+    %% Anything else is logged and dropped.
+    ?SLOG(warning, #{
+        msg => unexpected_event,
+        event => Event,
+        state => State
+    }),
+    keep_state_and_data.
+
+%% Remove the route that is installed for `Topic' on entering `replaying'.
+%% NOTE(review): this clause does not check the state, so it also runs for a
+%% leader that never left `waiting_registration' and added no route —
+%% presumably deleting a missing route is harmless; confirm.
+terminate(_Reason, _State, #{topic := Topic, router_id := RouterId}) ->
+    ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Refresh the set of streams known for the leader's topic.
+%% Streams that already have a recorded progress keep it; every newly
+%% discovered stream gets an iterator created at the time of this call.
+%% Returns the data with the updated `stream_progresses'.
+renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
+    TopicFilter = emqx_topic:words(Topic),
+    %% Take the timestamp once, so that the stream lookup and the iterators
+    %% of the new streams refer to exactly the same point in time.
+    StartTime = now_ms(),
+    {_, Streams} = lists:unzip(
+        emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
+    ),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    %% Handle stream removal
+    NewProgresses = lists:foldl(
+        fun(Stream, ProgressesAcc) ->
+            case ProgressesAcc of
+                #{Stream := _} ->
+                    ProgressesAcc;
+                _ ->
+                    {ok, It} = emqx_ds:make_iterator(
+                        ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
+                    ),
+                    ProgressesAcc#{Stream => It}
+            end
+        end,
+        Progresses,
+        Streams
+    ),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    %% Initiate reassignment
+    ?SLOG(info, #{
+        msg => leader_renew_streams,
+        topic_filter => TopicFilter,
+        streams => length(Streams)
+    }),
+    Data0#{stream_progresses => NewProgresses}.
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12572
+%% This just gives unassigned streams to the connecting agent,
+%% we need to implement actual stream (re)assignment.
+%% Handle a connect request from `Agent'.
+%% An agent that already has an assignment leaves the data untouched. A new
+%% agent is given all currently unassigned streams at version 0, and the
+%% lease (stream + current iterator) is sent to it.
+connect_agent(
+    #{
+        group := Group,
+        agent_stream_assignments := AgentStreamAssignments0,
+        stream_assignments := StreamAssignments0,
+        stream_progresses := StreamProgresses
+    } = Data0,
+    Agent
+) ->
+    ?SLOG(info, #{
+        msg => leader_agent_connected,
+        agent => Agent,
+        group => Group
+    }),
+    case is_map_key(Agent, AgentStreamAssignments0) of
+        true ->
+            Data0;
+        false ->
+            Version = 0,
+            Streams = unassigned_streams(Data0),
+            Lease = [
+                #{stream => Stream, iterator => maps:get(Stream, StreamProgresses)}
+             || Stream <- Streams
+            ],
+            ?SLOG(info, #{
+                msg => leader_lease_streams,
+                agent => Agent,
+                group => Group,
+                streams => length(Lease),
+                version => Version
+            }),
+            ok = emqx_ds_shared_sub_proto:leader_lease_streams(Agent, Group, Lease, Version),
+            NewOwners = maps:from_list([{Stream, Agent} || Stream <- Streams]),
+            Data0#{
+                agent_stream_assignments => AgentStreamAssignments0#{
+                    Agent => #{
+                        prev_version => undefined,
+                        version => Version,
+                        streams => Streams
+                    }
+                },
+                stream_assignments => maps:merge(StreamAssignments0, NewOwners)
+            }
+    end.
+
+%% Send a lease renewal, tagged with the agent's current assignment version,
+%% to every agent that has an assignment. The data is returned unchanged.
+renew_leases(#{group := Group, agent_stream_assignments := Assignments} = Data) ->
+    Renew = fun({Agent, #{version := Version}}) ->
+        ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version)
+    end,
+    ok = lists:foreach(Renew, maps:to_list(Assignments)),
+    Data.
+
+%% Record the progress reported by `Agent' for its streams.
+%% The report is accepted only when `Version' equals the agent's current or
+%% previous assignment version; otherwise the data is returned unchanged.
+update_agent_stream_states(
+    #{
+        agent_stream_assignments := AgentStreamAssignments,
+        stream_assignments := StreamAssignments,
+        stream_progresses := StreamProgresses0
+    } = Data0,
+    Agent,
+    AgentStreamProgresses,
+    Version
+) ->
+    Current = emqx_utils_maps:deep_get([Agent, version], AgentStreamAssignments, undefined),
+    Previous = emqx_utils_maps:deep_get(
+        [Agent, prev_version], AgentStreamAssignments, undefined
+    ),
+    IsKnownVersion = Current == Version orelse Previous == Version,
+    case IsKnownVersion of
+        true ->
+            Store = fun(#{stream := Stream, iterator := It}, ProgressesAcc) ->
+                %% Assert Stream is assigned to Agent
+                Agent = maps:get(Stream, StreamAssignments),
+                ProgressesAcc#{Stream => It}
+            end,
+            Data0#{
+                stream_progresses => lists:foldl(Store, StreamProgresses0, AgentStreamProgresses)
+            };
+        false ->
+            %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+            %% send invalidate to agent
+            Data0
+    end.
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+%% Generate a fresh GUID in hex-string form; it is stored as the leader's
+%% `router_id'.
+router_id() ->
+    Guid = emqx_guid:gen(),
+    emqx_guid:to_hexstr(Guid).
+
+%% Current system time in milliseconds.
+now_ms() ->
+    Unit = millisecond,
+    erlang:system_time(Unit).
+
+%% Streams that have a recorded progress but are not assigned to any agent,
+%% in the key order of `stream_progresses'.
+unassigned_streams(#{stream_progresses := StreamProgresses, stream_assignments := StreamAssignments}) ->
+    Assigned = maps:keys(StreamAssignments),
+    [Stream || Stream <- maps:keys(StreamProgresses), not lists:member(Stream, Assigned)].

+ 59 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_leader_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([
+    start_link/0,
+    child_spec/0,
+
+    start_leader/1,
+    stop_leader/1
+]).
+
+%% supervisor behaviour callbacks
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Start the leader supervisor, locally registered under the module name.
+-spec start_link() -> supervisor:startlink_ret().
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+%% Child specification for embedding this supervisor into a parent supervisor.
+-spec child_spec() -> supervisor:child_spec().
+child_spec() ->
+    StartMFA = {?MODULE, start_link, []},
+    #{
+        id => ?MODULE,
+        start => StartMFA,
+        restart => permanent,
+        shutdown => 5000,
+        type => supervisor
+    }.
+
+%% Start a leader described by `Options' as a child of this supervisor.
+-spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret().
+start_leader(Options) ->
+    supervisor:start_child(?MODULE, emqx_ds_shared_sub_leader:child_spec(Options)).
+
+%% Terminate the leader child whose id corresponds to `TopicFilter'.
+-spec stop_leader(emqx_persistent_session_ds:share_topic_filter()) -> ok | {error, term()}.
+stop_leader(TopicFilter) ->
+    ChildId = emqx_ds_shared_sub_leader:id(TopicFilter),
+    supervisor:terminate_child(?MODULE, ChildId).
+
+%%------------------------------------------------------------------------------
+%% supervisor behaviour callbacks
+%%------------------------------------------------------------------------------
+
+%% The supervisor starts without children; leaders are added dynamically
+%% through start_leader/1.
+init([]) ->
+    NoChildren = [],
+    {ok, {#{strategy => one_for_one, intensity => 10, period => 10}, NoChildren}}.

+ 72 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl

@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12573
+%% This should be wrapped with a proto_v1 module.
+%% For simplicity, send as simple OTP messages for now.
+
+-module(emqx_ds_shared_sub_proto).
+
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([
+    agent_connect_leader/3,
+    agent_update_stream_states/4,
+
+    leader_lease_streams/4,
+    leader_renew_stream_lease/3
+]).
+
+-type agent() :: pid().
+-type leader() :: pid().
+-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
+-type group() :: emqx_types:group().
+-type version() :: non_neg_integer().
+
+-type stream_progress() :: #{
+    stream := emqx_ds:stream(),
+    iterator := emqx_ds:iterator()
+}.
+
+-export_type([
+    agent/0,
+    leader/0,
+    group/0,
+    version/0,
+    stream_progress/0
+]).
+
+%% agent -> leader messages
+
+%% Send a connect request from `FromAgent' for `TopicFilter' to the leader.
+-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
+agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
+    Message = ?agent_connect_leader(FromAgent, TopicFilter),
+    _ = (ToLeader ! Message),
+    ok.
+
+%% Report the agent's stream progresses, tagged with the lease `Version',
+%% to the leader.
+-spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok.
+agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
+    Message = ?agent_update_stream_states(FromAgent, StreamProgresses, Version),
+    _ = (ToLeader ! Message),
+    ok.
+
+%% ...
+
+%% leader -> agent messages
+
+%% Lease `Streams' of group `OfGroup' to the agent at the given `Version'.
+-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
+leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->
+    Message = ?leader_lease_streams(OfGroup, Streams, Version),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Message),
+    ok.
+
+%% Tell the agent that its lease of `Version' for group `OfGroup' is renewed.
+-spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
+leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
+    Message = ?leader_renew_stream_lease(OfGroup, Version),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Message),
+    ok.
+
+%% ...

+ 85 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl

@@ -0,0 +1,85 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc Asynchronous messages between shared sub agent and shared sub leader
+%% These messages are instantiated on the receiver's side, so they do not
+%% travel over the network.
+
+-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL).
+-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true).
+
+%% NOTE
+%% We do not need any kind of request/response identification,
+%% because the protocol is fully event-based.
+
+%% agent messages, sent from agent side to the leader
+
+-define(agent_connect_leader_msg, agent_connect_leader).
+-define(agent_update_stream_states_msg, agent_update_stream_states).
+-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
+-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
+
+%% Agent messages sent to the leader.
+%% Leader talks to many agents, `agent` field is used to identify the sender.
+
+-define(agent_connect_leader(Agent, TopicFilter), #{
+    type => ?agent_connect_leader_msg,
+    topic_filter => TopicFilter,
+    agent => Agent
+}).
+
+-define(agent_connect_leader_match(Agent, TopicFilter), #{
+    type := ?agent_connect_leader_msg,
+    topic_filter := TopicFilter,
+    agent := Agent
+}).
+
+-define(agent_update_stream_states(Agent, StreamStates, Version), #{
+    type => ?agent_update_stream_states_msg,
+    stream_states => StreamStates,
+    version => Version,
+    agent => Agent
+}).
+
+-define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
+    type := ?agent_update_stream_states_msg,
+    stream_states := StreamStates,
+    version := Version,
+    agent := Agent
+}).
+
+%% leader messages, sent from the leader to the agent
+%% Agent may have several shared subscriptions, so may talk to several leaders
+%% `group` field is used to identify the leader.
+
+-define(leader_lease_streams_msg, leader_lease_streams).
+-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
+
+-define(leader_lease_streams(Group, Streams, Version), #{
+    type => ?leader_lease_streams_msg,
+    streams => Streams,
+    version => Version,
+    group => Group
+}).
+
+-define(leader_lease_streams_match(Group, Streams, Version), #{
+    type := ?leader_lease_streams_msg,
+    streams := Streams,
+    version := Version,
+    group := Group
+}).
+
+-define(leader_renew_stream_lease(Group, Version), #{
+    type => ?leader_renew_stream_lease_msg,
+    version => Version,
+    group => Group
+}).
+
+-define(leader_renew_stream_lease_match(Group, Version), #{
+    type := ?leader_renew_stream_lease_msg,
+    version := Version,
+    group := Group
+}).
+
+-endif.

+ 111 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl

@@ -0,0 +1,111 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_registry).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([
+    start_link/0,
+    child_spec/0,
+
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-export([
+    lookup_leader/2
+]).
+
+-record(lookup_leader, {
+    agent :: emqx_ds_shared_sub_proto:agent(),
+    topic_filter :: emqx_persistent_session_ds:share_topic_filter()
+}).
+
+-define(gproc_id(ID), {n, l, ID}).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Asynchronously ask the registry to find the leader for `TopicFilter' on
+%% behalf of `Agent'. The request is a cast, so nothing is returned to the
+%% caller beyond `ok'.
+-spec lookup_leader(
+    emqx_ds_shared_sub_proto:agent(), emqx_persistent_session_ds:share_topic_filter()
+) -> ok.
+lookup_leader(Agent, TopicFilter) ->
+    Request = #lookup_leader{agent = Agent, topic_filter = TopicFilter},
+    gen_server:cast(?MODULE, Request).
+
+%%--------------------------------------------------------------------
+%% Internal API
+%%--------------------------------------------------------------------
+
+%% Start the registry server, locally registered under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% Child specification for running the registry under a supervisor.
+child_spec() ->
+    StartMFA = {?MODULE, start_link, []},
+    #{
+        id => ?MODULE,
+        start => StartMFA,
+        restart => permanent,
+        shutdown => 5000,
+        type => worker
+    }.
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+%% The registry starts with an empty map as its state.
+init([]) ->
+    EmptyState = #{},
+    {ok, EmptyState}.
+
+%% No synchronous requests are supported: every call is answered with
+%% `{error, unknown_request}' and the state is kept.
+handle_call(_Request, _From, State) ->
+    Reply = {error, unknown_request},
+    {reply, Reply, State}.
+
+%% Serve a leader lookup requested through lookup_leader/2.
+handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) ->
+    {noreply, do_lookup_leader(Agent, TopicFilter, State)}.
+
+%% Any other message sent to the registry is dropped.
+handle_info(_Unexpected, State) ->
+    {noreply, State}.
+
+%% Nothing to clean up on shutdown.
+terminate(_Why, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Find the leader for `TopicFilter' — starting and registering a new one
+%% when none is registered yet — and forward the agent's connect request to
+%% it. The registry state is returned unchanged.
+do_lookup_leader(Agent, TopicFilter, State) ->
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12309
+    %% Cluster-wide unique leader election should be implemented
+    LeaderId = emqx_ds_shared_sub_leader:id(TopicFilter),
+    LeaderPid = ensure_leader(gproc:where(?gproc_id(LeaderId)), LeaderId, TopicFilter),
+    ?SLOG(info, #{
+        msg => lookup_leader,
+        agent => Agent,
+        topic_filter => TopicFilter,
+        leader => LeaderPid
+    }),
+    ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter),
+    State.
+
+%% Return the registered leader pid, or start a leader and let it claim the
+%% gproc name. The pid returned by the registration is used, which may differ
+%% from the freshly started process.
+ensure_leader(undefined, LeaderId, TopicFilter) ->
+    {ok, Started} = emqx_ds_shared_sub_leader_sup:start_leader(#{
+        topic_filter => TopicFilter
+    }),
+    Claim = fun() ->
+        {Owner, _} = gproc:reg_or_locate(?gproc_id(LeaderId)),
+        Owner
+    end,
+    {ok, Leader} = emqx_ds_shared_sub_leader:register(Started, Claim),
+    Leader;
+ensure_leader(Registered, _LeaderId, _TopicFilter) ->
+    Registered.

+ 4 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl

@@ -29,5 +29,8 @@ init([]) ->
         intensity => 10,
         intensity => 10,
         period => 10
         period => 10
     },
     },
-    ChildSpecs = [],
+    ChildSpecs = [
+        emqx_ds_shared_sub_registry:child_spec(),
+        emqx_ds_shared_sub_leader_sup:child_spec()
+    ],
     {ok, {SupFlags, ChildSpecs}}.
     {ok, {SupFlags, ChildSpecs}}.

+ 165 - 0
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -0,0 +1,165 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+%% The list of test cases is obtained from the common test helper.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Start `emqx' with durable sessions and the builtin durable storage
+%% backend enabled, plus the `emqx_ds_shared_sub' application. The started
+%% apps are kept in the config under `apps'.
+init_per_suite(Config) ->
+    EmqxConfig = #{
+        <<"durable_sessions">> => #{
+            <<"enable">> => true,
+            <<"renew_streams_interval">> => "100ms"
+        },
+        <<"durable_storage">> => #{
+            <<"messages">> => #{
+                <<"backend">> => <<"builtin">>
+            }
+        }
+    },
+    AppSpecs = [
+        {emqx, #{config => EmqxConfig}},
+        emqx_ds_shared_sub
+    ],
+    Started = emqx_cth_suite:start(AppSpecs, #{work_dir => ?config(priv_dir, Config)}),
+    [{apps, Started} | Config].
+
+%% Stop the applications started by init_per_suite/1.
+end_per_suite(Config) ->
+    Started = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Started).
+
+%% Every test case runs with a fresh snabbkaffe trace.
+init_per_testcase(_TestCase, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+%% Stop tracing and restart the leader supervisor after each test case.
+end_per_testcase(_TestCase, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok = terminate_leaders().
+
+%% A shared subscriber receives a message published after it subscribed,
+%% provided the streams of the topic existed before the subscription.
+t_lease_initial(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    %% Pre-create some streams under "topic1/#".
+    %% The leader is still a stub: it does not lease newly discovered streams
+    %% once the first lease has been sent to an agent, so the streams must
+    %% already exist when the agent connects.
+    ok = init_streams(ConnPub, <<"topic1/1">>),
+
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1),
+
+    {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1),
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
+
+    ok = emqtt:disconnect(ConnShared),
+    ok = emqtt:disconnect(ConnPub).
+
+%% When the registry is unavailable at subscription time, the agent is
+%% expected to emit `find_leader_timeout', and to obtain a lease once the
+%% registry is back.
+t_lease_reconnect(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    %% Pre-create some streams under "topic2/#".
+    %% The leader is still a stub: it does not lease newly discovered streams
+    %% once the first lease has been sent to an agent, so the streams must
+    %% already exist when the agent connects.
+    ok = init_streams(ConnPub, <<"topic2/2">>),
+
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+
+    %% Stop the registry to simulate the inability to find a leader.
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+
+    ?assertWaitEvent(
+        {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
+        #{?snk_kind := find_leader_timeout},
+        5_000
+    ),
+
+    %% Start the registry again; the agent should retry after some time and
+    %% find the leader.
+    ?assertWaitEvent(
+        {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+        #{?snk_kind := leader_lease_streams},
+        5_000
+    ),
+
+    %% NOTE(review): fixed delay, presumably to let the agent apply the lease
+    %% before publishing — confirm whether an event could be awaited instead.
+    ct:sleep(1_000),
+    {ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1),
+
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
+
+    ok = emqtt:disconnect(ConnShared),
+    ok = emqtt:disconnect(ConnPub).
+
+%% After the leaders are terminated, a new `leader_lease_streams' event is
+%% expected, and the trace must show that it is preceded by a
+%% `renew_lease_timeout' event.
+t_renew_lease_timeout(_Config) ->
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+
+    %% Subscribe and wait for the first lease.
+    ?assertWaitEvent(
+        {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr3/topic3/#">>, 1),
+        #{?snk_kind := leader_lease_streams},
+        5_000
+    ),
+
+    ?check_trace(
+        %% Kill all leaders and wait for the next lease.
+        ?wait_async_action(
+            ok = terminate_leaders(),
+            #{?snk_kind := leader_lease_streams},
+            5_000
+        ),
+        fun(Trace) ->
+            ?strict_causality(
+                #{?snk_kind := renew_lease_timeout},
+                #{?snk_kind := leader_lease_streams},
+                Trace
+            )
+        end
+    ),
+
+    ok = emqtt:disconnect(ConnShared).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+%% Publish one message to `Topic' while a regular (non-shared) subscriber
+%% is subscribed to it, and wait until that message is received back.
+%% Test cases call this to have streams for the topic before subscribing.
+init_streams(ConnPub, Topic) ->
+    Subscriber = emqtt_connect_sub(<<"client_regular">>),
+    {ok, _, _} = emqtt:subscribe(Subscriber, Topic, 1),
+    {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
+    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
+    ok = emqtt:disconnect(Subscriber).
+
+%% Connect an MQTT v5 client with a clean start and a session expiry
+%% interval of 7200; returns the client pid.
+emqtt_connect_sub(ClientId) ->
+    Options = [
+        {client_id, ClientId},
+        {clean_start, true},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 7_200}}
+    ],
+    {ok, Client} = emqtt:start_link(Options),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+%% Connect an MQTT v5 client with a clean start and no session expiry
+%% property; returns the client pid.
+emqtt_connect_pub(ClientId) ->
+    Options = [
+        {client_id, ClientId},
+        {clean_start, true},
+        {proto_ver, v5}
+    ],
+    {ok, Client} = emqtt:start_link(Options),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+%% Stop all leaders by terminating the leader supervisor, then start the
+%% (now empty) supervisor again.
+terminate_leaders() ->
+    Parent = emqx_ds_shared_sub_sup,
+    LeaderSup = emqx_ds_shared_sub_leader_sup,
+    ok = supervisor:terminate_child(Parent, LeaderSup),
+    {ok, _} = supervisor:restart_child(Parent, LeaderSup),
+    ok.