Jelajahi Sumber

fix(sessds): Postpone deletion of the subscription until fully acked

ieQu1 2 tahun lalu
induk
melakukan
3000a8f286

+ 62 - 7
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -117,7 +117,7 @@
     id := subscription_id(),
     start_time := emqx_ds:time(),
     props := map(),
-    extra := map()
+    deleted := boolean()
 }.
 
 -define(TIMER_PULL, timer_pull).
@@ -313,7 +313,8 @@ subscribe(
             Subscription = #{
                 start_time => now_ms(),
                 props => SubOpts,
-                id => SubId
+                id => SubId,
+                deleted => false
             },
             IsNew = true;
         Subscription0 = #{} ->
@@ -343,12 +344,17 @@ unsubscribe(
 
 -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
     emqx_persistent_session_ds_state:t().
-do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
-    S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
+do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) ->
+    %% Note: we cannot delete the subscription immediately, since its
+    %% metadata can be used during replay (see `process_batch'). We
+    %% instead mark it as deleted, and let `subscription_gc' function
+    %% dispatch it later:
+    SubMeta = SubMeta0#{deleted => true},
+    S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0),
     ?tp(persistent_session_ds_subscription_delete, #{
         session_id => SessionId, topic_filter => TopicFilter
     }),
-    S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
+    S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
     ?tp_span(
         persistent_session_ds_subscription_route_delete,
         #{session_id => SessionId, topic_filter => TopicFilter},
@@ -459,7 +465,8 @@ handle_timeout(
     Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
     {ok, Publishes, Session};
 handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
-    S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S0),
+    S1 = subscription_gc(S0),
+    S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
     Interval = emqx_config:get([session_persistence, renew_streams_interval]),
     Session = emqx_session:ensure_timer(
         ?TIMER_GET_STREAMS,
@@ -502,6 +509,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
         Session0,
         Streams
     ),
+    logger:error("Replay streams: ~p~n~p", [Streams, Session]),
     %% Note: we filled the buffer with the historical messages, and
     %% from now on we'll rely on the normal inflight/flow control
     %% mechanisms to replay them:
@@ -897,9 +905,43 @@ do_drain_buffer(Inflight0, S0, Acc) ->
 
 %%--------------------------------------------------------------------------------
 
+%% @doc Remove subscriptions that have been marked for deletion, and
+%% that don't have any unacked messages.
+%%
+%% Only `deleted := true' counts as "marked for deletion" (the same
+%% rule `subs_lookup' applies); a subscription map without the
+%% `deleted' key is kept instead of crashing the fold with
+%% `function_clause'.
+subscription_gc(S0) ->
+    subs_fold_all(
+        fun
+            (TopicFilter, #{id := SubId, deleted := true}, Acc) ->
+                %% Unacked state is checked against the original state
+                %% `S0', while deletions accumulate in `Acc':
+                case has_no_unacked_streams(SubId, S0) of
+                    true ->
+                        emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
+                    false ->
+                        Acc
+                end;
+            (_TopicFilter, _LiveSub, Acc) ->
+                Acc
+        end,
+        S0,
+        S0
+    ).
+
+%% @doc Return `true' when every stream whose key is `{SubId, _}' is
+%% reported as fully acked by the stream scheduler. The fold starts
+%% from `true', and streams of other subscriptions are ignored.
+has_no_unacked_streams(SubId, S) ->
+    emqx_persistent_session_ds_state:fold_streams(
+        fun(StreamKey, Srs, AllAcked) ->
+            case StreamKey of
+                {SubId, _Stream} ->
+                    emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso
+                        AllAcked;
+                _Other ->
+                    AllAcked
+            end
+        end,
+        true,
+        S
+    ).
+
+%% @doc Return the subscription stored for `TopicFilter', or `undefined' if absent or marked deleted:
 subs_lookup(TopicFilter, S) ->
     Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
-    emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined).
+    case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
+        #{deleted := true} ->
+            undefined;
+        Sub ->
+            Sub
+    end.
 
 subs_to_map(S) ->
     subs_fold(
@@ -909,6 +951,19 @@ subs_to_map(S) ->
     ).
 
 subs_fold(Fun, AccIn, S) ->
+    %% Fold `Fun' over the subscriptions, skipping those marked for
+    %% deletion. Only `deleted := true' is treated as deleted (same
+    %% rule as in `subs_lookup'), so a subscription map that lacks the
+    %% `deleted' key is passed to `Fun' rather than raising
+    %% `function_clause':
+    subs_fold_all(
+        fun
+            (_TopicFilter, #{deleted := true}, Acc) ->
+                Acc;
+            (TopicFilter, Sub, Acc) ->
+                Fun(TopicFilter, Sub, Acc)
+        end,
+        AccIn,
+        S
+    ).
+
+%% @doc Iterate over all subscriptions, including the deleted ones:
+subs_fold_all(Fun, AccIn, S) ->
     Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
     emqx_topic_gbt:fold(
         fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,

+ 30 - 23
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -16,8 +16,8 @@
 -module(emqx_persistent_session_ds_stream_scheduler).
 
 %% API:
--export([find_new_streams/1, find_replay_streams/1]).
--export([renew_streams/1, del_subscription/2]).
+-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
+-export([renew_streams/1, on_unsubscribe/2]).
 
 %% behavior callbacks:
 -export([]).
@@ -127,30 +127,33 @@ renew_streams(S0) ->
     S1 = remove_unsubscribed_streams(S0),
     S2 = remove_fully_replayed_streams(S1),
     emqx_topic_gbt:fold(
-        fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) ->
-            TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
-            Streams = select_streams(
-                SubId,
-                emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
+        fun
+            (Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) ->
+                TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
+                Streams = select_streams(
+                    SubId,
+                    emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
+                    Acc
+                ),
+                lists:foldl(
+                    fun(I, Acc1) ->
+                        ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1)
+                    end,
+                    Acc,
+                    Streams
+                );
+            (_Key, _DeletedSubscription, Acc) ->
                 Acc
-            ),
-            lists:foldl(
-                fun(I, Acc1) ->
-                    ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1)
-                end,
-                Acc,
-                Streams
-            )
         end,
         S2,
         emqx_persistent_session_ds_state:get_subscriptions(S2)
     ).
 
--spec del_subscription(
+-spec on_unsubscribe(
     emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
 ) ->
     emqx_persistent_session_ds_state:t().
-del_subscription(SubId, S0) ->
+on_unsubscribe(SubId, S0) ->
     %% NOTE: this function only marks the streams for deletion,
     %% instead of outright deleting them.
     %%
@@ -170,13 +173,13 @@ del_subscription(SubId, S0) ->
     %% `renew_streams', when it detects that all in-flight messages
     %% from the stream have been acked by the client.
     emqx_persistent_session_ds_state:fold_streams(
-        fun(Key, ReplayState, Acc) ->
+        fun(Key, Srs, Acc) ->
             case Key of
                 {SubId, _Stream} ->
                     %% This stream belongs to a deleted subscription.
                     %% Mark for deletion:
                     emqx_persistent_session_ds_state:put_stream(
-                        Key, ReplayState#srs{unsubscribed = true}, Acc
+                        Key, Srs#srs{unsubscribed = true}, Acc
                     );
                 _ ->
                     Acc
@@ -186,6 +189,14 @@ del_subscription(SubId, S0) ->
         S0
     ).
 
+%% @doc Check whether stream state `Srs' is fully acked with respect to
+%% the committed QoS1 and QoS2 sequence numbers read from session state
+%% `S'. The comparison itself is delegated to `is_fully_acked/3'.
+-spec is_fully_acked(
+    emqx_persistent_session_ds:stream_state(), emqx_persistent_session_ds_state:t()
+) -> boolean().
+is_fully_acked(Srs, S) ->
+    is_fully_acked(
+        emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
+        emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
+        Srs
+    ).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -207,10 +218,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
                 it_end = Iterator
             },
             emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
-        SRS = #srs{unsubscribed = true} ->
-            %% The session resubscribed to the stream after
-            %% unsubscribing. Spare the stream:
-            emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S);
         #srs{} ->
             S
     end.

+ 78 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -278,7 +278,10 @@ publish_many(Messages) ->
 
 publish_many(Messages, WaitForUnregister) ->
     Fun = fun(Client, Message) ->
-        {ok, _} = emqtt:publish(Client, Message)
+        case emqtt:publish(Client, Message) of % accept `ok' as well as `{ok, _}'
+            ok -> ok;
+            {ok, _} -> ok
+        end
     end,
     do_publish(Messages, Fun, WaitForUnregister).
 
@@ -1026,6 +1029,80 @@ t_unsubscribe(Config) ->
     ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
     ok = emqtt:disconnect(Client).
 
+%% This testcase verifies that un-acked messages that were once sent
+%% to the client are still retransmitted after the session
+%% unsubscribes from the topic and reconnects.
+t_unsubscribe_replay(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    TopicPrefix = ?config(topic, Config),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    ClientOpts = [
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}},
+        {max_inflight, 10}
+        | Config
+    ],
+    {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
+    {ok, _} = emqtt:ConnFun(Sub),
+    %% 1. Make two subscriptions, one of which is to be deleted:
+    Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]),
+    Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]),
+    ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)),
+    ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)),
+    %% 2. Publish 2 messages to the first and second topics each
+    %% (client doesn't ack them):
+    ok = publish(Topic1, <<"1">>, ?QOS_1),
+    ok = publish(Topic1, <<"2">>, ?QOS_2),
+    [Msg1, Msg2] = receive_messages(2),
+    ?assertMatch(
+        [
+            #{payload := <<"1">>},
+            #{payload := <<"2">>}
+        ],
+        [Msg1, Msg2]
+    ),
+    ok = publish(Topic2, <<"3">>, ?QOS_1),
+    ok = publish(Topic2, <<"4">>, ?QOS_2),
+    [Msg3, Msg4] = receive_messages(2),
+    ?assertMatch(
+        [
+            #{payload := <<"3">>},
+            #{payload := <<"4">>}
+        ],
+        [Msg3, Msg4]
+    ),
+    %% 3. Unsubscribe from the first topic and disconnect:
+    ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)),
+    ok = emqtt:disconnect(Sub),
+    %% 4. Publish more messages to the unsubscribed topic:
+    ok = publish(Topic1, <<"5">>, ?QOS_1),
+    ok = publish(Topic1, <<"6">>, ?QOS_2),
+    %% 5. Reconnect the client. It must receive only the four
+    %% messages published while it was subscribed:
+    {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
+    ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
+    %% Note: we ask for 6 messages, but expect only 4; this is
+    %% intentional:
+    ?assertMatch(
+        #{
+            Topic1 := [<<"1">>, <<"2">>],
+            Topic2 := [<<"3">>, <<"4">>]
+        },
+        get_topicwise_order(receive_messages(6, 5_000)),
+        debug_info(ClientId)
+    ),
+    %% 6. Now let's resubscribe, and check that the session can receive new messages:
+    ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)),
+    ok = publish(Topic1, <<"7">>, ?QOS_0),
+    ok = publish(Topic1, <<"8">>, ?QOS_1),
+    ok = publish(Topic1, <<"9">>, ?QOS_2),
+    ?assertMatch(
+        [<<"7">>, <<"8">>, <<"9">>],
+        lists:map(fun get_msgpub_payload/1, receive_messages(3))
+    ),
+    ok = emqtt:disconnect(Sub1).
+
 t_multiple_subscription_matches(Config) ->
     ConnFun = ?config(conn_fun, Config),
     Topic = ?config(topic, Config),