|
|
@@ -238,13 +238,18 @@ remove_fully_replayed_streams(S0) ->
|
|
|
Groups = emqx_persistent_session_ds_state:fold_streams(
|
|
|
fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) ->
|
|
|
Key = {SubId, RankX},
|
|
|
- case is_fully_replayed(CommQos1, CommQos2, StreamState) of
|
|
|
- true when is_map_key(Key, Acc) ->
|
|
|
+ case {is_fully_replayed(CommQos1, CommQos2, StreamState), Acc} of
|
|
|
+ {_, #{Key := false}} ->
|
|
|
+ Acc;
|
|
|
+ {true, #{Key := {true, RankY}}} ->
|
|
|
+ Acc;
|
|
|
+ {true, #{Key := {true, _RankYOther}}} ->
|
|
|
+ %% assert, should never happen
|
|
|
+ error(multiple_rank_y_for_rank_x);
|
|
|
+ {true, #{}} ->
|
|
|
Acc#{Key => {true, RankY}};
|
|
|
- true ->
|
|
|
- Acc#{Key => false};
|
|
|
- _ ->
|
|
|
- Acc
|
|
|
+ {false, #{}} ->
|
|
|
+ Acc#{Key => false}
|
|
|
end
|
|
|
end,
|
|
|
#{},
|
|
|
@@ -267,7 +272,7 @@ remove_fully_replayed_streams(S0) ->
|
|
|
case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of
|
|
|
undefined ->
|
|
|
Acc;
|
|
|
- MinRankY when RankY < MinRankY ->
|
|
|
+ MinRankY when RankY =< MinRankY ->
|
|
|
?SLOG(debug, #{
|
|
|
msg => del_fully_preplayed_stream,
|
|
|
key => Key,
|