Преглед изворни кода

feat(sessds): Consider #srs with only QoS0 messages fully acked

ieQu1 пре 2 година
родитељ
комит
a9c55f7568
1 измењених фајлова са 6 додато и 1 уклоњено
  1. 6 1
      apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

+ 6 - 1
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -168,7 +168,6 @@ del_subscription(SubId, S0) ->
 %%================================================================================
 %%================================================================================
 
 
 ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
 ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
-    %% TODO: hash collisions
     Key = {SubId, Stream},
     Key = {SubId, Stream},
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
         undefined ->
         undefined ->
@@ -303,6 +302,12 @@ compare_streams(
 is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) ->
 is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) ->
     It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
     It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
 
 
+is_fully_acked(_, _, #srs{
+    first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
+}) ->
+    %% Streams where the last chunk doesn't contain any QoS1 and 2
+    %% messages are considered fully acked:
+    true;
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
     (Comm1 >= S1) andalso (Comm2 >= S2).