Pārlūkot izejas kodu

Merge pull request #12489 from ieQu1/dev/ds-retainer

Durable sessions support retained messages
ieQu1 2 gadi atpakaļ
vecāks
revīzija
ab2a469aff

+ 71 - 26
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -241,8 +241,10 @@ info(mqueue_dropped, _Session) ->
 %%     seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
 info(awaiting_rel_max, #{props := Conf}) ->
     maps:get(max_awaiting_rel, Conf);
-info(await_rel_timeout, #{props := Conf}) ->
-    maps:get(await_rel_timeout, Conf).
+info(await_rel_timeout, #{props := _Conf}) ->
+    %% TODO: currently this setting is ignored:
+    %% maps:get(await_rel_timeout, Conf).
+    0.
 
 -spec stats(session()) -> emqx_types:stats().
 stats(Session) ->
@@ -389,7 +391,7 @@ publish(_PacketId, Msg, Session) ->
 puback(_ClientInfo, PacketId, Session0) ->
     case update_seqno(puback, PacketId, Session0) of
         {ok, Msg, Session} ->
-            {ok, Msg, [], inc_send_quota(Session)};
+            {ok, Msg, [], pull_now(Session)};
         Error ->
             Error
     end.
@@ -429,7 +431,7 @@ pubrel(_PacketId, Session = #{}) ->
 pubcomp(_ClientInfo, PacketId, Session0) ->
     case update_seqno(pubcomp, PacketId, Session0) of
         {ok, Msg, Session} ->
-            {ok, Msg, [], inc_send_quota(Session)};
+            {ok, Msg, [], pull_now(Session)};
         Error = {error, _} ->
             Error
     end.
@@ -438,9 +440,13 @@ pubcomp(_ClientInfo, PacketId, Session0) ->
 
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
     {ok, replies(), session()}.
-deliver(_ClientInfo, _Delivers, Session) ->
-    %% TODO: system messages end up here.
-    {ok, [], Session}.
+deliver(ClientInfo, Delivers, Session0) ->
+    %% Durable sessions still have to handle some transient messages.
+    %% For example, retainer sends messages to the session directly.
+    Session = lists:foldl(
+        fun(Msg, Acc) -> enqueue_transient(ClientInfo, Msg, Acc) end, Session0, Delivers
+    ),
+    {ok, [], pull_now(Session)}.
 
 -spec handle_timeout(clientinfo(), _Timeout, session()) ->
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
@@ -481,8 +487,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s :=
     S = emqx_persistent_session_ds_state:commit(S0),
     From ! Ref,
     {ok, [], Session#{s => S}};
-handle_timeout(_ClientInfo, expire_awaiting_rel, Session) ->
-    %% TODO: stub
+handle_timeout(_ClientInfo, Timeout, Session) ->
+    ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
     {ok, [], Session}.
 
 bump_last_alive(S0) ->
@@ -871,6 +877,48 @@ process_batch(
         IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
     ).
 
+%%--------------------------------------------------------------------
+%% Transient messages
+%%--------------------------------------------------------------------
+
+enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) ->
+    %% TODO: Such messages won't be retransmitted, should the session
+    %% reconnect before transient messages are acked.
+    %%
+    %% Proper solution could look like this: session publishes
+    %% transient messages to a separate DS DB that serves as a queue,
+    %% then subscribes to a special system topic that contains the
+    %% queued messages. Since streams in this DB are exclusive to the
+    %% session, messages from the queue can be dropped as soon as they
+    %% are acked.
+    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
+    Msgs = [
+        Msg
+     || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []),
+        Msg <- begin
+            #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
+            emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
+        end
+    ],
+    lists:foldl(fun do_enqueue_transient/2, Session, Msgs).
+
+do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->
+    case Qos of
+        ?QOS_0 ->
+            S = S0,
+            Inflight = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0);
+        QoS when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
+            SeqNo = inc_seqno(
+                QoS, emqx_persistent_session_ds_state:get_seqno(?next(QoS), S0)
+            ),
+            S = emqx_persistent_session_ds_state:put_seqno(?next(QoS), SeqNo, S0),
+            Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0)
+    end,
+    Session#{
+        inflight => Inflight,
+        s => S
+    }.
+
 %%--------------------------------------------------------------------
 %% Buffer drain
 %%--------------------------------------------------------------------
@@ -907,11 +955,6 @@ ensure_timers(Session0) ->
     Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
     emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
 
--spec inc_send_quota(session()) -> session().
-inc_send_quota(Session = #{inflight := Inflight0}) ->
-    Inflight = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0),
-    pull_now(Session#{inflight => Inflight}).
-
 -spec pull_now(session()) -> session().
 pull_now(Session) ->
     emqx_session:reset_timer(?TIMER_PULL, 0, Session).
@@ -957,26 +1000,28 @@ try_get_live_session(ClientId) ->
 
 -spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
     {ok, emqx_types:message(), session()} | {error, _}.
-update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
+update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) ->
     SeqNo = packet_id_to_seqno(PacketId, S),
     case Track of
         puback ->
-            QoS = ?QOS_1,
-            SeqNoKey = ?committed(?QOS_1);
+            SeqNoKey = ?committed(?QOS_1),
+            Result = emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0);
         pubrec ->
-            QoS = ?QOS_2,
-            SeqNoKey = ?rec;
+            SeqNoKey = ?rec,
+            Result = emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0);
         pubcomp ->
-            QoS = ?QOS_2,
-            SeqNoKey = ?committed(?QOS_2)
+            SeqNoKey = ?committed(?QOS_2),
+            Result = emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0)
     end,
-    Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S),
-    case inc_seqno(QoS, Current) of
-        SeqNo ->
+    case Result of
+        {ok, Inflight} ->
             %% TODO: we pass a bogus message into the hook:
             Msg = emqx_message:make(SessionId, <<>>, <<>>),
-            {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}};
-        Expected ->
+            {ok, Msg, Session#{
+                s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
+                inflight => Inflight
+            }};
+        {error, Expected} ->
             ?SLOG(warning, #{
                 msg => "out-of-order_commit",
                 track => Track,

+ 221 - 14
apps/emqx/src/emqx_persistent_session_ds_inflight.erl

@@ -22,7 +22,9 @@
     pop/1,
     n_buffered/2,
     n_inflight/1,
-    inc_send_quota/1,
+    puback/2,
+    pubrec/2,
+    pubcomp/2,
     receive_maximum/1
 ]).
 
@@ -34,13 +36,28 @@
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 
+-ifdef(TEST).
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
+-type payload() ::
+    {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()}
+    | {pubrel, emqx_persistent_session_ds:seqno()}.
+
 -record(inflight, {
-    queue :: queue:queue(),
     receive_maximum :: pos_integer(),
+    %% Main queue:
+    queue :: queue:queue(payload()),
+    %% Queues that are used to track sequence numbers of ack tracks:
+    puback_queue :: iqueue(),
+    pubrec_queue :: iqueue(),
+    pubcomp_queue :: iqueue(),
+    %% Counters:
     n_inflight = 0 :: non_neg_integer(),
     n_qos0 = 0 :: non_neg_integer(),
     n_qos1 = 0 :: non_neg_integer(),
@@ -49,17 +66,19 @@
 
 -type t() :: #inflight{}.
 
--type payload() ::
-    {emqx_persistent_session_ds:seqno() | undefined, emqx_types:message()}
-    | {pubrel, emqx_persistent_session_ds:seqno()}.
-
 %%================================================================================
 %% API funcions
 %%================================================================================
 
 -spec new(non_neg_integer()) -> t().
 new(ReceiveMaximum) when ReceiveMaximum > 0 ->
-    #inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}.
+    #inflight{
+        receive_maximum = ReceiveMaximum,
+        queue = queue:new(),
+        puback_queue = iqueue_new(),
+        pubrec_queue = iqueue_new(),
+        pubcomp_queue = iqueue_new()
+    }.
 
 -spec receive_maximum(t()) -> pos_integer().
 receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) ->
@@ -86,6 +105,9 @@ pop(Rec0) ->
         receive_maximum = ReceiveMaximum,
         n_inflight = NInflight,
         queue = Q0,
+        puback_queue = QAck,
+        pubrec_queue = QRec,
+        pubcomp_queue = QComp,
         n_qos0 = NQos0,
         n_qos1 = NQos1,
         n_qos2 = NQos2
@@ -96,17 +118,24 @@ pop(Rec0) ->
                 case Payload of
                     {pubrel, _} ->
                         Rec0#inflight{queue = Q};
-                    {_, #message{qos = Qos}} ->
+                    {SeqNo, #message{qos = Qos}} ->
                         case Qos of
                             ?QOS_0 ->
                                 Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1};
                             ?QOS_1 ->
                                 Rec0#inflight{
-                                    queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1
+                                    queue = Q,
+                                    n_qos1 = NQos1 - 1,
+                                    n_inflight = NInflight + 1,
+                                    puback_queue = ipush(SeqNo, QAck)
                                 };
                             ?QOS_2 ->
                                 Rec0#inflight{
-                                    queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1
+                                    queue = Q,
+                                    n_qos2 = NQos2 - 1,
+                                    n_inflight = NInflight + 1,
+                                    pubrec_queue = ipush(SeqNo, QRec),
+                                    pubcomp_queue = ipush(SeqNo, QComp)
                                 }
                         end
                 end,
@@ -129,12 +158,190 @@ n_buffered(all, #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) ->
 n_inflight(#inflight{n_inflight = NInflight}) ->
     NInflight.
 
+-spec puback(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+puback(SeqNo, Rec = #inflight{puback_queue = Q0, n_inflight = N}) ->
+    case ipop(Q0) of
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                puback_queue = Q,
+                n_inflight = max(0, N - 1)
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
+
+-spec pubcomp(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+pubcomp(SeqNo, Rec = #inflight{pubcomp_queue = Q0, n_inflight = N}) ->
+    case ipop(Q0) of
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                pubcomp_queue = Q,
+                n_inflight = max(0, N - 1)
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
+
+%% PUBREC doesn't affect inflight window:
 %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control
--spec inc_send_quota(t()) -> t().
-inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) ->
-    NInflight = max(NInflight0 - 1, 0),
-    Rec#inflight{n_inflight = NInflight}.
+-spec pubrec(emqx_persistent_session_ds:seqno(), t()) -> {ok, t()} | {error, Expected} when
+    Expected :: emqx_persistent_session_ds:seqno() | undefined.
+pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) ->
+    case ipop(Q0) of
+        {{value, SeqNo}, Q} ->
+            {ok, Rec#inflight{
+                pubrec_queue = Q
+            }};
+        {{value, Expected}, _} ->
+            {error, Expected};
+        _ ->
+            {error, undefined}
+    end.
 
 %%================================================================================
 %% Internal functions
 %%================================================================================
+
+%%%% Interval queue:
+
+%% "Interval queue": a data structure that represents a queue of
+%% monotonically increasing non-negative integers in a compact manner.
+%% It is functionally equivalent to a `queue:queue(integer())'.
+-record(iqueue, {
+    %% Head interval:
+    head = 0 :: integer(),
+    head_end = 0 :: integer(),
+    %% Intermediate ranges:
+    queue :: queue:queue({integer(), integer()}),
+    %% End interval:
+    tail = 0 :: integer(),
+    tail_end = 0 :: integer()
+}).
+
+-type iqueue() :: #iqueue{}.
+
+iqueue_new() ->
+    #iqueue{
+        queue = queue:new()
+    }.
+
+%% @doc Push a value into the interval queue:
+-spec ipush(integer(), iqueue()) -> iqueue().
+ipush(Val, Q = #iqueue{tail_end = Val, head_end = Val}) ->
+    %% Optimization: head and tail intervals overlap, and the newly
+    %% inserted value extends both. Attach it to both intervals, to
+    %% avoid `queue:out' in `ipop':
+    Q#iqueue{
+        tail_end = Val + 1,
+        head_end = Val + 1
+    };
+ipush(Val, Q = #iqueue{tail_end = Val}) ->
+    %% Extend tail interval:
+    Q#iqueue{
+        tail_end = Val + 1
+    };
+ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when is_number(Val), Val > End ->
+    IQ = queue:in({Tl, End}, IQ0),
+    %% Begin a new interval:
+    Q#iqueue{
+        queue = IQ,
+        tail = Val,
+        tail_end = Val + 1
+    }.
+
+-spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}.
+ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when Hd < HdEnd ->
+    %% Head interval is not empty. Consume a value from it:
+    {{value, Hd}, Q#iqueue{head = Hd + 1}};
+ipop(Q = #iqueue{head_end = End, tail_end = End}) ->
+    %% Head interval is fully consumed, and it's overlaps with the
+    %% tail interval. It means the queue is empty:
+    {empty, Q};
+ipop(Q = #iqueue{head = Hd0, tail = Tl, tail_end = TlEnd, queue = IQ0}) ->
+    %% Head interval is fully consumed, and it doesn't overlap with
+    %% the tail interval. Replace the head interval with the next
+    %% interval from the queue or with the tail interval:
+    case queue:out(IQ0) of
+        {{value, {Hd, HdEnd}}, IQ} ->
+            ipop(Q#iqueue{head = max(Hd0, Hd), head_end = HdEnd, queue = IQ});
+        {empty, _} ->
+            ipop(Q#iqueue{head = max(Hd0, Tl), head_end = TlEnd})
+    end.
+
+-ifdef(TEST).
+
+%% Test that behavior of iqueue is identical to that of a regular queue of integers:
+iqueue_compat_test_() ->
+    Props = [iqueue_compat()],
+    Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
+    {timeout, 30, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
+
+%% Generate a sequence of pops and pushes with monotonically
+%% increasing arguments, and verify replaying produces equivalent
+%% results for the optimized and the reference implementation:
+iqueue_compat() ->
+    ?FORALL(
+        Cmds,
+        iqueue_commands(),
+        begin
+            lists:foldl(
+                fun
+                    ({push, N}, {IQ, Q, Acc}) ->
+                        {ipush(N, IQ), queue:in(N, Q), [N | Acc]};
+                    (pop, {IQ0, Q0, Acc}) ->
+                        {Ret, IQ} = ipop(IQ0),
+                        {Expected, Q} = queue:out(Q0),
+                        ?assertEqual(
+                            Expected,
+                            Ret,
+                            #{
+                                sequence => lists:reverse(Acc),
+                                q => queue:to_list(Q0),
+                                iq0 => iqueue_print(IQ0),
+                                iq => iqueue_print(IQ)
+                            }
+                        ),
+                        {IQ, Q, [pop | Acc]}
+                end,
+                {iqueue_new(), queue:new(), []},
+                Cmds
+            ),
+            true
+        end
+    ).
+
+iqueue_cmd() ->
+    oneof([
+        pop,
+        {push, range(1, 3)}
+    ]).
+
+iqueue_commands() ->
+    ?LET(
+        Cmds,
+        list(iqueue_cmd()),
+        process_test_cmds(Cmds, 0)
+    ).
+
+process_test_cmds([], _) ->
+    [];
+process_test_cmds([pop | Tl], Cnt) ->
+    [pop | process_test_cmds(Tl, Cnt)];
+process_test_cmds([{push, N} | Tl], Cnt0) ->
+    Cnt = Cnt0 + N,
+    [{push, Cnt} | process_test_cmds(Tl, Cnt)].
+
+iqueue_print(I = #iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) ->
+    #{
+        hd => {Hd, HdEnd},
+        tl => {Tl, TlEnd},
+        q => queue:to_list(Q)
+    }.
+
+-endif.

+ 97 - 2
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -53,7 +53,7 @@ all() ->
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
-    TCsNonGeneric = [t_choose_impl],
+    TCsNonGeneric = [t_choose_impl, t_transient],
     TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
     [
         {persistence_disabled, TCGroups},
@@ -265,7 +265,15 @@ messages(Topic, Payloads) ->
     messages(Topic, Payloads, ?QOS_2).
 
 messages(Topic, Payloads, QoS) ->
-    [#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads].
+    lists:map(
+        fun
+            (Bin) when is_binary(Bin) ->
+                #mqtt_msg{topic = Topic, payload = Bin, qos = QoS};
+            (Msg = #mqtt_msg{}) ->
+                Msg#mqtt_msg{topic = Topic}
+        end,
+        Payloads
+    ).
 
 publish(Topic, Payload) ->
     publish(Topic, Payload, ?QOS_2).
@@ -1103,6 +1111,93 @@ t_unsubscribe_replay(Config) ->
     ),
     ok = emqtt:disconnect(Sub1).
 
+%% This testcase verifies that persistent sessions handle "transient"
+%% mesages correctly.
+%%
+%% Transient messages are delivered to the channel directly, bypassing
+%% the broker code that decides whether the messages should be
+%% persisted or not, and therefore they are not persisted.
+%%
+%% `emqx_retainer' is an example of application that uses this
+%% mechanism.
+%%
+%% This testcase creates the conditions when the transient messages
+%% appear in the middle of the replay, to make sure the durable
+%% session doesn't get confused and/or stuck if retained messages are
+%% changed while the session was down.
+t_transient(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    TopicPrefix = ?config(topic, Config),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    ClientOpts = [
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}},
+        {max_inflight, 100}
+        | Config
+    ],
+    Deliver = fun(Topic, Payload, QoS) ->
+        [Pid] = emqx_cm:lookup_channels(ClientId),
+        Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload),
+        Pid ! {deliver, Topic, Msg}
+    end,
+    Topic1 = <<TopicPrefix/binary, "/1">>,
+    Topic2 = <<TopicPrefix/binary, "/2">>,
+    Topic3 = <<TopicPrefix/binary, "/3">>,
+    %% 1. Start the client and subscribe to the topic:
+    {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
+    ?assertMatch({ok, _}, emqtt:ConnFun(Sub)),
+    ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)),
+    %% 2. Publish regular messages:
+    publish(Topic1, <<"1">>, ?QOS_1),
+    publish(Topic1, <<"2">>, ?QOS_2),
+    Msgs1 = receive_messages(2),
+    [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1,
+    %% 3. Publish and recieve transient messages:
+    Deliver(Topic2, <<"3">>, ?QOS_0),
+    Deliver(Topic2, <<"4">>, ?QOS_1),
+    Deliver(Topic2, <<"5">>, ?QOS_2),
+    Msgs2 = receive_messages(3),
+    ?assertMatch(
+        [
+            #{payload := <<"3">>, qos := ?QOS_0},
+            #{payload := <<"4">>, qos := ?QOS_1},
+            #{payload := <<"5">>, qos := ?QOS_2}
+        ],
+        Msgs2
+    ),
+    %% 4. Publish more regular messages:
+    publish(Topic3, <<"6">>, ?QOS_1),
+    publish(Topic3, <<"7">>, ?QOS_2),
+    Msgs3 = receive_messages(2),
+    [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3,
+    %% 5. Reconnect the client:
+    ok = emqtt:disconnect(Sub),
+    {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
+    ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
+    %% 6. Recieve the historic messages and check that their packet IDs didn't change:
+    %% Note: durable session currenty WON'T replay transient messages.
+    ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end,
+    ?assertMatch(
+        #{
+            Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}],
+            Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}]
+        },
+        maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000))
+    ),
+    %% 7. Finish off by sending messages to all the topics to make
+    %% sure none of the streams are blocked:
+    [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]],
+    ?assertMatch(
+        #{
+            Topic1 := [<<"fin">>],
+            Topic2 := [<<"fin">>],
+            Topic3 := [<<"fin">>]
+        },
+        get_topicwise_order(receive_messages(3))
+    ),
+    ok = emqtt:disconnect(Sub1).
+
 t_multiple_subscription_matches(Config) ->
     ConnFun = ?config(conn_fun, Config),
     Topic = ?config(topic, Config),