emqx_mqtt_protocol_v5_SUITE.erl 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_mqtt_protocol_v5_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. -include_lib("common_test/include/ct.hrl").
  23. -import(lists, [nth/2]).
  24. -define(TOPICS, [
  25. <<"TopicA">>,
  26. <<"TopicA/B">>,
  27. <<"Topic/C">>,
  28. <<"TopicA/C">>,
  29. <<"/TopicA">>
  30. ]).
  31. -define(WILD_TOPICS, [
  32. <<"TopicA/+">>,
  33. <<"+/C">>,
  34. <<"#">>,
  35. <<"/#">>,
  36. <<"/+">>,
  37. <<"+/+">>,
  38. <<"TopicA/#">>
  39. ]).
  40. all() ->
  41. [
  42. {group, tcp},
  43. {group, quic}
  44. ].
  45. groups() ->
  46. TCs = emqx_common_test_helpers:all(?MODULE),
  47. [
  48. {tcp, [], TCs},
  49. {quic, [], TCs}
  50. ].
  51. init_per_group(tcp, Config) ->
  52. Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
  53. [{port, 1883}, {conn_fun, connect}, {group_apps, Apps} | Config];
  54. init_per_group(quic, Config) ->
  55. Apps = emqx_cth_suite:start(
  56. [{emqx, "listeners.quic.test { enable = true, bind = 1884 }"}],
  57. #{work_dir => emqx_cth_suite:work_dir(Config)}
  58. ),
  59. [{port, 1884}, {conn_fun, quic_connect}, {group_apps, Apps} | Config].
  60. end_per_group(_Group, Config) ->
  61. emqx_cth_suite:stop(?config(group_apps, Config)).
  62. init_per_testcase(TestCase, Config) ->
  63. case erlang:function_exported(?MODULE, TestCase, 2) of
  64. true -> ?MODULE:TestCase(init, Config);
  65. _ -> Config
  66. end.
  67. end_per_testcase(TestCase, Config) ->
  68. case erlang:function_exported(?MODULE, TestCase, 2) of
  69. true -> ?MODULE:TestCase('end', Config);
  70. false -> ok
  71. end,
  72. Config.
  73. %%--------------------------------------------------------------------
  74. %% Helpers
  75. %%--------------------------------------------------------------------
  76. client_info(Key, Client) ->
  77. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  78. receive_messages(Count) ->
  79. receive_messages(Count, []).
  80. receive_messages(0, Msgs) ->
  81. Msgs;
  82. receive_messages(Count, Msgs) ->
  83. receive
  84. {publish, Msg} ->
  85. receive_messages(Count - 1, [Msg | Msgs]);
  86. _Other ->
  87. receive_messages(Count, Msgs)
  88. after 1000 ->
  89. Msgs
  90. end.
  91. receive_disconnect_reasoncode() ->
  92. receive
  93. {disconnected, ReasonCode, _} -> ReasonCode;
  94. _Other -> receive_disconnect_reasoncode()
  95. after 100 ->
  96. error("no disconnect packet")
  97. end.
  98. waiting_client_process_exit(C) ->
  99. receive
  100. {'EXIT', C, Reason} -> Reason;
  101. _Oth -> error({got_another_message, _Oth})
  102. after 1000 -> error({waiting_timeout, C})
  103. end.
  104. clean_retained(Topic, Config) ->
  105. ConnFun = ?config(conn_fun, Config),
  106. {ok, Clean} = emqtt:start_link([{clean_start, true} | Config]),
  107. {ok, _} = emqtt:ConnFun(Clean),
  108. {ok, _} = emqtt:publish(Clean, Topic, #{}, <<"">>, [{qos, ?QOS_1}, {retain, true}]),
  109. ok = emqtt:disconnect(Clean).
  110. %%--------------------------------------------------------------------
  111. %% Test Cases
  112. %%--------------------------------------------------------------------
  113. t_basic_test(Config) ->
  114. ConnFun = ?config(conn_fun, Config),
  115. Topic = nth(1, ?TOPICS),
  116. {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
  117. {ok, _} = emqtt:ConnFun(C),
  118. {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
  119. {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
  120. {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
  121. {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
  122. {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
  123. ?assertEqual(3, length(receive_messages(3))),
  124. ok = emqtt:disconnect(C).
  125. %%--------------------------------------------------------------------
  126. %% Connection
  127. %%--------------------------------------------------------------------
  128. t_connect_clean_start(Config) ->
  129. ConnFun = ?config(conn_fun, Config),
  130. process_flag(trap_exit, true),
  131. {ok, Client1} = emqtt:start_link([
  132. {clientid, <<"t_connect_clean_start">>},
  133. {proto_ver, v5},
  134. {clean_start, true}
  135. | Config
  136. ]),
  137. {ok, _} = emqtt:ConnFun(Client1),
  138. %% [MQTT-3.1.2-4]
  139. ?assertEqual(0, client_info(session_present, Client1)),
  140. ok = emqtt:pause(Client1),
  141. {ok, Client2} = emqtt:start_link([
  142. {clientid, <<"t_connect_clean_start">>},
  143. {proto_ver, v5},
  144. {clean_start, false}
  145. | Config
  146. ]),
  147. {ok, _} = emqtt:ConnFun(Client2),
  148. %% [MQTT-3.1.2-5]
  149. ?assertEqual(1, client_info(session_present, Client2)),
  150. ?assertEqual(142, receive_disconnect_reasoncode()),
  151. waiting_client_process_exit(Client1),
  152. ok = emqtt:disconnect(Client2),
  153. waiting_client_process_exit(Client2),
  154. {ok, Client3} = emqtt:start_link([
  155. {clientid, <<"new_client">>},
  156. {proto_ver, v5},
  157. {clean_start, false}
  158. | Config
  159. ]),
  160. {ok, _} = emqtt:ConnFun(Client3),
  161. %% [MQTT-3.1.2-6]
  162. ?assertEqual(0, client_info(session_present, Client3)),
  163. ok = emqtt:disconnect(Client3),
  164. waiting_client_process_exit(Client3),
  165. process_flag(trap_exit, false).
  166. t_connect_will_message(Config) ->
  167. ConnFun = ?config(conn_fun, Config),
  168. Topic = nth(1, ?TOPICS),
  169. Payload = "will message",
  170. {ok, Client1} = emqtt:start_link([
  171. {proto_ver, v5},
  172. {clean_start, true},
  173. {will_flag, true},
  174. {will_topic, Topic},
  175. {will_payload, Payload}
  176. | Config
  177. ]),
  178. {ok, _} = emqtt:ConnFun(Client1),
  179. [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
  180. Info = emqx_connection:info(sys:get_state(ClientPid)),
  181. %% [MQTT-3.1.2-7]
  182. ?assertNotEqual(undefined, maps:find(will_msg, Info)),
  183. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  184. {ok, _} = emqtt:ConnFun(Client2),
  185. {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
  186. %% [MQTT-3.14.2-1]
  187. ok = emqtt:disconnect(Client1, 4),
  188. [Msg | _] = receive_messages(1),
  189. %% [MQTT-3.1.2-8]
  190. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  191. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  192. ?assertEqual({ok, 0}, maps:find(qos, Msg)),
  193. ok = emqtt:disconnect(Client2),
  194. {ok, Client3} = emqtt:start_link([
  195. {proto_ver, v5},
  196. {clean_start, true},
  197. {will_flag, true},
  198. {will_topic, Topic},
  199. {will_payload, Payload}
  200. | Config
  201. ]),
  202. {ok, _} = emqtt:ConnFun(Client3),
  203. {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]),
  204. {ok, _} = emqtt:ConnFun(Client4),
  205. {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2),
  206. ok = emqtt:disconnect(Client3),
  207. %% [MQTT-3.1.2-10]
  208. MsgRecv = receive_messages(1),
  209. ?assertEqual([], MsgRecv),
  210. ok = emqtt:disconnect(Client4).
  211. t_batch_subscribe(init, Config) ->
  212. emqx_config:put_zone_conf(default, [authorization, enable], true),
  213. ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]),
  214. meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
  215. Config;
  216. t_batch_subscribe('end', _Config) ->
  217. emqx_config:put_zone_conf(default, [authorization, enable], false),
  218. meck:unload(emqx_access_control).
  219. t_batch_subscribe(Config) ->
  220. ConnFun = ?config(conn_fun, Config),
  221. {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>} | Config]),
  222. {ok, _} = emqtt:ConnFun(Client),
  223. {ok, _, [
  224. ?RC_NOT_AUTHORIZED,
  225. ?RC_NOT_AUTHORIZED,
  226. ?RC_NOT_AUTHORIZED
  227. ]} = emqtt:subscribe(Client, [
  228. {<<"t1">>, qos1},
  229. {<<"t2">>, qos2},
  230. {<<"t3">>, qos0}
  231. ]),
  232. {ok, _, [
  233. ?RC_NO_SUBSCRIPTION_EXISTED,
  234. ?RC_NO_SUBSCRIPTION_EXISTED,
  235. ?RC_NO_SUBSCRIPTION_EXISTED
  236. ]} = emqtt:unsubscribe(Client, [
  237. <<"t1">>,
  238. <<"t2">>,
  239. <<"t3">>
  240. ]),
  241. emqtt:disconnect(Client).
  242. t_connect_will_retain(Config) ->
  243. ConnFun = ?config(conn_fun, Config),
  244. process_flag(trap_exit, true),
  245. Topic = nth(1, ?TOPICS),
  246. Payload = "will message",
  247. {ok, Client1} = emqtt:start_link([
  248. {proto_ver, v5},
  249. {clean_start, true},
  250. {will_flag, true},
  251. {will_topic, Topic},
  252. {will_payload, Payload},
  253. {will_retain, false}
  254. | Config
  255. ]),
  256. {ok, _} = emqtt:ConnFun(Client1),
  257. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  258. {ok, _} = emqtt:ConnFun(Client2),
  259. {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
  260. ok = emqtt:disconnect(Client1, 4),
  261. [Msg1 | _] = receive_messages(1),
  262. %% [MQTT-3.1.2-14]
  263. ?assertEqual({ok, false}, maps:find(retain, Msg1)),
  264. ok = emqtt:disconnect(Client2),
  265. {ok, Client3} = emqtt:start_link([
  266. {proto_ver, v5},
  267. {clean_start, true},
  268. {will_flag, true},
  269. {will_topic, Topic},
  270. {will_payload, Payload},
  271. {will_retain, true}
  272. | Config
  273. ]),
  274. {ok, _} = emqtt:ConnFun(Client3),
  275. {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]),
  276. {ok, _} = emqtt:ConnFun(Client4),
  277. {ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
  278. ok = emqtt:disconnect(Client3, 4),
  279. [Msg2 | _] = receive_messages(1),
  280. %% [MQTT-3.1.2-15]
  281. ?assertEqual({ok, true}, maps:find(retain, Msg2)),
  282. ok = emqtt:disconnect(Client4),
  283. clean_retained(Topic, Config).
  284. t_connect_idle_timeout(_Config) ->
  285. IdleTimeout = 2000,
  286. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
  287. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
  288. {ok, Sock} = emqtt_sock:connect({127, 0, 0, 1}, 1883, [], 60000),
  289. timer:sleep(IdleTimeout),
  290. ?assertMatch({error, closed}, emqtt_sock:recv(Sock, 1024)).
  291. t_connect_emit_stats_timeout(init, Config) ->
  292. NewIdleTimeout = 1000,
  293. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
  294. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
  295. ok = snabbkaffe:start_trace(),
  296. [{idle_timeout, NewIdleTimeout} | Config];
  297. t_connect_emit_stats_timeout('end', _Config) ->
  298. snabbkaffe:stop(),
  299. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
  300. emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
  301. ok.
  302. t_connect_emit_stats_timeout(Config) ->
  303. ConnFun = ?config(conn_fun, Config),
  304. {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
  305. {ok, Client} = emqtt:start_link([{proto_ver, v5}, {keepalive, 60} | Config]),
  306. {ok, _} = emqtt:ConnFun(Client),
  307. [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
  308. ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
  309. ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
  310. ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
  311. ok = emqtt:disconnect(Client).
  312. %% [MQTT-3.1.2-22]
  313. t_connect_keepalive_timeout(Config) ->
  314. ConnFun = ?config(conn_fun, Config),
  315. %% Prevent the emqtt client bringing us down on the disconnect.
  316. process_flag(trap_exit, true),
  317. Keepalive = 2,
  318. {ok, Client} = emqtt:start_link([
  319. {proto_ver, v5},
  320. {keepalive, Keepalive}
  321. | Config
  322. ]),
  323. {ok, _} = emqtt:ConnFun(Client),
  324. emqtt:pause(Client),
  325. receive
  326. {disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode)
  327. after round(timer:seconds(Keepalive) * 2 * 1.5) ->
  328. error("keepalive timeout")
  329. end.
  330. %% [MQTT-3.1.3-9]
  331. %% !!!REFACTOR NEED:
  332. %t_connect_will_delay_interval(Config) ->
  333. % process_flag(trap_exit, true),
  334. % Topic = nth(1, ?TOPICS),
  335. % Payload = "will message",
  336. %
  337. % {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  338. % {ok, _} = emqtt:ConnFun(Client1),
  339. % {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
  340. %
  341. % {ok, Client2} = emqtt:start_link([
  342. % {clientid, <<"t_connect_will_delay_interval">>},
  343. % {proto_ver, v5},
  344. % {clean_start, true},
  345. % {will_flag, true},
  346. % {will_qos, 2},
  347. % {will_topic, Topic},
  348. % {will_payload, Payload},
  349. % {will_props, #{'Will-Delay-Interval' => 3}},
  350. % {properties, #{'Session-Expiry-Interval' => 7200}},
  351. % {keepalive, 2} | Config
  352. % ]),
  353. % {ok, _} = emqtt:ConnFun(Client2),
  354. % timer:sleep(50),
  355. % erlang:exit(Client2, kill),
  356. % timer:sleep(2000),
  357. % ?assertEqual(0, length(receive_messages(1))),
  358. % timer:sleep(5000),
  359. % ?assertEqual(1, length(receive_messages(1))),
  360. %
  361. % {ok, Client3} = emqtt:start_link([
  362. % {clientid, <<"t_connect_will_delay_interval">>},
  363. % {proto_ver, v5},
  364. % {clean_start, true},
  365. % {will_flag, true},
  366. % {will_qos, 2},
  367. % {will_topic, Topic},
  368. % {will_payload, Payload},
  369. % {will_props, #{'Will-Delay-Interval' => 7200}},
  370. % {properties, #{'Session-Expiry-Interval' => 3}},
  371. % {keepalive, 2} | Config
  372. % ]),
  373. % {ok, _} = emqtt:ConnFun(Client3),
  374. % timer:sleep(50),
  375. % erlang:exit(Client3, kill),
  376. %
  377. % timer:sleep(2000),
  378. % ?assertEqual(0, length(receive_messages(1))),
  379. % timer:sleep(5000),
  380. % ?assertEqual(1, length(receive_messages(1))),
  381. %
  382. % ok = emqtt:disconnect(Client1),
  383. %
  384. % receive {'EXIT', _, _} -> ok
  385. % after 100 -> ok
  386. % end,
  387. % process_flag(trap_exit, false).
  388. %% [MQTT-3.1.4-3]
  389. t_connect_duplicate_clientid(Config) ->
  390. ConnFun = ?config(conn_fun, Config),
  391. process_flag(trap_exit, true),
  392. {ok, Client1} = emqtt:start_link([
  393. {clientid, <<"t_connect_duplicate_clientid">>},
  394. {proto_ver, v5}
  395. | Config
  396. ]),
  397. {ok, _} = emqtt:ConnFun(Client1),
  398. {ok, Client2} = emqtt:start_link([
  399. {clientid, <<"t_connect_duplicate_clientid">>},
  400. {proto_ver, v5}
  401. | Config
  402. ]),
  403. {ok, _} = emqtt:ConnFun(Client2),
  404. ?assertEqual(142, receive_disconnect_reasoncode()),
  405. waiting_client_process_exit(Client1),
  406. ok = emqtt:disconnect(Client2),
  407. waiting_client_process_exit(Client2),
  408. process_flag(trap_exit, false).
  409. %%--------------------------------------------------------------------
  410. %% Connack
  411. %%--------------------------------------------------------------------
  412. t_connack_session_present(Config) ->
  413. ConnFun = ?config(conn_fun, Config),
  414. {ok, Client1} = emqtt:start_link([
  415. {clientid, <<"t_connect_duplicate_clientid">>},
  416. {proto_ver, v5},
  417. {properties, #{'Session-Expiry-Interval' => 7200}},
  418. {clean_start, true}
  419. | Config
  420. ]),
  421. {ok, _} = emqtt:ConnFun(Client1),
  422. %% [MQTT-3.2.2-2]
  423. ?assertEqual(0, client_info(session_present, Client1)),
  424. ok = emqtt:disconnect(Client1),
  425. {ok, Client2} = emqtt:start_link([
  426. {clientid, <<"t_connect_duplicate_clientid">>},
  427. {proto_ver, v5},
  428. {properties, #{'Session-Expiry-Interval' => 7200}},
  429. {clean_start, false}
  430. | Config
  431. ]),
  432. {ok, _} = emqtt:ConnFun(Client2),
  433. %% [[MQTT-3.2.2-3]]
  434. ?assertEqual(1, client_info(session_present, Client2)),
  435. ok = emqtt:disconnect(Client2).
  436. t_connack_max_qos_allowed(init, Config) ->
  437. Config;
  438. t_connack_max_qos_allowed('end', _Config) ->
  439. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
  440. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
  441. ok.
  442. t_connack_max_qos_allowed(Config) ->
  443. ConnFun = ?config(conn_fun, Config),
  444. process_flag(trap_exit, true),
  445. Topic = nth(1, ?TOPICS),
  446. %% max_qos_allowed = 0
  447. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
  448. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
  449. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  450. {ok, Connack1} = emqtt:ConnFun(Client1),
  451. %% [MQTT-3.2.2-9]
  452. ?assertEqual(0, maps:get('Maximum-QoS', Connack1)),
  453. %% [MQTT-3.2.2-10]
  454. {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0),
  455. %% [MQTT-3.2.2-10]
  456. {ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1),
  457. %% [MQTT-3.2.2-10]
  458. {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
  459. %% [MQTT-3.2.2-11]
  460. ?assertMatch(
  461. {error, {disconnected, 155, _}},
  462. emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1)
  463. ),
  464. ?assertEqual(155, receive_disconnect_reasoncode()),
  465. waiting_client_process_exit(Client1),
  466. {ok, Client2} = emqtt:start_link([
  467. {proto_ver, v5},
  468. {will_flag, true},
  469. {will_topic, Topic},
  470. {will_payload, <<"Unsupported Qos">>},
  471. {will_qos, 2}
  472. | Config
  473. ]),
  474. {error, Connack2} = emqtt:ConnFun(Client2),
  475. %% [MQTT-3.2.2-12]
  476. ?assertMatch({qos_not_supported, _}, Connack2),
  477. waiting_client_process_exit(Client2),
  478. %% max_qos_allowed = 1
  479. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
  480. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
  481. {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]),
  482. {ok, Connack3} = emqtt:ConnFun(Client3),
  483. %% [MQTT-3.2.2-9]
  484. ?assertEqual(1, maps:get('Maximum-QoS', Connack3)),
  485. %% [MQTT-3.2.2-10]
  486. {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0),
  487. %% [MQTT-3.2.2-10]
  488. {ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1),
  489. %% [MQTT-3.2.2-10]
  490. {ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2),
  491. %% [MQTT-3.2.2-11]
  492. ?assertMatch(
  493. {error, {disconnected, 155, _}},
  494. emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2)
  495. ),
  496. ?assertEqual(155, receive_disconnect_reasoncode()),
  497. waiting_client_process_exit(Client3),
  498. {ok, Client4} = emqtt:start_link([
  499. {proto_ver, v5},
  500. {will_flag, true},
  501. {will_topic, Topic},
  502. {will_payload, <<"Unsupported Qos">>},
  503. {will_qos, 2}
  504. | Config
  505. ]),
  506. {error, Connack4} = emqtt:ConnFun(Client4),
  507. %% [MQTT-3.2.2-12]
  508. ?assertMatch({qos_not_supported, _}, Connack4),
  509. waiting_client_process_exit(Client4),
  510. %% max_qos_allowed = 2
  511. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
  512. emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
  513. {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]),
  514. {ok, Connack5} = emqtt:ConnFun(Client5),
  515. %% [MQTT-3.2.2-9]
  516. ?assertEqual(undefined, maps:get('Maximum-QoS', Connack5, undefined)),
  517. ok = emqtt:disconnect(Client5),
  518. waiting_client_process_exit(Client5),
  519. process_flag(trap_exit, false).
  520. t_connack_assigned_clientid(Config) ->
  521. ConnFun = ?config(conn_fun, Config),
  522. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  523. {ok, _} = emqtt:ConnFun(Client1),
  524. %% [MQTT-3.2.2-16]
  525. ?assert(is_binary(client_info(clientid, Client1))),
  526. ok = emqtt:disconnect(Client1).
  527. %%--------------------------------------------------------------------
  528. %% Publish
  529. %%--------------------------------------------------------------------
  530. t_publish_rap(Config) ->
  531. ConnFun = ?config(conn_fun, Config),
  532. Topic = nth(1, ?TOPICS),
  533. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  534. {ok, _} = emqtt:ConnFun(Client1),
  535. {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
  536. {ok, _} = emqtt:publish(
  537. Client1,
  538. Topic,
  539. #{},
  540. <<"retained message">>,
  541. [{qos, ?QOS_1}, {retain, true}]
  542. ),
  543. [Msg1 | _] = receive_messages(1),
  544. %% [MQTT-3.3.1-12]
  545. ?assertEqual(true, maps:get(retain, Msg1)),
  546. ok = emqtt:disconnect(Client1),
  547. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  548. {ok, _} = emqtt:ConnFun(Client2),
  549. {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]),
  550. {ok, _} = emqtt:publish(
  551. Client2,
  552. Topic,
  553. #{},
  554. <<"retained message">>,
  555. [{qos, ?QOS_1}, {retain, true}]
  556. ),
  557. [Msg2 | _] = receive_messages(1),
  558. %% [MQTT-3.3.1-13]
  559. ?assertEqual(false, maps:get(retain, Msg2)),
  560. ok = emqtt:disconnect(Client2),
  561. clean_retained(Topic, Config).
  562. t_publish_wildtopic(Config) ->
  563. ConnFun = ?config(conn_fun, Config),
  564. process_flag(trap_exit, true),
  565. Topic = nth(1, ?WILD_TOPICS),
  566. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  567. {ok, _} = emqtt:ConnFun(Client1),
  568. ok = emqtt:publish(Client1, Topic, <<"error topic">>),
  569. ?assertEqual(144, receive_disconnect_reasoncode()),
  570. waiting_client_process_exit(Client1),
  571. process_flag(trap_exit, false).
  572. t_publish_payload_format_indicator(Config) ->
  573. ConnFun = ?config(conn_fun, Config),
  574. Topic = nth(1, ?TOPICS),
  575. Properties = #{'Payload-Format-Indicator' => 233},
  576. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  577. {ok, _} = emqtt:ConnFun(Client1),
  578. {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
  579. ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]),
  580. [Msg1 | _] = receive_messages(1),
  581. %% [MQTT-3.3.2-6]
  582. ?assertEqual(Properties, maps:get(properties, Msg1)),
  583. ok = emqtt:disconnect(Client1).
  584. t_publish_topic_alias(Config) ->
  585. ConnFun = ?config(conn_fun, Config),
  586. process_flag(trap_exit, true),
  587. Topic = nth(1, ?TOPICS),
  588. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  589. {ok, _} = emqtt:ConnFun(Client1),
  590. ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
  591. %% [MQTT-3.3.2-8]
  592. ?assertEqual(148, receive_disconnect_reasoncode()),
  593. waiting_client_process_exit(Client1),
  594. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  595. {ok, _} = emqtt:ConnFun(Client2),
  596. {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
  597. ok = emqtt:publish(
  598. Client2,
  599. Topic,
  600. #{'Topic-Alias' => 233},
  601. <<"Topic-Alias">>,
  602. [{qos, ?QOS_0}]
  603. ),
  604. ok = emqtt:publish(
  605. Client2,
  606. <<"">>,
  607. #{'Topic-Alias' => 233},
  608. <<"Topic-Alias">>,
  609. [{qos, ?QOS_0}]
  610. ),
  611. %% [MQTT-3.3.2-12]
  612. ?assertEqual(2, length(receive_messages(2))),
  613. ok = emqtt:disconnect(Client2),
  614. waiting_client_process_exit(Client2),
  615. process_flag(trap_exit, false).
  616. t_publish_response_topic(Config) ->
  617. ConnFun = ?config(conn_fun, Config),
  618. process_flag(trap_exit, true),
  619. Topic = nth(1, ?TOPICS),
  620. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  621. {ok, _} = emqtt:ConnFun(Client1),
  622. ok = emqtt:publish(
  623. Client1,
  624. Topic,
  625. #{'Response-Topic' => nth(1, ?WILD_TOPICS)},
  626. <<"Response-Topic">>,
  627. [{qos, ?QOS_0}]
  628. ),
  629. %% [MQTT-3.3.2-14]
  630. ?assertEqual(130, receive_disconnect_reasoncode()),
  631. waiting_client_process_exit(Client1),
  632. process_flag(trap_exit, false).
  633. t_publish_properties(Config) ->
  634. ConnFun = ?config(conn_fun, Config),
  635. Topic = nth(1, ?TOPICS),
  636. Properties = #{
  637. %% [MQTT-3.3.2-15]
  638. 'Response-Topic' => Topic,
  639. %% [MQTT-3.3.2-16]
  640. 'Correlation-Data' => <<"233">>,
  641. %% [MQTT-3.3.2-18]
  642. 'User-Property' => [{<<"a">>, <<"2333">>}],
  643. %% [MQTT-3.3.2-20]
  644. 'Content-Type' => <<"2333">>
  645. },
  646. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  647. {ok, _} = emqtt:ConnFun(Client1),
  648. {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
  649. ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]),
  650. [Msg1 | _] = receive_messages(1),
  651. %% [MQTT-3.3.2-16]
  652. ?assertEqual(Properties, maps:get(properties, Msg1)),
  653. ok = emqtt:disconnect(Client1).
  654. t_publish_overlapping_subscriptions(Config) ->
  655. ConnFun = ?config(conn_fun, Config),
  656. Topic = nth(1, ?TOPICS),
  657. Properties = #{'Subscription-Identifier' => 2333},
  658. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  659. {ok, _} = emqtt:ConnFun(Client1),
  660. {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1),
  661. {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0),
  662. {ok, _} = emqtt:publish(
  663. Client1,
  664. Topic,
  665. #{},
  666. <<"t_publish_overlapping_subscriptions">>,
  667. [{qos, ?QOS_2}]
  668. ),
  669. [Msg1 | _] = receive_messages(2),
  670. %% [MQTT-3.3.4-2]
  671. ?assert(maps:get(qos, Msg1) < 2),
  672. %% [MQTT-3.3.4-3]
  673. ?assertEqual(Properties, maps:get(properties, Msg1)),
  674. ok = emqtt:disconnect(Client1).
  675. %%--------------------------------------------------------------------
  676. %% Subsctibe
  677. %%--------------------------------------------------------------------
  678. t_subscribe_topic_alias(Config) ->
  679. ConnFun = ?config(conn_fun, Config),
  680. Topic1 = nth(1, ?TOPICS),
  681. Topic2 = nth(2, ?TOPICS),
  682. {ok, Client1} = emqtt:start_link([
  683. {proto_ver, v5},
  684. {properties, #{'Topic-Alias-Maximum' => 1}}
  685. | Config
  686. ]),
  687. {ok, _} = emqtt:ConnFun(Client1),
  688. {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2),
  689. {ok, _, [2]} = emqtt:subscribe(Client1, Topic2, qos2),
  690. ok = emqtt:publish(Client1, Topic1, #{}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
  691. [Msg1] = receive_messages(1),
  692. ?assertEqual({ok, #{'Topic-Alias' => 1}}, maps:find(properties, Msg1)),
  693. ?assertEqual({ok, Topic1}, maps:find(topic, Msg1)),
  694. ok = emqtt:publish(Client1, Topic1, #{}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
  695. [Msg2] = receive_messages(1),
  696. ?assertEqual({ok, #{'Topic-Alias' => 1}}, maps:find(properties, Msg2)),
  697. ?assertEqual({ok, <<>>}, maps:find(topic, Msg2)),
  698. ok = emqtt:publish(Client1, Topic2, #{}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
  699. [Msg3] = receive_messages(1),
  700. ?assertEqual({ok, #{}}, maps:find(properties, Msg3)),
  701. ?assertEqual({ok, Topic2}, maps:find(topic, Msg3)),
  702. ok = emqtt:disconnect(Client1).
  703. t_subscribe_no_local(Config) ->
  704. ConnFun = ?config(conn_fun, Config),
  705. Topic = nth(1, ?TOPICS),
  706. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  707. {ok, _} = emqtt:ConnFun(Client1),
  708. {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
  709. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  710. {ok, _} = emqtt:ConnFun(Client2),
  711. {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
  712. ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0),
  713. %% [MQTT-3.8.3-3]
  714. ?assertEqual(1, length(receive_messages(2))),
  715. ok = emqtt:disconnect(Client1).
  716. t_subscribe_no_local_mixed(Config) ->
  717. ConnFun = ?config(conn_fun, Config),
  718. Topic = nth(1, ?TOPICS),
  719. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  720. {ok, _} = emqtt:ConnFun(Client1),
  721. {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
  722. {ok, _} = emqtt:ConnFun(Client2),
  723. %% Given tow clients and client1 subscribe to topic with 'no local' set to true
  724. {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
  725. %% When mixed publish traffic are sent from both clients (Client1 sent 6 and Client2 sent 2)
  726. CB = {fun emqtt:sync_publish_result/3, [self(), async_res]},
  727. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed1">>, 0, CB),
  728. ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed2">>, 0, CB),
  729. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed3">>, 0, CB),
  730. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed4">>, 0, CB),
  731. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed5">>, 0, CB),
  732. ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed6">>, 0, CB),
  733. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed7">>, 0, CB),
  734. ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed8">>, 0, CB),
  735. [
  736. receive
  737. {async_res, Res} -> ?assertEqual(ok, Res)
  738. end
  739. || _ <- lists:seq(1, 8)
  740. ],
  741. %% Then only two messages from clients 2 are received
  742. PubRecvd = receive_messages(9),
  743. ct:pal("~p", [PubRecvd]),
  744. ?assertEqual(2, length(PubRecvd)),
  745. ok = emqtt:disconnect(Client1),
  746. ok = emqtt:disconnect(Client2).
  747. t_subscribe_actions(Config) ->
  748. ConnFun = ?config(conn_fun, Config),
  749. Topic = nth(1, ?TOPICS),
  750. Properties = #{'Subscription-Identifier' => 2333},
  751. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  752. {ok, _} = emqtt:ConnFun(Client1),
  753. {ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2),
  754. {ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1),
  755. {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2),
  756. [Msg1 | _] = receive_messages(1),
  757. %% [MQTT-3.8.4-3] [MQTT-3.8.4-8]
  758. ?assertEqual(1, maps:get(qos, Msg1)),
  759. %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7]
  760. {ok, _, [2, 2]} = emqtt:subscribe(Client1, [
  761. {nth(1, ?TOPICS), qos2},
  762. {nth(2, ?TOPICS), qos2}
  763. ]),
  764. ok = emqtt:disconnect(Client1).
  765. %%--------------------------------------------------------------------
  766. %% Unsubsctibe Unsuback
  767. %%--------------------------------------------------------------------
  768. t_unscbsctibe(Config) ->
  769. ConnFun = ?config(conn_fun, Config),
  770. Topic1 = nth(1, ?TOPICS),
  771. Topic2 = nth(2, ?TOPICS),
  772. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  773. {ok, _} = emqtt:ConnFun(Client1),
  774. {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2),
  775. %% [MQTT-3.10.4-4]
  776. {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1),
  777. %% [MQTT-3.10.4-5]
  778. {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>),
  779. {ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]),
  780. %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2]
  781. {ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]),
  782. ok = emqtt:disconnect(Client1).
  783. %%--------------------------------------------------------------------
  784. %% Pingreq
  785. %%--------------------------------------------------------------------
  786. t_pingreq(Config) ->
  787. ConnFun = ?config(conn_fun, Config),
  788. {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
  789. {ok, _} = emqtt:ConnFun(Client1),
  790. %% [MQTT-3.12.4-1]
  791. pong = emqtt:ping(Client1),
  792. ok = emqtt:disconnect(Client1).
  793. %%--------------------------------------------------------------------
  794. %% Shared Subscriptions
  795. %%--------------------------------------------------------------------
  796. t_shared_subscriptions_client_terminates_when_qos_eq_2(init, Config) ->
  797. ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
  798. Config;
  799. t_shared_subscriptions_client_terminates_when_qos_eq_2('end', _Config) ->
  800. catch meck:unload(emqtt).
  801. t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) ->
  802. ConnFun = ?config(conn_fun, Config),
  803. process_flag(trap_exit, true),
  804. emqx_config:put([broker, shared_dispatch_ack_enabled], true),
  805. Topic = nth(1, ?TOPICS),
  806. SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)),
  807. CRef = counters:new(1, [atomics]),
  808. meck:expect(
  809. emqtt,
  810. connected,
  811. fun
  812. (cast, {?PUBLISH_PACKET(?QOS_2, _PacketId), _Via}, _State) ->
  813. ok = counters:add(CRef, 1, 1),
  814. {stop, {shutdown, for_testing}};
  815. (Arg1, ARg2, Arg3) ->
  816. meck:passthrough([Arg1, ARg2, Arg3])
  817. end
  818. ),
  819. {ok, Sub1} = emqtt:start_link([
  820. {proto_ver, v5},
  821. {clientid, <<"sub_client_1">>},
  822. {keepalive, 5}
  823. | Config
  824. ]),
  825. {ok, _} = emqtt:ConnFun(Sub1),
  826. {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2),
  827. {ok, Sub2} = emqtt:start_link([
  828. {proto_ver, v5},
  829. {clientid, <<"sub_client_2">>},
  830. {keepalive, 5}
  831. | Config
  832. ]),
  833. {ok, _} = emqtt:ConnFun(Sub2),
  834. {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2),
  835. {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>} | Config]),
  836. {ok, _} = emqtt:ConnFun(Pub),
  837. {ok, _} = emqtt:publish(
  838. Pub,
  839. Topic,
  840. <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>,
  841. 2
  842. ),
  843. receive
  844. {'EXIT', _, {shutdown, for_testing}} ->
  845. ok
  846. after 1000 ->
  847. ct:fail("disconnected timeout")
  848. end,
  849. ?assertEqual(1, counters:get(CRef, 1)),
  850. process_flag(trap_exit, false).
  851. t_share_subscribe_no_local(Config) ->
  852. ConnFun = ?config(conn_fun, Config),
  853. process_flag(trap_exit, true),
  854. ShareTopic = <<"$share/sharename/TopicA">>,
  855. {ok, Client} = emqtt:start_link([{proto_ver, v5} | Config]),
  856. {ok, _} = emqtt:ConnFun(Client),
  857. %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] (Disconnect)
  858. case catch emqtt:subscribe(Client, #{}, [{ShareTopic, [{nl, true}, {qos, 1}]}]) of
  859. {'EXIT', {Reason, _Stk}} ->
  860. ?assertEqual({disconnected, ?RC_PROTOCOL_ERROR, #{}}, Reason)
  861. end,
  862. process_flag(trap_exit, false).