Przeglądaj źródła

fix(sessds): Don't schedule fully replayed streams for poll

ieQu1 1 rok temu
rodzic
commit
f8e32027f1

+ 2 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1481,11 +1481,11 @@ update_seqno(
                 case Track of
                 case Track of
                     puback ->
                     puback ->
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
-                            ?QOS_1, SeqNo, SchedS0
+                            ?QOS_1, SeqNo, S, SchedS0
                         );
                         );
                     pubcomp ->
                     pubcomp ->
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
-                            ?QOS_2, SeqNo, SchedS0
+                            ?QOS_2, SeqNo, S, SchedS0
                         );
                         );
                     _ ->
                     _ ->
                         SchedS0
                         SchedS0

+ 38 - 23
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -59,13 +59,15 @@
 %% un-acked QoS1, QoS2 or QoS1&2 messages respectively. Such streams
 %% un-acked QoS1, QoS2 or QoS1&2 messages respectively. Such streams
 %% are stored in `#s.bq1' or `#s.bq2' buckets (or both).
 %% are stored in `#s.bq1' or `#s.bq2' buckets (or both).
 %%
 %%
-%% - *(U)nsubscribed*: streams for unsubcribed topics can linger in
-%% the session state for a while until all queued messages are acked.
-%% This state is implicit: unsubscribed streams are simply removed
-%% from all buckets. Unsubscribed streams are ignored by the scheduler
-%% until the moment they can be garbage-collected. So this is a
-%% terminal state. Even if the client resubscribes, it will produce a
-%% new, totally separate set of SRS.
+%% - *(U)ndead*: zombie streams that are kept for historical reasons
+%% only. For example, streams for unsubcribed topics can linger in the
+%% session state for a while until all queued messages are acked, as
+%% well as streams that reached `end_of_stream'. This state is
+%% implicit: undead streams are simply removed from all buckets.
+%% Undead streams are ignored by the scheduler until the moment they
+%% can be garbage-collected. So this is a terminal state. Even if the
+%% client resubscribes, it will produce a new, totally separate set of
+%% SRS.
 %%
 %%
 %% *** State transitions
 %% *** State transitions
 %%
 %%
@@ -98,7 +100,10 @@
 %%    \
 %%    \
 %%     \--([QoS0] & QoS2 messages)--> *BQ2* --> ...
 %%     \--([QoS0] & QoS2 messages)--> *BQ2* --> ...
 %%      \
 %%      \
-%%       `--([QoS0] & QoS1 & QoS2)--> *BQ12* --> ...
+%%       \--([QoS0] & QoS1 & QoS2)--> *BQ12* --> ...
+%%        \
+%%         `--(`end_of_stream')--> *U*
+%%
 %%
 %%
 %% *BQ1* and *BQ2* are handled similarly. They transition to *Ready*
 %% *BQ1* and *BQ2* are handled similarly. They transition to *Ready*
 %% once session calls `?MODULE:on_seqno_release' for the corresponding
 %% once session calls `?MODULE:on_seqno_release' for the corresponding
@@ -126,7 +131,7 @@
     poll/3,
     poll/3,
     on_ds_reply/3,
     on_ds_reply/3,
     on_enqueue/5,
     on_enqueue/5,
-    on_seqno_release/3,
+    on_seqno_release/4,
     find_replay_streams/1,
     find_replay_streams/1,
     is_fully_acked/2
     is_fully_acked/2
 ]).
 ]).
@@ -206,11 +211,13 @@ init(S) ->
     },
     },
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    %% Restore stream states:
+    %% Restore stream states.
+    %%
+    %% Note: these states are NOT used during replay.
     emqx_persistent_session_ds_state:fold_streams(
     emqx_persistent_session_ds_state:fold_streams(
         fun(Key, Srs, Acc) ->
         fun(Key, Srs, Acc) ->
             case derive_state(Comm1, Comm2, Srs) of
             case derive_state(Comm1, Comm2, Srs) of
-                r -> to_R(Key, Acc);
+                r -> to_RU(Key, Srs, Acc);
                 u -> to_U(Key, Srs, Acc);
                 u -> to_U(Key, Srs, Acc);
                 bq1 -> to_BQ1(Key, Srs, Acc);
                 bq1 -> to_BQ1(Key, Srs, Acc);
                 bq2 -> to_BQ2(Key, Srs, Acc);
                 bq2 -> to_BQ2(Key, Srs, Acc);
@@ -290,10 +297,8 @@ on_ds_reply(#poll_reply{ref = Ref, payload = poll_timeout}, S, SchedS0 = #s{pend
                     case emqx_persistent_session_ds_state:get_stream(Key, S) of
                     case emqx_persistent_session_ds_state:get_stream(Key, S) of
                         undefined ->
                         undefined ->
                             SchedS2;
                             SchedS2;
-                        #srs{unsubscribed = true} ->
-                            SchedS2;
-                        _ ->
-                            to_R(Key, SchedS2)
+                        Srs ->
+                            to_RU(Key, Srs, SchedS2)
                     end;
                     end;
                 false ->
                 false ->
                     SchedS1
                     SchedS1
@@ -332,7 +337,7 @@ on_enqueue(false, Key, Srs, S, SchedS) ->
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     case derive_state(Comm1, Comm2, Srs) of
     case derive_state(Comm1, Comm2, Srs) of
         r ->
         r ->
-            to_R(Key, SchedS);
+            to_RU(Key, Srs, SchedS);
         u ->
         u ->
             to_U(Key, Srs, SchedS);
             to_U(Key, Srs, SchedS);
         bq1 ->
         bq1 ->
@@ -343,14 +348,15 @@ on_enqueue(false, Key, Srs, S, SchedS) ->
             to_BQ12(Key, Srs, SchedS)
             to_BQ12(Key, Srs, SchedS)
     end.
     end.
 
 
-on_seqno_release(?QOS_1, SnQ1, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTab}) ->
+on_seqno_release(?QOS_1, SnQ1, S, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTab}) ->
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ1, #block.last_seqno_qos2) of
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ1, #block.last_seqno_qos2) of
         false ->
         false ->
             %% This seqno doesn't unlock anything:
             %% This seqno doesn't unlock anything:
             SchedS0;
             SchedS0;
         {false, Key, PrimaryTab} ->
         {false, Key, PrimaryTab} ->
             %% It was BQ1:
             %% It was BQ1:
-            to_R(Key, SchedS0#s{bq1 = PrimaryTab});
+            Srs = emqx_persistent_session_ds_state:get_stream(Key, S),
+            to_RU(Key, Srs, SchedS0#s{bq1 = PrimaryTab});
         {true, Key, PrimaryTab} ->
         {true, Key, PrimaryTab} ->
             %% It was BQ12:
             %% It was BQ12:
             ?tp(sessds_stream_state_trans, #{
             ?tp(sessds_stream_state_trans, #{
@@ -359,14 +365,15 @@ on_seqno_release(?QOS_1, SnQ1, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTa
             }),
             }),
             SchedS0#s{bq1 = PrimaryTab}
             SchedS0#s{bq1 = PrimaryTab}
     end;
     end;
-on_seqno_release(?QOS_2, SnQ2, SchedS0 = #s{bq2 = PrimaryTab0, bq1 = SecondaryTab}) ->
+on_seqno_release(?QOS_2, SnQ2, S, SchedS0 = #s{bq2 = PrimaryTab0, bq1 = SecondaryTab}) ->
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ2, #block.last_seqno_qos1) of
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ2, #block.last_seqno_qos1) of
         false ->
         false ->
             %% This seqno doesn't unlock anything:
             %% This seqno doesn't unlock anything:
             SchedS0;
             SchedS0;
         {false, Key, PrimaryTab} ->
         {false, Key, PrimaryTab} ->
             %% It was BQ2:
             %% It was BQ2:
-            to_R(Key, SchedS0#s{bq2 = PrimaryTab});
+            Srs = emqx_persistent_session_ds_state:get_stream(Key, S),
+            to_RU(Key, Srs, SchedS0#s{bq2 = PrimaryTab});
         {true, Key, PrimaryTab} ->
         {true, Key, PrimaryTab} ->
             %% It was BQ12:
             %% It was BQ12:
             ?tp(sessds_stream_state_trans, #{
             ?tp(sessds_stream_state_trans, #{
@@ -513,6 +520,8 @@ is_fully_acked(Srs, S) ->
 ) -> state().
 ) -> state().
 derive_state(_, _, #srs{unsubscribed = true}) ->
 derive_state(_, _, #srs{unsubscribed = true}) ->
     u;
     u;
+derive_state(_, _, #srs{it_end = end_of_stream}) ->
+    u;
 derive_state(Comm1, Comm2, SRS) ->
 derive_state(Comm1, Comm2, SRS) ->
     case {is_track_acked(?QOS_1, Comm1, SRS), is_track_acked(?QOS_2, Comm2, SRS)} of
     case {is_track_acked(?QOS_1, Comm1, SRS), is_track_acked(?QOS_2, Comm2, SRS)} of
         {true, true} -> r;
         {true, true} -> r;
@@ -524,8 +533,14 @@ derive_state(Comm1, Comm2, SRS) ->
 %% Note: `to_State' functions must be called from a correct state.
 %% Note: `to_State' functions must be called from a correct state.
 %% They are NOT idempotent, and they don't do full cleanup.
 %% They are NOT idempotent, and they don't do full cleanup.
 
 
--spec to_R(stream_key(), t()) -> t().
-to_R(Key, S = #s{ready = R}) ->
+%% Transfer the stream either to R state if it's pollable or to U
+%% state if it's not.
+-spec to_RU(stream_key(), srs(), t()) -> t().
+to_RU(Key, Srs = #srs{it_end = end_of_stream}, S) ->
+    to_U(Key, Srs, S);
+to_RU(Key, Srs = #srs{unsubscribed = true}, S) ->
+    to_U(Key, Srs, S);
+to_RU(Key, _Srs, S = #s{ready = R}) ->
     ?tp(sessds_stream_state_trans, #{
     ?tp(sessds_stream_state_trans, #{
         key => Key,
         key => Key,
         to => r
         to => r
@@ -629,7 +644,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
                     },
                     },
                     {
                     {
                         emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S),
                         emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S),
-                        to_R(Key, SchedS)
+                        to_RU(Key, NewStreamState, SchedS)
                     };
                     };
                 {error, Class, Reason} ->
                 {error, Class, Reason} ->
                     ?SLOG(info, #{
                     ?SLOG(info, #{