emqttd_message.erl 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Message Functions
  17. -module(emqttd_message).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]).
  21. -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
  22. -export([format/1]).
  %% @doc Build a message for `Topic' carrying `Payload', sent by `From'.
  %% Only the sender, topic, payload and creation time are filled in here;
  %% all remaining fields keep the defaults declared for #mqtt_message{}.
  -spec make(From, Topic, Payload) -> mqtt_message() when
        From    :: atom() | binary(),
        Topic   :: binary(),
        Payload :: binary().
  make(From, Topic, Payload) ->
      %% Stamp the message with the OS clock at creation time.
      CreatedAt = os:timestamp(),
      #mqtt_message{from      = From,
                    topic     = Topic,
                    payload   = Payload,
                    timestamp = CreatedAt}.
  %% @doc Build a message with an explicit QoS.
  %% `Qos' is passed through the ?QOS_I macro (from the included headers) and
  %% the result is used both for the `qos' field and to decide, via msgid/1,
  %% whether a message id is generated.
  -spec make(From, Qos, Topic, Payload) -> mqtt_message() when
        From    :: atom() | binary(),
        Qos     :: mqtt_qos() | mqtt_qos_name(),
        Topic   :: binary(),
        Payload :: binary().
  make(From, Qos, Topic, Payload) ->
      QosLevel = ?QOS_I(Qos),
      MsgId = msgid(QosLevel),
      #mqtt_message{msgid     = MsgId,
                    from      = From,
                    qos       = QosLevel,
                    topic     = Topic,
                    payload   = Payload,
                    timestamp = os:timestamp()}.
  %% @doc Convert a packet into a message.
  %%
  %% Two kinds of input are accepted:
  %%   * a PUBLISH #mqtt_packet{}: QoS, retain, dup, topic, packet id and
  %%     payload are copied into the message;
  %%   * a #mqtt_packet_connect{}: the will message described by the CONNECT
  %%     packet is built, or `undefined' is returned when `will_flag' is false.
  %%
  %% In both cases a message id is produced by msgid/1 and the `from' field is
  %% left at its record default. Any other input fails with `function_clause'.
  -spec from_packet(mqtt_packet() | #mqtt_packet_connect{}) ->
        mqtt_message() | undefined.
  from_packet(#mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
                                                          retain = Retain,
                                                          qos    = Qos,
                                                          dup    = Dup},
                           variable = #mqtt_packet_publish{topic_name = Topic,
                                                           packet_id  = PacketId},
                           payload  = Payload}) ->
      #mqtt_message{msgid     = msgid(Qos),
                    pktid     = PacketId,
                    qos       = Qos,
                    retain    = Retain,
                    dup       = Dup,
                    topic     = Topic,
                    payload   = Payload,
                    timestamp = os:timestamp()};
  %% CONNECT without a will: there is no message to build.
  from_packet(#mqtt_packet_connect{will_flag = false}) ->
      undefined;
  %% CONNECT with a will: the will is always created with dup = false.
  from_packet(#mqtt_packet_connect{will_retain = Retain,
                                   will_qos    = Qos,
                                   will_topic  = Topic,
                                   will_msg    = Msg}) ->
      #mqtt_message{msgid     = msgid(Qos),
                    topic     = Topic,
                    retain    = Retain,
                    qos       = Qos,
                    dup       = false,
                    payload   = Msg,
                    timestamp = os:timestamp()}.
  %% @doc Convert a packet into a message and record `ClientId' as its sender.
  %% from_packet/1 returns `undefined' for a CONNECT packet without a will;
  %% that value is passed through unchanged instead of being used in a record
  %% update, which would fail with `{badrecord, mqtt_message}'.
  from_packet(ClientId, Packet) ->
      case from_packet(Packet) of
          undefined -> undefined;
          Msg       -> Msg#mqtt_message{from = ClientId}
      end.
  %% Pick the message id for a given QoS: QoS 0 messages get no id, QoS 1 and
  %% QoS 2 messages get a fresh one from emqttd_guid:gen/0. Any other value
  %% fails with `function_clause'.
  msgid(?QOS_0) -> undefined;
  msgid(Level) when Level =:= ?QOS_1; Level =:= ?QOS_2 -> emqttd_guid:gen().
  %% @doc Convert a message into a PUBLISH packet.
  %% QoS, retain and dup go into the packet header; topic and packet id go
  %% into the variable part. The packet id is forced to `undefined' for QoS 0
  %% messages, otherwise the message's `pktid' is used.
  -spec to_packet(mqtt_message()) -> mqtt_packet().
  to_packet(#mqtt_message{pktid   = PkgId,
                          qos     = Qos,
                          retain  = Retain,
                          dup     = Dup,
                          topic   = Topic,
                          payload = Payload}) ->
      PacketId = case Qos of
                     ?QOS_0 -> undefined;
                     _      -> PkgId
                 end,
      Header = #mqtt_packet_header{type   = ?PUBLISH,
                                   qos    = Qos,
                                   retain = Retain,
                                   dup    = Dup},
      Variable = #mqtt_packet_publish{topic_name = Topic,
                                      packet_id  = PacketId},
      #mqtt_packet{header   = Header,
                   variable = Variable,
                   payload  = Payload}.
  %% @doc Turn on both the dup and the retain flag of a message.
  -spec set_flag(mqtt_message()) -> mqtt_message().
  set_flag(Msg) ->
      Msg#mqtt_message{retain = true,
                       dup    = true}.
  %% @doc Turn on a single flag (`dup', `sys' or `retain') of a message.
  %% When the flag is currently `false' it is switched to `true'; in every
  %% other case the message is returned unchanged. Any other flag name fails
  %% with `function_clause'.
  -spec set_flag(atom(), mqtt_message()) -> mqtt_message().
  set_flag(dup, Msg = #mqtt_message{dup = false}) ->
      Msg#mqtt_message{dup = true};
  set_flag(sys, Msg = #mqtt_message{sys = false}) ->
      Msg#mqtt_message{sys = true};
  set_flag(retain, Msg = #mqtt_message{retain = false}) ->
      Msg#mqtt_message{retain = true};
  %% Fall-through for a flag that is not currently `false'. `sys' must be
  %% listed here as well, otherwise setting it on a message that already has
  %% sys = true would match no clause at all.
  set_flag(Flag, Msg) when Flag =:= dup; Flag =:= sys; Flag =:= retain ->
      Msg.
  %% @doc Turn off both the dup and the retain flag of a message.
  -spec unset_flag(mqtt_message()) -> mqtt_message().
  unset_flag(Msg) ->
      Msg#mqtt_message{retain = false,
                       dup    = false}.
  %% @doc Turn off a single flag (`dup' or `retain') of a message.
  %% When the flag is currently `true' it is switched to `false'; in every
  %% other case the message is returned unchanged. Any other flag name fails
  %% with `function_clause'.
  -spec unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message().
  unset_flag(dup, Message = #mqtt_message{dup = true}) ->
      Message#mqtt_message{dup = false};
  unset_flag(retain, Message = #mqtt_message{retain = true}) ->
      Message#mqtt_message{retain = false};
  %% Nothing to clear: hand the message back as it came in.
  unset_flag(Name, Message) when Name =:= dup; Name =:= retain ->
      Message.
  %% @doc Render a one-line, human-readable summary of a message.
  %% QoS, retain and dup are shown as integers (see i/1); the message id and
  %% packet id are printed with ~p, the sender and topic with ~s. The result
  %% is whatever io_lib:format/2 returns and is not flattened here.
  format(#mqtt_message{msgid  = MsgId,
                       pktid  = PktId,
                       from   = From,
                       qos    = Qos,
                       retain = Retain,
                       dup    = Dup,
                       topic  = Topic}) ->
      Template = "Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
      Args = [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic],
      io_lib:format(Template, Args).
  %% Map a flag or QoS value to an integer for display: integers pass through
  %% unchanged, `false' becomes 0 and `true' becomes 1. Anything else fails
  %% with `function_clause'.
  i(Int) when is_integer(Int) -> Int;
  i(false) -> 0;
  i(true) -> 1.