|
|
@@ -89,13 +89,16 @@ find_new_streams(S) ->
|
|
|
Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
|
|
|
shuffle(
|
|
|
emqx_persistent_session_ds_state:fold_streams(
|
|
|
- fun(Key, Stream, Acc) ->
|
|
|
- case is_fully_acked(Comm1, Comm2, Stream) of
|
|
|
- true ->
|
|
|
- [{Key, Stream} | Acc];
|
|
|
- false ->
|
|
|
- Acc
|
|
|
- end
|
|
|
+ fun
|
|
|
+ (_Key, #srs{it_end = end_of_stream}, Acc) ->
|
|
|
+ Acc;
|
|
|
+ (Key, Stream, Acc) ->
|
|
|
+ case is_fully_acked(Comm1, Comm2, Stream) of
|
|
|
+ true ->
|
|
|
+ [{Key, Stream} | Acc];
|
|
|
+ false ->
|
|
|
+ Acc
|
|
|
+ end
|
|
|
end,
|
|
|
[],
|
|
|
S
|