Просмотр исходного кода

Merge pull request #13977 from savonarola/1010-fix-end-of-stream-handling

feat(dssess): fix end of stream handling
Ilia Averianov 1 год назад
Родитель
Сommit
b5ffa890ba

+ 4 - 5
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -521,8 +521,6 @@ is_fully_acked(Srs, S) ->
 ) -> state().
 derive_state(_, _, #srs{unsubscribed = true}) ->
     u;
-derive_state(_, _, #srs{it_end = end_of_stream}) ->
-    u;
 derive_state(Comm1, Comm2, SRS) ->
     case {is_track_acked(?QOS_1, Comm1, SRS), is_track_acked(?QOS_2, Comm2, SRS)} of
         {true, true} -> r;
@@ -537,8 +535,9 @@ derive_state(Comm1, Comm2, SRS) ->
 %% Transfer the stream either to R state if it's pollable or to U
 %% state if it's not.
 -spec to_RU(stream_key(), srs(), t()) -> t().
-to_RU(Key, Srs = #srs{it_end = end_of_stream}, S) ->
-    to_U(Key, Srs, S);
+to_RU(_Key, #srs{it_end = end_of_stream}, S) ->
+    %% Just don't add to ready
+    S;
 to_RU(Key, Srs = #srs{unsubscribed = true}, S) ->
     to_U(Key, Srs, S);
 to_RU(Key, _Srs, S = #s{ready = R}) ->
@@ -766,7 +765,7 @@ remove_fully_replayed_streams(S0) ->
                     Acc;
                 MinRankY when RankY =< MinRankY ->
                     ?SLOG(debug, #{
-                        msg => del_fully_preplayed_stream,
+                        msg => del_fully_replayed_stream,
                         key => Key,
                         rank => {RankX, RankY},
                         min => MinRankY