Просмотр исходного кода

feat(ds): Replay QoS1 messages

ieQu1 2 лет назад
Родитель
Сommit
2de79dd9ac

+ 1 - 1
Makefile

@@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config
 
 .PHONY: ct
 ct: $(REBAR) merge-config
-	@$(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
+	ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
 
 ## only check bpapi for enterprise profile because it's a super-set.
 .PHONY: static_checks

+ 207 - 0
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -0,0 +1,207 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc This module implements the routines for replaying streams of
+%% messages.
+-module(emqx_persistent_message_ds_replayer).
+
+%% API:
+-export([new/0, next_packet_id/1, replay/2, commit_offset/3, poll/3]).
+
+%% internal exports:
+-export([]).
+
+-export_type([inflight/0]).
+
+-include("emqx_persistent_session_ds.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% Note: sequence numbers are monotonic; they don't wrap around:
+-type seqno() :: non_neg_integer().
+
+%% A contiguous run of sequence numbers that were assigned to the
+%% messages returned by a single `emqx_ds:next/2' call on one stream:
+-record(range, {
+    %% Stream the batch was read from:
+    stream :: emqx_ds:stream(),
+    %% Sequence number given to the first message of the batch:
+    first :: seqno(),
+    %% Sequence number given to the last message of the batch
+    %% (inclusive):
+    last :: seqno(),
+    %% Iterator that `emqx_ds:next/2' returned together with the batch:
+    iterator_next :: emqx_ds:iterator() | undefined
+}).
+
+-type range() :: #range{}.
+
+%% Replay state of a single session:
+-record(inflight, {
+    %% Sequence number that the next outgoing message will receive
+    %% (advanced by `next_packet_id/1'):
+    next_seqno = 0 :: seqno(),
+    %% Value derived from the latest accepted PUBACK (written by
+    %% `commit_offset/3'):
+    acked_seqno = 0 :: seqno(),
+    %% Ranges that `commit_offset/3' has not dropped yet. New ranges
+    %% are appended to the tail of the list:
+    offset_ranges = [] :: [range()]
+}).
+
+-opaque inflight() :: #inflight{}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% @doc Create an empty replay state: both counters start at zero and
+%% no ranges are tracked.
+-spec new() -> inflight().
+new() ->
+    #inflight{}.
+
+%% @doc Allocate the packet ID for the next outgoing message. The ID
+%% is derived from the current `next_seqno', which is then advanced
+%% by one in the returned state.
+-spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}.
+next_packet_id(#inflight{next_seqno = SeqNo} = State) ->
+    PacketId = seqno_to_packet_id(SeqNo),
+    {PacketId, State#inflight{next_seqno = SeqNo + 1}}.
+
+%% @doc Stub: currently ignores the session ID and the tracked ranges
+%% and always returns an empty list of replies.
+-spec replay(emqx_persistent_session_ds:id(), inflight()) ->
+    emqx_session:replies().
+replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) ->
+    [].
+
+%% @doc Handle the acknowledgement of `PacketId'.
+%%
+%% Returns `{true, Inflight}' when the acknowledgement moves the
+%% acked position forward. Every range that is then fully
+%% acknowledged is removed from the state, and its trailing iterator
+%% is saved as the new position of the session in that stream.
+%%
+%% Returns `{false, Inflight0}', leaving the state untouched, when the
+%% packet ID does not move the acked position forward (duplicate,
+%% stale or unknown ID). Such IDs come from the client, so they must
+%% not crash the session.
+-spec commit_offset(emqx_persistent_session_ds:id(), emqx_types:packet_id(), inflight()) ->
+    {_IsValidOffset :: boolean(), inflight()}.
+commit_offset(
+    SessionId,
+    PacketId,
+    Inflight0 = #inflight{
+        acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0
+    }
+) ->
+    case packet_id_to_seqno(NextSeqNo, PacketId) of
+        AckedSeqno when AckedSeqno > AckedSeqno0 ->
+            %% Packet IDs are `seqno + 1', so `AckedSeqno' is one past
+            %% the sequence number of the acknowledged message, while
+            %% `last' is inclusive. A range is fully acked only when
+            %% `last' is strictly below `AckedSeqno'.
+            {Acked, Unacked} = lists:partition(
+                fun(#range{last = LastSeqno}) -> LastSeqno < AckedSeqno end,
+                Ranges0
+            ),
+            %% Replace the saved iterators with the trailing iterators
+            %% of the ranges that are being dropped:
+            lists:foreach(
+                fun(#range{stream = Stream, iterator_next = ItNext}) ->
+                    update_iterator(SessionId, Stream, ItNext)
+                end,
+                Acked
+            ),
+            Inflight = Inflight0#inflight{acked_seqno = AckedSeqno, offset_ranges = Unacked},
+            {true, Inflight};
+        _StaleOrUnknown ->
+            {false, Inflight0}
+    end.
+
+%% @doc Fetch new messages for the session when enough of the
+%% `WindowSize' in-flight window is free. Returns the publishes to
+%% send together with the updated replay state.
+-spec poll(emqx_persistent_session_ds:id(), inflight(), pos_integer()) ->
+    {emqx_session:replies(), inflight()}.
+poll(SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < 16#7fff ->
+    #inflight{next_seqno = NextSeqNo, acked_seqno = AckedSeqno} = Inflight0,
+    InFlight = NextSeqNo - AckedSeqno,
+    FreeSpace = WindowSize - InFlight,
+    MinBatch = max(1, WindowSize div 2),
+    case FreeSpace < MinBatch of
+        true ->
+            %% TODO: this branch is meant to avoid fetching data from
+            %% the DB in chunks that are too small. However, this
+            %% logic is not exactly good for the latency. Can the
+            %% client get stuck even?
+            {[], Inflight0};
+        false ->
+            %% Visit the streams in random order:
+            Streams = shuffle(get_streams(SessionId)),
+            fetch(SessionId, Inflight0, Streams, FreeSpace, [])
+    end.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Read up to `N' messages from the given streams, one stream after
+%% another. Every message gets the next packet ID, and every
+%% non-empty batch is recorded as a new range at the tail of
+%% `offset_ranges'. Returns the publishes in the order they were
+%% numbered, together with the updated state.
+fetch(_SessionId, Inflight, [], _N, Acc) ->
+    {lists:reverse(Acc), Inflight};
+fetch(_SessionId, Inflight, _Streams, 0, Acc) ->
+    {lists:reverse(Acc), Inflight};
+fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Rest], N, Acc0) ->
+    #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
+    ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
+    {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
+    Number = fun(Msg, {Pubs, InflightAcc}) ->
+        {PacketId, InflightNext} = next_packet_id(InflightAcc),
+        {[{PacketId, Msg} | Pubs], InflightNext}
+    end,
+    {Acc, Inflight1} = lists:foldl(Number, {Acc0, Inflight0}, Messages),
+    LastSeqNo = Inflight1#inflight.next_seqno,
+    case LastSeqNo - FirstSeqNo of
+        NMessages when NMessages > 0 ->
+            Range = #range{
+                stream = Stream,
+                first = FirstSeqNo,
+                last = LastSeqNo - 1,
+                iterator_next = ItEnd
+            },
+            Inflight = Inflight1#inflight{offset_ranges = Ranges0 ++ [Range]},
+            fetch(SessionId, Inflight, Rest, N - NMessages, Acc);
+        _Empty ->
+            %% Nothing was read from this stream; try the next one:
+            fetch(SessionId, Inflight1, Rest, N, Acc)
+    end.
+
+%% Overwrite the iterator stored for the `{SessionId, Stream}' pair
+%% with a dirty write to the session iterator table.
+update_iterator(SessionId, Stream, Iterator) ->
+    mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}).
+
+%% Pick the iterator to continue reading `Stream' from: the trailing
+%% iterator of the most recently added range of that stream when
+%% there is one, otherwise the iterator stored for the session.
+get_last_iterator(SessionId, Stream, Ranges) ->
+    Trailing = [It || #range{stream = S, iterator_next = It} <- Ranges, S == Stream],
+    case Trailing of
+        [] ->
+            get_iterator(SessionId, Stream);
+        [_ | _] ->
+            lists:last(Trailing)
+    end.
+
+%% Dirty-read the iterator stored for the `{SessionId, Stream}' pair.
+%% Crashes with `badmatch' unless exactly one record is found.
+get_iterator(SessionId, Stream) ->
+    Id = {SessionId, Stream},
+    [#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id),
+    It.
+
+%% Dirty-read all records stored in the session stream table under
+%% the key `SessionId'.
+get_streams(SessionId) ->
+    mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId).
+
+%% Packet ID as defined by MQTT protocol is a 16-bit integer in range
+%% 1..FFFF. This function translates internal session sequence number
+%% to MQTT packet ID by taking it modulo FFFF and adding 1, so the
+%% IDs cycle through 1..FFFF and never take the value 0. This assumes
+%% that there's never more than FFFF in-flight packets at any time:
+-spec seqno_to_packet_id(non_neg_integer()) -> emqx_types:packet_id().
+seqno_to_packet_id(Counter) ->
+    Counter rem 16#ffff + 1.
+
+%% Reconstruct the session counter from a packet ID and the current
+%% value of `next_seqno'.
+%%
+%% The result is the position right after the message that was sent
+%% with `PacketId': the largest `N =< NextSeqNo' for which
+%% `seqno_to_packet_id(N - 1) =:= PacketId'. For the first FFFF
+%% messages this is `PacketId' itself.
+%%
+%% The packet IDs repeat with period FFFF (not 10000), so the same
+%% period must be used here; otherwise the result is wrong as soon as
+%% the counter wraps around for the first time.
+%%
+%% When no message with such an ID has been sent yet the result is
+%% `=< 0'.
+-spec packet_id_to_seqno(non_neg_integer(), emqx_types:packet_id()) -> integer().
+packet_id_to_seqno(NextSeqNo, PacketId) ->
+    Period = 16#ffff,
+    %% How far behind `NextSeqNo' the wanted position is. `rem' keeps
+    %% the sign of its left operand, so normalize into 0..Period-1:
+    Behind = ((NextSeqNo - PacketId) rem Period + Period) rem Period,
+    NextSeqNo - Behind.
+
+%% Return the elements of the list in random order: every element is
+%% tagged with a random number, the pairs are sorted, and the tags are
+%% dropped again.
+-spec shuffle([A]) -> [A].
+shuffle(L0) ->
+    Tagged = [{rand:uniform(), Elem} || Elem <- L0],
+    [Elem || {_Tag, Elem} <- lists:sort(Tagged)].

+ 124 - 41
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -18,9 +18,12 @@
 
 -include("emqx.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
 -include("emqx_mqtt.hrl").
 
+-include("emqx_persistent_session_ds.hrl").
+
 %% Session API
 -export([
     create/3,
@@ -50,7 +53,7 @@
 -export([
     deliver/3,
     replay/3,
-    % handle_timeout/3,
+    handle_timeout/3,
     disconnect/1,
     terminate/2
 ]).
@@ -81,10 +84,14 @@
     expires_at := timestamp() | never,
     %% Client’s Subscriptions.
     iterators := #{topic() => subscription()},
+    %% Inflight messages
+    inflight := emqx_persistent_message_ds_replayer:inflight(),
     %%
     props := map()
 }.
 
+%% -type session() :: #session{}.
+
 -type timestamp() :: emqx_utils_calendar:epoch_millisecond().
 -type topic() :: emqx_types:topic().
 -type clientinfo() :: emqx_types:clientinfo().
@@ -113,6 +120,8 @@ open(#{clientid := ClientID}, _ConnInfo) ->
     %% somehow isolate those idling not-yet-expired sessions into a separate process
     %% space, and move this call back into `emqx_cm` where it belongs.
     ok = emqx_cm:discard_session(ClientID),
+    ensure_timer(pull),
+    ensure_timer(get_streams),
     case open_session(ClientID) of
         Session = #{} ->
             {true, Session, []};
@@ -259,8 +268,8 @@ get_subscription(TopicFilter, #{iterators := Iters}) ->
     {ok, emqx_types:publish_result(), replies(), session()}
     | {error, emqx_types:reason_code()}.
 publish(_PacketId, Msg, Session) ->
-    % TODO: stub
-    {ok, emqx_broker:publish(Msg), [], Session}.
+    ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]),
+    {ok, persisted, [], Session}.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBACK
@@ -269,9 +278,14 @@ publish(_PacketId, Msg, Session) ->
 -spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
     {ok, emqx_types:message(), replies(), session()}
     | {error, emqx_types:reason_code()}.
-puback(_ClientInfo, _PacketId, _Session = #{}) ->
-    % TODO: stub
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}.
+%% Pass the acknowledgement on to the replayer. An accepted offset
+%% updates the `inflight' state of the session; a rejected one is
+%% reported as an unknown packet identifier.
+puback(_ClientInfo, PacketId, #{id := Id, inflight := Inflight0} = Session) ->
+    case emqx_persistent_message_ds_replayer:commit_offset(Id, PacketId, Inflight0) of
+        {false, _} ->
+            {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
+        {true, Inflight} ->
+            %% TODO: an empty message record is returned for now
+            {ok, #message{}, [], Session#{inflight => Inflight}}
+    end.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBREC
@@ -308,10 +322,23 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) ->
 %%--------------------------------------------------------------------
 
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
-    no_return().
-deliver(_ClientInfo, _Delivers, _Session = #{}) ->
-    % TODO: ensure it's unreachable somehow
-    error(unexpected).
+    {ok, replies(), session()}.
+%% Deliveries handed to the session are dropped: no replies are
+%% produced and the session is returned unchanged. The spec matches
+%% the 3-tuple that the function actually returns.
+deliver(_ClientInfo, _Delivers, Session) ->
+    %% This may be triggered for the system messages. FIXME.
+    {ok, [], Session}.
+
+%% Periodic work of the session, driven by the timers armed with
+%% `ensure_timer/1':
+%%  * `pull' polls the replayer for new messages (window of 100) and
+%%    stores the updated replay state in the session;
+%%  * `get_streams' refreshes the streams of the session.
+%% Each handler re-arms its own timer.
+-spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) ->
+    {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
+handle_timeout(_ClientInfo, pull, #{id := Id, inflight := Inflight0} = Session0) ->
+    {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, 100),
+    Session = Session0#{inflight => Inflight},
+    ok = ensure_timer(pull),
+    {ok, Publishes, Session};
+handle_timeout(_ClientInfo, get_streams, #{id := Id} = Session) ->
+    renew_streams(Id),
+    ok = ensure_timer(get_streams),
+    {ok, [], Session}.
 
 -spec replay(clientinfo(), [], session()) ->
     {ok, replies(), session()}.
@@ -390,29 +417,11 @@ del_subscription(TopicFilterBin, DSSessionId) ->
 %% Session tables operations
 %%--------------------------------------------------------------------
 
--define(SESSION_TAB, emqx_ds_session).
--define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
--define(DS_MRIA_SHARD, emqx_ds_session_shard).
-
--record(session, {
-    %% same as clientid
-    id :: id(),
-    %% creation time
-    created_at :: _Millisecond :: non_neg_integer(),
-    expires_at = never :: _Millisecond :: non_neg_integer() | never,
-    %% for future usage
-    props = #{} :: map()
-}).
-
--record(ds_sub, {
-    id :: subscription_id(),
-    start_time :: emqx_ds:time(),
-    props = #{} :: map(),
-    extra = #{} :: map()
-}).
--type ds_sub() :: #ds_sub{}.
-
 create_tables() ->
+    ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
+        backend => builtin,
+        storage => {emqx_ds_storage_bitfield_lts, #{}}
+    }),
     ok = mria:create_table(
         ?SESSION_TAB,
         [
@@ -433,7 +442,29 @@ create_tables() ->
             {attributes, record_info(fields, ds_sub)}
         ]
     ),
-    ok = mria:wait_for_tables([?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB]),
+    ok = mria:create_table(
+        ?SESSION_STREAM_TAB,
+        [
+            {rlog_shard, ?DS_MRIA_SHARD},
+            {type, bag},
+            {storage, storage()},
+            {record_name, ds_stream},
+            {attributes, record_info(fields, ds_stream)}
+        ]
+    ),
+    ok = mria:create_table(
+        ?SESSION_ITER_TAB,
+        [
+            {rlog_shard, ?DS_MRIA_SHARD},
+            {type, set},
+            {storage, storage()},
+            {record_name, ds_iter},
+            {attributes, record_info(fields, ds_iter)}
+        ]
+    ),
+    ok = mria:wait_for_tables([
+        ?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB, ?SESSION_STREAM_TAB, ?SESSION_ITER_TAB
+    ]),
     ok.
 
 -dialyzer({nowarn_function, storage/0}).
@@ -482,7 +513,8 @@ session_create(SessionId, Props) ->
         id = SessionId,
         created_at = erlang:system_time(millisecond),
         expires_at = never,
-        props = Props
+        props = Props,
+        inflight = emqx_persistent_message_ds_replayer:new()
     },
     ok = mnesia:write(?SESSION_TAB, Session, write),
     Session.
@@ -555,12 +587,12 @@ session_del_subscription(#ds_sub{id = DSSubId}) ->
     mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write).
 
 session_read_subscriptions(DSSessionId) ->
-    % NOTE: somewhat convoluted way to trick dialyzer
-    Pat = erlang:make_tuple(record_info(size, ds_sub), '_', [
-        {1, ds_sub},
-        {#ds_sub.id, {DSSessionId, '_'}}
-    ]),
-    mnesia:match_object(?SESSION_SUBSCRIPTIONS_TAB, Pat, read).
+    MS = ets:fun2ms(
+        fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId ->
+            Sub
+        end
+    ),
+    mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
 
 -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}.
 new_subscription_id(DSSessionId, TopicFilter) ->
@@ -568,12 +600,58 @@ new_subscription_id(DSSessionId, TopicFilter) ->
     DSSubId = {DSSessionId, TopicFilter},
     {DSSubId, NowMS}.
 
+%%--------------------------------------------------------------------
+%% Reading batches
+%%--------------------------------------------------------------------
+
+%% Refresh the streams of the session: read its subscriptions and the
+%% streams already recorded for it, then renew the streams of every
+%% subscription in turn.
+renew_streams(Id) ->
+    Subscriptions = ro_transaction(fun() -> session_read_subscriptions(Id) end),
+    ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, Id) end),
+    RenewOne = fun(#ds_sub{id = {_, TopicFilter}, start_time = StartTime}) ->
+        renew_streams(Id, ExistingStreams, TopicFilter, StartTime)
+    end,
+    lists:foreach(RenewOne, Subscriptions).
+
+%% Ask the storage for the streams matching `TopicFilter' starting at
+%% `StartTime'. For every stream that is not in `ExistingStreams',
+%% record the stream for the session and store a freshly made
+%% iterator for it. All writes happen in one transaction.
+renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
+    AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
+    transaction(
+        fun() ->
+            lists:foreach(
+                fun({Rank, Stream}) ->
+                    Rec = #ds_stream{
+                        session = Id,
+                        topic_filter = TopicFilter,
+                        stream = Stream,
+                        rank = Rank
+                    },
+                    %% `ExistingStreams' was read by the caller before
+                    %% this transaction started:
+                    case lists:member(Rec, ExistingStreams) of
+                        true ->
+                            ok;
+                        false ->
+                            mnesia:write(?SESSION_STREAM_TAB, Rec, write),
+                            %% NOTE(review): the iterator is created with 0
+                            %% as the last argument instead of `StartTime'
+                            %% -- confirm that this is intended.
+                            {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, 0),
+                            IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
+                            mnesia:write(?SESSION_ITER_TAB, IterRec, write)
+                    end
+                end,
+                AllStreams
+            )
+        end
+    ).
+
 %%--------------------------------------------------------------------------------
 
 transaction(Fun) ->
     {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun),
     Res.
 
+%% Run `Fun' with `mria:ro_transaction/2' on the session shard and
+%% return its result. Crashes with `badmatch' unless the transaction
+%% returns `{atomic, Res}'.
+ro_transaction(Fun) ->
+    {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
+    Res.
+
 %%--------------------------------------------------------------------------------
 
 export_subscriptions(DSSubs) ->
@@ -586,7 +664,7 @@ export_subscriptions(DSSubs) ->
     ).
 
 export_session(#session{} = Record) ->
-    export_record(Record, #session.id, [id, created_at, expires_at, props], #{}).
+    export_record(Record, #session.id, [id, created_at, expires_at, inflight, props], #{}).
 
 export_subscription(#ds_sub{} = Record) ->
     export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}).
@@ -595,3 +673,8 @@ export_record(Record, I, [Field | Rest], Acc) ->
     export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)});
 export_record(_, _, [], Acc) ->
     Acc.
+
+%% Start a timer with the hard-coded interval 100 (presumably
+%% milliseconds -- confirm against `emqx_utils:start_timer/2') that
+%% carries `{emqx_session, Type}'. The timer reference is discarded.
+-spec ensure_timer(pull | get_streams) -> ok.
+ensure_timer(Type) ->
+    emqx_utils:start_timer(100, {emqx_session, Type}),
+    ok.

+ 56 - 0
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
+-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
+
+-define(SESSION_TAB, emqx_ds_session).
+-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
+-define(SESSION_STREAM_TAB, emqx_ds_stream_tab).
+-define(SESSION_ITER_TAB, emqx_ds_iter_tab).
+-define(DS_MRIA_SHARD, emqx_ds_session_shard).
+
+%% Subscription of a durable session:
+-record(ds_sub, {
+    %% `{SessionId, TopicFilter}' pair:
+    id :: emqx_persistent_session_ds:subscription_id(),
+    %% Time passed to `emqx_ds:get_streams/3' when the streams of the
+    %% subscription are looked up:
+    start_time :: emqx_ds:time(),
+    props = #{} :: map(),
+    extra = #{} :: map()
+}).
+-type ds_sub() :: #ds_sub{}.
+
+%% Stream that belongs to a session. Stored in `?SESSION_STREAM_TAB':
+-record(ds_stream, {
+    %% ID of the owning session (the first field, i.e. the table key):
+    session :: emqx_persistent_session_ds:id(),
+    %% Topic filter that was used to look the stream up:
+    topic_filter :: emqx_ds:topic_filter(),
+    stream :: emqx_ds:stream(),
+    %% Rank that `emqx_ds:get_streams/3' returned with the stream:
+    rank :: emqx_ds:stream_rank()
+}).
+
+%% Iterator saved for one stream of one session. Stored in
+%% `?SESSION_ITER_TAB':
+-record(ds_iter, {
+    %% `{SessionId, Stream}' pair:
+    id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()},
+    %% The saved iterator:
+    iter :: emqx_ds:iterator()
+}).
+
+%% Durable session record. Stored in `?SESSION_TAB':
+-record(session, {
+    %% same as clientid
+    id :: emqx_persistent_session_ds:id(),
+    %% creation time
+    created_at :: _Millisecond :: non_neg_integer(),
+    expires_at = never :: _Millisecond :: non_neg_integer() | never,
+    %% replay state managed by `emqx_persistent_message_ds_replayer';
+    %% it has no default value here
+    inflight :: emqx_persistent_message_ds_replayer:inflight(),
+    %% for future usage
+    props = #{} :: map()
+}).
+
+-endif.

+ 12 - 8
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -103,8 +103,8 @@ t_messages_persisted(_Config) ->
     ct:pal("Persisted = ~p", [Persisted]),
 
     ?assertEqual(
-        [M1, M2, M5, M7, M9, M10],
-        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+        lists:sort([M1, M2, M5, M7, M9, M10]),
+        lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted])
     ),
 
     ok.
@@ -146,11 +146,11 @@ t_messages_persisted_2(_Config) ->
     ct:pal("Persisted = ~p", [Persisted]),
 
     ?assertEqual(
-        [
+        lists:sort([
             {T(<<"client/1/topic">>), <<"4">>},
             {T(<<"client/2/topic">>), <<"5">>}
-        ],
-        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+        ]),
+        lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted])
     ),
 
     ok.
@@ -252,9 +252,13 @@ connect(Opts0 = #{}) ->
     Client.
 
 consume(TopicFiler, StartMS) ->
-    [{_, Stream}] = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS),
-    {ok, It} = emqx_ds:make_iterator(Stream, StartMS),
-    consume(It).
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            {ok, It} = emqx_ds:make_iterator(Stream, StartMS, 0),
+            consume(It)
+        end,
+        emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS)
+    ).
 
 consume(It) ->
     case emqx_ds:next(It, 100) of

+ 42 - 0
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -510,6 +510,48 @@ t_process_dies_session_expires(Config) ->
 
     emqtt:disconnect(Client2).
 
+t_publish_while_client_is_gone_qos1(Config) ->
+    %% A persistent session should receive the QoS 1 messages that
+    %% were published to its subscription while the client was
+    %% disconnected, even if the process owning the session dies.
+    ConnFun = ?config(conn_fun, Config),
+    Topic = ?config(topic, Config),
+    STopic = ?config(stopic, Config),
+    ClientId = ?config(client_id, Config),
+    Payloads = [Payload1, Payload2] = [<<"hello1">>, <<"hello2">>],
+    %% Both connections use the same options except for `clean_start':
+    ClientOpts = fun(CleanStart) ->
+        [
+            {proto_ver, v5},
+            {clientid, ClientId},
+            {properties, #{'Session-Expiry-Interval' => 30}},
+            {clean_start, CleanStart}
+            | Config
+        ]
+    end,
+
+    %% Subscribe with a clean session, then go away:
+    {ok, Client1} = emqtt:start_link(ClientOpts(true)),
+    {ok, _} = emqtt:ConnFun(Client1),
+    {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
+    ok = emqtt:disconnect(Client1),
+    maybe_kill_connection_process(ClientId, Config),
+
+    %% Publish while nobody is connected:
+    ok = publish(Topic, Payloads),
+
+    %% Resume the session and expect both messages:
+    {ok, Client2} = emqtt:start_link(ClientOpts(false)),
+    {ok, _} = emqtt:ConnFun(Client2),
+    Msgs = receive_messages(2),
+    ?assertMatch([_, _], Msgs),
+    [Msg2, Msg1] = Msgs,
+    lists:foreach(
+        fun({Payload, Msg}) ->
+            ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
+            ?assertEqual({ok, 1}, maps:find(qos, Msg))
+        end,
+        [{Payload1, Msg1}, {Payload2, Msg2}]
+    ),
+
+    ok = emqtt:disconnect(Client2).
+
 t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config);
 t_publish_while_client_is_gone('end', _Config) -> ok.
 t_publish_while_client_is_gone(Config) ->

+ 17 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -30,6 +30,9 @@
 %% Message replay API:
 -export([get_streams/3, make_iterator/3, next/2]).
 
+%% Iterator storage API:
+-export([save_iterator/3, get_iterator/2]).
+
 %% Misc. API:
 -export([]).
 
@@ -46,7 +49,8 @@
     message_id/0,
     next_result/1, next_result/0,
     store_batch_result/0,
-    make_iterator_result/1, make_iterator_result/0
+    make_iterator_result/1, make_iterator_result/0,
+    iterator_id/0,
+    get_iterator_result/1
 ]).
 
 %%================================================================================
@@ -97,6 +101,10 @@
 
 -type message_id() :: emqx_ds_replication_layer:message_id().
 
+-type iterator_id() :: term().
+
+-type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -174,6 +182,14 @@ make_iterator(Stream, TopicFilter, StartTime) ->
 next(Iter, BatchSize) ->
     emqx_ds_replication_layer:next(Iter, BatchSize).
 
+%% Save `Iterator' under the reference `ITRef'. The call is delegated
+%% to `emqx_ds_replication_layer:save_iterator/3'.
+-spec save_iterator(db(), iterator_id(), iterator()) -> ok.
+save_iterator(DB, ITRef, Iterator) ->
+    emqx_ds_replication_layer:save_iterator(DB, ITRef, Iterator).
+
+%% Look up the iterator saved under the reference `ITRef'. The call
+%% is delegated to `emqx_ds_replication_layer:get_iterator/2'.
+-spec get_iterator(db(), iterator_id()) -> get_iterator_result(iterator()).
+get_iterator(DB, ITRef) ->
+    emqx_ds_replication_layer:get_iterator(DB, ITRef).
+
 %%================================================================================
 %% Internal exports
 %%================================================================================

+ 73 - 0
apps/emqx_durable_storage/src/emqx_ds_helper.erl

@@ -0,0 +1,73 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_helper).
+
+%% API:
+-export([create_rr/1]).
+
+%% internal exports:
+-export([]).
+
+-export_type([rr/0]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-type item() :: {emqx_ds:stream_rank(), emqx_ds:stream()}.
+
+-type rr() :: #{
+    queue := #{term() => [{integer(), emqx_ds:stream()}]},
+    active_ring := {[item()], [item()]}
+}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% Create the structure from a list of `{Rank, Stream}' items: an
+%% empty active ring plus a queue that groups the streams by the
+%% first component of their rank.
+-spec create_rr([item()]) -> rr().
+create_rr(Streams) ->
+    %% The keys must be the ones that `add_streams/2' and the `rr()'
+    %% type expect:
+    RR0 = #{queue => #{}, active_ring => {[], []}},
+    add_streams(RR0, Streams).
+
+%% Add the streams to the queue. Streams are grouped by `RankX'; each
+%% group is a list of `{RankY, Stream}' pairs, sorted and free of
+%% duplicates. The active ring is left as is.
+-spec add_streams(rr(), [item()]) -> rr().
+add_streams(#{queue := Q0, active_ring := R0}, Streams) ->
+    Q1 = lists:foldl(
+        fun({{RankX, RankY}, Stream}, Acc) ->
+            Item = {RankY, Stream},
+            %% The 4-arity variant creates the group when `RankX' is
+            %% seen for the first time instead of raising `badkey':
+            maps:update_with(RankX, fun(L) -> [Item | L] end, [Item], Acc)
+        end,
+        Q0,
+        Streams
+    ),
+    Q2 = maps:map(
+        fun(_RankX, Streams1) ->
+            lists:usort(Streams1)
+        end,
+        Q1
+    ),
+    #{queue => Q2, active_ring => R0}.
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================

+ 17 - 7
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -22,7 +22,9 @@
     store_batch/3,
     get_streams/3,
     make_iterator/3,
-    next/2
+    next/2,
+    save_iterator/3,
+    get_iterator/2
 ]).
 
 %% internal exports:
@@ -42,7 +44,7 @@
 
 -type db() :: emqx_ds:db().
 
--type shard_id() :: {emqx_ds:db(), atom()}.
+-type shard_id() :: {db(), atom()}.
 
 %% This record enapsulates the stream entity from the replication
 %% level.
@@ -71,7 +73,7 @@
 %% API functions
 %%================================================================================
 
--spec list_shards(emqx_ds:db()) -> [shard_id()].
+-spec list_shards(db()) -> [shard_id()].
 list_shards(DB) ->
     %% TODO: milestone 5
     lists:map(
@@ -81,7 +83,7 @@ list_shards(DB) ->
         list_nodes()
     ).
 
--spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
+-spec open_db(db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
 open_db(DB, Opts) ->
     %% TODO: improve error reporting, don't just crash
     lists:foreach(
@@ -92,7 +94,7 @@ open_db(DB, Opts) ->
         list_nodes()
     ).
 
--spec drop_db(emqx_ds:db()) -> ok | {error, _}.
+-spec drop_db(db()) -> ok | {error, _}.
 drop_db(DB) ->
     lists:foreach(
         fun(Node) ->
@@ -102,7 +104,7 @@ drop_db(DB) ->
         list_nodes()
     ).
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Msg, Opts) ->
     %% TODO: Currently we store messages locally.
@@ -112,7 +114,7 @@ store_batch(DB, Msg, Opts) ->
 -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{emqx_ds:stream_rank(), stream()}].
 get_streams(DB, TopicFilter, StartTime) ->
-    Shards = emqx_ds_replication_layer:list_shards(DB),
+    Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
             Node = node_of_shard(Shard),
@@ -164,6 +166,14 @@ next(Iter0, BatchSize) ->
             Other
     end.
 
+%% Not implemented yet: always raises `error(todo)'.
+-spec save_iterator(db(), emqx_ds:iterator_id(), iterator()) -> ok.
+save_iterator(_DB, _ITRef, _Iterator) ->
+    error(todo).
+
+%% Not implemented yet: always raises `error(todo)'.
+-spec get_iterator(db(), emqx_ds:iterator_id()) -> emqx_ds:get_iterator_result(iterator()).
+get_iterator(_DB, _ITRef) ->
+    error(todo).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -368,7 +368,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)]).
+    filename:join("data", lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)])).
 
 %%--------------------------------------------------------------------------------
 %% Schema access

+ 13 - 0
tdd

@@ -0,0 +1,13 @@
+#!/bin/bash
+
+# Reformat the sources in the background and discard all output.
+# `2>&1' sends stderr to /dev/null as well; `&>1' would have written
+# the output into a file named "1".
+make fmt > /dev/null 2>&1 &
+
+# Run the single test case that is being worked on:
+./rebar3 ct --name ct@127.0.0.1 --readable=true  --suite ./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam --case t_publish_while_client_is_gone_qos1 --group tcp
+
+# Comma-separated list of suites for the broader run that is
+# commented out below:
+suites=$(cat <<EOF | paste -sd "," -
+./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam
+./_build/test/lib/emqx/test/emqx_persistent_messages_SUITE.beam
+EOF
+)
+
+#./rebar3 ct --name ct@127.0.0.1 --readable=true --suite "${suites}" --case t_publish_while_client_is_gone

BIN
topic_match_test.png