Просмотр исходного кода

Merge pull request #11917 from ieQu1/dev/fix-packet-id-to-seqno-trans

fix(ds): Fix packet id -> sequence number translation
ieQu1 2 лет назад
Родитель
Сommit
ced7be61f0

+ 95 - 23
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -26,9 +26,11 @@
 
 -export_type([inflight/0]).
 
+-include_lib("emqx/include/logger.hrl").
 -include("emqx_persistent_session_ds.hrl").
 
 -ifdef(TEST).
+-include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -endif.
 
@@ -65,9 +67,17 @@ new() ->
     #inflight{}.
 
 -spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}.
-next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) ->
-    Inflight = Inflight0#inflight{next_seqno = LastSeqno + 1},
-    {seqno_to_packet_id(LastSeqno), Inflight}.
+next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqNo}) ->
+    Inflight = Inflight0#inflight{next_seqno = LastSeqNo + 1},
+    case LastSeqNo rem 16#10000 of
+        0 ->
+            %% We skip sequence numbers that lead to PacketId = 0 to
+            %% simplify math. Note: it leads to occasional gaps in the
+            %% sequence numbers.
+            next_packet_id(Inflight);
+        PacketId ->
+            {PacketId, Inflight}
+    end.
 
 -spec replay(emqx_persistent_session_ds:id(), inflight()) ->
     emqx_session:replies().
@@ -83,8 +93,20 @@ commit_offset(
         acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0
     }
 ) ->
-    AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId),
-    true = AckedSeqno0 < AckedSeqno,
+    AckedSeqno =
+        case packet_id_to_seqno(NextSeqNo, PacketId) of
+            N when N > AckedSeqno0; AckedSeqno0 =:= 0 ->
+                N;
+            OutOfRange ->
+                ?SLOG(warning, #{
+                    msg => "out-of-order_ack",
+                    prev_seqno => AckedSeqno0,
+                    acked_seqno => OutOfRange,
+                    next_seqno => NextSeqNo,
+                    packet_id => PacketId
+                }),
+                AckedSeqno0
+        end,
     Ranges = lists:filter(
         fun(#range{stream = Stream, last = LastSeqno, iterator_next = ItNext}) ->
             case LastSeqno =< AckedSeqno of
@@ -140,18 +162,17 @@ fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
     #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
     ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
     {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
-    {Publishes, Inflight1} =
+    {NMessages, Publishes, Inflight1} =
         lists:foldl(
-            fun(Msg, {PubAcc0, InflightAcc0}) ->
+            fun(Msg, {N0, PubAcc0, InflightAcc0}) ->
                 {PacketId, InflightAcc} = next_packet_id(InflightAcc0),
                 PubAcc = [{PacketId, Msg} | PubAcc0],
-                {PubAcc, InflightAcc}
+                {N0 + 1, PubAcc, InflightAcc}
             end,
-            {Publishes0, Inflight0},
+            {0, Publishes0, Inflight0},
             Messages
         ),
     #inflight{next_seqno = LastSeqNo} = Inflight1,
-    NMessages = LastSeqNo - FirstSeqNo,
     case NMessages > 0 of
         true ->
             Range = #range{
@@ -193,25 +214,22 @@ get_streams(SessionId) ->
         mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId)
     ).
 
-%% Packet ID as defined by MQTT protocol is a 16-bit integer in range
-%% 1..FFFF. This function translates internal session sequence number
-%% to MQTT packet ID by chopping off most significant bits and adding
-%% 1.  This assumes that there's never more FFFF in-flight packets at
-%% any time:
--spec seqno_to_packet_id(non_neg_integer()) -> emqx_types:packet_id().
-seqno_to_packet_id(Counter) ->
-    Counter rem 16#ffff + 1.
-
 %% Reconstruct session counter by adding most significant bits from
 %% the current counter to the packet id.
 -spec packet_id_to_seqno(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer().
 packet_id_to_seqno(NextSeqNo, PacketId) ->
-    N = ((NextSeqNo bsr 16) bsl 16) + PacketId,
-    case N > NextSeqNo of
-        true -> N - 16#10000;
-        false -> N
+    Epoch = NextSeqNo bsr 16,
+    case packet_id_to_seqno_(Epoch, PacketId) of
+        N when N =< NextSeqNo ->
+            N;
+        _ ->
+            packet_id_to_seqno_(Epoch - 1, PacketId)
     end.
 
+-spec packet_id_to_seqno_(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer().
+packet_id_to_seqno_(Epoch, PacketId) ->
+    (Epoch bsl 16) + PacketId.
+
 -spec shuffle([A]) -> [A].
 shuffle(L0) ->
     L1 = lists:map(
@@ -223,3 +241,57 @@ shuffle(L0) ->
     L2 = lists:sort(L1),
     {_, L} = lists:unzip(L2),
     L.
+
+-ifdef(TEST).
+
+%% This test only tests boundary conditions (to make sure property-based test didn't skip them):
+packet_id_to_seqno_test() ->
+    %% Packet ID = 1; first epoch:
+    ?assertEqual(1, packet_id_to_seqno(1, 1)),
+    ?assertEqual(1, packet_id_to_seqno(10, 1)),
+    ?assertEqual(1, packet_id_to_seqno(1 bsl 16 - 1, 1)),
+    ?assertEqual(1, packet_id_to_seqno(1 bsl 16, 1)),
+    %% Packet ID = 1; second and 3rd epochs:
+    ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(1 bsl 16 + 1, 1)),
+    ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(2 bsl 16, 1)),
+    ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno(2 bsl 16 + 1, 1)),
+    %% Packet ID = 16#ffff:
+    PID = 1 bsl 16 - 1,
+    ?assertEqual(PID, packet_id_to_seqno(PID, PID)),
+    ?assertEqual(PID, packet_id_to_seqno(1 bsl 16, PID)),
+    ?assertEqual(1 bsl 16 + PID, packet_id_to_seqno(2 bsl 16, PID)),
+    ok.
+
+packet_id_to_seqno_test_() ->
+    Opts = [{numtests, 1000}, {to_file, user}],
+    {timeout, 30, fun() -> ?assert(proper:quickcheck(packet_id_to_seqno_prop(), Opts)) end}.
+
+packet_id_to_seqno_prop() ->
+    ?FORALL(
+        NextSeqNo,
+        next_seqno_gen(),
+        ?FORALL(
+            SeqNo,
+            seqno_gen(NextSeqNo),
+            begin
+                PacketId = SeqNo rem 16#10000,
+                ?assertEqual(SeqNo, packet_id_to_seqno(NextSeqNo, PacketId)),
+                true
+            end
+        )
+    ).
+
+next_seqno_gen() ->
+    ?LET(
+        {Epoch, Offset},
+        {non_neg_integer(), non_neg_integer()},
+        Epoch bsl 16 + Offset
+    ).
+
+seqno_gen(NextSeqNo) ->
+    WindowSize = 1 bsl 16 - 1,
+    Min = max(0, NextSeqNo - WindowSize),
+    Max = max(0, NextSeqNo - 1),
+    range(Min, Max).
+
+-endif.

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -340,7 +340,7 @@ deliver(_ClientInfo, _Delivers, Session) ->
 -spec handle_timeout(clientinfo(), _Timeout, session()) ->
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
 handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) ->
-    WindowSize = 100,
+    WindowSize = 1000,
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize),
     %% TODO: make these values configurable:
     Timeout =