emqx_packet_SUITE.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_packet_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("eunit/include/eunit.hrl").
  22. -define(PACKETS, [
  23. {?CONNECT, 'CONNECT', ?CONNECT_PACKET(#mqtt_packet_connect{})},
  24. {?CONNACK, 'CONNACK', ?CONNACK_PACKET(?RC_SUCCESS)},
  25. {?PUBLISH, 'PUBLISH', ?PUBLISH_PACKET(?QOS_1)},
  26. {?PUBACK, 'PUBACK', ?PUBACK_PACKET(1)},
  27. {?PUBREC, 'PUBREC', ?PUBREC_PACKET(1)},
  28. {?PUBREL, 'PUBREL', ?PUBREL_PACKET(1)},
  29. {?PUBCOMP, 'PUBCOMP', ?PUBCOMP_PACKET(1)},
  30. {?SUBSCRIBE, 'SUBSCRIBE', ?SUBSCRIBE_PACKET(1, [])},
  31. {?SUBACK, 'SUBACK', ?SUBACK_PACKET(1, [0])},
  32. {?UNSUBSCRIBE, 'UNSUBSCRIBE', ?UNSUBSCRIBE_PACKET(1, [])},
  33. {?UNSUBACK, 'UNSUBACK', ?UNSUBACK_PACKET(1)},
  34. {?DISCONNECT, 'DISCONNECT', ?DISCONNECT_PACKET(?RC_SUCCESS)},
  35. {?AUTH, 'AUTH', ?AUTH_PACKET()}
  36. ]).
  37. all() -> emqx_common_test_helpers:all(?MODULE).
  38. t_type(_) ->
  39. lists:foreach(
  40. fun({Type, _Name, Packet}) ->
  41. ?assertEqual(Type, emqx_packet:type(Packet))
  42. end,
  43. ?PACKETS
  44. ).
  45. t_type_name(_) ->
  46. lists:foreach(
  47. fun({_Type, Name, Packet}) ->
  48. ?assertEqual(Name, emqx_packet:type_name(Packet))
  49. end,
  50. ?PACKETS
  51. ).
  52. t_dup(_) ->
  53. ?assertEqual(false, emqx_packet:dup(?PUBLISH_PACKET(?QOS_1))).
  54. t_qos(_) ->
  55. lists:foreach(
  56. fun(QoS) ->
  57. ?assertEqual(QoS, emqx_packet:qos(?PUBLISH_PACKET(QoS)))
  58. end,
  59. [?QOS_0, ?QOS_1, ?QOS_2]
  60. ).
  61. t_retain(_) ->
  62. ?assertEqual(false, emqx_packet:retain(?PUBLISH_PACKET(?QOS_1))).
  63. t_proto_name(_) ->
  64. lists:foreach(
  65. fun({Ver, Name}) ->
  66. ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{
  67. proto_ver = Ver,
  68. proto_name = Name
  69. }),
  70. ?assertEqual(Name, emqx_packet:proto_name(ConnPkt))
  71. end,
  72. ?PROTOCOL_NAMES
  73. ).
  74. t_proto_ver(_) ->
  75. lists:foreach(
  76. fun(Ver) ->
  77. ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = Ver}),
  78. ?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt))
  79. end,
  80. [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]
  81. ).
  82. t_connect_info(_) ->
  83. ConnPkt = #mqtt_packet_connect{
  84. will_flag = true,
  85. clientid = <<"clientid">>,
  86. username = <<"username">>,
  87. will_retain = true,
  88. will_qos = ?QOS_2,
  89. will_topic = <<"topic">>,
  90. will_payload = <<"payload">>
  91. },
  92. ?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)),
  93. ?assertEqual(4, emqx_packet:info(proto_ver, ConnPkt)),
  94. ?assertEqual(false, emqx_packet:info(is_bridge, ConnPkt)),
  95. ?assertEqual(true, emqx_packet:info(clean_start, ConnPkt)),
  96. ?assertEqual(true, emqx_packet:info(will_flag, ConnPkt)),
  97. ?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)),
  98. ?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)),
  99. ?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)),
  100. ?assertEqual(#{}, emqx_packet:info(properties, ConnPkt)),
  101. ?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)),
  102. ?assertEqual(#{}, emqx_packet:info(will_props, ConnPkt)),
  103. ?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)),
  104. ?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)),
  105. ?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)),
  106. ?assertEqual(undefined, emqx_packet:info(password, ConnPkt)).
  107. t_connack_info(_) ->
  108. AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0},
  109. ?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)),
  110. ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
  111. ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)).
  112. t_publish_info(_) ->
  113. PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1},
  114. ?assertEqual(1, emqx_packet:info(packet_id, PubPkt)),
  115. ?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)),
  116. ?assertEqual(#{}, emqx_packet:info(properties, PubPkt)).
  117. t_puback_info(_) ->
  118. AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0},
  119. ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
  120. ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)),
  121. ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)).
  122. t_subscribe_info(_) ->
  123. TopicFilters = [{<<"t/#">>, #{}}],
  124. SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters},
  125. ?assertEqual(1, emqx_packet:info(packet_id, SubPkt)),
  126. ?assertEqual(#{}, emqx_packet:info(properties, SubPkt)),
  127. ?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)).
  128. t_suback_info(_) ->
  129. SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]},
  130. ?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)),
  131. ?assertEqual(#{}, emqx_packet:info(properties, SubackPkt)),
  132. ?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)).
  133. t_unsubscribe_info(_) ->
  134. UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]},
  135. ?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)),
  136. ?assertEqual(#{}, emqx_packet:info(properties, UnsubPkt)),
  137. ?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)).
  138. t_unsuback_info(_) ->
  139. AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]},
  140. ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)),
  141. ?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)),
  142. ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)).
  143. t_disconnect_info(_) ->
  144. DisconnPkt = #mqtt_packet_disconnect{reason_code = 0},
  145. ?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)),
  146. ?assertEqual(#{}, emqx_packet:info(properties, DisconnPkt)).
  147. t_auth_info(_) ->
  148. AuthPkt = #mqtt_packet_auth{reason_code = 0},
  149. ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)),
  150. ?assertEqual(#{}, emqx_packet:info(properties, AuthPkt)).
  151. t_set_props(_) ->
  152. Pkts = [
  153. #mqtt_packet_connect{},
  154. #mqtt_packet_connack{},
  155. #mqtt_packet_publish{},
  156. #mqtt_packet_puback{},
  157. #mqtt_packet_subscribe{},
  158. #mqtt_packet_suback{},
  159. #mqtt_packet_unsubscribe{},
  160. #mqtt_packet_unsuback{},
  161. #mqtt_packet_disconnect{},
  162. #mqtt_packet_auth{}
  163. ],
  164. Props = #{'A-Fake-Props' => true},
  165. lists:foreach(
  166. fun(Pkt) ->
  167. ?assertEqual(Props, emqx_packet:info(properties, emqx_packet:set_props(Props, Pkt)))
  168. end,
  169. Pkts
  170. ).
  171. t_check_publish(_) ->
  172. Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
  173. ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
  174. ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}),
  175. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(#mqtt_packet_publish{
  176. topic_name = <<>>,
  177. properties = #{'Topic-Alias' => 0}
  178. }),
  179. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  180. ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<"payload">>)
  181. ),
  182. {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(
  183. ?PUBLISH_PACKET(?QOS_1, <<"+/+">>, 1, #{}, <<"payload">>)
  184. ),
  185. {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(
  186. ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)
  187. ),
  188. %% TODO::
  189. %% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  190. %% ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)
  191. %%),
  192. ok = emqx_packet:check(
  193. ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)
  194. ),
  195. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  196. ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 0}, <<"payload">>)
  197. ),
  198. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  199. ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"+/+">>}, <<"payload">>)
  200. ).
  201. t_check_subscribe(_) ->
  202. ok = emqx_packet:check(
  203. ?SUBSCRIBE_PACKET(
  204. 1,
  205. #{'Subscription-Identifier' => 1},
  206. [{<<"topic">>, #{qos => ?QOS_0}}]
  207. )
  208. ),
  209. {error, ?RC_TOPIC_FILTER_INVALID} = emqx_packet:check(#mqtt_packet_subscribe{topic_filters = []}),
  210. {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED} =
  211. emqx_packet:check(
  212. ?SUBSCRIBE_PACKET(
  213. 1,
  214. #{'Subscription-Identifier' => -1},
  215. [{<<"topic">>, #{qos => ?QOS_0, rp => 0}}]
  216. )
  217. ).
  218. t_check_unsubscribe(_) ->
  219. ok = emqx_packet:check(?UNSUBSCRIBE_PACKET(1, [<<"topic">>])),
  220. {error, ?RC_TOPIC_FILTER_INVALID} = emqx_packet:check(?UNSUBSCRIBE_PACKET(1, [])).
  221. t_check_connect(_) ->
  222. Opts = #{max_clientid_len => 5, mqtt_retain_available => false},
  223. ok = emqx_packet:check(#mqtt_packet_connect{}, Opts),
  224. ok = emqx_packet:check(
  225. ?CONNECT_PACKET(#mqtt_packet_connect{
  226. clientid = <<1>>,
  227. properties = #{'Receive-Maximum' => 1},
  228. will_flag = true,
  229. will_topic = <<"will_topic">>
  230. }),
  231. Opts
  232. ),
  233. ConnPkt1 = #mqtt_packet_connect{
  234. proto_name = <<"MQIsdp">>,
  235. proto_ver = ?MQTT_PROTO_V5
  236. },
  237. {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} = emqx_packet:check(ConnPkt1, Opts),
  238. ConnPkt2 = #mqtt_packet_connect{
  239. proto_ver = ?MQTT_PROTO_V3,
  240. proto_name = <<"MQIsdp">>,
  241. clientid = <<>>
  242. },
  243. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt2, Opts),
  244. ConnPkt3 = #mqtt_packet_connect{clientid = <<"123456">>},
  245. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt3, Opts),
  246. ConnPkt4 = #mqtt_packet_connect{
  247. will_flag = true,
  248. will_retain = true
  249. },
  250. {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_packet:check(ConnPkt4, Opts),
  251. ConnPkt5 = #mqtt_packet_connect{
  252. will_flag = true,
  253. will_topic = <<"#">>
  254. },
  255. {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(ConnPkt5, Opts),
  256. ConnPkt6 = ?CONNECT_PACKET(#mqtt_packet_connect{
  257. properties = #{'Request-Response-Information' => -1}
  258. }),
  259. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(ConnPkt6, Opts),
  260. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  261. ?CONNECT_PACKET(#mqtt_packet_connect{
  262. properties = #{'Request-Problem-Information' => 2}
  263. }),
  264. Opts
  265. ),
  266. {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
  267. ?CONNECT_PACKET(#mqtt_packet_connect{
  268. properties = #{'Receive-Maximum' => 0}
  269. }),
  270. Opts
  271. ),
  272. ConnPkt7 = #mqtt_packet_connect{clientid = <<>>, clean_start = false},
  273. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt7, Opts).
  274. t_from_to_message(_) ->
  275. ExpectedMsg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
  276. ExpectedMsg1 = emqx_message:set_flags(#{dup => false, retain => false}, ExpectedMsg),
  277. ExpectedMsg2 = emqx_message:set_headers(
  278. #{
  279. peerhost => {127, 0, 0, 1},
  280. protocol => mqtt,
  281. properties => #{},
  282. username => <<"test">>
  283. },
  284. ExpectedMsg1
  285. ),
  286. Pkt = #mqtt_packet{
  287. header = #mqtt_packet_header{
  288. type = ?PUBLISH,
  289. qos = ?QOS_0,
  290. retain = false,
  291. dup = false
  292. },
  293. variable = #mqtt_packet_publish{
  294. topic_name = <<"topic">>,
  295. packet_id = 10,
  296. properties = #{}
  297. },
  298. payload = <<"payload">>
  299. },
  300. MsgFromPkt = emqx_packet:to_message(
  301. Pkt,
  302. <<"clientid">>,
  303. #{
  304. protocol => mqtt,
  305. username => <<"test">>,
  306. peerhost => {127, 0, 0, 1}
  307. }
  308. ),
  309. ?assertEqual(ExpectedMsg2, MsgFromPkt#message{
  310. id = emqx_message:id(ExpectedMsg),
  311. timestamp = emqx_message:timestamp(ExpectedMsg)
  312. }).
  313. t_will_msg(_) ->
  314. ?assertEqual(undefined, emqx_packet:will_msg(#mqtt_packet_connect{will_flag = false})),
  315. Pkt = #mqtt_packet_connect{
  316. will_flag = true,
  317. clientid = <<"clientid">>,
  318. username = "test",
  319. will_retain = true,
  320. will_qos = ?QOS_2,
  321. will_topic = <<"topic">>,
  322. will_props = #{},
  323. will_payload = <<"payload">>
  324. },
  325. Msg = emqx_packet:will_msg(Pkt),
  326. ?assertEqual(<<"clientid">>, Msg#message.from),
  327. ?assertEqual(<<"topic">>, Msg#message.topic),
  328. Pkt2 = #mqtt_packet_connect{
  329. will_flag = true,
  330. clientid = <<"clientid">>,
  331. username = "test",
  332. will_retain = true,
  333. will_qos = ?QOS_2,
  334. will_topic = <<"topic">>,
  335. will_payload = <<"payload">>
  336. },
  337. Msg2 = emqx_packet:will_msg(Pkt2),
  338. ?assertEqual(<<"clientid">>, Msg2#message.from),
  339. ?assertEqual(<<"topic">>, Msg2#message.topic).
  340. t_format(_) ->
  341. io:format("~ts", [
  342. emqx_packet:format(#mqtt_packet{
  343. header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
  344. variable = undefined
  345. })
  346. ]),
  347. io:format("~ts", [
  348. emqx_packet:format(#mqtt_packet{
  349. header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
  350. })
  351. ]),
  352. io:format("~ts", [
  353. emqx_packet:format(
  354. ?CONNECT_PACKET(#mqtt_packet_connect{
  355. will_flag = true,
  356. will_retain = true,
  357. will_qos = ?QOS_2,
  358. will_topic = <<"topic">>,
  359. will_payload = <<"payload">>
  360. })
  361. )
  362. ]),
  363. io:format("~ts", [
  364. emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
  365. ]),
  366. io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
  367. io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
  368. io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
  369. io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
  370. io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
  371. io:format("~ts", [
  372. emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
  373. ]),
  374. io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
  375. io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
  376. io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
  377. io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
  378. t_parse_empty_publish(_) ->
  379. %% 52: 0011(type=PUBLISH) 0100 (QoS=2)
  380. Packet = #mqtt_packet_publish{topic_name = <<>>},
  381. ?assertEqual({error, ?RC_PROTOCOL_ERROR}, emqx_packet:check(Packet)).