emqx_message.erl 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_message).
  17. -compile(inline).
  18. -include("emqx.hrl").
  19. -include("emqx_mqtt.hrl").
  20. -include("types.hrl").
  21. %% Create
  22. -export([ make/2
  23. , make/3
  24. , make/4
  25. ]).
  26. %% Fields
  27. -export([ id/1
  28. , qos/1
  29. , from/1
  30. , topic/1
  31. , payload/1
  32. , timestamp/1
  33. ]).
  34. %% Flags
  35. -export([ is_sys/1
  36. , clean_dup/1
  37. , get_flag/2
  38. , get_flag/3
  39. , get_flags/1
  40. , set_flag/2
  41. , set_flag/3
  42. , set_flags/2
  43. , unset_flag/2
  44. ]).
  45. %% Headers
  46. -export([ get_headers/1
  47. , get_header/2
  48. , get_header/3
  49. , set_header/3
  50. , set_headers/2
  51. , remove_header/2
  52. ]).
  53. -export([ is_expired/1
  54. , update_expiry/1
  55. ]).
  56. -export([ to_packet/2
  57. , to_map/1
  58. , to_list/1
  59. ]).
  60. -export([format/1]).
  61. -type(flag() :: atom()).
  %% @doc Build a message for Topic carrying Payload with the sender left as
  %% `undefined'; the remaining fields are filled in by make/3.
  -spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
  make(Topic, Payload) ->
      From = undefined,
      make(From, Topic, Payload).
  %% @doc Build a message sent by From for Topic carrying Payload, using
  %% ?QOS_0 as the QoS level; delegates to make/4.
  -spec(make(emqx_types:clientid(),
             emqx_topic:topic(),
             emqx_types:payload()) -> emqx_types:message()).
  make(From, Topic, Payload) ->
      QoS = ?QOS_0,
      make(From, QoS, Topic, Payload).
  %% @doc Build a #message{} record.
  %% Only matches when QoS lies between ?QOS_0 and ?QOS_2 (inclusive); any
  %% other QoS raises function_clause. The timestamp is the current system
  %% time in milliseconds and the id comes from emqx_guid:gen/0.
  -spec(make(emqx_types:clientid(),
             emqx_types:qos(),
             emqx_topic:topic(),
             emqx_types:payload()) -> emqx_types:message()).
  make(From, QoS, Topic, Payload) when QoS >= ?QOS_0 andalso QoS =< ?QOS_2 ->
      CreatedAt = erlang:system_time(millisecond),
      MsgId = emqx_guid:gen(),
      #message{timestamp = CreatedAt,
               payload = Payload,
               topic = Topic,
               from = From,
               qos = QoS,
               id = MsgId
              }.
  %% @doc Return the `id' field of the message.
  -spec(id(emqx_types:message()) -> maybe(binary())).
  id(#message{} = Msg) ->
      Msg#message.id.
  %% @doc Return the `qos' field of the message.
  -spec(qos(emqx_types:message()) -> emqx_types:qos()).
  qos(#message{} = Msg) ->
      Msg#message.qos.
  %% @doc Return the `from' field (the sender) of the message.
  -spec(from(emqx_types:message()) -> atom() | binary()).
  from(#message{} = Msg) ->
      Msg#message.from.
  %% @doc Return the `topic' field of the message.
  -spec(topic(emqx_types:message()) -> emqx_types:topic()).
  topic(#message{} = Msg) ->
      Msg#message.topic.
  %% @doc Return the `payload' field of the message.
  -spec(payload(emqx_types:message()) -> emqx_types:payload()).
  payload(#message{} = Msg) ->
      Msg#message.payload.
  %% @doc Return the `timestamp' field of the message (make/4 stores the
  %% system time in milliseconds there).
  -spec(timestamp(emqx_types:message()) -> integer()).
  timestamp(#message{} = Msg) ->
      Msg#message.timestamp.
  %% @doc Tell whether a message is a system message: either its `sys' flag
  %% is `true' or its topic starts with "$SYS/". Anything else yields `false'.
  -spec(is_sys(emqx_types:message()) -> boolean()).
  is_sys(Msg) ->
      case Msg of
          #message{flags = #{sys := true}} -> true;
          #message{topic = <<"$SYS/", _/binary>>} -> true;
          _ -> false
      end.
  %% @doc Reset the `dup' flag to `false' when it is currently `true';
  %% every other message is returned untouched.
  -spec(clean_dup(emqx_types:message()) -> emqx_types:message()).
  clean_dup(#message{flags = #{dup := true} = Flags} = Msg) ->
      %% The key is known to exist here, so an update-only `:=' is enough.
      Msg#message{flags = Flags#{dup := false}};
  clean_dup(Msg) ->
      Msg.
  %% @doc Merge the map Flags into the message flags. When the message has
  %% no flags yet (`undefined') Flags becomes the flag map; otherwise entries
  %% in Flags override existing entries with the same key.
  -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
  set_flags(Flags, #message{flags = Current} = Msg) when is_map(Flags) ->
      Merged = case Current of
                   undefined -> Flags;
                   _ -> maps:merge(Current, Flags)
               end,
      Msg#message{flags = Merged}.
  %% @doc Look up Flag in the message flags, defaulting to `false'.
  %% get_flag/3 already returns the default when the flags are `undefined',
  %% so no separate clause is needed for that case.
  -spec(get_flag(flag(), emqx_types:message()) -> boolean()).
  get_flag(Flag, Msg) ->
      get_flag(Flag, Msg, false).
  %% Look up Flag in the message flags; Default is returned when the flags
  %% are `undefined' or do not contain Flag.
  get_flag(Flag, #message{flags = Flags}, Default) ->
      case Flags of
          undefined -> Default;
          _ -> maps:get(Flag, Flags, Default)
      end.
  %% @doc Return the raw `flags' field of the message (may be `undefined').
  -spec(get_flags(emqx_types:message()) -> maybe(map())).
  get_flags(#message{} = Msg) ->
      Msg#message.flags.
  %% @doc Set Flag to `true' on the message, creating the flag map when the
  %% message has none. Flag must be an atom.
  -spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()).
  set_flag(Flag, #message{flags = Flags} = Msg) when is_atom(Flag) ->
      Updated = case Flags of
                    undefined -> #{Flag => true};
                    _ -> Flags#{Flag => true}
                end,
      Msg#message{flags = Updated}.
  %% @doc Set Flag to Val on the message, creating the flag map when the
  %% message has none. Flag must be an atom.
  -spec(set_flag(flag(), boolean() | integer(), emqx_types:message())
        -> emqx_types:message()).
  set_flag(Flag, Val, #message{flags = Flags} = Msg) when is_atom(Flag) ->
      Updated = case Flags of
                    undefined -> #{Flag => Val};
                    _ -> Flags#{Flag => Val}
                end,
      Msg#message{flags = Updated}.
  %% @doc Remove Flag from the message flags.
  %% A message whose flags are `undefined' (e.g. one freshly built by make/4,
  %% which does not set the field) is returned unchanged instead of crashing
  %% with `{badmap, undefined}' in maps:is_key/2. A message that does not
  %% carry Flag is also returned unchanged.
  -spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()).
  unset_flag(_Flag, Msg = #message{flags = undefined}) ->
      Msg;
  unset_flag(Flag, Msg = #message{flags = Flags}) ->
      case maps:is_key(Flag, Flags) of
          true -> Msg#message{flags = maps:remove(Flag, Flags)};
          %% Skip the record update when there is nothing to remove.
          false -> Msg
      end.
  %% @doc Merge the map Headers into the message headers. When the message
  %% has no headers yet (`undefined') Headers becomes the header map;
  %% otherwise entries in Headers override existing entries with the same key.
  -spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
  set_headers(Headers, #message{headers = Current} = Msg) when is_map(Headers) ->
      Merged = case Current of
                   undefined -> Headers;
                   _ -> maps:merge(Current, Headers)
               end,
      Msg#message{headers = Merged}.
  %% @doc Return the raw `headers' field of the message (may be `undefined').
  -spec(get_headers(emqx_types:message()) -> maybe(map())).
  get_headers(Msg) ->
      Headers = Msg#message.headers,
      Headers.
  %% @doc Look up Hdr in the message headers, defaulting to `undefined'.
  %% get_header/3 already returns the default when the headers are
  %% `undefined', so no separate clause is needed for that case.
  -spec(get_header(term(), emqx_types:message()) -> term()).
  get_header(Hdr, Msg) ->
      get_header(Hdr, Msg, undefined).
  %% @doc Look up Hdr in the message headers; Default is returned when the
  %% headers are `undefined' or do not contain Hdr.
  -spec(get_header(term(), emqx_types:message(), term()) -> term()).
  get_header(Hdr, #message{headers = Headers}, Default) ->
      case Headers of
          undefined -> Default;
          _ -> maps:get(Hdr, Headers, Default)
      end.
  %% @doc Store Val under Hdr in the message headers, creating the header
  %% map when the message has none.
  -spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()).
  set_header(Hdr, Val, #message{headers = Headers} = Msg) ->
      Updated = case Headers of
                    undefined -> #{Hdr => Val};
                    _ -> Headers#{Hdr => Val}
                end,
      Msg#message{headers = Updated}.
  %% @doc Remove Hdr from the message headers. The message is returned
  %% unchanged when it has no headers (`undefined') or does not carry Hdr.
  -spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()).
  remove_header(Hdr, #message{headers = Headers} = Msg) ->
      %% `orelse' short-circuits, so maps:is_key/2 never sees `undefined'.
      NothingToRemove = Headers =:= undefined orelse not maps:is_key(Hdr, Headers),
      case NothingToRemove of
          true -> Msg;
          false -> Msg#message{headers = maps:remove(Hdr, Headers)}
      end.
  %% @doc Tell whether the message has outlived its 'Message-Expiry-Interval'
  %% header. The interval is converted with timer:seconds/1 and compared
  %% against the milliseconds elapsed since the message timestamp. Messages
  %% without that header never expire.
  -spec(is_expired(emqx_types:message()) -> boolean()).
  is_expired(Msg) ->
      case Msg of
          #message{headers = #{'Message-Expiry-Interval' := Interval},
                   timestamp = CreatedAt} ->
              AgeMs = elapsed(CreatedAt),
              LimitMs = timer:seconds(Interval),
              AgeMs > LimitMs;
          _ ->
              false
      end.
  %% @doc Reduce the 'Message-Expiry-Interval' header by the whole seconds
  %% elapsed since the message timestamp, never going below 1. Messages
  %% without that header, or with no elapsed time, are returned unchanged.
  -spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
  update_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval},
                         timestamp = CreatedAt} = Msg) ->
      ElapsedMs = elapsed(CreatedAt),
      if
          ElapsedMs > 0 ->
              Remaining = Interval - ElapsedMs div 1000,
              set_header('Message-Expiry-Interval', max(1, Remaining), Msg);
          true ->
              Msg
      end;
  update_expiry(Msg) ->
      Msg.
  %% @doc Convert a message into a PUBLISH packet carrying PacketId.
  %% The `dup' and `retain' header bits are read from the message flags
  %% (defaulting to `false' when absent) and the packet properties are the
  %% message headers filtered through props/1.
  -spec(to_packet(emqx_types:packet_id(), emqx_types:message())
        -> emqx_types:packet()).
  to_packet(PacketId, #message{qos = QoS, headers = Headers,
                               topic = Topic, payload = Payload} = Msg) ->
      FixedHeader = #mqtt_packet_header{type = ?PUBLISH,
                                        dup = get_flag(dup, Msg),
                                        qos = QoS,
                                        retain = get_flag(retain, Msg)},
      Publish = #mqtt_packet_publish{topic_name = Topic,
                                     packet_id = PacketId,
                                     properties = props(Headers)},
      #mqtt_packet{header = FixedHeader,
                   variable = Publish,
                   payload = Payload}.
  %% Keep only the header entries that are copied into the PUBLISH packet
  %% properties; `undefined' headers yield `undefined' properties.
  props(Headers) ->
      case Headers of
          undefined ->
              undefined;
          _ ->
              Wanted = ['Payload-Format-Indicator',
                        'Response-Topic',
                        'Correlation-Data',
                        'User-Property',
                        'Subscription-Identifier',
                        'Content-Type',
                        'Message-Expiry-Interval'],
              maps:with(Wanted, Headers)
      end.
  %% @doc Convert a message record into a map with the keys id, qos, from,
  %% flags, headers, topic, payload and timestamp.
  -spec(to_map(emqx_types:message()) -> map()).
  to_map(#message{} = Msg) ->
      #{id        => Msg#message.id,
        qos       => Msg#message.qos,
        from      => Msg#message.from,
        flags     => Msg#message.flags,
        headers   => Msg#message.headers,
        topic     => Msg#message.topic,
        payload   => Msg#message.payload,
        timestamp => Msg#message.timestamp
       }.
  %% @doc Convert a message record into a list of `{Field, Value}' tuples,
  %% in record field order. The field names come from
  %% record_info(fields, message) and the values are the record tuple's
  %% elements after the leading record tag.
  %% The result is a list of pairs, not a map, so the spec says so.
  -spec(to_list(emqx_types:message()) -> [{atom(), term()}]).
  to_list(Msg) ->
      lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
  %% Milliseconds between Since and the current system time, clamped so the
  %% result is never negative when Since lies in the future.
  elapsed(Since) ->
      Now = erlang:system_time(millisecond),
      case Now - Since of
          Diff when Diff > 0 -> Diff;
          _ -> 0
      end.
  %% Render a one-line description of the message (id, QoS, topic, sender,
  %% flags and headers) as io_lib chardata. Flags and headers are
  %% pre-rendered by format/2.
  format(#message{id = Id, qos = QoS, topic = Topic, from = From,
                  flags = Flags, headers = Headers}) ->
      FlagsText = format(flags, Flags),
      HeadersText = format(headers, Headers),
      Args = [Id, QoS, Topic, From, FlagsText, HeadersText],
      io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", Args).
  %% Render the flags or headers of a message for format/1.
  %% `undefined' renders as the empty string; flags render as the list of
  %% flag names whose value is exactly `true'; headers render as the whole map.
  format(_Kind, undefined) ->
      "";
  format(flags, Flags) ->
      KeepIfSet = fun({Flag, true}) -> {true, Flag};
                     (_Other) -> false
                  end,
      Enabled = lists:filtermap(KeepIfSet, maps:to_list(Flags)),
      io_lib:format("~p", [Enabled]);
  format(headers, Headers) ->
      io_lib:format("~p", [Headers]).