|
|
@@ -721,7 +721,7 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
|
|
|
-spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_).
|
|
|
replay_batch(Srs0, Session0, ClientInfo) ->
|
|
|
#srs{batch_size = BatchSize} = Srs0,
|
|
|
- case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of
|
|
|
+ case enqueue_batch(true, Session0, ClientInfo, fetch(true, BatchSize, Srs0)) of
|
|
|
{ok, Srs, Session} ->
|
|
|
%% Assert:
|
|
|
Srs =:= Srs0 orelse
|
|
|
@@ -1041,7 +1041,7 @@ new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
|
|
|
last_seqno_qos1 = SN1,
|
|
|
last_seqno_qos2 = SN2
|
|
|
},
|
|
|
- case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
|
|
|
+ case enqueue_batch(false, Session0, ClientInfo, fetch(false, BatchSize, Srs1)) of
|
|
|
{ok, Srs, Session} ->
|
|
|
S1 = emqx_persistent_session_ds_state:put_seqno(
|
|
|
?next(?QOS_1),
|
|
|
@@ -1067,22 +1067,17 @@ new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
|
|
|
skip_batch(StreamKey, Srs1, Session0, ClientInfo, Reason)
|
|
|
end.
|
|
|
|
|
|
-enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->
|
|
|
- #srs{
|
|
|
- it_begin = ItBegin0,
|
|
|
- it_end = ItEnd0,
|
|
|
- first_seqno_qos1 = FirstSeqnoQos1,
|
|
|
- first_seqno_qos2 = FirstSeqnoQos2,
|
|
|
- sub_state_id = SubStateId
|
|
|
- } = Srs0,
|
|
|
- ItBegin =
|
|
|
- case IsReplay of
|
|
|
- true -> ItBegin0;
|
|
|
- false -> ItEnd0
|
|
|
- end,
|
|
|
- SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
|
|
|
- case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
|
|
|
- {ok, ItEnd, Messages} ->
|
|
|
+enqueue_batch(IsReplay, Session = #{inflight := Inflight0, s := S}, ClientInfo, FetchResult) ->
|
|
|
+ case FetchResult of
|
|
|
+ {ok, Srs, end_of_stream} ->
|
|
|
+ {ok, Srs, Session};
|
|
|
+ {ok, Srs1, Messages} ->
|
|
|
+ #srs{
|
|
|
+ first_seqno_qos1 = FirstSeqnoQos1,
|
|
|
+ first_seqno_qos2 = FirstSeqnoQos2,
|
|
|
+ sub_state_id = SubStateId
|
|
|
+ } = Srs1,
|
|
|
+ SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
|
|
|
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
|
|
IsReplay,
|
|
|
Session,
|
|
|
@@ -1093,21 +1088,37 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s :=
|
|
|
Messages,
|
|
|
Inflight0
|
|
|
),
|
|
|
- Srs = Srs0#srs{
|
|
|
- it_begin = ItBegin,
|
|
|
- it_end = ItEnd,
|
|
|
- %% TODO: it should be possible to avoid calling
|
|
|
- %% length here by diffing size of inflight before
|
|
|
- %% and after inserting messages:
|
|
|
- batch_size = length(Messages),
|
|
|
+ Srs = Srs1#srs{
|
|
|
last_seqno_qos1 = LastSeqnoQos1,
|
|
|
last_seqno_qos2 = LastSeqnoQos2
|
|
|
},
|
|
|
{ok, Srs, Session#{inflight := Inflight}};
|
|
|
+ {error, _, _} = Error ->
|
|
|
+ Error
|
|
|
+ end.
|
|
|
+
|
|
|
+fetch(IsReplay, BatchSize, Srs0) ->
|
|
|
+ #srs{
|
|
|
+ it_begin = ItBegin0,
|
|
|
+ it_end = ItEnd0
|
|
|
+ } = Srs0,
|
|
|
+ ItBegin =
|
|
|
+ case IsReplay of
|
|
|
+ true -> ItBegin0;
|
|
|
+ false -> ItEnd0
|
|
|
+ end,
|
|
|
+ case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
|
|
|
+ {ok, ItEnd, Messages} ->
|
|
|
+ Srs = Srs0#srs{
|
|
|
+ it_begin = ItBegin,
|
|
|
+ it_end = ItEnd,
|
|
|
+ batch_size = length(Messages)
|
|
|
+ },
|
|
|
+ {ok, Srs, Messages};
|
|
|
{ok, end_of_stream} ->
|
|
|
%% No new messages; just update the end iterator:
|
|
|
Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0},
|
|
|
- {ok, Srs, Session#{inflight := Inflight0}};
|
|
|
+ {ok, Srs, end_of_stream};
|
|
|
{error, _, _} = Error ->
|
|
|
Error
|
|
|
end.
|