|
|
@@ -909,30 +909,28 @@ update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
|
|
|
SeqNo = packet_id_to_seqno(PacketId, S),
|
|
|
case Track of
|
|
|
puback ->
|
|
|
- MinTrack = ?committed(?QOS_1),
|
|
|
- MaxTrack = ?next(?QOS_1);
|
|
|
+ QoS = ?QOS_1,
|
|
|
+ SeqNoKey = ?committed(?QOS_1);
|
|
|
pubrec ->
|
|
|
- MinTrack = ?rec,
|
|
|
- MaxTrack = ?next(?QOS_2);
|
|
|
+ QoS = ?QOS_2,
|
|
|
+ SeqNoKey = ?rec;
|
|
|
pubcomp ->
|
|
|
- MinTrack = ?committed(?QOS_2),
|
|
|
- MaxTrack = ?next(?QOS_2)
|
|
|
+ QoS = ?QOS_2,
|
|
|
+ SeqNoKey = ?committed(?QOS_2)
|
|
|
end,
|
|
|
- Min = emqx_persistent_session_ds_state:get_seqno(MinTrack, S),
|
|
|
- Max = emqx_persistent_session_ds_state:get_seqno(MaxTrack, S),
|
|
|
- case Min =< SeqNo andalso SeqNo =< Max of
|
|
|
- true ->
|
|
|
+ Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S),
|
|
|
+ case inc_seqno(QoS, Current) of
|
|
|
+ SeqNo ->
|
|
|
%% TODO: we pass a bogus message into the hook:
|
|
|
Msg = emqx_message:make(SessionId, <<>>, <<>>),
|
|
|
- {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(MinTrack, SeqNo, S)}};
|
|
|
- false ->
|
|
|
+ {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}};
|
|
|
+ Expected ->
|
|
|
?SLOG(warning, #{
|
|
|
msg => "out-of-order_commit",
|
|
|
track => Track,
|
|
|
packet_id => PacketId,
|
|
|
seqno => SeqNo,
|
|
|
- min => Min,
|
|
|
- max => Max
|
|
|
+ expected => Expected
|
|
|
}),
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end.
|