Просмотр исходного кода

fix(sessds): Replace GET_STREAMS timer with stream events

ieQu1 1 год назад
Родитель
Сommit
8a7d56b467

+ 80 - 87
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -199,14 +199,12 @@
 
 -define(TIMER_PULL, timer_pull).
 -define(TIMER_PUSH, timer_push).
--define(TIMER_GET_STREAMS, timer_get_streams).
 -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
 -define(TIMER_RETRY_REPLAY, timer_retry_replay).
 
 -type timer() ::
     ?TIMER_PULL
     | ?TIMER_PUSH
-    | ?TIMER_GET_STREAMS
     | ?TIMER_BUMP_LAST_ALIVE_AT
     | ?TIMER_RETRY_REPLAY.
 
@@ -234,7 +232,6 @@
     %% Timers:
     ?TIMER_PULL := timer_state(),
     ?TIMER_PUSH := timer_state(),
-    ?TIMER_GET_STREAMS := timer_state(),
     ?TIMER_BUMP_LAST_ALIVE_AT := timer_state(),
     ?TIMER_RETRY_REPLAY := timer_state()
 }.
@@ -280,7 +277,7 @@
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
     session().
 create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
-    ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)).
+    post_init(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)).
 
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
     {_IsPresent :: true, session(), []} | false.
@@ -296,7 +293,7 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
         Session0 = #{} ->
             Session1 = Session0#{props := Conf},
             Session = do_expire(ClientInfo, Session1),
-            {true, ensure_timers(Session), []};
+            {true, post_init(Session), []};
         false ->
             false
     end.
@@ -425,24 +422,25 @@ print_session(ClientId) ->
 subscribe(
     #share{} = TopicFilter,
     SubOpts,
-    Session
+    Session0
 ) ->
-    case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of
-        {ok, S0, SharedSubS} ->
-            S = emqx_persistent_session_ds_state:commit(S0),
-            {ok, Session#{s := S, shared_sub_s := SharedSubS}};
+    case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session0) of
+        {ok, S, SharedSubS} ->
+            Session = Session0#{s := S, shared_sub_s := SharedSubS},
+            {ok, commit(Session)};
         Error = {error, _} ->
             Error
     end;
 subscribe(
     TopicFilter,
     SubOpts,
-    Session
+    Session0
 ) ->
-    case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of
+    case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session0) of
         {ok, S1, NewStreamSubs} ->
-            S = emqx_persistent_session_ds_state:commit(S1),
-            {ok, Session#{s := S, new_stream_subs := NewStreamSubs}};
+            Session1 = Session0#{s := S1, new_stream_subs := NewStreamSubs},
+            Session = renew_streams(TopicFilter, Session1),
+            {ok, commit(Session)};
         Error = {error, _} ->
             Error
     end.
@@ -464,12 +462,11 @@ unsubscribe(
         )
     of
         {ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} ->
-            {S2, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
+            {S, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
                 SubId, S1, SchedS0
             ),
-            S = emqx_persistent_session_ds_state:commit(S2),
             Session = Session0#{s := S, shared_sub_s := SharedSubS1, stream_scheduler_s := SchedS},
-            {ok, Session, SubOpts};
+            {ok, commit(Session), SubOpts};
         Error = {error, _} ->
             Error
     end;
@@ -483,14 +480,13 @@ unsubscribe(
         emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0, NewStreamSubs0)
     of
         {ok, S1, NewStreamSubs, #{id := SubId, subopts := SubOpts}} ->
-            {S2, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
+            {S, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
                 SubId, S1, SchedS0
             ),
-            S = emqx_persistent_session_ds_state:commit(S2),
             Session = Session0#{
                 s := S, stream_scheduler_s := SchedS, new_stream_subs := NewStreamSubs
             },
-            {ok, Session, SubOpts};
+            {ok, commit(Session), SubOpts};
         Error = {error, _} ->
             Error
     end.
@@ -686,39 +682,18 @@ handle_timeout(ClientInfo, ?TIMER_PUSH, Session0) ->
 handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
     Session = replay_streams(Session0, ClientInfo),
     {ok, [], Session};
-handle_timeout(
-    ClientInfo,
-    ?TIMER_GET_STREAMS,
-    Session0 = #{s := S0, shared_sub_s := SharedSubS0, stream_scheduler_s := SchedS0}
-) ->
-    ?tp(debug, sessds_renew_streams, #{}),
-    %% `gc` and `renew_streams` methods may drop unsubscribed streams.
-    %% Shared subscription handler must have a chance to see unsubscribed streams
-    %% in the fully replayed state.
-    {S1, SchedS1, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_streams_replay(
-        S0, SchedS0, SharedSubS0
-    ),
-    S2 = emqx_persistent_session_ds_subs:gc(S1),
-    {S, SchedS} = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2, SchedS1),
-    Interval = get_config(ClientInfo, [renew_streams_interval]),
-    Session = set_timer(
-        ?TIMER_GET_STREAMS,
-        Interval,
-        Session0#{s := S, shared_sub_s := SharedSubS, stream_scheduler_s := SchedS}
-    ),
-    {ok, [], push_now(Session)};
 handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
-    S = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)),
+    S = bump_last_alive(S0),
     Session = set_timer(
         ?TIMER_BUMP_LAST_ALIVE_AT,
         bump_interval(),
         Session0#{s := S}
     ),
-    {ok, [], Session};
-handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S0}) ->
-    S = emqx_persistent_session_ds_state:commit(S0),
+    {ok, [], commit(Session)};
+handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session0) ->
+    Session = commit(Session0),
     From ! Ref,
-    {ok, [], Session#{s := S}};
+    {ok, [], Session};
 handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
     expire(ClientInfo, Session);
 handle_timeout(_ClientInfo, Timeout, Session) ->
@@ -852,9 +827,19 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
     S = emqx_persistent_session_ds_state:put_stream(StreamKey, SRS, S0),
     Session#{s := S}.
 
-renew_streams(TopicFilter, Session0) ->
-    ?tp(debug, sessds_renew_streams, #{topic_filter => TopicFilter}),
-    #{s := S0, shared_sub_s := SharedSubS0, stream_scheduler_s := SchedS0} = Session0,
+%%--------------------------------------------------------------------
+
+renew_streams(all, Session0) ->
+    ?tp(debug, sessds_renew_streams, #{topic_filter => all}),
+    Session1 = #{s := S0, stream_scheduler_s := SchedS0} = stream_housekeeping(Session0),
+    %% Renew streams for all subscriptions:
+    {S, SchedS} = emqx_persistent_session_ds_stream_scheduler:renew_streams(
+        S0, SchedS0
+    ),
+    Session = Session1#{s := S, stream_scheduler_s := SchedS},
+    %% Launch push loop to get data from the new streams:
+    push_now(Session);
+renew_streams(TopicFilter, Session0 = #{s := S0}) ->
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
         undefined ->
             ?tp(
@@ -864,23 +849,29 @@ renew_streams(TopicFilter, Session0) ->
             ),
             Session0;
         Subscription ->
-            %% `gc` and `renew_streams` methods may drop unsubscribed streams.
-            %% Shared subscription handler must have a chance to see unsubscribed streams
-            %% in the fully replayed state.
-            {S1, SchedS1, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_streams_replay(
-                S0, SchedS0, SharedSubS0
-            ),
-            %% Take an opportunity to remove obsolete stream states:
-            S2 = emqx_persistent_session_ds_subs:gc(S1),
-            %% Renew streams for the topic filter:
+            ?tp(debug, sessds_renew_streams, #{topic_filter => TopicFilter}),
+            Session1 = #{s := S1, stream_scheduler_s := SchedS0} = stream_housekeeping(Session0),
+            %% Renew streams for the topic-filter:
             {S, SchedS} = emqx_persistent_session_ds_stream_scheduler:renew_streams(
-                TopicFilter, Subscription, S2, SchedS1
+                TopicFilter, Subscription, S1, SchedS0
             ),
-            Session = Session0#{s := S, shared_sub_s := SharedSubS, stream_scheduler_s := SchedS},
+            Session = Session1#{s := S, stream_scheduler_s := SchedS},
             %% Launch push loop to get data from the new streams:
             push_now(Session)
     end.
 
+stream_housekeeping(Session) ->
+    #{s := S0, shared_sub_s := SharedSubS0, stream_scheduler_s := SchedS0} = Session,
+    %% `gc' and `renew_streams' methods may drop unsubscribed streams.
+    %% Shared subscription handler must have a chance to see
+    %% unsubscribed streams in the fully replayed state.
+    {S1, SchedS, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_streams_replay(
+        S0, SchedS0, SharedSubS0
+    ),
+    %% Take the opportunity to remove obsolete stream states:
+    S = emqx_persistent_session_ds_subs:gc(S1),
+    Session#{s := S, shared_sub_s := SharedSubS, stream_scheduler_s := SchedS}.
+
 %%--------------------------------------------------------------------
 
 -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
@@ -894,14 +885,13 @@ disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo
             _ ->
                 S2
         end,
-    {S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
-    S = emqx_persistent_session_ds_state:commit(S4),
-    {shutdown, Session#{s := S, shared_sub_s := SharedSubS}}.
+    {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
+    {shutdown, commit(Session#{s := S, shared_sub_s := SharedSubS})}.
 
 -spec terminate(Reason :: term(), session()) -> ok.
-terminate(_Reason, Session = #{id := Id, s := S}) ->
+terminate(_Reason, Session = #{id := Id}) ->
     maybe_set_will_message_timer(Session),
-    _ = emqx_persistent_session_ds_state:commit(S),
+    _ = commit(Session),
     ?tp(debug, persistent_session_ds_terminate, #{id => Id}),
     ok.
 
@@ -1011,30 +1001,30 @@ session_open(
                     S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
                     S5 = set_clientinfo(ClientInfo, S4),
                     S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
-                    {ok, S7, SharedSubS} = emqx_persistent_session_ds_shared_subs:open(
+                    {ok, S, SharedSubS} = emqx_persistent_session_ds_shared_subs:open(
                         S6, shared_sub_opts(SessionId)
                     ),
-                    S = emqx_persistent_session_ds_state:commit(S7),
                     Inflight = emqx_persistent_session_ds_buffer:new(
                         receive_maximum(NewConnInfo)
                     ),
                     NewStreamSubs = emqx_persistent_session_ds_subs:open(S),
                     SSS = emqx_persistent_session_ds_stream_scheduler:init(S),
-                    #{
-                        id => SessionId,
-                        s => S,
-                        shared_sub_s => SharedSubS,
-                        inflight => Inflight,
-                        props => #{},
-                        stream_scheduler_s => SSS,
-                        replay => undefined,
-                        new_stream_subs => NewStreamSubs,
-                        ?TIMER_PULL => undefined,
-                        ?TIMER_PUSH => undefined,
-                        ?TIMER_GET_STREAMS => undefined,
-                        ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
-                        ?TIMER_RETRY_REPLAY => undefined
-                    }
+                    commit(
+                        #{
+                            id => SessionId,
+                            s => S,
+                            shared_sub_s => SharedSubS,
+                            inflight => Inflight,
+                            props => #{},
+                            stream_scheduler_s => SSS,
+                            replay => undefined,
+                            new_stream_subs => NewStreamSubs,
+                            ?TIMER_PULL => undefined,
+                            ?TIMER_PUSH => undefined,
+                            ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
+                            ?TIMER_RETRY_REPLAY => undefined
+                        }
+                    )
             end;
         undefined ->
             false
@@ -1087,7 +1077,6 @@ session_ensure_new(
         replay => undefined,
         ?TIMER_PULL => undefined,
         ?TIMER_PUSH => undefined,
-        ?TIMER_GET_STREAMS => undefined,
         ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
         ?TIMER_RETRY_REPLAY => undefined
     }.
@@ -1421,10 +1410,10 @@ do_drain_buffer(Inflight0, S0, Acc) ->
 
 %% TODO: find a more reliable way to perform actions that have side
 %% effects. Add `CBM:init' callback to the session behavior?
--spec ensure_timers(session()) -> session().
-ensure_timers(Session0) ->
-    Session1 = set_timer(?TIMER_GET_STREAMS, 100, Session0),
-    set_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session1).
+-spec post_init(session()) -> session().
+post_init(Session0) ->
+    Session = renew_streams(all, Session0),
+    set_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session).
 
 %% This function triggers sending buffered packets to the client
 %% (provided there is something to send and the number of in-flight
@@ -1688,6 +1677,10 @@ set_timer(Timer, Time, Session) ->
     TRef = emqx_utils:start_timer(Time, {emqx_session, Timer}),
     Session#{Timer := TRef}.
 
+commit(Session = #{s := S0}) ->
+    S = emqx_persistent_session_ds_state:commit(S0),
+    Session#{s := S}.
+
 %%--------------------------------------------------------------------
 %% Tests
 %%--------------------------------------------------------------------

+ 9 - 7
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl

@@ -89,7 +89,7 @@ open(S) ->
     %% Receive notifications about new streams for the topic filter:
     fold_private_subscriptions(
         fun(TopicFilter, _Sub, Acc) ->
-            Acc#{TopicFilter => watch_streams(TopicFilter)}
+            Acc#{watch_streams(TopicFilter) => TopicFilter}
         end,
         #{},
         S
@@ -144,7 +144,7 @@ on_subscribe(TopicFilter, SubOpts, #{
                     ?tp(persistent_session_ds_subscription_added, #{
                         topic_filter => TopicFilter, session => SessionId
                     }),
-                    {ok, S, NewStreamSubs#{TopicFilter => watch_streams(TopicFilter)}};
+                    {ok, gc(S), NewStreamSubs#{watch_streams(TopicFilter) => TopicFilter}};
                 false ->
                     {error, ?RC_QUOTA_EXCEEDED}
             end;
@@ -153,7 +153,7 @@ on_subscribe(TopicFilter, SubOpts, #{
             case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
                 SState ->
                     %% Client resubscribed with the same parameters:
-                    {ok, S0, NewStreamSubs};
+                    {ok, gc(S0), NewStreamSubs};
                 OldSState ->
                     %% Subsription parameters changed:
                     {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
@@ -165,7 +165,7 @@ on_subscribe(TopicFilter, SubOpts, #{
                     ),
                     Sub = Sub0#{current_state := SStateId},
                     S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S3),
-                    {ok, S, NewStreamSubs}
+                    {ok, gc(S), NewStreamSubs}
             end
     end.
 
@@ -194,10 +194,10 @@ on_unsubscribe(SessionId, TopicFilter, S0, NewStreamSubs) ->
             ),
             _ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId),
             S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
-            {ok, S, unwatch_streams(TopicFilter, NewStreamSubs), Subscription}
+            {ok, gc(S), unwatch_streams(TopicFilter, NewStreamSubs), Subscription}
     end.
 
--spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds:sesssion()) -> ok.
+-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
 on_session_drop(SessionId, S0) ->
     _ = fold_private_subscriptions(
         fun(TopicFilter, _Subscription, S) ->
@@ -302,6 +302,7 @@ watch_streams(TopicFilter) ->
     {ok, Ref} = emqx_ds_new_streams:watch(
         ?PERSISTENT_MESSAGE_DB, emqx_topic:words(TopicFilter)
     ),
+    ?tp(debug, sessds_watch_streams, #{topic_filter => TopicFilter, ref => Ref}),
     Ref.
 
 unwatch_streams(TopicFilter, NewStreamSubs) ->
@@ -318,6 +319,7 @@ unwatch_streams(TopicFilter, NewStreamSubs) ->
         NewStreamSubs
     catch
         {found, Ref} ->
-            emqx_ds_new_streams:unwatch(Ref),
+            ?tp(debug, sessds_unwatch_streams, #{topic_filter => TopicFilter, ref => Ref}),
+            emqx_ds_new_streams:unwatch(?PERSISTENT_MESSAGE_DB, Ref),
             maps:remove(Ref, NewStreamSubs)
     end.

+ 103 - 20
apps/emqx/test/emqx_persistent_session_ds_SUITE.erl

@@ -52,7 +52,8 @@ init_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
     TestCase =:= t_session_unsubscription_idempotency;
     TestCase =:= t_subscription_state_change;
-    TestCase =:= t_storage_generations
+    TestCase =:= t_storage_generations;
+    TestCase =:= t_new_stream_notifications
 ->
     Cluster = cluster(#{n => 1}),
     WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
@@ -100,7 +101,8 @@ end_per_testcase(TestCase, Config) when
     TestCase =:= t_session_unsubscription_idempotency;
     TestCase =:= t_subscription_state_change;
     TestCase =:= t_session_gc;
-    TestCase =:= t_storage_generations
+    TestCase =:= t_storage_generations;
+    TestCase =:= t_new_stream_notifications
 ->
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
@@ -427,12 +429,15 @@ t_subscription_state_change(Config) ->
     Port = get_mqtt_port(Node1, tcp),
     TopicFilter = <<"t/+">>,
     ClientId = mk_clientid(?FUNCTION_NAME, sub),
-    %% Helper function that waits for the internal session GC:
+    %% Helper function that forces session to perform stream renewal.
+    %% In the current implementation it also causes garbage collection
+    %% of session states:
     WaitGC = fun() ->
-        ?block_until(
-            #{?snk_kind := sessds_renew_streams, ?snk_meta := #{clientid := ClientId}}, 5_000, 0
-        ),
-        timer:sleep(10)
+        ?wait_async_action(
+            emqx_ds_new_streams:set_dirty(?PERSISTENT_MESSAGE_DB),
+            #{?snk_kind := sessds_renew_streams, ?snk_meta := #{clientid := ClientId}},
+            timer:seconds(10)
+        )
     end,
     %% Helper function that gets runtime state of the session:
     GetS = fun() ->
@@ -452,9 +457,8 @@ t_subscription_state_change(Config) ->
             #{TopicFilter := #{current_state := SSID1}} = Subs1,
             %% Fill the storage with some data to create the streams:
             {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"1">>, ?QOS_2),
-            [#{packet_id := PI1, qos := ?QOS_1}] = emqx_common_test_helpers:wait_publishes(
-                1, 5_000
-            ),
+            [#{packet_id := PI1, qos := ?QOS_1, topic := <<"t/1">>, payload := <<"1">>}] =
+                emqx_common_test_helpers:wait_publishes(1, 5_000),
             %% Upgrade subscription to QoS2 and wait for the session GC
             %% to happen. At which point the session should keep 2
             %% subscription states, one being marked as obsolete:
@@ -470,25 +474,104 @@ t_subscription_state_change(Config) ->
                 },
                 SStates2
             ),
-            %% Now ack the packet and publish some more to trigger SRS
-            %% update. This should, in effect, release the reference
-            %% to the old subscription state, and let GC delete old
-            %% subscription state:
+            %% Now ack the packet and trigger garbage collection. This
+            %% should release the reference to the old subscription
+            %% state, and let GC delete old subscription state:
             ok = emqtt:puback(Sub, PI1),
             {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"2">>, ?QOS_2),
-            %% QoS of subscription has been updated:
-            [#{packet_id := PI2, qos := ?QOS_2}] = emqx_common_test_helpers:wait_publishes(
-                1, 5_000
-            ),
+            %% Verify that QoS of subscription has been updated:
+            [#{packet_id := PI2, qos := ?QOS_2, topic := <<"t/1">>, payload := <<"2">>}] =
+                emqx_common_test_helpers:wait_publishes(1, 5_000),
             WaitGC(),
             #{subscriptions := Subs3, subscription_states := SStates3, streams := Streams3} = GetS(),
             ?assertEqual(Subs3, Subs2),
-            %% Old substate was deleted:
-            ?assertMatch([{SSID2, _}], maps:to_list(SStates3), Streams3)
+            %% Verify that the old substate (SSID1) was deleted:
+            ?assertMatch(
+                [{SSID2, _}],
+                maps:to_list(SStates3),
+                #{
+                    streams => Streams3,
+                    ssid1 => SSID1,
+                    ssid2 => SSID2,
+                    sub_states => SStates3
+                }
+            )
         end,
         []
     ).
 
+%% This testcase verifies the lifetimes of session's subscriptions to
+%% new stream events.
+t_new_stream_notifications(Config) ->
+    [Node1Spec | _] = ?config(node_specs, Config),
+    [Node1] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    ClientId = mk_clientid(?FUNCTION_NAME, sub),
+    TopicFilter1 = <<"foo/+">>,
+    TopicFilter2 = <<"bar">>,
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            %% Init:
+            Sub0 = start_client(#{port => Port, clientid => ClientId}),
+            {ok, _} = emqtt:connect(Sub0),
+            %% 1. Sessions should start watching streams when they
+            %% subscribe to the topics:
+            ?wait_async_action(
+                {ok, _, _} = emqtt:subscribe(Sub0, TopicFilter1, ?QOS_1),
+                #{?snk_kind := sessds_watch_streams, topic_filter := TopicFilter1}
+            ),
+            ?wait_async_action(
+                {ok, _, _} = emqtt:subscribe(Sub0, TopicFilter2, ?QOS_1),
+                #{?snk_kind := sessds_watch_streams, topic_filter := TopicFilter2}
+            ),
+            %% 2. Sessions should re-subscribe to the events after
+            %% reconnect:
+            emqtt:disconnect(Sub0),
+            {ok, SNKsub} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := sessds_watch_streams}),
+                2,
+                timer:seconds(10)
+            ),
+            Sub1 = start_client(#{port => Port, clientid => ClientId, clean_start => false}),
+            {ok, _} = emqtt:connect(Sub1),
+            %% Verify that both subscriptions have been renewed:
+            {ok, EventsAfterRestart} = snabbkaffe:receive_events(SNKsub),
+            ?assertMatch(
+                [<<"bar">>, <<"foo/+">>],
+                lists:sort(?projection(topic_filter, EventsAfterRestart))
+            ),
+            %% Verify that stream notifications are handled:
+            ?wait_async_action(
+                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, <<"bar">>),
+                #{?snk_kind := sessds_renew_streams, topic_filter := <<"bar">>}
+            ),
+            ?wait_async_action(
+                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, <<"foo/1">>),
+                #{?snk_kind := sessds_renew_streams, topic_filter := <<"foo/+">>}
+            ),
+            %% Verify that new stream subscriptions are removed when
+            %% session unsubscribes from a topic:
+            ?wait_async_action(
+                emqtt:unsubscribe(Sub1, TopicFilter2),
+                #{?snk_kind := sessds_unwatch_streams, topic_filter := <<"bar">>}
+            ),
+            %% But the rest of subscriptions are still active:
+            ?wait_async_action(
+                emqx_ds_new_streams:set_dirty(?PERSISTENT_MESSAGE_DB),
+                #{?snk_kind := sessds_renew_streams, topic_filter := <<"foo/+">>}
+            )
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [], ?of_kind(sessds_new_stream_notification_for_undefined_subscription, Trace)
+            ),
+            ?assertMatch(
+                [], ?of_kind(sessds_unexpected_stream_notifiction, Trace)
+            )
+        end
+    ).
+
 t_session_discard_persistent_to_non_persistent(_Config) ->
     ClientId = atom_to_binary(?FUNCTION_NAME),
     Params = #{

+ 5 - 0
changes/ce/feat-14130.en.md

@@ -0,0 +1,5 @@
+Reduce CPU usage by idle durable sessions.
+
+Previously idle durable sessions had to wake up periodically to refresh the list of DS streams.
+This change makes stream discovery event-based.
+Additionally, it reduces the delay before the sessions are made aware of new streams, removing the long tail from the end-to-end latency distribution.