|
|
@@ -21,7 +21,7 @@
|
|
|
%% API:
|
|
|
-export([new/0, open/1, next_packet_id/1, n_inflight/1]).
|
|
|
|
|
|
--export([poll/4, replay/2, commit_offset/4, commit_marker/4]).
|
|
|
+-export([poll/4, replay/2, commit_offset/4]).
|
|
|
|
|
|
-export([seqno_to_packet_id/1, packet_id_to_seqno/2]).
|
|
|
|
|
|
@@ -55,11 +55,11 @@
|
|
|
-type seqno() :: non_neg_integer().
|
|
|
|
|
|
-type track() :: ack | comp.
|
|
|
--type marker() :: rec.
|
|
|
+-type commit_type() :: rec.
|
|
|
|
|
|
-record(inflight, {
|
|
|
next_seqno = 1 :: seqno(),
|
|
|
- commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | marker() => seqno()},
|
|
|
+ commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | commit_type() => seqno()},
|
|
|
%% Ranges are sorted in ascending order of their sequence numbers.
|
|
|
offset_ranges = [] :: [ds_pubrange()]
|
|
|
}).
|
|
|
@@ -82,7 +82,7 @@ new() ->
|
|
|
-spec open(emqx_persistent_session_ds:id()) -> inflight().
|
|
|
open(SessionId) ->
|
|
|
{Ranges, RecUntil} = ro_transaction(
|
|
|
- fun() -> {get_ranges(SessionId), get_marker(SessionId, rec)} end
|
|
|
+ fun() -> {get_ranges(SessionId), get_committed_offset(SessionId, rec)} end
|
|
|
),
|
|
|
{Commits, NextSeqno} = compute_inflight_range(Ranges),
|
|
|
#inflight{
|
|
|
@@ -128,14 +128,16 @@ replay(ReplyFun, Inflight0 = #inflight{offset_ranges = Ranges0}) ->
|
|
|
Inflight = Inflight0#inflight{offset_ranges = Ranges},
|
|
|
{Replies, Inflight}.
|
|
|
|
|
|
--spec commit_offset(emqx_persistent_session_ds:id(), track(), emqx_types:packet_id(), inflight()) ->
|
|
|
- {_IsValidOffset :: boolean(), inflight()}.
|
|
|
+-spec commit_offset(emqx_persistent_session_ds:id(), Offset, emqx_types:packet_id(), inflight()) ->
|
|
|
+ {_IsValidOffset :: boolean(), inflight()}
|
|
|
+when
|
|
|
+ Offset :: track() | commit_type().
|
|
|
commit_offset(
|
|
|
SessionId,
|
|
|
Track,
|
|
|
PacketId,
|
|
|
Inflight0 = #inflight{commits = Commits}
|
|
|
-) ->
|
|
|
+) when Track == ack orelse Track == comp ->
|
|
|
case validate_commit(Track, PacketId, Inflight0) of
|
|
|
CommitUntil when is_integer(CommitUntil) ->
|
|
|
%% TODO
|
|
|
@@ -148,20 +150,17 @@ commit_offset(
|
|
|
{true, Inflight};
|
|
|
false ->
|
|
|
{false, Inflight0}
|
|
|
- end.
|
|
|
-
|
|
|
--spec commit_marker(emqx_persistent_session_ds:id(), marker(), emqx_types:packet_id(), inflight()) ->
|
|
|
- {_IsValidMarker :: boolean(), inflight()}.
|
|
|
-commit_marker(
|
|
|
+ end;
|
|
|
+commit_offset(
|
|
|
SessionId,
|
|
|
- Marker = rec,
|
|
|
+ CommitType = rec,
|
|
|
PacketId,
|
|
|
Inflight0 = #inflight{commits = Commits}
|
|
|
) ->
|
|
|
- case validate_commit(Marker, PacketId, Inflight0) of
|
|
|
+ case validate_commit(CommitType, PacketId, Inflight0) of
|
|
|
CommitUntil when is_integer(CommitUntil) ->
|
|
|
- update_marker(SessionId, Marker, CommitUntil),
|
|
|
- Inflight = Inflight0#inflight{commits = Commits#{Marker := CommitUntil}},
|
|
|
+ update_committed_offset(SessionId, CommitType, CommitUntil),
|
|
|
+ Inflight = Inflight0#inflight{commits = Commits#{CommitType := CommitUntil}},
|
|
|
{true, Inflight};
|
|
|
false ->
|
|
|
{false, Inflight0}
|
|
|
@@ -187,7 +186,7 @@ poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize
|
|
|
|
|
|
%% Which seqno this track is committed until.
|
|
|
%% "Until" means this is first seqno that is _not yet committed_ for this track.
|
|
|
--spec committed_until(track() | marker(), inflight()) -> seqno().
|
|
|
+-spec committed_until(track() | commit_type(), inflight()) -> seqno().
|
|
|
committed_until(Track, #inflight{commits = Commits}) ->
|
|
|
maps:get(Track, Commits).
|
|
|
|
|
|
@@ -491,18 +490,20 @@ get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) ->
|
|
|
get_streams(SessionId) ->
|
|
|
mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId).
|
|
|
|
|
|
--spec get_marker(emqx_persistent_session_ds:id(), _Name) -> seqno().
|
|
|
-get_marker(SessionId, Name) ->
|
|
|
- case mnesia:read(?SESSION_MARKER_TAB, {SessionId, Name}) of
|
|
|
+-spec get_committed_offset(emqx_persistent_session_ds:id(), _Name) -> seqno().
|
|
|
+get_committed_offset(SessionId, Name) ->
|
|
|
+ case mnesia:read(?SESSION_COMMITTED_OFFSET_TAB, {SessionId, Name}) of
|
|
|
[] ->
|
|
|
1;
|
|
|
- [#ds_marker{until = Seqno}] ->
|
|
|
+ [#ds_committed_offset{until = Seqno}] ->
|
|
|
Seqno
|
|
|
end.
|
|
|
|
|
|
--spec update_marker(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok.
|
|
|
-update_marker(SessionId, Name, Until) ->
|
|
|
- mria:dirty_write(?SESSION_MARKER_TAB, #ds_marker{id = {SessionId, Name}, until = Until}).
|
|
|
+-spec update_committed_offset(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok.
|
|
|
+update_committed_offset(SessionId, Name, Until) ->
|
|
|
+ mria:dirty_write(?SESSION_COMMITTED_OFFSET_TAB, #ds_committed_offset{
|
|
|
+ id = {SessionId, Name}, until = Until
|
|
|
+ }).
|
|
|
|
|
|
next_seqno(Seqno) ->
|
|
|
NextSeqno = Seqno + 1,
|