|
@@ -703,7 +703,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
|
|
|
|
|
|
|
|
replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
|
|
replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
|
|
|
case replay_batch(StreamKey, Srs0, Session0, ClientInfo) of
|
|
case replay_batch(StreamKey, Srs0, Session0, ClientInfo) of
|
|
|
- Session = #{} ->
|
|
|
|
|
|
|
+ {ok, _Srs, Session} ->
|
|
|
replay_streams(Session#{replay := Rest}, ClientInfo);
|
|
replay_streams(Session#{replay := Rest}, ClientInfo);
|
|
|
{error, recoverable, Reason} ->
|
|
{error, recoverable, Reason} ->
|
|
|
RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
|
|
RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
|
|
@@ -731,8 +731,9 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
|
|
|
) ->
|
|
) ->
|
|
|
session() | emqx_ds:error(_).
|
|
session() | emqx_ds:error(_).
|
|
|
replay_batch(StreamKey, Srs0, Session0, ClientInfo) ->
|
|
replay_batch(StreamKey, Srs0, Session0, ClientInfo) ->
|
|
|
- Pending = fetch(true, StreamKey, Srs0, undefined),
|
|
|
|
|
- case enqueue_batch(Session0, ClientInfo, Pending, receive_pending(Pending)) of
|
|
|
|
|
|
|
+ #srs{it_begin = ItBegin, batch_size = BatchSize} = Srs0,
|
|
|
|
|
+ FetchResult = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize),
|
|
|
|
|
+ case enqueue_batch(true, Session0, ClientInfo, StreamKey, ItBegin, FetchResult) of
|
|
|
{ok, Srs, Session} ->
|
|
{ok, Srs, Session} ->
|
|
|
%% Assert:
|
|
%% Assert:
|
|
|
Srs =:= Srs0 orelse
|
|
Srs =:= Srs0 orelse
|
|
@@ -740,8 +741,8 @@ replay_batch(StreamKey, Srs0, Session0, ClientInfo) ->
|
|
|
expected => Srs0,
|
|
expected => Srs0,
|
|
|
got => Srs
|
|
got => Srs
|
|
|
}),
|
|
}),
|
|
|
- Session;
|
|
|
|
|
- {error, _, _} = Error ->
|
|
|
|
|
|
|
+ {ok, Srs, Session};
|
|
|
|
|
+ {{error, _, _} = Error, _Srs, _Session} ->
|
|
|
Error
|
|
Error
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
@@ -1099,8 +1100,21 @@ receive_pending(#pending_next{ref = Ref}) ->
|
|
|
#ds_async_result{ref = Ref, data = Data} -> Data
|
|
#ds_async_result{ref = Ref, data = Data} -> Data
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-enqueue_batch(Session = #{s := S}, ClientInfo, Pending, FetchResult) ->
|
|
|
|
|
- #pending_next{stream_key = StreamKey, is_replay = IsReplay} = Pending,
|
|
|
|
|
|
|
+enqueue_batch(Session, ClientInfo, Pending, FetchResult) ->
|
|
|
|
|
+ #pending_next{is_replay = IsReplay, stream_key = StreamKey, it_begin = ItBegin} = Pending,
|
|
|
|
|
+ enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, ItBegin, FetchResult).
|
|
|
|
|
+
|
|
|
|
|
+-spec enqueue_batch(
|
|
|
|
|
+ boolean(),
|
|
|
|
|
+ session(),
|
|
|
|
|
+ clientinfo(),
|
|
|
|
|
+ emqx_persistent_session_ds_state:stream_key(),
|
|
|
|
|
+ emqx_ds:iterator(),
|
|
|
|
|
+ emqx_ds:next_result()
|
|
|
|
|
+) ->
|
|
|
|
|
+ {ok | emqx_ds:error(), #srs{}, session()}
|
|
|
|
|
+ | {ignore, undefined, session()}.
|
|
|
|
|
+enqueue_batch(IsReplay, Session = #{s := S}, ClientInfo, StreamKey, ItBegin, FetchResult) ->
|
|
|
case emqx_persistent_session_ds_state:get_stream(StreamKey, S) of
|
|
case emqx_persistent_session_ds_state:get_stream(StreamKey, S) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
%% Assert: streams should not change or go missing during
|
|
%% Assert: streams should not change or go missing during
|
|
@@ -1113,20 +1127,20 @@ enqueue_batch(Session = #{s := S}, ClientInfo, Pending, FetchResult) ->
|
|
|
%% so far.
|
|
%% so far.
|
|
|
?SLOG(info, #{
|
|
?SLOG(info, #{
|
|
|
msg => "sessds_ignoring_orphaned_batch",
|
|
msg => "sessds_ignoring_orphaned_batch",
|
|
|
- pending => Pending
|
|
|
|
|
|
|
+ stream_key => StreamKey,
|
|
|
|
|
+ it_begin => ItBegin
|
|
|
}),
|
|
}),
|
|
|
- Session;
|
|
|
|
|
|
|
+ {ignore, undefined, Session};
|
|
|
Srs ->
|
|
Srs ->
|
|
|
- do_enqueue_batch(Session, ClientInfo, Pending, Srs, FetchResult)
|
|
|
|
|
|
|
+ do_enqueue_batch(IsReplay, Session, ClientInfo, Srs, ItBegin, FetchResult)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-do_enqueue_batch(
|
|
|
|
|
- Session = #{s := S0, inflight := Inflight0}, ClientInfo, Pending, Srs0, FetchResult
|
|
|
|
|
-) ->
|
|
|
|
|
- #pending_next{is_replay = IsReplay, it_begin = ItBegin} = Pending,
|
|
|
|
|
|
|
+do_enqueue_batch(IsReplay, Session, ClientInfo, Srs0, ItBegin, FetchResult) ->
|
|
|
|
|
+ #{s := S0, inflight := Inflight0} = Session,
|
|
|
|
|
+ #srs{sub_state_id = SubStateId} = Srs0,
|
|
|
case FetchResult of
|
|
case FetchResult of
|
|
|
{error, _, _} = Error ->
|
|
{error, _, _} = Error ->
|
|
|
- Error;
|
|
|
|
|
|
|
+ {Error, Srs0, Session};
|
|
|
{ok, end_of_stream} ->
|
|
{ok, end_of_stream} ->
|
|
|
%% No new messages; just update the end iterator:
|
|
%% No new messages; just update the end iterator:
|
|
|
Srs = Srs0#srs{
|
|
Srs = Srs0#srs{
|
|
@@ -1150,10 +1164,7 @@ do_enqueue_batch(
|
|
|
first_seqno_qos2 = FirstSeqnoQos2
|
|
first_seqno_qos2 = FirstSeqnoQos2
|
|
|
} = Srs0
|
|
} = Srs0
|
|
|
end,
|
|
end,
|
|
|
- SubState =
|
|
|
|
|
- #{} = emqx_persistent_session_ds_state:get_subscription_state(
|
|
|
|
|
- Srs0#srs.sub_state_id, S0
|
|
|
|
|
- ),
|
|
|
|
|
|
|
+ SubState = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S0),
|
|
|
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
|
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
|
|
IsReplay,
|
|
IsReplay,
|
|
|
Session,
|
|
Session,
|