Преглед изворни кода

fix(sessds): Fix crash resulting from subscription state update

Fixes: #14039
ieQu1 пре 1 година
родитељ
комит
a1b86dc887

+ 29 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1165,7 +1165,7 @@ enqueue_batch(IsReplay, Session = #{s := S}, ClientInfo, StreamKey, ItBegin, Fet
 
 do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchResult) ->
     #{s := S0, inflight := Inflight0, stream_scheduler_s := SchedS0} = Session,
-    #srs{sub_state_id = SubStateId} = Srs0,
+    {Srs1, SubState} = maybe_update_sub_state_id(IsReplay, Srs0, S0),
     case IsReplay of
         false ->
             %% Normally we assign a new set of sequence
@@ -1182,10 +1182,10 @@ do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchR
     end,
     case FetchResult of
         {error, _, _} = Error ->
-            {Error, Srs0, Session};
+            {Error, Srs1, Session};
         {ok, end_of_stream} ->
             %% No new messages; just update the end iterator:
-            Srs = Srs0#srs{
+            Srs = Srs1#srs{
                 first_seqno_qos1 = FirstSeqnoQos1,
                 first_seqno_qos2 = FirstSeqnoQos2,
                 last_seqno_qos1 = FirstSeqnoQos1,
@@ -1199,7 +1199,6 @@ do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchR
             ),
             {ok, Srs, Session#{stream_scheduler_s := SchedS}};
         {ok, ItEnd, Messages} ->
-            SubState = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S0),
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                 IsReplay,
                 Session,
@@ -1210,7 +1209,7 @@ do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchR
                 Messages,
                 Inflight0
             ),
-            Srs = Srs0#srs{
+            Srs = Srs1#srs{
                 it_begin = ItBegin,
                 it_end = ItEnd,
                 batch_size = length(Messages),
@@ -1602,6 +1601,31 @@ maybe_set_will_message_timer(#{id := SessionId, s := S}) ->
             ok
     end.
 
+%% If needed, refresh reference to the subscription state in the SRS
+%% and return the updated records:
+-spec maybe_update_sub_state_id(
+    _IsReplay :: boolean(),
+    SRS,
+    emqx_persistent_session_ds_state:t()
+) ->
+    {SRS, emqx_persistent_session_ds_subs:subscription_state()}
+when
+    SRS :: emqx_persistent_session_ds_stream_scheduler:srs().
+maybe_update_sub_state_id(true, SRS = #srs{sub_state_id = SSID}, S) ->
+    %% We use old subscription state during replay to preserve the
+    %% relation between un-acked packet IDs and session sequence
+    %% numbers:
+    SubState = emqx_persistent_session_ds_state:get_subscription_state(SSID, S),
+    {SRS, SubState};
+maybe_update_sub_state_id(false, SRS = #srs{sub_state_id = SSID0}, S) ->
+    case emqx_persistent_session_ds_state:get_subscription_state(SSID0, S) of
+        #{superseded_by := SSID} ->
+            ?tp(sessds_update_srs_ssid, #{old => SSID0, new => SSID, srs => SRS}),
+            maybe_update_sub_state_id(false, SRS#srs{sub_state_id = SSID}, S);
+        #{} = SubState ->
+            {SRS, SubState}
+    end.
+
 -spec ensure_timer(timer(), non_neg_integer(), session()) -> session().
 ensure_timer(Timer, Time, Session) ->
     case Session of

+ 13 - 12
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl

@@ -70,7 +70,10 @@
         rap => _,
         subid => _,
         _ => _
-    }
+    },
+    %% Optional field that is added when subscription state becomes
+    %% outdated:
+    superseded_by => subscription_state_id()
 }.
 
 %%================================================================================
@@ -122,14 +125,17 @@ on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props})
                 SState ->
                     %% Client resubscribed with the same parameters:
                     {ok, S0};
-                _ ->
+                OldSState ->
                     %% Subscription parameters changed:
                     {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
                     S2 = emqx_persistent_session_ds_state:put_subscription_state(
                         SStateId, SState, S1
                     ),
-                    Sub = Sub0#{current_state => SStateId},
-                    S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
+                    S3 = emqx_persistent_session_ds_state:put_subscription_state(
+                        SStateId0, OldSState#{superseded_by => SStateId}, S2
+                    ),
+                    Sub = Sub0#{current_state := SStateId},
+                    S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S3),
                     {ok, S}
             end
     end.
@@ -190,18 +196,13 @@ gc(S0) ->
         S0
     ),
     AliveSet = emqx_persistent_session_ds_state:fold_streams(
-        fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) ->
-            case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of
-                false ->
-                    Acc#{SStateId => true};
-                true ->
-                    Acc
-            end
+        fun(_StreamId, #srs{sub_state_id = SStateId}, Acc) ->
+            Acc#{SStateId => true}
         end,
         AliveSet0,
         S0
     ),
-    %% Delete dangling subscription states:
+    %% Delete subscription states that don't belong to the alive set:
     emqx_persistent_session_ds_state:fold_subscription_states(
         fun(SStateId, _, S) ->
             case maps:is_key(SStateId, AliveSet) of

+ 73 - 3
apps/emqx/test/emqx_persistent_session_ds_SUITE.erl

@@ -51,6 +51,7 @@ end_per_suite(Config) ->
 init_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
     TestCase =:= t_session_unsubscription_idempotency;
+    TestCase =:= t_subscription_state_change;
     TestCase =:= t_storage_generations
 ->
     Cluster = cluster(#{n => 1}),
@@ -93,6 +94,7 @@ init_per_testcase(_TestCase, Config) ->
 end_per_testcase(TestCase, Config) when
     TestCase =:= t_session_subscription_idempotency;
     TestCase =:= t_session_unsubscription_idempotency;
+    TestCase =:= t_subscription_state_change;
     TestCase =:= t_session_gc
 ->
     Nodes = ?config(nodes, Config),
@@ -398,9 +400,7 @@ t_session_unsubscription_idempotency(Config) ->
                     15_000
                 ),
 
-            ok = stop_and_commit(Client1),
-
-            ok
+            ok = stop_and_commit(Client1)
         end,
         fun(_Trace) ->
             Session = session_open(Node1, ClientId),
@@ -413,6 +413,76 @@ t_session_unsubscription_idempotency(Config) ->
     ),
     ok.
 
+%% This testcase verifies that the session handles update of the
+%% subscription settings by the client correctly.
+t_subscription_state_change(Config) ->
+    [Node1Spec | _] = ?config(node_specs, Config),
+    [Node1] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    TopicFilter = <<"t/+">>,
+    ClientId = mk_clientid(?FUNCTION_NAME, sub),
+    %% Helper function that waits for the session GC:
+    WaitGC = fun() ->
+        ?block_until(
+            #{?snk_kind := sessds_renew_streams, ?snk_meta := #{clientid := ClientId}}, 5_000, 0
+        ),
+        timer:sleep(10)
+    end,
+    %% Helper function that gets runtime state of the session:
+    GetS = fun() ->
+        maps:get(s, erpc:call(Node1, emqx_persistent_session_ds, print_session, [ClientId]))
+    end,
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            %% Init:
+            Sub = start_client(#{port => Port, clientid => ClientId, auto_ack => never}),
+            {ok, _} = emqtt:connect(Sub),
+            Pub = start_client(#{port => Port, clientid => mk_clientid(?FUNCTION_NAME, pub)}),
+            {ok, _} = emqtt:connect(Pub),
+            %% Subscribe to the topic using QoS1 initially:
+            {ok, _, _} = emqtt:subscribe(Sub, TopicFilter, ?QOS_1),
+            #{subscriptions := Subs1} = GetS(),
+            #{TopicFilter := #{current_state := SSID1}} = Subs1,
+            %% Fill the storage with some data to create the streams:
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"1">>, ?QOS_2),
+            [#{packet_id := PI1, qos := ?QOS_1}] = emqx_common_test_helpers:wait_publishes(
+                1, 5_000
+            ),
+            %% Upgrade subscription to QoS2 and wait for the session GC
+            %% to happen, at which point the session should keep 2
+            %% subscription states, one being marked as obsolete:
+            {ok, _, _} = emqtt:subscribe(Sub, TopicFilter, ?QOS_2),
+            WaitGC(),
+            #{subscriptions := Subs2, subscription_states := SStates2} = GetS(),
+            #{TopicFilter := #{current_state := SSID2}} = Subs2,
+            ?assertNotEqual(SSID1, SSID2),
+            ?assertMatch(
+                #{
+                    SSID1 := #{subopts := #{qos := ?QOS_1}, superseded_by := SSID2},
+                    SSID2 := #{subopts := #{qos := ?QOS_2}}
+                },
+                SStates2
+            ),
+            %% Now ack the packet and publish some more to trigger SRS
+            %% update. This should, in effect, release the reference
+            %% to the old subscription state, and let the GC delete
+            %% the old subscription state:
+            ok = emqtt:puback(Sub, PI1),
+            {ok, _} = emqtt:publish(Pub, <<"t/1">>, <<"2">>, ?QOS_2),
+            %% QoS of subscription has been updated:
+            [#{packet_id := PI2, qos := ?QOS_2}] = emqx_common_test_helpers:wait_publishes(
+                1, 5_000
+            ),
+            WaitGC(),
+            #{subscriptions := Subs3, subscription_states := SStates3, streams := Streams3} = GetS(),
+            ?assertEqual(Subs3, Subs2),
+            %% Old substate was deleted:
+            ?assertMatch([{SSID2, _}], maps:to_list(SStates3), Streams3)
+        end,
+        []
+    ).
+
 t_session_discard_persistent_to_non_persistent(_Config) ->
     ClientId = atom_to_binary(?FUNCTION_NAME),
     Params = #{

+ 1 - 0
changes/ce/fix-14042.en.md

@@ -0,0 +1 @@
+Fixed a crash in durable sessions that occurred after a client updated its subscription parameters (such as QoS, `no_local`, `upgrade_qos`, etc.).