Jelajahi Sumber

feat(queue): replace invalid rewing algorithm with skipping iterator

Ilya Averyanov 1 tahun lalu
induk
melakukan
143086b0ef

+ 14 - 54
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -341,7 +341,11 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
     end.
 
 accept_stream(
-    #{topic_filter := TopicFilter, stream := Stream, progress := Progress} = _Event,
+    #{
+        topic_filter := TopicFilter,
+        stream := Stream,
+        progress := #{iterator := Iterator} = _Progress
+    } = _Event,
     S0
 ) ->
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
@@ -361,7 +365,6 @@ accept_stream(
                 end,
             case NeedCreateStream of
                 true ->
-                    Iterator = rewind_iterator(Progress),
                     NewSRS =
                         #srs{
                             rank_x = ?rank_x,
@@ -377,52 +380,6 @@ accept_stream(
             end
     end.
 
-%% Skip acked messages.
-%% This may be a bit inefficient, and it is unclear how to handle errors.
-%%
-%% A better variant would be to wrap the iterator on `emqx_ds` level in a new one,
-%% that will skip acked messages internally in `emqx_ds:next` function.
-%% Unluckily, emqx_ds does not have a wrapping structure around iterators of
-%% the underlying levels, so we cannot wrap it without a risk of confusion.
-
-rewind_iterator(#{iterator := Iterator, acked := true}) ->
-    Iterator;
-rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := 0, qos2_acked := 0}) ->
-    Iterator0;
-%% This should not happen, means the DS is consistent
-rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := Q1, qos2_acked := Q2}) when
-    Q1 < 0 orelse Q2 < 0
-->
-    Iterator0;
-rewind_iterator(
-    #{iterator := Iterator0, acked := false, qos1_acked := Q1Old, qos2_acked := Q2Old} = Progress
-) ->
-    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, Iterator0, Q1Old + Q2Old) of
-        {ok, Iterator1, Messages} ->
-            {Q1New, Q2New} = update_qos_acked(Q1Old, Q2Old, Messages),
-            rewind_iterator(Progress#{
-                iterator => Iterator1, qos1_acked => Q1New, qos2_acked => Q2New
-            });
-        {ok, end_of_stream} ->
-            end_of_stream;
-        {error, _, _} ->
-            %% What to do here?
-            %% In the wrapping variant we do not have this problem.
-            Iterator0
-    end.
-
-update_qos_acked(Q1, Q2, []) ->
-    {Q1, Q2};
-update_qos_acked(Q1, Q2, [{_Key, Message} | Messages]) ->
-    case emqx_message:qos(Message) of
-        ?QOS_1 ->
-            update_qos_acked(Q1 - 1, Q2, Messages);
-        ?QOS_2 ->
-            update_qos_acked(Q1, Q2 - 1, Messages);
-        _ ->
-            update_qos_acked(Q1, Q2, Messages)
-    end.
-
 revoke_stream(
     #{topic_filter := TopicFilter, stream := Stream}, S0
 ) ->
@@ -667,8 +624,8 @@ stream_progress(
         first_seqno_qos2 = StartQos2
     } = SRS
 ) ->
-    Qos1Acked = seqno_diff(?QOS_1, CommQos1, StartQos1),
-    Qos2Acked = seqno_diff(?QOS_2, CommQos2, StartQos2),
+    Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1),
+    Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2),
     case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
         true ->
             #{
@@ -683,10 +640,10 @@ stream_progress(
             #{
                 stream => Stream,
                 progress => #{
-                    acked => false,
-                    iterator => BeginIt,
-                    qos1_acked => Qos1Acked,
-                    qos2_acked => Qos2Acked
+                    acked => true,
+                    iterator => emqx_ds_skipping_iterator:update_or_new(
+                        BeginIt, Qos1Acked, Qos2Acked
+                    )
                 },
                 use_finished => is_use_finished(SRS)
             }
@@ -753,6 +710,9 @@ is_stream_fully_acked(_, _, #srs{
 is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
 
+n_acked(Qos, A, B) ->
+    max(seqno_diff(Qos, A, B), 0).
+
 -dialyzer({nowarn_function, seqno_diff/3}).
 seqno_diff(?QOS_1, A, B) ->
     %% For QoS1 messages we skip a seqno every time the epoch changes,

+ 4 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -401,10 +401,14 @@ make_iterator(DB, Stream, TopicFilter, StartTime) ->
 
 -spec update_iterator(db(), iterator(), message_key()) ->
     make_iterator_result().
+update_iterator(DB, ?skipping_iterator_match = OldIter, DSKey) ->
+    emqx_ds_skipping_iterator:update_iterator(DB, OldIter, DSKey);
 update_iterator(DB, OldIter, DSKey) ->
     ?module(DB):update_iterator(DB, OldIter, DSKey).
 
 -spec next(db(), iterator(), pos_integer()) -> next_result().
+next(DB, ?skipping_iterator_match = Iter, BatchSize) ->
+    emqx_ds_skipping_iterator:next(DB, Iter, BatchSize);
 next(DB, Iter, BatchSize) ->
     ?module(DB):next(DB, Iter, BatchSize).
 

+ 87 - 0
apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl

@@ -0,0 +1,87 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_skipping_iterator).
+
+-include("emqx_ds_skipping_iterator.hrl").
+-include("emqx/include/emqx_mqtt.hrl").
+
+-type t() :: ?skipping_iterator(emqx_ds:iterator(), non_neg_integer(), non_neg_integer()).
+
+-export([
+    update_or_new/3,
+    update_iterator/3,
+    next/3
+]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec update_or_new(t() | emqx_ds:iterator(), non_neg_integer(), non_neg_integer()) -> t().
+update_or_new(?skipping_iterator_match(Iterator, Q1Skip0, Q2Skip0), Q1Skip, Q2Skip) when
+    Q1Skip >= 0 andalso Q2Skip >= 0
+->
+    ?skipping_iterator(Iterator, Q1Skip0 + Q1Skip, Q2Skip0 + Q2Skip);
+update_or_new(Iterator, Q1Skip, Q2Skip) when Q1Skip >= 0 andalso Q2Skip >= 0 ->
+    ?skipping_iterator(Iterator, Q1Skip, Q2Skip).
+
+-spec next(emqx_ds:db(), t(), pos_integer()) -> emqx_ds:next_result(t()).
+next(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Count) ->
+    case emqx_ds:next(DB, Iterator0, Count) of
+        {error, _, _} = Error ->
+            Error;
+        {ok, end_of_stream} ->
+            {ok, end_of_stream};
+        {ok, Iterator1, Messages0} ->
+            {Messages1, Q1Skip1, Q2Skip1} = skip(Messages0, Q1Skip0, Q2Skip0),
+            case {Q1Skip1, Q2Skip1} of
+                {0, 0} -> {ok, Iterator1, Messages1};
+                _ -> {ok, ?skipping_iterator(Iterator1, Q1Skip1, Q2Skip1)}
+            end
+    end.
+
+-spec update_iterator(emqx_ds:db(), emqx_ds:iterator(), emqx_ds:message_key()) ->
+    emqx_ds:make_iterator_result().
+update_iterator(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Key) ->
+    case emqx_ds:update_iterator(DB, Iterator0, Key) of
+        {error, _, _} = Error -> Error;
+        {ok, Iterator1} -> {ok, ?skipping_iterator(Iterator1, Q1Skip0, Q2Skip0)}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+skip(Messages, Q1Skip, Q2Skip) ->
+    skip(Messages, Q1Skip, Q2Skip, []).
+
+skip([], Q1Skip, Q2Skip, Agg) ->
+    {lists:reverse(Agg), Q1Skip, Q2Skip};
+skip([{Key, Message} | Messages], Q1Skip, Q2Skip, Agg) ->
+    Qos = emqx_message:qos(Message),
+    skip({Key, Message}, Qos, Messages, Q1Skip, Q2Skip, Agg).
+
+skip(_KeyMessage, ?QOS_0, Messages, Q1Skip, Q2Skip, Agg) ->
+    skip(Messages, Q1Skip, Q2Skip, Agg);
+skip(_KeyMessage, ?QOS_1, Messages, Q1Skip, Q2Skip, Agg) when Q1Skip > 0 ->
+    skip(Messages, Q1Skip - 1, Q2Skip, Agg);
+skip(KeyMessage, ?QOS_1, Messages, 0, Q2Skip, Agg) ->
+    skip(Messages, 0, Q2Skip, [KeyMessage | Agg]);
+skip(_KeyMessage, ?QOS_2, Messages, Q1Skip, Q2Skip, Agg) when Q2Skip > 0 ->
+    skip(Messages, Q1Skip, Q2Skip - 1, Agg);
+skip(KeyMessage, ?QOS_2, Messages, Q1Skip, 0, Agg) ->
+    skip(Messages, Q1Skip, 0, [KeyMessage | Agg]).

+ 32 - 0
apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl

@@ -0,0 +1,32 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-define(tag, 1).
+-define(it, 2).
+-define(qos1_skip, 3).
+-define(qos2_skip, 4).
+
+-define(IT, -1000).
+
+-define(skipping_iterator_match, #{?tag := ?IT}).
+
+-define(skipping_iterator_match(Iterator, Q1Skip, Q2Skip), #{
+    ?tag := ?IT, ?it := Iterator, ?qos1_skip := Q1Skip, ?qos2_skip := Q2Skip
+}).
+
+-define(skipping_iterator(Iterator, Q1Skip, Q2Skip), #{
+    ?tag => ?IT, ?it => Iterator, ?qos1_skip => Q1Skip, ?qos2_skip => Q2Skip
+}).