emqttd_parser.erl 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Packet Parser
  17. -module(emqttd_parser).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. %% API
  21. -export([new/1, parse/2]).
  22. -record(mqtt_packet_limit, {max_packet_size}).
  23. -type option() :: {atom(), any()}.
  24. -type parser() :: fun( (binary()) -> any() ).
  25. %% @doc Initialize a parser
  26. -spec new(Opts :: [option()]) -> parser().
  27. new(Opts) ->
  28. fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
  29. limit(Opts) ->
  30. #mqtt_packet_limit{max_packet_size =
  31. proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
  32. %% @doc Parse MQTT Packet
  33. -spec parse(binary(), {none, [option()]} | fun()) ->
  34. {ok, mqtt_packet()} | {error, any()} | {more, fun()}.
  35. parse(<<>>, {none, Limit}) ->
  36. {more, fun(Bin) -> parse(Bin, {none, Limit}) end};
  37. parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
  38. parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType,
  39. dup = bool(Dup),
  40. qos = QoS,
  41. retain = bool(Retain)}, Limit);
  42. parse(Bin, Cont) -> Cont(Bin).
  43. parse_remaining_len(<<>>, Header, Limit) ->
  44. {more, fun(Bin) -> parse_remaining_len(Bin, Header, Limit) end};
  45. parse_remaining_len(Rest, Header, Limit) ->
  46. parse_remaining_len(Rest, Header, 1, 0, Limit).
  47. parse_remaining_len(_Bin, _Header, _Multiplier, Length, #mqtt_packet_limit{max_packet_size = MaxLen})
  48. when Length > MaxLen ->
  49. {error, invalid_mqtt_frame_len};
  50. parse_remaining_len(<<>>, Header, Multiplier, Length, Limit) ->
  51. {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Limit) end};
  52. parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Limit) ->
  53. parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Limit);
  54. parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, #mqtt_packet_limit{max_packet_size = MaxLen}) ->
  55. FrameLen = Value + Len * Multiplier,
  56. if
  57. FrameLen > MaxLen -> {error, invalid_mqtt_frame_len};
  58. true -> parse_frame(Rest, Header, FrameLen)
  59. end.
  60. parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) ->
  61. case {Type, Bin} of
  62. {?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
  63. {ProtoName, Rest1} = parse_utf(FrameBin),
  64. %% Fix mosquitto bridge: 0x83, 0x84
  65. <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1,
  66. <<UsernameFlag : 1,
  67. PasswordFlag : 1,
  68. WillRetain : 1,
  69. WillQos : 2,
  70. WillFlag : 1,
  71. CleanSession : 1,
  72. _Reserved : 1,
  73. KeepAlive : 16/big,
  74. Rest3/binary>> = Rest2,
  75. {ClientId, Rest4} = parse_utf(Rest3),
  76. {WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
  77. {WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
  78. {UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
  79. {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
  80. case protocol_name_approved(ProtoVersion, ProtoName) of
  81. true ->
  82. wrap(Header,
  83. #mqtt_packet_connect{
  84. proto_ver = ProtoVersion,
  85. proto_name = ProtoName,
  86. will_retain = bool(WillRetain),
  87. will_qos = WillQos,
  88. will_flag = bool(WillFlag),
  89. clean_sess = bool(CleanSession),
  90. keep_alive = KeepAlive,
  91. client_id = ClientId,
  92. will_topic = WillTopic,
  93. will_msg = WillMsg,
  94. username = UserName,
  95. password = PasssWord}, Rest);
  96. false ->
  97. {error, protocol_header_corrupt}
  98. end;
  99. %{?CONNACK, <<FrameBin:Length/binary, Rest/binary>>} ->
  100. % <<_Reserved:7, SP:1, ReturnCode:8>> = FrameBin,
  101. % wrap(Header, #mqtt_packet_connack{ack_flags = SP,
  102. % return_code = ReturnCode }, Rest);
  103. {?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
  104. {TopicName, Rest1} = parse_utf(FrameBin),
  105. {PacketId, Payload} = case Qos of
  106. 0 -> {undefined, Rest1};
  107. _ -> <<Id:16/big, R/binary>> = Rest1,
  108. {Id, R}
  109. end,
  110. wrap(Header, #mqtt_packet_publish{topic_name = TopicName,
  111. packet_id = PacketId},
  112. Payload, Rest);
  113. {?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
  114. <<PacketId:16/big>> = FrameBin,
  115. wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
  116. {?PUBREC, <<FrameBin:Length/binary, Rest/binary>>} ->
  117. <<PacketId:16/big>> = FrameBin,
  118. wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
  119. {?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
  120. 1 = Qos,
  121. <<PacketId:16/big>> = FrameBin,
  122. wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
  123. {?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
  124. <<PacketId:16/big>> = FrameBin,
  125. wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
  126. {?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
  127. 1 = Qos,
  128. <<PacketId:16/big, Rest1/binary>> = FrameBin,
  129. TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
  130. wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
  131. topic_table = TopicTable}, Rest);
  132. %{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
  133. % <<PacketId:16/big, Rest1/binary>> = FrameBin,
  134. % wrap(Header, #mqtt_packet_suback{packet_id = PacketId,
  135. % qos_table = parse_qos(Rest1, []) }, Rest);
  136. {?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
  137. 1 = Qos,
  138. <<PacketId:16/big, Rest1/binary>> = FrameBin,
  139. Topics = parse_topics(?UNSUBSCRIBE, Rest1, []),
  140. wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
  141. topics = Topics}, Rest);
  142. %{?UNSUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
  143. % <<PacketId:16/big>> = FrameBin,
  144. % wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest);
  145. {?PINGREQ, Rest} ->
  146. Length = 0,
  147. wrap(Header, Rest);
  148. %{?PINGRESP, Rest} ->
  149. % Length = 0,
  150. % wrap(Header, Rest);
  151. {?DISCONNECT, Rest} ->
  152. Length = 0,
  153. wrap(Header, Rest);
  154. {_, TooShortBin} ->
  155. {more, fun(BinMore) ->
  156. parse_frame(<<TooShortBin/binary, BinMore/binary>>,
  157. Header, Length)
  158. end}
  159. end.
  160. wrap(Header, Variable, Payload, Rest) ->
  161. {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
  162. wrap(Header, Variable, Rest) ->
  163. {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}.
  164. wrap(Header, Rest) ->
  165. {ok, #mqtt_packet{header = Header}, Rest}.
  166. %client function
  167. %parse_qos(<<>>, Acc) ->
  168. % lists:reverse(Acc);
  169. %parse_qos(<<QoS:8/unsigned, Rest/binary>>, Acc) ->
  170. % parse_qos(Rest, [QoS | Acc]).
  171. parse_topics(_, <<>>, Topics) ->
  172. lists:reverse(Topics);
  173. parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
  174. {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
  175. parse_topics(Sub, Rest, [{Name, QoS}| Topics]);
  176. parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
  177. {Name, <<Rest/binary>>} = parse_utf(Bin),
  178. parse_topics(Sub, Rest, [Name | Topics]).
  179. parse_utf(Bin, 0) ->
  180. {undefined, Bin};
  181. parse_utf(Bin, _) ->
  182. parse_utf(Bin).
  183. parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
  184. {Str, Rest}.
  185. parse_msg(Bin, 0) ->
  186. {undefined, Bin};
  187. parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
  188. {Msg, Rest}.
  189. bool(0) -> false;
  190. bool(1) -> true.
  191. protocol_name_approved(Ver, Name) ->
  192. lists:member({Ver, Name}, ?PROTOCOL_NAMES).