
fix(sessds): Refactor emqx_persistent_session_ds to use CRUD module

ieQu1 2 years ago
parent
commit
8e8d3af096

+ 0 - 795
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -1,795 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc This module implements the routines for replaying streams of
-%% messages.
--module(emqx_persistent_message_ds_replayer).
-
-%% API:
--export([new/0, open/1, next_packet_id/1, n_inflight/1]).
-
--export([poll/4, replay/2, commit_offset/4]).
-
--export([seqno_to_packet_id/1, packet_id_to_seqno/2]).
-
--export([committed_until/2]).
-
-%% internal exports:
--export([]).
-
--export_type([inflight/0, seqno/0]).
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx_utils/include/emqx_message.hrl").
--include("emqx_persistent_session_ds.hrl").
-
--ifdef(TEST).
--include_lib("proper/include/proper.hrl").
--include_lib("eunit/include/eunit.hrl").
--endif.
-
--define(EPOCH_SIZE, 16#10000).
-
--define(ACK, 0).
--define(COMP, 1).
-
--define(TRACK_FLAG(WHICH), (1 bsl WHICH)).
--define(TRACK_FLAGS_ALL, ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)).
--define(TRACK_FLAGS_NONE, 0).
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
-%% Note: sequence numbers are monotonic; they don't wrap around:
--type seqno() :: non_neg_integer().
-
--type track() :: ack | comp.
--type commit_type() :: rec.
-
--record(inflight, {
-    next_seqno = 1 :: seqno(),
-    commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | commit_type() => seqno()},
-    %% Ranges are sorted in ascending order of their sequence numbers.
-    offset_ranges = [] :: [ds_pubrange()]
-}).
-
--opaque inflight() :: #inflight{}.
-
--type message() :: emqx_types:message().
--type replies() :: [emqx_session:reply()].
-
--type preproc_fun() :: fun((message()) -> message() | [message()]).
-
-%%================================================================================
-%% API funcions
-%%================================================================================
-
--spec new() -> inflight().
-new() ->
-    #inflight{}.
-
--spec open(emqx_persistent_session_ds:id()) -> inflight().
-open(SessionId) ->
-    {Ranges, RecUntil} = ro_transaction(
-        fun() -> {get_ranges(SessionId), get_committed_offset(SessionId, rec)} end
-    ),
-    {Commits, NextSeqno} = compute_inflight_range(Ranges),
-    #inflight{
-        commits = Commits#{rec => RecUntil},
-        next_seqno = NextSeqno,
-        offset_ranges = Ranges
-    }.
-
--spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}.
-next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) ->
-    Inflight = Inflight0#inflight{next_seqno = next_seqno(LastSeqno)},
-    {seqno_to_packet_id(LastSeqno), Inflight}.
-
--spec n_inflight(inflight()) -> non_neg_integer().
-n_inflight(#inflight{offset_ranges = Ranges}) ->
-    %% TODO
-    %% This is not very efficient. Instead, we can take the maximum of
-    %% `range_size(AckedUntil, NextSeqno)` and `range_size(CompUntil, NextSeqno)`.
-    %% This won't be exact number but a pessimistic estimate, but this way we
-    %% will penalize clients that PUBACK QoS 1 messages but don't PUBCOMP QoS 2
-    %% messages for some reason. For that to work, we need to additionally track
-    %% actual `AckedUntil` / `CompUntil` during `commit_offset/4`.
-    lists:foldl(
-        fun
-            (#ds_pubrange{type = ?T_CHECKPOINT}, N) ->
-                N;
-            (#ds_pubrange{type = ?T_INFLIGHT} = Range, N) ->
-                N + range_size(Range)
-        end,
-        0,
-        Ranges
-    ).
-
--spec replay(preproc_fun(), inflight()) -> {emqx_session:replies(), inflight()}.
-replay(PreprocFunFun, Inflight0 = #inflight{offset_ranges = Ranges0, commits = Commits}) ->
-    {Ranges, Replies} = lists:mapfoldr(
-        fun(Range, Acc) ->
-            replay_range(PreprocFunFun, Commits, Range, Acc)
-        end,
-        [],
-        Ranges0
-    ),
-    Inflight = Inflight0#inflight{offset_ranges = Ranges},
-    {Replies, Inflight}.
-
--spec commit_offset(emqx_persistent_session_ds:id(), Offset, emqx_types:packet_id(), inflight()) ->
-    {_IsValidOffset :: boolean(), inflight()}
-when
-    Offset :: track() | commit_type().
-commit_offset(
-    SessionId,
-    Track,
-    PacketId,
-    Inflight0 = #inflight{commits = Commits}
-) when Track == ack orelse Track == comp ->
-    case validate_commit(Track, PacketId, Inflight0) of
-        CommitUntil when is_integer(CommitUntil) ->
-            %% TODO
-            %% We do not preserve `CommitUntil` in the database. Instead, we discard
-            %% fully acked ranges from the database. In effect, this means that the
-            %% most recent `CommitUntil` the client has sent may be lost in case of a
-            %% crash or client loss.
-            Inflight1 = Inflight0#inflight{commits = Commits#{Track := CommitUntil}},
-            Inflight = discard_committed(SessionId, Inflight1),
-            {true, Inflight};
-        false ->
-            {false, Inflight0}
-    end;
-commit_offset(
-    SessionId,
-    CommitType = rec,
-    PacketId,
-    Inflight0 = #inflight{commits = Commits}
-) ->
-    case validate_commit(CommitType, PacketId, Inflight0) of
-        CommitUntil when is_integer(CommitUntil) ->
-            update_committed_offset(SessionId, CommitType, CommitUntil),
-            Inflight = Inflight0#inflight{commits = Commits#{CommitType := CommitUntil}},
-            {true, Inflight};
-        false ->
-            {false, Inflight0}
-    end.
-
--spec poll(preproc_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) ->
-    {emqx_session:replies(), inflight()}.
-poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE ->
-    MinBatchSize = emqx_config:get([session_persistence, min_batch_size]),
-    FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)),
-    FreeSpace = WindowSize - n_inflight(Inflight0),
-    case FreeSpace >= FetchThreshold of
-        false ->
-            %% TODO: this branch is meant to avoid fetching data from
-            %% the DB in chunks that are too small. However, this
-            %% logic is not exactly good for the latency. Can the
-            %% client get stuck even?
-            {[], Inflight0};
-        true ->
-            %% TODO: Wrap this in `mria:async_dirty/2`?
-            Checkpoints = find_checkpoints(Inflight0#inflight.offset_ranges),
-            StreamGroups = group_streams(get_streams(SessionId)),
-            {Publihes, Inflight} =
-                fetch(PreprocFun, SessionId, Inflight0, Checkpoints, StreamGroups, FreeSpace, []),
-            %% Discard now irrelevant QoS0-only ranges, if any.
-            {Publihes, discard_committed(SessionId, Inflight)}
-    end.
-
-%% Which seqno this track is committed until.
-%% "Until" means this is first seqno that is _not yet committed_ for this track.
--spec committed_until(track() | commit_type(), inflight()) -> seqno().
-committed_until(Track, #inflight{commits = Commits}) ->
-    maps:get(Track, Commits).
-
--spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id() | 0.
-seqno_to_packet_id(Seqno) ->
-    Seqno rem ?EPOCH_SIZE.
-
-%% Reconstruct session counter by adding most significant bits from
-%% the current counter to the packet id.
--spec packet_id_to_seqno(emqx_types:packet_id(), inflight()) -> seqno().
-packet_id_to_seqno(PacketId, #inflight{next_seqno = NextSeqno}) ->
-    packet_id_to_seqno_(NextSeqno, PacketId).
-
-%%================================================================================
-%% Internal exports
-%%================================================================================
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
-compute_inflight_range([]) ->
-    {#{ack => 1, comp => 1}, 1};
-compute_inflight_range(Ranges) ->
-    _RangeLast = #ds_pubrange{until = LastSeqno} = lists:last(Ranges),
-    AckedUntil = find_committed_until(ack, Ranges),
-    CompUntil = find_committed_until(comp, Ranges),
-    Commits = #{
-        ack => emqx_maybe:define(AckedUntil, LastSeqno),
-        comp => emqx_maybe:define(CompUntil, LastSeqno)
-    },
-    {Commits, LastSeqno}.
-
-find_committed_until(Track, Ranges) ->
-    RangesUncommitted = lists:dropwhile(
-        fun(Range) ->
-            case Range of
-                #ds_pubrange{type = ?T_CHECKPOINT} ->
-                    true;
-                #ds_pubrange{type = ?T_INFLIGHT, tracks = Tracks} ->
-                    not has_track(Track, Tracks)
-            end
-        end,
-        Ranges
-    ),
-    case RangesUncommitted of
-        [#ds_pubrange{id = {_, CommittedUntil, _StreamRef}} | _] ->
-            CommittedUntil;
-        [] ->
-            undefined
-    end.
-
--spec get_ranges(emqx_persistent_session_ds:id()) -> [ds_pubrange()].
-get_ranges(SessionId) ->
-    Pat = erlang:make_tuple(
-        record_info(size, ds_pubrange),
-        '_',
-        [{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_', '_'}}]
-    ),
-    mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read).
-
-fetch(PreprocFun, SessionId, Inflight0, CPs, Groups, N, Acc) when N > 0, Groups =/= [] ->
-    #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
-    {Stream, Groups2} = get_the_first_stream(Groups),
-    case get_next_n_messages_from_stream(Stream, CPs, N) of
-        [] ->
-            fetch(PreprocFun, SessionId, Inflight0, CPs, Groups2, N, Acc);
-        {ItBegin, ItEnd, Messages} ->
-            %% We need to preserve the iterator pointing to the beginning of the
-            %% range, so that we can replay it if needed.
-            {Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages),
-            Size = range_size(FirstSeqno, UntilSeqno),
-            Range0 = #ds_pubrange{
-                id = {SessionId, FirstSeqno, Stream#ds_stream.ref},
-                type = ?T_INFLIGHT,
-                tracks = compute_pub_tracks(Publishes),
-                until = UntilSeqno,
-                iterator = ItBegin
-            },
-            ok = preserve_range(Range0),
-            %% ...Yet we need to keep the iterator pointing past the end of the
-            %% range, so that we can pick up where we left off: it will become
-            %% `ItBegin` of the next range for this stream.
-            Range = keep_next_iterator(ItEnd, Range0),
-            Inflight = Inflight0#inflight{
-                next_seqno = UntilSeqno,
-                offset_ranges = Ranges ++ [Range]
-            },
-            fetch(PreprocFun, SessionId, Inflight, CPs, Groups2, N - Size, [Publishes | Acc])
-    end;
-fetch(_ReplyFun, _SessionId, Inflight, _CPs, _Groups, _N, Acc) ->
-    Publishes = lists:append(lists:reverse(Acc)),
-    {Publishes, Inflight}.
-
-discard_committed(
-    SessionId,
-    Inflight0 = #inflight{commits = Commits, offset_ranges = Ranges0}
-) ->
-    %% TODO: This could be kept and incrementally updated in the inflight state.
-    Checkpoints = find_checkpoints(Ranges0),
-    %% TODO: Wrap this in `mria:async_dirty/2`?
-    Ranges = discard_committed_ranges(SessionId, Commits, Checkpoints, Ranges0),
-    Inflight0#inflight{offset_ranges = Ranges}.
-
-find_checkpoints(Ranges) ->
-    lists:foldl(
-        fun(#ds_pubrange{id = {_SessionId, _, StreamRef}} = Range, Acc) ->
-            %% For each stream, remember the last range over this stream.
-            Acc#{StreamRef => Range}
-        end,
-        #{},
-        Ranges
-    ).
-
-discard_committed_ranges(
-    SessionId,
-    Commits,
-    Checkpoints,
-    Ranges = [Range = #ds_pubrange{id = {_SessionId, _, StreamRef}} | Rest]
-) ->
-    case discard_committed_range(Commits, Range) of
-        discard ->
-            %% This range has been fully committed.
-            %% Either discard it completely, or preserve the iterator for the next range
-            %% over this stream (i.e. a checkpoint).
-            RangeKept =
-                case maps:get(StreamRef, Checkpoints) of
-                    Range ->
-                        [checkpoint_range(Range)];
-                    _Previous ->
-                        discard_range(Range),
-                        []
-                end,
-            %% Since we're (intentionally) not using transactions here, it's important to
-            %% issue database writes in the same order in which ranges are stored: from
-            %% the oldest to the newest. This is also why we need to compute which ranges
-            %% should become checkpoints before we start writing anything.
-            RangeKept ++ discard_committed_ranges(SessionId, Commits, Checkpoints, Rest);
-        keep ->
-            %% This range has not been fully committed.
-            [Range | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)];
-        keep_all ->
-            %% The rest of ranges (if any) still have uncommitted messages.
-            Ranges;
-        TracksLeft ->
-            %% Only some track has been committed.
-            %% Preserve the uncommitted tracks in the database.
-            RangeKept = Range#ds_pubrange{tracks = TracksLeft},
-            preserve_range(restore_first_iterator(RangeKept)),
-            [RangeKept | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)]
-    end;
-discard_committed_ranges(_SessionId, _Commits, _Checkpoints, []) ->
-    [].
-
-discard_committed_range(_Commits, #ds_pubrange{type = ?T_CHECKPOINT}) ->
-    discard;
-discard_committed_range(
-    #{ack := AckedUntil, comp := CompUntil},
-    #ds_pubrange{until = Until}
-) when Until > AckedUntil andalso Until > CompUntil ->
-    keep_all;
-discard_committed_range(Commits, #ds_pubrange{until = Until, tracks = Tracks}) ->
-    case discard_tracks(Commits, Until, Tracks) of
-        0 ->
-            discard;
-        Tracks ->
-            keep;
-        TracksLeft ->
-            TracksLeft
-    end.
-
-discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) ->
-    TAck =
-        case Until > AckedUntil of
-            true -> ?TRACK_FLAG(?ACK) band Tracks;
-            false -> 0
-        end,
-    TComp =
-        case Until > CompUntil of
-            true -> ?TRACK_FLAG(?COMP) band Tracks;
-            false -> 0
-        end,
-    TAck bor TComp.
-
-replay_range(
-    PreprocFun,
-    Commits,
-    Range0 = #ds_pubrange{
-        type = ?T_INFLIGHT, id = {_, First, _StreamRef}, until = Until, iterator = It
-    },
-    Acc
-) ->
-    Size = range_size(First, Until),
-    {ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size),
-    %% Asserting that range is consistent with the message storage state.
-    {Replies, Until} = publish_replay(PreprocFun, Commits, First, MessagesUnacked),
-    %% Again, we need to keep the iterator pointing past the end of the
-    %% range, so that we can pick up where we left off.
-    Range = keep_next_iterator(ItNext, Range0),
-    {Range, Replies ++ Acc};
-replay_range(_PreprocFun, _Commits, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) ->
-    {Range0, Acc}.
-
-validate_commit(
-    Track,
-    PacketId,
-    Inflight = #inflight{commits = Commits, next_seqno = NextSeqno}
-) ->
-    Seqno = packet_id_to_seqno_(NextSeqno, PacketId),
-    CommittedUntil = maps:get(Track, Commits),
-    CommitNext = get_commit_next(Track, Inflight),
-    case Seqno >= CommittedUntil andalso Seqno < CommitNext of
-        true ->
-            next_seqno(Seqno);
-        false ->
-            ?SLOG(warning, #{
-                msg => "out-of-order_commit",
-                track => Track,
-                packet_id => PacketId,
-                commit_seqno => Seqno,
-                committed_until => CommittedUntil,
-                commit_next => CommitNext
-            }),
-            false
-    end.
-
-get_commit_next(ack, #inflight{next_seqno = NextSeqno}) ->
-    NextSeqno;
-get_commit_next(rec, #inflight{next_seqno = NextSeqno}) ->
-    NextSeqno;
-get_commit_next(comp, #inflight{commits = Commits}) ->
-    maps:get(rec, Commits).
-
-publish_fetch(PreprocFun, FirstSeqno, Messages) ->
-    flatmapfoldl(
-        fun({_DSKey, MessageIn}, Acc) ->
-            Message = PreprocFun(MessageIn),
-            publish_fetch(Message, Acc)
-        end,
-        FirstSeqno,
-        Messages
-    ).
-
-publish_fetch(#message{qos = ?QOS_0} = Message, Seqno) ->
-    {{undefined, Message}, Seqno};
-publish_fetch(#message{} = Message, Seqno) ->
-    PacketId = seqno_to_packet_id(Seqno),
-    {{PacketId, Message}, next_seqno(Seqno)};
-publish_fetch(Messages, Seqno) ->
-    flatmapfoldl(fun publish_fetch/2, Seqno, Messages).
-
-publish_replay(PreprocFun, Commits, FirstSeqno, Messages) ->
-    #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits,
-    flatmapfoldl(
-        fun({_DSKey, MessageIn}, Acc) ->
-            Message = PreprocFun(MessageIn),
-            publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
-        end,
-        FirstSeqno,
-        Messages
-    ).
-
-publish_replay(#message{qos = ?QOS_0}, _, _, _, Seqno) ->
-    %% QoS 0 (at most once) messages should not be replayed.
-    {[], Seqno};
-publish_replay(#message{qos = Qos} = Message, AckedUntil, CompUntil, RecUntil, Seqno) ->
-    case Qos of
-        ?QOS_1 when Seqno < AckedUntil ->
-            %% This message has already been acked, so we can skip it.
-            %% We still need to advance seqno, because previously we assigned this message
-            %% a unique Packet Id.
-            {[], next_seqno(Seqno)};
-        ?QOS_2 when Seqno < CompUntil ->
-            %% This message's flow has already been fully completed, so we can skip it.
-            %% We still need to advance seqno, because previously we assigned this message
-            %% a unique Packet Id.
-            {[], next_seqno(Seqno)};
-        ?QOS_2 when Seqno < RecUntil ->
-            %% This message's flow has been partially completed, we need to resend a PUBREL.
-            PacketId = seqno_to_packet_id(Seqno),
-            Pub = {pubrel, PacketId},
-            {Pub, next_seqno(Seqno)};
-        _ ->
-            %% This message flow hasn't been acked and/or received, we need to resend it.
-            PacketId = seqno_to_packet_id(Seqno),
-            Pub = {PacketId, emqx_message:set_flag(dup, true, Message)},
-            {Pub, next_seqno(Seqno)}
-    end;
-publish_replay([], _, _, _, Seqno) ->
-    {[], Seqno};
-publish_replay(Messages, AckedUntil, CompUntil, RecUntil, Seqno) ->
-    flatmapfoldl(
-        fun(Message, Acc) ->
-            publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
-        end,
-        Seqno,
-        Messages
-    ).
-
--spec compute_pub_tracks(replies()) -> non_neg_integer().
-compute_pub_tracks(Pubs) ->
-    compute_pub_tracks(Pubs, ?TRACK_FLAGS_NONE).
-
-compute_pub_tracks(_Pubs, Tracks = ?TRACK_FLAGS_ALL) ->
-    Tracks;
-compute_pub_tracks([Pub | Rest], Tracks) ->
-    Track =
-        case Pub of
-            {_PacketId, #message{qos = ?QOS_1}} -> ?TRACK_FLAG(?ACK);
-            {_PacketId, #message{qos = ?QOS_2}} -> ?TRACK_FLAG(?COMP);
-            {pubrel, _PacketId} -> ?TRACK_FLAG(?COMP);
-            _ -> ?TRACK_FLAGS_NONE
-        end,
-    compute_pub_tracks(Rest, Track bor Tracks);
-compute_pub_tracks([], Tracks) ->
-    Tracks.
-
-keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) ->
-    Range#ds_pubrange{
-        iterator = ItNext,
-        %% We need to keep the first iterator around, in case we need to preserve
-        %% this range again, updating still uncommitted tracks it's part of.
-        misc = Misc#{iterator_first => ItFirst}
-    }.
-
-restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := ItFirst}}) ->
-    Range#ds_pubrange{
-        iterator = ItFirst,
-        misc = maps:remove(iterator_first, Misc)
-    }.
-
--spec preserve_range(ds_pubrange()) -> ok.
-preserve_range(Range = #ds_pubrange{type = ?T_INFLIGHT}) ->
-    mria:dirty_write(?SESSION_PUBRANGE_TAB, Range).
-
-has_track(ack, Tracks) ->
-    (?TRACK_FLAG(?ACK) band Tracks) > 0;
-has_track(comp, Tracks) ->
-    (?TRACK_FLAG(?COMP) band Tracks) > 0.
-
--spec discard_range(ds_pubrange()) -> ok.
-discard_range(#ds_pubrange{id = RangeId}) ->
-    mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId).
-
--spec checkpoint_range(ds_pubrange()) -> ds_pubrange().
-checkpoint_range(Range0 = #ds_pubrange{type = ?T_INFLIGHT}) ->
-    Range = Range0#ds_pubrange{type = ?T_CHECKPOINT, misc = #{}},
-    ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range),
-    Range;
-checkpoint_range(Range = #ds_pubrange{type = ?T_CHECKPOINT}) ->
-    %% This range should have been checkpointed already.
-    Range.
-
-get_last_iterator(Stream = #ds_stream{ref = StreamRef}, Checkpoints) ->
-    case maps:get(StreamRef, Checkpoints, none) of
-        none ->
-            Stream#ds_stream.beginning;
-        #ds_pubrange{iterator = ItNext} ->
-            ItNext
-    end.
-
--spec get_streams(emqx_persistent_session_ds:id()) -> [ds_stream()].
-get_streams(SessionId) ->
-    mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId).
-
--spec get_committed_offset(emqx_persistent_session_ds:id(), _Name) -> seqno().
-get_committed_offset(SessionId, Name) ->
-    case mnesia:read(?SESSION_COMMITTED_OFFSET_TAB, {SessionId, Name}) of
-        [] ->
-            1;
-        [#ds_committed_offset{until = Seqno}] ->
-            Seqno
-    end.
-
--spec update_committed_offset(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok.
-update_committed_offset(SessionId, Name, Until) ->
-    mria:dirty_write(?SESSION_COMMITTED_OFFSET_TAB, #ds_committed_offset{
-        id = {SessionId, Name}, until = Until
-    }).
-
-next_seqno(Seqno) ->
-    NextSeqno = Seqno + 1,
-    case seqno_to_packet_id(NextSeqno) of
-        0 ->
-            %% We skip sequence numbers that lead to PacketId = 0 to
-            %% simplify math. Note: it leads to occasional gaps in the
-            %% sequence numbers.
-            NextSeqno + 1;
-        _ ->
-            NextSeqno
-    end.
-
-packet_id_to_seqno_(NextSeqno, PacketId) ->
-    Epoch = NextSeqno bsr 16,
-    case (Epoch bsl 16) + PacketId of
-        N when N =< NextSeqno ->
-            N;
-        N ->
-            N - ?EPOCH_SIZE
-    end.
-
-range_size(#ds_pubrange{id = {_, First, _StreamRef}, until = Until}) ->
-    range_size(First, Until).
-
-range_size(FirstSeqno, UntilSeqno) ->
-    %% This function assumes that gaps in the sequence ID occur _only_ when the
-    %% packet ID wraps.
-    Size = UntilSeqno - FirstSeqno,
-    Size + (FirstSeqno bsr 16) - (UntilSeqno bsr 16).
-
-%%================================================================================
-%% stream scheduler
-
-%% group streams by the first position in the rank
--spec group_streams(list(ds_stream())) -> list(list(ds_stream())).
-group_streams(Streams) ->
-    Groups = maps:groups_from_list(
-        fun(#ds_stream{rank = {RankX, _}}) -> RankX end,
-        Streams
-    ),
-    shuffle(maps:values(Groups)).
-
--spec shuffle([A]) -> [A].
-shuffle(L0) ->
-    L1 = lists:map(
-        fun(A) ->
-            %% maybe topic/stream prioritization could be introduced here?
-            {rand:uniform(), A}
-        end,
-        L0
-    ),
-    L2 = lists:sort(L1),
-    {_, L} = lists:unzip(L2),
-    L.
-
-get_the_first_stream([Group | Groups]) ->
-    case get_next_stream_from_group(Group) of
-        {Stream, {sorted, []}} ->
-            {Stream, Groups};
-        {Stream, Group2} ->
-            {Stream, [Group2 | Groups]};
-        undefined ->
-            get_the_first_stream(Groups)
-    end;
-get_the_first_stream([]) ->
-    %% how this possible ?
-    throw(#{reason => no_valid_stream}).
-
-%% the scheduler is simple, try to get messages from the same shard, but it's okay to take turns
-get_next_stream_from_group({sorted, [H | T]}) ->
-    {H, {sorted, T}};
-get_next_stream_from_group({sorted, []}) ->
-    undefined;
-get_next_stream_from_group(Streams) ->
-    [Stream | T] = lists:sort(
-        fun(#ds_stream{rank = {_, RankA}}, #ds_stream{rank = {_, RankB}}) ->
-            RankA < RankB
-        end,
-        Streams
-    ),
-    {Stream, {sorted, T}}.
-
-get_next_n_messages_from_stream(Stream, CPs, N) ->
-    ItBegin = get_last_iterator(Stream, CPs),
-    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N) of
-        {ok, _ItEnd, []} ->
-            [];
-        {ok, ItEnd, Messages} ->
-            {ItBegin, ItEnd, Messages};
-        {ok, end_of_stream} ->
-            %% TODO: how to skip this closed stream or it should be taken over by lower level layer
-            []
-    end.
-
-%%================================================================================
-
--spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}.
-flatmapfoldl(_Fun, Acc, []) ->
-    {[], Acc};
-flatmapfoldl(Fun, Acc, [X | Xs]) ->
-    {Ys, NAcc} = Fun(X, Acc),
-    {Zs, FAcc} = flatmapfoldl(Fun, NAcc, Xs),
-    case is_list(Ys) of
-        true ->
-            {Ys ++ Zs, FAcc};
-        _ ->
-            {[Ys | Zs], FAcc}
-    end.
-
-ro_transaction(Fun) ->
-    {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
-    Res.
-
--ifdef(TEST).
-
-%% This test only tests boundary conditions (to make sure property-based test didn't skip them):
-packet_id_to_seqno_test() ->
-    %% Packet ID = 1; first epoch:
-    ?assertEqual(1, packet_id_to_seqno_(1, 1)),
-    ?assertEqual(1, packet_id_to_seqno_(10, 1)),
-    ?assertEqual(1, packet_id_to_seqno_(1 bsl 16 - 1, 1)),
-    ?assertEqual(1, packet_id_to_seqno_(1 bsl 16, 1)),
-    %% Packet ID = 1; second and 3rd epochs:
-    ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno_(1 bsl 16 + 1, 1)),
-    ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno_(2 bsl 16, 1)),
-    ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno_(2 bsl 16 + 1, 1)),
-    %% Packet ID = 16#ffff:
-    PID = 1 bsl 16 - 1,
-    ?assertEqual(PID, packet_id_to_seqno_(PID, PID)),
-    ?assertEqual(PID, packet_id_to_seqno_(1 bsl 16, PID)),
-    ?assertEqual(1 bsl 16 + PID, packet_id_to_seqno_(2 bsl 16, PID)),
-    ok.
-
-packet_id_to_seqno_test_() ->
-    Opts = [{numtests, 1000}, {to_file, user}],
-    {timeout, 30, fun() -> ?assert(proper:quickcheck(packet_id_to_seqno_prop(), Opts)) end}.
-
-packet_id_to_seqno_prop() ->
-    ?FORALL(
-        NextSeqNo,
-        next_seqno_gen(),
-        ?FORALL(
-            SeqNo,
-            seqno_gen(NextSeqNo),
-            begin
-                PacketId = seqno_to_packet_id(SeqNo),
-                ?assertEqual(SeqNo, packet_id_to_seqno_(NextSeqNo, PacketId)),
-                true
-            end
-        )
-    ).
-
-next_seqno_gen() ->
-    ?LET(
-        {Epoch, Offset},
-        {non_neg_integer(), non_neg_integer()},
-        Epoch bsl 16 + Offset
-    ).
-
-seqno_gen(NextSeqNo) ->
-    WindowSize = 1 bsl 16 - 1,
-    Min = max(0, NextSeqNo - WindowSize),
-    Max = max(0, NextSeqNo - 1),
-    range(Min, Max).
-
-range_size_test_() ->
-    [
-        ?_assertEqual(0, range_size(42, 42)),
-        ?_assertEqual(1, range_size(42, 43)),
-        ?_assertEqual(1, range_size(16#ffff, 16#10001)),
-        ?_assertEqual(16#ffff - 456 + 123, range_size(16#1f0000 + 456, 16#200000 + 123))
-    ].
-
-compute_inflight_range_test_() ->
-    [
-        ?_assertEqual(
-            {#{ack => 1, comp => 1}, 1},
-            compute_inflight_range([])
-        ),
-        ?_assertEqual(
-            {#{ack => 12, comp => 13}, 42},
-            compute_inflight_range([
-                #ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT},
-                #ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT},
-                #ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT},
-                #ds_pubrange{
-                    id = {<<>>, 12, 0},
-                    until = 13,
-                    type = ?T_INFLIGHT,
-                    tracks = ?TRACK_FLAG(?ACK)
-                },
-                #ds_pubrange{
-                    id = {<<>>, 13, 0},
-                    until = 20,
-                    type = ?T_INFLIGHT,
-                    tracks = ?TRACK_FLAG(?COMP)
-                },
-                #ds_pubrange{
-                    id = {<<>>, 20, 0},
-                    until = 42,
-                    type = ?T_INFLIGHT,
-                    tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)
-                }
-            ])
-        ),
-        ?_assertEqual(
-            {#{ack => 13, comp => 13}, 13},
-            compute_inflight_range([
-                #ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT},
-                #ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT},
-                #ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT},
-                #ds_pubrange{id = {<<>>, 12, 0}, until = 13, type = ?T_CHECKPOINT}
-            ])
-        )
-    ].
-
--endif.
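
The module removed above exposed a small API (new/open, next_packet_id, poll, commit_offset) that the session module drove directly. The following is only a rough, illustrative sketch of how that API fit together, based on the exports and specs above; it is not code from this commit, and the session id, window size and identity preprocessing fun are placeholders.

replayer_flow_sketch(SessionId) ->
    PreprocFun = fun(Msg) -> Msg end,
    %% Restore the inflight state of an existing session (new/0 for a fresh one):
    Inflight0 = emqx_persistent_message_ds_replayer:open(SessionId),
    %% Pull up to 100 publishes from durable storage into the send window:
    {Publishes, Inflight1} =
        emqx_persistent_message_ds_replayer:poll(PreprocFun, SessionId, Inflight0, 100),
    %% On PUBACK of a QoS 1 packet, advance the `ack' commit track:
    case Publishes of
        [{PacketId, _Msg} | _] when is_integer(PacketId) ->
            emqx_persistent_message_ds_replayer:commit_offset(SessionId, ack, PacketId, Inflight1);
        _ ->
            {true, Inflight1}
    end.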

+ 647 - 686
apps/emqx/src/emqx_persistent_session_ds.erl
File diff is not shown because this file is too large


+ 20 - 59
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -25,66 +25,27 @@
 -define(SESSION_COMMITTED_OFFSET_TAB, emqx_ds_committed_offset_tab).
 -define(DS_MRIA_SHARD, emqx_ds_session_shard).
 
--define(T_INFLIGHT, 1).
--define(T_CHECKPOINT, 2).
-
--record(ds_sub, {
-    id :: emqx_persistent_session_ds:subscription_id(),
-    start_time :: emqx_ds:time(),
-    props = #{} :: map(),
-    extra = #{} :: map()
-}).
--type ds_sub() :: #ds_sub{}.
-
--record(ds_stream, {
-    session :: emqx_persistent_session_ds:id(),
-    ref :: _StreamRef,
-    stream :: emqx_ds:stream(),
-    rank :: emqx_ds:stream_rank(),
-    beginning :: emqx_ds:iterator()
-}).
--type ds_stream() :: #ds_stream{}.
-
--record(ds_pubrange, {
-    id :: {
-        %% What session this range belongs to.
-        _Session :: emqx_persistent_session_ds:id(),
-        %% Where this range starts.
-        _First :: emqx_persistent_message_ds_replayer:seqno(),
-        %% Which stream this range is over.
-        _StreamRef
-    },
-    %% Where this range ends: the first seqno that is not included in the range.
-    until :: emqx_persistent_message_ds_replayer:seqno(),
-    %% Type of a range:
-    %% * Inflight range is a range of yet unacked messages from this stream.
-    %% * Checkpoint range was already acked, its purpose is to keep track of the
-    %%   very last iterator for this stream.
-    type :: ?T_INFLIGHT | ?T_CHECKPOINT,
-    %% What commit tracks this range is part of.
-    tracks = 0 :: non_neg_integer(),
-    %% Meaning of this depends on the type of the range:
-    %% * For inflight range, this is the iterator pointing to the first message in
-    %%   the range.
-    %% * For checkpoint range, this is the iterator pointing right past the last
-    %%   message in the range.
-    iterator :: emqx_ds:iterator(),
-    %% Reserved for future use.
-    misc = #{} :: map()
-}).
--type ds_pubrange() :: #ds_pubrange{}.
-
--record(ds_committed_offset, {
-    id :: {
-        %% What session this marker belongs to.
-        _Session :: emqx_persistent_session_ds:id(),
-        %% Marker name.
-        _CommitType
-    },
-    %% Where this marker is pointing to: the first seqno that is not marked.
-    until :: emqx_persistent_message_ds_replayer:seqno()
+%% State of the stream:
+-record(ifs, {
+    rank_y :: emqx_ds:rank_y(),
+    %% Iterator at the end of the last batch:
+    it_end :: emqx_ds:iterator() | undefined | end_of_stream,
+    %% Size of the last batch:
+    batch_size :: pos_integer() | undefined,
+    %% Key that points at the beginning of the batch:
+    batch_begin_key :: binary() | undefined,
+    %% Number of messages collected in the last batch:
+    batch_n_messages :: pos_integer() | undefined,
+    %% Session sequence number at the time when the batch was fetched:
+    first_seqno_qos1 :: emqx_persistent_session_ds:seqno() | undefined,
+    first_seqno_qos2 :: emqx_persistent_session_ds:seqno() | undefined,
+    %% Sequence numbers that the client must PUBACK or PUBREL
+    %% before we can consider the batch to be fully replayed:
+    last_seqno_qos1 :: emqx_persistent_session_ds:seqno() | undefined,
+    last_seqno_qos2 :: emqx_persistent_session_ds:seqno() | undefined
 }).
 
+%% TODO: remove
 -record(session, {
     %% same as clientid
     id :: emqx_persistent_session_ds:id(),
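
The new #ifs{} record replaces the old #ds_pubrange{}-based bookkeeping: per-stream replay state now lives with the session itself. Its real use is in emqx_persistent_session_ds.erl, whose diff is suppressed above as too large, so the following is only a made-up illustration of how the fields are meant to be populated; the helper names and arguments are hypothetical.

-include("emqx_persistent_session_ds.hrl").

%% A newly discovered stream starts with only its Y-rank known:
new_stream_state(RankY) ->
    #ifs{rank_y = RankY}.

%% After a batch is fetched from durable storage, the batch bookkeeping and the
%% QoS 1/2 sequence number boundaries are recorded (values are supplied by the
%% caller in this sketch; in the real session they come from the fetch itself):
mark_batch_fetched(Ifs = #ifs{}, ItEnd, BatchSize, NMessages, LastQos1, LastQos2) ->
    Ifs#ifs{
        it_end = ItEnd,
        batch_size = BatchSize,
        batch_n_messages = NMessages,
        last_seqno_qos1 = LastQos1,
        last_seqno_qos2 = LastQos2
    }.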

+ 111 - 0
apps/emqx/src/emqx_persistent_session_ds_inflight.erl

@@ -0,0 +1,111 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_ds_inflight).
+
+%% API:
+-export([new/1, push/2, pop/1, n_buffered/1, n_inflight/1, inc_send_quota/1, receive_maximum/1]).
+
+%% behavior callbacks:
+-export([]).
+
+%% internal exports:
+-export([]).
+
+-export_type([t/0]).
+
+-include("emqx.hrl").
+-include("emqx_mqtt.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-record(inflight, {
+    queue :: queue:queue(),
+    receive_maximum :: pos_integer(),
+    n_inflight = 0 :: non_neg_integer(),
+    n_qos0 = 0 :: non_neg_integer(),
+    n_qos1 = 0 :: non_neg_integer(),
+    n_qos2 = 0 :: non_neg_integer()
+}).
+
+-type t() :: #inflight{}.
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec new(non_neg_integer()) -> t().
+new(ReceiveMaximum) when ReceiveMaximum > 0 ->
+    #inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}.
+
+-spec receive_maximum(t()) -> pos_integer().
+receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) ->
+    ReceiveMaximum.
+
+-spec push({emqx_types:packet_id() | undefined, emqx_types:message()}, t()) -> t().
+push(Val = {_PacketId, Msg}, Rec) ->
+    #inflight{queue = Q0, n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2} = Rec,
+    Q = queue:in(Val, Q0),
+    case Msg#message.qos of
+        ?QOS_0 ->
+            Rec#inflight{queue = Q, n_qos0 = NQos0 + 1};
+        ?QOS_1 ->
+            Rec#inflight{queue = Q, n_qos1 = NQos1 + 1};
+        ?QOS_2 ->
+            Rec#inflight{queue = Q, n_qos2 = NQos2 + 1}
+    end.
+
+-spec pop(t()) -> {[{emqx_types:packet_id() | undefined, emqx_types:message()}], t()}.
+pop(Inflight = #inflight{receive_maximum = ReceiveMaximum}) ->
+    do_pop(ReceiveMaximum, Inflight, []).
+
+-spec n_buffered(t()) -> non_neg_integer().
+n_buffered(#inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) ->
+    NQos0 + NQos1 + NQos2.
+
+-spec n_inflight(t()) -> non_neg_integer().
+n_inflight(#inflight{n_inflight = NInflight}) ->
+    NInflight.
+
+%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control
+-spec inc_send_quota(t()) -> {non_neg_integer(), t()}.
+inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) ->
+    NInflight = max(NInflight0 - 1, 0),
+    {NInflight, Rec#inflight{n_inflight = NInflight}}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+do_pop(ReceiveMaximum, Rec0 = #inflight{n_inflight = NInflight, queue = Q0}, Acc) ->
+    case NInflight < ReceiveMaximum andalso queue:out(Q0) of
+        {{value, Val}, Q} ->
+            #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2} = Rec0,
+            {_PacketId, #message{qos = Qos}} = Val,
+            Rec =
+                case Qos of
+                    ?QOS_0 ->
+                        Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1};
+                    ?QOS_1 ->
+                        Rec0#inflight{queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1};
+                    ?QOS_2 ->
+                        Rec0#inflight{queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1}
+                end,
+            do_pop(ReceiveMaximum, Rec, [Val | Acc]);
+        _ ->
+            {lists:reverse(Acc), Rec0}
+    end.
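
A small usage sketch for the buffer module added above (illustrative only; building messages with emqx_message:make/4 is an assumption about the calling code, not part of this commit):

inflight_usage_sketch() ->
    %% Receive-Maximum of 2: at most two QoS 1/2 messages may be in flight.
    Buf0 = emqx_persistent_session_ds_inflight:new(2),
    Msgs = [
        {undefined, emqx_message:make(<<"client">>, 0, <<"t">>, <<"qos0">>)},
        {1, emqx_message:make(<<"client">>, 1, <<"t">>, <<"qos1 a">>)},
        {2, emqx_message:make(<<"client">>, 1, <<"t">>, <<"qos1 b">>)},
        {3, emqx_message:make(<<"client">>, 2, <<"t">>, <<"qos2">>)}
    ],
    Buf1 = lists:foldl(fun emqx_persistent_session_ds_inflight:push/2, Buf0, Msgs),
    4 = emqx_persistent_session_ds_inflight:n_buffered(Buf1),
    %% pop/1 yields the QoS 0 message plus two tracked messages, then stops
    %% because the in-flight count has reached Receive-Maximum:
    {Popped, Buf2} = emqx_persistent_session_ds_inflight:pop(Buf1),
    3 = length(Popped),
    2 = emqx_persistent_session_ds_inflight:n_inflight(Buf2),
    %% A PUBACK frees one send-quota slot, so the buffered QoS 2 message can go out:
    {_, Buf3} = emqx_persistent_session_ds_inflight:inc_send_quota(Buf2),
    {[{3, _}], _Buf4} = emqx_persistent_session_ds_inflight:pop(Buf3),
    ok.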

+ 30 - 29
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@
 
 -export([create_tables/0]).
 
--export([open/1, create_new/1, delete/1, commit/1, print_session/1]).
+-export([open/1, create_new/1, delete/1, commit/1, print_session/1, list_sessions/0]).
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_conninfo/1, set_conninfo/2]).
@@ -38,7 +38,7 @@
 %% internal exports:
 -export([]).
 
--export_type([t/0, seqno_type/0]).
+-export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0]).
 
 -include("emqx_persistent_session_ds.hrl").
 
@@ -46,12 +46,11 @@
 %% Type declarations
 %%================================================================================
 
+-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).
+
 %% Generic key-value wrapper that is used for exporting arbitrary
 %% terms to mnesia:
--record(kv, {
-    k :: term(),
-    v :: map()
-}).
+-record(kv, {k, v}).
 
 %% Persistent map.
 %%
@@ -62,9 +61,9 @@
 %% It should be possible to make frequent changes to the pmap without
 %% stressing Mria.
 %%
-%% It's implemented as two maps: `clean' and `dirty'. Updates are made
-%% to the `dirty' area. `pmap_commit' function saves the updated
-%% entries to Mnesia and moves them to the `clean' area.
+%% It's implemented as three maps: `clean', `dirty' and `tombstones'.
+%% Updates are made to the `dirty' area. `pmap_commit' function saves
+%% the updated entries to Mnesia and moves them to the `clean' area.
 -record(pmap, {table, clean, dirty, tombstones}).
 
 -type pmap(K, V) ::
@@ -87,15 +86,17 @@
         ?conninfo => emqx_types:conninfo()
     }.
 
--type seqno_type() :: next | acked | pubrel.
+-type seqno_type() :: term().
+
+-type stream_key() :: {emqx_ds:rank_x(), _SubId}.
 
 -opaque t() :: #{
     id := emqx_persistent_session_ds:id(),
     dirty := boolean(),
     metadata := metadata(),
-    subscriptions := emqx_persistent_session_ds:subscriptions(),
+    subscriptions := subscriptions(),
     seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()),
-    streams := pmap(emqx_ds:stream(), emqx_persistent_message_ds_replayer:stream_state()),
+    streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()),
     ranks := pmap(term(), integer())
 }.
 
@@ -104,7 +105,7 @@
 -define(stream_tab, emqx_ds_session_streams).
 -define(seqno_tab, emqx_ds_session_seqnos).
 -define(rank_tab, emqx_ds_session_ranks).
--define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab]).
+-define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]).
 
 %%================================================================================
 %% API funcions
@@ -125,7 +126,7 @@ create_tables() ->
     [create_kv_bag_table(Table) || Table <- ?bag_tables],
     mria:wait_for_tables([?session_tab | ?bag_tables]).
 
--spec open(emqx_persistent_session_ds:session_id()) -> {ok, t()} | undefined.
+-spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
 open(SessionId) ->
     ro_transaction(fun() ->
         case kv_restore(?session_tab, SessionId) of
@@ -150,13 +151,13 @@ print_session(SessionId) ->
     case open(SessionId) of
         undefined ->
             undefined;
-        #{
+        {ok, #{
             metadata := Metadata,
             subscriptions := SubsGBT,
             streams := Streams,
             seqnos := Seqnos,
             ranks := Ranks
-        } ->
+        }} ->
             Subs = emqx_topic_gbt:fold(
                 fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
                 #{},
@@ -171,6 +172,10 @@ print_session(SessionId) ->
             }
     end.
 
+-spec list_sessions() -> [emqx_persistent_session_ds:id()].
+list_sessions() ->
+    mnesia:dirty_all_keys(?session_tab).
+
 -spec delete(emqx_persistent_session_ds:id()) -> ok.
 delete(Id) ->
     transaction(
@@ -187,7 +192,6 @@ commit(
     Rec = #{
         id := SessionId,
         metadata := Metadata,
-        subscriptions := Subs,
         streams := Streams,
         seqnos := SeqNos,
         ranks := Ranks
@@ -196,10 +200,9 @@ commit(
     transaction(fun() ->
         kv_persist(?session_tab, SessionId, Metadata),
         Rec#{
-            subscriptions => pmap_commit(SessionId, Subs),
             streams => pmap_commit(SessionId, Streams),
             seqnos => pmap_commit(SessionId, SeqNos),
-            ranksz => pmap_commit(SessionId, Ranks),
+            ranks => pmap_commit(SessionId, Ranks),
             dirty => false
         }
     end).
@@ -247,18 +250,16 @@ set_conninfo(Val, Rec) ->
 
 %%
 
--spec get_stream(emqx_persistent_session_ds:stream(), t()) ->
-    emqx_persistent_message_ds_replayer:stream_state() | undefined.
+-spec get_stream(stream_key(), t()) ->
+    emqx_persistent_session_ds:stream_state() | undefined.
 get_stream(Key, Rec) ->
     gen_get(streams, Key, Rec).
 
--spec put_stream(
-    emqx_persistent_session_ds:stream(), emqx_persistent_message_ds_replayer:stream_state(), t()
-) -> t().
+-spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t().
 put_stream(Key, Val, Rec) ->
     gen_put(streams, Key, Val, Rec).
 
--spec del_stream(emqx_persistent_session_ds:stream(), t()) -> t().
+-spec del_stream(stream_key(), t()) -> t().
 del_stream(Key, Rec) ->
     gen_del(stream, Key, Rec).
 
@@ -296,12 +297,12 @@ fold_ranks(Fun, Acc, Rec) ->
 
 %%
 
--spec get_subscriptions(t()) -> emqx_persistent_session_ds:subscriptions().
+-spec get_subscriptions(t()) -> subscriptions().
 get_subscriptions(#{subscriptions := Subs}) ->
     Subs.
 
 -spec put_subscription(
-    emqx_persistent_session_ds:subscription_id(),
+    emqx_persistent_session_ds:topic_filter(),
     _SubId,
     emqx_persistent_session_ds:subscription(),
     t()
@@ -474,7 +475,7 @@ kv_bag_persist(Tab, SessionId, Key, Val0) ->
     kv_bag_delete(Tab, SessionId, Key),
     %% Write data to mnesia:
     Val = encoder(encode, Tab, Val0),
-    mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}).
+    mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}, write).
 
 kv_bag_restore(Tab, SessionId) ->
     [{K, encoder(decode, Tab, V)} || #kv{v = {K, V}} <- mnesia:read(Tab, SessionId)].
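
The CRUD-style state API this refactoring moves the session onto can be exercised roughly as follows. This is a sketch under stated assumptions: StreamKey is a {rank_x(), SubId} pair, StreamState stands for the #ifs{} stream state, and the values are illustrative rather than taken from this commit.

update_stream_state(SessionId, StreamKey, StreamState) ->
    case emqx_persistent_session_ds_state:open(SessionId) of
        undefined ->
            {error, no_session};
        {ok, S0} ->
            %% Mutations accumulate in the dirty area of the pmap...
            S1 = emqx_persistent_session_ds_state:put_stream(StreamKey, StreamState, S0),
            %% ...and commit/1 flushes them to Mnesia and marks the state clean:
            S = emqx_persistent_session_ds_state:commit(S1),
            {ok, emqx_persistent_session_ds_state:get_stream(StreamKey, S)}
    end.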

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -1810,7 +1810,7 @@ fields("session_persistence") ->
             sc(
                 pos_integer(),
                 #{
-                    default => 1000,
+                    default => 100,
                     desc => ?DESC(session_ds_max_batch_size)
                 }
             )},

+ 11 - 13
apps/emqx/src/emqx_session.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -409,12 +409,8 @@ enrich_delivers(ClientInfo, Delivers, Session) ->
 enrich_delivers(_ClientInfo, [], _UpgradeQoS, _Session) ->
     [];
 enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
-    case enrich_deliver(ClientInfo, D, UpgradeQoS, Session) of
-        [] ->
-            enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session);
-        Msg ->
-            [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
-    end.
+    enrich_deliver(ClientInfo, D, UpgradeQoS, Session) ++
+        enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session).
 
 enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
     SubOpts =
@@ -435,13 +431,15 @@ enrich_message(
     _ = emqx_session_events:handle_event(ClientInfo, {dropped, Msg, no_local}),
     [];
 enrich_message(_ClientInfo, MsgIn, SubOpts = #{}, UpgradeQoS) ->
-    maps:fold(
-        fun(SubOpt, V, Msg) -> enrich_subopts(SubOpt, V, Msg, UpgradeQoS) end,
-        MsgIn,
-        SubOpts
-    );
+    [
+        maps:fold(
+            fun(SubOpt, V, Msg) -> enrich_subopts(SubOpt, V, Msg, UpgradeQoS) end,
+            MsgIn,
+            SubOpts
+        )
+    ];
 enrich_message(_ClientInfo, Msg, undefined, _UpgradeQoS) ->
-    Msg.
+    [Msg].
 
 enrich_subopts(nl, 1, Msg, _) ->
     emqx_message:set_flag(nl, Msg);

+ 10 - 10
apps/emqx/src/emqx_topic_gbt.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -39,11 +39,11 @@
 -type match(ID) :: key(ID).
 
 -opaque t(ID, Value) :: gb_trees:tree(key(ID), Value).
--opaque t() :: t(_ID, _Value).
+-type t() :: t(_ID, _Value).
 
 %% @doc Create a new gb_tree and store it in the persitent_term with the
 %% given name.
--spec new() -> t().
+-spec new() -> t(_ID, _Value).
 new() ->
     gb_trees:empty().
 
@@ -54,19 +54,19 @@ size(Gbt) ->
 %% @doc Insert a new entry into the index that associates given topic filter to given
 %% record ID, and attaches arbitrary record to the entry. This allows users to choose
 %% between regular and "materialized" indexes, for example.
--spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t().
+-spec insert(emqx_types:topic() | words(), ID, Record, t(ID, Record)) -> t(ID, Record).
 insert(Filter, ID, Record, Gbt) ->
     Key = key(Filter, ID),
     gb_trees:enter(Key, Record, Gbt).
 
 %% @doc Delete an entry from the index that associates given topic filter to given
 %% record ID. Deleting non-existing entry is not an error.
--spec delete(emqx_types:topic() | words(), _ID, t()) -> t().
+-spec delete(emqx_types:topic() | words(), ID, t(ID, Record)) -> t(ID, Record).
 delete(Filter, ID, Gbt) ->
     Key = key(Filter, ID),
     gb_trees:delete_any(Key, Gbt).
 
--spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default.
+-spec lookup(emqx_types:topic() | words(), ID, t(ID, Record), Default) -> Record | Default.
 lookup(Filter, ID, Gbt, Default) ->
     Key = key(Filter, ID),
     case gb_trees:lookup(Key, Gbt) of
@@ -76,7 +76,7 @@ lookup(Filter, ID, Gbt, Default) ->
             Default
     end.
 
--spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc.
+-spec fold(fun((key(ID), Record, Acc) -> Acc), Acc, t(ID, Record)) -> Acc.
 fold(Fun, Acc, Gbt) ->
     Iter = gb_trees:iterator(Gbt),
     fold_iter(Fun, Acc, Iter).
@@ -91,13 +91,13 @@ fold_iter(Fun, Acc, Iter) ->
 
 %% @doc Match given topic against the index and return the first match, or `false` if
 %% no match is found.
--spec match(emqx_types:topic(), t()) -> match(_ID) | false.
+-spec match(emqx_types:topic(), t(ID, _Record)) -> match(ID) | false.
 match(Topic, Gbt) ->
     emqx_trie_search:match(Topic, make_nextf(Gbt)).
 
 %% @doc Match given topic against the index and return _all_ matches.
 %% If `unique` option is given, return only unique matches by record ID.
--spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)].
+-spec matches(emqx_types:topic(), t(ID, _Record), emqx_trie_search:opts()) -> [match(ID)].
 matches(Topic, Gbt, Opts) ->
     emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts).
 
@@ -112,7 +112,7 @@ get_topic(Key) ->
     emqx_trie_search:get_topic(Key).
 
 %% @doc Fetch the record associated with the match.
--spec get_record(match(_ID), t()) -> _Record.
+-spec get_record(match(ID), t(ID, Record)) -> Record.
 get_record(Key, Gbt) ->
     gb_trees:get(Key, Gbt).
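
The tightened specs above mainly turn t/0 into a plain alias for the parameterized t(ID, Record) and thread the parameters through the API, so callers can state what they store. A hypothetical caller (not from this commit) might look like this:

-spec subs_for_topic(emqx_types:topic(), emqx_topic_gbt:t(binary(), map())) -> [map()].
subs_for_topic(Topic, Gbt) ->
    Matches = emqx_topic_gbt:matches(Topic, Gbt, [unique]),
    [emqx_topic_gbt:get_record(M, Gbt) || M <- Matches].

topic_gbt_sketch() ->
    Gbt0 = emqx_topic_gbt:new(),
    Gbt = emqx_topic_gbt:insert(<<"t/#">>, <<"sub1">>, #{qos => 1}, Gbt0),
    subs_for_topic(<<"t/1">>, Gbt).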
 

+ 6 - 2
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -36,7 +36,7 @@ all() ->
         % NOTE
         % Tests are disabled while existing session persistence impl is being
         % phased out.
-        {group, persistence_disabled},
+        %{group, persistence_disabled},
         {group, persistence_enabled}
     ].
 
@@ -71,7 +71,11 @@ init_per_group(persistence_disabled, Config) ->
     ];
 init_per_group(persistence_enabled, Config) ->
     [
-        {emqx_config, "session_persistence { enable = true }"},
+        {emqx_config,
+            "session_persistence {\n"
+            "  enable = true\n"
+            "  renew_streams_interval = 100ms\n"
+            "}"},
         {persistence, ds}
         | Config
     ];

+ 1 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -594,7 +594,7 @@ fields("node") ->
             sc(
                 hoconsc:enum([gen_rpc, distr]),
                 #{
-                    mapping => "mria.shard_transport",
+                    mapping => "mria.shardp_transport",
                     importance => ?IMPORTANCE_HIDDEN,
                     default => distr,
                     desc => ?DESC(db_default_shard_transport)

+ 7 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -47,6 +47,8 @@
     topic_filter/0,
     topic/0,
     stream/0,
+    rank_x/0,
+    rank_y/0,
     stream_rank/0,
     iterator/0,
     iterator_id/0,
@@ -77,7 +79,11 @@
 %% Parsed topic filter.
 -type topic_filter() :: list(binary() | '+' | '#' | '').
 
--type stream_rank() :: {term(), integer()}.
+-type rank_x() :: term().
+
+-type rank_y() :: integer().
+
+-type stream_rank() :: {rank_x(), rank_y()}.
 
 %% TODO: Not implemented
 -type iterator_id() :: term().

+ 1 - 1
apps/emqx_exhook/test/emqx_exhook_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.