Forráskód Böngészése

test(sessds): Add a more low-level test of stream scheduler

ieQu1 1 éve
szülő
commit
b03bae1424

+ 1 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -211,6 +211,7 @@ init(S) ->
     %% Note: these states are NOT used during replay.
     emqx_persistent_session_ds_state:fold_streams(
         fun(Key, Srs, Acc) ->
+            ?tp(sessds_stream_state_trans, #{key => Key, to => '$restore'}),
             case derive_state(Comm1, Comm2, Srs) of
                 r -> to_RU(Key, Srs, Acc);
                 u -> to_U(Key, Srs, Acc);

+ 137 - 3
apps/emqx/test/emqx_persistent_session_ds_SUITE.erl

@@ -12,6 +12,8 @@
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
+-include("emqx_persistent_message.hrl").
+
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(DURABLE_SESSION_STATE, emqx_persistent_session).
@@ -27,7 +29,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    case emqx_ds_test_helpers:skip_if_norepl() of
+    try emqx_ds_test_helpers:skip_if_norepl() of
         false ->
             TCApps = emqx_cth_suite:start(
                 app_specs(),
@@ -36,6 +38,9 @@ init_per_suite(Config) ->
             [{tc_apps, TCApps} | Config];
         Yes ->
             Yes
+    catch
+        error:undef ->
+            {skip, standalone_not_supported}
     end.
 
 end_per_suite(Config) ->
@@ -45,7 +50,8 @@ end_per_suite(Config) ->
 
 init_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
-    TestCase =:= t_session_unsubscription_idempotency
+    TestCase =:= t_session_unsubscription_idempotency;
+    TestCase =:= t_storage_generations
 ->
     Cluster = cluster(#{n => 1}),
     ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
@@ -220,6 +226,61 @@ stop_and_commit(Client) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
+%% Verifies the session's behavior related to generation rotation in
+%% the message storage.
+%%
+%% On the surface level it checks that the session handles
+%% `end_of_stream' and doesn't violate the ordering of messages that
+%% are split into different generations. The trace collected during
+%% the run is additionally validated by
+%% `check_stream_state_transitions/1'.
+t_storage_generations(Config) ->
+    %% The node spec itself is not needed here; the match only asserts
+    %% that at least one spec is present in the config.
+    [_Node1Spec | _] = ?config(node_specs, Config),
+    [Node1] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    TopicFilter = <<"t/+">>,
+    ClientId = mk_clientid(?FUNCTION_NAME, sub),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            %% Start subscriber:
+            Sub = start_client(#{port => Port, clientid => ClientId, auto_ack => never}),
+            {ok, _} = emqtt:connect(Sub),
+            {ok, _, _} = emqtt:subscribe(Sub, TopicFilter, qos2),
+            %% Start publisher:
+            Pub = start_client(#{port => Port, clientid => mk_clientid(?FUNCTION_NAME, pub)}),
+            {ok, _} = emqtt:connect(Pub),
+            %% Publish 3 messages. Subscriber receives them, but
+            %% doesn't ack them initially.
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"1">>, ?QOS_1),
+            [#{packet_id := PI1}] = emqx_common_test_helpers:wait_publishes(1, 5_000),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"2">>, ?QOS_1),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"3">>, ?QOS_1),
+            [#{packet_id := PI2}, #{packet_id := PI3}] = emqx_common_test_helpers:wait_publishes(
+                2, 5_000
+            ),
+            %% Ack the first message. This transfers the "t/1" stream
+            %% into the ready state, where it will be polled.
+            ok = emqtt:puback(Sub, PI1),
+            %% Create a new generation and publish messages to the new
+            %% generation. We expect 1 message for topic t/1 (since it
+            %% was unblocked), but NOT for t/2:
+            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
+            timer:sleep(100),
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"4">>, ?QOS_1),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"5">>, ?QOS_1),
+            %% Two publishes are requested, but exactly one must match:
+            %% the message for t/2 is expected to stay blocked.
+            %% NOTE(review): this assumes `wait_publishes/2' returns
+            %% whatever arrived once the timeout expires -- confirm.
+            [#{packet_id := PI4, payload := <<"4">>}] = emqx_common_test_helpers:wait_publishes(
+                2, 5_000
+            ),
+            %% Ack the rest of the messages; this should unblock the
+            %% 5th message:
+            ok = emqtt:puback(Sub, PI2),
+            ok = emqtt:puback(Sub, PI3),
+            ok = emqtt:puback(Sub, PI4),
+            %% Only the arrival of exactly one more publish is
+            %% asserted; its packet id is not used afterwards.
+            [#{packet_id := _PI5}] = emqx_common_test_helpers:wait_publishes(1, 5_000)
+        end,
+        [fun check_stream_state_transitions/1]
+    ),
+    ok.
+
 t_session_subscription_idempotency(Config) ->
     [Node1Spec | _] = ?config(node_specs, Config),
     [Node1] = ?config(nodes, Config),
@@ -624,7 +685,7 @@ t_session_gc(Config) ->
             ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
             ok
         end,
-        []
+        [fun check_stream_state_transitions/1]
     ),
     ok.
 
@@ -732,3 +793,76 @@ t_session_gc_will_message(_Config) ->
         []
     ),
     ok.
+
+%% Trace specifications:
+
+check_stream_state_transitions(Trace) ->
+    %% Trace property: bucket the `to' field of every
+    %% `sessds_stream_state_trans' event by {ClientId, Key}, then
+    %% validate each bucket as a sequence of transitions that starts
+    %% from the `void' state.
+    StreamIdOf = fun(#{key := Key, ?snk_meta := #{clientid := ClientId}}) ->
+        {ClientId, Key}
+    end,
+    TargetOf = fun(#{to := To}) -> To end,
+    Events = ?of_kind(sessds_stream_state_trans, Trace),
+    Groups = maps:groups_from_list(StreamIdOf, TargetOf, Events),
+    %% The property is only meaningful if at least one transition was
+    %% traced.
+    ?assert(maps:size(Groups) > 0),
+    maps:foreach(
+        fun(Id, Targets) -> check_stream_state_transitions(Id, Targets, void) end,
+        Groups
+    ).
+
+%% Walks the list of transition targets recorded for a single stream
+%% (identified by `{ClientId, Key}'), starting from `State'.
+%%
+%% Returns `true' once the list is exhausted. Raises `error(Map)' on
+%% the first violation, where `Map' has `kind' set to either
+%% `invalid_state_transition' or
+%% `inconsistent_stream_state_after_session_restore', plus the `from',
+%% `to', `clientid' and `key' of the offending step.
+%%
+%% NOTE(review): a `'$restore'' marker that is the last element of the
+%% list (i.e. not followed by a transition) does not match the restore
+%% clause and is reported as `invalid_state_transition' by the generic
+%% clause -- confirm this is intended.
+%% erlfmt-ignore
+check_stream_state_transitions(_StreamId, [], _) ->
+    true;
+check_stream_state_transitions(StreamId = {ClientId, Key}, ['$restore', To | Rest], State) ->
+    %% This clause verifies that restored session re-calculates states
+    %% of the streams exactly as they were before.
+    case To of
+        State ->
+            %% `State' is already bound, so this matches only when the
+            %% recomputed state equals the state before the restore.
+            check_stream_state_transitions(StreamId, Rest, State);
+        _ ->
+            error(#{
+                kind => inconsistent_stream_state_after_session_restore,
+                from => State,
+                to => To,
+                clientid => ClientId,
+                key => Key
+            })
+    end;
+check_stream_state_transitions(StreamId = {ClientId, Key}, [To | Rest], State) ->
+    %% See FSM in emqx_persistent_session_ds_stream_scheduler.erl:
+    case {State, To} of
+        {void, r} -> ok;
+        %% R
+        {r, p} -> ok;
+        {r, u} -> ok;
+        %% P
+        {p, r} -> ok;
+        {p, s} -> ok;
+        %% S
+        {s, r} -> ok;
+        {s, u} -> ok;
+        {s, bq1} -> ok;
+        {s, bq2} -> ok;
+        {s, bq12} -> ok;
+        %% BQ1
+        {bq1, u} -> ok;
+        {bq1, r} -> ok;
+        %% BQ2
+        {bq2, u} -> ok;
+        {bq2, r} -> ok;
+        %% BQ12
+        {bq12, u} -> ok;
+        {bq12, bq1} -> ok;
+        {bq12, bq2} -> ok;
+        _ ->
+            error(#{
+                kind => invalid_state_transition,
+                from => State,
+                to => To,
+                clientid => ClientId,
+                key => Key
+            })
+    end,
+    check_stream_state_transitions(StreamId, Rest, To).