Kaynağa Gözat

Merge pull request #13887 from ieQu1/dev/EMQX-13204-fix-sessds-ready-state

fix(sessds): Don't schedule fully replayed streams for poll
ieQu1 1 yıl önce
ebeveyn
işleme
42d61d1839

+ 0 - 0
apps/emqx/integration_test/.gitkeep


+ 2 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1485,11 +1485,11 @@ update_seqno(
                 case Track of
                     puback ->
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
-                            ?QOS_1, SeqNo, SchedS0
+                            ?QOS_1, SeqNo, S, SchedS0
                         );
                     pubcomp ->
                         emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
-                            ?QOS_2, SeqNo, SchedS0
+                            ?QOS_2, SeqNo, S, SchedS0
                         );
                     _ ->
                         SchedS0

+ 39 - 23
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -59,13 +59,15 @@
 %% un-acked QoS1, QoS2 or QoS1&2 messages respectively. Such streams
 %% are stored in `#s.bq1' or `#s.bq2' buckets (or both).
 %%
-%% - *(U)nsubscribed*: streams for unsubcribed topics can linger in
-%% the session state for a while until all queued messages are acked.
-%% This state is implicit: unsubscribed streams are simply removed
-%% from all buckets. Unsubscribed streams are ignored by the scheduler
-%% until the moment they can be garbage-collected. So this is a
-%% terminal state. Even if the client resubscribes, it will produce a
-%% new, totally separate set of SRS.
+%% - *(U)ndead*: zombie streams that are kept for historical reasons
+%% only. For example, streams for unsubcribed topics can linger in the
+%% session state for a while until all queued messages are acked, as
+%% well as streams that reached `end_of_stream'. This state is
+%% implicit: undead streams are simply removed from all buckets.
+%% Undead streams are ignored by the scheduler until the moment they
+%% can be garbage-collected. So this is a terminal state. Even if the
+%% client resubscribes, it will produce a new, totally separate set of
+%% SRS.
 %%
 %% *** State transitions
 %%
@@ -98,7 +100,10 @@
 %%    \
 %%     \--([QoS0] & QoS2 messages)--> *BQ2* --> ...
 %%      \
-%%       `--([QoS0] & QoS1 & QoS2)--> *BQ12* --> ...
+%%       \--([QoS0] & QoS1 & QoS2)--> *BQ12* --> ...
+%%        \
+%%         `--(`end_of_stream')--> *U*
+%%
 %%
 %% *BQ1* and *BQ2* are handled similarly. They transition to *Ready*
 %% once session calls `?MODULE:on_seqno_release' for the corresponding
@@ -126,7 +131,7 @@
     poll/3,
     on_ds_reply/3,
     on_enqueue/5,
-    on_seqno_release/3,
+    on_seqno_release/4,
     find_replay_streams/1,
     is_fully_acked/2
 ]).
@@ -201,11 +206,14 @@ init(S) ->
     },
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    %% Restore stream states:
+    %% Restore stream states.
+    %%
+    %% Note: these states are NOT used during replay.
     emqx_persistent_session_ds_state:fold_streams(
         fun(Key, Srs, Acc) ->
+            ?tp(sessds_stream_state_trans, #{key => Key, to => '$restore'}),
             case derive_state(Comm1, Comm2, Srs) of
-                r -> to_R(Key, Acc);
+                r -> to_RU(Key, Srs, Acc);
                 u -> to_U(Key, Srs, Acc);
                 bq1 -> to_BQ1(Key, Srs, Acc);
                 bq2 -> to_BQ2(Key, Srs, Acc);
@@ -285,10 +293,8 @@ on_ds_reply(#poll_reply{ref = Ref, payload = poll_timeout}, S, SchedS0 = #s{pend
                     case emqx_persistent_session_ds_state:get_stream(Key, S) of
                         undefined ->
                             SchedS2;
-                        #srs{unsubscribed = true} ->
-                            SchedS2;
-                        _ ->
-                            to_R(Key, SchedS2)
+                        Srs ->
+                            to_RU(Key, Srs, SchedS2)
                     end;
                 false ->
                     SchedS1
@@ -327,7 +333,7 @@ on_enqueue(false, Key, Srs, S, SchedS) ->
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     case derive_state(Comm1, Comm2, Srs) of
         r ->
-            to_R(Key, SchedS);
+            to_RU(Key, Srs, SchedS);
         u ->
             to_U(Key, Srs, SchedS);
         bq1 ->
@@ -338,14 +344,15 @@ on_enqueue(false, Key, Srs, S, SchedS) ->
             to_BQ12(Key, Srs, SchedS)
     end.
 
-on_seqno_release(?QOS_1, SnQ1, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTab}) ->
+on_seqno_release(?QOS_1, SnQ1, S, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTab}) ->
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ1, #block.last_seqno_qos2) of
         false ->
             %% This seqno doesn't unlock anything:
             SchedS0;
         {false, Key, PrimaryTab} ->
             %% It was BQ1:
-            to_R(Key, SchedS0#s{bq1 = PrimaryTab});
+            Srs = emqx_persistent_session_ds_state:get_stream(Key, S),
+            to_RU(Key, Srs, SchedS0#s{bq1 = PrimaryTab});
         {true, Key, PrimaryTab} ->
             %% It was BQ12:
             ?tp(sessds_stream_state_trans, #{
@@ -354,14 +361,15 @@ on_seqno_release(?QOS_1, SnQ1, SchedS0 = #s{bq1 = PrimaryTab0, bq2 = SecondaryTa
             }),
             SchedS0#s{bq1 = PrimaryTab}
     end;
-on_seqno_release(?QOS_2, SnQ2, SchedS0 = #s{bq2 = PrimaryTab0, bq1 = SecondaryTab}) ->
+on_seqno_release(?QOS_2, SnQ2, S, SchedS0 = #s{bq2 = PrimaryTab0, bq1 = SecondaryTab}) ->
     case check_block_status(PrimaryTab0, SecondaryTab, SnQ2, #block.last_seqno_qos1) of
         false ->
             %% This seqno doesn't unlock anything:
             SchedS0;
         {false, Key, PrimaryTab} ->
             %% It was BQ2:
-            to_R(Key, SchedS0#s{bq2 = PrimaryTab});
+            Srs = emqx_persistent_session_ds_state:get_stream(Key, S),
+            to_RU(Key, Srs, SchedS0#s{bq2 = PrimaryTab});
         {true, Key, PrimaryTab} ->
             %% It was BQ12:
             ?tp(sessds_stream_state_trans, #{
@@ -500,6 +508,8 @@ is_fully_acked(Srs, S) ->
 ) -> state().
 derive_state(_, _, #srs{unsubscribed = true}) ->
     u;
+derive_state(_, _, #srs{it_end = end_of_stream}) ->
+    u;
 derive_state(Comm1, Comm2, SRS) ->
     case {is_track_acked(?QOS_1, Comm1, SRS), is_track_acked(?QOS_2, Comm2, SRS)} of
         {true, true} -> r;
@@ -511,8 +521,14 @@ derive_state(Comm1, Comm2, SRS) ->
 %% Note: `to_State' functions must be called from a correct state.
 %% They are NOT idempotent, and they don't do full cleanup.
 
--spec to_R(stream_key(), t()) -> t().
-to_R(Key, S = #s{ready = R}) ->
+%% Transfer the stream either to R state if it's pollable or to U
+%% state if it's not.
+-spec to_RU(stream_key(), srs(), t()) -> t().
+to_RU(Key, Srs = #srs{it_end = end_of_stream}, S) ->
+    to_U(Key, Srs, S);
+to_RU(Key, Srs = #srs{unsubscribed = true}, S) ->
+    to_U(Key, Srs, S);
+to_RU(Key, _Srs, S = #s{ready = R}) ->
     ?tp(sessds_stream_state_trans, #{
         key => Key,
         to => r
@@ -616,7 +632,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
                     },
                     {
                         emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S),
-                        to_R(Key, SchedS)
+                        to_RU(Key, NewStreamState, SchedS)
                     };
                 {error, Class, Reason} ->
                     ?SLOG(info, #{

+ 137 - 3
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -12,6 +12,8 @@
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
+-include("emqx_persistent_message.hrl").
+
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(DURABLE_SESSION_STATE, emqx_persistent_session).
@@ -27,7 +29,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    case emqx_ds_test_helpers:skip_if_norepl() of
+    try emqx_ds_test_helpers:skip_if_norepl() of
         false ->
             TCApps = emqx_cth_suite:start(
                 app_specs(),
@@ -36,6 +38,9 @@ init_per_suite(Config) ->
             [{tc_apps, TCApps} | Config];
         Yes ->
             Yes
+    catch
+        error:undef ->
+            {skip, standalone_not_supported}
     end.
 
 end_per_suite(Config) ->
@@ -45,7 +50,8 @@ end_per_suite(Config) ->
 
 init_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
-    TestCase =:= t_session_unsubscription_idempotency
+    TestCase =:= t_session_unsubscription_idempotency;
+    TestCase =:= t_storage_generations
 ->
     Cluster = cluster(#{n => 1}),
     ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
@@ -220,6 +226,61 @@ stop_and_commit(Client) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
+%% This testcase verifies session's behavior related to generation
+%% rotation in the message storage.
+%%
+%% This testcase verifies (on the surface level) that session handles
+%% `end_of_stream' and doesn't violate the ordering of messages that
+%% are split into different generations.
+t_storage_generations(Config) ->
+    [Node1Spec | _] = ?config(node_specs, Config),
+    [Node1] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    TopicFilter = <<"t/+">>,
+    ClientId = mk_clientid(?FUNCTION_NAME, sub),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            %% Start subscriber:
+            Sub = start_client(#{port => Port, clientid => ClientId, auto_ack => never}),
+            {ok, _} = emqtt:connect(Sub),
+            {ok, _, _} = emqtt:subscribe(Sub, TopicFilter, qos2),
+            %% Start publisher:
+            Pub = start_client(#{port => Port, clientid => mk_clientid(?FUNCTION_NAME, pub)}),
+            {ok, _} = emqtt:connect(Pub),
+            %% Publish 3 messages. Subscriber receives them, but
+            %% doesn't ack them initially.
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"1">>, ?QOS_1),
+            [#{packet_id := PI1}] = emqx_common_test_helpers:wait_publishes(1, 5_000),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"2">>, ?QOS_1),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"3">>, ?QOS_1),
+            [#{packet_id := PI2}, #{packet_id := PI3}] = emqx_common_test_helpers:wait_publishes(
+                2, 5_000
+            ),
+            %% Ack the first message. It transfers "t/1" stream into
+            %% ready state where it will be polled.
+            ok = emqtt:puback(Sub, PI1),
+            %% Create a new generation and publish messages to the new
+            %% generation. We expect 1 message for topic t/1 (since it
+            %% was unblocked), but NOT for t/2:
+            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
+            timer:sleep(100),
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"4">>, ?QOS_1),
+            {ok, _} = emqtt:publish(Pub, <<"t/2">>, <<"5">>, ?QOS_1),
+            [#{packet_id := PI4, payload := <<"4">>}] = emqx_common_test_helpers:wait_publishes(
+                2, 5_000
+            ),
+            %% Ack the rest of messages, it should unblock 5th
+            %% message:
+            ok = emqtt:puback(Sub, PI2),
+            ok = emqtt:puback(Sub, PI3),
+            ok = emqtt:puback(Sub, PI4),
+            [#{packet_id := PI5}] = emqx_common_test_helpers:wait_publishes(1, 5_000)
+        end,
+        [fun check_stream_state_transitions/1]
+    ),
+    ok.
+
 t_session_subscription_idempotency(Config) ->
     [Node1Spec | _] = ?config(node_specs, Config),
     [Node1] = ?config(nodes, Config),
@@ -624,7 +685,7 @@ t_session_gc(Config) ->
             ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
             ok
         end,
-        []
+        [fun check_stream_state_transitions/1]
     ),
     ok.
 
@@ -732,3 +793,76 @@ t_session_gc_will_message(_Config) ->
         []
     ),
     ok.
+
+%% Trace specifications:
+
+check_stream_state_transitions(Trace) ->
+    %% Check sequence of state transitions for each stream replay
+    %% state:
+    Groups = maps:groups_from_list(
+        fun(#{key := Key, ?snk_meta := #{clientid := ClientId}}) -> {ClientId, Key} end,
+        fun(#{to := To}) -> To end,
+        ?of_kind(sessds_stream_state_trans, Trace)
+    ),
+    ?assert(maps:size(Groups) > 0),
+    maps:foreach(
+        fun(StreamId, Transitions) ->
+            check_stream_state_transitions(StreamId, Transitions, void)
+        end,
+        Groups
+    ).
+
+%% erlfmt-ignore
+check_stream_state_transitions(StreamId, [], _) ->
+    true;
+check_stream_state_transitions(StreamId = {ClientId, Key}, ['$restore', To | Rest], State) ->
+    %% This clause verifies that restored session re-calculates states
+    %% of the streams exactly as they were before.
+    case To of
+        State ->
+            check_stream_state_transitions(StreamId, Rest, State);
+        _ ->
+            error(#{
+                kind => inconsistent_stream_state_after_session_restore,
+                from => State,
+                to => To,
+                clientid => ClientId,
+                key => Key
+            })
+    end;
+check_stream_state_transitions(StreamId = {ClientId, Key}, [To | Rest], State) ->
+    %% See FSM in emqx_persistent_session_ds_stream_scheduler.erl:
+    case {State, To} of
+        {void, r} -> ok;
+        %% R
+        {r, p} -> ok;
+        {r, u} -> ok;
+        %% P
+        {p, r} -> ok;
+        {p, s} -> ok;
+        %% S
+        {s, r} -> ok;
+        {s, u} -> ok;
+        {s, bq1} -> ok;
+        {s, bq2} -> ok;
+        {s, bq12} -> ok;
+        %% BQ1
+        {bq1, u} -> ok;
+        {bq1, r} -> ok;
+        %% BQ2
+        {bq2, u} -> ok;
+        {bq2, r} -> ok;
+        %% BQ12
+        {bq12, u} -> ok;
+        {bq12, bq1} -> ok;
+        {bq12, bq2} -> ok;
+        _ ->
+            error(#{
+                kind => invalid_state_transition,
+                from => State,
+                to => To,
+                clientid => ClientId,
+                key => Key
+            })
+    end,
+    check_stream_state_transitions(StreamId, Rest, To).