emqttd_serializer.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Packet Serializer
  17. -module(emqttd_serializer).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. %% API
  21. -export([serialize/1]).
  22. %% @doc Serialise MQTT Packet
  23. -spec serialize(mqtt_packet()) -> binary().
  24. serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
  25. variable = Variable,
  26. payload = Payload}) ->
  27. serialize_header(Header,
  28. serialize_variable(Type, Variable,
  29. serialize_payload(Payload))).
  30. serialize_header(#mqtt_packet_header{type = Type,
  31. dup = Dup,
  32. qos = Qos,
  33. retain = Retain},
  34. {VariableBin, PayloadBin}) when ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
  35. Len = size(VariableBin) + size(PayloadBin),
  36. true = (Len =< ?MAX_LEN),
  37. LenBin = serialize_len(Len),
  38. <<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
  39. LenBin/binary,
  40. VariableBin/binary,
  41. PayloadBin/binary>>.
  42. serialize_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
  43. proto_ver = ProtoVer,
  44. proto_name = ProtoName,
  45. will_retain = WillRetain,
  46. will_qos = WillQos,
  47. will_flag = WillFlag,
  48. clean_sess = CleanSess,
  49. keep_alive = KeepAlive,
  50. will_topic = WillTopic,
  51. will_msg = WillMsg,
  52. username = Username,
  53. password = Password}, undefined) ->
  54. VariableBin = <<(size(ProtoName)):16/big-unsigned-integer,
  55. ProtoName/binary,
  56. ProtoVer:8,
  57. (opt(Username)):1,
  58. (opt(Password)):1,
  59. (opt(WillRetain)):1,
  60. WillQos:2,
  61. (opt(WillFlag)):1,
  62. (opt(CleanSess)):1,
  63. 0:1,
  64. KeepAlive:16/big-unsigned-integer>>,
  65. PayloadBin = serialize_utf(ClientId),
  66. PayloadBin1 = case WillFlag of
  67. true -> <<PayloadBin/binary,
  68. (serialize_utf(WillTopic))/binary,
  69. (size(WillMsg)):16/big-unsigned-integer,
  70. WillMsg/binary>>;
  71. false -> PayloadBin
  72. end,
  73. UserPasswd = << <<(serialize_utf(B))/binary>> || B <- [Username, Password], B =/= undefined >>,
  74. {VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
  75. serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
  76. return_code = ReturnCode}, undefined) ->
  77. {<<AckFlags:8, ReturnCode:8>>, <<>>};
  78. serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
  79. topic_table = Topics }, undefined) ->
  80. {<<PacketId:16/big>>, serialize_topics(Topics)};
  81. serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
  82. qos_table = QosTable}, undefined) ->
  83. {<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
  84. serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId,
  85. topics = Topics }, undefined) ->
  86. {<<PacketId:16/big>>, serialize_topics(Topics)};
  87. serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) ->
  88. {<<PacketId:16/big>>, <<>>};
  89. serialize_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName,
  90. packet_id = PacketId }, PayloadBin) ->
  91. TopicBin = serialize_utf(TopicName),
  92. PacketIdBin = if
  93. PacketId =:= undefined -> <<>>;
  94. true -> <<PacketId:16/big>>
  95. end,
  96. {<<TopicBin/binary, PacketIdBin/binary>>, PayloadBin};
  97. serialize_variable(PubAck, #mqtt_packet_puback{packet_id = PacketId}, _Payload)
  98. when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP ->
  99. {<<PacketId:16/big>>, <<>>};
  100. serialize_variable(?PINGREQ, undefined, undefined) ->
  101. {<<>>, <<>>};
  102. serialize_variable(?PINGRESP, undefined, undefined) ->
  103. {<<>>, <<>>};
  104. serialize_variable(?DISCONNECT, undefined, undefined) ->
  105. {<<>>, <<>>}.
  106. serialize_payload(undefined) ->
  107. undefined;
  108. serialize_payload(Bin) when is_binary(Bin) ->
  109. Bin.
  110. serialize_topics([{_Topic, _Qos}|_] = Topics) ->
  111. << <<(serialize_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>;
  112. serialize_topics([H|_] = Topics) when is_binary(H) ->
  113. << <<(serialize_utf(Topic))/binary>> || Topic <- Topics >>.
  114. serialize_utf(String) ->
  115. StringBin = unicode:characters_to_binary(String),
  116. Len = size(StringBin),
  117. true = (Len =< 16#ffff),
  118. <<Len:16/big, StringBin/binary>>.
  119. serialize_len(N) when N =< ?LOWBITS ->
  120. <<0:1, N:7>>;
  121. serialize_len(N) ->
  122. <<1:1, (N rem ?HIGHBIT):7, (serialize_len(N div ?HIGHBIT))/binary>>.
  123. opt(undefined) -> ?RESERVED;
  124. opt(false) -> 0;
  125. opt(true) -> 1;
  126. opt(X) when is_integer(X) -> X;
  127. opt(B) when is_binary(B) -> 1.