|
|
@@ -38,6 +38,7 @@
|
|
|
%% Flags
|
|
|
-export([ get_flag/2
|
|
|
, get_flag/3
|
|
|
+ , get_flags/1
|
|
|
, set_flag/2
|
|
|
, set_flag/3
|
|
|
, set_flags/2
|
|
|
@@ -85,6 +86,7 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
|
|
|
qos = QoS,
|
|
|
from = From,
|
|
|
flags = #{dup => false},
|
|
|
+ headers = #{},
|
|
|
topic = Topic,
|
|
|
payload = Payload,
|
|
|
timestamp = os:timestamp()}.
|
|
|
@@ -119,6 +121,9 @@ get_flag(Flag, Msg) ->
|
|
|
get_flag(Flag, #message{flags = Flags}, Default) ->
|
|
|
maps:get(Flag, Flags, Default).
|
|
|
|
|
|
+-spec(get_flags(emqx_types:message()) -> maybe(map())).
|
|
|
+get_flags(#message{flags = Flags}) -> Flags.
|
|
|
+
|
|
|
-spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()).
|
|
|
set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) ->
|
|
|
Msg#message{flags = #{Flag => true}};
|
|
|
@@ -144,8 +149,7 @@ unset_flag(Flag, Msg = #message{flags = Flags}) ->
|
|
|
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
|
|
|
Msg#message{headers = Headers};
|
|
|
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
|
|
|
- Msg#message{headers = maps:merge(Old, New)};
|
|
|
-set_headers(undefined, Msg) -> Msg.
|
|
|
+ Msg#message{headers = maps:merge(Old, New)}.
|
|
|
|
|
|
-spec(get_headers(emqx_types:message()) -> map()).
|
|
|
get_headers(Msg) ->
|