Explorar o código

feat(sessds): make ds session be aware of shared subscriptions

Ilya Averyanov hai 1 ano
pai
achega
9cdfbb0845

+ 7 - 0
apps/emqx/include/emqx_session.hrl

@@ -20,4 +20,11 @@
 -define(IS_SESSION_IMPL_MEM(S), (is_tuple(S) andalso element(1, S) =:= session)).
 -define(IS_SESSION_IMPL_DS(S), (is_map_key(id, S))).
 
+%% (Erlang) messages that a connection process should forward to the
+%% session handler.
+-record(session_message, {
+    message :: term()
+}).
+-define(session_message(MSG), #session_message{message = MSG}).
+
 -endif.

+ 4 - 0
apps/emqx/src/emqx_channel.erl

@@ -19,6 +19,7 @@
 
 -include("emqx.hrl").
 -include("emqx_channel.hrl").
+-include("emqx_session.hrl").
 -include("emqx_mqtt.hrl").
 -include("emqx_access_control.hrl").
 -include("logger.hrl").
@@ -1299,6 +1300,9 @@ handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) ->
         [] -> {ok, Channel};
         Msgs -> {ok, Msgs, Channel}
     end;
+handle_info(?session_message(Message), #channel{session = Session} = Channel) ->
+    NSession = emqx_session:handle_info(Message, Session),
+    {ok, Channel#channel{session = NSession}};
 handle_info(Info, Channel) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.

+ 105 - 17
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -25,6 +25,7 @@
 
 -include("emqx_mqtt.hrl").
 
+-include("emqx_session.hrl").
 -include("emqx_persistent_session_ds/session_internals.hrl").
 
 -ifdef(TEST).
@@ -63,6 +64,7 @@
     deliver/3,
     replay/3,
     handle_timeout/3,
+    handle_info/2,
     disconnect/2,
     terminate/2
 ]).
@@ -106,6 +108,7 @@
     seqno/0,
     timestamp/0,
     topic_filter/0,
+    share_topic_filter/0,
     subscription_id/0,
     subscription/0,
     session/0,
@@ -117,7 +120,8 @@
 %% Currently, this is the clientid.  We avoid `emqx_types:clientid()' because that can be
 %% an atom, in theory (?).
 -type id() :: binary().
--type topic_filter() :: emqx_types:topic() | #share{}.
+-type share_topic_filter() :: #share{}.
+-type topic_filter() :: emqx_types:topic() | share_topic_filter().
 
 %% Subscription and subscription states:
 %%
@@ -155,6 +159,8 @@
     subopts := map()
 }.
 
+-type shared_sub_state() :: term().
+
 -define(TIMER_PULL, timer_pull).
 -define(TIMER_GET_STREAMS, timer_get_streams).
 -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
@@ -172,6 +178,8 @@
     props := map(),
     %% Persistent state:
     s := emqx_persistent_session_ds_state:t(),
+    %% Shared subscription state:
+    shared_sub_s := shared_sub_state(),
     %% Buffer:
     inflight := emqx_persistent_session_ds_inflight:t(),
     %% In-progress replay:
@@ -277,8 +285,11 @@ info(created_at, #{s := S}) ->
     emqx_persistent_session_ds_state:get_created_at(S);
 info(is_persistent, #{}) ->
     true;
-info(subscriptions, #{s := S}) ->
-    emqx_persistent_session_ds_subs:to_map(S);
+info(subscriptions, #{s := S, shared_sub_s := SharedSubS}) ->
+    maps:merge(
+        emqx_persistent_session_ds_subs:to_map(S),
+        emqx_persistent_session_ds_shared_subs:to_map(S, SharedSubS)
+    );
 info(subscriptions_cnt, #{s := S}) ->
     emqx_persistent_session_ds_state:n_subscriptions(S);
 info(subscriptions_max, #{props := Conf}) ->
@@ -356,15 +367,23 @@ print_session(ClientId) ->
 %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
 %%--------------------------------------------------------------------
 
+%% Suppress warnings about clauses handling unimplemented reuslts
+%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3`
+-dialyzer({nowarn_function, subscribe/3}).
 -spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
     {ok, session()} | {error, emqx_types:reason_code()}.
 subscribe(
-    #share{},
-    _SubOpts,
-    _Session
+    #share{} = TopicFilter,
+    SubOpts,
+    Session
 ) ->
-    %% TODO: Shared subscriptions are not supported yet:
-    {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
+    case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of
+        {ok, S1} ->
+            S = emqx_persistent_session_ds_state:commit(S1),
+            {ok, Session#{s => S}};
+        Error = {error, _} ->
+            Error
+    end;
 subscribe(
     TopicFilter,
     SubOpts,
@@ -378,8 +397,23 @@ subscribe(
             Error
     end.
 
+%% Suppress warnings about clauses handling unimplemented reuslts
+%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/4`
+-dialyzer({nowarn_function, unsubscribe/2}).
 -spec unsubscribe(topic_filter(), session()) ->
     {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
+unsubscribe(
+    #share{} = TopicFilter,
+    Session = #{id := SessionId, s := S0}
+) ->
+    case emqx_persistent_session_ds_shared_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
+        {ok, S1, #{id := SubId, subopts := SubOpts}} ->
+            S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
+            S = emqx_persistent_session_ds_state:commit(S2),
+            {ok, Session#{s => S}, SubOpts};
+        Error = {error, _} ->
+            Error
+    end;
 unsubscribe(
     TopicFilter,
     Session = #{id := SessionId, s := S0}
@@ -540,6 +574,8 @@ pubcomp(_ClientInfo, PacketId, Session0) ->
     end.
 
 %%--------------------------------------------------------------------
+%% Delivers
+%%--------------------------------------------------------------------
 
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
     {ok, replies(), session()}.
@@ -551,6 +587,10 @@ deliver(ClientInfo, Delivers, Session0) ->
     ),
     {ok, [], pull_now(Session)}.
 
+%%--------------------------------------------------------------------
+%% Timeouts
+%%--------------------------------------------------------------------
+
 -spec handle_timeout(clientinfo(), _Timeout, session()) ->
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
 handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
@@ -573,14 +613,15 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
 handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
     Session = replay_streams(Session0, ClientInfo),
     {ok, [], Session};
-handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
+handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
     S1 = emqx_persistent_session_ds_subs:gc(S0),
-    S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
+    S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
+    {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
     Interval = get_config(ClientInfo, [renew_streams_interval]),
     Session = emqx_session:ensure_timer(
         ?TIMER_GET_STREAMS,
         Interval,
-        Session0#{s => S}
+        Session0#{s => S, shared_sub_s => SharedSubS}
     ),
     {ok, [], Session};
 handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
@@ -601,6 +642,45 @@ handle_timeout(_ClientInfo, Timeout, Session) ->
     ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
     {ok, [], Session}.
 
+%%--------------------------------------------------------------------
+%% Generic messages
+%%--------------------------------------------------------------------
+
+-spec handle_info(term(), session()) -> session().
+handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}) ->
+    {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg),
+    Session#{s => S, shared_sub_s => SharedSubS}.
+
+%%--------------------------------------------------------------------
+%% Shared subscription outgoing messages
+%%--------------------------------------------------------------------
+
+shared_sub_opts(SessionId) ->
+    #{
+        session_id => SessionId,
+        send_funs => #{
+            send => fun send_message/2,
+            send_after => fun send_message_after/3
+        }
+    }.
+
+send_message(Dest, Msg) ->
+    case Dest =:= self() of
+        true ->
+            erlang:send(Dest, ?session_message(?shared_sub_message(Msg))),
+            Msg;
+        false ->
+            erlang:send(Dest, Msg)
+    end.
+
+send_message_after(Time, Dest, Msg) ->
+    case Dest =:= self() of
+        true ->
+            erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg)));
+        false ->
+            erlang:send_after(Time, Dest, Msg)
+    end.
+
 bump_last_alive(S0) ->
     %% Note: we take a pessimistic approach here and assume that the client will be alive
     %% until the next bump timeout.  With this, we avoid garbage collecting this session
@@ -814,13 +894,17 @@ session_open(
                     S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
                     S5 = set_clientinfo(ClientInfo, S4),
                     S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
-                    S = emqx_persistent_session_ds_state:commit(S6),
+                    {ok, S7, SharedSubS} = emqx_persistent_session_ds_shared_subs:open(
+                        S6, shared_sub_opts(SessionId)
+                    ),
+                    S = emqx_persistent_session_ds_state:commit(S7),
                     Inflight = emqx_persistent_session_ds_inflight:new(
                         receive_maximum(NewConnInfo)
                     ),
                     #{
                         id => SessionId,
                         s => S,
+                        shared_sub_s => SharedSubS,
                         inflight => Inflight,
                         props => #{}
                     }
@@ -869,6 +953,7 @@ session_ensure_new(
         id => Id,
         props => Conf,
         s => S,
+        shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
         inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
     }.
 
@@ -879,8 +964,8 @@ session_drop(SessionId, Reason) ->
     case emqx_persistent_session_ds_state:open(SessionId) of
         {ok, S0} ->
             ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
-            emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
-            emqx_persistent_session_ds_state:delete(SessionId);
+            ok = emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
+            ok = emqx_persistent_session_ds_state:delete(SessionId);
         undefined ->
             ok
     end.
@@ -917,9 +1002,12 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
 %% Normal replay:
 %%--------------------------------------------------------------------
 
-fetch_new_messages(Session = #{s := S}, ClientInfo) ->
-    Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S),
-    fetch_new_messages(Streams, Session, ClientInfo).
+fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
+    Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0),
+    Session1 = fetch_new_messages(Streams, Session0, ClientInfo),
+    #{s := S1, shared_sub_s := SharedSubS0} = Session1,
+    {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
+    Session1#{s => S2, shared_sub_s => SharedSubS1}.
 
 fetch_new_messages([], Session, _ClientInfo) ->
     Session;

+ 368 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -0,0 +1,368 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds_shared_subs).
+
+-include_lib("emqx_mqtt.hrl").
+-include("session_internals.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-export([
+    new/1,
+    open/2,
+
+    on_subscribe/3,
+    on_unsubscribe/4,
+
+    on_streams_replayed/2,
+    on_info/3,
+
+    renew_streams/2,
+    to_map/2
+]).
+
+-record(agent_message, {
+    message :: term()
+}).
+
+-type t() :: #{
+    agent := emqx_persistent_session_ds_shared_subs_agent:t()
+}.
+-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
+-type opts() :: #{
+    session_id := emqx_persistent_session_ds:id(),
+    send_funs := #{
+        send := fun((pid(), term()) -> term()),
+        send_after := fun((non_neg_integer(), pid(), term()) -> reference())
+    }
+}.
+
+-define(agent_message(Msg), #agent_message{message = Msg}).
+-define(rank_x, rank_shared).
+-define(rank_y, 0).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec new(emqx_persistent_session_ds:shared_sub_opts()) -> t().
+new(Opts) ->
+    #{
+        agent => emqx_persistent_session_ds_shared_subs_agent:new(
+            agent_opts(Opts)
+        )
+    }.
+
+-spec open(emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:shared_sub_opts()) ->
+    {ok, emqx_persistent_session_ds_state:t(), t()}.
+open(S, Opts) ->
+    SharedSubscriptions = fold_shared_subs(
+        fun(#share{} = TopicFilter, Sub, Acc) ->
+            [{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
+        end,
+        [],
+        S
+    ),
+    Agent = emqx_persistent_session_ds_shared_subs_agent:open(
+        SharedSubscriptions, agent_opts(Opts)
+    ),
+    SharedSubS = #{agent => Agent},
+    {ok, S, SharedSubS}.
+
+-spec on_subscribe(
+    emqx_persistent_session_ds_state:t(),
+    t(),
+    share_topic_filter(),
+    emqx_types:subopts()
+) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
+on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
+    Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
+    on_subscribe(Subscription, TopicFilter, SubOpts, Session).
+
+-spec on_unsubscribe(
+    emqx_persistent_session_ds:id(),
+    emqx_persistent_session_ds:topic_filter(),
+    emqx_persistent_session_ds_state:t(),
+    t()
+) ->
+    {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
+    | {error, emqx_types:reason_code()}.
+on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
+    case lookup(TopicFilter, S0) of
+        undefined ->
+            {error, ?RC_NO_SUBSCRIPTION_EXISTED};
+        Subscription ->
+            ?tp(persistent_session_ds_subscription_delete, #{
+                session_id => SessionId, topic_filter => TopicFilter
+            }),
+            Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
+                Agent0, TopicFilter
+            ),
+            SharedSubS = SharedSubS0#{agent => Agent1},
+            S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
+            {ok, S, SharedSubS, Subscription}
+    end.
+
+-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
+    {emqx_persistent_session_ds_state:t(), t()}.
+renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
+    {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
+        Agent0
+    ),
+    S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams),
+    S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams),
+    SharedSubS1 = SharedSubS0#{agent => Agent1},
+    {S2, SharedSubS1}.
+
+-spec on_streams_replayed(
+    emqx_persistent_session_ds_state:t(),
+    t()
+) -> t().
+on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
+    %% TODO
+    %% Is it sufficient for a report?
+    Progress = fold_shared_stream_states(
+        fun(TopicFilter, Stream, SRS, Acc) ->
+            #srs{it_begin = BeginIt} = SRS,
+            StreamProgress = #{
+                topic_filter => TopicFilter,
+                stream => Stream,
+                iterator => BeginIt
+            },
+            [StreamProgress | Acc]
+        end,
+        [],
+        S
+    ),
+    Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
+        Agent0, Progress
+    ),
+    SharedSubS1 = SharedSubS0#{agent => Agent1},
+    {S, SharedSubS1}.
+
+-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
+    {emqx_persistent_session_ds_state:t(), t()}.
+on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) ->
+    Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
+    SharedSubS1 = SharedSubS0#{agent => Agent1},
+    {S, SharedSubS1};
+on_info(S, SharedSubS, _Info) ->
+    %% TODO
+    %% Log warning
+    {S, SharedSubS}.
+
+-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
+to_map(_S, _SharedSubS) ->
+    %% TODO
+    #{}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+fold_shared_subs(Fun, Acc, S) ->
+    emqx_persistent_session_ds_state:fold_subscriptions(
+        fun
+            (#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
+            (_, _Sub, Acc0) -> Acc0
+        end,
+        Acc,
+        S
+    ).
+
+fold_shared_stream_states(Fun, Acc, S) ->
+    %% TODO
+    %% Optimize or cache
+    TopicFilters = fold_shared_subs(
+        fun
+            (#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
+                Acc0#{Id => TopicFilter};
+            (_, _, Acc0) ->
+                Acc0
+        end,
+        #{},
+        S
+    ),
+    emqx_persistent_session_ds_state:fold_streams(
+        fun({SubId, Stream}, SRS, Acc0) ->
+            case TopicFilters of
+                #{SubId := TopicFilter} ->
+                    Fun(TopicFilter, Stream, SRS, Acc0);
+                _ ->
+                    Acc0
+            end
+        end,
+        Acc,
+        S
+    ).
+
+on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
+    #{max_subscriptions := MaxSubscriptions} = Props,
+    case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
+        true ->
+            create_new_subscription(TopicFilter, SubOpts, Session);
+        false ->
+            {error, ?RC_QUOTA_EXCEEDED}
+    end;
+on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
+    update_subscription(Subscription, TopicFilter, SubOpts, Session).
+
+create_new_subscription(TopicFilter, SubOpts, #{
+    id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
+}) ->
+    case
+        emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
+            Agent0, TopicFilter, SubOpts
+        )
+    of
+        {ok, Agent1} ->
+            #{upgrade_qos := UpgradeQoS} = Props,
+            {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
+            {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
+            SState = #{
+                parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts
+            },
+            S3 = emqx_persistent_session_ds_state:put_subscription_state(
+                SStateId, SState, S2
+            ),
+            Subscription = #{
+                id => SubId,
+                current_state => SStateId,
+                start_time => now_ms()
+            },
+            S = emqx_persistent_session_ds_state:put_subscription(
+                TopicFilter, Subscription, S3
+            ),
+            SharedSubS = SharedSubS0#{agent => Agent1},
+            ?tp(persistent_session_ds_shared_subscription_added, #{
+                topic_filter => TopicFilter, session => SessionId
+            }),
+            {ok, S, SharedSubS};
+        {error, _} = Error ->
+            Error
+    end.
+
+update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
+    s := S0, shared_sub_s := SharedSubS, props := Props
+}) ->
+    #{upgrade_qos := UpgradeQoS} = Props,
+    SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
+    case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
+        SState ->
+            %% Client resubscribed with the same parameters:
+            {ok, S0, SharedSubS};
+        _ ->
+            %% Subsription parameters changed:
+            {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
+            S2 = emqx_persistent_session_ds_state:put_subscription_state(
+                SStateId, SState, S1
+            ),
+            Sub = Sub0#{current_state => SStateId},
+            S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
+            {ok, S, SharedSubS}
+    end.
+
+lookup(TopicFilter, S) ->
+    case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
+        Sub = #{current_state := SStateId} ->
+            case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
+                #{subopts := SubOpts} ->
+                    Sub#{subopts => SubOpts};
+                undefined ->
+                    undefined
+            end;
+        undefined ->
+            undefined
+    end.
+
+accept_stream(
+    #{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
+) ->
+    case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
+        undefined ->
+            %% This should not happen.
+            %% Agent should have received unsubscribe callback
+            %% and should not have passed this stream as a new one
+            error(new_stream_without_sub);
+        #{id := SubId, current_state := SStateId} ->
+            NewSRS =
+                #srs{
+                    rank_x = ?rank_x,
+                    rank_y = ?rank_y,
+                    it_begin = Iterator,
+                    it_end = Iterator,
+                    sub_state_id = SStateId
+                },
+            Key = {SubId, Stream},
+            S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
+            S1
+    end.
+
+revoke_stream(
+    #{topic_filter := TopicFilter, stream := Stream}, S0
+) ->
+    case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
+        undefined ->
+            %% This should not happen.
+            %% Agent should have received unsubscribe callback
+            %% and should not have revoked this stream
+            S0;
+        #{id := SubId} ->
+            Key = {SubId, Stream},
+            case emqx_persistent_session_ds_state:get_stream(Key, S0) of
+                undefined ->
+                    S0;
+                SRS0 ->
+                    SRS1 = SRS0#srs{unsubscribed = true},
+                    S1 = emqx_persistent_session_ds_state:put_stream(Key, SRS1, S0),
+                    S1
+            end
+    end.
+
+-spec to_agent_subscription(
+    emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
+) ->
+    emqx_persistent_session_ds_shared_subs_agent:subscription().
+to_agent_subscription(_S, Subscription) ->
+    %% TODO
+    %% do we need anything from sub state?
+    maps:with([start_time], Subscription).
+
+-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
+agent_opts(#{session_id := SessionId, send_funs := SendFuns}) ->
+    #{
+        session_id => SessionId,
+        send_funs => agent_send_funs(SendFuns)
+    }.
+
+agent_send_funs(#{
+    send := Send,
+    send_after := SendAfter
+}) ->
+    #{
+        send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end,
+        send_after => fun(Time, Pid, Msg) ->
+            send_after_from_agent(SendAfter, Time, Pid, Msg)
+        end
+    }.
+
+send_from_agent(Send, Dest, Msg) ->
+    case Dest =:= self() of
+        true ->
+            Send(Dest, ?agent_message(Msg)),
+            Msg;
+        false ->
+            Send(Dest, Msg)
+    end.
+
+send_after_from_agent(SendAfter, Time, Dest, Msg) ->
+    case Dest =:= self() of
+        true ->
+            SendAfter(Time, Dest, ?agent_message(Msg));
+        false ->
+            SendAfter(Time, Dest, Msg)
+    end.
+
+now_ms() ->
+    erlang:system_time(millisecond).

+ 104 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl

@@ -0,0 +1,104 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds_shared_subs_agent).
+
+-type session_id() :: emqx_persistent_session_ds:id().
+
+-type subscription() :: #{
+    start_time := emqx_ds:time()
+}.
+
+-type t() :: term().
+-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
+
+-type opts() :: #{
+    session_id := session_id(),
+    send_funs := #{
+        send := fun((pid(), term()) -> term()),
+        send_after := fun((non_neg_integer(), pid(), term()) -> reference())
+    }
+}.
+
+%% TODO
+%% This records goe through network, we better shrink them
+%% * use integer keys
+%% * somehow avoid passing stream and topic_filter — they both are part of the iterator
+-type stream_lease() :: #{
+    %% Used as "external" subscription_id
+    topic_filter := topic_filter(),
+    stream := emqx_ds:stream(),
+    iterator := emqx_ds:iterator()
+}.
+
+-type stream_revoke() :: #{
+    topic_filter := topic_filter(),
+    stream := emqx_ds:stream()
+}.
+
+-type stream_progress() :: #{
+    topic_filter := topic_filter(),
+    stream := emqx_ds:stream(),
+    iterator := emqx_ds:iterator()
+}.
+
+-export_type([
+    t/0,
+    subscription/0,
+    session_id/0,
+    stream_lease/0,
+    opts/0
+]).
+
+-export([
+    new/1,
+    open/2,
+
+    on_subscribe/3,
+    on_unsubscribe/2,
+    on_session_drop/1,
+    on_stream_progress/2,
+    on_info/2,
+
+    renew_streams/1
+]).
+
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec new(opts()) -> t().
+new(_Opts) ->
+    undefined.
+
+-spec open([{topic_filter(), subscription()}], opts()) -> t().
+open(_Topics, _Opts) ->
+    undefined.
+
+-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
+    {ok, t()} | {error, term()}.
+on_subscribe(Agent, _TopicFilter, _SubOpts) ->
+    % {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}
+    {ok, Agent}.
+
+-spec on_unsubscribe(t(), topic_filter()) -> t().
+on_unsubscribe(Agent, _TopicFilter) ->
+    Agent.
+
+-spec on_session_drop(t()) -> t().
+on_session_drop(Agent) ->
+    Agent.
+
+-spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
+renew_streams(Agent) ->
+    {[], [], Agent}.
+
+-spec on_stream_progress(t(), [stream_progress()]) -> t().
+on_stream_progress(Agent, _StreamProgress) ->
+    Agent.
+
+-spec on_info(t(), term()) -> t().
+on_info(Agent, _Info) ->
+    Agent.

+ 42 - 29
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -16,7 +16,7 @@
 -module(emqx_persistent_session_ds_stream_scheduler).
 
 %% API:
--export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
+-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2, shuffle/1]).
 -export([renew_streams/1, on_unsubscribe/2]).
 
 %% behavior callbacks:
@@ -87,22 +87,20 @@ find_new_streams(S) ->
     %% after timeout?)
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    shuffle(
-        emqx_persistent_session_ds_state:fold_streams(
-            fun
-                (_Key, #srs{it_end = end_of_stream}, Acc) ->
-                    Acc;
-                (Key, Stream, Acc) ->
-                    case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
-                        true ->
-                            [{Key, Stream} | Acc];
-                        false ->
-                            Acc
-                    end
-            end,
-            [],
-            S
-        )
+    emqx_persistent_session_ds_state:fold_streams(
+        fun
+            (_Key, #srs{it_end = end_of_stream}, Acc) ->
+                Acc;
+            (Key, Stream, Acc) ->
+                case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
+                    true ->
+                        [{Key, Stream} | Acc];
+                    false ->
+                        Acc
+                end
+        end,
+        [],
+        S
     ).
 
 %% @doc This function makes the session aware of the new streams.
@@ -127,7 +125,12 @@ renew_streams(S0) ->
     S1 = remove_unsubscribed_streams(S0),
     S2 = remove_fully_replayed_streams(S1),
     S3 = update_stream_subscription_state_ids(S2),
-    emqx_persistent_session_ds_subs:fold(
+    %% For shared subscriptions, the streams are populated by
+    %% `emqx_persistent_session_ds_shared_subs`.
+    %% TODO
+    %% Move discovery of proper streams
+    %% out of the scheduler for complete symmetry?
+    fold_proper_subscriptions(
         fun
             (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) ->
                 TopicFilter = emqx_topic:words(Key),
@@ -198,6 +201,19 @@ is_fully_acked(Srs, S) ->
     CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     is_fully_acked(CommQos1, CommQos2, Srs).
 
+-spec shuffle([A]) -> [A].
+shuffle(L0) ->
+    L1 = lists:map(
+        fun(A) ->
+            %% maybe topic/stream prioritization could be introduced here?
+            {rand:uniform(), A}
+        end,
+        L0
+    ),
+    L2 = lists:sort(L1),
+    {_, L} = lists:unzip(L2),
+    L.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -408,15 +424,12 @@ is_fully_acked(_, _, #srs{
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
 
--spec shuffle([A]) -> [A].
-shuffle(L0) ->
-    L1 = lists:map(
-        fun(A) ->
-            %% maybe topic/stream prioritization could be introduced here?
-            {rand:uniform(), A}
+fold_proper_subscriptions(Fun, Acc, S) ->
+    emqx_persistent_session_ds_state:fold_subscriptions(
+        fun
+            (#share{}, _Sub, Acc0) -> Acc0;
+            (TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0)
         end,
-        L0
-    ),
-    L2 = lists:sort(L1),
-    {_, L} = lists:unzip(L2),
-    L.
+        Acc,
+        S
+    ).

+ 16 - 16
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl

@@ -30,8 +30,7 @@
     on_session_drop/2,
     gc/1,
     lookup/2,
-    to_map/1,
-    fold/3
+    to_map/1
 ]).
 
 %% Management API:
@@ -160,7 +159,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) ->
 
 -spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
 on_session_drop(SessionId, S0) ->
-    fold(
+    _ = fold_proper_subscriptions(
         fun(TopicFilter, _Subscription, S) ->
             case on_unsubscribe(SessionId, TopicFilter, S) of
                 {ok, S1, _} -> S1;
@@ -169,7 +168,8 @@ on_session_drop(SessionId, S0) ->
         end,
         S0,
         S0
-    ).
+    ),
+    ok.
 
 %% @doc Remove subscription states that don't have a parent, and that
 %% don't have any unacked messages:
@@ -210,7 +210,7 @@ gc(S0) ->
         S0
     ).
 
-%% @doc Fold over active subscriptions:
+%% @doc Lookup a subscription and merge it with its current state:
 -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
     emqx_persistent_session_ds:subscription() | undefined.
 lookup(TopicFilter, S) ->
@@ -230,22 +230,12 @@ lookup(TopicFilter, S) ->
 %% purpose:
 -spec to_map(emqx_persistent_session_ds_state:t()) -> map().
 to_map(S) ->
-    fold(
+    fold_proper_subscriptions(
         fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end,
         #{},
         S
     ).
 
-%% @doc Fold over active subscriptions:
--spec fold(
-    fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
-    Acc,
-    emqx_persistent_session_ds_state:t()
-) ->
-    Acc.
-fold(Fun, Acc, S) ->
-    emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S).
-
 -spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
     emqx_persistent_session_ds:subscription() | undefined.
 cold_get_subscription(SessionId, Topic) ->
@@ -267,5 +257,15 @@ cold_get_subscription(SessionId, Topic) ->
 %% Internal functions
 %%================================================================================
 
+fold_proper_subscriptions(Fun, Acc, S) ->
+    emqx_persistent_session_ds_state:fold_subscriptions(
+        fun
+            (#share{}, _Sub, Acc0) -> Acc0;
+            (TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0)
+        end,
+        Acc,
+        S
+    ).
+
 now_ms() ->
     erlang:system_time(millisecond).

+ 7 - 0
apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl

@@ -71,4 +71,11 @@
     sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id()
 }).
 
+%% (Erlang) messages that session should forward to the
+%% shared subscription handler.
+-record(shared_sub_message, {
+    message :: term()
+}).
+-define(shared_sub_message(MSG), #shared_sub_message{message = MSG}).
+
 -endif.

+ 13 - 0
apps/emqx/src/emqx_session.erl

@@ -83,6 +83,7 @@
 
 -export([
     deliver/3,
+    handle_info/2,
     handle_timeout/3,
     disconnect/3,
     terminate/3
@@ -188,6 +189,10 @@
 -callback destroy(t() | clientinfo()) -> ok.
 -callback clear_will_message(t()) -> t().
 -callback publish_will_message_now(t(), message()) -> t().
+-callback handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) ->
+    {ok, replies(), t()}
+    | {ok, replies(), timeout(), t()}.
+-callback handle_info(term(), t()) -> t().
 
 %%--------------------------------------------------------------------
 %% Create a Session
@@ -484,6 +489,14 @@ enrich_subopts(_Opt, _V, Msg, _) ->
 handle_timeout(ClientInfo, Timer, Session) ->
     ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session).
 
+%%--------------------------------------------------------------------
+%% Generic Messages
+%%--------------------------------------------------------------------
+
+-spec handle_info(term(), t()) -> t().
+handle_info(Info, Session) ->
+    ?IMPL(Session):handle_info(Info, Session).
+
 %%--------------------------------------------------------------------
 
 -spec ensure_timer(custom_timer_name(), timeout(), map()) ->

+ 10 - 0
apps/emqx/src/emqx_session_mem.erl

@@ -87,6 +87,7 @@
     deliver/3,
     replay/3,
     handle_timeout/3,
+    handle_info/2,
     disconnect/2,
     terminate/2
 ]).
@@ -597,6 +598,15 @@ handle_timeout(ClientInfo, retry_delivery, Session) ->
 handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
     expire(ClientInfo, Session).
 
+%%--------------------------------------------------------------------
+%% Geneic messages
+%%--------------------------------------------------------------------
+
+-spec handle_info(term(), session()) -> session().
+handle_info(Msg, Session) ->
+    ?SLOG(warning, #{msg => emqx_session_mem_unknown_message, message => Msg}),
+    Session.
+
 %%--------------------------------------------------------------------
 %% Retry Delivery
 %%--------------------------------------------------------------------

+ 94 - 0
apps/emqx_ds_shared_sub/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2024
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2028-05-30
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 9 - 0
apps/emqx_ds_shared_sub/README.md

@@ -0,0 +1,9 @@
+# EMQX Durable Shared Subscriptions
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 6 - 0
apps/emqx_ds_shared_sub/rebar.config

@@ -0,0 +1,6 @@
+%% -*- mode: erlang; -*-
+
+{erl_opts, [debug_info]}.
+{deps, [
+    {emqx, {path, "../../apps/emqx"}}
+]}.

+ 15 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src

@@ -0,0 +1,15 @@
+{application, emqx_ds_shared_sub, [
+    {description, "EMQX DS Shared Subscriptions"},
+    {vsn, "0.1.0"},
+    {registered, [emqx_ds_shared_sub_sup]},
+    {mod, {emqx_ds_shared_sub_app, []}},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 23 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl

@@ -0,0 +1,23 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_app).
+
+-behaviour(application).
+
+%% application behaviour callbacks
+-export([start/2, stop/1]).
+
+%%------------------------------------------------------------------------------
+%% application behaviour callbacks
+%%------------------------------------------------------------------------------
+
+-spec start(application:start_type(), term()) -> {ok, pid()}.
+start(_Type, _Args) ->
+    {ok, Sup} = emqx_ds_shared_sub_sup:start_link(),
+    {ok, Sup}.
+
+-spec stop(term()) -> ok.
+stop(_State) ->
+    ok.

+ 33 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% supervisor behaviour callbacks
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%------------------------------------------------------------------------------
+%% supervisor behaviour callbacks
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.

+ 5 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_subs.erl

@@ -0,0 +1,5 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_subs).

+ 2 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -130,7 +130,8 @@
             emqx_gateway_ocpp,
             emqx_gateway_jt808,
             emqx_bridge_syskeeper,
-            emqx_bridge_confluent
+            emqx_bridge_confluent,
+            emqx_ds_shared_sub
         ],
     %% must always be of type `load'
     ce_business_apps =>

+ 2 - 1
mix.exs

@@ -202,7 +202,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_gateway_gbt32960,
       :emqx_gateway_ocpp,
       :emqx_gateway_jt808,
-      :emqx_bridge_syskeeper
+      :emqx_bridge_syskeeper,
+      :emqx_ds_shared_sub
     ])
   end
 

+ 1 - 0
rebar.config.erl

@@ -120,6 +120,7 @@ is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
 is_community_umbrella_app("apps/emqx_schema_validation") -> false;
 is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
 is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
+is_community_umbrella_app("apps/emqx_ds_shared_sub") -> false;
 is_community_umbrella_app(_) -> true.
 
 %% BUILD_WITHOUT_JQ