浏览代码

fix(sessds): Immutable subscriptions

This commit fixes two issues:
- Behavior of overlapping subscriptions has been aligned with the
in-memory session.

- Fixed handling of replays when subscription changes (either by
client or EMQX configuration)
ieQu1 1 年之前
父节点
当前提交
b30ddc206e

+ 85 - 81
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -116,15 +116,42 @@
 %% Currently, this is the clientid.  We avoid `emqx_types:clientid()' because that can be
 %% an atom, in theory (?).
 -type id() :: binary().
--type topic_filter() :: emqx_types:topic().
+-type topic_filter() :: emqx_types:topic() | #share{}.
+
+%% Subscription and subscription states:
+%%
+%% Persistent sessions cannot simply update or delete subscriptions,
+%% since subscription parameters must be exactly the same during
+%% replay.
+%%
+%% To solve this problem, we store subscriptions in a twofold manner:
+%%
+%% - `subscription' is an object that holds up-to-date information
+%% about the client's subscription and a reference to the latest
+%% subscription state id
+%%
+%% - `subscription_state' is an immutable object that holds
+%% information about the subscription parameters at a certain point of
+%% time
+%%
+%% New subscription states are created whenever the client subscribes
+%% to a topic, or updates an existing subscription.
+%%
+%% Stream replay states contain references to the subscription states.
+%%
+%% Outdated subscription states are discarded when they are not
+%% referenced by either subscription or stream replay state objects.
 
 -type subscription_id() :: integer().
 
+%% This type is a result of merging
+%% `emqx_persistent_session_ds_subs:subscription()' with its current
+%% state.
 -type subscription() :: #{
     id := subscription_id(),
     start_time := emqx_ds:time(),
-    props := map(),
-    deleted := boolean()
+    current_state := emqx_persistent_session_ds_subs:subscription_state_id(),
+    subopts := map()
 }.
 
 -define(TIMER_PULL, timer_pull).
@@ -252,7 +279,7 @@ info(is_persistent, #{}) ->
 info(subscriptions, #{s := S}) ->
     emqx_persistent_session_ds_subs:to_map(S);
 info(subscriptions_cnt, #{s := S}) ->
-    emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S));
+    emqx_persistent_session_ds_state:n_subscriptions(S);
 info(subscriptions_max, #{props := Conf}) ->
     maps:get(max_subscriptions, Conf);
 info(upgrade_qos, #{props := Conf}) ->
@@ -340,53 +367,20 @@ subscribe(
 subscribe(
     TopicFilter,
     SubOpts,
-    Session = #{id := ID, s := S0}
+    Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}}
 ) ->
-    case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
-        undefined ->
-            %% TODO: max subscriptions
-
-            %% N.B.: we chose to update the router before adding the
-            %% subscription to the session/iterator table. The
-            %% reasoning for this is as follows:
-            %%
-            %% Messages matching this topic filter should start to be
-            %% persisted as soon as possible to avoid missing
-            %% messages. If this is the first such persistent session
-            %% subscription, it's important to do so early on.
-            %%
-            %% This could, in turn, lead to some inconsistency: if
-            %% such a route gets created but the session/iterator data
-            %% fails to be updated accordingly, we have a dangling
-            %% route. To remove such dangling routes, we may have a
-            %% periodic GC process that removes routes that do not
-            %% have a matching persistent subscription. Also, route
-            %% operations use dirty mnesia operations, which
-            %% inherently have room for inconsistencies.
-            %%
-            %% In practice, we use the iterator reference table as a
-            %% source of truth, since it is guarded by a transaction
-            %% context: we consider a subscription operation to be
-            %% successful if it ended up changing this table. Both
-            %% router and iterator information can be reconstructed
-            %% from this table, if needed.
-            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
-            {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
-            Subscription = #{
-                start_time => now_ms(),
-                props => SubOpts,
-                id => SubId,
-                deleted => false
-            },
-            IsNew = true;
-        Subscription0 = #{} ->
-            Subscription = Subscription0#{props => SubOpts},
-            IsNew = false,
-            S1 = S0
+    {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe(
+        TopicFilter, UpgradeQoS, SubOpts, S0
+    ),
+    case UpdateRouter of
+        true ->
+            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID);
+        false ->
+            ok
     end,
-    S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1),
+    S = emqx_persistent_session_ds_state:commit(S1),
     ?tp(persistent_session_ds_subscription_added, #{
-        topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
+        topic_filter => TopicFilter, is_new => UpdateRouter
     }),
     {ok, Session#{s => S}}.
 
@@ -399,15 +393,15 @@ unsubscribe(
     case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
         undefined ->
             {error, ?RC_NO_SUBSCRIPTION_EXISTED};
-        Subscription = #{props := SubOpts} ->
+        Subscription = #{subopts := SubOpts} ->
             S = do_unsubscribe(ID, TopicFilter, Subscription, S0),
             {ok, Session#{s => S}, SubOpts}
     end.
 
 -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
     emqx_persistent_session_ds_state:t().
-do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) ->
-    S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0),
+do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
+    S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0),
     ?tp(persistent_session_ds_subscription_delete, #{
         session_id => SessionId, topic_filter => TopicFilter
     }),
@@ -426,7 +420,7 @@ get_subscription(#share{}, _) ->
     undefined;
 get_subscription(TopicFilter, #{s := S}) ->
     case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of
-        _Subscription = #{props := SubOpts} ->
+        #{subopts := SubOpts} ->
             SubOpts;
         undefined ->
             undefined
@@ -716,7 +710,7 @@ list_client_subscriptions(ClientId) ->
             %% TODO: this is not the most optimal implementation, since it
             %% should be possible to avoid reading extra data (streams, etc.)
             case print_session(ClientId) of
-                Sess = #{s := #{subscriptions := Subs}} ->
+                Sess = #{s := #{subscriptions := Subs, subscription_states := SStates}} ->
                     Node =
                         case Sess of
                             #{'_alive' := {true, Pid}} ->
@@ -726,8 +720,9 @@ list_client_subscriptions(ClientId) ->
                         end,
                     SubList =
                         maps:fold(
-                            fun(Topic, #{props := SubProps}, Acc) ->
-                                Elem = {Topic, SubProps},
+                            fun(Topic, #{current_state := CS}, Acc) ->
+                                #{subopts := SubOpts} = maps:get(CS, SStates),
+                                Elem = {Topic, SubOpts},
                                 [Elem | Acc]
                             end,
                             [],
@@ -945,22 +940,31 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             Session0
     end.
 
-enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) ->
+enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->
     #srs{
         it_begin = ItBegin0,
         it_end = ItEnd0,
         first_seqno_qos1 = FirstSeqnoQos1,
-        first_seqno_qos2 = FirstSeqnoQos2
+        first_seqno_qos2 = FirstSeqnoQos2,
+        sub_state_id = SubStateId
     } = Srs0,
     ItBegin =
         case IsReplay of
             true -> ItBegin0;
             false -> ItEnd0
         end,
+    SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
     case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
         {ok, ItEnd, Messages} ->
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
-                IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
+                IsReplay,
+                Session,
+                SubState,
+                ClientInfo,
+                FirstSeqnoQos1,
+                FirstSeqnoQos2,
+                Messages,
+                Inflight0
             ),
             Srs = Srs0#srs{
                 it_begin = ItBegin,
@@ -984,27 +988,29 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli
 %% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
 %%     K.
 
-process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) ->
+process_batch(
+    _IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight
+) ->
     {Inflight, LastSeqNoQos1, LastSeqNoQos2};
 process_batch(
-    IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0
+    IsReplay,
+    Session,
+    SubState,
+    ClientInfo,
+    FirstSeqNoQos1,
+    FirstSeqNoQos2,
+    [KV | Messages],
+    Inflight0
 ) ->
-    #{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session,
-    {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV,
+    #{s := S} = Session,
+    #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState,
+    {_DsMsgKey, Msg0} = KV,
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S),
     Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S),
     Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S),
-    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
-    Msgs = [
-        Msg
-     || SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []),
-        Msg <- begin
-            #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
-            emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
-        end
-    ],
+    Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS),
     {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl(
         fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) ->
             case Qos of
@@ -1060,7 +1066,7 @@ process_batch(
         Msgs
     ),
     process_batch(
-        IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
+        IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
     ).
 
 %%--------------------------------------------------------------------
@@ -1077,15 +1083,13 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos :
     %% queued messages. Since streams in this DB are exclusive to the
     %% session, messages from the queue can be dropped as soon as they
     %% are acked.
-    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
-    Msgs = [
-        Msg
-     || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []),
-        Msg <- begin
-            #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
-            emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
-        end
-    ],
+    case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of
+        #{current_state := CS} ->
+            #{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S);
+        undefined ->
+            SubOpts = undefined
+    end,
+    Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS),
     lists:foldl(fun do_enqueue_transient/2, Session, Msgs).
 
 do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->

+ 3 - 1
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -65,7 +65,9 @@
     last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
     %% This stream belongs to an unsubscribed topic-filter, and is
     %% marked for deletion:
-    unsubscribed = false :: boolean()
+    unsubscribed = false :: boolean(),
+    %% Reference to the subscription state:
+    sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id()
 }).
 
 %% Session metadata keys:

+ 117 - 90
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -37,7 +37,19 @@
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
--export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
+-export([
+    get_subscription_state/2,
+    fold_subscription_states/3,
+    put_subscription_state/3,
+    del_subscription_state/2
+]).
+-export([
+    get_subscription/2,
+    fold_subscriptions/3,
+    n_subscriptions/1,
+    put_subscription/3,
+    del_subscription/2
+]).
 -export([
     get_awaiting_rel/2,
     put_awaiting_rel/3,
@@ -51,7 +63,6 @@
 -export_type([
     t/0,
     metadata/0,
-    subscriptions/0,
     seqno_type/0,
     stream_key/0,
     rank_key/0,
@@ -69,8 +80,6 @@
 
 -type message() :: emqx_types:message().
 
--type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).
-
 -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
 
 %% Generic key-value wrapper that is used for exporting arbitrary
@@ -121,7 +130,13 @@
     id := emqx_persistent_session_ds:id(),
     dirty := boolean(),
     metadata := metadata(),
-    subscriptions := subscriptions(),
+    subscriptions := pmap(
+        emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription()
+    ),
+    subscription_states := pmap(
+        emqx_persistent_session_ds_subs:subscription_state_id(),
+        emqx_persistent_session_ds_subs:subscription_state()
+    ),
     seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()),
     streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()),
     ranks := pmap(term(), integer()),
@@ -130,11 +145,20 @@
 
 -define(session_tab, emqx_ds_session_tab).
 -define(subscription_tab, emqx_ds_session_subscriptions).
+-define(subscription_states_tab, emqx_ds_session_subscription_states).
 -define(stream_tab, emqx_ds_session_streams).
 -define(seqno_tab, emqx_ds_session_seqnos).
 -define(rank_tab, emqx_ds_session_ranks).
 -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel).
--define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]).
+
+-define(pmaps, [
+    {subscriptions, ?subscription_tab},
+    {subscription_states, ?subscription_states_tab},
+    {streams, ?stream_tab},
+    {seqnos, ?seqno_tab},
+    {ranks, ?rank_tab},
+    {awaiting_rel, ?awaiting_rel_tab}
+]).
 
 %% Enable this flag if you suspect some code breaks the sequence:
 -ifndef(CHECK_SEQNO).
@@ -161,24 +185,25 @@ create_tables() ->
             {attributes, record_info(fields, kv)}
         ]
     ),
-    [create_kv_pmap_table(Table) || Table <- ?pmap_tables],
-    mria:wait_for_tables([?session_tab | ?pmap_tables]).
+    {_, PmapTables} = lists:unzip(?pmaps),
+    [create_kv_pmap_table(Table) || Table <- PmapTables],
+    mria:wait_for_tables([?session_tab | PmapTables]).
 
 -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
 open(SessionId) ->
     ro_transaction(fun() ->
         case kv_restore(?session_tab, SessionId) of
             [Metadata] ->
-                Rec = #{
-                    id => SessionId,
-                    metadata => Metadata,
-                    subscriptions => read_subscriptions(SessionId),
-                    streams => pmap_open(?stream_tab, SessionId),
-                    seqnos => pmap_open(?seqno_tab, SessionId),
-                    ranks => pmap_open(?rank_tab, SessionId),
-                    awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId),
-                    ?unset_dirty
-                },
+                Rec = update_pmaps(
+                    fun(_Pmap, Table) ->
+                        pmap_open(Table, SessionId)
+                    end,
+                    #{
+                        id => SessionId,
+                        metadata => Metadata,
+                        ?unset_dirty
+                    }
+                ),
                 {ok, Rec};
             [] ->
                 undefined
@@ -195,29 +220,13 @@ print_session(SessionId) ->
     end.
 
 -spec format(t()) -> map().
-format(#{
-    metadata := Metadata,
-    subscriptions := SubsGBT,
-    streams := Streams,
-    seqnos := Seqnos,
-    ranks := Ranks,
-    awaiting_rel := AwaitingRel
-}) ->
-    Subs = emqx_topic_gbt:fold(
-        fun(Key, Sub, Acc) ->
-            maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc)
+format(Rec) ->
+    update_pmaps(
+        fun(Pmap, _Table) ->
+            pmap_format(Pmap)
         end,
-        #{},
-        SubsGBT
-    ),
-    #{
-        metadata => Metadata,
-        subscriptions => Subs,
-        streams => pmap_format(Streams),
-        seqnos => pmap_format(Seqnos),
-        ranks => pmap_format(Ranks),
-        awaiting_rel => pmap_format(AwaitingRel)
-    }.
+        maps:without([id, dirty], Rec)
+    ).
 
 -spec list_sessions() -> [emqx_persistent_session_ds:id()].
 list_sessions() ->
@@ -227,7 +236,7 @@ list_sessions() ->
 delete(Id) ->
     transaction(
         fun() ->
-            [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables],
+            [kv_pmap_delete(Table, Id) || {_, Table} <- ?pmaps],
             mnesia:delete(?session_tab, Id, write)
         end
     ).
@@ -238,39 +247,34 @@ commit(Rec = #{dirty := false}) ->
 commit(
     Rec = #{
         id := SessionId,
-        metadata := Metadata,
-        streams := Streams,
-        seqnos := SeqNos,
-        ranks := Ranks,
-        awaiting_rel := AwaitingRel
+        metadata := Metadata
     }
 ) ->
     check_sequence(Rec),
     transaction(fun() ->
         kv_persist(?session_tab, SessionId, Metadata),
-        Rec#{
-            streams => pmap_commit(SessionId, Streams),
-            seqnos => pmap_commit(SessionId, SeqNos),
-            ranks => pmap_commit(SessionId, Ranks),
-            awaiting_rel => pmap_commit(SessionId, AwaitingRel),
-            ?unset_dirty
-        }
+        update_pmaps(
+            fun(Pmap, _Table) ->
+                pmap_commit(SessionId, Pmap)
+            end,
+            Rec#{?unset_dirty}
+        )
     end).
 
 -spec create_new(emqx_persistent_session_ds:id()) -> t().
 create_new(SessionId) ->
     transaction(fun() ->
         delete(SessionId),
-        #{
-            id => SessionId,
-            metadata => #{},
-            subscriptions => emqx_topic_gbt:new(),
-            streams => pmap_open(?stream_tab, SessionId),
-            seqnos => pmap_open(?seqno_tab, SessionId),
-            ranks => pmap_open(?rank_tab, SessionId),
-            awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId),
-            ?set_dirty
-        }
+        update_pmaps(
+            fun(_Pmap, Table) ->
+                pmap_open(Table, SessionId)
+            end,
+            #{
+                id => SessionId,
+                metadata => #{},
+                ?set_dirty
+            }
+        )
     end).
 
 %%
@@ -351,30 +355,53 @@ new_id(Rec) ->
 
 %%
 
--spec get_subscriptions(t()) -> subscriptions().
-get_subscriptions(#{subscriptions := Subs}) ->
-    Subs.
+-spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) ->
+    emqx_persistent_session_ds_subs:subscription() | undefined.
+get_subscription(TopicFilter, Rec) ->
+    gen_get(subscriptions, TopicFilter, Rec).
+
+-spec fold_subscriptions(fun(), Acc, t()) -> Acc.
+fold_subscriptions(Fun, Acc, Rec) ->
+    gen_fold(subscriptions, Fun, Acc, Rec).
+
+-spec n_subscriptions(t()) -> non_neg_integer().
+n_subscriptions(Rec) ->
+    gen_size(subscriptions, Rec).
 
 -spec put_subscription(
     emqx_persistent_session_ds:topic_filter(),
-    _SubId,
-    emqx_persistent_session_ds:subscription(),
+    emqx_persistent_session_ds_subs:subscription(),
     t()
 ) -> t().
-put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
-    %% Note: currently changes to the subscriptions are persisted immediately.
-    Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end),
-    Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
-    Rec#{subscriptions => Subs}.
-
--spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t().
-del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
-    %% Note: currently the subscriptions are persisted immediately.
-    Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end),
-    Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
-    Rec#{subscriptions => Subs}.
+put_subscription(TopicFilter, Subscription, Rec) ->
+    gen_put(subscriptions, TopicFilter, Subscription, Rec).
+
+-spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t().
+del_subscription(TopicFilter, Rec) ->
+    gen_del(subscriptions, TopicFilter, Rec).
+
+%%
+
+-spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) ->
+    emqx_persistent_session_ds_subs:subscription_state() | undefined.
+get_subscription_state(SStateId, Rec) ->
+    gen_get(subscription_states, SStateId, Rec).
+
+-spec fold_subscription_states(fun(), Acc, t()) -> Acc.
+fold_subscription_states(Fun, Acc, Rec) ->
+    gen_fold(subscription_states, Fun, Acc, Rec).
+
+-spec put_subscription_state(
+    emqx_persistent_session_ds_subs:subscription_state_id(),
+    emqx_persistent_session_ds_subs:subscription_state(),
+    t()
+) -> t().
+put_subscription_state(SStateId, SState, Rec) ->
+    gen_put(subscription_states, SStateId, SState, Rec).
+
+-spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t().
+del_subscription_state(SStateId, Rec) ->
+    gen_del(subscription_states, SStateId, Rec).
 
 %%
 
@@ -522,16 +549,16 @@ gen_size(Field, Rec) ->
     check_sequence(Rec),
     pmap_size(maps:get(Field, Rec)).
 
-%%
-
-read_subscriptions(SessionId) ->
-    Records = kv_pmap_restore(?subscription_tab, SessionId),
+-spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map().
+update_pmaps(Fun, Map) ->
     lists:foldl(
-        fun({{TopicFilter, SubId}, Subscription}, Acc) ->
-            emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc)
+        fun({MapKey, Table}, Acc) ->
+            OldVal = maps:get(MapKey, Map, undefined),
+            Val = Fun(OldVal, Table),
+            maps:put(MapKey, Val, Acc)
         end,
-        emqx_topic_gbt:new(),
-        Records
+        Map,
+        ?pmaps
     ).
 
 %%

+ 40 - 6
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -126,9 +126,10 @@ find_new_streams(S) ->
 renew_streams(S0) ->
     S1 = remove_unsubscribed_streams(S0),
     S2 = remove_fully_replayed_streams(S1),
+    S3 = update_stream_subscription_state_ids(S2),
     emqx_persistent_session_ds_subs:fold(
         fun
-            (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) ->
+            (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) ->
                 TopicFilter = emqx_topic:words(Key),
                 Streams = select_streams(
                     SubId,
@@ -137,7 +138,7 @@ renew_streams(S0) ->
                 ),
                 lists:foldl(
                     fun(I, Acc1) ->
-                        ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1)
+                        ensure_iterator(TopicFilter, StartTime, SubId, SStateId, I, Acc1)
                     end,
                     Acc,
                     Streams
@@ -145,8 +146,8 @@ renew_streams(S0) ->
             (_Key, _DeletedSubscription, Acc) ->
                 Acc
         end,
-        S2,
-        S2
+        S3,
+        S3
     ).
 
 -spec on_unsubscribe(
@@ -201,7 +202,7 @@ is_fully_acked(Srs, S) ->
 %% Internal functions
 %%================================================================================
 
-ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
+ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream}, S) ->
     Key = {SubId, Stream},
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
         undefined ->
@@ -214,7 +215,8 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
                         rank_x = RankX,
                         rank_y = RankY,
                         it_begin = Iterator,
-                        it_end = Iterator
+                        it_end = Iterator,
+                        sub_state_id = SStateId
                     },
                     emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
                 {error, recoverable, Reason} ->
@@ -350,6 +352,38 @@ remove_fully_replayed_streams(S0) ->
         S1
     ).
 
+%% @doc Update subscription state IDs for all streams that don't have unacked messages
+-spec update_stream_subscription_state_ids(emqx_persistent_session_ds_state:t()) ->
+    emqx_persistent_session_ds_state:t().
+update_stream_subscription_state_ids(S0) ->
+    CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
+    CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
+    %% Find the latest state IDs for each subscription:
+    LastSStateIds = emqx_persistent_session_ds_state:fold_subscriptions(
+        fun(_, #{id := SubId, current_state := SStateId}, Acc) ->
+            Acc#{SubId => SStateId}
+        end,
+        #{},
+        S0
+    ),
+    %% Update subscription state IDs for fully acked streams:
+    emqx_persistent_session_ds_state:fold_streams(
+        fun
+            (_, #srs{unsubscribed = true}, S) ->
+                S;
+            (Key = {SubId, _Stream}, SRS0, S) ->
+                case is_fully_acked(CommQos1, CommQos2, SRS0) of
+                    true ->
+                        SRS = SRS0#srs{sub_state_id = maps:get(SubId, LastSStateIds)},
+                        emqx_persistent_session_ds_state:put_stream(Key, SRS, S);
+                    false ->
+                        S
+                end
+        end,
+        S0,
+        S0
+    ).
+
 %% @doc Compare the streams by the order in which they were replayed.
 compare_streams(
     {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},

+ 113 - 72
apps/emqx/src/emqx_persistent_session_ds_subs.erl

@@ -25,21 +25,47 @@
 
 %% API:
 -export([
-    on_subscribe/3,
-    on_unsubscribe/3,
+    on_subscribe/4,
+    on_unsubscribe/2,
     gc/1,
     lookup/2,
     to_map/1,
-    fold/3,
-    fold_all/3
+    fold/3
 ]).
 
--export_type([]).
+-export_type([subscription_state_id/0, subscription/0, subscription_state/0]).
+
+-include("emqx_persistent_session_ds.hrl").
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
+-type subscription() :: #{
+    %% Session-unique identifier of the subscription. Other objects
+    %% can use it as a compact reference:
+    id := emqx_persistent_session_ds:subscription_id(),
+    %% Reference to the current subscription state:
+    current_state := subscription_state_id(),
+    %% Time when the subscription was added:
+    start_time := emqx_ds:time()
+}.
+
+-type subscription_state_id() :: integer().
+
+-type subscription_state() :: #{
+    parent_subscription := emqx_persistent_session_ds:subscription_id(),
+    upgrade_qos := boolean(),
+    %% SubOpts:
+    subopts := #{
+        nl => _,
+        qos => _,
+        rap => _,
+        subid => _,
+        _ => _
+    }
+}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -47,41 +73,88 @@
 %% @doc Process a new subscription
 -spec on_subscribe(
     emqx_persistent_session_ds:topic_filter(),
-    emqx_persistent_session_ds:subscription(),
+    boolean(),
+    emqx_types:subopts(),
     emqx_persistent_session_ds_state:t()
 ) ->
-    emqx_persistent_session_ds_state:t().
-on_subscribe(TopicFilter, Subscription, S) ->
-    emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S).
+    {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}.
+on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) ->
+    case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
+        undefined ->
+            %% This is a new subscription:
+            {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
+            {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
+            SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
+            S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2),
+            Subscription = #{
+                id => SubId,
+                current_state => SStateId,
+                start_time => now_ms()
+            },
+            S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3),
+            {true, S};
+        Sub0 = #{current_state := SStateId0, id := SubId} ->
+            SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
+            case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
+                SState ->
+                    %% Client resubscribed with the same parameters:
+                    {false, S0};
+                _ ->
+                    %% Subscription parameters changed:
+                    {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
+                    S2 = emqx_persistent_session_ds_state:put_subscription_state(
+                        SStateId, SState, S1
+                    ),
+                    Sub = Sub0#{current_state => SStateId},
+                    S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
+                    {false, S}
+            end
+    end.
 
 %% @doc Process UNSUBSCRIBE
 -spec on_unsubscribe(
     emqx_persistent_session_ds:topic_filter(),
-    emqx_persistent_session_ds:subscription(),
     emqx_persistent_session_ds_state:t()
 ) ->
     emqx_persistent_session_ds_state:t().
-on_unsubscribe(TopicFilter, Subscription0, S0) ->
-    %% Note: we cannot delete the subscription immediately, since its
-    %% metadata can be used during replay (see `process_batch'). We
-    %% instead mark it as deleted, and let `subscription_gc' function
-    %% dispatch it later:
-    Subscription = Subscription0#{deleted => true},
-    emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0).
-
-%% @doc Remove subscriptions that have been marked for deletion, and
-%% that don't have any unacked messages:
+on_unsubscribe(TopicFilter, S0) ->
+    emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0).
+
+%% @doc Remove subscription states that don't have a parent, and that
+%% don't have any unacked messages:
 -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
 gc(S0) ->
-    fold_all(
-        fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) ->
-            case Deleted andalso has_no_unacked_streams(SubId, S0) of
-                true ->
-                    emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
+    %% Create a set of subscription state IDs referenced either by a
+    %% subscription or a stream replay state:
+    AliveSet0 = emqx_persistent_session_ds_state:fold_subscriptions(
+        fun(_TopicFilter, #{current_state := SStateId}, Acc) ->
+            Acc#{SStateId => true}
+        end,
+        #{},
+        S0
+    ),
+    AliveSet = emqx_persistent_session_ds_state:fold_streams(
+        fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) ->
+            case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of
                 false ->
+                    Acc#{SStateId => true};
+                true ->
                     Acc
             end
         end,
+        AliveSet0,
+        S0
+    ),
+    %% Delete dangling subscription states:
+    emqx_persistent_session_ds_state:fold_subscription_states(
+        fun(SStateId, _, S) ->
+            case maps:is_key(SStateId, AliveSet) of
+                true ->
+                    S;
+                false ->
+                    emqx_persistent_session_ds_state:del_subscription_state(SStateId, S)
+            end
+        end,
         S0,
         S0
     ).
@@ -90,12 +163,16 @@ gc(S0) ->
 -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
     emqx_persistent_session_ds:subscription() | undefined.
 lookup(TopicFilter, S) ->
-    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
-    case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
-        #{deleted := true} ->
-            undefined;
-        Sub ->
-            Sub
+    case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
+        Sub = #{current_state := SStateId} ->
+            case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
+                #{subopts := SubOpts} ->
+                    Sub#{subopts => SubOpts};
+                undefined ->
+                    undefined
+            end;
+        undefined ->
+            undefined
     end.
 
 %% @doc Convert active subscriptions to a map, for information
@@ -103,7 +180,7 @@ lookup(TopicFilter, S) ->
 -spec to_map(emqx_persistent_session_ds_state:t()) -> map().
 to_map(S) ->
     fold(
-        fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
+        fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end,
         #{},
         S
     ).
@@ -115,48 +192,12 @@ to_map(S) ->
     emqx_persistent_session_ds_state:t()
 ) ->
     Acc.
-fold(Fun, AccIn, S) ->
-    fold_all(
-        fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
-            case Deleted of
-                true -> Acc;
-                false -> Fun(TopicFilter, Sub, Acc)
-            end
-        end,
-        AccIn,
-        S
-    ).
-
-%% @doc Fold over all subscriptions, including inactive ones:
--spec fold_all(
-    fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
-    Acc,
-    emqx_persistent_session_ds_state:t()
-) ->
-    Acc.
-fold_all(Fun, AccIn, S) ->
-    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
-    emqx_topic_gbt:fold(
-        fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
-        AccIn,
-        Subs
-    ).
+fold(Fun, Acc, S) ->
+    emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S).
 
 %%================================================================================
 %% Internal functions
 %%================================================================================
 
--spec has_no_unacked_streams(
-    emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
-) -> boolean().
-has_no_unacked_streams(SubId, S) ->
-    emqx_persistent_session_ds_state:fold_streams(
-        fun
-            ({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
-                emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
-            (_StreamKey, _Srs, Acc) ->
-                Acc
-        end,
-        true,
-        S
-    ).
+now_ms() ->
+    erlang:system_time(millisecond).

+ 5 - 0
apps/emqx/src/emqx_session.erl

@@ -429,6 +429,11 @@ enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
         end,
     enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).
 
+%% Caution: updating this function _may_ break consistency of replay
+%% for persistent sessions. Persistent sessions expect it to return
+%% the same result during replay. If its behavior changes between
+%% releases, sessions restored from cold storage may end up
+%% replaying messages with a different QoS, etc.
 enrich_message(
     ClientInfo = #{clientid := ClientId},
     Msg = #message{from = ClientId},

+ 9 - 55
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -74,9 +74,6 @@ session_id() ->
 topic() ->
     oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]).
 
-subid() ->
-    oneof([[]]).
-
 subscription() ->
     oneof([#{}]).
 
@@ -129,18 +126,25 @@ put_req() ->
             {Track, Seqno},
             {seqno_track(), seqno()},
             {#s.seqno, put_seqno, Track, Seqno}
+        ),
+        ?LET(
+            {Topic, Subscription},
+            {topic(), subscription()},
+            {#s.subs, put_subscription, Topic, Subscription}
         )
     ]).
 
 get_req() ->
     oneof([
         {#s.streams, get_stream, stream_id()},
-        {#s.seqno, get_seqno, seqno_track()}
+        {#s.seqno, get_seqno, seqno_track()},
+        {#s.subs, get_subscription, topic()}
     ]).
 
 del_req() ->
     oneof([
-        {#s.streams, del_stream, stream_id()}
+        {#s.streams, del_stream, stream_id()},
+        {#s.subs, del_subscription, topic()}
     ]).
 
 command(S) ->
@@ -153,13 +157,6 @@ command(S) ->
                 {2, {call, ?MODULE, reopen, [session_id(S)]}},
                 {2, {call, ?MODULE, commit, [session_id(S)]}},
 
-                %% Subscriptions:
-                {3,
-                    {call, ?MODULE, put_subscription, [
-                        session_id(S), topic(), subid(), subscription()
-                    ]}},
-                {3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}},
-
                 %% Metadata:
                 {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}},
                 {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}},
@@ -170,7 +167,6 @@ command(S) ->
                 {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}},
 
                 %% Getters:
-                {4, {call, ?MODULE, get_subscriptions, [session_id(S)]}},
                 {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
             ]);
         false ->
@@ -207,19 +203,6 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result)
         #{session_id => SessionId, key => Key, 'fun' => Fun}
     ),
     true;
-postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) ->
-    #{SessionId := #s{subs = Subs}} = S,
-    ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)),
-    maps:foreach(
-        fun({TopicFilter, Id}, Expected) ->
-            ?assertEqual(
-                Expected,
-                emqx_topic_gbt:lookup(TopicFilter, Id, Result, default)
-            )
-        end,
-        Subs
-    ),
-    true;
 postcondition(_, _, _) ->
     true.
 
@@ -227,22 +210,6 @@ next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
     S#{SessionId => #s{}};
 next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
     maps:remove(SessionId, S);
-next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) ->
-    Key = {TopicFilter, SubId},
-    update(
-        SessionId,
-        #s.subs,
-        fun(Subs) -> Subs#{Key => Subscription} end,
-        S
-    );
-next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) ->
-    Key = {TopicFilter, SubId},
-    update(
-        SessionId,
-        #s.subs,
-        fun(Subs) -> maps:remove(Key, Subs) end,
-        S
-    );
 next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
     update(
         SessionId,
@@ -296,19 +263,6 @@ reopen(SessionId) ->
     {ok, S} = emqx_persistent_session_ds_state:open(SessionId),
     put_state(SessionId, S).
 
-put_subscription(SessionId, TopicFilter, SubId, Subscription) ->
-    S = emqx_persistent_session_ds_state:put_subscription(
-        TopicFilter, SubId, Subscription, get_state(SessionId)
-    ),
-    put_state(SessionId, S).
-
-del_subscription(SessionId, TopicFilter, SubId) ->
-    S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)),
-    put_state(SessionId, S).
-
-get_subscriptions(SessionId) ->
-    emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)).
-
 put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
     S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]),
     put_state(SessionId, S).