emqx_frame.erl 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_frame).
  17. -include("emqx.hrl").
  18. -include("emqx_mqtt.hrl").
  19. -export([
  20. initial_parse_state/0,
  21. initial_parse_state/1
  22. ]).
  23. -export([
  24. parse/1,
  25. parse/2,
  26. serialize_fun/0,
  27. serialize_fun/1,
  28. serialize_opts/0,
  29. serialize_opts/1,
  30. serialize_pkt/2,
  31. serialize/1,
  32. serialize/2
  33. ]).
  34. -export([describe_state/1]).
  35. -export_type([
  36. options/0,
  37. parse_state/0,
  38. parse_result/0,
  39. serialize_opts/0
  40. ]).
  41. -define(Q(BYTES, Q), {BYTES, Q}).
  42. -type options() :: #{
  43. strict_mode => boolean(),
  44. max_size => 1..?MAX_PACKET_SIZE,
  45. version => emqx_types:proto_ver()
  46. }.
  47. -define(NONE(Options), {none, Options}).
  48. -type parse_state() :: ?NONE(options()) | {cont_state(), options()}.
  49. -type parse_result() ::
  50. {more, parse_state()}
  51. | {ok, emqx_types:packet(), binary(), parse_state()}.
  52. -type cont_state() ::
  53. {
  54. Stage :: len | body,
  55. State :: #{
  56. hdr := #mqtt_packet_header{},
  57. len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
  58. rest => binary() | ?Q(non_neg_integer(), queue:queue(binary()))
  59. }
  60. }.
  61. -type serialize_opts() :: options().
  62. -define(DEFAULT_OPTIONS, #{
  63. strict_mode => false,
  64. max_size => ?MAX_PACKET_SIZE,
  65. version => ?MQTT_PROTO_V4
  66. }).
  67. -define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)).
  68. -define(SERIALIZE_ERR(Reason), ?THROW_SERIALIZE_ERROR(Reason)).
  69. -define(MULTIPLIER_MAX, 16#200000).
  70. -dialyzer({no_match, [serialize_utf8_string/2]}).
  71. %% @doc Describe state for logging.
  72. describe_state(?NONE(_Opts)) ->
  73. <<"clean">>;
  74. describe_state({{len, _}, _Opts}) ->
  75. <<"parsing_varint_length">>;
  76. describe_state({{body, State}, _Opts}) ->
  77. #{
  78. hdr := Hdr,
  79. len := Len
  80. } = State,
  81. Desc = #{
  82. parsed_header => Hdr,
  83. expected_bytes => Len
  84. },
  85. case maps:get(rest, State, undefined) of
  86. undefined -> Desc;
  87. Body -> Desc#{received_bytes => body_bytes(Body)}
  88. end.
  89. %%--------------------------------------------------------------------
  90. %% Init Parse State
  91. %%--------------------------------------------------------------------
  92. -spec initial_parse_state() -> ?NONE(options()).
  93. initial_parse_state() ->
  94. initial_parse_state(#{}).
  95. -spec initial_parse_state(options()) -> ?NONE(options()).
  96. initial_parse_state(Options) when is_map(Options) ->
  97. ?NONE(maps:merge(?DEFAULT_OPTIONS, Options)).
  98. %%--------------------------------------------------------------------
  99. %% Parse MQTT Frame
  100. %%--------------------------------------------------------------------
  101. -spec parse(binary()) -> parse_result().
  102. parse(Bin) ->
  103. parse(Bin, initial_parse_state()).
  104. -spec parse(binary(), parse_state()) -> parse_result().
  105. parse(<<>>, ?NONE(Options)) ->
  106. {more, ?NONE(Options)};
  107. parse(
  108. <<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
  109. ?NONE(Options = #{strict_mode := StrictMode})
  110. ) ->
  111. %% Validate header if strict mode.
  112. StrictMode andalso validate_header(Type, Dup, QoS, Retain),
  113. Header = #mqtt_packet_header{
  114. type = Type,
  115. dup = bool(Dup),
  116. qos = fixqos(Type, QoS),
  117. retain = bool(Retain)
  118. },
  119. parse_remaining_len(Rest, Header, Options);
  120. parse(Bin, {
  121. {len, #{
  122. hdr := Header,
  123. len := {Multiplier, Length}
  124. }},
  125. Options
  126. }) when is_binary(Bin) ->
  127. parse_remaining_len(Bin, Header, Multiplier, Length, Options);
  128. parse(Bin, {
  129. {body, #{
  130. hdr := Header,
  131. len := Length,
  132. rest := Body
  133. }},
  134. Options
  135. }) when is_binary(Bin) ->
  136. NewBody = append_body(Body, Bin),
  137. parse_frame(NewBody, Header, Length, Options).
  138. parse_remaining_len(<<>>, Header, Options) ->
  139. {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
  140. parse_remaining_len(Rest, Header, Options) ->
  141. parse_remaining_len(Rest, Header, 1, 0, Options).
  142. parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when
  143. Length > MaxSize
  144. ->
  145. ?PARSE_ERR(frame_too_large);
  146. parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
  147. {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
  148. %% Match DISCONNECT without payload
  149. parse_remaining_len(
  150. <<0:8, Rest/binary>>,
  151. Header = #mqtt_packet_header{type = ?DISCONNECT},
  152. 1,
  153. 0,
  154. Options
  155. ) ->
  156. Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
  157. {ok, Packet, Rest, ?NONE(Options)};
  158. %% Match PINGREQ.
  159. parse_remaining_len(
  160. <<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?PINGREQ}, 1, 0, Options
  161. ) ->
  162. parse_frame(Rest, Header, 0, Options);
  163. parse_remaining_len(
  164. <<0:8, _Rest/binary>>, _Header = #mqtt_packet_header{type = ?PINGRESP}, 1, 0, _Options
  165. ) ->
  166. ?PARSE_ERR(#{hint => unexpected_packet, header_type => 'PINGRESP'});
  167. %% All other types of messages should not have a zero remaining length.
  168. parse_remaining_len(
  169. <<0:8, _Rest/binary>>, Header, 1, 0, _Options
  170. ) ->
  171. ?PARSE_ERR(#{hint => zero_remaining_len, header_type => Header#mqtt_packet_header.type});
  172. %% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
  173. parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
  174. parse_frame(Rest, Header, 2, Options);
  175. parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) when
  176. Multiplier > ?MULTIPLIER_MAX
  177. ->
  178. ?PARSE_ERR(malformed_variable_byte_integer);
  179. parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
  180. parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
  181. parse_remaining_len(
  182. <<0:1, Len:7, Rest/binary>>,
  183. Header,
  184. Multiplier,
  185. Value,
  186. Options = #{max_size := MaxSize}
  187. ) ->
  188. FrameLen = Value + Len * Multiplier,
  189. case FrameLen > MaxSize of
  190. true -> ?PARSE_ERR(frame_too_large);
  191. false -> parse_frame(Rest, Header, FrameLen, Options)
  192. end.
  193. body_bytes(B) when is_binary(B) -> size(B);
  194. body_bytes(?Q(Bytes, _)) -> Bytes.
  195. append_body(H, <<>>) ->
  196. H;
  197. append_body(H, T) when is_binary(H) andalso size(H) < 1024 ->
  198. <<H/binary, T/binary>>;
  199. append_body(H, T) when is_binary(H) ->
  200. Bytes = size(H) + size(T),
  201. ?Q(Bytes, queue:from_list([H, T]));
  202. append_body(?Q(Bytes, Q), T) ->
  203. ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
  204. flatten_body(Body) when is_binary(Body) -> Body;
  205. flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
  206. parse_frame(Body, Header, 0, Options) ->
  207. {ok, packet(Header), flatten_body(Body), ?NONE(Options)};
  208. parse_frame(Body, Header, Length, Options) ->
  209. case body_bytes(Body) >= Length of
  210. true ->
  211. <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
  212. case parse_packet(Header, FrameBin, Options) of
  213. {Variable, Payload} ->
  214. {ok, packet(Header, Variable, Payload), Rest, ?NONE(Options)};
  215. Variable = #mqtt_packet_connect{proto_ver = Ver} ->
  216. {ok, packet(Header, Variable), Rest, ?NONE(Options#{version := Ver})};
  217. Variable ->
  218. {ok, packet(Header, Variable), Rest, ?NONE(Options)}
  219. end;
  220. false ->
  221. {more, {
  222. {body, #{
  223. hdr => Header,
  224. len => Length,
  225. rest => Body
  226. }},
  227. Options
  228. }}
  229. end.
  230. -compile({inline, [packet/1, packet/2, packet/3]}).
  231. packet(Header) ->
  232. #mqtt_packet{header = Header}.
  233. packet(Header, Variable) ->
  234. #mqtt_packet{header = Header, variable = Variable}.
  235. packet(Header, Variable, Payload) ->
  236. #mqtt_packet{header = Header, variable = Variable, payload = Payload}.
  237. parse_connect(FrameBin, StrictMode) ->
  238. {ProtoName, Rest} = parse_utf8_string_with_hint(FrameBin, StrictMode, invalid_proto_name),
  239. case ProtoName of
  240. <<"MQTT">> ->
  241. ok;
  242. <<"MQIsdp">> ->
  243. ok;
  244. _ ->
  245. %% from spec: the server MAY send disconnect with reason code 0x84
  246. %% we chose to close socket because the client is likely not talking MQTT anyway
  247. ?PARSE_ERR(#{
  248. hint => invalid_proto_name,
  249. expected => <<"'MQTT' or 'MQIsdp'">>,
  250. received => ProtoName
  251. })
  252. end,
  253. parse_connect2(ProtoName, Rest, StrictMode).
  254. % Note: return malformed if reserved flag is not 0.
  255. parse_connect2(
  256. ProtoName,
  257. <<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2, WillFlag:1,
  258. CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
  259. StrictMode
  260. ) ->
  261. case Reserved of
  262. 0 -> ok;
  263. 1 -> ?PARSE_ERR(reserved_connect_flag)
  264. end,
  265. {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
  266. {ClientId, Rest4} = parse_utf8_string_with_hint(Rest3, StrictMode, invalid_clientid),
  267. ConnPacket = #mqtt_packet_connect{
  268. proto_name = ProtoName,
  269. proto_ver = ProtoVer,
  270. is_bridge = (BridgeTag =:= 8),
  271. clean_start = bool(CleanStart),
  272. will_flag = bool(WillFlag),
  273. will_qos = WillQoS,
  274. will_retain = bool(WillRetain),
  275. keepalive = KeepAlive,
  276. properties = Properties,
  277. clientid = ClientId
  278. },
  279. {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode),
  280. {Username, Rest6} = parse_optional(
  281. Rest5,
  282. fun(Bin) ->
  283. parse_utf8_string_with_hint(Bin, StrictMode, invalid_username)
  284. end,
  285. bool(UsernameFlag)
  286. ),
  287. {Password, Rest7} = parse_optional(
  288. Rest6,
  289. fun(Bin) ->
  290. parse_utf8_string_with_hint(Bin, StrictMode, invalid_password)
  291. end,
  292. bool(PasswordFlag)
  293. ),
  294. case Rest7 of
  295. <<>> ->
  296. ConnPacket1#mqtt_packet_connect{username = Username, password = Password};
  297. _ ->
  298. ?PARSE_ERR(malformed_connect_data)
  299. end;
  300. parse_connect2(_ProtoName, _, _) ->
  301. ?PARSE_ERR(malformed_connect_header).
  302. parse_packet(
  303. #mqtt_packet_header{type = ?CONNECT},
  304. FrameBin,
  305. #{strict_mode := StrictMode}
  306. ) ->
  307. parse_connect(FrameBin, StrictMode);
  308. parse_packet(
  309. #mqtt_packet_header{type = ?CONNACK},
  310. <<AckFlags:8, ReasonCode:8, Rest/binary>>,
  311. #{version := Ver, strict_mode := StrictMode}
  312. ) ->
  313. %% Not possible for broker to receive!
  314. case parse_properties(Rest, Ver, StrictMode) of
  315. {Properties, <<>>} ->
  316. #mqtt_packet_connack{
  317. ack_flags = AckFlags,
  318. reason_code = ReasonCode,
  319. properties = Properties
  320. };
  321. _ ->
  322. ?PARSE_ERR(malformed_properties)
  323. end;
  324. parse_packet(
  325. #mqtt_packet_header{type = ?PUBLISH, qos = QoS},
  326. Bin,
  327. #{strict_mode := StrictMode, version := Ver}
  328. ) ->
  329. {TopicName, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_topic),
  330. {PacketId, Rest1} =
  331. case QoS of
  332. ?QOS_0 -> {undefined, Rest};
  333. _ -> parse_packet_id(Rest)
  334. end,
  335. (PacketId =/= undefined) andalso
  336. StrictMode andalso validate_packet_id(PacketId),
  337. {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode),
  338. Publish = #mqtt_packet_publish{
  339. topic_name = TopicName,
  340. packet_id = PacketId,
  341. properties = Properties
  342. },
  343. {Publish, Payload};
  344. parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, #{strict_mode := StrictMode}) when
  345. ?PUBACK =< PubAck, PubAck =< ?PUBCOMP
  346. ->
  347. StrictMode andalso validate_packet_id(PacketId),
  348. #mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
  349. parse_packet(
  350. #mqtt_packet_header{type = PubAck},
  351. <<PacketId:16/big, ReasonCode, Rest/binary>>,
  352. #{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5}
  353. ) when
  354. ?PUBACK =< PubAck, PubAck =< ?PUBCOMP
  355. ->
  356. StrictMode andalso validate_packet_id(PacketId),
  357. {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode),
  358. #mqtt_packet_puback{
  359. packet_id = PacketId,
  360. reason_code = ReasonCode,
  361. properties = Properties
  362. };
  363. parse_packet(
  364. #mqtt_packet_header{type = ?SUBSCRIBE},
  365. <<PacketId:16/big, Rest/binary>>,
  366. #{strict_mode := StrictMode, version := Ver}
  367. ) ->
  368. StrictMode andalso validate_packet_id(PacketId),
  369. {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
  370. TopicFilters = parse_topic_filters(subscribe, Rest1),
  371. ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]),
  372. #mqtt_packet_subscribe{
  373. packet_id = PacketId,
  374. properties = Properties,
  375. topic_filters = TopicFilters
  376. };
  377. parse_packet(
  378. #mqtt_packet_header{type = ?SUBACK},
  379. <<PacketId:16/big, Rest/binary>>,
  380. #{strict_mode := StrictMode, version := Ver}
  381. ) ->
  382. StrictMode andalso validate_packet_id(PacketId),
  383. {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
  384. ReasonCodes = parse_reason_codes(Rest1),
  385. #mqtt_packet_suback{
  386. packet_id = PacketId,
  387. properties = Properties,
  388. reason_codes = ReasonCodes
  389. };
  390. parse_packet(
  391. #mqtt_packet_header{type = ?UNSUBSCRIBE},
  392. <<PacketId:16/big, Rest/binary>>,
  393. #{strict_mode := StrictMode, version := Ver}
  394. ) ->
  395. StrictMode andalso validate_packet_id(PacketId),
  396. {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
  397. TopicFilters = parse_topic_filters(unsubscribe, Rest1),
  398. #mqtt_packet_unsubscribe{
  399. packet_id = PacketId,
  400. properties = Properties,
  401. topic_filters = TopicFilters
  402. };
  403. parse_packet(
  404. #mqtt_packet_header{type = ?UNSUBACK},
  405. <<PacketId:16/big>>,
  406. #{strict_mode := StrictMode}
  407. ) ->
  408. StrictMode andalso validate_packet_id(PacketId),
  409. #mqtt_packet_unsuback{packet_id = PacketId};
  410. parse_packet(
  411. #mqtt_packet_header{type = ?UNSUBACK},
  412. <<PacketId:16/big, Rest/binary>>,
  413. #{strict_mode := StrictMode, version := Ver}
  414. ) ->
  415. StrictMode andalso validate_packet_id(PacketId),
  416. {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
  417. ReasonCodes = parse_reason_codes(Rest1),
  418. #mqtt_packet_unsuback{
  419. packet_id = PacketId,
  420. properties = Properties,
  421. reason_codes = ReasonCodes
  422. };
  423. parse_packet(
  424. #mqtt_packet_header{type = ?DISCONNECT},
  425. <<ReasonCode, Rest/binary>>,
  426. #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}
  427. ) ->
  428. {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
  429. #mqtt_packet_disconnect{
  430. reason_code = ReasonCode,
  431. properties = Properties
  432. };
  433. parse_packet(
  434. #mqtt_packet_header{type = ?AUTH},
  435. <<ReasonCode, Rest/binary>>,
  436. #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}
  437. ) ->
  438. {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
  439. #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties};
  440. parse_packet(_Header, _FrameBin, _Options) ->
  441. ?PARSE_ERR(malformed_packet).
  442. parse_will_message(
  443. Packet = #mqtt_packet_connect{
  444. will_flag = true,
  445. proto_ver = Ver
  446. },
  447. Bin,
  448. StrictMode
  449. ) ->
  450. {Props, Rest} = parse_properties(Bin, Ver, StrictMode),
  451. {Topic, Rest1} = parse_utf8_string_with_hint(Rest, StrictMode, invalid_topic),
  452. {Payload, Rest2} = parse_binary_data(Rest1),
  453. {
  454. Packet#mqtt_packet_connect{
  455. will_props = Props,
  456. will_topic = Topic,
  457. will_payload = Payload
  458. },
  459. Rest2
  460. };
  461. parse_will_message(Packet, Bin, _StrictMode) ->
  462. {Packet, Bin}.
  463. -compile({inline, [parse_packet_id/1]}).
  464. parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
  465. {PacketId, Rest};
  466. parse_packet_id(_) ->
  467. ?PARSE_ERR(invalid_packet_id).
  468. parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
  469. {#{}, Bin};
  470. %% TODO: version mess?
  471. parse_properties(<<>>, ?MQTT_PROTO_V5, _StrictMode) ->
  472. {#{}, <<>>};
  473. parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5, _StrictMode) ->
  474. {#{}, Rest};
  475. parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) ->
  476. {Len, Rest} = parse_variable_byte_integer(Bin),
  477. <<PropsBin:Len/binary, Rest1/binary>> = Rest,
  478. {parse_property(PropsBin, #{}, StrictMode), Rest1}.
  479. parse_property(<<>>, Props, _StrictMode) ->
  480. Props;
  481. parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) ->
  482. parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}, StrictMode);
  483. parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) ->
  484. parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode);
  485. parse_property(<<16#03, Bin/binary>>, Props, StrictMode) ->
  486. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_content_type),
  487. parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode);
  488. parse_property(<<16#08, Bin/binary>>, Props, StrictMode) ->
  489. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_topic),
  490. parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode);
  491. parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
  492. parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode);
  493. parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) ->
  494. {Val, Rest} = parse_variable_byte_integer(Bin),
  495. parse_property(Rest, Props#{'Subscription-Identifier' => Val}, StrictMode);
  496. parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) ->
  497. parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode);
  498. parse_property(<<16#12, Bin/binary>>, Props, StrictMode) ->
  499. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_assigned_client_id),
  500. parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode);
  501. parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) ->
  502. parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode);
  503. parse_property(<<16#15, Bin/binary>>, Props, StrictMode) ->
  504. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_authn_method),
  505. parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode);
  506. parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
  507. parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode);
  508. parse_property(<<16#17, Val, Bin/binary>>, Props, StrictMode) ->
  509. parse_property(Bin, Props#{'Request-Problem-Information' => Val}, StrictMode);
  510. parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) ->
  511. parse_property(Bin, Props#{'Will-Delay-Interval' => Val}, StrictMode);
  512. parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) ->
  513. parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode);
  514. parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) ->
  515. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_info),
  516. parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode);
  517. parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) ->
  518. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_server_reference),
  519. parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode);
  520. parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) ->
  521. {Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_reason_string),
  522. parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode);
  523. parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) ->
  524. parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode);
  525. parse_property(<<16#22, Val:16/big, Bin/binary>>, Props, StrictMode) ->
  526. parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}, StrictMode);
  527. parse_property(<<16#23, Val:16/big, Bin/binary>>, Props, StrictMode) ->
  528. parse_property(Bin, Props#{'Topic-Alias' => Val}, StrictMode);
  529. parse_property(<<16#24, Val, Bin/binary>>, Props, StrictMode) ->
  530. parse_property(Bin, Props#{'Maximum-QoS' => Val}, StrictMode);
  531. parse_property(<<16#25, Val, Bin/binary>>, Props, StrictMode) ->
  532. parse_property(Bin, Props#{'Retain-Available' => Val}, StrictMode);
  533. parse_property(<<16#26, Bin/binary>>, Props, StrictMode) ->
  534. {Pair, Rest} = parse_utf8_pair(Bin, StrictMode),
  535. case maps:find('User-Property', Props) of
  536. {ok, UserProps} ->
  537. UserProps1 = lists:append(UserProps, [Pair]),
  538. parse_property(Rest, Props#{'User-Property' := UserProps1}, StrictMode);
  539. error ->
  540. parse_property(Rest, Props#{'User-Property' => [Pair]}, StrictMode)
  541. end;
  542. parse_property(<<16#27, Val:32, Bin/binary>>, Props, StrictMode) ->
  543. parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}, StrictMode);
  544. parse_property(<<16#28, Val, Bin/binary>>, Props, StrictMode) ->
  545. parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}, StrictMode);
  546. parse_property(<<16#29, Val, Bin/binary>>, Props, StrictMode) ->
  547. parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}, StrictMode);
  548. parse_property(<<16#2A, Val, Bin/binary>>, Props, StrictMode) ->
  549. parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}, StrictMode);
  550. parse_property(<<Property:8, _Rest/binary>>, _Props, _StrictMode) ->
  551. ?PARSE_ERR(#{invalid_property_code => Property}).
  552. %% TODO: invalid property in specific packet.
  553. parse_variable_byte_integer(Bin) ->
  554. parse_variable_byte_integer(Bin, 1, 0).
  555. parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) when
  556. Multiplier > ?MULTIPLIER_MAX
  557. ->
  558. ?PARSE_ERR(malformed_variable_byte_integer);
  559. parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
  560. parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
  561. parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
  562. {Value + Len * Multiplier, Rest}.
  563. parse_topic_filters(subscribe, Bin) ->
  564. [
  565. {Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
  566. || <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin
  567. ];
  568. parse_topic_filters(unsubscribe, Bin) ->
  569. [Topic || <<Len:16/big, Topic:Len/binary>> <= Bin].
  570. parse_reason_codes(Bin) ->
  571. [Code || <<Code>> <= Bin].
  572. parse_utf8_pair(
  573. <<Len1:16/big, Key:Len1/binary, Len2:16/big, Val:Len2/binary, Rest/binary>>,
  574. true
  575. ) ->
  576. {{validate_utf8(Key), validate_utf8(Val)}, Rest};
  577. parse_utf8_pair(
  578. <<Len1:16/big, Key:Len1/binary, Len2:16/big, Val:Len2/binary, Rest/binary>>,
  579. false
  580. ) ->
  581. {{Key, Val}, Rest};
  582. parse_utf8_pair(<<LenK:16/big, Rest/binary>>, _StrictMode) when
  583. LenK > byte_size(Rest)
  584. ->
  585. ?PARSE_ERR(#{
  586. hint => user_property_not_enough_bytes,
  587. parsed_key_length => LenK,
  588. remaining_bytes_length => byte_size(Rest)
  589. });
  590. %% key maybe malformed
  591. parse_utf8_pair(<<LenK:16/big, _Key:LenK/binary, LenV:16/big, Rest/binary>>, _StrictMode) when
  592. LenV > byte_size(Rest)
  593. ->
  594. ?PARSE_ERR(#{
  595. hint => malformed_user_property_value,
  596. parsed_key_length => LenK,
  597. parsed_value_length => LenV,
  598. remaining_bytes_length => byte_size(Rest)
  599. });
  600. parse_utf8_pair(Bin, _StrictMode) when
  601. 4 > byte_size(Bin)
  602. ->
  603. ?PARSE_ERR(#{
  604. hint => user_property_not_enough_bytes,
  605. total_bytes => byte_size(Bin)
  606. }).
  607. parse_utf8_string_with_hint(Bin, StrictMode, Hint) ->
  608. try
  609. parse_utf8_string(Bin, StrictMode)
  610. catch
  611. throw:{?FRAME_PARSE_ERROR, Reason} when is_map(Reason) ->
  612. ?PARSE_ERR(Reason#{hint => Hint})
  613. end.
  614. parse_optional(Bin, F, true) ->
  615. F(Bin);
  616. parse_optional(Bin, _F, false) ->
  617. {undefined, Bin}.
  618. parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>, true) ->
  619. {validate_utf8(Str), Rest};
  620. parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>, false) ->
  621. {Str, Rest};
  622. parse_utf8_string(<<Len:16/big, Rest/binary>>, _) when
  623. Len > byte_size(Rest)
  624. ->
  625. ?PARSE_ERR(#{
  626. hint => malformed_utf8_string,
  627. parsed_length => Len,
  628. remaining_bytes_length => byte_size(Rest)
  629. });
  630. parse_utf8_string(Bin, _) when
  631. 2 > byte_size(Bin)
  632. ->
  633. ?PARSE_ERR(#{reason => malformed_utf8_string_length}).
  634. parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
  635. {Data, Rest};
  636. parse_binary_data(<<Len:16/big, Rest/binary>>) when
  637. Len > byte_size(Rest)
  638. ->
  639. ?PARSE_ERR(#{
  640. hint => malformed_binary_data,
  641. parsed_length => Len,
  642. remaining_bytes_length => byte_size(Rest)
  643. });
  644. parse_binary_data(Bin) when
  645. 2 > byte_size(Bin)
  646. ->
  647. ?PARSE_ERR(malformed_binary_data_length).
  648. %%--------------------------------------------------------------------
  649. %% Serialize MQTT Packet
  650. %%--------------------------------------------------------------------
  651. serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS).
  652. serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
  653. MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
  654. serialize_fun(#{version => ProtoVer, max_size => MaxSize});
  655. serialize_fun(#{version := Ver, max_size := MaxSize}) ->
  656. fun(Packet) ->
  657. IoData = serialize(Packet, Ver),
  658. case is_too_large(IoData, MaxSize) of
  659. true -> <<>>;
  660. false -> IoData
  661. end
  662. end.
  663. serialize_opts() ->
  664. ?DEFAULT_OPTIONS.
  665. serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
  666. MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
  667. #{version => ProtoVer, max_size => MaxSize}.
  668. serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) ->
  669. IoData = serialize(Packet, Ver),
  670. case is_too_large(IoData, MaxSize) of
  671. true -> <<>>;
  672. false -> IoData
  673. end.
  %% Serialize a packet using ?MQTT_PROTO_V4 as the protocol version.
  -spec serialize(emqx_types:packet()) -> iodata().
  serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).

  %% Serialize a packet for the given protocol version: the variable header and
  %% payload are encoded first, then handed to serialize/3 together with the
  %% fixed-header record.
  -spec serialize(emqx_types:packet(), emqx_types:proto_ver()) -> iodata().
  serialize(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Ver) ->
      VariableBin = serialize_variable(Variable, Ver),
      PayloadBin = serialize_payload(Payload),
      serialize(Header, VariableBin, PayloadBin).
  %% Assemble a complete frame from a header record plus the already-serialized
  %% variable header and payload. Only packet types in ?CONNECT..?AUTH are
  %% accepted. The result is an iolist:
  %%   [first header byte, remaining length, variable header, payload]
  %% where the first byte packs Type:4, Dup:1, QoS:2, Retain:1.
  serialize(
      #mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain},
      VariableBin,
      PayloadBin
  ) when ?CONNECT =< Type andalso Type =< ?AUTH ->
      %% Remaining length covers everything after the fixed header.
      RemainingLen = iolist_size(VariableBin) + iolist_size(PayloadBin),
      FirstByte = <<Type:4, (flag(Dup)):1, (flag(QoS)):2, (flag(Retain)):1>>,
      [FirstByte, serialize_remaining_len(RemainingLen), VariableBin, PayloadBin].
  %% Serialize the variable header of a packet for protocol version Ver.
  %% The first argument is one of the packet records, a bare integer packet id
  %% (only for ?MQTT_PROTO_V3 / ?MQTT_PROTO_V4), or `undefined` (no variable
  %% header). The result is iodata.

  %% CONNECT: protocol name, version byte (top bit set for bridges), connect
  %% flags, keepalive, properties, client id, optional will section, then
  %% username and password (each omitted when undefined).
  serialize_variable(
      #mqtt_packet_connect{
          proto_name = ProtoName,
          proto_ver = ProtoVer,
          is_bridge = IsBridge,
          clean_start = CleanStart,
          will_flag = WillFlag,
          will_qos = WillQoS,
          will_retain = WillRetain,
          keepalive = KeepAlive,
          properties = Properties,
          clientid = ClientId,
          will_props = WillProps,
          will_topic = WillTopic,
          will_payload = WillPayload,
          username = Username,
          password = Password
      },
      _Ver
  ) ->
      ProtoNameBin = serialize_binary_data(ProtoName),
      VerByte =
          case IsBridge of
              true -> 16#80 + ProtoVer;
              false -> ProtoVer
          end,
      FlagsAndKeepAlive = <<
          VerByte:8,
          (flag(Username)):1,
          (flag(Password)):1,
          (flag(WillRetain)):1,
          WillQoS:2,
          (flag(WillFlag)):1,
          (flag(CleanStart)):1,
          0:1,
          KeepAlive:16/big-unsigned-integer
      >>,
      %% Properties are keyed on the packet's own proto_ver, not on _Ver.
      PropsBin = serialize_properties(Properties, ProtoVer),
      ClientIdBin = serialize_utf8_string(ClientId),
      WillBin =
          case WillFlag of
              true ->
                  [
                      serialize_properties(WillProps, ProtoVer),
                      serialize_utf8_string(WillTopic),
                      serialize_binary_data(WillPayload)
                  ];
              false ->
                  <<>>
          end,
      UsernameBin = serialize_utf8_string(Username, true),
      PasswordBin = serialize_utf8_string(Password, true),
      [
          ProtoNameBin,
          FlagsAndKeepAlive,
          PropsBin,
          ClientIdBin,
          WillBin,
          UsernameBin,
          PasswordBin
      ];
  %% CONNACK
  serialize_variable(
      #mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode, properties = Props},
      Ver
  ) ->
      [AckFlags, ReasonCode, serialize_properties(Props, Ver)];
  %% PUBLISH: the packet id is left out when it is undefined.
  serialize_variable(
      #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Props},
      Ver
  ) ->
      TopicBin = serialize_utf8_string(TopicName),
      PacketIdBin =
          case PacketId of
              undefined -> <<>>;
              _ -> <<PacketId:16/big-unsigned-integer>>
          end,
      [TopicBin, PacketIdBin, serialize_properties(Props, Ver)];
  %% PUBACK-shaped records, pre-v5: packet id only.
  serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver) when
      Ver == ?MQTT_PROTO_V3 orelse Ver == ?MQTT_PROTO_V4
  ->
      <<PacketId:16/big-unsigned-integer>>;
  %% PUBACK-shaped records, v5: packet id, reason code, properties.
  serialize_variable(
      #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, properties = Props},
      ?MQTT_PROTO_V5 = Ver
  ) ->
      [<<PacketId:16/big-unsigned-integer>>, ReasonCode, serialize_properties(Props, Ver)];
  %% SUBSCRIBE
  serialize_variable(
      #mqtt_packet_subscribe{packet_id = PacketId, properties = Props, topic_filters = Filters},
      Ver
  ) ->
      PacketIdBin = <<PacketId:16/big-unsigned-integer>>,
      PropsBin = serialize_properties(Props, Ver),
      [PacketIdBin, PropsBin, serialize_topic_filters(subscribe, Filters, Ver)];
  %% SUBACK
  serialize_variable(
      #mqtt_packet_suback{packet_id = PacketId, properties = Props, reason_codes = Codes},
      Ver
  ) ->
      PacketIdBin = <<PacketId:16/big-unsigned-integer>>,
      PropsBin = serialize_properties(Props, Ver),
      [PacketIdBin, PropsBin, serialize_reason_codes(Codes)];
  %% UNSUBSCRIBE
  serialize_variable(
      #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Props, topic_filters = Filters},
      Ver
  ) ->
      PacketIdBin = <<PacketId:16/big-unsigned-integer>>,
      PropsBin = serialize_properties(Props, Ver),
      [PacketIdBin, PropsBin, serialize_topic_filters(unsubscribe, Filters, Ver)];
  %% UNSUBACK
  serialize_variable(
      #mqtt_packet_unsuback{packet_id = PacketId, properties = Props, reason_codes = Codes},
      Ver
  ) ->
      PacketIdBin = <<PacketId:16/big-unsigned-integer>>,
      PropsBin = serialize_properties(Props, Ver),
      [PacketIdBin, PropsBin, serialize_reason_codes(Codes)];
  %% DISCONNECT, pre-v5: no variable header.
  serialize_variable(#mqtt_packet_disconnect{}, Ver) when
      Ver == ?MQTT_PROTO_V3 orelse Ver == ?MQTT_PROTO_V4
  ->
      <<>>;
  %% DISCONNECT, v5: reason code and properties.
  serialize_variable(
      #mqtt_packet_disconnect{reason_code = ReasonCode, properties = Props},
      ?MQTT_PROTO_V5 = Ver
  ) ->
      [ReasonCode, serialize_properties(Props, Ver)];
  %% DISCONNECT with any other version value: no variable header.
  serialize_variable(#mqtt_packet_disconnect{}, _Ver) ->
      <<>>;
  %% AUTH: only defined for v5.
  serialize_variable(
      #mqtt_packet_auth{reason_code = ReasonCode, properties = Props},
      ?MQTT_PROTO_V5 = Ver
  ) ->
      [ReasonCode, serialize_properties(Props, Ver)];
  %% A bare integer packet id, accepted for v3 and v4 only.
  serialize_variable(PacketId, Ver) when
      is_integer(PacketId), (Ver =:= ?MQTT_PROTO_V3 orelse Ver =:= ?MQTT_PROTO_V4)
  ->
      <<PacketId:16/big-unsigned-integer>>;
  %% No variable header at all.
  serialize_variable(undefined, _Ver) ->
      <<>>.
  %% Payload as iodata: `undefined` becomes the empty binary, anything else is
  %% passed through untouched.
  serialize_payload(Payload) ->
      case Payload of
          undefined -> <<>>;
          _ -> Payload
      end.
  %% serialize_properties/2: properties are only emitted for ?MQTT_PROTO_V5;
  %% for every other version value the result is the empty binary.
  serialize_properties(Props, ?MQTT_PROTO_V5) ->
      serialize_properties(Props);
  serialize_properties(_Props, _Ver) ->
      <<>>.

  %% serialize_properties/1: encode a property map as a length-prefixed block.
  %% `undefined` and the empty map both encode as the single byte <<0>>.
  %% Otherwise every entry is encoded with serialize_property/2 and the
  %% concatenation is prefixed with its byte size as a variable byte integer.
  serialize_properties(undefined) ->
      <<0>>;
  serialize_properties(Props) when is_map(Props) ->
      case maps:to_list(Props) of
          [] ->
              <<0>>;
          Pairs ->
              Encoded = iolist_to_binary([serialize_property(Name, Val) || {Name, Val} <- Pairs]),
              [serialize_variable_byte_integer(byte_size(Encoded)), Encoded]
      end.
  %% Encode one property as a binary: identifier byte followed by the value in
  %% the encoding that belongs to that property.
  %% A value of `disabled` or `undefined` encodes as the empty binary for any
  %% property name. 'User-Property' accepts either a single {Key, Val} pair or a
  %% list of pairs; a list yields one identifier-prefixed entry per pair.
  serialize_property(_, Disabled) when Disabled =:= disabled; Disabled =:= undefined ->
      <<>>;
  serialize_property('User-Property', Props) when is_list(Props) ->
      iolist_to_binary([
          serialize_property('User-Property', {Key, Val})
       || {Key, Val} <- Props
      ]);
  serialize_property(Name, Val) ->
      {Id, Encoding} = property_wire_format(Name),
      encode_property_value(Id, Encoding, Val).

  %% Identifier byte and value encoding for each known property name.
  property_wire_format('Payload-Format-Indicator') -> {16#01, byte};
  property_wire_format('Message-Expiry-Interval') -> {16#02, four_byte};
  property_wire_format('Content-Type') -> {16#03, utf8};
  property_wire_format('Response-Topic') -> {16#08, utf8};
  property_wire_format('Correlation-Data') -> {16#09, binary};
  property_wire_format('Subscription-Identifier') -> {16#0B, varint};
  property_wire_format('Session-Expiry-Interval') -> {16#11, four_byte};
  property_wire_format('Assigned-Client-Identifier') -> {16#12, utf8};
  property_wire_format('Server-Keep-Alive') -> {16#13, two_byte};
  property_wire_format('Authentication-Method') -> {16#15, utf8};
  property_wire_format('Authentication-Data') -> {16#16, iodata_sized_binary};
  property_wire_format('Request-Problem-Information') -> {16#17, byte};
  property_wire_format('Will-Delay-Interval') -> {16#18, four_byte};
  property_wire_format('Request-Response-Information') -> {16#19, byte};
  property_wire_format('Response-Information') -> {16#1A, utf8};
  property_wire_format('Server-Reference') -> {16#1C, utf8};
  property_wire_format('Reason-String') -> {16#1F, utf8};
  property_wire_format('Receive-Maximum') -> {16#21, two_byte};
  property_wire_format('Topic-Alias-Maximum') -> {16#22, two_byte};
  property_wire_format('Topic-Alias') -> {16#23, two_byte};
  property_wire_format('Maximum-QoS') -> {16#24, byte};
  property_wire_format('Retain-Available') -> {16#25, byte};
  property_wire_format('User-Property') -> {16#26, utf8_pair};
  property_wire_format('Maximum-Packet-Size') -> {16#27, four_byte};
  property_wire_format('Wildcard-Subscription-Available') -> {16#28, byte};
  property_wire_format('Subscription-Identifier-Available') -> {16#29, byte};
  property_wire_format('Shared-Subscription-Available') -> {16#2A, byte}.

  %% Identifier byte followed by the value in the requested encoding.
  encode_property_value(Id, byte, Val) ->
      <<Id, Val>>;
  encode_property_value(Id, two_byte, Val) ->
      <<Id, Val:16/big>>;
  encode_property_value(Id, four_byte, Val) ->
      <<Id, Val:32/big>>;
  encode_property_value(Id, varint, Val) ->
      <<Id, (serialize_variable_byte_integer(Val))/binary>>;
  encode_property_value(Id, utf8, Val) ->
      <<Id, (serialize_utf8_string(Val))/binary>>;
  encode_property_value(Id, utf8_pair, {Key, Val}) ->
      <<Id, (serialize_utf8_pair({Key, Val}))/binary>>;
  encode_property_value(Id, binary, Val) ->
      <<Id, (byte_size(Val)):16, Val/binary>>;
  encode_property_value(Id, iodata_sized_binary, Val) ->
      %% Length is taken with iolist_size/1, but the value itself must still be
      %% a binary for the /binary segment.
      <<Id, (iolist_size(Val)):16, Val/binary>>.
  %% Encode the topic-filter list of a SUBSCRIBE or UNSUBSCRIBE packet as one
  %% binary.
  %%  - subscribe, v5: each entry is the topic string followed by an options
  %%    byte laid out as reserved:2, rh:2, rap:1, nl:1, qos:2.
  %%  - subscribe, other versions: topic string followed by reserved:6, qos:2.
  %%  - unsubscribe: the topic strings only.
  %% List elements that do not match the expected {Topic, OptsMap} shape are
  %% skipped by the comprehension generators.
  serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
      iolist_to_binary([
          [
              serialize_utf8_string(Topic),
              <<?RESERVED:2, Rh:2, (flag(Rap)):1, (flag(Nl)):1, QoS:2>>
          ]
       || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters
      ]);
  serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
      iolist_to_binary([
          [serialize_utf8_string(Topic), <<?RESERVED:6, QoS:2>>]
       || {Topic, #{qos := QoS}} <- TopicFilters
      ]);
  serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
      iolist_to_binary([serialize_utf8_string(Topic) || Topic <- TopicFilters]).
  %% Encode a list of reason codes as one byte each, in order.
  %% `undefined` encodes as the empty binary.
  serialize_reason_codes(undefined) ->
      <<>>;
  serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) ->
      iolist_to_binary([<<Code>> || Code <- ReasonCodes]).
  %% Encode a {Name, Value} pair as two consecutive length-prefixed strings.
  serialize_utf8_pair({Name, Value}) ->
      NameBin = serialize_utf8_string(Name),
      ValueBin = serialize_utf8_string(Value),
      <<NameBin/binary, ValueBin/binary>>.
  %% Encode a binary as a two-byte big-endian length followed by the data.
  %% Returns an iolist [LengthPrefix, Bin].
  %% The length field is only 16 bits wide, so data longer than 65535 bytes
  %% cannot be represented. Such input is rejected with a badmatch (the same
  %% assertion serialize_utf8_string/1 uses) instead of silently emitting a
  %% wrapped-around length that would corrupt the frame.
  serialize_binary_data(Bin) ->
      Len = byte_size(Bin),
      true = (Len =< 16#ffff),
      [<<Len:16/big-unsigned-integer>>, Bin].
  %% Encode a string that may be absent.
  %% When String is `undefined`: AllowNull =:= true yields the empty binary,
  %% AllowNull =:= false is reported through ?SERIALIZE_ERR.
  %% Every other combination is delegated to serialize_utf8_string/1.
  serialize_utf8_string(String, AllowNull) ->
      case {String, AllowNull} of
          {undefined, true} -> <<>>;
          {undefined, false} -> ?SERIALIZE_ERR(utf8_string_undefined);
          _ -> serialize_utf8_string(String)
      end.
  %% Encode chardata as a two-byte big-endian length followed by the bytes
  %% produced by unicode:characters_to_binary/1.
  %% Crashes with a badmatch when the encoded form is longer than 65535 bytes,
  %% because the length would not fit the 16-bit prefix.
  serialize_utf8_string(String) ->
      Utf8 = unicode:characters_to_binary(String),
      Size = byte_size(Utf8),
      true = (Size =< 16#ffff),
      <<Size:16/big, Utf8/binary>>.
  %% The remaining-length field is a plain variable byte integer.
  serialize_remaining_len(Len) ->
      serialize_variable_byte_integer(Len).

  %% Encode an integer as a variable byte integer: seven value bits per byte,
  %% least-significant group first, with the top bit of a byte set whenever
  %% another byte follows.
  serialize_variable_byte_integer(N) when N > ?LOWBITS ->
      Low = N rem ?HIGHBIT,
      More = serialize_variable_byte_integer(N div ?HIGHBIT),
      <<1:1, Low:7, More/binary>>;
  serialize_variable_byte_integer(N) ->
      %% Fits in seven bits: final byte, continuation bit clear.
      <<0:1, N:7>>.
  %% Is the frame too large?
  %% Returns true only when the serialized size strictly exceeds MaxSize.
  %% A frame of exactly MaxSize bytes is allowed: the MQTT Maximum Packet Size
  %% is the largest size the peer is willing to accept, so only packets that
  %% exceed it must be withheld.
  -spec is_too_large(iodata(), pos_integer()) -> boolean().
  is_too_large(IoData, MaxSize) ->
      iolist_size(IoData) > MaxSize.
  %% Look up Key in a property map, returning Default when the key is missing
  %% or when there are no properties at all (Props =:= undefined).
  get_property(Key, Props, Default) ->
      case Props of
          undefined -> Default;
          _ -> maps:get(Key, Props, Default)
      end.
  %% Validate header if strict mode. See: mqtt-v5.0: 2.1.3 Flags
  %% Arguments are the packet type and the Dup, QoS and Retain bits of the
  %% fixed header. Returns ok for an allowed combination; anything else is
  %% reported through ?PARSE_ERR(bad_frame_header).
  %% PUBLISH is the only type whose flags vary: Retain is free, and Dup may be
  %% set for QoS 1 and QoS 2 (re-delivery) but must be 0 for QoS 0.
  validate_header(?CONNECT, 0, 0, 0) -> ok;
  validate_header(?CONNACK, 0, 0, 0) -> ok;
  validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok;
  validate_header(?PUBLISH, _, ?QOS_1, _) -> ok;
  %% A re-delivered QoS 2 PUBLISH legitimately carries Dup = 1.
  validate_header(?PUBLISH, _, ?QOS_2, _) -> ok;
  validate_header(?PUBACK, 0, 0, 0) -> ok;
  validate_header(?PUBREC, 0, 0, 0) -> ok;
  validate_header(?PUBREL, 0, 1, 0) -> ok;
  validate_header(?PUBCOMP, 0, 0, 0) -> ok;
  validate_header(?SUBSCRIBE, 0, 1, 0) -> ok;
  validate_header(?SUBACK, 0, 0, 0) -> ok;
  validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok;
  validate_header(?UNSUBACK, 0, 0, 0) -> ok;
  validate_header(?PINGREQ, 0, 0, 0) -> ok;
  validate_header(?PINGRESP, 0, 0, 0) -> ok;
  validate_header(?DISCONNECT, 0, 0, 0) -> ok;
  validate_header(?AUTH, 0, 0, 0) -> ok;
  validate_header(_Type, _Dup, _QoS, _Rt) -> ?PARSE_ERR(bad_frame_header).
  -compile({inline, [validate_packet_id/1]}).
  %% A packet id of 0 is reported through ?PARSE_ERR(bad_packet_id); every
  %% other value is accepted.
  validate_packet_id(PacketId) when PacketId =/= 0 -> ok;
  validate_packet_id(_Zero) -> ?PARSE_ERR(bad_packet_id).
  %% Check a list of subscription QoS values: the value 3 anywhere in the list
  %% is reported through ?PARSE_ERR(bad_subqos); otherwise the result is ok.
  validate_subqos(QoSList) ->
      case lists:member(3, QoSList) of
          true -> ?PARSE_ERR(bad_subqos);
          false -> ok
      end.
  %% Convert a single bit (0 or 1) to a boolean; any other value is a
  %% function_clause error.
  bool(Bit) when Bit =:= 0; Bit =:= 1 ->
      Bit =:= 1.
  %% Map a field value to the integer written into a flag bit field:
  %%   integer   -> itself
  %%   true      -> 1, false -> 0
  %%   binary    -> 1 (the field is present)
  %%   undefined -> ?RESERVED
  flag(Int) when is_integer(Int) -> Int;
  flag(true) -> 1;
  flag(false) -> 0;
  flag(Bin) when is_binary(Bin) -> 1;
  flag(undefined) -> ?RESERVED.
  %% For PUBREL, SUBSCRIBE and UNSUBSCRIBE a QoS of 0 is replaced by 1; every
  %% other type/QoS combination keeps the QoS it was given.
  fixqos(Type, 0) when Type =:= ?PUBREL; Type =:= ?SUBSCRIBE; Type =:= ?UNSUBSCRIBE ->
      1;
  fixqos(_Type, QoS) ->
      QoS.
  %% Validate a binary as an MQTT UTF-8 string.
  %% Returns Bin unchanged when unicode:characters_to_binary/1 round-trips it
  %% and validate_mqtt_utf8_char/1 accepts every character. All failures are
  %% reported through ?PARSE_ERR(utf8_string_invalid).
  validate_utf8(Bin) ->
      case unicode:characters_to_binary(Bin) of
          Bin when is_binary(Bin) ->
              case validate_mqtt_utf8_char(Bin) of
                  false -> ?PARSE_ERR(utf8_string_invalid);
                  true -> Bin
              end;
          {Failure, _, _} when Failure =:= error; Failure =:= incomplete ->
              %% Not well-formed UTF-8 (bad or truncated sequence).
              ?PARSE_ERR(utf8_string_invalid)
      end.
  %% Is the utf8 string respecting UTF-8 characters defined by MQTT Spec?
  %% Returns true when every character of the binary is well-formed UTF-8 and
  %% none of them is a control character in U+0000..U+001F or U+007F..U+009F.
  %% Returns false as soon as a control character or a malformed sequence
  %% (including truncated sequences and UTF-16 surrogates) is found.
  %%
  %% Characters are decoded with the /utf8 segment type rather than by matching
  %% lead bytes by hand, so multi-byte sequences are consumed as whole code
  %% points and malformed input yields false instead of a function_clause.
  validate_mqtt_utf8_char(<<>>) ->
      true;
  validate_mqtt_utf8_char(<<Char/utf8, _Rest/binary>>) when
      Char =< 16#1F;
      Char >= 16#7F, Char =< 16#9F
  ->
      %% [U+0000 .. U+001F] and [U+007F .. U+009F]
      false;
  validate_mqtt_utf8_char(<<_Char/utf8, Rest/binary>>) ->
      validate_mqtt_utf8_char(Rest);
  validate_mqtt_utf8_char(<<_BadByte, _Rest/binary>>) ->
      %% The /utf8 match failed: not a valid UTF-8 sequence at this position.
      false.