|
@@ -50,7 +50,7 @@
|
|
|
, delete_delayed_message/1
|
|
, delete_delayed_message/1
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
--record(delayed_message, {key, msg}).
|
|
|
|
|
|
|
+-record(delayed_message, {key, delayed, msg}).
|
|
|
|
|
|
|
|
-define(TAB, ?MODULE).
|
|
-define(TAB, ?MODULE).
|
|
|
-define(SERVER, ?MODULE).
|
|
-define(SERVER, ?MODULE).
|
|
@@ -78,19 +78,19 @@ on_message_publish(Msg = #message{
|
|
|
timestamp = Ts
|
|
timestamp = Ts
|
|
|
}) ->
|
|
}) ->
|
|
|
[Delay, Topic1] = binary:split(Topic, <<"/">>),
|
|
[Delay, Topic1] = binary:split(Topic, <<"/">>),
|
|
|
- PubAt = case binary_to_integer(Delay) of
|
|
|
|
|
|
|
+ {PubAt, Delayed} = case binary_to_integer(Delay) of
|
|
|
Interval when Interval < ?MAX_INTERVAL ->
|
|
Interval when Interval < ?MAX_INTERVAL ->
|
|
|
- Interval + erlang:round(Ts / 1000);
|
|
|
|
|
|
|
+ {Interval + erlang:round(Ts / 1000), Interval};
|
|
|
Timestamp ->
|
|
Timestamp ->
|
|
|
%% Check malicious timestamp?
|
|
%% Check malicious timestamp?
|
|
|
case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
|
|
case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
|
|
|
true -> error(invalid_delayed_timestamp);
|
|
true -> error(invalid_delayed_timestamp);
|
|
|
- false -> Timestamp
|
|
|
|
|
|
|
+ false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)}
|
|
|
end
|
|
end
|
|
|
end,
|
|
end,
|
|
|
PubMsg = Msg#message{topic = Topic1},
|
|
PubMsg = Msg#message{topic = Topic1},
|
|
|
Headers = PubMsg#message.headers,
|
|
Headers = PubMsg#message.headers,
|
|
|
- case store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}) of
|
|
|
|
|
|
|
+ case store(#delayed_message{key = {PubAt, Id}, delayed = Delayed, msg = PubMsg}) of
|
|
|
ok -> ok;
|
|
ok -> ok;
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
|
?LOG(error, "Store delayed message fail: ~p", [Error])
|
|
?LOG(error, "Store delayed message fail: ~p", [Error])
|
|
@@ -128,15 +128,22 @@ list(Params) ->
|
|
|
format_delayed(Delayed) ->
|
|
format_delayed(Delayed) ->
|
|
|
format_delayed(Delayed, false).
|
|
format_delayed(Delayed, false).
|
|
|
|
|
|
|
|
-format_delayed(#delayed_message{key = {TimeStamp, Id},
|
|
|
|
|
|
|
+format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
|
|
|
msg = #message{topic = Topic,
|
|
msg = #message{topic = Topic,
|
|
|
from = From,
|
|
from = From,
|
|
|
headers = #{username := Username},
|
|
headers = #{username := Username},
|
|
|
qos = Qos,
|
|
qos = Qos,
|
|
|
|
|
+ timestamp = PublishTimeStamp,
|
|
|
payload = Payload}}, WithPayload) ->
|
|
payload = Payload}}, WithPayload) ->
|
|
|
|
|
+ PublishTime = to_rfc3339(PublishTimeStamp div 1000),
|
|
|
|
|
+ ExpectTime = to_rfc3339(ExpectTimeStamp),
|
|
|
|
|
+ RemainingTime = ExpectTimeStamp - erlang:system_time(second),
|
|
|
Result = #{
|
|
Result = #{
|
|
|
id => emqx_guid:to_hexstr(Id),
|
|
id => emqx_guid:to_hexstr(Id),
|
|
|
- publish_time => list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, second}])),
|
|
|
|
|
|
|
+ publish_at => PublishTime,
|
|
|
|
|
+ delayed_interval => Delayed,
|
|
|
|
|
+ delayed_remaining => RemainingTime,
|
|
|
|
|
+ expected_at => ExpectTime,
|
|
|
topic => Topic,
|
|
topic => Topic,
|
|
|
qos => Qos,
|
|
qos => Qos,
|
|
|
from_clientid => From,
|
|
from_clientid => From,
|
|
@@ -149,6 +156,9 @@ format_delayed(#delayed_message{key = {TimeStamp, Id},
|
|
|
Result
|
|
Result
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+to_rfc3339(Timestamp) ->
|
|
|
|
|
+ list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
|
|
|
|
|
+
|
|
|
get_delayed_message(Id0) ->
|
|
get_delayed_message(Id0) ->
|
|
|
Id = emqx_guid:from_hexstr(Id0),
|
|
Id = emqx_guid:from_hexstr(Id0),
|
|
|
Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}],
|
|
Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}],
|