emqx_frame.erl 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_frame).
  17. -include("emqx.hrl").
  18. -include("emqx_mqtt.hrl").
  19. -export([ initial_parse_state/0
  20. , initial_parse_state/1
  21. ]).
  22. -export([ parse/1
  23. , parse/2
  24. , serialize_fun/0
  25. , serialize_fun/1
  26. , serialize_opts/0
  27. , serialize_opts/1
  28. , serialize_pkt/2
  29. , serialize/1
  30. , serialize/2
  31. ]).
  32. -export_type([ options/0
  33. , parse_state/0
  34. , parse_result/0
  35. , serialize_opts/0
  36. ]).
  37. -type(options() :: #{strict_mode => boolean(),
  38. max_size => 1..?MAX_PACKET_SIZE,
  39. version => emqx_types:version()
  40. }).
  41. -type(parse_state() :: {none, options()} | {cont_state(), options()}).
  42. -type(parse_result() :: {more, parse_state()}
  43. | {ok, emqx_types:packet(), binary(), parse_state()}).
  44. -type(cont_state() :: {Stage :: len | body,
  45. State :: #{hdr := #mqtt_packet_header{},
  46. len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
  47. rest => binary()
  48. }
  49. }).
  50. -type(serialize_opts() :: options()).
  51. -define(none(Options), {none, Options}).
  52. -define(DEFAULT_OPTIONS,
  53. #{strict_mode => false,
  54. max_size => ?MAX_PACKET_SIZE,
  55. version => ?MQTT_PROTO_V4
  56. }).
  57. -dialyzer({no_match, [serialize_utf8_string/2]}).
  58. %%--------------------------------------------------------------------
  59. %% Init Parse State
  60. %%--------------------------------------------------------------------
  61. -spec(initial_parse_state() -> {none, options()}).
  62. initial_parse_state() ->
  63. initial_parse_state(#{}).
  64. -spec(initial_parse_state(options()) -> {none, options()}).
  65. initial_parse_state(Options) when is_map(Options) ->
  66. ?none(merge_opts(Options)).
  67. %% @pivate
  68. merge_opts(Options) ->
  69. maps:merge(?DEFAULT_OPTIONS, Options).
  70. %%--------------------------------------------------------------------
  71. %% Parse MQTT Frame
  72. %%--------------------------------------------------------------------
  73. -spec(parse(binary()) -> parse_result()).
  74. parse(Bin) ->
  75. parse(Bin, initial_parse_state()).
  76. -spec(parse(binary(), parse_state()) -> parse_result()).
  77. parse(<<>>, {none, Options}) ->
  78. {more, {none, Options}};
  79. parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
  80. {none, Options = #{strict_mode := StrictMode}}) ->
  81. %% Validate header if strict mode.
  82. StrictMode andalso validate_header(Type, Dup, QoS, Retain),
  83. Header = #mqtt_packet_header{type = Type,
  84. dup = bool(Dup),
  85. qos = QoS,
  86. retain = bool(Retain)
  87. },
  88. Header1 = case fixqos(Type, QoS) of
  89. QoS -> Header;
  90. FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
  91. end,
  92. parse_remaining_len(Rest, Header1, Options);
  93. parse(Bin, {{len, #{hdr := Header,
  94. len := {Multiplier, Length}}
  95. }, Options}) when is_binary(Bin) ->
  96. parse_remaining_len(Bin, Header, Multiplier, Length, Options);
  97. parse(Bin, {{body, #{hdr := Header,
  98. len := Length,
  99. rest := Rest}
  100. }, Options}) when is_binary(Bin) ->
  101. parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
  102. parse_remaining_len(<<>>, Header, Options) ->
  103. {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
  104. parse_remaining_len(Rest, Header, Options) ->
  105. parse_remaining_len(Rest, Header, 1, 0, Options).
  106. parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
  107. when Length > MaxSize ->
  108. error(frame_too_large);
  109. parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
  110. {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
  111. %% Match DISCONNECT without payload
  112. parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
  113. Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
  114. {ok, Packet, Rest, ?none(Options)};
  115. %% Match PINGREQ.
  116. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
  117. parse_frame(Rest, Header, 0, Options);
  118. %% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
  119. parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
  120. parse_frame(Rest, Header, 2, Options);
  121. parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
  122. parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
  123. parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
  124. Options = #{max_size := MaxSize}) ->
  125. FrameLen = Value + Len * Multiplier,
  126. if
  127. FrameLen > MaxSize -> error(frame_too_large);
  128. true -> parse_frame(Rest, Header, FrameLen, Options)
  129. end.
  130. parse_frame(Bin, Header, 0, Options) ->
  131. {ok, packet(Header), Bin, ?none(Options)};
  132. parse_frame(Bin, Header, Length, Options) ->
  133. case Bin of
  134. <<FrameBin:Length/binary, Rest/binary>> ->
  135. case parse_packet(Header, FrameBin, Options) of
  136. {Variable, Payload} ->
  137. {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
  138. Variable = #mqtt_packet_connect{proto_ver = Ver} ->
  139. {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})};
  140. Variable ->
  141. {ok, packet(Header, Variable), Rest, ?none(Options)}
  142. end;
  143. TooShortBin ->
  144. {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
  145. end.
  146. -compile({inline, [packet/1, packet/2, packet/3]}).
  147. packet(Header) ->
  148. #mqtt_packet{header = Header}.
  149. packet(Header, Variable) ->
  150. #mqtt_packet{header = Header, variable = Variable}.
  151. packet(Header, Variable, Payload) ->
  152. #mqtt_packet{header = Header, variable = Variable, payload = Payload}.
  153. parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
  154. {ProtoName, Rest} = parse_utf8_string(FrameBin),
  155. <<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
  156. % Note: Crash when reserved flag doesn't equal to 0, there is no strict
  157. % compliance with the MQTT5.0.
  158. <<UsernameFlag : 1,
  159. PasswordFlag : 1,
  160. WillRetain : 1,
  161. WillQoS : 2,
  162. WillFlag : 1,
  163. CleanStart : 1,
  164. 0 : 1,
  165. KeepAlive : 16/big,
  166. Rest2/binary>> = Rest1,
  167. {Properties, Rest3} = parse_properties(Rest2, ProtoVer),
  168. {ClientId, Rest4} = parse_utf8_string(Rest3),
  169. ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
  170. proto_ver = ProtoVer,
  171. is_bridge = (BridgeTag =:= 8),
  172. clean_start = bool(CleanStart),
  173. will_flag = bool(WillFlag),
  174. will_qos = WillQoS,
  175. will_retain = bool(WillRetain),
  176. keepalive = KeepAlive,
  177. properties = Properties,
  178. clientid = ClientId
  179. },
  180. {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4),
  181. {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)),
  182. {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)),
  183. ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword};
  184. parse_packet(#mqtt_packet_header{type = ?CONNACK},
  185. <<AckFlags:8, ReasonCode:8, Rest/binary>>, #{version := Ver}) ->
  186. {Properties, <<>>} = parse_properties(Rest, Ver),
  187. #mqtt_packet_connack{ack_flags = AckFlags,
  188. reason_code = ReasonCode,
  189. properties = Properties
  190. };
  191. parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
  192. #{strict_mode := StrictMode, version := Ver}) ->
  193. {TopicName, Rest} = parse_utf8_string(Bin),
  194. {PacketId, Rest1} = case QoS of
  195. ?QOS_0 -> {undefined, Rest};
  196. _ -> parse_packet_id(Rest)
  197. end,
  198. (PacketId =/= undefined) andalso
  199. StrictMode andalso validate_packet_id(PacketId),
  200. {Properties, Payload} = parse_properties(Rest1, Ver),
  201. Publish = #mqtt_packet_publish{topic_name = TopicName,
  202. packet_id = PacketId,
  203. properties = Properties
  204. },
  205. {Publish, Payload};
  206. parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, #{strict_mode := StrictMode})
  207. when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
  208. StrictMode andalso validate_packet_id(PacketId),
  209. #mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
  210. parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode, Rest/binary>>,
  211. #{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5})
  212. when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
  213. StrictMode andalso validate_packet_id(PacketId),
  214. {Properties, <<>>} = parse_properties(Rest, Ver),
  215. #mqtt_packet_puback{packet_id = PacketId,
  216. reason_code = ReasonCode,
  217. properties = Properties
  218. };
  219. parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
  220. #{strict_mode := StrictMode, version := Ver}) ->
  221. StrictMode andalso validate_packet_id(PacketId),
  222. {Properties, Rest1} = parse_properties(Rest, Ver),
  223. TopicFilters = parse_topic_filters(subscribe, Rest1),
  224. ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]),
  225. #mqtt_packet_subscribe{packet_id = PacketId,
  226. properties = Properties,
  227. topic_filters = TopicFilters
  228. };
  229. parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
  230. #{strict_mode := StrictMode, version := Ver}) ->
  231. StrictMode andalso validate_packet_id(PacketId),
  232. {Properties, Rest1} = parse_properties(Rest, Ver),
  233. ReasonCodes = parse_reason_codes(Rest1),
  234. #mqtt_packet_suback{packet_id = PacketId,
  235. properties = Properties,
  236. reason_codes = ReasonCodes
  237. };
  238. parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
  239. #{strict_mode := StrictMode, version := Ver}) ->
  240. StrictMode andalso validate_packet_id(PacketId),
  241. {Properties, Rest1} = parse_properties(Rest, Ver),
  242. TopicFilters = parse_topic_filters(unsubscribe, Rest1),
  243. #mqtt_packet_unsubscribe{packet_id = PacketId,
  244. properties = Properties,
  245. topic_filters = TopicFilters
  246. };
  247. parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big>>,
  248. #{strict_mode := StrictMode}) ->
  249. StrictMode andalso validate_packet_id(PacketId),
  250. #mqtt_packet_unsuback{packet_id = PacketId};
  251. parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
  252. #{strict_mode := StrictMode, version := Ver}) ->
  253. StrictMode andalso validate_packet_id(PacketId),
  254. {Properties, Rest1} = parse_properties(Rest, Ver),
  255. ReasonCodes = parse_reason_codes(Rest1),
  256. #mqtt_packet_unsuback{packet_id = PacketId,
  257. properties = Properties,
  258. reason_codes = ReasonCodes
  259. };
  260. parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <<ReasonCode, Rest/binary>>,
  261. #{version := ?MQTT_PROTO_V5}) ->
  262. {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
  263. #mqtt_packet_disconnect{reason_code = ReasonCode,
  264. properties = Properties
  265. };
  266. parse_packet(#mqtt_packet_header{type = ?AUTH}, <<ReasonCode, Rest/binary>>,
  267. #{version := ?MQTT_PROTO_V5}) ->
  268. {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
  269. #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
  270. parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
  271. proto_ver = Ver}, Bin) ->
  272. {Props, Rest} = parse_properties(Bin, Ver),
  273. {Topic, Rest1} = parse_utf8_string(Rest),
  274. {Payload, Rest2} = parse_binary_data(Rest1),
  275. {Packet#mqtt_packet_connect{will_props = Props,
  276. will_topic = Topic,
  277. will_payload = Payload
  278. }, Rest2};
  279. parse_will_message(Packet, Bin) -> {Packet, Bin}.
  280. -compile({inline, [parse_packet_id/1]}).
  281. parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
  282. {PacketId, Rest}.
  283. parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
  284. {#{}, Bin};
  285. %% TODO: version mess?
  286. parse_properties(<<>>, ?MQTT_PROTO_V5) ->
  287. {#{}, <<>>};
  288. parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) ->
  289. {#{}, Rest};
  290. parse_properties(Bin, ?MQTT_PROTO_V5) ->
  291. {Len, Rest} = parse_variable_byte_integer(Bin),
  292. <<PropsBin:Len/binary, Rest1/binary>> = Rest,
  293. {parse_property(PropsBin, #{}), Rest1}.
  294. parse_property(<<>>, Props) ->
  295. Props;
  296. parse_property(<<16#01, Val, Bin/binary>>, Props) ->
  297. parse_property(Bin, Props#{'Payload-Format-Indicator' => Val});
  298. parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) ->
  299. parse_property(Bin, Props#{'Message-Expiry-Interval' => Val});
  300. parse_property(<<16#03, Bin/binary>>, Props) ->
  301. {Val, Rest} = parse_utf8_string(Bin),
  302. parse_property(Rest, Props#{'Content-Type' => Val});
  303. parse_property(<<16#08, Bin/binary>>, Props) ->
  304. {Val, Rest} = parse_utf8_string(Bin),
  305. parse_property(Rest, Props#{'Response-Topic' => Val});
  306. parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
  307. parse_property(Bin, Props#{'Correlation-Data' => Val});
  308. parse_property(<<16#0B, Bin/binary>>, Props) ->
  309. {Val, Rest} = parse_variable_byte_integer(Bin),
  310. parse_property(Rest, Props#{'Subscription-Identifier' => Val});
  311. parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) ->
  312. parse_property(Bin, Props#{'Session-Expiry-Interval' => Val});
  313. parse_property(<<16#12, Bin/binary>>, Props) ->
  314. {Val, Rest} = parse_utf8_string(Bin),
  315. parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val});
  316. parse_property(<<16#13, Val:16, Bin/binary>>, Props) ->
  317. parse_property(Bin, Props#{'Server-Keep-Alive' => Val});
  318. parse_property(<<16#15, Bin/binary>>, Props) ->
  319. {Val, Rest} = parse_utf8_string(Bin),
  320. parse_property(Rest, Props#{'Authentication-Method' => Val});
  321. parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
  322. parse_property(Bin, Props#{'Authentication-Data' => Val});
  323. parse_property(<<16#17, Val, Bin/binary>>, Props) ->
  324. parse_property(Bin, Props#{'Request-Problem-Information' => Val});
  325. parse_property(<<16#18, Val:32, Bin/binary>>, Props) ->
  326. parse_property(Bin, Props#{'Will-Delay-Interval' => Val});
  327. parse_property(<<16#19, Val, Bin/binary>>, Props) ->
  328. parse_property(Bin, Props#{'Request-Response-Information' => Val});
  329. parse_property(<<16#1A, Bin/binary>>, Props) ->
  330. {Val, Rest} = parse_utf8_string(Bin),
  331. parse_property(Rest, Props#{'Response-Information' => Val});
  332. parse_property(<<16#1C, Bin/binary>>, Props) ->
  333. {Val, Rest} = parse_utf8_string(Bin),
  334. parse_property(Rest, Props#{'Server-Reference' => Val});
  335. parse_property(<<16#1F, Bin/binary>>, Props) ->
  336. {Val, Rest} = parse_utf8_string(Bin),
  337. parse_property(Rest, Props#{'Reason-String' => Val});
  338. parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) ->
  339. parse_property(Bin, Props#{'Receive-Maximum' => Val});
  340. parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) ->
  341. parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val});
  342. parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) ->
  343. parse_property(Bin, Props#{'Topic-Alias' => Val});
  344. parse_property(<<16#24, Val, Bin/binary>>, Props) ->
  345. parse_property(Bin, Props#{'Maximum-QoS' => Val});
  346. parse_property(<<16#25, Val, Bin/binary>>, Props) ->
  347. parse_property(Bin, Props#{'Retain-Available' => Val});
  348. parse_property(<<16#26, Bin/binary>>, Props) ->
  349. {Pair, Rest} = parse_utf8_pair(Bin),
  350. case maps:find('User-Property', Props) of
  351. {ok, UserProps} ->
  352. UserProps1 = lists:append(UserProps, [Pair]),
  353. parse_property(Rest, Props#{'User-Property' := UserProps1});
  354. error ->
  355. parse_property(Rest, Props#{'User-Property' => [Pair]})
  356. end;
  357. parse_property(<<16#27, Val:32, Bin/binary>>, Props) ->
  358. parse_property(Bin, Props#{'Maximum-Packet-Size' => Val});
  359. parse_property(<<16#28, Val, Bin/binary>>, Props) ->
  360. parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val});
  361. parse_property(<<16#29, Val, Bin/binary>>, Props) ->
  362. parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val});
  363. parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
  364. parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}).
  365. parse_variable_byte_integer(Bin) ->
  366. parse_variable_byte_integer(Bin, 1, 0).
  367. parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
  368. parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
  369. parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
  370. {Value + Len * Multiplier, Rest}.
  371. parse_topic_filters(subscribe, Bin) ->
  372. [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
  373. || <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
  374. parse_topic_filters(unsubscribe, Bin) ->
  375. [Topic || <<Len:16/big, Topic:Len/binary>> <= Bin].
  376. parse_reason_codes(Bin) ->
  377. [Code || <<Code>> <= Bin].
  378. parse_utf8_pair(<<Len1:16/big, Key:Len1/binary,
  379. Len2:16/big, Val:Len2/binary, Rest/binary>>) ->
  380. {{Key, Val}, Rest}.
  381. parse_utf8_string(Bin, false) ->
  382. {undefined, Bin};
  383. parse_utf8_string(Bin, true) ->
  384. parse_utf8_string(Bin).
  385. parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
  386. {Str, Rest}.
  387. parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
  388. {Data, Rest}.
  389. %%--------------------------------------------------------------------
  390. %% Serialize MQTT Packet
  391. %%--------------------------------------------------------------------
  392. serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS).
  393. serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
  394. MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
  395. serialize_fun(#{version => ProtoVer, max_size => MaxSize});
  396. serialize_fun(#{version := Ver, max_size := MaxSize}) ->
  397. fun(Packet) ->
  398. IoData = serialize(Packet, Ver),
  399. case is_too_large(IoData, MaxSize) of
  400. true -> <<>>;
  401. false -> IoData
  402. end
  403. end.
  404. serialize_opts() ->
  405. ?DEFAULT_OPTIONS.
  406. serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
  407. MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
  408. #{version => ProtoVer, max_size => MaxSize}.
  409. serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) ->
  410. IoData = serialize(Packet, Ver),
  411. case is_too_large(IoData, MaxSize) of
  412. true -> <<>>;
  413. false -> IoData
  414. end.
  415. -spec(serialize(emqx_types:packet()) -> iodata()).
  416. serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
  417. -spec(serialize(emqx_types:packet(), emqx_types:version()) -> iodata()).
  418. serialize(#mqtt_packet{header = Header,
  419. variable = Variable,
  420. payload = Payload}, Ver) ->
  421. serialize(Header, serialize_variable(Variable, Ver), serialize_payload(Payload)).
  422. serialize(#mqtt_packet_header{type = Type,
  423. dup = Dup,
  424. qos = QoS,
  425. retain = Retain
  426. }, VariableBin, PayloadBin)
  427. when ?CONNECT =< Type andalso Type =< ?AUTH ->
  428. Len = iolist_size(VariableBin) + iolist_size(PayloadBin),
  429. [<<Type:4, (flag(Dup)):1, (flag(QoS)):2, (flag(Retain)):1>>,
  430. serialize_remaining_len(Len), VariableBin, PayloadBin].
  431. serialize_variable(#mqtt_packet_connect{
  432. proto_name = ProtoName,
  433. proto_ver = ProtoVer,
  434. is_bridge = IsBridge,
  435. clean_start = CleanStart,
  436. will_flag = WillFlag,
  437. will_qos = WillQoS,
  438. will_retain = WillRetain,
  439. keepalive = KeepAlive,
  440. properties = Properties,
  441. clientid = ClientId,
  442. will_props = WillProps,
  443. will_topic = WillTopic,
  444. will_payload = WillPayload,
  445. username = Username,
  446. password = Password}, _Ver) ->
  447. [serialize_binary_data(ProtoName),
  448. <<(case IsBridge of
  449. true -> 16#80 + ProtoVer;
  450. false -> ProtoVer
  451. end):8,
  452. (flag(Username)):1,
  453. (flag(Password)):1,
  454. (flag(WillRetain)):1,
  455. WillQoS:2,
  456. (flag(WillFlag)):1,
  457. (flag(CleanStart)):1,
  458. 0:1,
  459. KeepAlive:16/big-unsigned-integer>>,
  460. serialize_properties(Properties, ProtoVer),
  461. serialize_utf8_string(ClientId),
  462. case WillFlag of
  463. true -> [serialize_properties(WillProps, ProtoVer),
  464. serialize_utf8_string(WillTopic),
  465. serialize_binary_data(WillPayload)];
  466. false -> <<>>
  467. end,
  468. serialize_utf8_string(Username, true),
  469. serialize_utf8_string(Password, true)];
  470. serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags,
  471. reason_code = ReasonCode,
  472. properties = Properties}, Ver) ->
  473. [AckFlags, ReasonCode, serialize_properties(Properties, Ver)];
  474. serialize_variable(#mqtt_packet_publish{topic_name = TopicName,
  475. packet_id = PacketId,
  476. properties = Properties}, Ver) ->
  477. [serialize_utf8_string(TopicName),
  478. if
  479. PacketId =:= undefined -> <<>>;
  480. true -> <<PacketId:16/big-unsigned-integer>>
  481. end,
  482. serialize_properties(Properties, Ver)];
  483. serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver)
  484. when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
  485. <<PacketId:16/big-unsigned-integer>>;
  486. serialize_variable(#mqtt_packet_puback{packet_id = PacketId,
  487. reason_code = ReasonCode,
  488. properties = Properties
  489. },
  490. Ver = ?MQTT_PROTO_V5) ->
  491. [<<PacketId:16/big-unsigned-integer>>, ReasonCode,
  492. serialize_properties(Properties, Ver)];
  493. serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId,
  494. properties = Properties,
  495. topic_filters = TopicFilters}, Ver) ->
  496. [<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
  497. serialize_topic_filters(subscribe, TopicFilters, Ver)];
  498. serialize_variable(#mqtt_packet_suback{packet_id = PacketId,
  499. properties = Properties,
  500. reason_codes = ReasonCodes}, Ver) ->
  501. [<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
  502. serialize_reason_codes(ReasonCodes)];
  503. serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
  504. properties = Properties,
  505. topic_filters = TopicFilters}, Ver) ->
  506. [<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
  507. serialize_topic_filters(unsubscribe, TopicFilters, Ver)];
  508. serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId,
  509. properties = Properties,
  510. reason_codes = ReasonCodes}, Ver) ->
  511. [<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
  512. serialize_reason_codes(ReasonCodes)];
  513. serialize_variable(#mqtt_packet_disconnect{}, Ver)
  514. when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
  515. <<>>;
  516. serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode,
  517. properties = Properties},
  518. Ver = ?MQTT_PROTO_V5) ->
  519. [ReasonCode, serialize_properties(Properties, Ver)];
  520. serialize_variable(#mqtt_packet_disconnect{}, _Ver) ->
  521. <<>>;
  522. serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode,
  523. properties = Properties},
  524. Ver = ?MQTT_PROTO_V5) ->
  525. [ReasonCode, serialize_properties(Properties, Ver)];
  526. serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) ->
  527. <<PacketId:16/big-unsigned-integer>>;
  528. serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) ->
  529. <<PacketId:16/big-unsigned-integer>>;
  530. serialize_variable(undefined, _Ver) ->
  531. <<>>.
  532. serialize_payload(undefined) -> <<>>;
  533. serialize_payload(Bin) -> Bin.
  534. serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
  535. <<>>;
  536. serialize_properties(Props, ?MQTT_PROTO_V5) ->
  537. serialize_properties(Props).
  538. serialize_properties(undefined) ->
  539. <<0>>;
  540. serialize_properties(Props) when map_size(Props) == 0 ->
  541. <<0>>;
  542. serialize_properties(Props) when is_map(Props) ->
  543. Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
  544. [serialize_variable_byte_integer(byte_size(Bin)), Bin].
  545. serialize_property(_, undefined) ->
  546. <<>>;
  547. serialize_property('Payload-Format-Indicator', Val) ->
  548. <<16#01, Val>>;
  549. serialize_property('Message-Expiry-Interval', Val) ->
  550. <<16#02, Val:32/big>>;
  551. serialize_property('Content-Type', Val) ->
  552. <<16#03, (serialize_utf8_string(Val))/binary>>;
  553. serialize_property('Response-Topic', Val) ->
  554. <<16#08, (serialize_utf8_string(Val))/binary>>;
  555. serialize_property('Correlation-Data', Val) ->
  556. <<16#09, (byte_size(Val)):16, Val/binary>>;
  557. serialize_property('Subscription-Identifier', Val) ->
  558. <<16#0B, (serialize_variable_byte_integer(Val))/binary>>;
  559. serialize_property('Session-Expiry-Interval', Val) ->
  560. <<16#11, Val:32/big>>;
  561. serialize_property('Assigned-Client-Identifier', Val) ->
  562. <<16#12, (serialize_utf8_string(Val))/binary>>;
  563. serialize_property('Server-Keep-Alive', Val) ->
  564. <<16#13, Val:16/big>>;
  565. serialize_property('Authentication-Method', Val) ->
  566. <<16#15, (serialize_utf8_string(Val))/binary>>;
  567. serialize_property('Authentication-Data', Val) ->
  568. <<16#16, (iolist_size(Val)):16, Val/binary>>;
  569. serialize_property('Request-Problem-Information', Val) ->
  570. <<16#17, Val>>;
  571. serialize_property('Will-Delay-Interval', Val) ->
  572. <<16#18, Val:32/big>>;
  573. serialize_property('Request-Response-Information', Val) ->
  574. <<16#19, Val>>;
  575. serialize_property('Response-Information', Val) ->
  576. <<16#1A, (serialize_utf8_string(Val))/binary>>;
  577. serialize_property('Server-Reference', Val) ->
  578. <<16#1C, (serialize_utf8_string(Val))/binary>>;
  579. serialize_property('Reason-String', Val) ->
  580. <<16#1F, (serialize_utf8_string(Val))/binary>>;
  581. serialize_property('Receive-Maximum', Val) ->
  582. <<16#21, Val:16/big>>;
  583. serialize_property('Topic-Alias-Maximum', Val) ->
  584. <<16#22, Val:16/big>>;
  585. serialize_property('Topic-Alias', Val) ->
  586. <<16#23, Val:16/big>>;
  587. serialize_property('Maximum-QoS', Val) ->
  588. <<16#24, Val>>;
  589. serialize_property('Retain-Available', Val) ->
  590. <<16#25, Val>>;
  591. serialize_property('User-Property', {Key, Val}) ->
  592. <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>;
  593. serialize_property('User-Property', Props) when is_list(Props) ->
  594. << <<(serialize_property('User-Property', {Key, Val}))/binary>>
  595. || {Key, Val} <- Props >>;
  596. serialize_property('Maximum-Packet-Size', Val) ->
  597. <<16#27, Val:32/big>>;
  598. serialize_property('Wildcard-Subscription-Available', Val) ->
  599. <<16#28, Val>>;
  600. serialize_property('Subscription-Identifier-Available', Val) ->
  601. <<16#29, Val>>;
  602. serialize_property('Shared-Subscription-Available', Val) ->
  603. <<16#2A, Val>>.
  604. serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
  605. << <<(serialize_utf8_string(Topic))/binary,
  606. ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >>
  607. || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters >>;
  608. serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
  609. << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>>
  610. || {Topic, #{qos := QoS}} <- TopicFilters >>;
  611. serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
  612. << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.
  613. serialize_reason_codes(undefined) ->
  614. <<>>;
  615. serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) ->
  616. << <<Code>> || Code <- ReasonCodes >>.
  617. serialize_utf8_pair({Name, Value}) ->
  618. << (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>.
  619. serialize_binary_data(Bin) ->
  620. [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
  621. serialize_utf8_string(undefined, false) ->
  622. error(utf8_string_undefined);
  623. serialize_utf8_string(undefined, true) ->
  624. <<>>;
  625. serialize_utf8_string(String, _AllowNull) ->
  626. serialize_utf8_string(String).
  627. serialize_utf8_string(String) ->
  628. StringBin = unicode:characters_to_binary(String),
  629. Len = byte_size(StringBin),
  630. true = (Len =< 16#ffff),
  631. <<Len:16/big, StringBin/binary>>.
  632. serialize_remaining_len(I) ->
  633. serialize_variable_byte_integer(I).
  634. serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
  635. <<0:1, N:7>>;
  636. serialize_variable_byte_integer(N) ->
  637. <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
  638. %% Is the frame too large?
  639. -spec(is_too_large(iodata(), pos_integer()) -> boolean()).
  640. is_too_large(IoData, MaxSize) ->
  641. iolist_size(IoData) >= MaxSize.
  642. get_property(_Key, undefined, Default) ->
  643. Default;
  644. get_property(Key, Props, Default) ->
  645. maps:get(Key, Props, Default).
  646. %% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags
  647. validate_header(?CONNECT, 0, 0, 0) -> ok;
  648. validate_header(?CONNACK, 0, 0, 0) -> ok;
  649. validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok;
  650. validate_header(?PUBLISH, _, ?QOS_1, _) -> ok;
  651. validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok;
  652. validate_header(?PUBACK, 0, 0, 0) -> ok;
  653. validate_header(?PUBREC, 0, 0, 0) -> ok;
  654. validate_header(?PUBREL, 0, 1, 0) -> ok;
  655. validate_header(?PUBCOMP, 0, 0, 0) -> ok;
  656. validate_header(?SUBSCRIBE, 0, 1, 0) -> ok;
  657. validate_header(?SUBACK, 0, 0, 0) -> ok;
  658. validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok;
  659. validate_header(?UNSUBACK, 0, 0, 0) -> ok;
  660. validate_header(?PINGREQ, 0, 0, 0) -> ok;
  661. validate_header(?PINGRESP, 0, 0, 0) -> ok;
  662. validate_header(?DISCONNECT, 0, 0, 0) -> ok;
  663. validate_header(?AUTH, 0, 0, 0) -> ok;
  664. validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header).
  665. -compile({inline, [validate_packet_id/1]}).
  666. validate_packet_id(0) -> error(bad_packet_id);
  667. validate_packet_id(_) -> ok.
  668. validate_subqos([3|_]) -> error(bad_subqos);
  669. validate_subqos([_|T]) -> validate_subqos(T);
  670. validate_subqos([]) -> ok.
  671. bool(0) -> false;
  672. bool(1) -> true.
  673. flag(undefined) -> ?RESERVED;
  674. flag(false) -> 0;
  675. flag(true) -> 1;
  676. flag(X) when is_integer(X) -> X;
  677. flag(B) when is_binary(B) -> 1.
  678. fixqos(?PUBREL, 0) -> 1;
  679. fixqos(?SUBSCRIBE, 0) -> 1;
  680. fixqos(?UNSUBSCRIBE, 0) -> 1;
  681. fixqos(_Type, QoS) -> QoS.