|
|
@@ -48,11 +48,13 @@ make(From, QoS, Topic, Payload) ->
|
|
|
payload = Payload,
|
|
|
timestamp = os:timestamp()}.
|
|
|
|
|
|
+-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
|
|
|
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
|
|
|
Msg#message{flags = Flags};
|
|
|
set_flags(New, Msg = #message{flags = Old}) when is_map(New) ->
|
|
|
Msg#message{flags = maps:merge(Old, New)}.
|
|
|
|
|
|
+-spec(get_flag(flag(), emqx_types:message()) -> boolean()).
|
|
|
get_flag(Flag, Msg) ->
|
|
|
get_flag(Flag, Msg, false).
|
|
|
get_flag(Flag, #message{flags = Flags}, Default) ->
|
|
|
@@ -73,20 +75,26 @@ set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
|
|
|
|
|
|
-spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()).
|
|
|
unset_flag(Flag, Msg = #message{flags = Flags}) ->
|
|
|
- Msg#message{flags = maps:remove(Flag, Flags)}.
|
|
|
+ case maps:is_key(Flag, Flags) of
|
|
|
+ true ->
|
|
|
+ Msg#message{flags = maps:remove(Flag, Flags)};
|
|
|
+ false -> Msg
|
|
|
+ end.
|
|
|
|
|
|
+-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
|
|
|
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
|
|
|
Msg#message{headers = Headers};
|
|
|
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
|
|
|
- Msg#message{headers = maps:merge(Old, New)};
|
|
|
-set_headers(_, Msg) ->
|
|
|
- Msg.
|
|
|
+ Msg#message{headers = maps:merge(Old, New)}.
|
|
|
|
|
|
+-spec(get_header(term(), emqx_types:message()) -> term()).
|
|
|
get_header(Hdr, Msg) ->
|
|
|
get_header(Hdr, Msg, undefined).
|
|
|
+-spec(get_header(term(), emqx_types:message(), Default :: term()) -> term()).
|
|
|
get_header(Hdr, #message{headers = Headers}, Default) ->
|
|
|
maps:get(Hdr, Headers, Default).
|
|
|
|
|
|
+-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()).
|
|
|
set_header(Hdr, Val, Msg = #message{headers = undefined}) ->
|
|
|
Msg#message{headers = #{Hdr => Val}};
|
|
|
set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
|
|
|
@@ -98,13 +106,13 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam
|
|
|
is_expired(_Msg) ->
|
|
|
false.
|
|
|
|
|
|
+-spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
|
|
|
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
|
|
case elapsed(CreatedAt) of
|
|
|
Elapsed when Elapsed > 0 ->
|
|
|
set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg);
|
|
|
_ -> Msg
|
|
|
end;
|
|
|
-
|
|
|
update_expiry(Msg) -> Msg.
|
|
|
|
|
|
remove_topic_alias(Msg = #message{headers = Headers}) ->
|