emqx_persistent_session_SUITE.erl 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("emqx/include/asserts.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -include_lib("emqx/include/emqx_mqtt.hrl").
  22. -include_lib("emqx_utils/include/emqx_message.hrl").
  23. -compile(export_all).
  24. -compile(nowarn_export_all).
  25. -include("emqx_persistent_message.hrl").
  26. -define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
  27. %%--------------------------------------------------------------------
  28. %% SUITE boilerplate
  29. %%--------------------------------------------------------------------
  30. all() ->
  31. [
  32. % NOTE
  33. % Tests are disabled while existing session persistence impl is being
  34. % phased out.
  35. {group, persistence_disabled},
  36. {group, persistence_enabled}
  37. ].
  38. %% A persistent session can be resumed in two ways:
  39. %% 1. The old connection process is still alive, and the session is taken
  40. %% over by the new connection.
  41. %% 2. The old session process has died (e.g., because of node down).
  42. %% The new process resumes the session from the stored state, and finds
  43. %% any subscribed messages from the persistent message store.
  44. %%
  45. %% We want to test both ways, both with the db backend enabled and disabled.
  46. %%
  47. %% In addition, we test both tcp and quic connections.
  48. groups() ->
  49. TCs = emqx_common_test_helpers:all(?MODULE),
  50. TCsNonGeneric = [t_choose_impl, t_transient],
  51. TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
  52. [
  53. {persistence_disabled, TCGroups},
  54. {persistence_enabled, TCGroups},
  55. {tcp, [], TCs},
  56. {quic, [], TCs -- TCsNonGeneric},
  57. {ws, [], TCs -- TCsNonGeneric}
  58. ].
  59. init_per_group(persistence_disabled, Config) ->
  60. [
  61. {emqx_config, ?EMQX_CONFIG ++ "durable_sessions { enable = false }"},
  62. {persistence, false}
  63. | Config
  64. ];
  65. init_per_group(persistence_enabled, Config) ->
  66. [
  67. {emqx_config,
  68. ?EMQX_CONFIG ++
  69. "durable_sessions {\n"
  70. " enable = true\n"
  71. " heartbeat_interval = 100ms\n"
  72. " renew_streams_interval = 100ms\n"
  73. " session_gc_interval = 2s\n"
  74. "}\n"
  75. "durable_storage.messages.backend = builtin_local"},
  76. {persistence, ds}
  77. | Config
  78. ];
  79. init_per_group(tcp, Config) ->
  80. Apps = emqx_cth_suite:start(
  81. [{emqx, ?config(emqx_config, Config)}],
  82. #{work_dir => emqx_cth_suite:work_dir(Config)}
  83. ),
  84. [
  85. {port, get_listener_port(tcp, default)},
  86. {conn_fun, connect},
  87. {group_apps, Apps}
  88. | Config
  89. ];
  90. init_per_group(ws, Config) ->
  91. Apps = emqx_cth_suite:start(
  92. [{emqx, ?config(emqx_config, Config)}],
  93. #{work_dir => emqx_cth_suite:work_dir(Config)}
  94. ),
  95. [
  96. {ssl, false},
  97. {host, "localhost"},
  98. {enable_websocket, true},
  99. {port, get_listener_port(ws, default)},
  100. {conn_fun, ws_connect},
  101. {group_apps, Apps}
  102. | Config
  103. ];
  104. init_per_group(quic, Config) ->
  105. Apps = emqx_cth_suite:start(
  106. [
  107. {emqx,
  108. ?config(emqx_config, Config) ++
  109. "\n listeners.quic.test {"
  110. "\n enable = true"
  111. "\n ssl_options.verify = verify_peer"
  112. "\n }"}
  113. ],
  114. #{work_dir => emqx_cth_suite:work_dir(Config)}
  115. ),
  116. [
  117. {port, get_listener_port(quic, test)},
  118. {conn_fun, quic_connect},
  119. {ssl_opts, emqx_common_test_helpers:client_mtls()},
  120. {ssl, true},
  121. {group_apps, Apps}
  122. | Config
  123. ].
  124. get_listener_port(Type, Name) ->
  125. case emqx_config:get([listeners, Type, Name, bind]) of
  126. {_, Port} -> Port;
  127. Port -> Port
  128. end.
  129. end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
  130. ok = emqx_cth_suite:stop(?config(group_apps, Config));
  131. end_per_group(_, _Config) ->
  132. catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
  133. ok.
  134. init_per_testcase(TestCase, Config) ->
  135. Config1 = preconfig_per_testcase(TestCase, Config),
  136. case erlang:function_exported(?MODULE, TestCase, 2) of
  137. true -> ?MODULE:TestCase(init, Config1);
  138. _ -> Config1
  139. end.
  140. end_per_testcase(TestCase, Config) ->
  141. case erlang:function_exported(?MODULE, TestCase, 2) of
  142. true -> ?MODULE:TestCase('end', Config);
  143. false -> ok
  144. end,
  145. Config.
  146. preconfig_per_testcase(TestCase, Config) ->
  147. {BaseName, Config1} =
  148. case ?config(tc_group_properties, Config) of
  149. [] ->
  150. %% We are running a single testcase
  151. {
  152. atom_to_binary(TestCase),
  153. init_per_group(tcp, init_per_group(kill_connection_process, Config))
  154. };
  155. [_ | _] = Props ->
  156. Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
  157. Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
  158. Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
  159. {iolist_to_binary(Pre1), Config}
  160. end,
  161. [
  162. {topic, iolist_to_binary([BaseName, "/foo"])},
  163. {stopic, iolist_to_binary([BaseName, "/+"])},
  164. {stopic_alt, iolist_to_binary([BaseName, "/foo"])},
  165. {client_id, BaseName}
  166. | Config1
  167. ].
  168. %%--------------------------------------------------------------------
  169. %% Helpers
  170. %%--------------------------------------------------------------------
  171. client_info(Key, Client) ->
  172. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  173. receive_messages(Count) ->
  174. receive_messages(Count, 15000).
  175. receive_messages(Count, Timeout) ->
  176. Deadline = erlang:monotonic_time(millisecond) + Timeout,
  177. receive_message_loop(Count, Deadline).
  178. receive_message_loop(0, _Deadline) ->
  179. [];
  180. receive_message_loop(Count, Deadline) ->
  181. Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
  182. receive
  183. {publish, Msg} ->
  184. [Msg | receive_message_loop(Count - 1, Deadline)];
  185. {pubrel, Msg} ->
  186. [{pubrel, Msg} | receive_message_loop(Count - 1, Deadline)];
  187. _Other ->
  188. receive_message_loop(Count, Deadline)
  189. after Timeout ->
  190. []
  191. end.
  192. maybe_kill_connection_process(ClientId, Config) ->
  193. Persistence = ?config(persistence, Config),
  194. case emqx_cm:lookup_channels(ClientId) of
  195. [] ->
  196. ok;
  197. [ConnectionPid] when Persistence == ds ->
  198. Ref = monitor(process, ConnectionPid),
  199. ConnectionPid ! die_if_test,
  200. ?assertReceive(
  201. {'DOWN', Ref, process, ConnectionPid, Reason} when
  202. Reason == normal orelse Reason == noproc,
  203. 3000
  204. ),
  205. wait_connection_process_unregistered(ClientId);
  206. _ ->
  207. ok
  208. end.
  209. wait_connection_process_dies(ClientId) ->
  210. case emqx_cm:lookup_channels(ClientId) of
  211. [] ->
  212. ok;
  213. [ConnectionPid] ->
  214. Ref = monitor(process, ConnectionPid),
  215. ?assertReceive(
  216. {'DOWN', Ref, process, ConnectionPid, Reason} when
  217. Reason == normal orelse Reason == noproc,
  218. 3000
  219. ),
  220. wait_connection_process_unregistered(ClientId)
  221. end.
  222. wait_connection_process_unregistered(ClientId) ->
  223. ?retry(
  224. _Timeout = 100,
  225. _Retries = 20,
  226. ?assertEqual([], emqx_cm:lookup_channels(ClientId))
  227. ).
  228. wait_channel_disconnected(ClientId) ->
  229. ?retry(
  230. _Timeout = 100,
  231. _Retries = 20,
  232. case emqx_cm:lookup_channels(ClientId) of
  233. [] ->
  234. false;
  235. [ChanPid] ->
  236. false = emqx_cm:is_channel_connected(ChanPid)
  237. end
  238. ).
  239. disconnect_client(ClientPid) ->
  240. ClientId = proplists:get_value(clientid, emqtt:info(ClientPid)),
  241. ok = emqtt:disconnect(ClientPid),
  242. false = wait_channel_disconnected(ClientId),
  243. ok.
  244. messages(Topic, Payloads) ->
  245. messages(Topic, Payloads, ?QOS_2).
  246. messages(Topic, Payloads, QoS) ->
  247. lists:map(
  248. fun
  249. (Bin) when is_binary(Bin) ->
  250. #mqtt_msg{topic = Topic, payload = Bin, qos = QoS};
  251. (Msg = #mqtt_msg{}) ->
  252. Msg#mqtt_msg{topic = Topic}
  253. end,
  254. Payloads
  255. ).
  256. publish(Topic, Payload) ->
  257. publish(Topic, Payload, ?QOS_2).
  258. publish(Topic, Payload, QoS) ->
  259. publish_many(messages(Topic, [Payload], QoS)).
  260. publish_many(Messages) ->
  261. publish_many(Messages, false).
  262. publish_many(Messages, WaitForUnregister) ->
  263. Fun = fun(Client, Message) ->
  264. case emqtt:publish(Client, Message) of
  265. ok -> ok;
  266. {ok, _} -> ok
  267. end
  268. end,
  269. do_publish(Messages, Fun, WaitForUnregister).
  270. do_publish(Messages = [_ | _], PublishFun, WaitForUnregister) ->
  271. %% Publish from another process to avoid connection confusion.
  272. {Pid, Ref} =
  273. spawn_monitor(
  274. fun() ->
  275. %% For convenience, always publish using tcp.
  276. %% The publish path is not what we are testing.
  277. ClientID = <<"ps_SUITE_publisher">>,
  278. {ok, Client} = emqtt:start_link([
  279. {proto_ver, v5},
  280. {clientid, ClientID},
  281. {port, 1883}
  282. ]),
  283. {ok, _} = emqtt:connect(Client),
  284. lists:foreach(fun(Message) -> PublishFun(Client, Message) end, Messages),
  285. ok = emqtt:disconnect(Client),
  286. %% Snabbkaffe sometimes fails unless all processes are gone.
  287. WaitForUnregister andalso wait_connection_process_dies(ClientID)
  288. end
  289. ),
  290. receive
  291. {'DOWN', Ref, process, Pid, normal} -> ok;
  292. {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
  293. end.
  294. %%--------------------------------------------------------------------
  295. %% Test Cases
  296. %%--------------------------------------------------------------------
  297. t_choose_impl(Config) ->
  298. ClientId = ?config(client_id, Config),
  299. ConnFun = ?config(conn_fun, Config),
  300. {ok, Client} = emqtt:start_link([
  301. {clientid, ClientId},
  302. {proto_ver, v5},
  303. {properties, #{'Session-Expiry-Interval' => 30}}
  304. | Config
  305. ]),
  306. {ok, _} = emqtt:ConnFun(Client),
  307. [ChanPid] = emqx_cm:lookup_channels(ClientId),
  308. ?assertEqual(
  309. case ?config(persistence, Config) of
  310. false -> emqx_session_mem;
  311. ds -> emqx_persistent_session_ds
  312. end,
  313. emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
  314. ),
  315. ok = emqtt:disconnect(Client).
  316. t_connect_discards_existing_client(Config) ->
  317. ClientId = ?config(client_id, Config),
  318. ConnFun = ?config(conn_fun, Config),
  319. ClientOpts = [
  320. {clientid, ClientId},
  321. {proto_ver, v5},
  322. {properties, #{'Session-Expiry-Interval' => 30}}
  323. | Config
  324. ],
  325. {ok, Client1} = emqtt:start_link(ClientOpts),
  326. true = unlink(Client1),
  327. MRef = erlang:monitor(process, Client1),
  328. {ok, _} = emqtt:ConnFun(Client1),
  329. {ok, Client2} = emqtt:start_link(ClientOpts),
  330. {ok, _} = emqtt:ConnFun(Client2),
  331. receive
  332. {'DOWN', MRef, process, Client1, Reason} ->
  333. ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
  334. ok = emqtt:stop(Client2),
  335. ok
  336. after 1000 ->
  337. error({client_still_connected, Client1})
  338. end.
  339. %% [MQTT-3.1.2-23]
  340. t_connect_session_expiry_interval(Config) ->
  341. ConnFun = ?config(conn_fun, Config),
  342. Topic = ?config(topic, Config),
  343. STopic = ?config(stopic, Config),
  344. Payload = <<"test message">>,
  345. ClientId = ?config(client_id, Config),
  346. {ok, Client1} = emqtt:start_link([
  347. {clientid, ClientId},
  348. {proto_ver, v5},
  349. {properties, #{'Session-Expiry-Interval' => 30}}
  350. | Config
  351. ]),
  352. {ok, _} = emqtt:ConnFun(Client1),
  353. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client1, STopic, ?QOS_1),
  354. ok = emqtt:disconnect(Client1),
  355. maybe_kill_connection_process(ClientId, Config),
  356. publish(Topic, Payload, ?QOS_1),
  357. {ok, Client2} = emqtt:start_link([
  358. {clientid, ClientId},
  359. {proto_ver, v5},
  360. {properties, #{'Session-Expiry-Interval' => 30}},
  361. {clean_start, false}
  362. | Config
  363. ]),
  364. {ok, _} = emqtt:ConnFun(Client2),
  365. [Msg | _] = receive_messages(1),
  366. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  367. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  368. ?assertEqual({ok, ?QOS_1}, maps:find(qos, Msg)),
  369. ok = emqtt:disconnect(Client2).
  370. %% [MQTT-3.1.2-23]
  371. t_connect_session_expiry_interval_qos2(Config) ->
  372. ConnFun = ?config(conn_fun, Config),
  373. Topic = ?config(topic, Config),
  374. STopic = ?config(stopic, Config),
  375. Payload = <<"test message">>,
  376. ClientId = ?config(client_id, Config),
  377. {ok, Client1} = emqtt:start_link([
  378. {clientid, ClientId},
  379. {proto_ver, v5},
  380. {properties, #{'Session-Expiry-Interval' => 30}}
  381. | Config
  382. ]),
  383. {ok, _} = emqtt:ConnFun(Client1),
  384. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  385. ok = emqtt:disconnect(Client1),
  386. maybe_kill_connection_process(ClientId, Config),
  387. publish(Topic, Payload),
  388. {ok, Client2} = emqtt:start_link([
  389. {clientid, ClientId},
  390. {proto_ver, v5},
  391. {properties, #{'Session-Expiry-Interval' => 30}},
  392. {clean_start, false}
  393. | Config
  394. ]),
  395. {ok, _} = emqtt:ConnFun(Client2),
  396. [Msg | _] = receive_messages(1),
  397. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  398. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  399. ?assertEqual({ok, 2}, maps:find(qos, Msg)),
  400. ok = emqtt:disconnect(Client2).
  401. t_without_client_id(Config) ->
  402. %% Emqtt client dies
  403. process_flag(trap_exit, true),
  404. ConnFun = ?config(conn_fun, Config),
  405. {ok, Client0} = emqtt:start_link([
  406. {proto_ver, v5},
  407. {properties, #{'Session-Expiry-Interval' => 30}},
  408. {clean_start, false}
  409. | Config
  410. ]),
  411. {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
  412. ok.
  413. t_assigned_clientid_persistent_session(Config) ->
  414. ConnFun = ?config(conn_fun, Config),
  415. {ok, Client1} = emqtt:start_link([
  416. {proto_ver, v5},
  417. {properties, #{'Session-Expiry-Interval' => 30}},
  418. {clean_start, true}
  419. | Config
  420. ]),
  421. {ok, _} = emqtt:ConnFun(Client1),
  422. AssignedClientId = client_info(clientid, Client1),
  423. ok = emqtt:disconnect(Client1),
  424. maybe_kill_connection_process(AssignedClientId, Config),
  425. {ok, Client2} = emqtt:start_link([
  426. {clientid, AssignedClientId},
  427. {proto_ver, v5},
  428. {clean_start, false}
  429. | Config
  430. ]),
  431. {ok, _} = emqtt:ConnFun(Client2),
  432. ?assertEqual(1, client_info(session_present, Client2)),
  433. ok = emqtt:disconnect(Client2).
  434. t_cancel_on_disconnect(Config) ->
  435. %% Open a persistent session, but cancel the persistence when
  436. %% shutting down the connection.
  437. ConnFun = ?config(conn_fun, Config),
  438. ClientId = ?config(client_id, Config),
  439. {ok, Client1} = emqtt:start_link([
  440. {proto_ver, v5},
  441. {clientid, ClientId},
  442. {properties, #{'Session-Expiry-Interval' => 30}},
  443. {clean_start, true}
  444. | Config
  445. ]),
  446. {ok, _} = emqtt:ConnFun(Client1),
  447. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
  448. wait_connection_process_unregistered(ClientId),
  449. {ok, Client2} = emqtt:start_link([
  450. {clientid, ClientId},
  451. {proto_ver, v5},
  452. {clean_start, false},
  453. {properties, #{'Session-Expiry-Interval' => 30}}
  454. | Config
  455. ]),
  456. {ok, _} = emqtt:ConnFun(Client2),
  457. ?assertEqual(0, client_info(session_present, Client2)),
  458. ok = emqtt:disconnect(Client2).
  459. t_persist_on_disconnect(Config) ->
  460. %% Open a non-persistent session, but add the persistence when
  461. %% shutting down the connection. This is a protocol error, and
  462. %% should not convert the session into a persistent session.
  463. ConnFun = ?config(conn_fun, Config),
  464. ClientId = ?config(client_id, Config),
  465. {ok, Client1} = emqtt:start_link([
  466. {proto_ver, v5},
  467. {clientid, ClientId},
  468. {properties, #{'Session-Expiry-Interval' => 0}},
  469. {clean_start, true}
  470. | Config
  471. ]),
  472. {ok, _} = emqtt:ConnFun(Client1),
  473. %% Strangely enough, the disconnect is reported as successful by emqtt.
  474. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
  475. wait_connection_process_unregistered(ClientId),
  476. {ok, Client2} = emqtt:start_link([
  477. {clientid, ClientId},
  478. {proto_ver, v5},
  479. {clean_start, false},
  480. {properties, #{'Session-Expiry-Interval' => 30}}
  481. | Config
  482. ]),
  483. {ok, _} = emqtt:ConnFun(Client2),
  484. %% The session should not be known, since it wasn't persisted because of the
  485. %% changed expiry interval in the disconnect call.
  486. ?assertEqual(0, client_info(session_present, Client2)),
  487. ok = emqtt:disconnect(Client2).
  488. t_process_dies_session_expires(Config) ->
  489. %% Emulate an error in the connect process,
  490. %% or that the node of the process goes down.
  491. %% A persistent session should eventually expire.
  492. ?check_trace(
  493. begin
  494. ConnFun = ?config(conn_fun, Config),
  495. ClientId = ?config(client_id, Config),
  496. Topic = ?config(topic, Config),
  497. STopic = ?config(stopic, Config),
  498. Payload = <<"test">>,
  499. {ok, Client1} = emqtt:start_link([
  500. {proto_ver, v5},
  501. {clientid, ClientId},
  502. {properties, #{'Session-Expiry-Interval' => 1}},
  503. {clean_start, true}
  504. | Config
  505. ]),
  506. {ok, _} = emqtt:ConnFun(Client1),
  507. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  508. ok = emqtt:disconnect(Client1),
  509. maybe_kill_connection_process(ClientId, Config),
  510. ok = publish(Topic, Payload),
  511. timer:sleep(1500),
  512. {ok, Client2} = emqtt:start_link([
  513. {proto_ver, v5},
  514. {clientid, ClientId},
  515. {properties, #{'Session-Expiry-Interval' => 30}},
  516. {clean_start, false}
  517. | Config
  518. ]),
  519. {ok, _} = emqtt:ConnFun(Client2),
  520. ?assertEqual(0, client_info(session_present, Client2)),
  521. %% We should not receive the pending message
  522. ?assertEqual([], receive_messages(1)),
  523. emqtt:disconnect(Client2)
  524. end,
  525. []
  526. ).
  527. t_publish_while_client_is_gone_qos1(Config) ->
  528. %% A persistent session should receive messages in its
  529. %% subscription even if the process owning the session dies.
  530. ConnFun = ?config(conn_fun, Config),
  531. Topic = ?config(topic, Config),
  532. STopic = ?config(stopic, Config),
  533. Payload1 = <<"hello1">>,
  534. Payload2 = <<"hello2">>,
  535. ClientId = ?config(client_id, Config),
  536. {ok, Client1} = emqtt:start_link([
  537. {proto_ver, v5},
  538. {clientid, ClientId},
  539. {properties, #{'Session-Expiry-Interval' => 30}},
  540. {clean_start, true}
  541. | Config
  542. ]),
  543. {ok, _} = emqtt:ConnFun(Client1),
  544. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  545. ok = emqtt:disconnect(Client1),
  546. maybe_kill_connection_process(ClientId, Config),
  547. ok = publish_many(messages(Topic, [Payload1, Payload2], ?QOS_1)),
  548. {ok, Client2} = emqtt:start_link([
  549. {proto_ver, v5},
  550. {clientid, ClientId},
  551. {properties, #{'Session-Expiry-Interval' => 30}},
  552. {clean_start, false}
  553. | Config
  554. ]),
  555. {ok, _} = emqtt:ConnFun(Client2),
  556. Msgs = receive_messages(2),
  557. ?assertMatch([_, _], Msgs),
  558. [Msg1, Msg2] = Msgs,
  559. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  560. ?assertEqual({ok, 1}, maps:find(qos, Msg1)),
  561. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  562. ?assertEqual({ok, 1}, maps:find(qos, Msg2)),
  563. ok = emqtt:disconnect(Client2).
  564. t_publish_many_while_client_is_gone_qos1(Config) ->
  565. %% A persistent session should receive all of the still unacked messages
  566. %% for its subscriptions after the client dies or reconnects, in addition
  567. %% to new messages that were published while the client was gone. The order
  568. %% of the messages should be consistent across reconnects.
  569. ClientId = ?config(client_id, Config),
  570. ConnFun = ?config(conn_fun, Config),
  571. {ok, Client1} = emqtt:start_link([
  572. {proto_ver, v5},
  573. {clientid, ClientId},
  574. {properties, #{'Session-Expiry-Interval' => 30}},
  575. {clean_start, true},
  576. {auto_ack, never}
  577. | Config
  578. ]),
  579. {ok, _} = emqtt:ConnFun(Client1),
  580. STopics = [
  581. <<"t/+/foo">>,
  582. <<"msg/feed/#">>,
  583. <<"loc/+/+/+">>
  584. ],
  585. [{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, ST, ?QOS_1) || ST <- STopics],
  586. Pubs1 = [
  587. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1},
  588. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1},
  589. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 1},
  590. #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
  591. #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
  592. #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
  593. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1}
  594. ],
  595. ok = publish_many(Pubs1),
  596. NPubs1 = length(Pubs1),
  597. Msgs1 = receive_messages(NPubs1),
  598. NMsgs1 = length(Msgs1),
  599. ?assertEqual(NPubs1, NMsgs1),
  600. ct:pal("Msgs1 = ~p", [Msgs1]),
  601. %% TODO
  602. %% This assertion doesn't currently hold because `emqx_ds` doesn't enforce
  603. %% strict ordering reflecting client publishing order. Instead, per-topic
  604. %% ordering is guaranteed per each client. In fact, this violates the MQTT
  605. %% specification, but we deemed it acceptable for now.
  606. %% ?assertMatch([
  607. %% #{payload := <<"M1">>},
  608. %% #{payload := <<"M2">>},
  609. %% #{payload := <<"M3">>},
  610. %% #{payload := <<"M4">>},
  611. %% #{payload := <<"M5">>},
  612. %% #{payload := <<"M6">>},
  613. %% #{payload := <<"M7">>}
  614. %% ], Msgs1),
  615. ?assertEqual(
  616. get_topicwise_order(Pubs1),
  617. get_topicwise_order(Msgs1)
  618. ),
  619. NAcked = 4,
  620. ?assert(NMsgs1 >= NAcked),
  621. [ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)],
  622. %% Ensure that PUBACKs are propagated to the channel.
  623. pong = emqtt:ping(Client1),
  624. ok = disconnect_client(Client1),
  625. maybe_kill_connection_process(ClientId, Config),
  626. Pubs2 = [
  627. #mqtt_msg{topic = <<"loc/3/4/6">>, payload = <<"M8">>, qos = 1},
  628. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
  629. #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
  630. #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
  631. #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1}
  632. ],
  633. ok = publish_many(Pubs2),
  634. NPubs2 = length(Pubs2),
  635. %% Now reconnect with auto ack to make sure all streams are
  636. %% replayed till the end:
  637. {ok, Client2} = emqtt:start_link([
  638. {proto_ver, v5},
  639. {clientid, ClientId},
  640. {properties, #{'Session-Expiry-Interval' => 30}},
  641. {clean_start, false}
  642. | Config
  643. ]),
  644. {ok, _} = emqtt:ConnFun(Client2),
  645. %% Try to receive _at most_ `NPubs` messages.
  646. %% There shouldn't be that much unacked messages in the replay anyway,
  647. %% but it's an easy number to pick.
  648. NPubs = NPubs1 + NPubs2,
  649. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  650. NMsgs2 = length(Msgs2),
  651. ct:pal("Msgs2 = ~p", [Msgs2]),
  652. ?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}),
  653. ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
  654. ?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
  655. NSame = NMsgs2 - NPubs2,
  656. ?assert(
  657. lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
  658. ),
  659. ?assertNot(
  660. lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2))
  661. ),
  662. ?assertEqual(
  663. [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
  664. [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
  665. ),
  666. ok = disconnect_client(Client2).
  667. t_publish_while_client_is_gone(Config) ->
  668. %% A persistent session should receive messages in its
  669. %% subscription even if the process owning the session dies.
  670. ConnFun = ?config(conn_fun, Config),
  671. Topic = ?config(topic, Config),
  672. STopic = ?config(stopic, Config),
  673. Payload1 = <<"hello1">>,
  674. Payload2 = <<"hello2">>,
  675. ClientId = ?config(client_id, Config),
  676. {ok, Client1} = emqtt:start_link([
  677. {proto_ver, v5},
  678. {clientid, ClientId},
  679. {properties, #{'Session-Expiry-Interval' => 30}},
  680. {clean_start, true}
  681. | Config
  682. ]),
  683. {ok, _} = emqtt:ConnFun(Client1),
  684. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  685. ok = emqtt:disconnect(Client1),
  686. maybe_kill_connection_process(ClientId, Config),
  687. ok = publish_many(messages(Topic, [Payload1, Payload2])),
  688. {ok, Client2} = emqtt:start_link([
  689. {proto_ver, v5},
  690. {clientid, ClientId},
  691. {properties, #{'Session-Expiry-Interval' => 30}},
  692. {clean_start, false}
  693. | Config
  694. ]),
  695. {ok, _} = emqtt:ConnFun(Client2),
  696. Msgs = receive_messages(2),
  697. ?assertMatch([_, _], Msgs),
  698. [Msg1, Msg2] = Msgs,
  699. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  700. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  701. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  702. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  703. ok = emqtt:disconnect(Client2).
  704. t_publish_many_while_client_is_gone(Config) ->
  705. %% A persistent session should receive all of the still unacked messages
  706. %% for its subscriptions after the client dies or reconnects, in addition
  707. %% to PUBRELs for the messages it has PUBRECed. While client must send
  708. %% PUBACKs and PUBRECs in order, those orders are independent of each other.
  709. %%
  710. %% Developer's note: for simplicity we publish all messages to the
  711. %% same topic, since persistent session ds may reorder messages
  712. %% that belong to different streams, and this particular test is
  713. %% very sensitive the order.
  714. ClientId = ?config(client_id, Config),
  715. ConnFun = ?config(conn_fun, Config),
  716. ClientOpts = [
  717. {proto_ver, v5},
  718. {clientid, ClientId},
  719. {properties, #{'Session-Expiry-Interval' => 30}},
  720. {auto_ack, never}
  721. | Config
  722. ],
  723. {ok, Client1} = emqtt:start_link([{clean_start, true} | ClientOpts]),
  724. {ok, _} = emqtt:ConnFun(Client1),
  725. {ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"t">>, ?QOS_2),
  726. Pubs1 = [
  727. #mqtt_msg{topic = <<"t">>, payload = <<"M1">>, qos = 1},
  728. #mqtt_msg{topic = <<"t">>, payload = <<"M2">>, qos = 1},
  729. #mqtt_msg{topic = <<"t">>, payload = <<"M3">>, qos = 2},
  730. #mqtt_msg{topic = <<"t">>, payload = <<"M4">>, qos = 2},
  731. #mqtt_msg{topic = <<"t">>, payload = <<"M5">>, qos = 2},
  732. #mqtt_msg{topic = <<"t">>, payload = <<"M6">>, qos = 1},
  733. #mqtt_msg{topic = <<"t">>, payload = <<"M7">>, qos = 2},
  734. #mqtt_msg{topic = <<"t">>, payload = <<"M8">>, qos = 1},
  735. #mqtt_msg{topic = <<"t">>, payload = <<"M9">>, qos = 2}
  736. ],
  737. ok = publish_many(Pubs1),
  738. NPubs1 = length(Pubs1),
  739. Msgs1 = receive_messages(NPubs1),
  740. ct:pal("Msgs1 = ~p", [Msgs1]),
  741. NMsgs1 = length(Msgs1),
  742. ?assertEqual(NPubs1, NMsgs1, emqx_persistent_session_ds:print_session(ClientId)),
  743. ?assertEqual(
  744. get_topicwise_order(Pubs1),
  745. get_topicwise_order(Msgs1),
  746. emqx_persistent_session_ds:print_session(ClientId)
  747. ),
  748. %% PUBACK every QoS 1 message.
  749. lists:foreach(
  750. fun(PktId) -> ok = emqtt:puback(Client1, PktId) end,
  751. [PktId || #{qos := 1, packet_id := PktId} <- Msgs1]
  752. ),
  753. %% PUBREC first `NRecs` QoS 2 messages (up to "M5")
  754. NRecs = 3,
  755. PubRecs1 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs1], NRecs),
  756. lists:foreach(
  757. fun(PktId) -> ok = emqtt:pubrec(Client1, PktId) end,
  758. PubRecs1
  759. ),
  760. %% Ensure that PUBACKs / PUBRECs are propagated to the channel.
  761. pong = emqtt:ping(Client1),
  762. %% Receive PUBRELs for the sent PUBRECs.
  763. PubRels1 = receive_messages(NRecs),
  764. ct:pal("PubRels1 = ~p", [PubRels1]),
  765. ?assertEqual(
  766. PubRecs1,
  767. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1],
  768. PubRels1
  769. ),
  770. ok = disconnect_client(Client1),
  771. maybe_kill_connection_process(ClientId, Config),
  772. Pubs2 = [
  773. #mqtt_msg{topic = <<"t">>, payload = <<"M10">>, qos = 2},
  774. #mqtt_msg{topic = <<"t">>, payload = <<"M11">>, qos = 1},
  775. #mqtt_msg{topic = <<"t">>, payload = <<"M12">>, qos = 2}
  776. ],
  777. ok = publish_many(Pubs2),
  778. NPubs2 = length(Pubs2),
  779. {ok, Client2} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  780. {ok, _} = emqtt:ConnFun(Client2),
  781. %% Try to receive _at most_ `NPubs` messages.
  782. %% There shouldn't be that much unacked messages in the replay anyway,
  783. %% but it's an easy number to pick.
  784. NPubs = NPubs1 + NPubs2,
  785. Msgs2 = receive_messages(NPubs, _Timeout = 2000),
  786. ct:pal("Msgs2 = ~p", [Msgs2]),
  787. %% We should again receive PUBRELs for the PUBRECs we sent earlier.
  788. ?assertEqual(
  789. get_msgs_essentials(PubRels1),
  790. [get_msg_essentials(PubRel) || PubRel = {pubrel, _} <- Msgs2]
  791. ),
  792. %% We should receive duplicates only for QoS 2 messages where PUBRELs were
  793. %% not sent, in the same order as the original messages.
  794. Msgs2Dups = [get_msg_essentials(M) || M = #{dup := true} <- Msgs2],
  795. ?assertEqual(
  796. Msgs2Dups,
  797. [M || M = #{qos := 2} <- Msgs2Dups]
  798. ),
  799. ?assertEqual(
  800. get_msgs_essentials(pick_respective_msgs(Msgs2Dups, Msgs1)),
  801. Msgs2Dups
  802. ),
  803. %% Ack more messages:
  804. PubRecs2 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs2], 2),
  805. lists:foreach(
  806. fun(PktId) -> ok = emqtt:pubrec(Client2, PktId) end,
  807. PubRecs2
  808. ),
  809. PubRels2 = receive_messages(length(PubRecs2)),
  810. ct:pal("PubRels2 = ~p", [PubRels2]),
  811. ?assertEqual(
  812. PubRecs2,
  813. [PktId || {pubrel, #{packet_id := PktId}} <- PubRels2],
  814. PubRels2
  815. ),
  816. %% PUBCOMP every PUBREL.
  817. PubComps = [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1 ++ PubRels2],
  818. ct:pal("PubComps: ~p", [PubComps]),
  819. lists:foreach(
  820. fun(PktId) -> ok = emqtt:pubcomp(Client2, PktId) end,
  821. PubComps
  822. ),
  823. %% Ensure that PUBCOMPs are propagated to the channel.
  824. pong = emqtt:ping(Client2),
  825. %% Reconnect for the last time
  826. ok = disconnect_client(Client2),
  827. maybe_kill_connection_process(ClientId, Config),
  828. {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
  829. {ok, _} = emqtt:ConnFun(Client3),
  830. %% Check that we receive the rest of the messages:
  831. Msgs3 = receive_messages(NPubs, _Timeout = 2000),
  832. ct:pal("Msgs3 = ~p", [Msgs3]),
  833. ?assertMatch(
  834. [<<"M10">>, <<"M11">>, <<"M12">>],
  835. [I || #{payload := I} <- Msgs3]
  836. ),
  837. ok = disconnect_client(Client3).
  838. t_clean_start_drops_subscriptions(Config) ->
  839. %% 1. A persistent session is started and disconnected.
  840. %% 2. While disconnected, a message is published and persisted.
  841. %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
  842. %% then we disconnect again.
  843. %% 4. Finally, a new connection is made with clean start set to false.
  844. %% The original message should not be delivered.
  845. ConnFun = ?config(conn_fun, Config),
  846. Topic = ?config(topic, Config),
  847. STopic = ?config(stopic, Config),
  848. Payload1 = <<"hello1">>,
  849. Payload2 = <<"hello2">>,
  850. Payload3 = <<"hello3">>,
  851. ClientId = ?config(client_id, Config),
  852. %% 1.
  853. {ok, Client1} = emqtt:start_link([
  854. {proto_ver, v5},
  855. {clientid, ClientId},
  856. {properties, #{'Session-Expiry-Interval' => 30}},
  857. {clean_start, true}
  858. | Config
  859. ]),
  860. {ok, _} = emqtt:ConnFun(Client1),
  861. {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
  862. ok = emqtt:disconnect(Client1),
  863. maybe_kill_connection_process(ClientId, Config),
  864. %% 2.
  865. ok = publish(Topic, Payload1, ?QOS_1),
  866. timer:sleep(1000),
  867. %% 3.
  868. {ok, Client2} = emqtt:start_link([
  869. {proto_ver, v5},
  870. {clientid, ClientId},
  871. {properties, #{'Session-Expiry-Interval' => 30}},
  872. {clean_start, true}
  873. | Config
  874. ]),
  875. {ok, _} = emqtt:ConnFun(Client2),
  876. ?assertEqual(0, client_info(session_present, Client2)),
  877. {ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1),
  878. timer:sleep(100),
  879. ok = publish(Topic, Payload2, ?QOS_1),
  880. [Msg1] = receive_messages(1),
  881. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
  882. pong = emqtt:ping(Client2),
  883. ok = emqtt:disconnect(Client2),
  884. maybe_kill_connection_process(ClientId, Config),
  885. %% 4.
  886. {ok, Client3} = emqtt:start_link([
  887. {proto_ver, v5},
  888. {clientid, ClientId},
  889. {properties, #{'Session-Expiry-Interval' => 30}},
  890. {clean_start, false}
  891. | Config
  892. ]),
  893. {ok, _} = emqtt:ConnFun(Client3),
  894. ok = publish(Topic, Payload3, ?QOS_1),
  895. [Msg2] = receive_messages(1),
  896. ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
  897. pong = emqtt:ping(Client3),
  898. ok = emqtt:disconnect(Client3).
  899. t_unsubscribe(Config) ->
  900. ConnFun = ?config(conn_fun, Config),
  901. STopic = ?config(stopic, Config),
  902. ClientId = ?config(client_id, Config),
  903. {ok, Client} = emqtt:start_link([
  904. {clientid, ClientId},
  905. {proto_ver, v5},
  906. {properties, #{'Session-Expiry-Interval' => 30}}
  907. | Config
  908. ]),
  909. {ok, _} = emqtt:ConnFun(Client),
  910. {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
  911. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  912. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  913. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  914. ok = emqtt:disconnect(Client).
  915. %% This testcase verifies that un-acked messages that were once sent
  916. %% to the client are still retransmitted after the session
  917. %% unsubscribes from the topic and reconnects.
  918. t_unsubscribe_replay(Config) ->
  919. ConnFun = ?config(conn_fun, Config),
  920. TopicPrefix = ?config(topic, Config),
  921. ClientId = atom_to_binary(?FUNCTION_NAME),
  922. ClientOpts = [
  923. {proto_ver, v5},
  924. {clientid, ClientId},
  925. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}},
  926. {max_inflight, 10}
  927. | Config
  928. ],
  929. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  930. {ok, _} = emqtt:ConnFun(Sub),
  931. %% 1. Make two subscriptions, one is to be deleted:
  932. Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]),
  933. Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]),
  934. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)),
  935. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)),
  936. %% 2. Publish 2 messages to the first and second topics each
  937. %% (client doesn't ack them):
  938. ok = publish(Topic1, <<"1">>, ?QOS_1),
  939. ok = publish(Topic1, <<"2">>, ?QOS_2),
  940. [Msg1, Msg2] = receive_messages(2),
  941. ?assertMatch(
  942. [
  943. #{payload := <<"1">>},
  944. #{payload := <<"2">>}
  945. ],
  946. [Msg1, Msg2]
  947. ),
  948. ok = publish(Topic2, <<"3">>, ?QOS_1),
  949. ok = publish(Topic2, <<"4">>, ?QOS_2),
  950. [Msg3, Msg4] = receive_messages(2),
  951. ?assertMatch(
  952. [
  953. #{payload := <<"3">>},
  954. #{payload := <<"4">>}
  955. ],
  956. [Msg3, Msg4]
  957. ),
  958. %% 3. Unsubscribe from the topic and disconnect:
  959. ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)),
  960. ok = emqtt:disconnect(Sub),
  961. %% 5. Publish more messages to the disconnected topic:
  962. ok = publish(Topic1, <<"5">>, ?QOS_1),
  963. ok = publish(Topic1, <<"6">>, ?QOS_2),
  964. %% 4. Reconnect the client. It must only receive only four
  965. %% messages from the time when it was subscribed:
  966. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  967. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  968. %% Note: we ask for 6 messages, but expect only 4, it's
  969. %% intentional:
  970. ?assertMatch(
  971. #{
  972. Topic1 := [<<"1">>, <<"2">>],
  973. Topic2 := [<<"3">>, <<"4">>]
  974. },
  975. get_topicwise_order(receive_messages(6, 5_000)),
  976. debug_info(ClientId)
  977. ),
  978. %% 5. Now let's resubscribe, and check that the session can receive new messages:
  979. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)),
  980. ok = publish(Topic1, <<"7">>, ?QOS_0),
  981. ok = publish(Topic1, <<"8">>, ?QOS_1),
  982. ok = publish(Topic1, <<"9">>, ?QOS_2),
  983. ?assertMatch(
  984. [<<"7">>, <<"8">>, <<"9">>],
  985. lists:map(fun get_msgpub_payload/1, receive_messages(3))
  986. ),
  987. ok = emqtt:disconnect(Sub1).
  988. %% This testcase verifies that persistent sessions handle "transient"
  989. %% mesages correctly.
  990. %%
  991. %% Transient messages are delivered to the channel directly, bypassing
  992. %% the broker code that decides whether the messages should be
  993. %% persisted or not, and therefore they are not persisted.
  994. %%
  995. %% `emqx_retainer' is an example of application that uses this
  996. %% mechanism.
  997. %%
  998. %% This testcase creates the conditions when the transient messages
  999. %% appear in the middle of the replay, to make sure the durable
  1000. %% session doesn't get confused and/or stuck if retained messages are
  1001. %% changed while the session was down.
  1002. t_transient(Config) ->
  1003. ConnFun = ?config(conn_fun, Config),
  1004. TopicPrefix = ?config(topic, Config),
  1005. ClientId = atom_to_binary(?FUNCTION_NAME),
  1006. ClientOpts = [
  1007. {proto_ver, v5},
  1008. {clientid, ClientId},
  1009. {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}},
  1010. {max_inflight, 100}
  1011. | Config
  1012. ],
  1013. Deliver = fun(Topic, Payload, QoS) ->
  1014. [Pid] = emqx_cm:lookup_channels(ClientId),
  1015. Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload),
  1016. Pid ! {deliver, Topic, Msg}
  1017. end,
  1018. Topic1 = <<TopicPrefix/binary, "/1">>,
  1019. Topic2 = <<TopicPrefix/binary, "/2">>,
  1020. Topic3 = <<TopicPrefix/binary, "/3">>,
  1021. %% 1. Start the client and subscribe to the topic:
  1022. {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
  1023. ?assertMatch({ok, _}, emqtt:ConnFun(Sub)),
  1024. ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <<TopicPrefix/binary, "/#">>, qos2)),
  1025. %% 2. Publish regular messages:
  1026. publish(Topic1, <<"1">>, ?QOS_1),
  1027. publish(Topic1, <<"2">>, ?QOS_2),
  1028. Msgs1 = receive_messages(2),
  1029. [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1,
  1030. %% 3. Publish and recieve transient messages:
  1031. Deliver(Topic2, <<"3">>, ?QOS_0),
  1032. Deliver(Topic2, <<"4">>, ?QOS_1),
  1033. Deliver(Topic2, <<"5">>, ?QOS_2),
  1034. Msgs2 = receive_messages(3),
  1035. ?assertMatch(
  1036. [
  1037. #{payload := <<"3">>, qos := ?QOS_0},
  1038. #{payload := <<"4">>, qos := ?QOS_1},
  1039. #{payload := <<"5">>, qos := ?QOS_2}
  1040. ],
  1041. Msgs2
  1042. ),
  1043. %% 4. Publish more regular messages:
  1044. publish(Topic3, <<"6">>, ?QOS_1),
  1045. publish(Topic3, <<"7">>, ?QOS_2),
  1046. Msgs3 = receive_messages(2),
  1047. [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3,
  1048. %% 5. Reconnect the client:
  1049. ok = emqtt:disconnect(Sub),
  1050. {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
  1051. ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
  1052. %% 6. Recieve the historic messages and check that their packet IDs didn't change:
  1053. %% Note: durable session currenty WON'T replay transient messages.
  1054. ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end,
  1055. ?assertMatch(
  1056. #{
  1057. Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}],
  1058. Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}]
  1059. },
  1060. maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000))
  1061. ),
  1062. %% 7. Finish off by sending messages to all the topics to make
  1063. %% sure none of the streams are blocked:
  1064. [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]],
  1065. ?assertMatch(
  1066. #{
  1067. Topic1 := [<<"fin">>],
  1068. Topic2 := [<<"fin">>],
  1069. Topic3 := [<<"fin">>]
  1070. },
  1071. get_topicwise_order(receive_messages(3))
  1072. ),
  1073. ok = emqtt:disconnect(Sub1).
  1074. t_multiple_subscription_matches(Config) ->
  1075. ConnFun = ?config(conn_fun, Config),
  1076. Topic = ?config(topic, Config),
  1077. STopic1 = ?config(stopic, Config),
  1078. STopic2 = ?config(stopic_alt, Config),
  1079. Payload = <<"test message">>,
  1080. ClientId = ?config(client_id, Config),
  1081. {ok, Client1} = emqtt:start_link([
  1082. {clientid, ClientId},
  1083. {proto_ver, v5},
  1084. {properties, #{'Session-Expiry-Interval' => 30}}
  1085. | Config
  1086. ]),
  1087. {ok, _} = emqtt:ConnFun(Client1),
  1088. {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
  1089. {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
  1090. ok = emqtt:disconnect(Client1),
  1091. maybe_kill_connection_process(ClientId, Config),
  1092. publish(Topic, Payload),
  1093. {ok, Client2} = emqtt:start_link([
  1094. {clientid, ClientId},
  1095. {proto_ver, v5},
  1096. {properties, #{'Session-Expiry-Interval' => 30}},
  1097. {clean_start, false}
  1098. | Config
  1099. ]),
  1100. {ok, _} = emqtt:ConnFun(Client2),
  1101. %% We will receive the same message twice because it matches two subscriptions.
  1102. [Msg1, Msg2] = receive_messages(2),
  1103. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
  1104. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
  1105. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
  1106. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
  1107. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  1108. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  1109. ok = emqtt:disconnect(Client2).
  1110. %% Check that we don't get a will message when the client disconnects with success reason
  1111. %% code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0, QoS = 1.
  1112. t_no_will_message(Config) ->
  1113. ConnFun = ?config(conn_fun, Config),
  1114. WillTopic = ?config(topic, Config),
  1115. WillPayload = <<"will message">>,
  1116. ClientId = ?config(client_id, Config),
  1117. ?check_trace(
  1118. #{timetrap => 15_000},
  1119. begin
  1120. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1121. {ok, Client} = emqtt:start_link([
  1122. {clientid, ClientId},
  1123. {proto_ver, v5},
  1124. {properties, #{'Session-Expiry-Interval' => 1}},
  1125. {will_topic, WillTopic},
  1126. {will_payload, WillPayload},
  1127. {will_qos, 1},
  1128. {will_props, #{'Will-Delay-Interval' => 0}}
  1129. | Config
  1130. ]),
  1131. {ok, _} = emqtt:ConnFun(Client),
  1132. ok = emqtt:disconnect(Client, ?RC_SUCCESS),
  1133. %% No will message
  1134. ?assertNotReceive({deliver, WillTopic, _}, 5_000),
  1135. ok
  1136. end,
  1137. []
  1138. ),
  1139. ok.
  1140. %% Check that we get a single will message when the client disconnects with a non
  1141. %% successfull reason code, with `Will-Delay-Interval' = `Session-Expiry-Interval' > 0,
  1142. %% QoS = 1.
  1143. t_will_message1(Config) ->
  1144. do_t_will_message(Config, #{will_delay => 1, session_expiry => 1}),
  1145. ok.
  1146. %% Check that we get a single will message when the client disconnects with a non
  1147. %% successfull reason code, with `Will-Delay-Interval' = 0, `Session-Expiry-Interval' > 0,
  1148. %% QoS = 1.
  1149. t_will_message2(Config) ->
  1150. do_t_will_message(Config, #{will_delay => 0, session_expiry => 1}),
  1151. ok.
  1152. %% Check that we get a single will message when the client disconnects with a non
  1153. %% successfull reason code, with `Will-Delay-Interval' >> `Session-Expiry-Interval' > 0,
  1154. %% QoS = 1.
  1155. t_will_message3(Config) ->
  1156. do_t_will_message(Config, #{will_delay => 300, session_expiry => 1}),
  1157. ok.
  1158. do_t_will_message(Config, Opts) ->
  1159. #{
  1160. session_expiry := SessionExpiry,
  1161. will_delay := WillDelay
  1162. } = Opts,
  1163. ConnFun = ?config(conn_fun, Config),
  1164. WillTopic = ?config(topic, Config),
  1165. WillPayload = <<"will message">>,
  1166. ClientId = ?config(client_id, Config),
  1167. ?check_trace(
  1168. #{timetrap => 15_000},
  1169. begin
  1170. ok = emqx:subscribe(WillTopic, #{qos => 2}),
  1171. {ok, Client} = emqtt:start_link([
  1172. {clientid, ClientId},
  1173. {proto_ver, v5},
  1174. {properties, #{'Session-Expiry-Interval' => SessionExpiry}},
  1175. {will_topic, WillTopic},
  1176. {will_payload, WillPayload},
  1177. {will_qos, 1},
  1178. {will_props, #{'Will-Delay-Interval' => WillDelay}}
  1179. | Config
  1180. ]),
  1181. {ok, _} = emqtt:ConnFun(Client),
  1182. ok = emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
  1183. ?assertReceive({deliver, WillTopic, #message{payload = WillPayload}}, 10_000),
  1184. %% No duplicates
  1185. ?assertNotReceive({deliver, WillTopic, _}, 100),
  1186. ok
  1187. end,
  1188. []
  1189. ),
  1190. ok.
  1191. t_sys_message_delivery(Config) ->
  1192. ConnFun = ?config(conn_fun, Config),
  1193. SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]),
  1194. SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]),
  1195. ClientId = ?config(client_id, Config),
  1196. {ok, Client1} = emqtt:start_link([
  1197. {clientid, ClientId},
  1198. {proto_ver, v5},
  1199. {properties, #{'Session-Expiry-Interval' => 30}}
  1200. | Config
  1201. ]),
  1202. {ok, _} = emqtt:ConnFun(Client1),
  1203. {ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]),
  1204. ?assertMatch(
  1205. [
  1206. #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1},
  1207. #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2}
  1208. ],
  1209. receive_messages(2)
  1210. ),
  1211. ok = emqtt:disconnect(Client1),
  1212. {ok, Client2} = emqtt:start_link([
  1213. {clientid, ClientId},
  1214. {proto_ver, v5},
  1215. {properties, #{'Session-Expiry-Interval' => 30}},
  1216. {clean_start, false}
  1217. | Config
  1218. ]),
  1219. {ok, _} = emqtt:ConnFun(Client2),
  1220. ?assertMatch(
  1221. [#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}],
  1222. receive_messages(1)
  1223. ).
  1224. get_topicwise_order(Msgs) ->
  1225. maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
  1226. get_msgpub_topic(#mqtt_msg{topic = Topic}) ->
  1227. Topic;
  1228. get_msgpub_topic(#{topic := Topic}) ->
  1229. Topic.
  1230. get_msgpub_payload(#mqtt_msg{payload = Payload}) ->
  1231. Payload;
  1232. get_msgpub_payload(#{payload := Payload}) ->
  1233. Payload.
  1234. get_msg_essentials(Msg = #{}) ->
  1235. maps:with([packet_id, topic, payload, qos], Msg);
  1236. get_msg_essentials({pubrel, Msg}) ->
  1237. {pubrel, maps:with([packet_id, reason_code], Msg)}.
  1238. get_msgs_essentials(Msgs) ->
  1239. [get_msg_essentials(M) || M <- Msgs].
  1240. pick_respective_msgs(MsgRefs, Msgs) ->
  1241. [M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)].
  1242. debug_info(ClientId) ->
  1243. Info = emqx_persistent_session_ds:print_session(ClientId),
  1244. ct:pal("*** State:~n~p", [Info]).