emqx_persistent_session_SUITE.erl 46 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("emqx/include/asserts.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -include_lib("emqx/include/emqx_mqtt.hrl").
  22. -include_lib("emqx_utils/include/emqx_message.hrl").
  23. -compile(export_all).
  24. -compile(nowarn_export_all).
  25. -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
  26. %%--------------------------------------------------------------------
  27. %% SUITE boilerplate
  28. %%--------------------------------------------------------------------
  29. all() ->
  30. [
  31. % NOTE
  32. % Tests are disabled while existing session persistence impl is being
  33. % phased out.
  34. {group, persistence_disabled},
  35. {group, persistence_enabled}
  36. ].
  37. %% A persistent session can be resumed in two ways:
  38. %% 1. The old connection process is still alive, and the session is taken
  39. %% over by the new connection.
  40. %% 2. The old session process has died (e.g., because of node down).
  41. %% The new process resumes the session from the stored state, and finds
  42. %% any subscribed messages from the persistent message store.
  43. %%
  44. %% We want to test both ways, both with the db backend enabled and disabled.
  45. %%
  46. %% In addition, we test both tcp and quic connections.
  47. groups() ->
  48. TCs = emqx_common_test_helpers:all(?MODULE),
  49. TCsNonGeneric = [t_choose_impl, t_transient],
  50. TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
  51. [
  52. {persistence_disabled, TCGroups},
  53. {persistence_enabled, TCGroups},
  54. {tcp, [], TCs},
  55. {quic, [], TCs -- TCsNonGeneric},
  56. {ws, [], TCs -- TCsNonGeneric}
  57. ].
  58. init_per_group(persistence_disabled, Config) ->
  59. [
  60. {emqx_config, "session_persistence { enable = false }"},
  61. {persistence, false}
  62. | Config
  63. ];
  64. init_per_group(persistence_enabled, Config) ->
  65. [
  66. {emqx_config,
  67. "session_persistence {\n"
  68. " enable = true\n"
  69. " last_alive_update_interval = 100ms\n"
  70. " renew_streams_interval = 100ms\n"
  71. " session_gc_interval = 2s\n"
  72. "}"},
  73. {persistence, ds}
  74. | Config
  75. ];
  76. init_per_group(tcp, Config) ->
  77. Apps = emqx_cth_suite:start(
  78. [{emqx, ?config(emqx_config, Config)}],
  79. #{work_dir => emqx_cth_suite:work_dir(Config)}
  80. ),
  81. [
  82. {port, get_listener_port(tcp, default)},
  83. {conn_fun, connect},
  84. {group_apps, Apps}
  85. | Config
  86. ];
  87. init_per_group(ws, Config) ->
  88. Apps = emqx_cth_suite:start(
  89. [{emqx, ?config(emqx_config, Config)}],
  90. #{work_dir => emqx_cth_suite:work_dir(Config)}
  91. ),
  92. [
  93. {ssl, false},
  94. {host, "localhost"},
  95. {enable_websocket, true},
  96. {port, get_listener_port(ws, default)},
  97. {conn_fun, ws_connect},
  98. {group_apps, Apps}
  99. | Config
  100. ];
  101. init_per_group(quic, Config) ->
  102. Apps = emqx_cth_suite:start(
  103. [
  104. {emqx,
  105. ?config(emqx_config, Config) ++
  106. "\n listeners.quic.test {"
  107. "\n enable = true"
  108. "\n ssl_options.verify = verify_peer"
  109. "\n }"}
  110. ],
  111. #{work_dir => emqx_cth_suite:work_dir(Config)}
  112. ),
  113. [
  114. {port, get_listener_port(quic, test)},
  115. {conn_fun, quic_connect},
  116. {ssl_opts, emqx_common_test_helpers:client_mtls()},
  117. {ssl, true},
  118. {group_apps, Apps}
  119. | Config
  120. ].
  121. get_listener_port(Type, Name) ->
  122. case emqx_config:get([listeners, Type, Name, bind]) of
  123. {_, Port} -> Port;
  124. Port -> Port
  125. end.
  126. end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
  127. ok = emqx_cth_suite:stop(?config(group_apps, Config));
  128. end_per_group(_, _Config) ->
  129. catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
  130. ok.
  131. init_per_testcase(TestCase, Config) ->
  132. Config1 = preconfig_per_testcase(TestCase, Config),
  133. case erlang:function_exported(?MODULE, TestCase, 2) of
  134. true -> ?MODULE:TestCase(init, Config1);
  135. _ -> Config1
  136. end.
  137. end_per_testcase(TestCase, Config) ->
  138. case erlang:function_exported(?MODULE, TestCase, 2) of
  139. true -> ?MODULE:TestCase('end', Config);
  140. false -> ok
  141. end,
  142. Config.
  143. preconfig_per_testcase(TestCase, Config) ->
  144. {BaseName, Config1} =
  145. case ?config(tc_group_properties, Config) of
  146. [] ->
  147. %% We are running a single testcase
  148. {
  149. atom_to_binary(TestCase),
  150. init_per_group(tcp, init_per_group(kill_connection_process, Config))
  151. };
  152. [_ | _] = Props ->
  153. Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
  154. Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
  155. Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
  156. {iolist_to_binary(Pre1), Config}
  157. end,
  158. [
  159. {topic, iolist_to_binary([BaseName, "/foo"])},
  160. {stopic, iolist_to_binary([BaseName, "/+"])},
  161. {stopic_alt, iolist_to_binary([BaseName, "/foo"])},
  162. {client_id, BaseName}
  163. | Config1
  164. ].
  165. %%--------------------------------------------------------------------
  166. %% Helpers
  167. %%--------------------------------------------------------------------
  168. client_info(Key, Client) ->
  169. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  170. receive_messages(Count) ->
  171. receive_messages(Count, 15000).
  172. receive_messages(Count, Timeout) ->
  173. Deadline = erlang:monotonic_time(millisecond) + Timeout,
  174. receive_message_loop(Count, Deadline).
  175. receive_message_loop(0, _Deadline) ->
  176. [];
  177. receive_message_loop(Count, Deadline) ->
  178. Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
  179. receive
  180. {publish, Msg} ->
  181. [Msg | receive_message_loop(Count - 1, Deadline)];
  182. {pubrel, Msg} ->
  183. [{pubrel, Msg} | receive_message_loop(Count - 1, Deadline)];
  184. _Other ->
  185. receive_message_loop(Count, Deadline)
  186. after Timeout ->
  187. []
  188. end.
  189. maybe_kill_connection_process(ClientId, Config) ->
  190. Persistence = ?config(persistence, Config),
  191. case emqx_cm:lookup_channels(ClientId) of
  192. [] ->
  193. ok;
  194. [ConnectionPid] when Persistence == ds ->
  195. Ref = monitor(process, ConnectionPid),
  196. ConnectionPid ! die_if_test,
  197. ?assertReceive(
  198. {'DOWN', Ref, process, ConnectionPid, Reason} when
  199. Reason == normal orelse Reason == noproc,
  200. 3000
  201. ),
  202. wait_connection_process_unregistered(ClientId);
  203. _ ->
  204. ok
  205. end.
  206. wait_connection_process_dies(ClientId) ->
  207. case emqx_cm:lookup_channels(ClientId) of
  208. [] ->
  209. ok;
  210. [ConnectionPid] ->
  211. Ref = monitor(process, ConnectionPid),
  212. ?assertReceive(
  213. {'DOWN', Ref, process, ConnectionPid, Reason} when
  214. Reason == normal orelse Reason == noproc,
  215. 3000
  216. ),
  217. wait_connection_process_unregistered(ClientId)
  218. end.
  219. wait_connection_process_unregistered(ClientId) ->
  220. ?retry(
  221. _Timeout = 100,
  222. _Retries = 20,
  223. ?assertEqual([], emqx_cm:lookup_channels(ClientId))
  224. ).
  225. wait_channel_disconnected(ClientId) ->
  226. ?retry(
  227. _Timeout = 100,
  228. _Retries = 20,
  229. case emqx_cm:lookup_channels(ClientId) of
  230. [] ->
  231. false;
  232. [ChanPid] ->
  233. false = emqx_cm:is_channel_connected(ChanPid)
  234. end
  235. ).
  236. disconnect_client(ClientPid) ->
  237. ClientId = proplists:get_value(clientid, emqtt:info(ClientPid)),
  238. ok = emqtt:disconnect(ClientPid),
  239. false = wait_channel_disconnected(ClientId),
  240. ok.
  241. messages(Topic, Payloads) ->
  242. messages(Topic, Payloads, ?QOS_2).
  243. messages(Topic, Payloads, QoS) ->
  244. lists:map(
  245. fun
  246. (Bin) when is_binary(Bin) ->
  247. #mqtt_msg{topic = Topic, payload = Bin, qos = QoS};
  248. (Msg = #mqtt_msg{}) ->
  249. Msg#mqtt_msg{topic = Topic}
  250. end,
  251. Payloads
  252. ).
  253. publish(Topic, Payload) ->
  254. publish(Topic, Payload, ?QOS_2).
  255. publish(Topic, Payload, QoS) ->
  256. publish_many(messages(Topic, [Payload], QoS)).
  257. publish_many(Messages) ->
  258. publish_many(Messages, false).
  259. publish_many(Messages, WaitForUnregister) ->
  260. Fun = fun(Client, Message) ->
  261. case emqtt:publish(Client, Message) of
  262. ok -> ok;
  263. {ok, _} -> ok
  264. end
  265. end,
  266. do_publish(Messages, Fun, WaitForUnregister).
  267. do_publish(Messages = [_ | _], PublishFun, WaitForUnregister) ->
  268. %% Publish from another process to avoid connection confusion.
  269. {Pid, Ref} =
  270. spawn_monitor(
  271. fun() ->
  272. %% For convenience, always publish using tcp.
  273. %% The publish path is not what we are testing.
  274. ClientID = <<"ps_SUITE_publisher">>,
  275. {ok, Client} = emqtt:start_link([
  276. {proto_ver, v5},
  277. {clientid, ClientID},
  278. {port, 1883}
  279. ]),
  280. {ok, _} = emqtt:connect(Client),
  281. lists:foreach(fun(Message) -> PublishFun(Client, Message) end, Messages),
  282. ok = emqtt:disconnect(Client),
  283. %% Snabbkaffe sometimes fails unless all processes are gone.
  284. WaitForUnregister andalso wait_connection_process_dies(ClientID)
  285. end
  286. ),
  287. receive
  288. {'DOWN', Ref, process, Pid, normal} -> ok;
  289. {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
  290. end.
  291. %%--------------------------------------------------------------------
  292. %% Test Cases
  293. %%--------------------------------------------------------------------
  294. t_choose_impl(Config) ->
  295. ClientId = ?config(client_id, Config),
  296. ConnFun = ?config(conn_fun, Config),
  297. {ok, Client} = emqtt:start_link([
  298. {clientid, ClientId},
  299. {proto_ver, v5},
  300. {properties, #{'Session-Expiry-Interval' => 30}}
  301. | Config
  302. ]),
  303. {ok, _} = emqtt:ConnFun(Client),
  304. [ChanPid] = emqx_cm:lookup_channels(ClientId),
  305. ?assertEqual(
  306. case ?config(persistence, Config) of
  307. false -> emqx_session_mem;
  308. ds -> emqx_persistent_session_ds
  309. end,
  310. emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
  311. ),
  312. ok = emqtt:disconnect(Client).
  313. t_connect_discards_existing_client(Config) ->
  314. ClientId = ?config(client_id, Config),
  315. ConnFun = ?config(conn_fun, Config),
  316. ClientOpts = [
  317. {clientid, ClientId},
  318. {proto_ver, v5},
  319. {properties, #{'Session-Expiry-Interval' => 30}}
  320. | Config
  321. ],
  322. {ok, Client1} = emqtt:start_link(ClientOpts),
  323. true = unlink(Client1),
  324. MRef = erlang:monitor(process, Client1),
  325. {ok, _} = emqtt:ConnFun(Client1),
  326. {ok, Client2} = emqtt:start_link(ClientOpts),
  327. {ok, _} = emqtt:ConnFun(Client2),
  328. receive
  329. {'DOWN', MRef, process, Client1, Reason} ->
  330. ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
  331. ok = emqtt:stop(Client2),
  332. ok
  333. after 1000 ->
  334. error({client_still_connected, Client1})
  335. end.
  336. %% [MQTT-3.1.2-23]
  337. t_connect_session_expiry_interval(Config) ->
  338. ConnFun = ?config(conn_fun, Config),
  339. Topic = ?config(topic, Config),
  340. STopic = ?config(stopic, Config),
  341. Payload = <<"test message">>,
  342. ClientId = ?config(client_id, Config),
  343. {ok, Client1} = emqtt:start_link([
  344. {clientid, ClientId},
  345. {proto_ver, v5},
  346. {properties, #{'Session-Expiry-Interval' => 30}}
  347. | Config
  348. ]),
  349. {ok, _} = emqtt:ConnFun(Client1),
  350. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client1, STopic, ?QOS_1),
  351. ok = emqtt:disconnect(Client1),
  352. maybe_kill_connection_process(ClientId, Config),
  353. publish(Topic, Payload, ?QOS_1),
  354. {ok, Client2} = emqtt:start_link([
  355. {clientid, ClientId},
  356. {proto_ver, v5},
  357. {properties, #{'Session-Expiry-Interval' => 30}},
  358. {clean_start, false}
  359. | Config
  360. ]),
  361. {ok, _} = emqtt:ConnFun(Client2),
  362. [Msg | _] = receive_messages(1),
  363. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  364. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  365. ?assertEqual({ok, ?QOS_1}, maps:find(qos, Msg)),
  366. ok = emqtt:disconnect(Client2).
  367. %% [MQTT-3.1.2-23]
  368. t_connect_session_expiry_interval_qos2(Config) ->
  369. ConnFun = ?config(conn_fun, Config),
  370. Topic = ?config(topic, Config),
  371. STopic = ?config(stopic, Config),
  372. Payload = <<"test message">>,
  373. ClientId = ?config(client_id, Config),
  374. {ok, Client1} = emqtt:start_link([
  375. {clientid, ClientId},
  376. {proto_ver, v5},
  377. {properties, #{'Session-Expiry-Interval' => 30}}
  378. | Config
  379. ]),
  380. {ok, _} = emqtt:ConnFun(Client1),
  381. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  382. ok = emqtt:disconnect(Client1),
  383. maybe_kill_connection_process(ClientId, Config),
  384. publish(Topic, Payload),
  385. {ok, Client2} = emqtt:start_link([
  386. {clientid, ClientId},
  387. {proto_ver, v5},
  388. {properties, #{'Session-Expiry-Interval' => 30}},
  389. {clean_start, false}
  390. | Config
  391. ]),
  392. {ok, _} = emqtt:ConnFun(Client2),
  393. [Msg | _] = receive_messages(1),
  394. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  395. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  396. ?assertEqual({ok, 2}, maps:find(qos, Msg)),
  397. ok = emqtt:disconnect(Client2).
  398. t_without_client_id(Config) ->
  399. %% Emqtt client dies
  400. process_flag(trap_exit, true),
  401. ConnFun = ?config(conn_fun, Config),
  402. {ok, Client0} = emqtt:start_link([
  403. {proto_ver, v5},
  404. {properties, #{'Session-Expiry-Interval' => 30}},
  405. {clean_start, false}
  406. | Config
  407. ]),
  408. {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
  409. ok.
  410. t_assigned_clientid_persistent_session(Config) ->
  411. ConnFun = ?config(conn_fun, Config),
  412. {ok, Client1} = emqtt:start_link([
  413. {proto_ver, v5},
  414. {properties, #{'Session-Expiry-Interval' => 30}},
  415. {clean_start, true}
  416. | Config
  417. ]),
  418. {ok, _} = emqtt:ConnFun(Client1),
  419. AssignedClientId = client_info(clientid, Client1),
  420. ok = emqtt:disconnect(Client1),
  421. maybe_kill_connection_process(AssignedClientId, Config),
  422. {ok, Client2} = emqtt:start_link([
  423. {clientid, AssignedClientId},
  424. {proto_ver, v5},
  425. {clean_start, false}
  426. | Config
  427. ]),
  428. {ok, _} = emqtt:ConnFun(Client2),
  429. ?assertEqual(1, client_info(session_present, Client2)),
  430. ok = emqtt:disconnect(Client2).
  431. t_cancel_on_disconnect(Config) ->
  432. %% Open a persistent session, but cancel the persistence when
  433. %% shutting down the connection.
  434. ConnFun = ?config(conn_fun, Config),
  435. ClientId = ?config(client_id, Config),
  436. {ok, Client1} = emqtt:start_link([
  437. {proto_ver, v5},
  438. {clientid, ClientId},
  439. {properties, #{'Session-Expiry-Interval' => 30}},
  440. {clean_start, true}
  441. | Config
  442. ]),
  443. {ok, _} = emqtt:ConnFun(Client1),
  444. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
  445. wait_connection_process_unregistered(ClientId),
  446. {ok, Client2} = emqtt:start_link([
  447. {clientid, ClientId},
  448. {proto_ver, v5},
  449. {clean_start, false},
  450. {properties, #{'Session-Expiry-Interval' => 30}}
  451. | Config
  452. ]),
  453. {ok, _} = emqtt:ConnFun(Client2),
  454. ?assertEqual(0, client_info(session_present, Client2)),
  455. ok = emqtt:disconnect(Client2).
  456. t_persist_on_disconnect(Config) ->
  457. %% Open a non-persistent session, but add the persistence when
  458. %% shutting down the connection. This is a protocol error, and
  459. %% should not convert the session into a persistent session.
  460. ConnFun = ?config(conn_fun, Config),
  461. ClientId = ?config(client_id, Config),
  462. {ok, Client1} = emqtt:start_link([
  463. {proto_ver, v5},
  464. {clientid, ClientId},
  465. {properties, #{'Session-Expiry-Interval' => 0}},
  466. {clean_start, true}
  467. | Config
  468. ]),
  469. {ok, _} = emqtt:ConnFun(Client1),
  470. %% Strangely enough, the disconnect is reported as successful by emqtt.
  471. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
  472. wait_connection_process_unregistered(ClientId),
  473. {ok, Client2} = emqtt:start_link([
  474. {clientid, ClientId},
  475. {proto_ver, v5},
  476. {clean_start, false},
  477. {properties, #{'Session-Expiry-Interval' => 30}}
  478. | Config
  479. ]),
  480. {ok, _} = emqtt:ConnFun(Client2),
  481. %% The session should not be known, since it wasn't persisted because of the
  482. %% changed expiry interval in the disconnect call.
  483. ?assertEqual(0, client_info(session_present, Client2)),
  484. ok = emqtt:disconnect(Client2).
  485. t_process_dies_session_expires(Config) ->
  486. %% Emulate an error in the connect process,
  487. %% or that the node of the process goes down.
  488. %% A persistent session should eventually expire.
  489. ?check_trace(
  490. begin
  491. ConnFun = ?config(conn_fun, Config),
  492. ClientId = ?config(client_id, Config),
  493. Topic = ?config(topic, Config),
  494. STopic = ?config(stopic, Config),
  495. Payload = <<"test">>,
  496. {ok, Client1} = emqtt:start_link([
  497. {proto_ver, v5},
  498. {clientid, ClientId},
  499. {properties, #{'Session-Expiry-Interval' => 1}},
  500. {clean_start, true}
  501. | Config
  502. ]),
  503. {ok, _} = emqtt:ConnFun(Client1),
  504. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  505. ok = emqtt:disconnect(Client1),
  506. maybe_kill_connection_process(ClientId, Config),
  507. ok = publish(Topic, Payload),
  508. timer:sleep(1500),
  509. {ok, Client2} = emqtt:start_link([
  510. {proto_ver, v5},
  511. {clientid, ClientId},
  512. {properties, #{'Session-Expiry-Interval' => 30}},
  513. {clean_start, false}
  514. | Config
  515. ]),
  516. {ok, _} = emqtt:ConnFun(Client2),
  517. ?assertEqual(0, client_info(session_present, Client2)),
  518. %% We should not receive the pending message
  519. ?assertEqual([], receive_messages(1)),
  520. emqtt:disconnect(Client2)
  521. end,
  522. []
  523. ).
  524. t_publish_while_client_is_gone_qos1(Config) ->
  525. %% A persistent session should receive messages in its
  526. %% subscription even if the process owning the session dies.
  527. ConnFun = ?config(conn_fun, Config),
  528. Topic = ?config(topic, Config),
  529. STopic = ?config(stopic, Config),
  530. Payload1 = <<"hello1">>,
  531. Payload2 = <<"hello2">>,
  532. ClientId = ?config(client_id, Config),
  533. {ok, Client1} = emqtt:start_link([
  534. {proto_ver, v5},
  535. {clientid, ClientId},
  536. {properties, #{'Session-Expiry-Interval' => 30}},
  537. {clean_start, true}
  538. | Config
  539. ]),
  540. {ok, _} = emqtt:ConnFun(Client1),
  541. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  542. ok = emqtt:disconnect(Client1),
  543. maybe_kill_connection_process(ClientId, Config),
  544. ok = publish_many(messages(Topic, [Payload1, Payload2], ?QOS_1)),
  545. {ok, Client2} = emqtt:start_link([
  546. {proto_ver, v5},
  547. {clientid, ClientId},
  548. {properties, #{'Session-Expiry-Interval' => 30}},
  549. {clean_start, false}
  550. | Config
  551. ]),
  552. {ok, _} = emqtt:ConnFun(Client2),
  553. Msgs = receive_messages(2),
  554. ?assertMatch([_, _], Msgs),
  555. [Msg1, Msg2] = Msgs,
  556. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  557. ?assertEqual({ok, 1}, maps:find(qos, Msg1)),
  558. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  559. ?assertEqual({ok, 1}, maps:find(qos, Msg2)),
  560. ok = emqtt:disconnect(Client2).
  561. t_publish_many_while_client_is_gone_qos1(Config) ->
  562. %% A persistent session should receive all of the still unacked messages
  563. %% for its subscriptions after the client dies or reconnects, in addition
  564. %% to new messages that were published while the client was gone. The order
  565. %% of the messages should be consistent across reconnects.
  566. ClientId = ?config(client_id, Config),
  567. ConnFun = ?config(conn_fun, Config),
  568. {ok, Client1} = emqtt:start_link([
  569. {proto_ver, v5},
  570. {clientid, ClientId},
  571. {properties, #{'Session-Expiry-Interval' => 30}},
  572. {clean_start, true},
  573. {auto_ack, never}
  574. | Config
  575. ]),
  576. {ok, _} = emqtt:ConnFun(Client1),
  577. STopics = [
  578. <<"t/+/foo">>,
  579. <<"msg/feed/#">>,
  580. <<"loc/+/+/+">>
  581. ],
  582. [{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, ST, ?QOS_1) || ST <- STopics],
  583. Pubs1 = [
  584. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1},
  585. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1},
  586. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 1},
  587. #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
  588. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
  589. #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
  590. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1}
  591. ],
  592. ok = publish_many(Pubs1),
  593. NPubs1 = length(Pubs1),
  594. Msgs1 = receive_messages(NPubs1),
  595. NMsgs1 = length(Msgs1),
  596. ?assertEqual(NPubs1, NMsgs1),
  597. ct:pal("Msgs1 = ~p", [Msgs1]),
  598. %% TODO
  599. %% This assertion doesn't currently hold because `emqx_ds` doesn't enforce
  600. %% strict ordering reflecting client publishing order. Instead, per-topic
  601. %% ordering is guaranteed per each client. In fact, this violates the MQTT
  602. %% specification, but we deemed it acceptable for now.
  603. %% ?assertMatch([
  604. %% #{payload := <<"M1">>},
  605. %% #{payload := <<"M2">>},
  606. %% #{payload := <<"M3">>},
  607. %% #{payload := <<"M4">>},
  608. %% #{payload := <<"M5">>},
  609. %% #{payload := <<"M6">>},
  610. %% #{payload := <<"M7">>}
  611. %% ], Msgs1),
  612. ?assertEqual(
  613. get_topicwise_order(Pubs1),
  614. get_topicwise_order(Msgs1)
  615. ),
  616. NAcked = 4,
  617. ?assert(NMsgs1 >= NAcked),
  618. [ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)],
  619. %% Ensure that PUBACKs are propagated to the channel.
  620. pong = emqtt:ping(Client1),
  621. ok = disconnect_client(Client1),
  622. maybe_kill_connection_process(ClientId, Config),
  623. Pubs2 = [
  624. #mqtt_msg{topic = <<"loc/3/4/6">>, payload = <<"M8">>, qos = 1},
  625. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
  626. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
  627. #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
  628. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1}
  629. ],
  630. ok = publish_many(Pubs2),
  631. NPubs2 = length(Pubs2),
  632. %% Now reconnect with auto ack to make sure all streams are
  633. %% replayed till the end:
  634. {ok, Client2} = emqtt:start_link([
  635. {proto_ver, v5},
  636. {clientid, ClientId},
  637. {properties, #{'Session-Expiry-Interval' => 30}},
  638. {clean_start, false}
  639. | Config
  640. ]),
  641. {ok, _} = emqtt:ConnFun(Client2),
  642. %% Try to receive _at most_ `NPubs` messages.
  643. %% There shouldn't be that much unacked messages in the replay anyway,
  644. %% but it's an easy number to pick.
  645. NPubs = NPubs1 + NPubs2,
  646. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  647. NMsgs2 = length(Msgs2),
  648. ct:pal("Msgs2 = ~p", [Msgs2]),
  649. ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}),
  650. ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
  651. ?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
  652. NSame = NMsgs2 - NPubs2,
  653. ?assert(
  654. lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
  655. ),
  656. ?assertNot(
  657. lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2))
  658. ),
  659. ?assertEqual(
  660. [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
  661. [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
  662. ),
  663. ok = disconnect_client(Client2).
  664. t_publish_while_client_is_gone(Config) ->
  665. %% A persistent session should receive messages in its
  666. %% subscription even if the process owning the session dies.
  667. ConnFun = ?config(conn_fun, Config),
  668. Topic = ?config(topic, Config),
  669. STopic = ?config(stopic, Config),
  670. Payload1 = <<"hello1">>,
  671. Payload2 = <<"hello2">>,
  672. ClientId = ?config(client_id, Config),
  673. {ok, Client1} = emqtt:start_link([
  674. {proto_ver, v5},
  675. {clientid, ClientId},
  676. {properties, #{'Session-Expiry-Interval' => 30}},
  677. {clean_start, true}
  678. | Config
  679. ]),
  680. {ok, _} = emqtt:ConnFun(Client1),
  681. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  682. ok = emqtt:disconnect(Client1),
  683. maybe_kill_connection_process(ClientId, Config),
  684. ok = publish_many(messages(Topic, [Payload1, Payload2])),
  685. {ok, Client2} = emqtt:start_link([
  686. {proto_ver, v5},
  687. {clientid, ClientId},
  688. {properties, #{'Session-Expiry-Interval' => 30}},
  689. {clean_start, false}
  690. | Config
  691. ]),
  692. {ok, _} = emqtt:ConnFun(Client2),
  693. Msgs = receive_messages(2),
  694. ?assertMatch([_, _], Msgs),
  695. [Msg1, Msg2] = Msgs,
  696. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  697. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  698. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  699. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  700. ok = emqtt:disconnect(Client2).
  701. t_publish_many_while_client_is_gone(Config) ->
  702. %% A persistent session should receive all of the still unacked messages
  703. %% for its subscriptions after the client dies or reconnects, in addition
  704. %% to PUBRELs for the messages it has PUBRECed. While client must send
  705. %% PUBACKs and PUBRECs in order, those orders are independent of each other.
  706. %%
  707. %% Developer's note: for simplicity we publish all messages to the
  708. %% same topic, since persistent session ds may reorder messages
  709. %% that belong to different streams, and this particular test is
  710. %% very sensitive the order.
  711. ClientId = ?config(client_id, Config),
  712. ConnFun = ?config(conn_fun, Config),
  713. ClientOpts = [
  714. {proto_ver, v5},
  715. {clientid, ClientId},
  716. {properties, #{'Session-Expiry-Interval' => 30}},
  717. {auto_ack, never}
  718. | Config
  719. ],
  720. {ok, Client1} = emqtt:start_link([{clean_start, true} | ClientOpts]),
  721. {ok, _} = emqtt:ConnFun(Client1),
  722. {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"t">>, ?QOS_2),
  723. Pubs1 = [
  724. #mqtt_msg{topic = <<"t">>, payload = <<"M1">>, qos = 1},
  725. #mqtt_msg{topic = <<"t">>, payload = <<"M2">>, qos = 1},
  726. #mqtt_msg{topic = <<"t">>, payload = <<"M3">>, qos = 2},
  727. #mqtt_msg{topic = <<"t">>, payload = <<"M4">>, qos = 2},
  728. #mqtt_msg{topic = <<"t">>, payload = <<"M5">>, qos = 2},
  729. #mqtt_msg{topic = <<"t">>, payload = <<"M6">>, qos = 1},
  730. #mqtt_msg{topic = <<"t">>, payload = <<"M7">>, qos = 2},
  731. #mqtt_msg{topic = <<"t">>, payload = <<"M8">>, qos = 1},
  732. #mqtt_msg{topic = <<"t">>, payload = <<"M9">>, qos = 2}
  733. ],
  734. ok = publish_many(Pubs1),
  735. NPubs1 = length(Pubs1),
  736. Msgs1 = receive_messages(NPubs1),
  737. ct:pal("Msgs1 = ~p", [Msgs1]),
  738. NMsgs1 = length(Msgs1),
  739. ?assertEqual(NPubs1, NMsgs1, emqx_persistent_session_ds:print_session(ClientId)),
  740. ?assertEqual(
  741. get_topicwise_order(Pubs1),
  742. get_topicwise_order(Msgs1),
  743. emqx_persistent_session_ds:print_session(ClientId)
  744. ),
  745. %% PUBACK every QoS 1 message.
  746. lists:foreach(
  747. fun(PktId) -> ok = emqtt:puback(Client1, PktId) end,
  748. [PktId || #{qos := 1, packet_id := PktId} <- Msgs1]
  749. ),
  750. %% PUBREC first `NRecs` QoS 2 messages (up to "M5")
  751. NRecs = 3,
  752. PubRecs1 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs1], NRecs),
  753. lists:foreach(
  754. fun(PktId) -> ok = emqtt:pubrec(Client1, PktId) end,
  755. PubRecs1
  756. ),
  757. %% Ensure that PUBACKs / PUBRECs are propagated to the channel.
  758. pong = emqtt:ping(Client1),
  759. %% Receive PUBRELs for the sent PUBRECs.
  760. PubRels1 = receive_messages(NRecs),
  761. ct:pal("PubRels1 = ~p", [PubRels1]),
  762. ?assertEqual(
  763. PubRecs1,
  764. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1],
  765. PubRels1
  766. ),
  767. ok = disconnect_client(Client1),
  768. maybe_kill_connection_process(ClientId, Config),
  769. Pubs2 = [
  770. #mqtt_msg{topic = <<"t">>, payload = <<"M10">>, qos = 2},
  771. #mqtt_msg{topic = <<"t">>, payload = <<"M11">>, qos = 1},
  772. #mqtt_msg{topic = <<"t">>, payload = <<"M12">>, qos = 2}
  773. ],
  774. ok = publish_many(Pubs2),
  775. NPubs2 = length(Pubs2),
  776. {ok, Client2} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  777. {ok, _} = emqtt:ConnFun(Client2),
  778. %% Try to receive _at most_ `NPubs` messages.
  779. %% There shouldn't be that much unacked messages in the replay anyway,
  780. %% but it's an easy number to pick.
  781. NPubs = NPubs1 + NPubs2,
  782. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  783. ct:pal("Msgs2 = ~p", [Msgs2]),
  784. %% We should again receive PUBRELs for the PUBRECs we sent earlier.
  785. ?assertEqual(
  786. get_msgs_essentials(PubRels1),
  787. [get_msg_essentials(PubRel) || PubRel = {pubrel, _} <- Msgs2]
  788. ),
  789. %% We should receive duplicates only for QoS 2 messages where PUBRELs were
  790. %% not sent, in the same order as the original messages.
  791. Msgs2Dups = [get_msg_essentials(M) || M = #{dup := true} <- Msgs2],
  792. ?assertEqual(
  793. Msgs2Dups,
  794. [M || M = #{qos := 2} <- Msgs2Dups]
  795. ),
  796. ?assertEqual(
  797. get_msgs_essentials(pick_respective_msgs(Msgs2Dups, Msgs1)),
  798. Msgs2Dups
  799. ),
  800. %% Ack more messages:
  801. PubRecs2 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs2], 2),
  802. lists:foreach(
  803. fun(PktId) -> ok = emqtt:pubrec(Client2, PktId) end,
  804. PubRecs2
  805. ),
  806. PubRels2 = receive_messages(length(PubRecs2)),
  807. ct:pal("PubRels2 = ~p", [PubRels2]),
  808. ?assertEqual(
  809. PubRecs2,
  810. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels2],
  811. PubRels2
  812. ),
  813. %% PUBCOMP every PUBREL.
  814. PubComps = [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1 ++ PubRels2],
  815. ct:pal("PubComps: ~p", [PubComps]),
  816. lists:foreach(
  817. fun(PktId) -> ok = emqtt:pubcomp(Client2, PktId) end,
  818. PubComps
  819. ),
  820. %% Ensure that PUBCOMPs are propagated to the channel.
  821. pong = emqtt:ping(Client2),
  822. %% Reconnect for the last time
  823. ok = disconnect_client(Client2),
  824. maybe_kill_connection_process(ClientId, Config),
  825. {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  826. {ok, _} = emqtt:ConnFun(Client3),
  827. %% Check that we receive the rest of the messages:
  828. Msgs3 = receive_messages(NPubs, _Timeout = 2000),
  829. ct:pal("Msgs3 = ~p", [Msgs3]),
  830. ?assertMatch(
  831. [<<"M10">>, <<"M11">>, <<"M12">>],
  832. [I || #{payload := I} <- Msgs3]
  833. ),
  834. ok = disconnect_client(Client3).
  835. t_clean_start_drops_subscriptions(Config) ->
  836. %% 1. A persistent session is started and disconnected.
  837. %% 2. While disconnected, a message is published and persisted.
  838. %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
  839. %% then we disconnect again.
  840. %% 4. Finally, a new connection is made with clean start set to false.
  841. %% The original message should not be delivered.
  842. ConnFun = ?config(conn_fun, Config),
  843. Topic = ?config(topic, Config),
  844. STopic = ?config(stopic, Config),
  845. Payload1 = <<"hello1">>,
  846. Payload2 = <<"hello2">>,
  847. Payload3 = <<"hello3">>,
  848. ClientId = ?config(client_id, Config),
  849. %% 1.
  850. {ok, Client1} = emqtt:start_link([
  851. {proto_ver, v5},
  852. {clientid, ClientId},
  853. {properties, #{'Session-Expiry-Interval' => 30}},
  854. {clean_start, true}
  855. | Config
  856. ]),
  857. {ok, _} = emqtt:ConnFun(Client1),
  858. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  859. ok = emqtt:disconnect(Client1),
  860. maybe_kill_connection_process(ClientId, Config),
  861. %% 2.
  862. ok = publish(Topic, Payload1, ?QOS_1),
  863. %% 3.
  864. {ok, Client2} = emqtt:start_link([
  865. {proto_ver, v5},
  866. {clientid, ClientId},
  867. {properties, #{'Session-Expiry-Interval' => 30}},
  868. {clean_start, true}
  869. | Config
  870. ]),
  871. {ok, _} = emqtt:ConnFun(Client2),
  872. ?assertEqual(0, client_info(session_present, Client2)),
  873. {ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1),
  874. timer:sleep(100),
  875. ok = publish(Topic, Payload2, ?QOS_1),
  876. [Msg1] = receive_messages(1),
  877. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
  878. pong = emqtt:ping(Client2),
  879. ok = emqtt:disconnect(Client2),
  880. maybe_kill_connection_process(ClientId, Config),
  881. %% 4.
  882. {ok, Client3} = emqtt:start_link([
  883. {proto_ver, v5},
  884. {clientid, ClientId},
  885. {properties, #{'Session-Expiry-Interval' => 30}},
  886. {clean_start, false}
  887. | Config
  888. ]),
  889. {ok, _} = emqtt:ConnFun(Client3),
  890. ok = publish(Topic, Payload3, ?QOS_1),
  891. [Msg2] = receive_messages(1),
  892. ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
  893. pong = emqtt:ping(Client3),
  894. ok = emqtt:disconnect(Client3).
  895. t_unsubscribe(Config) ->
  896. ConnFun = ?config(conn_fun, Config),
  897. STopic = ?config(stopic, Config),
  898. ClientId = ?config(client_id, Config),
  899. {ok, Client} = emqtt:start_link([
  900. {clientid, ClientId},
  901. {proto_ver, v5},
  902. {properties, #{'Session-Expiry-Interval' => 30}}
  903. | Config
  904. ]),
  905. {ok, _} = emqtt:ConnFun(Client),
  906. {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
  907. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  908. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  909. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  910. ok = emqtt:disconnect(Client).
  911. %% This testcase verifies that un-acked messages that were once sent
  912. %% to the client are still retransmitted after the session
  913. %% unsubscribes from the topic and reconnects.
  914. t_unsubscribe_replay(Config) ->
  915. ConnFun = ?config(conn_fun, Config),
  916. TopicPrefix = ?config(topic, Config),
  917. ClientId = atom_to_binary(?FUNCTION_NAME),
  918. ClientOpts = [
  919. {proto_ver, v5},
  920. {clientid, ClientId},
  921. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}},
  922. {max_inflight, 10}
  923. | Config
  924. ],
  925. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  926. {ok, _} = emqtt:ConnFun(Sub),
  927. %% 1. Make two subscriptions, one is to be deleted:
  928. Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]),
  929. Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]),
  930. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)),
  931. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)),
  932. %% 2. Publish 2 messages to the first and second topics each
  933. %% (client doesn't ack them):
  934. ok = publish(Topic1, <<"1">>, ?QOS_1),
  935. ok = publish(Topic1, <<"2">>, ?QOS_2),
  936. [Msg1, Msg2] = receive_messages(2),
  937. ?assertMatch(
  938. [
  939. #{payload := <<"1">>},
  940. #{payload := <<"2">>}
  941. ],
  942. [Msg1, Msg2]
  943. ),
  944. ok = publish(Topic2, <<"3">>, ?QOS_1),
  945. ok = publish(Topic2, <<"4">>, ?QOS_2),
  946. [Msg3, Msg4] = receive_messages(2),
  947. ?assertMatch(
  948. [
  949. #{payload := <<"3">>},
  950. #{payload := <<"4">>}
  951. ],
  952. [Msg3, Msg4]
  953. ),
  954. %% 3. Unsubscribe from the topic and disconnect:
  955. ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)),
  956. ok = emqtt:disconnect(Sub),
  957. %% 5. Publish more messages to the disconnected topic:
  958. ok = publish(Topic1, <<"5">>, ?QOS_1),
  959. ok = publish(Topic1, <<"6">>, ?QOS_2),
  960. %% 4. Reconnect the client. It must only receive only four
  961. %% messages from the time when it was subscribed:
  962. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  963. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  964. %% Note: we ask for 6 messages, but expect only 4, it's
  965. %% intentional:
  966. ?assertMatch(
  967. #{
  968. Topic1 := [<<"1">>, <<"2">>],
  969. Topic2 := [<<"3">>, <<"4">>]
  970. },
  971. get_topicwise_order(receive_messages(6, 5_000)),
  972. debug_info(ClientId)
  973. ),
  974. %% 5. Now let's resubscribe, and check that the session can receive new messages:
  975. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)),
  976. ok = publish(Topic1, <<"7">>, ?QOS_0),
  977. ok = publish(Topic1, <<"8">>, ?QOS_1),
  978. ok = publish(Topic1, <<"9">>, ?QOS_2),
  979. ?assertMatch(
  980. [<<"7">>, <<"8">>, <<"9">>],
  981. lists:map(fun get_msgpub_payload/1, receive_messages(3))
  982. ),
  983. ok = emqtt:disconnect(Sub1).
  984. %% This testcase verifies that persistent sessions handle "transient"
  985. %% mesages correctly.
  986. %%
  987. %% Transient messages are delivered to the channel directly, bypassing
  988. %% the broker code that decides whether the messages should be
  989. %% persisted or not, and therefore they are not persisted.
  990. %%
  991. %% `emqx_retainer' is an example of application that uses this
  992. %% mechanism.
  993. %%
  994. %% This testcase creates the conditions when the transient messages
  995. %% appear in the middle of the replay, to make sure the durable
  996. %% session doesn't get confused and/or stuck if retained messages are
  997. %% changed while the session was down.
  998. t_transient(Config) ->
  999. ConnFun = ?config(conn_fun, Config),
  1000. TopicPrefix = ?config(topic, Config),
  1001. ClientId = atom_to_binary(?FUNCTION_NAME),
  1002. ClientOpts = [
  1003. {proto_ver, v5},
  1004. {clientid, ClientId},
  1005. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}},
  1006. {max_inflight, 100}
  1007. | Config
  1008. ],
  1009. Deliver = fun(Topic, Payload, QoS) ->
  1010. [Pid] = emqx_cm:lookup_channels(ClientId),
  1011. Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload),
  1012. Pid ! {deliver, Topic, Msg}
  1013. end,
  1014. Topic1 = <<TopicPrefix/binary, "/1">>,
  1015. Topic2 = <<TopicPrefix/binary, "/2">>,
  1016. Topic3 = <<TopicPrefix/binary, "/3">>,
  1017. %% 1. Start the client and subscribe to the topic:
  1018. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  1019. ?assertMatch({ok, _}, emqtt:ConnFun(Sub)),
  1020. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)),
  1021. %% 2. Publish regular messages:
  1022. publish(Topic1, <<"1">>, ?QOS_1),
  1023. publish(Topic1, <<"2">>, ?QOS_2),
  1024. Msgs1 = receive_messages(2),
  1025. [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1,
  1026. %% 3. Publish and recieve transient messages:
  1027. Deliver(Topic2, <<"3">>, ?QOS_0),
  1028. Deliver(Topic2, <<"4">>, ?QOS_1),
  1029. Deliver(Topic2, <<"5">>, ?QOS_2),
  1030. Msgs2 = receive_messages(3),
  1031. ?assertMatch(
  1032. [
  1033. #{payload := <<"3">>, qos := ?QOS_0},
  1034. #{payload := <<"4">>, qos := ?QOS_1},
  1035. #{payload := <<"5">>, qos := ?QOS_2}
  1036. ],
  1037. Msgs2
  1038. ),
  1039. %% 4. Publish more regular messages:
  1040. publish(Topic3, <<"6">>, ?QOS_1),
  1041. publish(Topic3, <<"7">>, ?QOS_2),
  1042. Msgs3 = receive_messages(2),
  1043. [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3,
  1044. %% 5. Reconnect the client:
  1045. ok = emqtt:disconnect(Sub),
  1046. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  1047. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  1048. %% 6. Recieve the historic messages and check that their packet IDs didn't change:
  1049. %% Note: durable session currenty WON'T replay transient messages.
  1050. ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end,
  1051. ?assertMatch(
  1052. #{
  1053. Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}],
  1054. Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}]
  1055. },
  1056. maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000))
  1057. ),
  1058. %% 7. Finish off by sending messages to all the topics to make
  1059. %% sure none of the streams are blocked:
  1060. [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]],
  1061. ?assertMatch(
  1062. #{
  1063. Topic1 := [<<"fin">>],
  1064. Topic2 := [<<"fin">>],
  1065. Topic3 := [<<"fin">>]
  1066. },
  1067. get_topicwise_order(receive_messages(3))
  1068. ),
  1069. ok = emqtt:disconnect(Sub1).
  1070. t_multiple_subscription_matches(Config) ->
  1071. ConnFun = ?config(conn_fun, Config),
  1072. Topic = ?config(topic, Config),
  1073. STopic1 = ?config(stopic, Config),
  1074. STopic2 = ?config(stopic_alt, Config),
  1075. Payload = <<"test message">>,
  1076. ClientId = ?config(client_id, Config),
  1077. {ok, Client1} = emqtt:start_link([
  1078. {clientid, ClientId},
  1079. {proto_ver, v5},
  1080. {properties, #{'Session-Expiry-Interval' => 30}}
  1081. | Config
  1082. ]),
  1083. {ok, _} = emqtt:ConnFun(Client1),
  1084. {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
  1085. {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
  1086. ok = emqtt:disconnect(Client1),
  1087. maybe_kill_connection_process(ClientId, Config),
  1088. publish(Topic, Payload),
  1089. {ok, Client2} = emqtt:start_link([
  1090. {clientid, ClientId},
  1091. {proto_ver, v5},
  1092. {properties, #{'Session-Expiry-Interval' => 30}},
  1093. {clean_start, false}
  1094. | Config
  1095. ]),
  1096. {ok, _} = emqtt:ConnFun(Client2),
  1097. %% We will receive the same message twice because it matches two subscriptions.
  1098. [Msg1, Msg2] = receive_messages(2),
  1099. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
  1100. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
  1101. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
  1102. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
  1103. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  1104. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  1105. ok = emqtt:disconnect(Client2).
  1106. %% Check that we don't get a will message when the client disconnects with success reason
  1107. %% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1.
  1108. t_no_will_message(Config) ->
  1109. ConnFun = ?config(conn_fun, Config),
  1110. WillTopic = ?config(topic, Config),
  1111. WillPayload = <<"will message">>,
  1112. ClientId = ?config(client_id, Config),
  1113. ?check_trace(
  1114. #{timetrap => 15_000},
  1115. begin
  1116. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1117. {ok, Client} = emqtt:start_link([
  1118. {clientid, ClientId},
  1119. {proto_ver, v5},
  1120. {properties, #{'Session-Expiry-Interval' => 1}},
  1121. {will_topic, WillTopic},
  1122. {will_payload, WillPayload},
  1123. {will_qos, 1},
  1124. {will_props, #{'Will-Delay-Interval' => 0}}
  1125. | Config
  1126. ]),
  1127. {ok, _} = emqtt:ConnFun(Client),
  1128. ok = emqtt:disconnect(Client, ?RC_SUCCESS),
  1129. %% No will message
  1130. ?assertNotReceive({deliver, WillTopic, _}, 5_000),
  1131. ok
  1132. end,
  1133. []
  1134. ),
  1135. ok.
  1136. %% Check that we get a single will message when the client disconnects with a non
  1137. %% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
  1138. %% QoS = 1.
  1139. t_will_message1(Config) ->
  1140. do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}),
  1141. ok.
  1142. %% Check that we get a single will message when the client disconnects with a non
  1143. %% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0,
  1144. %% QoS = 1.
  1145. t_will_message2(Config) ->
  1146. do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}),
  1147. ok.
  1148. %% Check that we get a single will message when the client disconnects with a non
  1149. %% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0,
  1150. %% QoS = 1.
  1151. t_will_message3(Config) ->
  1152. do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}),
  1153. ok.
  1154. do_t_will_message(Config, Opts) ->
  1155. #{
  1156. session_expiry := SessionExpiry,
  1157. will_delay := WillDelay
  1158. } = Opts,
  1159. ConnFun = ?config(conn_fun, Config),
  1160. WillTopic = ?config(topic, Config),
  1161. WillPayload = <<"will message">>,
  1162. ClientId = ?config(client_id, Config),
  1163. ?check_trace(
  1164. #{timetrap => 15_000},
  1165. begin
  1166. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1167. {ok, Client} = emqtt:start_link([
  1168. {clientid, ClientId},
  1169. {proto_ver, v5},
  1170. {properties, #{'Session-Expiry-Interval' => SessionExpiry}},
  1171. {will_topic, WillTopic},
  1172. {will_payload, WillPayload},
  1173. {will_qos, 1},
  1174. {will_props, #{'Will-Delay-Interval' => WillDelay}}
  1175. | Config
  1176. ]),
  1177. {ok, _} = emqtt:ConnFun(Client),
  1178. ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
  1179. ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000),
  1180. %% No duplicates
  1181. ?assertNotReceive({deliver, WillTopic, _}, 100),
  1182. ok
  1183. end,
  1184. []
  1185. ),
  1186. ok.
  1187. get_topicwise_order(Msgs) ->
  1188. maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
  1189. get_msgpub_topic(#mqtt_msg{topic = Topic}) ->
  1190. Topic;
  1191. get_msgpub_topic(#{topic := Topic}) ->
  1192. Topic.
  1193. get_msgpub_payload(#mqtt_msg{payload = Payload}) ->
  1194. Payload;
  1195. get_msgpub_payload(#{payload := Payload}) ->
  1196. Payload.
  1197. get_msg_essentials(Msg = #{}) ->
  1198. maps:with([packet_id, topic, payload, qos], Msg);
  1199. get_msg_essentials({pubrel, Msg}) ->
  1200. {pubrel, maps:with([packet_id, reason_code], Msg)}.
  1201. get_msgs_essentials(Msgs) ->
  1202. [get_msg_essentials(M) || M <- Msgs].
  1203. pick_respective_msgs(MsgRefs, Msgs) ->
  1204. [M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)].
  1205. debug_info(ClientId) ->
  1206. Info = emqx_persistent_session_ds:print_session(ClientId),
  1207. ct:pal("*** State:~n~p", [Info]).