Преглед изворни кода

test(sessds): Add property-based tests for seqno generator

ieQu1 пре 2 година
родитељ
комит
3fb2064ea4

+ 4 - 2
apps/emqx/src/emqx_channel.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -191,7 +191,9 @@ info(topic_aliases, #channel{topic_aliases = Aliases}) ->
 info(alias_maximum, #channel{alias_maximum = Limits}) ->
     Limits;
 info(timers, #channel{timers = Timers}) ->
-    Timers.
+    Timers;
+info(session_state, #channel{session = Session}) ->
+    Session.
 
 set_conn_state(ConnState, Channel) ->
     Channel#channel{conn_state = ConnState}.

+ 1 - 1
apps/emqx/src/emqx_cm.erl

@@ -1,5 +1,5 @@
 %%-------------------------------------------------------------------
-%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 83 - 8
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -27,6 +27,11 @@
 
 -include("emqx_persistent_session_ds.hrl").
 
+-ifdef(TEST).
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 %% Session API
 -export([
     create/3,
@@ -216,12 +221,12 @@ info(retry_interval, #{props := Conf}) ->
     maps:get(retry_interval, Conf);
 % info(mqueue, #sessmem{mqueue = MQueue}) ->
 %     MQueue;
-% info(mqueue_len, #sessmem{mqueue = MQueue}) ->
-%     emqx_mqueue:len(MQueue);
+info(mqueue_len, #{inflight := Inflight}) ->
+    emqx_persistent_session_ds_inflight:n_buffered(Inflight);
 % info(mqueue_max, #sessmem{mqueue = MQueue}) ->
 %     emqx_mqueue:max_len(MQueue);
-% info(mqueue_dropped, #sessmem{mqueue = MQueue}) ->
-%     emqx_mqueue:dropped(MQueue);
+info(mqueue_dropped, _Session) ->
+    0;
 %% info(next_pkt_id, #{s := S}) ->
 %%     {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S),
 %%     PacketId;
@@ -750,6 +755,11 @@ drain_buffer(Session = #{inflight := Inflight0}) ->
     ((STREAM#ifs.last_seqno_qos1 =< COMMITTEDQOS1 orelse STREAM#ifs.last_seqno_qos1 =:= undefined) andalso
      (STREAM#ifs.last_seqno_qos2 =< COMMITTEDQOS2 orelse STREAM#ifs.last_seqno_qos2 =:= undefined))).
 
+%% erlfmt-ignore
+-define(last_replayed(STREAM, NEXTQOS1, NEXTQOS2),
+    ((STREAM#ifs.last_seqno_qos1 == NEXTQOS1 orelse STREAM#ifs.last_seqno_qos1 =:= undefined) andalso
+     (STREAM#ifs.last_seqno_qos2 == NEXTQOS2 orelse STREAM#ifs.last_seqno_qos2 =:= undefined))).
+
 -spec find_replay_streams(session()) ->
     [{emqx_persistent_session_ds_state:stream_key(), stream_state()}].
 find_replay_streams(#{s := S}) ->
@@ -1002,9 +1012,6 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
     seqno().
 packet_id_to_seqno(PacketId, S) ->
     NextSeqNo = emqx_persistent_session_ds_state:get_seqno(?next(packet_id_to_qos(PacketId)), S),
-    packet_id_to_seqno_(PacketId, NextSeqNo).
-
-packet_id_to_seqno_(PacketId, NextSeqNo) ->
     Epoch = NextSeqNo bsr 15,
     SeqNo = (Epoch bsl 15) + (PacketId bsr 1),
     case SeqNo =< NextSeqNo of
@@ -1059,6 +1066,74 @@ list_all_sessions() ->
 
 %%%% Proper generators:
 
-%%%% Unit tests:
+%% Generate a sequence number that smaller than the given `NextSeqNo'
+%% number by at most `?EPOCH_SIZE':
+seqno_gen(NextSeqNo) ->
+    WindowSize = ?EPOCH_SIZE - 1,
+    Min = max(0, NextSeqNo - WindowSize),
+    Max = max(0, NextSeqNo - 1),
+    range(Min, Max).
+
+%% Generate a sequence number:
+next_seqno_gen() ->
+    ?LET(
+        {Epoch, Offset},
+        {non_neg_integer(), non_neg_integer()},
+        Epoch bsl 15 + Offset
+    ).
+
+%%%% Property-based tests:
+
+%% erlfmt-ignore
+packet_id_to_seqno_prop() ->
+    ?FORALL(
+        {Qos, NextSeqNo}, {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
+        ?FORALL(
+            ExpectedSeqNo, seqno_gen(NextSeqNo),
+            begin
+                PacketId = seqno_to_packet_id(Qos, ExpectedSeqNo),
+                SeqNo = packet_id_to_seqno(PacketId, NextSeqNo),
+                ?WHENFAIL(
+                    begin
+                        io:format(user, " *** PacketID = ~p~n", [PacketId]),
+                        io:format(user, " *** SeqNo = ~p -> ~p~n", [ExpectedSeqNo, SeqNo]),
+                        io:format(user, " *** NextSeqNo = ~p~n", [NextSeqNo])
+                    end,
+                    PacketId < 16#10000 andalso SeqNo =:= ExpectedSeqNo
+                )
+            end)).
+
+inc_seqno_prop() ->
+    ?FORALL(
+        {Qos, SeqNo},
+        {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
+        begin
+            NewSeqNo = inc_seqno(Qos, SeqNo),
+            PacketId = seqno_to_packet_id(Qos, NewSeqNo),
+            ?WHENFAIL(
+                begin
+                    io:format(user, " *** SeqNo = ~p -> ~p~n", [SeqNo, NewSeqNo]),
+                    io:format(user, " *** PacketId = ~p~n", [PacketId])
+                end,
+                PacketId > 0 andalso PacketId < 16#10000
+            )
+        end
+    ).
+
+seqno_proper_test_() ->
+    Props = [packet_id_to_seqno_prop(), inc_seqno_prop()],
+    Opts = [{numtests, 10000}, {to_file, user}],
+    {timeout, 30,
+        {setup,
+            fun() ->
+                meck:new(emqx_persistent_session_ds_state, [no_history]),
+                ok = meck:expect(emqx_persistent_session_ds_state, get_seqno, fun(_Track, Seqno) ->
+                    Seqno
+                end)
+            end,
+            fun(_) ->
+                meck:unload(emqx_persistent_session_ds_state)
+            end,
+            [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}}.
 
 -endif.

+ 15 - 5
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -36,7 +36,7 @@ all() ->
         % NOTE
         % Tests are disabled while existing session persistence impl is being
         % phased out.
-        %{group, persistence_disabled},
+        %%{group, persistence_disabled},
         {group, persistence_enabled}
     ].
 
@@ -56,7 +56,7 @@ groups() ->
     TCsNonGeneric = [t_choose_impl],
     TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
     [
-        {persistence_disabled, TCGroups},
+        %% {persistence_disabled, TCGroups},
         {persistence_enabled, TCGroups},
         {tcp, [], TCs},
         {quic, [], TCs -- TCsNonGeneric},
@@ -782,8 +782,9 @@ t_publish_many_while_client_is_gone(Config) ->
     ClientOpts = [
         {proto_ver, v5},
         {clientid, ClientId},
-        {properties, #{'Session-Expiry-Interval' => 30}},
-        {auto_ack, never}
+        %,
+        {properties, #{'Session-Expiry-Interval' => 30}}
+        %{auto_ack, never}
         | Config
     ],
 
@@ -810,7 +811,7 @@ t_publish_many_while_client_is_gone(Config) ->
     Msgs1 = receive_messages(NPubs1),
     ct:pal("Msgs1 = ~p", [Msgs1]),
     NMsgs1 = length(Msgs1),
-    ?assertEqual(NPubs1, NMsgs1),
+    ?assertEqual(NPubs1, NMsgs1, debug_info(ClientId)),
 
     ?assertEqual(
         get_topicwise_order(Pubs1),
@@ -1084,3 +1085,12 @@ skip_ds_tc(Config) ->
         _ ->
             Config
     end.
+
+fail_with_debug_info(Exception, ClientId) ->
+    case emqx_cm:lookup_channels(ClientId) of
+        [Chan] ->
+            sys:get_state(Chan, 1000);
+        [] ->
+            no_channel
+    end,
+    exit(Exception).