emqx_persistent_session_SUITE.erl 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("emqx/include/asserts.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -include_lib("emqx/include/emqx_mqtt.hrl").
  22. -include_lib("emqx_utils/include/emqx_message.hrl").
  23. -compile(export_all).
  24. -compile(nowarn_export_all).
  25. -include("emqx_persistent_message.hrl").
  26. -define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
  27. %%--------------------------------------------------------------------
  28. %% SUITE boilerplate
  29. %%--------------------------------------------------------------------
  30. all() ->
  31. [
  32. % NOTE
  33. % Tests are disabled while existing session persistence impl is being
  34. % phased out.
  35. {group, persistence_disabled},
  36. {group, persistence_enabled}
  37. ].
  38. %% A persistent session can be resumed in two ways:
  39. %% 1. The old connection process is still alive, and the session is taken
  40. %% over by the new connection.
  41. %% 2. The old session process has died (e.g., because of node down).
  42. %% The new process resumes the session from the stored state, and finds
  43. %% any subscribed messages from the persistent message store.
  44. %%
  45. %% We want to test both ways, both with the db backend enabled and disabled.
  46. %%
  47. %% In addition, we test both tcp and quic connections.
  48. groups() ->
  49. TCs = emqx_common_test_helpers:all(?MODULE),
  50. TCsNonGeneric = [t_choose_impl, t_transient],
  51. TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
  52. [
  53. {persistence_disabled, TCGroups},
  54. {persistence_enabled, TCGroups},
  55. {tcp, [], TCs},
  56. {quic, [], TCs -- TCsNonGeneric},
  57. {ws, [], TCs -- TCsNonGeneric}
  58. ].
  59. init_per_group(persistence_disabled, Config) ->
  60. [
  61. {emqx_config, ?EMQX_CONFIG ++ "durable_sessions { enable = false }"},
  62. {persistence, false}
  63. | Config
  64. ];
  65. init_per_group(persistence_enabled, Config) ->
  66. [
  67. {emqx_config,
  68. ?EMQX_CONFIG ++
  69. "durable_sessions {\n"
  70. " enable = true\n"
  71. " heartbeat_interval = 100ms\n"
  72. " renew_streams_interval = 100ms\n"
  73. " session_gc_interval = 2s\n"
  74. "}"},
  75. {persistence, ds}
  76. | Config
  77. ];
  78. init_per_group(tcp, Config) ->
  79. Apps = emqx_cth_suite:start(
  80. [{emqx, ?config(emqx_config, Config)}],
  81. #{work_dir => emqx_cth_suite:work_dir(Config)}
  82. ),
  83. [
  84. {port, get_listener_port(tcp, default)},
  85. {conn_fun, connect},
  86. {group_apps, Apps}
  87. | Config
  88. ];
  89. init_per_group(ws, Config) ->
  90. Apps = emqx_cth_suite:start(
  91. [{emqx, ?config(emqx_config, Config)}],
  92. #{work_dir => emqx_cth_suite:work_dir(Config)}
  93. ),
  94. [
  95. {ssl, false},
  96. {host, "localhost"},
  97. {enable_websocket, true},
  98. {port, get_listener_port(ws, default)},
  99. {conn_fun, ws_connect},
  100. {group_apps, Apps}
  101. | Config
  102. ];
  103. init_per_group(quic, Config) ->
  104. Apps = emqx_cth_suite:start(
  105. [
  106. {emqx,
  107. ?config(emqx_config, Config) ++
  108. "\n listeners.quic.test {"
  109. "\n enable = true"
  110. "\n ssl_options.verify = verify_peer"
  111. "\n }"}
  112. ],
  113. #{work_dir => emqx_cth_suite:work_dir(Config)}
  114. ),
  115. [
  116. {port, get_listener_port(quic, test)},
  117. {conn_fun, quic_connect},
  118. {ssl_opts, emqx_common_test_helpers:client_mtls()},
  119. {ssl, true},
  120. {group_apps, Apps}
  121. | Config
  122. ].
  123. get_listener_port(Type, Name) ->
  124. case emqx_config:get([listeners, Type, Name, bind]) of
  125. {_, Port} -> Port;
  126. Port -> Port
  127. end.
  128. end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
  129. ok = emqx_cth_suite:stop(?config(group_apps, Config));
  130. end_per_group(_, _Config) ->
  131. catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
  132. ok.
  133. init_per_testcase(TestCase, Config) ->
  134. Config1 = preconfig_per_testcase(TestCase, Config),
  135. case erlang:function_exported(?MODULE, TestCase, 2) of
  136. true -> ?MODULE:TestCase(init, Config1);
  137. _ -> Config1
  138. end.
  139. end_per_testcase(TestCase, Config) ->
  140. case erlang:function_exported(?MODULE, TestCase, 2) of
  141. true -> ?MODULE:TestCase('end', Config);
  142. false -> ok
  143. end,
  144. Config.
  145. preconfig_per_testcase(TestCase, Config) ->
  146. {BaseName, Config1} =
  147. case ?config(tc_group_properties, Config) of
  148. [] ->
  149. %% We are running a single testcase
  150. {
  151. atom_to_binary(TestCase),
  152. init_per_group(tcp, init_per_group(kill_connection_process, Config))
  153. };
  154. [_ | _] = Props ->
  155. Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
  156. Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
  157. Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
  158. {iolist_to_binary(Pre1), Config}
  159. end,
  160. [
  161. {topic, iolist_to_binary([BaseName, "/foo"])},
  162. {stopic, iolist_to_binary([BaseName, "/+"])},
  163. {stopic_alt, iolist_to_binary([BaseName, "/foo"])},
  164. {client_id, BaseName}
  165. | Config1
  166. ].
  167. %%--------------------------------------------------------------------
  168. %% Helpers
  169. %%--------------------------------------------------------------------
  170. client_info(Key, Client) ->
  171. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  172. receive_messages(Count) ->
  173. receive_messages(Count, 15000).
  174. receive_messages(Count, Timeout) ->
  175. Deadline = erlang:monotonic_time(millisecond) + Timeout,
  176. receive_message_loop(Count, Deadline).
  177. receive_message_loop(0, _Deadline) ->
  178. [];
  179. receive_message_loop(Count, Deadline) ->
  180. Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
  181. receive
  182. {publish, Msg} ->
  183. [Msg | receive_message_loop(Count - 1, Deadline)];
  184. {pubrel, Msg} ->
  185. [{pubrel, Msg} | receive_message_loop(Count - 1, Deadline)];
  186. _Other ->
  187. receive_message_loop(Count, Deadline)
  188. after Timeout ->
  189. []
  190. end.
  191. maybe_kill_connection_process(ClientId, Config) ->
  192. Persistence = ?config(persistence, Config),
  193. case emqx_cm:lookup_channels(ClientId) of
  194. [] ->
  195. ok;
  196. [ConnectionPid] when Persistence == ds ->
  197. Ref = monitor(process, ConnectionPid),
  198. ConnectionPid ! die_if_test,
  199. ?assertReceive(
  200. {'DOWN', Ref, process, ConnectionPid, Reason} when
  201. Reason == normal orelse Reason == noproc,
  202. 3000
  203. ),
  204. wait_connection_process_unregistered(ClientId);
  205. _ ->
  206. ok
  207. end.
  208. wait_connection_process_dies(ClientId) ->
  209. case emqx_cm:lookup_channels(ClientId) of
  210. [] ->
  211. ok;
  212. [ConnectionPid] ->
  213. Ref = monitor(process, ConnectionPid),
  214. ?assertReceive(
  215. {'DOWN', Ref, process, ConnectionPid, Reason} when
  216. Reason == normal orelse Reason == noproc,
  217. 3000
  218. ),
  219. wait_connection_process_unregistered(ClientId)
  220. end.
  221. wait_connection_process_unregistered(ClientId) ->
  222. ?retry(
  223. _Timeout = 100,
  224. _Retries = 20,
  225. ?assertEqual([], emqx_cm:lookup_channels(ClientId))
  226. ).
  227. wait_channel_disconnected(ClientId) ->
  228. ?retry(
  229. _Timeout = 100,
  230. _Retries = 20,
  231. case emqx_cm:lookup_channels(ClientId) of
  232. [] ->
  233. false;
  234. [ChanPid] ->
  235. false = emqx_cm:is_channel_connected(ChanPid)
  236. end
  237. ).
  238. disconnect_client(ClientPid) ->
  239. ClientId = proplists:get_value(clientid, emqtt:info(ClientPid)),
  240. ok = emqtt:disconnect(ClientPid),
  241. false = wait_channel_disconnected(ClientId),
  242. ok.
  243. messages(Topic, Payloads) ->
  244. messages(Topic, Payloads, ?QOS_2).
  245. messages(Topic, Payloads, QoS) ->
  246. lists:map(
  247. fun
  248. (Bin) when is_binary(Bin) ->
  249. #mqtt_msg{topic = Topic, payload = Bin, qos = QoS};
  250. (Msg = #mqtt_msg{}) ->
  251. Msg#mqtt_msg{topic = Topic}
  252. end,
  253. Payloads
  254. ).
  255. publish(Topic, Payload) ->
  256. publish(Topic, Payload, ?QOS_2).
  257. publish(Topic, Payload, QoS) ->
  258. publish_many(messages(Topic, [Payload], QoS)).
  259. publish_many(Messages) ->
  260. publish_many(Messages, false).
  261. publish_many(Messages, WaitForUnregister) ->
  262. Fun = fun(Client, Message) ->
  263. case emqtt:publish(Client, Message) of
  264. ok -> ok;
  265. {ok, _} -> ok
  266. end
  267. end,
  268. do_publish(Messages, Fun, WaitForUnregister).
  269. do_publish(Messages = [_ | _], PublishFun, WaitForUnregister) ->
  270. %% Publish from another process to avoid connection confusion.
  271. {Pid, Ref} =
  272. spawn_monitor(
  273. fun() ->
  274. %% For convenience, always publish using tcp.
  275. %% The publish path is not what we are testing.
  276. ClientID = <<"ps_SUITE_publisher">>,
  277. {ok, Client} = emqtt:start_link([
  278. {proto_ver, v5},
  279. {clientid, ClientID},
  280. {port, 1883}
  281. ]),
  282. {ok, _} = emqtt:connect(Client),
  283. lists:foreach(fun(Message) -> PublishFun(Client, Message) end, Messages),
  284. ok = emqtt:disconnect(Client),
  285. %% Snabbkaffe sometimes fails unless all processes are gone.
  286. WaitForUnregister andalso wait_connection_process_dies(ClientID)
  287. end
  288. ),
  289. receive
  290. {'DOWN', Ref, process, Pid, normal} -> ok;
  291. {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
  292. end.
  293. %%--------------------------------------------------------------------
  294. %% Test Cases
  295. %%--------------------------------------------------------------------
  296. t_choose_impl(Config) ->
  297. ClientId = ?config(client_id, Config),
  298. ConnFun = ?config(conn_fun, Config),
  299. {ok, Client} = emqtt:start_link([
  300. {clientid, ClientId},
  301. {proto_ver, v5},
  302. {properties, #{'Session-Expiry-Interval' => 30}}
  303. | Config
  304. ]),
  305. {ok, _} = emqtt:ConnFun(Client),
  306. [ChanPid] = emqx_cm:lookup_channels(ClientId),
  307. ?assertEqual(
  308. case ?config(persistence, Config) of
  309. false -> emqx_session_mem;
  310. ds -> emqx_persistent_session_ds
  311. end,
  312. emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
  313. ),
  314. ok = emqtt:disconnect(Client).
  315. t_connect_discards_existing_client(Config) ->
  316. ClientId = ?config(client_id, Config),
  317. ConnFun = ?config(conn_fun, Config),
  318. ClientOpts = [
  319. {clientid, ClientId},
  320. {proto_ver, v5},
  321. {properties, #{'Session-Expiry-Interval' => 30}}
  322. | Config
  323. ],
  324. {ok, Client1} = emqtt:start_link(ClientOpts),
  325. true = unlink(Client1),
  326. MRef = erlang:monitor(process, Client1),
  327. {ok, _} = emqtt:ConnFun(Client1),
  328. {ok, Client2} = emqtt:start_link(ClientOpts),
  329. {ok, _} = emqtt:ConnFun(Client2),
  330. receive
  331. {'DOWN', MRef, process, Client1, Reason} ->
  332. ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
  333. ok = emqtt:stop(Client2),
  334. ok
  335. after 1000 ->
  336. error({client_still_connected, Client1})
  337. end.
  338. %% [MQTT-3.1.2-23]
  339. t_connect_session_expiry_interval(Config) ->
  340. ConnFun = ?config(conn_fun, Config),
  341. Topic = ?config(topic, Config),
  342. STopic = ?config(stopic, Config),
  343. Payload = <<"test message">>,
  344. ClientId = ?config(client_id, Config),
  345. {ok, Client1} = emqtt:start_link([
  346. {clientid, ClientId},
  347. {proto_ver, v5},
  348. {properties, #{'Session-Expiry-Interval' => 30}}
  349. | Config
  350. ]),
  351. {ok, _} = emqtt:ConnFun(Client1),
  352. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client1, STopic, ?QOS_1),
  353. ok = emqtt:disconnect(Client1),
  354. maybe_kill_connection_process(ClientId, Config),
  355. publish(Topic, Payload, ?QOS_1),
  356. {ok, Client2} = emqtt:start_link([
  357. {clientid, ClientId},
  358. {proto_ver, v5},
  359. {properties, #{'Session-Expiry-Interval' => 30}},
  360. {clean_start, false}
  361. | Config
  362. ]),
  363. {ok, _} = emqtt:ConnFun(Client2),
  364. [Msg | _] = receive_messages(1),
  365. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  366. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  367. ?assertEqual({ok, ?QOS_1}, maps:find(qos, Msg)),
  368. ok = emqtt:disconnect(Client2).
  369. %% [MQTT-3.1.2-23]
  370. t_connect_session_expiry_interval_qos2(Config) ->
  371. ConnFun = ?config(conn_fun, Config),
  372. Topic = ?config(topic, Config),
  373. STopic = ?config(stopic, Config),
  374. Payload = <<"test message">>,
  375. ClientId = ?config(client_id, Config),
  376. {ok, Client1} = emqtt:start_link([
  377. {clientid, ClientId},
  378. {proto_ver, v5},
  379. {properties, #{'Session-Expiry-Interval' => 30}}
  380. | Config
  381. ]),
  382. {ok, _} = emqtt:ConnFun(Client1),
  383. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  384. ok = emqtt:disconnect(Client1),
  385. maybe_kill_connection_process(ClientId, Config),
  386. publish(Topic, Payload),
  387. {ok, Client2} = emqtt:start_link([
  388. {clientid, ClientId},
  389. {proto_ver, v5},
  390. {properties, #{'Session-Expiry-Interval' => 30}},
  391. {clean_start, false}
  392. | Config
  393. ]),
  394. {ok, _} = emqtt:ConnFun(Client2),
  395. [Msg | _] = receive_messages(1),
  396. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  397. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  398. ?assertEqual({ok, 2}, maps:find(qos, Msg)),
  399. ok = emqtt:disconnect(Client2).
  400. t_without_client_id(Config) ->
  401. %% Emqtt client dies
  402. process_flag(trap_exit, true),
  403. ConnFun = ?config(conn_fun, Config),
  404. {ok, Client0} = emqtt:start_link([
  405. {proto_ver, v5},
  406. {properties, #{'Session-Expiry-Interval' => 30}},
  407. {clean_start, false}
  408. | Config
  409. ]),
  410. {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
  411. ok.
  412. t_assigned_clientid_persistent_session(Config) ->
  413. ConnFun = ?config(conn_fun, Config),
  414. {ok, Client1} = emqtt:start_link([
  415. {proto_ver, v5},
  416. {properties, #{'Session-Expiry-Interval' => 30}},
  417. {clean_start, true}
  418. | Config
  419. ]),
  420. {ok, _} = emqtt:ConnFun(Client1),
  421. AssignedClientId = client_info(clientid, Client1),
  422. ok = emqtt:disconnect(Client1),
  423. maybe_kill_connection_process(AssignedClientId, Config),
  424. {ok, Client2} = emqtt:start_link([
  425. {clientid, AssignedClientId},
  426. {proto_ver, v5},
  427. {clean_start, false}
  428. | Config
  429. ]),
  430. {ok, _} = emqtt:ConnFun(Client2),
  431. ?assertEqual(1, client_info(session_present, Client2)),
  432. ok = emqtt:disconnect(Client2).
  433. t_cancel_on_disconnect(Config) ->
  434. %% Open a persistent session, but cancel the persistence when
  435. %% shutting down the connection.
  436. ConnFun = ?config(conn_fun, Config),
  437. ClientId = ?config(client_id, Config),
  438. {ok, Client1} = emqtt:start_link([
  439. {proto_ver, v5},
  440. {clientid, ClientId},
  441. {properties, #{'Session-Expiry-Interval' => 30}},
  442. {clean_start, true}
  443. | Config
  444. ]),
  445. {ok, _} = emqtt:ConnFun(Client1),
  446. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
  447. wait_connection_process_unregistered(ClientId),
  448. {ok, Client2} = emqtt:start_link([
  449. {clientid, ClientId},
  450. {proto_ver, v5},
  451. {clean_start, false},
  452. {properties, #{'Session-Expiry-Interval' => 30}}
  453. | Config
  454. ]),
  455. {ok, _} = emqtt:ConnFun(Client2),
  456. ?assertEqual(0, client_info(session_present, Client2)),
  457. ok = emqtt:disconnect(Client2).
  458. t_persist_on_disconnect(Config) ->
  459. %% Open a non-persistent session, but add the persistence when
  460. %% shutting down the connection. This is a protocol error, and
  461. %% should not convert the session into a persistent session.
  462. ConnFun = ?config(conn_fun, Config),
  463. ClientId = ?config(client_id, Config),
  464. {ok, Client1} = emqtt:start_link([
  465. {proto_ver, v5},
  466. {clientid, ClientId},
  467. {properties, #{'Session-Expiry-Interval' => 0}},
  468. {clean_start, true}
  469. | Config
  470. ]),
  471. {ok, _} = emqtt:ConnFun(Client1),
  472. %% Strangely enough, the disconnect is reported as successful by emqtt.
  473. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
  474. wait_connection_process_unregistered(ClientId),
  475. {ok, Client2} = emqtt:start_link([
  476. {clientid, ClientId},
  477. {proto_ver, v5},
  478. {clean_start, false},
  479. {properties, #{'Session-Expiry-Interval' => 30}}
  480. | Config
  481. ]),
  482. {ok, _} = emqtt:ConnFun(Client2),
  483. %% The session should not be known, since it wasn't persisted because of the
  484. %% changed expiry interval in the disconnect call.
  485. ?assertEqual(0, client_info(session_present, Client2)),
  486. ok = emqtt:disconnect(Client2).
  487. t_process_dies_session_expires(Config) ->
  488. %% Emulate an error in the connect process,
  489. %% or that the node of the process goes down.
  490. %% A persistent session should eventually expire.
  491. ?check_trace(
  492. begin
  493. ConnFun = ?config(conn_fun, Config),
  494. ClientId = ?config(client_id, Config),
  495. Topic = ?config(topic, Config),
  496. STopic = ?config(stopic, Config),
  497. Payload = <<"test">>,
  498. {ok, Client1} = emqtt:start_link([
  499. {proto_ver, v5},
  500. {clientid, ClientId},
  501. {properties, #{'Session-Expiry-Interval' => 1}},
  502. {clean_start, true}
  503. | Config
  504. ]),
  505. {ok, _} = emqtt:ConnFun(Client1),
  506. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  507. ok = emqtt:disconnect(Client1),
  508. maybe_kill_connection_process(ClientId, Config),
  509. ok = publish(Topic, Payload),
  510. timer:sleep(1500),
  511. {ok, Client2} = emqtt:start_link([
  512. {proto_ver, v5},
  513. {clientid, ClientId},
  514. {properties, #{'Session-Expiry-Interval' => 30}},
  515. {clean_start, false}
  516. | Config
  517. ]),
  518. {ok, _} = emqtt:ConnFun(Client2),
  519. ?assertEqual(0, client_info(session_present, Client2)),
  520. %% We should not receive the pending message
  521. ?assertEqual([], receive_messages(1)),
  522. emqtt:disconnect(Client2)
  523. end,
  524. []
  525. ).
  526. t_publish_while_client_is_gone_qos1(Config) ->
  527. %% A persistent session should receive messages in its
  528. %% subscription even if the process owning the session dies.
  529. ConnFun = ?config(conn_fun, Config),
  530. Topic = ?config(topic, Config),
  531. STopic = ?config(stopic, Config),
  532. Payload1 = <<"hello1">>,
  533. Payload2 = <<"hello2">>,
  534. ClientId = ?config(client_id, Config),
  535. {ok, Client1} = emqtt:start_link([
  536. {proto_ver, v5},
  537. {clientid, ClientId},
  538. {properties, #{'Session-Expiry-Interval' => 30}},
  539. {clean_start, true}
  540. | Config
  541. ]),
  542. {ok, _} = emqtt:ConnFun(Client1),
  543. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  544. ok = emqtt:disconnect(Client1),
  545. maybe_kill_connection_process(ClientId, Config),
  546. ok = publish_many(messages(Topic, [Payload1, Payload2], ?QOS_1)),
  547. {ok, Client2} = emqtt:start_link([
  548. {proto_ver, v5},
  549. {clientid, ClientId},
  550. {properties, #{'Session-Expiry-Interval' => 30}},
  551. {clean_start, false}
  552. | Config
  553. ]),
  554. {ok, _} = emqtt:ConnFun(Client2),
  555. Msgs = receive_messages(2),
  556. ?assertMatch([_, _], Msgs),
  557. [Msg1, Msg2] = Msgs,
  558. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  559. ?assertEqual({ok, 1}, maps:find(qos, Msg1)),
  560. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  561. ?assertEqual({ok, 1}, maps:find(qos, Msg2)),
  562. ok = emqtt:disconnect(Client2).
  563. t_publish_many_while_client_is_gone_qos1(Config) ->
  564. %% A persistent session should receive all of the still unacked messages
  565. %% for its subscriptions after the client dies or reconnects, in addition
  566. %% to new messages that were published while the client was gone. The order
  567. %% of the messages should be consistent across reconnects.
  568. ClientId = ?config(client_id, Config),
  569. ConnFun = ?config(conn_fun, Config),
  570. {ok, Client1} = emqtt:start_link([
  571. {proto_ver, v5},
  572. {clientid, ClientId},
  573. {properties, #{'Session-Expiry-Interval' => 30}},
  574. {clean_start, true},
  575. {auto_ack, never}
  576. | Config
  577. ]),
  578. {ok, _} = emqtt:ConnFun(Client1),
  579. STopics = [
  580. <<"t/+/foo">>,
  581. <<"msg/feed/#">>,
  582. <<"loc/+/+/+">>
  583. ],
  584. [{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, ST, ?QOS_1) || ST <- STopics],
  585. Pubs1 = [
  586. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1},
  587. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1},
  588. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 1},
  589. #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
  590. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
  591. #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
  592. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1}
  593. ],
  594. ok = publish_many(Pubs1),
  595. NPubs1 = length(Pubs1),
  596. Msgs1 = receive_messages(NPubs1),
  597. NMsgs1 = length(Msgs1),
  598. ?assertEqual(NPubs1, NMsgs1),
  599. ct:pal("Msgs1 = ~p", [Msgs1]),
  600. %% TODO
  601. %% This assertion doesn't currently hold because `emqx_ds` doesn't enforce
  602. %% strict ordering reflecting client publishing order. Instead, per-topic
  603. %% ordering is guaranteed per each client. In fact, this violates the MQTT
  604. %% specification, but we deemed it acceptable for now.
  605. %% ?assertMatch([
  606. %% #{payload := <<"M1">>},
  607. %% #{payload := <<"M2">>},
  608. %% #{payload := <<"M3">>},
  609. %% #{payload := <<"M4">>},
  610. %% #{payload := <<"M5">>},
  611. %% #{payload := <<"M6">>},
  612. %% #{payload := <<"M7">>}
  613. %% ], Msgs1),
  614. ?assertEqual(
  615. get_topicwise_order(Pubs1),
  616. get_topicwise_order(Msgs1)
  617. ),
  618. NAcked = 4,
  619. ?assert(NMsgs1 >= NAcked),
  620. [ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)],
  621. %% Ensure that PUBACKs are propagated to the channel.
  622. pong = emqtt:ping(Client1),
  623. ok = disconnect_client(Client1),
  624. maybe_kill_connection_process(ClientId, Config),
  625. Pubs2 = [
  626. #mqtt_msg{topic = <<"loc/3/4/6">>, payload = <<"M8">>, qos = 1},
  627. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
  628. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
  629. #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
  630. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1}
  631. ],
  632. ok = publish_many(Pubs2),
  633. NPubs2 = length(Pubs2),
  634. %% Now reconnect with auto ack to make sure all streams are
  635. %% replayed till the end:
  636. {ok, Client2} = emqtt:start_link([
  637. {proto_ver, v5},
  638. {clientid, ClientId},
  639. {properties, #{'Session-Expiry-Interval' => 30}},
  640. {clean_start, false}
  641. | Config
  642. ]),
  643. {ok, _} = emqtt:ConnFun(Client2),
  644. %% Try to receive _at most_ `NPubs` messages.
  645. %% There shouldn't be that much unacked messages in the replay anyway,
  646. %% but it's an easy number to pick.
  647. NPubs = NPubs1 + NPubs2,
  648. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  649. NMsgs2 = length(Msgs2),
  650. ct:pal("Msgs2 = ~p", [Msgs2]),
  651. ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}),
  652. ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
  653. ?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
  654. NSame = NMsgs2 - NPubs2,
  655. ?assert(
  656. lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
  657. ),
  658. ?assertNot(
  659. lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2))
  660. ),
  661. ?assertEqual(
  662. [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
  663. [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
  664. ),
  665. ok = disconnect_client(Client2).
  666. t_publish_while_client_is_gone(Config) ->
  667. %% A persistent session should receive messages in its
  668. %% subscription even if the process owning the session dies.
  669. ConnFun = ?config(conn_fun, Config),
  670. Topic = ?config(topic, Config),
  671. STopic = ?config(stopic, Config),
  672. Payload1 = <<"hello1">>,
  673. Payload2 = <<"hello2">>,
  674. ClientId = ?config(client_id, Config),
  675. {ok, Client1} = emqtt:start_link([
  676. {proto_ver, v5},
  677. {clientid, ClientId},
  678. {properties, #{'Session-Expiry-Interval' => 30}},
  679. {clean_start, true}
  680. | Config
  681. ]),
  682. {ok, _} = emqtt:ConnFun(Client1),
  683. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  684. ok = emqtt:disconnect(Client1),
  685. maybe_kill_connection_process(ClientId, Config),
  686. ok = publish_many(messages(Topic, [Payload1, Payload2])),
  687. {ok, Client2} = emqtt:start_link([
  688. {proto_ver, v5},
  689. {clientid, ClientId},
  690. {properties, #{'Session-Expiry-Interval' => 30}},
  691. {clean_start, false}
  692. | Config
  693. ]),
  694. {ok, _} = emqtt:ConnFun(Client2),
  695. Msgs = receive_messages(2),
  696. ?assertMatch([_, _], Msgs),
  697. [Msg1, Msg2] = Msgs,
  698. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  699. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  700. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  701. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  702. ok = emqtt:disconnect(Client2).
  703. t_publish_many_while_client_is_gone(Config) ->
  704. %% A persistent session should receive all of the still unacked messages
  705. %% for its subscriptions after the client dies or reconnects, in addition
  706. %% to PUBRELs for the messages it has PUBRECed. While client must send
  707. %% PUBACKs and PUBRECs in order, those orders are independent of each other.
  708. %%
  709. %% Developer's note: for simplicity we publish all messages to the
  710. %% same topic, since persistent session ds may reorder messages
  711. %% that belong to different streams, and this particular test is
  712. %% very sensitive the order.
  713. ClientId = ?config(client_id, Config),
  714. ConnFun = ?config(conn_fun, Config),
  715. ClientOpts = [
  716. {proto_ver, v5},
  717. {clientid, ClientId},
  718. {properties, #{'Session-Expiry-Interval' => 30}},
  719. {auto_ack, never}
  720. | Config
  721. ],
  722. {ok, Client1} = emqtt:start_link([{clean_start, true} | ClientOpts]),
  723. {ok, _} = emqtt:ConnFun(Client1),
  724. {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"t">>, ?QOS_2),
  725. Pubs1 = [
  726. #mqtt_msg{topic = <<"t">>, payload = <<"M1">>, qos = 1},
  727. #mqtt_msg{topic = <<"t">>, payload = <<"M2">>, qos = 1},
  728. #mqtt_msg{topic = <<"t">>, payload = <<"M3">>, qos = 2},
  729. #mqtt_msg{topic = <<"t">>, payload = <<"M4">>, qos = 2},
  730. #mqtt_msg{topic = <<"t">>, payload = <<"M5">>, qos = 2},
  731. #mqtt_msg{topic = <<"t">>, payload = <<"M6">>, qos = 1},
  732. #mqtt_msg{topic = <<"t">>, payload = <<"M7">>, qos = 2},
  733. #mqtt_msg{topic = <<"t">>, payload = <<"M8">>, qos = 1},
  734. #mqtt_msg{topic = <<"t">>, payload = <<"M9">>, qos = 2}
  735. ],
  736. ok = publish_many(Pubs1),
  737. NPubs1 = length(Pubs1),
  738. Msgs1 = receive_messages(NPubs1),
  739. ct:pal("Msgs1 = ~p", [Msgs1]),
  740. NMsgs1 = length(Msgs1),
  741. ?assertEqual(NPubs1, NMsgs1, emqx_persistent_session_ds:print_session(ClientId)),
  742. ?assertEqual(
  743. get_topicwise_order(Pubs1),
  744. get_topicwise_order(Msgs1),
  745. emqx_persistent_session_ds:print_session(ClientId)
  746. ),
  747. %% PUBACK every QoS 1 message.
  748. lists:foreach(
  749. fun(PktId) -> ok = emqtt:puback(Client1, PktId) end,
  750. [PktId || #{qos := 1, packet_id := PktId} <- Msgs1]
  751. ),
  752. %% PUBREC first `NRecs` QoS 2 messages (up to "M5")
  753. NRecs = 3,
  754. PubRecs1 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs1], NRecs),
  755. lists:foreach(
  756. fun(PktId) -> ok = emqtt:pubrec(Client1, PktId) end,
  757. PubRecs1
  758. ),
  759. %% Ensure that PUBACKs / PUBRECs are propagated to the channel.
  760. pong = emqtt:ping(Client1),
  761. %% Receive PUBRELs for the sent PUBRECs.
  762. PubRels1 = receive_messages(NRecs),
  763. ct:pal("PubRels1 = ~p", [PubRels1]),
  764. ?assertEqual(
  765. PubRecs1,
  766. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1],
  767. PubRels1
  768. ),
  769. ok = disconnect_client(Client1),
  770. maybe_kill_connection_process(ClientId, Config),
  771. Pubs2 = [
  772. #mqtt_msg{topic = <<"t">>, payload = <<"M10">>, qos = 2},
  773. #mqtt_msg{topic = <<"t">>, payload = <<"M11">>, qos = 1},
  774. #mqtt_msg{topic = <<"t">>, payload = <<"M12">>, qos = 2}
  775. ],
  776. ok = publish_many(Pubs2),
  777. NPubs2 = length(Pubs2),
  778. {ok, Client2} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  779. {ok, _} = emqtt:ConnFun(Client2),
  780. %% Try to receive _at most_ `NPubs` messages.
  781. %% There shouldn't be that much unacked messages in the replay anyway,
  782. %% but it's an easy number to pick.
  783. NPubs = NPubs1 + NPubs2,
  784. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  785. ct:pal("Msgs2 = ~p", [Msgs2]),
  786. %% We should again receive PUBRELs for the PUBRECs we sent earlier.
  787. ?assertEqual(
  788. get_msgs_essentials(PubRels1),
  789. [get_msg_essentials(PubRel) || PubRel = {pubrel, _} <- Msgs2]
  790. ),
  791. %% We should receive duplicates only for QoS 2 messages where PUBRELs were
  792. %% not sent, in the same order as the original messages.
  793. Msgs2Dups = [get_msg_essentials(M) || M = #{dup := true} <- Msgs2],
  794. ?assertEqual(
  795. Msgs2Dups,
  796. [M || M = #{qos := 2} <- Msgs2Dups]
  797. ),
  798. ?assertEqual(
  799. get_msgs_essentials(pick_respective_msgs(Msgs2Dups, Msgs1)),
  800. Msgs2Dups
  801. ),
  802. %% Ack more messages:
  803. PubRecs2 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs2], 2),
  804. lists:foreach(
  805. fun(PktId) -> ok = emqtt:pubrec(Client2, PktId) end,
  806. PubRecs2
  807. ),
  808. PubRels2 = receive_messages(length(PubRecs2)),
  809. ct:pal("PubRels2 = ~p", [PubRels2]),
  810. ?assertEqual(
  811. PubRecs2,
  812. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels2],
  813. PubRels2
  814. ),
  815. %% PUBCOMP every PUBREL.
  816. PubComps = [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1 ++ PubRels2],
  817. ct:pal("PubComps: ~p", [PubComps]),
  818. lists:foreach(
  819. fun(PktId) -> ok = emqtt:pubcomp(Client2, PktId) end,
  820. PubComps
  821. ),
  822. %% Ensure that PUBCOMPs are propagated to the channel.
  823. pong = emqtt:ping(Client2),
  824. %% Reconnect for the last time
  825. ok = disconnect_client(Client2),
  826. maybe_kill_connection_process(ClientId, Config),
  827. {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  828. {ok, _} = emqtt:ConnFun(Client3),
  829. %% Check that we receive the rest of the messages:
  830. Msgs3 = receive_messages(NPubs, _Timeout = 2000),
  831. ct:pal("Msgs3 = ~p", [Msgs3]),
  832. ?assertMatch(
  833. [<<"M10">>, <<"M11">>, <<"M12">>],
  834. [I || #{payload := I} <- Msgs3]
  835. ),
  836. ok = disconnect_client(Client3).
  837. t_clean_start_drops_subscriptions(Config) ->
  838. %% 1. A persistent session is started and disconnected.
  839. %% 2. While disconnected, a message is published and persisted.
  840. %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
  841. %% then we disconnect again.
  842. %% 4. Finally, a new connection is made with clean start set to false.
  843. %% The original message should not be delivered.
  844. ConnFun = ?config(conn_fun, Config),
  845. Topic = ?config(topic, Config),
  846. STopic = ?config(stopic, Config),
  847. Payload1 = <<"hello1">>,
  848. Payload2 = <<"hello2">>,
  849. Payload3 = <<"hello3">>,
  850. ClientId = ?config(client_id, Config),
  851. %% 1.
  852. {ok, Client1} = emqtt:start_link([
  853. {proto_ver, v5},
  854. {clientid, ClientId},
  855. {properties, #{'Session-Expiry-Interval' => 30}},
  856. {clean_start, true}
  857. | Config
  858. ]),
  859. {ok, _} = emqtt:ConnFun(Client1),
  860. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  861. ok = emqtt:disconnect(Client1),
  862. maybe_kill_connection_process(ClientId, Config),
  863. %% 2.
  864. ok = publish(Topic, Payload1, ?QOS_1),
  865. %% 3.
  866. {ok, Client2} = emqtt:start_link([
  867. {proto_ver, v5},
  868. {clientid, ClientId},
  869. {properties, #{'Session-Expiry-Interval' => 30}},
  870. {clean_start, true}
  871. | Config
  872. ]),
  873. {ok, _} = emqtt:ConnFun(Client2),
  874. ?assertEqual(0, client_info(session_present, Client2)),
  875. {ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1),
  876. timer:sleep(100),
  877. ok = publish(Topic, Payload2, ?QOS_1),
  878. [Msg1] = receive_messages(1),
  879. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
  880. pong = emqtt:ping(Client2),
  881. ok = emqtt:disconnect(Client2),
  882. maybe_kill_connection_process(ClientId, Config),
  883. %% 4.
  884. {ok, Client3} = emqtt:start_link([
  885. {proto_ver, v5},
  886. {clientid, ClientId},
  887. {properties, #{'Session-Expiry-Interval' => 30}},
  888. {clean_start, false}
  889. | Config
  890. ]),
  891. {ok, _} = emqtt:ConnFun(Client3),
  892. ok = publish(Topic, Payload3, ?QOS_1),
  893. [Msg2] = receive_messages(1),
  894. ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
  895. pong = emqtt:ping(Client3),
  896. ok = emqtt:disconnect(Client3).
  897. t_unsubscribe(Config) ->
  898. ConnFun = ?config(conn_fun, Config),
  899. STopic = ?config(stopic, Config),
  900. ClientId = ?config(client_id, Config),
  901. {ok, Client} = emqtt:start_link([
  902. {clientid, ClientId},
  903. {proto_ver, v5},
  904. {properties, #{'Session-Expiry-Interval' => 30}}
  905. | Config
  906. ]),
  907. {ok, _} = emqtt:ConnFun(Client),
  908. {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
  909. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  910. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  911. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  912. ok = emqtt:disconnect(Client).
  913. %% This testcase verifies that un-acked messages that were once sent
  914. %% to the client are still retransmitted after the session
  915. %% unsubscribes from the topic and reconnects.
  916. t_unsubscribe_replay(Config) ->
  917. ConnFun = ?config(conn_fun, Config),
  918. TopicPrefix = ?config(topic, Config),
  919. ClientId = atom_to_binary(?FUNCTION_NAME),
  920. ClientOpts = [
  921. {proto_ver, v5},
  922. {clientid, ClientId},
  923. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}},
  924. {max_inflight, 10}
  925. | Config
  926. ],
  927. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  928. {ok, _} = emqtt:ConnFun(Sub),
  929. %% 1. Make two subscriptions, one is to be deleted:
  930. Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]),
  931. Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]),
  932. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)),
  933. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)),
  934. %% 2. Publish 2 messages to the first and second topics each
  935. %% (client doesn't ack them):
  936. ok = publish(Topic1, <<"1">>, ?QOS_1),
  937. ok = publish(Topic1, <<"2">>, ?QOS_2),
  938. [Msg1, Msg2] = receive_messages(2),
  939. ?assertMatch(
  940. [
  941. #{payload := <<"1">>},
  942. #{payload := <<"2">>}
  943. ],
  944. [Msg1, Msg2]
  945. ),
  946. ok = publish(Topic2, <<"3">>, ?QOS_1),
  947. ok = publish(Topic2, <<"4">>, ?QOS_2),
  948. [Msg3, Msg4] = receive_messages(2),
  949. ?assertMatch(
  950. [
  951. #{payload := <<"3">>},
  952. #{payload := <<"4">>}
  953. ],
  954. [Msg3, Msg4]
  955. ),
  956. %% 3. Unsubscribe from the topic and disconnect:
  957. ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)),
  958. ok = emqtt:disconnect(Sub),
  959. %% 5. Publish more messages to the disconnected topic:
  960. ok = publish(Topic1, <<"5">>, ?QOS_1),
  961. ok = publish(Topic1, <<"6">>, ?QOS_2),
  962. %% 4. Reconnect the client. It must only receive only four
  963. %% messages from the time when it was subscribed:
  964. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  965. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  966. %% Note: we ask for 6 messages, but expect only 4, it's
  967. %% intentional:
  968. ?assertMatch(
  969. #{
  970. Topic1 := [<<"1">>, <<"2">>],
  971. Topic2 := [<<"3">>, <<"4">>]
  972. },
  973. get_topicwise_order(receive_messages(6, 5_000)),
  974. debug_info(ClientId)
  975. ),
  976. %% 5. Now let's resubscribe, and check that the session can receive new messages:
  977. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)),
  978. ok = publish(Topic1, <<"7">>, ?QOS_0),
  979. ok = publish(Topic1, <<"8">>, ?QOS_1),
  980. ok = publish(Topic1, <<"9">>, ?QOS_2),
  981. ?assertMatch(
  982. [<<"7">>, <<"8">>, <<"9">>],
  983. lists:map(fun get_msgpub_payload/1, receive_messages(3))
  984. ),
  985. ok = emqtt:disconnect(Sub1).
  986. %% This testcase verifies that persistent sessions handle "transient"
  987. %% mesages correctly.
  988. %%
  989. %% Transient messages are delivered to the channel directly, bypassing
  990. %% the broker code that decides whether the messages should be
  991. %% persisted or not, and therefore they are not persisted.
  992. %%
  993. %% `emqx_retainer' is an example of application that uses this
  994. %% mechanism.
  995. %%
  996. %% This testcase creates the conditions when the transient messages
  997. %% appear in the middle of the replay, to make sure the durable
  998. %% session doesn't get confused and/or stuck if retained messages are
  999. %% changed while the session was down.
  1000. t_transient(Config) ->
  1001. ConnFun = ?config(conn_fun, Config),
  1002. TopicPrefix = ?config(topic, Config),
  1003. ClientId = atom_to_binary(?FUNCTION_NAME),
  1004. ClientOpts = [
  1005. {proto_ver, v5},
  1006. {clientid, ClientId},
  1007. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}},
  1008. {max_inflight, 100}
  1009. | Config
  1010. ],
  1011. Deliver = fun(Topic, Payload, QoS) ->
  1012. [Pid] = emqx_cm:lookup_channels(ClientId),
  1013. Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload),
  1014. Pid ! {deliver, Topic, Msg}
  1015. end,
  1016. Topic1 = <<TopicPrefix/binary, "/1">>,
  1017. Topic2 = <<TopicPrefix/binary, "/2">>,
  1018. Topic3 = <<TopicPrefix/binary, "/3">>,
  1019. %% 1. Start the client and subscribe to the topic:
  1020. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  1021. ?assertMatch({ok, _}, emqtt:ConnFun(Sub)),
  1022. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)),
  1023. %% 2. Publish regular messages:
  1024. publish(Topic1, <<"1">>, ?QOS_1),
  1025. publish(Topic1, <<"2">>, ?QOS_2),
  1026. Msgs1 = receive_messages(2),
  1027. [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1,
  1028. %% 3. Publish and recieve transient messages:
  1029. Deliver(Topic2, <<"3">>, ?QOS_0),
  1030. Deliver(Topic2, <<"4">>, ?QOS_1),
  1031. Deliver(Topic2, <<"5">>, ?QOS_2),
  1032. Msgs2 = receive_messages(3),
  1033. ?assertMatch(
  1034. [
  1035. #{payload := <<"3">>, qos := ?QOS_0},
  1036. #{payload := <<"4">>, qos := ?QOS_1},
  1037. #{payload := <<"5">>, qos := ?QOS_2}
  1038. ],
  1039. Msgs2
  1040. ),
  1041. %% 4. Publish more regular messages:
  1042. publish(Topic3, <<"6">>, ?QOS_1),
  1043. publish(Topic3, <<"7">>, ?QOS_2),
  1044. Msgs3 = receive_messages(2),
  1045. [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3,
  1046. %% 5. Reconnect the client:
  1047. ok = emqtt:disconnect(Sub),
  1048. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  1049. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  1050. %% 6. Recieve the historic messages and check that their packet IDs didn't change:
  1051. %% Note: durable session currenty WON'T replay transient messages.
  1052. ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end,
  1053. ?assertMatch(
  1054. #{
  1055. Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}],
  1056. Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}]
  1057. },
  1058. maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000))
  1059. ),
  1060. %% 7. Finish off by sending messages to all the topics to make
  1061. %% sure none of the streams are blocked:
  1062. [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]],
  1063. ?assertMatch(
  1064. #{
  1065. Topic1 := [<<"fin">>],
  1066. Topic2 := [<<"fin">>],
  1067. Topic3 := [<<"fin">>]
  1068. },
  1069. get_topicwise_order(receive_messages(3))
  1070. ),
  1071. ok = emqtt:disconnect(Sub1).
  1072. t_multiple_subscription_matches(Config) ->
  1073. ConnFun = ?config(conn_fun, Config),
  1074. Topic = ?config(topic, Config),
  1075. STopic1 = ?config(stopic, Config),
  1076. STopic2 = ?config(stopic_alt, Config),
  1077. Payload = <<"test message">>,
  1078. ClientId = ?config(client_id, Config),
  1079. {ok, Client1} = emqtt:start_link([
  1080. {clientid, ClientId},
  1081. {proto_ver, v5},
  1082. {properties, #{'Session-Expiry-Interval' => 30}}
  1083. | Config
  1084. ]),
  1085. {ok, _} = emqtt:ConnFun(Client1),
  1086. {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
  1087. {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
  1088. ok = emqtt:disconnect(Client1),
  1089. maybe_kill_connection_process(ClientId, Config),
  1090. publish(Topic, Payload),
  1091. {ok, Client2} = emqtt:start_link([
  1092. {clientid, ClientId},
  1093. {proto_ver, v5},
  1094. {properties, #{'Session-Expiry-Interval' => 30}},
  1095. {clean_start, false}
  1096. | Config
  1097. ]),
  1098. {ok, _} = emqtt:ConnFun(Client2),
  1099. %% We will receive the same message twice because it matches two subscriptions.
  1100. [Msg1, Msg2] = receive_messages(2),
  1101. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
  1102. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
  1103. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
  1104. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
  1105. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  1106. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  1107. ok = emqtt:disconnect(Client2).
  1108. %% Check that we don't get a will message when the client disconnects with success reason
  1109. %% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1.
  1110. t_no_will_message(Config) ->
  1111. ConnFun = ?config(conn_fun, Config),
  1112. WillTopic = ?config(topic, Config),
  1113. WillPayload = <<"will message">>,
  1114. ClientId = ?config(client_id, Config),
  1115. ?check_trace(
  1116. #{timetrap => 15_000},
  1117. begin
  1118. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1119. {ok, Client} = emqtt:start_link([
  1120. {clientid, ClientId},
  1121. {proto_ver, v5},
  1122. {properties, #{'Session-Expiry-Interval' => 1}},
  1123. {will_topic, WillTopic},
  1124. {will_payload, WillPayload},
  1125. {will_qos, 1},
  1126. {will_props, #{'Will-Delay-Interval' => 0}}
  1127. | Config
  1128. ]),
  1129. {ok, _} = emqtt:ConnFun(Client),
  1130. ok = emqtt:disconnect(Client, ?RC_SUCCESS),
  1131. %% No will message
  1132. ?assertNotReceive({deliver, WillTopic, _}, 5_000),
  1133. ok
  1134. end,
  1135. []
  1136. ),
  1137. ok.
  1138. %% Check that we get a single will message when the client disconnects with a non
  1139. %% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
  1140. %% QoS = 1.
  1141. t_will_message1(Config) ->
  1142. do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}),
  1143. ok.
  1144. %% Check that we get a single will message when the client disconnects with a non
  1145. %% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0,
  1146. %% QoS = 1.
  1147. t_will_message2(Config) ->
  1148. do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}),
  1149. ok.
  1150. %% Check that we get a single will message when the client disconnects with a non
  1151. %% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0,
  1152. %% QoS = 1.
  1153. t_will_message3(Config) ->
  1154. do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}),
  1155. ok.
  1156. do_t_will_message(Config, Opts) ->
  1157. #{
  1158. session_expiry := SessionExpiry,
  1159. will_delay := WillDelay
  1160. } = Opts,
  1161. ConnFun = ?config(conn_fun, Config),
  1162. WillTopic = ?config(topic, Config),
  1163. WillPayload = <<"will message">>,
  1164. ClientId = ?config(client_id, Config),
  1165. ?check_trace(
  1166. #{timetrap => 15_000},
  1167. begin
  1168. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1169. {ok, Client} = emqtt:start_link([
  1170. {clientid, ClientId},
  1171. {proto_ver, v5},
  1172. {properties, #{'Session-Expiry-Interval' => SessionExpiry}},
  1173. {will_topic, WillTopic},
  1174. {will_payload, WillPayload},
  1175. {will_qos, 1},
  1176. {will_props, #{'Will-Delay-Interval' => WillDelay}}
  1177. | Config
  1178. ]),
  1179. {ok, _} = emqtt:ConnFun(Client),
  1180. ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
  1181. ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000),
  1182. %% No duplicates
  1183. ?assertNotReceive({deliver, WillTopic, _}, 100),
  1184. ok
  1185. end,
  1186. []
  1187. ),
  1188. ok.
  1189. t_sys_message_delivery(Config) ->
  1190. ConnFun = ?config(conn_fun, Config),
  1191. SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]),
  1192. SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]),
  1193. ClientId = ?config(client_id, Config),
  1194. {ok, Client1} = emqtt:start_link([
  1195. {clientid, ClientId},
  1196. {proto_ver, v5},
  1197. {properties, #{'Session-Expiry-Interval' => 30}}
  1198. | Config
  1199. ]),
  1200. {ok, _} = emqtt:ConnFun(Client1),
  1201. {ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]),
  1202. ?assertMatch(
  1203. [
  1204. #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1},
  1205. #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2}
  1206. ],
  1207. receive_messages(2)
  1208. ),
  1209. ok = emqtt:disconnect(Client1),
  1210. {ok, Client2} = emqtt:start_link([
  1211. {clientid, ClientId},
  1212. {proto_ver, v5},
  1213. {properties, #{'Session-Expiry-Interval' => 30}},
  1214. {clean_start, false}
  1215. | Config
  1216. ]),
  1217. {ok, _} = emqtt:ConnFun(Client2),
  1218. ?assertMatch(
  1219. [#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}],
  1220. receive_messages(1)
  1221. ).
  1222. get_topicwise_order(Msgs) ->
  1223. maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
  1224. get_msgpub_topic(#mqtt_msg{topic = Topic}) ->
  1225. Topic;
  1226. get_msgpub_topic(#{topic := Topic}) ->
  1227. Topic.
  1228. get_msgpub_payload(#mqtt_msg{payload = Payload}) ->
  1229. Payload;
  1230. get_msgpub_payload(#{payload := Payload}) ->
  1231. Payload.
  1232. get_msg_essentials(Msg = #{}) ->
  1233. maps:with([packet_id, topic, payload, qos], Msg);
  1234. get_msg_essentials({pubrel, Msg}) ->
  1235. {pubrel, maps:with([packet_id, reason_code], Msg)}.
  1236. get_msgs_essentials(Msgs) ->
  1237. [get_msg_essentials(M) || M <- Msgs].
  1238. pick_respective_msgs(MsgRefs, Msgs) ->
  1239. [M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)].
  1240. debug_info(ClientId) ->
  1241. Info = emqx_persistent_session_ds:print_session(ClientId),
  1242. ct:pal("*** State:~n~p", [Info]).