|
|
@@ -340,8 +340,7 @@ return_with(Msg, {ok, Publishes, Session}) ->
|
|
|
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
|
|
- Inflight1 = emqx_inflight:update(
|
|
|
- PacketId, {pubrel, os:timestamp()}, Inflight),
|
|
|
+ Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
|
|
|
{ok, Msg, Session#session{inflight = Inflight1}};
|
|
|
{value, {pubrel, _Ts}} ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
|
|
@@ -510,7 +509,7 @@ enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
|
|
- Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight),
|
|
|
+ Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
|
|
|
Session#session{inflight = Inflight1}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -521,9 +520,8 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
|
|
retry(Session = #session{inflight = Inflight}) ->
|
|
|
case emqx_inflight:is_empty(Inflight) of
|
|
|
true -> {ok, Session};
|
|
|
- false ->
|
|
|
- retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
|
|
|
- [], os:timestamp(), Session)
|
|
|
+ false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
|
|
|
+ [], erlang:system_time(millisecond), Session)
|
|
|
end.
|
|
|
|
|
|
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
|
|
|
@@ -561,7 +559,7 @@ retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
|
|
|
expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|
|
case maps:size(AwaitingRel) of
|
|
|
0 -> {ok, Session};
|
|
|
- _ -> expire_awaiting_rel(os:timestamp(), Session)
|
|
|
+ _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session)
|
|
|
end.
|
|
|
|
|
|
expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
|
|
@@ -623,7 +621,7 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
|
|
%% Helper functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
--compile({inline, [sort_fun/0, batch_n/1, age/2]}).
|
|
|
+-compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}).
|
|
|
|
|
|
sort_fun() ->
|
|
|
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end.
|
|
|
@@ -634,7 +632,10 @@ batch_n(Inflight) ->
|
|
|
Sz -> Sz - emqx_inflight:size(Inflight)
|
|
|
end.
|
|
|
|
|
|
-age(Now, Ts) -> timer:now_diff(Now, Ts) div 1000.
|
|
|
+with_ts(Msg) ->
|
|
|
+ {Msg, erlang:system_time(millisecond)}.
|
|
|
+
|
|
|
+age(Now, Ts) -> Now - Ts.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% For CT tests
|