emqx_frame_SUITE.erl 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_frame_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  %% Assert that evaluating Expr throws `{?FRAME_PARSE_ERROR, Reason}'.
  %% Reason is used as a match pattern, so callers may pass a partial map
  %% pattern such as `#{cause := frame_too_large}'.
  -define(ASSERT_FRAME_THROW(Reason, Expr),
      ?assertException(throw, {?FRAME_PARSE_ERROR, Reason}, Expr)
  ).
  %% Common Test callback: the suite consists solely of test groups, listed
  %% here in execution order.
  all() ->
      GroupNames = [
          parse,
          connect,
          connack,
          publish,
          puback,
          subscribe,
          suback,
          unsubscribe,
          unsuback,
          ping,
          disconnect,
          auth
      ],
      [{group, Name} || Name <- GroupNames].
  %% Common Test callback: maps each group named in all/0 to its test cases.
  %% Every group runs its cases in parallel.
  %%
  %% A test case only runs if it is listed here, because all/0 returns groups
  %% only. t_undefined_password and t_parse_disconnect are defined in this
  %% module and are therefore registered in `connect' and `disconnect'.
  groups() ->
      [
          {parse, [parallel], [
              t_parse_cont,
              t_parse_frame_too_large,
              t_parse_frame_malformed_variable_byte_integer,
              t_parse_malformed_utf8_string
          ]},
          {connect, [parallel], [
              t_serialize_parse_v3_connect,
              t_serialize_parse_v4_connect,
              t_serialize_parse_v5_connect,
              t_serialize_parse_connect_without_clientid,
              t_serialize_parse_connect_with_will,
              t_serialize_parse_connect_with_malformed_will,
              t_serialize_parse_bridge_connect,
              t_parse_invalid_remaining_len,
              t_parse_malformed_properties,
              t_malformed_connect_header,
              t_malformed_connect_data,
              t_reserved_connect_flag,
              t_invalid_clientid,
              t_undefined_password
          ]},
          {connack, [parallel], [
              t_serialize_parse_connack,
              t_serialize_parse_connack_v5
          ]},
          {publish, [parallel], [
              t_parse_sticky_frames,
              t_serialize_parse_qos0_publish,
              t_serialize_parse_qos1_publish,
              t_serialize_parse_qos2_publish,
              t_serialize_parse_publish_v5
          ]},
          {puback, [parallel], [
              t_serialize_parse_puback,
              t_serialize_parse_puback_v3_4,
              t_serialize_parse_puback_v5,
              t_serialize_parse_pubrec,
              t_serialize_parse_pubrec_v5,
              t_serialize_parse_pubrel,
              t_serialize_parse_pubrel_v5,
              t_serialize_parse_pubcomp,
              t_serialize_parse_pubcomp_v5
          ]},
          {subscribe, [parallel], [
              t_serialize_parse_subscribe,
              t_serialize_parse_subscribe_v5
          ]},
          {suback, [parallel], [
              t_serialize_parse_suback,
              t_serialize_parse_suback_v5
          ]},
          {unsubscribe, [parallel], [
              t_serialize_parse_unsubscribe,
              t_serialize_parse_unsubscribe_v5
          ]},
          {unsuback, [parallel], [
              t_serialize_parse_unsuback,
              t_serialize_parse_unsuback_v5
          ]},
          {ping, [parallel], [
              t_serialize_parse_pingreq,
              t_serialize_parse_pingresp
          ]},
          {disconnect, [parallel], [
              t_parse_disconnect,
              t_serialize_parse_disconnect,
              t_serialize_parse_disconnect_v5
          ]},
          {auth, [parallel], [t_serialize_parse_auth_v5]}
      ].
  %% Common Test callback: no suite-wide setup; the config is passed through.
  init_per_suite(Cfg) -> Cfg.
  %% Common Test callback: nothing to tear down for the suite.
  end_per_suite(_Cfg) -> ok.
  %% Common Test callback: no per-group setup; the config is passed through
  %% regardless of the group name.
  init_per_group(_GroupName, Cfg) -> Cfg.
  %% Common Test callback: nothing to tear down for any group.
  end_per_group(_GroupName, _Cfg) -> ok.
  %% A CONNECT frame fed to the parser piece by piece (empty chunk, first byte,
  %% second byte, empty chunk) must answer `more' each time and only yield the
  %% packet once the rest of the frame arrives.
  t_parse_cont(_) ->
      Connect = ?CONNECT_PACKET(#mqtt_packet_connect{}),
      InitState = emqx_frame:initial_parse_state(),
      <<Byte1:1/binary, Byte2:1/binary, Remainder/binary>> = serialize_to_binary(Connect),
      FeedPartial = fun(Chunk, State) ->
          {more, NextState} = emqx_frame:parse(Chunk, State),
          NextState
      end,
      Pending = lists:foldl(FeedPartial, InitState, [<<>>, Byte1, Byte2, <<>>]),
      {ok, Connect, <<>>, _} = emqx_frame:parse(Remainder, Pending).
  %% A PUBLISH carrying a 1000-byte payload is rejected with `frame_too_large'
  %% when max_size is 256 or 512, and round-trips when max_size is 2048.
  t_parse_frame_too_large(_) ->
      Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
      lists:foreach(
          fun(MaxSize) ->
              ?ASSERT_FRAME_THROW(
                  #{cause := frame_too_large},
                  parse_serialize(Publish, #{max_size => MaxSize})
              )
          end,
          [256, 512]
      ),
      RoomyOpts = #{max_size => 2048, version => ?MQTT_PROTO_V4},
      ?assertEqual(Publish, parse_serialize(Publish, RoomyOpts)).
  %% Six consecutive 16#80 bytes must be rejected with
  %% `malformed_variable_byte_integer'.
  t_parse_frame_malformed_variable_byte_integer(_) ->
      SixBytes = binary:copy(<<16#80>>, 6),
      InitState = emqx_frame:initial_parse_state(#{}),
      ?ASSERT_FRAME_THROW(
          malformed_variable_byte_integer,
          emqx_frame:parse(SixBytes, InitState)
      ).
  %% In strict mode a CONNECT whose 4-byte protocol name holds control
  %% characters instead of "MQTT" (77,81,84,84) is rejected with
  %% `utf8_string_invalid'.
  t_parse_malformed_utf8_string(_) ->
      %% malformed 1-byte UTF-8 code points: within (U+0000 .. U+001F] and U+007F
      BadProtoName = <<16#00, 16#01, 16#1F, 16#7F>>,
      Frame =
          <<16, 31, 0, 4, BadProtoName/binary, 4, 194, 0, 60, 0, 4, "emqx", 0, 5, "admin", 0, 6,
              "public">>,
      StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}),
      ?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(Frame, StrictState)).
  %% Parsing a captured "MQIsdp" (protocol version 3) CONNECT frame yields the
  %% expected packet and leaves the parser state tagged with version 3.
  t_serialize_parse_v3_connect(_) ->
      ClientId = <<"mosqpub/10451-iMac.loca">>,
      Frame = <<16, 37, 0, 6, "MQIsdp", 3, 2, 0, 60, 0, 23, ClientId/binary>>,
      Expected = ?CONNECT_PACKET(
          #mqtt_packet_connect{
              proto_ver = ?MQTT_PROTO_V3,
              proto_name = <<"MQIsdp">>,
              clientid = ClientId,
              clean_start = true,
              keepalive = 60
          }
      ),
      {ok, Expected, <<>>, StateAfter} = emqx_frame:parse(Frame),
      ?assertMatch({none, #{version := ?MQTT_PROTO_V3}}, StateAfter).
  %% A "MQTT" protocol version 4 CONNECT serializes to the captured frame and
  %% the frame parses back to the same packet.
  t_serialize_parse_v4_connect(_) ->
      ClientId = <<"mosqpub/10451-iMac.loca">>,
      Frame = <<16, 35, 0, 4, "MQTT", 4, 2, 0, 60, 0, 23, ClientId/binary>>,
      Connect = ?CONNECT_PACKET(
          #mqtt_packet_connect{
              proto_ver = ?MQTT_PROTO_V4,
              proto_name = <<"MQTT">>,
              clientid = ClientId,
              clean_start = true,
              keepalive = 60
          }
      ),
      ?assertEqual(Frame, serialize_to_binary(Connect)),
      ?assertMatch({ok, Connect, <<>>, _}, emqx_frame:parse(Frame)).
  %% A version 5 CONNECT carrying connect properties, a will with its own
  %% properties, a username and a password survives a serialize/parse round trip.
  t_serialize_parse_v5_connect(_) ->
      ConnectProps = #{
          'Session-Expiry-Interval' => 60,
          'Receive-Maximum' => 100,
          'Maximum-QoS' => ?QOS_2,
          'Retain-Available' => 1,
          'Maximum-Packet-Size' => 1024,
          'Topic-Alias-Maximum' => 10,
          'Request-Response-Information' => 1,
          'Request-Problem-Information' => 1,
          'Authentication-Method' => <<"oauth2">>,
          'Authentication-Data' => <<"33kx93k">>
      },
      WillProperties = #{
          'Will-Delay-Interval' => 60,
          'Payload-Format-Indicator' => 1,
          'Message-Expiry-Interval' => 60,
          'Content-Type' => <<"text/json">>,
          'Response-Topic' => <<"topic">>,
          'Correlation-Data' => <<"correlateid">>,
          'User-Property' => [{<<"k">>, <<"v">>}]
      },
      ConnectRecord = #mqtt_packet_connect{
          proto_name = <<"MQTT">>,
          proto_ver = ?MQTT_PROTO_V5,
          is_bridge = false,
          clean_start = true,
          clientid = <<>>,
          will_flag = true,
          will_qos = ?QOS_1,
          will_retain = false,
          keepalive = 60,
          properties = ConnectProps,
          will_props = WillProperties,
          will_topic = <<"topic">>,
          will_payload = <<>>,
          username = <<"device:1">>,
          password = <<"passwd">>
      },
      Connect = ?CONNECT_PACKET(ConnectRecord),
      ?assertEqual(Connect, parse_serialize(Connect)).
  %% A version 4 CONNECT with an empty client id serializes to the expected
  %% 14-byte frame and parses back to the same packet.
  t_serialize_parse_connect_without_clientid(_) ->
      Frame = <<16, 12, 0, 4, "MQTT", 4, 2, 0, 60, 0, 0>>,
      ConnectRecord = #mqtt_packet_connect{
          proto_ver = ?MQTT_PROTO_V4,
          proto_name = <<"MQTT">>,
          clientid = <<>>,
          clean_start = true,
          keepalive = 60
      },
      Connect = ?CONNECT_PACKET(ConnectRecord),
      ?assertEqual(Frame, serialize_to_binary(Connect)),
      ?assertMatch({ok, Connect, <<>>, _}, emqx_frame:parse(Frame)).
  %% A protocol version 3 CONNECT with will topic, will payload, username and
  %% password serializes to the captured frame and parses back unchanged.
  t_serialize_parse_connect_with_will(_) ->
      ClientId = <<"mosqpub/10452-iMac.loca">>,
      WillTopic = <<"/will">>,
      WillPayload = <<"willmsg">>,
      Username = <<"test">>,
      Password = <<"public">>,
      Frame =
          <<16, 67, 0, 6, "MQIsdp", 3, 206, 0, 60, 0, 23, ClientId/binary, 0, 5,
              WillTopic/binary, 0, 7, WillPayload/binary, 0, 4, Username/binary, 0, 6,
              Password/binary>>,
      Connect = #mqtt_packet{
          header = #mqtt_packet_header{type = ?CONNECT},
          variable = #mqtt_packet_connect{
              proto_ver = ?MQTT_PROTO_V3,
              proto_name = <<"MQIsdp">>,
              clientid = ClientId,
              clean_start = true,
              keepalive = 60,
              will_retain = false,
              will_qos = ?QOS_1,
              will_flag = true,
              will_topic = WillTopic,
              will_payload = WillPayload,
              username = Username,
              password = Password
          }
      },
      ?assertEqual(Frame, serialize_to_binary(Connect)),
      ?assertMatch({ok, Connect, <<>>, _}, emqx_frame:parse(Frame)).
  %% Starting from a valid CONNECT with an empty will payload, the trailing
  %% will-payload length field is corrupted in two ways; both variants must be
  %% rejected with `malformed_will_payload'.
  t_serialize_parse_connect_with_malformed_will(_) ->
      ValidConnect = #mqtt_packet{
          header = #mqtt_packet_header{type = ?CONNECT},
          variable = #mqtt_packet_connect{
              proto_ver = ?MQTT_PROTO_V3,
              proto_name = <<"MQIsdp">>,
              clientid = <<"mosqpub/10452-iMac.loca">>,
              clean_start = true,
              keepalive = 60,
              will_retain = false,
              will_qos = ?QOS_1,
              will_flag = true,
              will_topic = <<"/will">>,
              will_payload = <<>>
          }
      },
      %% The valid frame ends with the two-byte length prefix <<0, 0>>.
      <<16, 46, Prefix:44/binary, 0, 0>> = serialize_to_binary(ValidConnect),
      %% too short: only one byte of the length prefix is present
      Truncated = <<16, 45, Prefix/binary, 0>>,
      ?ASSERT_FRAME_THROW(
          #{cause := malformed_will_payload, length_bytes := 1, expected_bytes := 2},
          emqx_frame:parse(Truncated)
      ),
      %% too long: the prefix announces 2 bytes but only 1 follows
      Overstated = <<16, 47, Prefix/binary, 0, 2, 0>>,
      ?ASSERT_FRAME_THROW(
          #{cause := malformed_will_payload, parsed_length := 2, remaining_bytes := 1},
          emqx_frame:parse(Overstated)
      ),
      ok.
  %% A bridge CONNECT (is_bridge = true) with a retained will serializes to the
  %% captured frame and parses back; a default CONNECT with only is_bridge set
  %% also round-trips.
  t_serialize_parse_bridge_connect(_) ->
      ClientId = <<"C_00:0C:29:2B:77:52">>,
      WillTopic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>,
      Frame =
          <<16, 86, 0, 6, "MQIsdp", 131, 44, 0, 60, 0, 19, ClientId/binary, 0, 48,
              WillTopic/binary, 0, 1, "0">>,
      BridgeConnect = #mqtt_packet{
          header = #mqtt_packet_header{type = ?CONNECT},
          variable = #mqtt_packet_connect{
              clientid = ClientId,
              proto_ver = 16#03,
              proto_name = <<"MQIsdp">>,
              is_bridge = true,
              will_retain = true,
              will_qos = ?QOS_1,
              will_flag = true,
              clean_start = false,
              keepalive = 60,
              will_topic = WillTopic,
              will_payload = <<"0">>
          }
      },
      ?assertEqual(Frame, serialize_to_binary(BridgeConnect)),
      ?assertMatch({ok, BridgeConnect, <<>>, _}, emqx_frame:parse(Frame)),
      MinimalBridge = ?CONNECT_PACKET(#mqtt_packet_connect{is_bridge = true}),
      ?assertEqual(MinimalBridge, parse_serialize(MinimalBridge)).
  %% A successful CONNACK serializes to <<32, 2, 0, 0>> and round-trips.
  t_serialize_parse_connack(_) ->
      ConnAck = ?CONNACK_PACKET(?RC_SUCCESS),
      ExpectedFrame = <<32, 2, 0, 0>>,
      ?assertEqual(ExpectedFrame, serialize_to_binary(ConnAck)),
      ?assertEqual(ConnAck, parse_serialize(ConnAck)).
  %% A version 5 CONNACK carrying a full set of properties round-trips.
  t_serialize_parse_connack_v5(_) ->
      AckProps = #{
          'Session-Expiry-Interval' => 60,
          'Receive-Maximum' => 100,
          'Maximum-QoS' => ?QOS_2,
          'Retain-Available' => 1,
          'Maximum-Packet-Size' => 1024,
          'Assigned-Client-Identifier' => <<"id">>,
          'Topic-Alias-Maximum' => 10,
          'Reason-String' => <<>>,
          'Wildcard-Subscription-Available' => 1,
          'Subscription-Identifier-Available' => 1,
          'Shared-Subscription-Available' => 1,
          'Server-Keep-Alive' => 60,
          'Response-Information' => <<"response">>,
          'Server-Reference' => <<"192.168.1.10">>,
          'Authentication-Method' => <<"oauth2">>,
          'Authentication-Data' => <<"33kx93k">>
      },
      ConnAck = ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(ConnAck, parse_serialize(ConnAck, V5Opts)).
  %% When the final chunk of a frame arrives glued to the first byte of the
  %% next frame, the parser must emit the packet, hand the extra byte back as
  %% the remainder and return to the `none' state.
  t_parse_sticky_frames(_) ->
      Publish = #mqtt_packet{
          header = #mqtt_packet_header{
              type = ?PUBLISH,
              dup = false,
              qos = ?QOS_0,
              retain = false
          },
          variable = #mqtt_packet_publish{
              topic_name = <<"a/b">>,
              packet_id = undefined
          },
          %% ten zero bytes
          payload = binary:copy(<<0>>, 10)
      },
      Frame = serialize_to_binary(Publish),
      HeadLen = byte_size(Frame) - 2,
      <<Head:HeadLen/binary, LastTwo/binary>> = Frame,
      %% the parser still needs 2 more bytes
      {more, Partial} = emqx_frame:parse(Head),
      %% feed 3 bytes, as if the third one belonged to the next packet
      {ok, _, <<42>>, Finished} = emqx_frame:parse(<<LastTwo/binary, 42>>, Partial),
      ?assertMatch({none, _}, Finished).
  %% A QoS 0 PUBLISH to "xxx/yyy" with payload "hello" serializes to the
  %% expected frame, and the frame parses back in strict mode.
  t_serialize_parse_qos0_publish(_) ->
      Topic = <<"xxx/yyy">>,
      Body = <<"hello">>,
      Frame = <<48, 14, 0, 7, Topic/binary, Body/binary>>,
      Publish = #mqtt_packet{
          header = #mqtt_packet_header{
              type = ?PUBLISH,
              dup = false,
              qos = ?QOS_0,
              retain = false
          },
          variable = #mqtt_packet_publish{
              topic_name = Topic,
              packet_id = undefined
          },
          payload = Body
      },
      ?assertEqual(Frame, serialize_to_binary(Publish)),
      ?assertMatch(Publish, parse_to_packet(Frame, #{strict_mode => true})).
  %% A QoS 1 PUBLISH round-trips against a fixed frame. Packet id 0 is rejected
  %% with `bad_packet_id' in strict mode and accepted when strict mode is off.
  t_serialize_parse_qos1_publish(_) ->
      Topic = <<"a/b/c">>,
      Body = <<"haha">>,
      Frame = <<50, 13, 0, 5, Topic/binary, 0, 1, Body/binary>>,
      Publish = #mqtt_packet{
          header = #mqtt_packet_header{
              type = ?PUBLISH,
              dup = false,
              qos = ?QOS_1,
              retain = false
          },
          variable = #mqtt_packet_publish{
              topic_name = Topic,
              packet_id = 1
          },
          payload = Body
      },
      ?assertEqual(Frame, serialize_to_binary(Publish)),
      ?assertMatch(Publish, parse_to_packet(Frame, #{strict_mode => true})),
      ZeroIdPublish = ?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(ZeroIdPublish)),
      %% lenient mode only has to parse without throwing
      _ = parse_serialize(ZeroIdPublish, #{strict_mode => false}).
  %% A QoS 2 PUBLISH round-trips and matches a fixed frame. Packet id 0 is
  %% rejected with `bad_packet_id' in strict mode and accepted otherwise.
  t_serialize_parse_qos2_publish(_) ->
      Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
      Frame = <<52, 9, 0, 5, "Topic", 0, 1>>,
      ?assertEqual(Publish, parse_serialize(Publish)),
      ?assertEqual(Frame, serialize_to_binary(Publish)),
      ?assertMatch(Publish, parse_to_packet(Frame, #{strict_mode => true})),
      ZeroIdPublish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(ZeroIdPublish)),
      %% lenient mode only has to parse without throwing
      _ = parse_serialize(ZeroIdPublish, #{strict_mode => false}).
  %% A version 5 QoS 1 PUBLISH carrying publish properties round-trips.
  t_serialize_parse_publish_v5(_) ->
      PublishProps = #{
          'Payload-Format-Indicator' => 1,
          'Message-Expiry-Interval' => 60,
          'Topic-Alias' => 16#AB,
          'Response-Topic' => <<"reply">>,
          'Correlation-Data' => <<"correlation-id">>,
          'Subscription-Identifier' => 1,
          'Content-Type' => <<"text/json">>
      },
      Publish = ?PUBLISH_PACKET(
          ?QOS_1, <<"$share/group/topic">>, 1, PublishProps, <<"payload">>
      ),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Publish, parse_serialize(Publish, V5Opts)).
  %% PUBACK with packet id 1 serializes to <<64, 2, 0, 1>> and round-trips.
  %% Packet id 0 is rejected in strict mode and accepted when strict mode is off.
  t_serialize_parse_puback(_) ->
      Ack = ?PUBACK_PACKET(1),
      ExpectedFrame = <<64, 2, 0, 1>>,
      ?assertEqual(ExpectedFrame, serialize_to_binary(Ack)),
      ?assertEqual(Ack, parse_serialize(Ack)),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))),
      %% lenient mode returns the packet with id 0
      Lenient = #{strict_mode => false},
      ?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), Lenient).
  %% A PUBACK whose variable part is the bare packet id 1 serializes to the same
  %% frame under protocol versions 3 and 4, and that frame parses to
  %% ?PUBACK_PACKET(1) under both versions.
  t_serialize_parse_puback_v3_4(_) ->
      Frame = <<64, 2, 0, 1>>,
      BareAck = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = 1},
      Versions = [?MQTT_PROTO_V3, ?MQTT_PROTO_V4],
      lists:foreach(
          fun(Ver) -> ?assertEqual(Frame, serialize_to_binary(BareAck, Ver)) end,
          Versions
      ),
      lists:foreach(
          fun(Ver) ->
              ?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Frame, #{version => Ver}))
          end,
          Versions
      ).
  %% A version 5 PUBACK with a reason code and a Reason-String round-trips.
  t_serialize_parse_puback_v5(_) ->
      AckProps = #{'Reason-String' => <<"success">>},
      Ack = ?PUBACK_PACKET(16, ?RC_SUCCESS, AckProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Ack, parse_serialize(Ack, V5Opts)).
  %% PUBREC with packet id 1 serializes to the expected frame and round-trips.
  %% Packet id 0 is rejected in strict mode and accepted when strict mode is off.
  t_serialize_parse_pubrec(_) ->
      Rec = ?PUBREC_PACKET(1),
      ExpectedFrame = <<5:4, 0:4, 2, 0, 1>>,
      ?assertEqual(ExpectedFrame, serialize_to_binary(Rec)),
      ?assertEqual(Rec, parse_serialize(Rec)),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))),
      %% lenient mode returns the packet with id 0
      Lenient = #{strict_mode => false},
      ?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), Lenient).
  %% A version 5 PUBREC with a reason code and a Reason-String round-trips.
  t_serialize_parse_pubrec_v5(_) ->
      RecProps = #{'Reason-String' => <<"success">>},
      Rec = ?PUBREC_PACKET(16, ?RC_SUCCESS, RecProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Rec, parse_serialize(Rec, V5Opts)).
  %% PUBREL with packet id 1 serializes with flag nibble 2 and round-trips.
  %% A PUBREL frame with flag nibble 0 is accepted when strict mode is off and
  %% rejected with `bad_frame_header' in strict mode. Packet id 0 is likewise
  %% accepted only when strict mode is off.
  t_serialize_parse_pubrel(_) ->
      Rel = ?PUBREL_PACKET(1),
      Frame = serialize_to_binary(Rel),
      ?assertEqual(<<6:4, 2:4, 2, 0, 1>>, Frame),
      ?assertEqual(Rel, parse_serialize(Rel)),
      Lenient = #{strict_mode => false},
      Strict = #{strict_mode => true},
      %% PUBREL with bad qos 0 in the fixed header
      ZeroFlagsFrame = <<6:4, 0:4, 2, 0, 1>>,
      ?assertMatch(Rel, parse_to_packet(ZeroFlagsFrame, Lenient)),
      ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(ZeroFlagsFrame, Strict)),
      %% lenient mode returns the packet with id 0
      ?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), Lenient),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
  %% A version 5 PUBREL with a reason code and a Reason-String round-trips.
  t_serialize_parse_pubrel_v5(_) ->
      RelProps = #{'Reason-String' => <<"success">>},
      Rel = ?PUBREL_PACKET(16, ?RC_SUCCESS, RelProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Rel, parse_serialize(Rel, V5Opts)).
  %% PUBCOMP with packet id 1 serializes to the expected frame and round-trips.
  %% Packet id 0 is accepted when strict mode is off and rejected in strict mode.
  t_serialize_parse_pubcomp(_) ->
      Comp = ?PUBCOMP_PACKET(1),
      Frame = serialize_to_binary(Comp),
      ?assertEqual(<<7:4, 0:4, 2, 0, 1>>, Frame),
      ?assertEqual(Comp, parse_serialize(Comp)),
      %% lenient mode returns the packet with id 0
      Lenient = #{strict_mode => false},
      ?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), Lenient),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
  %% A version 5 PUBCOMP with a reason code and a Reason-String round-trips.
  t_serialize_parse_pubcomp_v5(_) ->
      CompProps = #{'Reason-String' => <<"success">>},
      Comp = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, CompProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Comp, parse_serialize(Comp, V5Opts)).
  %% SUBSCRIBE serialization and parsing:
  %%   * a fixed frame (flag nibble 2, PacketId = 2, filter <<"TopicA">> with
  %%     requested QoS 2) serializes and parses in strict mode;
  %%   * the same frame with flag nibble 0 parses when strict mode is off and is
  %%     rejected with `bad_frame_header' in strict mode;
  %%   * packet id 0 parses when strict mode is off and is rejected with
  %%     `bad_packet_id' in strict mode;
  %%   * a requested QoS of 3 is rejected with `bad_subqos'.
  t_serialize_parse_subscribe(_) ->
      Lenient = #{strict_mode => false},
      Strict = #{strict_mode => true},
      Frame = <<?SUBSCRIBE:4, 2:4, 11, 0, 2, 0, 6, "TopicA", 2>>,
      SubOpts = #{nl => 0, rap => 0, rh => 0, qos => 2},
      TopicFilters = [{<<"TopicA">>, SubOpts}],
      Subscribe = ?SUBSCRIBE_PACKET(2, TopicFilters),
      ?assertEqual(Frame, serialize_to_binary(Subscribe)),
      ?assertMatch(Subscribe, parse_to_packet(Frame, Strict)),
      %% SUBSCRIBE with flag nibble 0 in the fixed header. The lenient parse is
      %% performed once; its result is asserted directly.
      ZeroFlagsFrame = <<?SUBSCRIBE:4, 0:4, 11, 0, 2, 0, 6, "TopicA", 2>>,
      ?assertMatch(Subscribe, parse_to_packet(ZeroFlagsFrame, Lenient)),
      ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(ZeroFlagsFrame, Strict)),
      %% packet id 0: lenient mode only has to parse without throwing
      ZeroIdSubscribe = ?SUBSCRIBE_PACKET(0, TopicFilters),
      _ = parse_serialize(ZeroIdSubscribe, Lenient),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(ZeroIdSubscribe)),
      ?ASSERT_FRAME_THROW(
          bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))
      ).
  %% A version 5 SUBSCRIBE with two topic filters and a Subscription-Identifier
  %% of 16#FFFFFFF round-trips.
  t_serialize_parse_subscribe_v5(_) ->
      SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0},
      Filters = [
          {<<"TopicQos0">>, SubOpts},
          {<<"TopicQos1">>, SubOpts}
      ],
      SubProps = #{'Subscription-Identifier' => 16#FFFFFFF},
      Subscribe = ?SUBSCRIBE_PACKET(3, SubProps, Filters),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Subscribe, parse_serialize(Subscribe, V5Opts)).
  %% A SUBACK with return codes [QoS 0, QoS 1, 128] round-trips. Packet id 0
  %% parses when strict mode is off and is rejected in strict mode.
  t_serialize_parse_suback(_) ->
      SubAck = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),
      ?assertEqual(SubAck, parse_serialize(SubAck)),
      ZeroIdSubAck = ?SUBACK_PACKET(0, [?QOS_0]),
      %% lenient mode only has to parse without throwing
      _ = parse_serialize(ZeroIdSubAck, #{strict_mode => false}),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(ZeroIdSubAck)).
  %% A version 5 SUBACK with a Reason-String, a User-Property and three reason
  %% codes round-trips.
  t_serialize_parse_suback_v5(_) ->
      AckProps = #{
          'Reason-String' => <<"success">>,
          'User-Property' => [{<<"key">>, <<"value">>}]
      },
      ReasonCodes = [?QOS_0, ?QOS_1, 128],
      SubAck = ?SUBACK_PACKET(1, AckProps, ReasonCodes),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(SubAck, parse_serialize(SubAck, V5Opts)).
  %% UNSUBSCRIBE serialization and parsing: a fixed frame with flag nibble 2
  %% round-trips in strict mode; the same frame with flag nibble 0 is accepted
  %% only when strict mode is off; packet id 0 is accepted only when strict mode
  %% is off.
  t_serialize_parse_unsubscribe(_) ->
      Lenient = #{strict_mode => false},
      Strict = #{strict_mode => true},
      %% UNSUBSCRIBE, flag nibble 2, PacketId = 2, topics = [<<"TopicA">>]
      Frame = <<?UNSUBSCRIBE:4, 2:4, 10, 0, 2, 0, 6, "TopicA">>,
      Unsubscribe = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]),
      ?assertEqual(Frame, serialize_to_binary(Unsubscribe)),
      ?assertMatch(Unsubscribe, parse_to_packet(Frame, Strict)),
      %% UNSUBSCRIBE with flag nibble 0, otherwise the same frame
      ZeroFlagsFrame = <<?UNSUBSCRIBE:4, 0:4, 10, 0, 2, 0, 6, "TopicA">>,
      ?assertMatch(Unsubscribe, parse_to_packet(ZeroFlagsFrame, Lenient)),
      ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(ZeroFlagsFrame, Strict)),
      ZeroIdUnsubscribe = ?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]),
      %% lenient mode only has to parse without throwing
      _ = parse_serialize(ZeroIdUnsubscribe, Lenient),
      %% parse_serialize/1 parses in strict mode
      ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(ZeroIdUnsubscribe)).
  %% A version 5 UNSUBSCRIBE with a User-Property and two topics round-trips.
  t_serialize_parse_unsubscribe_v5(_) ->
      UnsubProps = #{'User-Property' => [{<<"key">>, <<"val">>}]},
      Topics = [<<"Topic1">>, <<"Topic2">>],
      Unsubscribe = ?UNSUBSCRIBE_PACKET(10, UnsubProps, Topics),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Unsubscribe, parse_serialize(Unsubscribe, V5Opts)).
  %% An UNSUBACK with packet id 10 round-trips.
  t_serialize_parse_unsuback(_) ->
      UnsubAck = ?UNSUBACK_PACKET(10),
      RoundTripped = parse_serialize(UnsubAck),
      ?assertEqual(UnsubAck, RoundTripped).
  %% A version 5 UNSUBACK with properties and three 16#87 reason codes
  %% round-trips.
  t_serialize_parse_unsuback_v5(_) ->
      AckProps = #{
          'Reason-String' => <<"Not authorized">>,
          'User-Property' => [{<<"key">>, <<"val">>}]
      },
      ReasonCodes = [16#87, 16#87, 16#87],
      UnsubAck = ?UNSUBACK_PACKET(10, AckProps, ReasonCodes),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(UnsubAck, parse_serialize(UnsubAck, V5Opts)).
  %% A PINGREQ round-trips through serialize and parse.
  t_serialize_parse_pingreq(_) ->
      Ping = ?PACKET(?PINGREQ),
      RoundTripped = parse_serialize(Ping),
      ?assertEqual(Ping, RoundTripped).
  %% A serialized PINGRESP is not accepted by the parser: it throws a frame
  %% parse error with cause `unexpected_packet' and header type 'PINGRESP'.
  t_serialize_parse_pingresp(_) ->
      Frame = serialize_to_binary(?PACKET(?PINGRESP)),
      ?assertThrow(
          {frame_parse_error, #{cause := unexpected_packet, header_type := 'PINGRESP'}},
          emqx_frame:parse(Frame)
      ).
  %% The two-byte frame <<224, 0>> parses to a DISCONNECT with reason code
  %% ?RC_SUCCESS and leaves no unparsed bytes.
  t_parse_disconnect(_) ->
      Frame = <<224, 0>>,
      Disconnect = ?DISCONNECT_PACKET(?RC_SUCCESS),
      ?assertMatch({ok, Disconnect, <<>>, _}, emqx_frame:parse(Frame)).
  %% A DISCONNECT with reason code ?RC_SUCCESS round-trips.
  t_serialize_parse_disconnect(_) ->
      Disconnect = ?DISCONNECT_PACKET(?RC_SUCCESS),
      RoundTripped = parse_serialize(Disconnect),
      ?assertEqual(Disconnect, RoundTripped).
  %% A version 5 DISCONNECT with three properties round-trips.
  t_serialize_parse_disconnect_v5(_) ->
      DisconnectProps = #{
          'Session-Expiry-Interval' => 60,
          'Reason-String' => <<"server_moved">>,
          'Server-Reference' => <<"192.168.1.10">>
      },
      Disconnect = ?DISCONNECT_PACKET(?RC_SUCCESS, DisconnectProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      ?assertEqual(Disconnect, parse_serialize(Disconnect, V5Opts)).
  %% A version 5 AUTH packet with authentication properties and two user
  %% properties round-trips, both with default options and with strict mode on.
  t_serialize_parse_auth_v5(_) ->
      AuthProps = #{
          'Authentication-Method' => <<"oauth2">>,
          'Authentication-Data' => <<"3zekkd">>,
          'Reason-String' => <<"success">>,
          'User-Property' => [
              {<<"key1">>, <<"val1">>},
              {<<"key2">>, <<"val2">>}
          ]
      },
      Auth = ?AUTH_PACKET(?RC_SUCCESS, AuthProps),
      V5Opts = #{version => ?MQTT_PROTO_V5},
      V5StrictOpts = #{version => ?MQTT_PROTO_V5, strict_mode => true},
      ?assertEqual(Auth, parse_serialize(Auth, V5Opts)),
      ?assertEqual(Auth, parse_serialize(Auth, V5StrictOpts)).
  %% The two-byte frame <<?CONNECT, 0>> is rejected with cause
  %% `zero_remaining_len'.
  t_parse_invalid_remaining_len(_) ->
      Frame = <<?CONNECT, 0>>,
      ?assertThrow(
          {frame_parse_error, #{cause := zero_remaining_len}},
          emqx_frame:parse(Frame)
      ).
  %% The five-byte frame below (type nibble 2, remaining length 3) is rejected
  %% with `malformed_properties'.
  t_parse_malformed_properties(_) ->
      Frame = <<2:4, 0:4, 3, 1, 0, 0>>,
      ?assertThrow(
          {frame_parse_error, malformed_properties},
          emqx_frame:parse(Frame)
      ).
  %% A CONNECT frame cut short after the protocol name/version section is
  %% rejected with cause `malformed_connect' and a `header_bytes' detail.
  t_malformed_connect_header(_) ->
      Frame = <<16, 11, 0, 6, "MQIsdp", 3, 130, 1, 6>>,
      ?ASSERT_FRAME_THROW(
          #{cause := malformed_connect, header_bytes := _},
          emqx_frame:parse(Frame)
      ).
  %% A CONNECT frame with surplus bytes at the end is rejected with cause
  %% `malformed_connect' and an `unexpected_trailing_bytes' detail.
  t_malformed_connect_data(_) ->
      Frame = <<16, 15, 0, 6, "MQIsdp", 3, 0, 0, 0, 0, 0, 0>>,
      ?ASSERT_FRAME_THROW(
          #{cause := malformed_connect, unexpected_trailing_bytes := _},
          emqx_frame:parse(Frame)
      ).
  %% A CONNECT whose connect-flags byte is 1 is rejected with
  %% `reserved_connect_flag'.
  t_reserved_connect_flag(_) ->
      Frame = <<16, 15, 0, 6, "MQIsdp", 3, 1, 0, 0, 1, 0, 0>>,
      ?assertThrow(
          {frame_parse_error, reserved_connect_flag},
          emqx_frame:parse(Frame)
      ).
  %% The CONNECT frame below is rejected with cause `invalid_clientid'.
  t_invalid_clientid(_) ->
      Frame = <<16, 15, 0, 6, "MQIsdp", 3, 0, 0, 0, 1, 0, 0>>,
      ?assertThrow(
          {frame_parse_error, #{cause := invalid_clientid}},
          emqx_frame:parse(Frame)
      ).
  %% Regression test: a CONNECT that carries a username but no password must
  %% parse with `password' left as `undefined'.
  t_undefined_password(_) ->
      Frame = <<16, 19, 0, 4, "MQTT", 4, 130, 0, 60, 0, 2, "a1", 0, 3, "aaa">>,
      {ok, Parsed, <<>>, {none, _}} = emqx_frame:parse(Frame),
      ExpectedHeader = #mqtt_packet_header{
          type = 1,
          dup = false,
          qos = 0,
          retain = false
      },
      ExpectedConnect = #mqtt_packet_connect{
          proto_name = <<"MQTT">>,
          proto_ver = 4,
          is_bridge = false,
          clean_start = true,
          will_flag = false,
          will_qos = 0,
          will_retain = false,
          keepalive = 60,
          properties = #{},
          clientid = <<"a1">>,
          will_props = #{},
          will_topic = undefined,
          will_payload = undefined,
          username = <<"aaa">>,
          password = undefined
      },
      Expected = #mqtt_packet{
          header = ExpectedHeader,
          variable = ExpectedConnect,
          payload = undefined
      },
      ?assertEqual(Expected, Parsed),
      ok.
  %% Serialize Packet and parse it back with strict mode enabled; see
  %% parse_serialize/2 for the remaining defaults.
  parse_serialize(Packet) ->
      StrictOpts = #{strict_mode => true},
      parse_serialize(Packet, StrictOpts).
  %% Serialize Packet using the protocol version in Opts (key `version',
  %% default ?MQTT_PROTO_V4), then parse the resulting binary with a parser
  %% initialised from the same Opts. Returns the parsed packet; crashes with
  %% badmatch unless the parser consumes the whole binary.
  parse_serialize(Packet, Opts) when is_map(Opts) ->
      Version = maps:get(version, Opts, ?MQTT_PROTO_V4),
      Frame = serialize_to_binary(Packet, Version),
      parse_to_packet(Frame, Opts).
  %% Serialize Packet with emqx_frame:serialize/1 and flatten the result into a
  %% single binary.
  serialize_to_binary(Packet) ->
      IoData = emqx_frame:serialize(Packet),
      iolist_to_binary(IoData).
  %% Serialize Packet for protocol version Ver with emqx_frame:serialize/2 and
  %% flatten the result into a single binary.
  serialize_to_binary(Packet, Ver) ->
      IoData = emqx_frame:serialize(Packet, Ver),
      iolist_to_binary(IoData).
  %% Parse Bin with a parser initialised from Opts and return the packet.
  %% Crashes with badmatch unless exactly one complete frame is consumed.
  parse_to_packet(Bin, Opts) ->
      InitState = emqx_frame:initial_parse_state(Opts),
      {ok, Parsed, <<>>, _FinalState} = emqx_frame:parse(Bin, InitState),
      Parsed.
  %% Build a binary of Len bytes, each with value 1.
  payload(Len) ->
      binary:copy(<<1>>, Len).