emqx_persistent_session_SUITE.erl 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -compile(export_all).
  22. -compile(nowarn_export_all).
  23. %%--------------------------------------------------------------------
  24. %% SUITE boilerplate
  25. %%--------------------------------------------------------------------
  %% Common Test entry point: returns one top-level group reference per
  %% persistent-store configuration exercised by this suite.
  %%
  %% NOTE (kept from the original source): tests are disabled while the
  %% existing session persistence impl is being phased out.
  all() ->
      [{group, Store} || Store <- [persistent_store_disabled, persistent_store_ds]].
  34. %% A persistent session can be resumed in two ways:
  35. %% 1. The old connection process is still alive, and the session is taken
  36. %% over by the new connection.
  37. %% 2. The old session process has died (e.g., because of node down).
  38. %% The new process resumes the session from the stored state, and finds
  39. %% any subscribed messages from the persistent message store.
  40. %%
  41. %% We want to test both ways, both with the db backend enabled and disabled.
  42. %%
  43. %% In addition, we test tcp, quic and websocket (ws) connections.
  %% Group tree: each store group nests `no_kill_connection_process`, which
  %% in turn nests one group per transport (tcp, quic, ws).
  %% The tcp group runs every testcase; quic and ws run all of them except
  %% the ones listed in `TcpOnly`.
  groups() ->
      AllTCs = emqx_common_test_helpers:all(?MODULE),
      TcpOnly = [t_choose_impl],
      OtherTransports = AllTCs -- TcpOnly,
      StoreGroups = [
          {Store, [{group, no_kill_connection_process}]}
       || Store <- [persistent_store_disabled, persistent_store_ds]
      ],
      StoreGroups ++
          [
              {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
              {tcp, [], AllTCs},
              {quic, [], OtherTransports},
              {ws, [], OtherTransports}
          ].
  %% Per-group setup.
  %%
  %% * Store groups put the EMQX config snippet (`emqx_config`) and a
  %%   `persistent_store` marker into Config.
  %% * Transport groups (tcp / ws / quic) start the `emqx` application with
  %%   that snippet and record the listener port, the emqtt connect function
  %%   to use (`conn_fun`) and the started apps (`group_apps`).
  %% * The kill/no-kill groups only set the `kill_connection_process` flag.
  init_per_group(persistent_store_disabled, Config) ->
      StoreConf = "persistent_session_store { enabled = false }",
      [{emqx_config, StoreConf}, {persistent_store, false} | Config];
  init_per_group(persistent_store_ds, Config) ->
      StoreConf = "persistent_session_store { ds = true }",
      [{emqx_config, StoreConf}, {persistent_store, ds} | Config];
  init_per_group(tcp, Config) ->
      Apps = start_group_apps(?config(emqx_config, Config), Config),
      Port = get_listener_port(tcp, default),
      [{port, Port}, {conn_fun, connect}, {group_apps, Apps} | Config];
  init_per_group(ws, Config) ->
      Apps = start_group_apps(?config(emqx_config, Config), Config),
      Port = get_listener_port(ws, default),
      [
          {ssl, false},
          {host, "localhost"},
          {enable_websocket, true},
          {port, Port},
          {conn_fun, ws_connect},
          {group_apps, Apps}
          | Config
      ];
  init_per_group(quic, Config) ->
      %% The quic group appends its own listener definition to the store config.
      QuicConf =
          ?config(emqx_config, Config) ++
              "\n listeners.quic.test {"
              "\n enable = true"
              "\n ssl_options.verify = verify_peer"
              "\n }",
      Apps = start_group_apps(QuicConf, Config),
      Port = get_listener_port(quic, test),
      SslOpts = emqx_common_test_helpers:client_ssl_twoway(),
      [
          {port, Port},
          {conn_fun, quic_connect},
          {ssl_opts, SslOpts},
          {ssl, true},
          {group_apps, Apps}
          | Config
      ];
  init_per_group(no_kill_connection_process, Config) ->
      [{kill_connection_process, false} | Config];
  init_per_group(kill_connection_process, Config) ->
      [{kill_connection_process, true} | Config].

  %% Starts the `emqx` app with the given config via emqx_cth_suite, using the
  %% suite work dir derived from Config; returns whatever the start call returns.
  start_group_apps(EmqxConf, Config) ->
      emqx_cth_suite:start(
          [{emqx, EmqxConf}],
          #{work_dir => emqx_cth_suite:work_dir(Config)}
      ).
  %% Reads the `bind` setting of listener Type/Name from the EMQX config and
  %% returns its port: the second element when the value is a 2-tuple,
  %% otherwise the value itself.
  get_listener_port(Type, Name) ->
      Bind = emqx_config:get([listeners, Type, Name, bind]),
      case Bind of
          {_Addr, Port} -> Port;
          _ -> Bind
      end.
  %% Per-group teardown: transport groups stop the apps recorded under
  %% `group_apps`; every other group has nothing to clean up.
  end_per_group(Group, Config) ->
      case lists:member(Group, [tcp, ws, quic]) of
          true ->
              ok = emqx_cth_suite:stop(?config(group_apps, Config));
          false ->
              ok
      end.
  %% Per-testcase setup: derives the testcase-specific config, then, if the
  %% testcase also exports an arity-2 clause, delegates to
  %% `TestCase(init, Config)` and returns its result.
  init_per_testcase(TestCase, Config) ->
      Prepared = preconfig_per_testcase(TestCase, Config),
      HasHook = erlang:function_exported(?MODULE, TestCase, 2),
      case HasHook of
          false -> Prepared;
          true -> ?MODULE:TestCase(init, Prepared)
      end.
  %% Per-testcase teardown: calls `TestCase('end', Config)` when the testcase
  %% exports an arity-2 clause. The hook's result is discarded and Config is
  %% returned unchanged.
  end_per_testcase(TestCase, Config) ->
      _ =
          erlang:function_exported(?MODULE, TestCase, 2) andalso
              ?MODULE:TestCase('end', Config),
      Config.
  %% Builds the per-testcase topic, subscription filters and client id.
  %%
  %% The base name is the testcase name, prefixed by the names of the
  %% enclosing groups when the testcase runs inside a group. When no group
  %% properties are present (single testcase run) the tcp and
  %% kill_connection_process group setups are applied here instead.
  preconfig_per_testcase(TestCase, Config) ->
      {Base, GroupedConfig} =
          case ?config(tc_group_properties, Config) of
              [] ->
                  %% We are running a single testcase
                  Standalone = init_per_group(kill_connection_process, Config),
                  {atom_to_binary(TestCase), init_per_group(tcp, Standalone)};
              [_ | _] = Props ->
                  GroupPath = lists:reverse(?config(tc_group_path, Config) ++ Props),
                  GroupNames = [atom_to_list(Name) || {name, Name} <- lists:flatten(GroupPath)],
                  Parts = lists:join("_", GroupNames ++ [atom_to_binary(TestCase)]),
                  {iolist_to_binary(Parts), Config}
          end,
      WithSuffix = fun(Suffix) -> iolist_to_binary([Base, Suffix]) end,
      [
          {topic, WithSuffix("/foo")},
          {stopic, WithSuffix("/+")},
          {stopic_alt, WithSuffix("/foo")},
          {client_id, Base}
          | GroupedConfig
      ].
  159. %%--------------------------------------------------------------------
  160. %% Helpers
  161. %%--------------------------------------------------------------------
  %% Looks up Key in the info list reported by `emqtt:info/1` for Client;
  %% returns `undefined` when the key is absent.
  client_info(Key, Client) ->
      Info = maps:from_list(emqtt:info(Client)),
      case maps:find(Key, Info) of
          {ok, Value} -> Value;
          error -> undefined
      end.
  %% Collects up to Count `{publish, Msg}` messages from the calling process'
  %% mailbox. Any other message is consumed and ignored.
  %%
  %% Returns the collected messages newest-first. If nothing arrives for
  %% 5 seconds, whatever has been collected so far is returned.
  receive_messages(Count) ->
      receive_messages(Count, []).

  receive_messages(0, Collected) ->
      Collected;
  receive_messages(Remaining, Collected) ->
      Step =
          receive
              {publish, Msg} -> {Remaining - 1, [Msg | Collected]};
              _Unrelated -> {Remaining, Collected}
          after 5000 ->
              timeout
          end,
      case Step of
          timeout -> Collected;
          {Left, Acc} -> receive_messages(Left, Acc)
      end.
  %% When the `kill_connection_process` flag in Config is true and a channel
  %% is registered for ClientId, asks that process to terminate and waits for
  %% it to go away. Does nothing when the flag is false or no channel exists.
  maybe_kill_connection_process(ClientId, Config) ->
      case ?config(kill_connection_process, Config) of
          false ->
              ok;
          true ->
              case emqx_cm:lookup_channels(ClientId) of
                  [] -> ok;
                  [ChanPid] -> kill_channel_and_wait(ClientId, ChanPid)
              end
      end.

  %% Sends `die_if_test` to the channel process, expects it to exit with
  %% reason `normal` within 3 seconds, then waits for the channel manager to
  %% drop the registration.
  kill_channel_and_wait(ClientId, ChanPid) ->
      ?assert(is_pid(ChanPid)),
      MRef = erlang:monitor(process, ChanPid),
      ChanPid ! die_if_test,
      receive
          {'DOWN', MRef, process, ChanPid, normal} -> ok
      after 3000 ->
          error(process_did_not_die)
      end,
      wait_for_cm_unregister(ClientId).
  %% Polls `emqx_cm:lookup_channels/1` until no channel is registered for
  %% ClientId. Checks every 100 ms, up to 100 times; raises
  %% `error(cm_did_not_unregister)` when the attempts are exhausted.
  wait_for_cm_unregister(ClientId) ->
      wait_for_cm_unregister(ClientId, 100).

  wait_for_cm_unregister(_ClientId, 0) ->
      error(cm_did_not_unregister);
  wait_for_cm_unregister(ClientId, N) ->
      case emqx_cm:lookup_channels(ClientId) of
          [] ->
              ok;
          %% Any non-empty result means "still registered". Matching only a
          %% single-element list would crash with case_clause if more than
          %% one channel were registered for the same client id.
          [_ | _] ->
              timer:sleep(100),
              wait_for_cm_unregister(ClientId, N - 1)
      end.
  %% Publishes one payload or a list of payloads to Topic at QoS 2 through a
  %% separate publisher client (see do_publish/3). The arity-2 form does not
  %% wait for the publisher to be unregistered.
  publish(Topic, Payloads) ->
      publish(Topic, Payloads, false).

  publish(Topic, Payloads, WaitForUnregister) ->
      PublishQoS2 =
          fun(Client, Payload) ->
              {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
          end,
      do_publish(Payloads, PublishQoS2, WaitForUnregister).
  %% Runs PublishFun(Client, Payload) for every payload from a dedicated,
  %% monitored process and waits for that process to finish.
  %% A single non-list payload is wrapped into a one-element list.
  %% Raises `error({failed_publish, Reason})` if the publisher process exits
  %% with anything but `normal`.
  do_publish([_ | _] = Payloads, PublishFun, WaitForUnregister) ->
      %% Publish from another process to avoid connection confusion.
      {Worker, MRef} =
          spawn_monitor(
              fun() -> run_publisher(Payloads, PublishFun, WaitForUnregister) end
          ),
      receive
          {'DOWN', MRef, process, Worker, Reason} ->
              case Reason of
                  normal -> ok;
                  What -> error({failed_publish, What})
              end
      end;
  do_publish(Payload, PublishFun, WaitForUnregister) ->
      do_publish([Payload], PublishFun, WaitForUnregister).

  %% Body of the publisher process: connect, publish everything, disconnect,
  %% and optionally wait until the publisher's channel is gone.
  run_publisher(Payloads, PublishFun, WaitForUnregister) ->
      %% For convenience, always publish using tcp.
      %% The publish path is not what we are testing.
      ClientID = <<"ps_SUITE_publisher">>,
      {ok, Client} = emqtt:start_link([
          {proto_ver, v5},
          {clientid, ClientID},
          {port, 1883}
      ]),
      {ok, _} = emqtt:connect(Client),
      lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
      ok = emqtt:disconnect(Client),
      %% Snabbkaffe sometimes fails unless all processes are gone.
      case WaitForUnregister of
          false -> ok;
          true -> await_publisher_gone(ClientID)
      end.

  %% Waits (up to 3 seconds) for the publisher's channel process to go down,
  %% then for the channel manager to drop its registration.
  await_publisher_gone(ClientID) ->
      case emqx_cm:lookup_channels(ClientID) of
          [] ->
              ok;
          [ChanPid] ->
              ?assert(is_pid(ChanPid)),
              ChanRef = monitor(process, ChanPid),
              receive
                  {'DOWN', ChanRef, process, ChanPid, _} -> ok
              after 3000 ->
                  error(process_did_not_die)
              end,
              wait_for_cm_unregister(ClientID)
      end.
  257. %%--------------------------------------------------------------------
  258. %% Test Cases
  259. %%--------------------------------------------------------------------
  %% Verifies that the session implementation picked for a connection with a
  %% non-zero Session-Expiry-Interval matches the configured store:
  %% `emqx_session_mem` when the store is disabled,
  %% `emqx_persistent_session_ds` for the `ds` store.
  t_choose_impl(Config) ->
      ClientId = ?config(client_id, Config),
      ConnFun = ?config(conn_fun, Config),
      {ok, Client} = emqtt:start_link([
          {clientid, ClientId},
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}}
          | Config
      ]),
      {ok, _} = emqtt:ConnFun(Client),
      [ChanPid] = emqx_cm:lookup_channels(ClientId),
      ?assertEqual(
          case ?config(persistent_store, Config) of
              false -> emqx_session_mem;
              ds -> emqx_persistent_session_ds
          end,
          emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
      ),
      %% Disconnect explicitly, like the other testcases do, instead of
      %% leaving the client connected when the testcase returns.
      ok = emqtt:disconnect(Client).
  %% Connects two clients with the same client id and options. The first
  %% client process is expected to go down within one second with a
  %% `{disconnected, ?RC_SESSION_TAKEN_OVER, _}` reason.
  t_connect_discards_existing_client(Config) ->
      Connect = ?config(conn_fun, Config),
      Opts = [
          {clientid, ?config(client_id, Config)},
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}}
          | Config
      ],
      {ok, First} = emqtt:start_link(Opts),
      %% Unlink so that the first client's exit does not take the testcase
      %% process down with it; a monitor is used to observe the exit instead.
      true = unlink(First),
      Mon = erlang:monitor(process, First),
      {ok, _} = emqtt:Connect(First),
      {ok, Second} = emqtt:start_link(Opts),
      {ok, _} = emqtt:Connect(Second),
      Outcome =
          receive
              {'DOWN', Mon, process, First, DownReason} -> {down, DownReason}
          after 1000 ->
              timeout
          end,
      case Outcome of
          {down, Reason} ->
              ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
              ok = emqtt:stop(Second),
              ok;
          timeout ->
              error({client_still_connected, First})
      end.
  301. %% [MQTT-3.1.2-23]
  %% Setup/teardown hooks: the testcase is skipped for the `ds` store.
  t_connect_session_expiry_interval(init, Config) ->
      skip_ds_tc(Config);
  t_connect_session_expiry_interval('end', _Config) ->
      ok.

  %% Subscribes with a 30 s Session-Expiry-Interval, disconnects, publishes a
  %% QoS 2 message while the client is away, then reconnects with
  %% clean_start = false and expects that message to be delivered.
  t_connect_session_expiry_interval(Config) ->
      Connect = ?config(conn_fun, Config),
      PubTopic = ?config(topic, Config),
      SubFilter = ?config(stopic, Config),
      ClientId = ?config(client_id, Config),
      Body = <<"test message">>,
      CommonOpts = [
          {clientid, ClientId},
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}}
      ],

      {ok, Subscriber} = emqtt:start_link(CommonOpts ++ Config),
      {ok, _} = emqtt:Connect(Subscriber),
      {ok, _, [2]} = emqtt:subscribe(Subscriber, SubFilter, qos2),
      ok = emqtt:disconnect(Subscriber),
      maybe_kill_connection_process(ClientId, Config),

      publish(PubTopic, Body),

      {ok, Resumed} = emqtt:start_link(CommonOpts ++ [{clean_start, false} | Config]),
      {ok, _} = emqtt:Connect(Resumed),
      [Delivered | _] = receive_messages(1),
      ?assertEqual({ok, iolist_to_binary(PubTopic)}, maps:find(topic, Delivered)),
      ?assertEqual({ok, iolist_to_binary(Body)}, maps:find(payload, Delivered)),
      ?assertEqual({ok, 2}, maps:find(qos, Delivered)),
      ok = emqtt:disconnect(Resumed).
  %% Connecting with clean_start = false and no client id must be rejected
  %% with `client_identifier_not_valid`.
  t_without_client_id(Config) ->
      %% Emqtt client dies, so trap exits to keep the testcase process alive.
      process_flag(trap_exit, true),
      Connect = ?config(conn_fun, Config),
      Opts = [
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}},
          {clean_start, false}
          | Config
      ],
      {ok, Anonymous} = emqtt:start_link(Opts),
      {error, {client_identifier_not_valid, _}} = emqtt:Connect(Anonymous),
      ok.
  %% Connects without a client id, remembers the client id reported by the
  %% emqtt client afterwards, and checks that reconnecting with that id and
  %% clean_start = false reports session_present = 1.
  t_assigned_clientid_persistent_session(Config) ->
      Connect = ?config(conn_fun, Config),

      {ok, Anonymous} = emqtt:start_link([
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}},
          {clean_start, true}
          | Config
      ]),
      {ok, _} = emqtt:Connect(Anonymous),
      AssignedId = client_info(clientid, Anonymous),
      ok = emqtt:disconnect(Anonymous),
      maybe_kill_connection_process(AssignedId, Config),

      {ok, Returning} = emqtt:start_link([
          {clientid, AssignedId},
          {proto_ver, v5},
          {clean_start, false}
          | Config
      ]),
      {ok, _} = emqtt:Connect(Returning),
      ?assertEqual(1, client_info(session_present, Returning)),
      ok = emqtt:disconnect(Returning).
  %% Opens a persistent session (expiry 30 s) but sets the expiry interval to
  %% 0 in the DISCONNECT. A later connection with clean_start = false must
  %% report session_present = 0.
  t_cancel_on_disconnect(Config) ->
      Connect = ?config(conn_fun, Config),
      ClientId = ?config(client_id, Config),
      Persistent = {properties, #{'Session-Expiry-Interval' => 30}},

      {ok, Opener} = emqtt:start_link([
          {proto_ver, v5},
          {clientid, ClientId},
          Persistent,
          {clean_start, true}
          | Config
      ]),
      {ok, _} = emqtt:Connect(Opener),
      %% Cancel the persistence while shutting down the connection.
      ok = emqtt:disconnect(Opener, 0, #{'Session-Expiry-Interval' => 0}),
      wait_for_cm_unregister(ClientId),

      {ok, Returning} = emqtt:start_link([
          {clientid, ClientId},
          {proto_ver, v5},
          {clean_start, false},
          Persistent
          | Config
      ]),
      {ok, _} = emqtt:Connect(Returning),
      ?assertEqual(0, client_info(session_present, Returning)),
      ok = emqtt:disconnect(Returning).
  %% Opens a non-persistent session (expiry 0) and tries to add persistence
  %% in the DISCONNECT. This is a protocol error and should not convert the
  %% session into a persistent one, so the next connection must report
  %% session_present = 0.
  t_persist_on_disconnect(Config) ->
      Connect = ?config(conn_fun, Config),
      ClientId = ?config(client_id, Config),

      {ok, Opener} = emqtt:start_link([
          {proto_ver, v5},
          {clientid, ClientId},
          {properties, #{'Session-Expiry-Interval' => 0}},
          {clean_start, true}
          | Config
      ]),
      {ok, _} = emqtt:Connect(Opener),
      %% Strangely enough, the disconnect is reported as successful by emqtt.
      ok = emqtt:disconnect(Opener, 0, #{'Session-Expiry-Interval' => 30}),
      wait_for_cm_unregister(ClientId),

      {ok, Returning} = emqtt:start_link([
          {clientid, ClientId},
          {proto_ver, v5},
          {clean_start, false},
          {properties, #{'Session-Expiry-Interval' => 30}}
          | Config
      ]),
      {ok, _} = emqtt:Connect(Returning),
      %% The session should not be known, since it wasn't persisted because
      %% of the changed expiry interval in the disconnect call.
      ?assertEqual(0, client_info(session_present, Returning)),
      ok = emqtt:disconnect(Returning).
  %% Setup/teardown hooks: the testcase is skipped for the `ds` store.
  t_process_dies_session_expires(init, Config) ->
      skip_ds_tc(Config);
  t_process_dies_session_expires('end', _Config) ->
      ok.

  %% Emulates an error in the connection process (or its node going down).
  %% A persistent session with a 1 s expiry should have expired by the time
  %% the client reconnects, so no session and no pending message is expected.
  t_process_dies_session_expires(Config) ->
      Connect = ?config(conn_fun, Config),
      ClientId = ?config(client_id, Config),
      PubTopic = ?config(topic, Config),
      SubFilter = ?config(stopic, Config),
      Body = <<"test">>,
      OptsFor = fun(ExpirySecs, CleanStart) ->
          [
              {proto_ver, v5},
              {clientid, ClientId},
              {properties, #{'Session-Expiry-Interval' => ExpirySecs}},
              {clean_start, CleanStart}
              | Config
          ]
      end,

      {ok, ShortLived} = emqtt:start_link(OptsFor(1, true)),
      {ok, _} = emqtt:Connect(ShortLived),
      {ok, _, [2]} = emqtt:subscribe(ShortLived, SubFilter, qos2),
      ok = emqtt:disconnect(ShortLived),
      maybe_kill_connection_process(ClientId, Config),

      ok = publish(PubTopic, [Body]),

      %% Sleep past the 1 s expiry interval before reconnecting.
      timer:sleep(1100),

      {ok, Returning} = emqtt:start_link(OptsFor(30, false)),
      {ok, _} = emqtt:Connect(Returning),
      ?assertEqual(0, client_info(session_present, Returning)),
      %% We should not receive the pending message
      ?assertEqual([], receive_messages(1)),
      emqtt:disconnect(Returning).
  %% Setup/teardown hooks: the testcase is skipped for the `ds` store.
  t_publish_while_client_is_gone(init, Config) ->
      skip_ds_tc(Config);
  t_publish_while_client_is_gone('end', _Config) ->
      ok.

  %% A persistent session should receive messages in its subscription even
  %% if the process owning the session dies: two QoS 2 messages published
  %% while the client is away must both arrive after reconnecting.
  t_publish_while_client_is_gone(Config) ->
      Connect = ?config(conn_fun, Config),
      PubTopic = ?config(topic, Config),
      SubFilter = ?config(stopic, Config),
      ClientId = ?config(client_id, Config),
      FirstBody = <<"hello1">>,
      SecondBody = <<"hello2">>,
      OptsFor = fun(CleanStart) ->
          [
              {proto_ver, v5},
              {clientid, ClientId},
              {properties, #{'Session-Expiry-Interval' => 30}},
              {clean_start, CleanStart}
              | Config
          ]
      end,

      {ok, Subscriber} = emqtt:start_link(OptsFor(true)),
      {ok, _} = emqtt:Connect(Subscriber),
      {ok, _, [2]} = emqtt:subscribe(Subscriber, SubFilter, qos2),
      ok = emqtt:disconnect(Subscriber),
      maybe_kill_connection_process(ClientId, Config),

      ok = publish(PubTopic, [FirstBody, SecondBody]),

      {ok, Resumed} = emqtt:start_link(OptsFor(false)),
      {ok, _} = emqtt:Connect(Resumed),
      Received = receive_messages(2),
      ?assertMatch([_, _], Received),
      %% receive_messages/1 returns the newest message first.
      [Later, Earlier] = Received,
      ?assertEqual({ok, iolist_to_binary(FirstBody)}, maps:find(payload, Earlier)),
      ?assertEqual({ok, 2}, maps:find(qos, Earlier)),
      ?assertEqual({ok, iolist_to_binary(SecondBody)}, maps:find(payload, Later)),
      ?assertEqual({ok, 2}, maps:find(qos, Later)),
      ok = emqtt:disconnect(Resumed).
  %% Setup/teardown hooks: the testcase is skipped for the `ds` store.
  t_clean_start_drops_subscriptions(init, Config) ->
      skip_ds_tc(Config);
  t_clean_start_drops_subscriptions('end', _Config) ->
      ok.

  %% Scenario:
  %% 1. A persistent session is started and disconnected.
  %% 2. While disconnected, a message is published and persisted.
  %% 3. When connecting again, the clean start flag is set, the subscription
  %%    is renewed, then we disconnect again.
  %% 4. Finally, a new connection is made with clean start set to false.
  %%    The original message should not be delivered.
  t_clean_start_drops_subscriptions(Config) ->
      Connect = ?config(conn_fun, Config),
      PubTopic = ?config(topic, Config),
      SubFilter = ?config(stopic, Config),
      ClientId = ?config(client_id, Config),
      BodyWhileAway = <<"hello1">>,
      BodyAfterCleanStart = <<"hello2">>,
      BodyAfterResume = <<"hello3">>,
      OptsFor = fun(CleanStart) ->
          [
              {proto_ver, v5},
              {clientid, ClientId},
              {properties, #{'Session-Expiry-Interval' => 30}},
              {clean_start, CleanStart}
              | Config
          ]
      end,

      %% 1.
      {ok, Initial} = emqtt:start_link(OptsFor(true)),
      {ok, _} = emqtt:Connect(Initial),
      {ok, _, [2]} = emqtt:subscribe(Initial, SubFilter, qos2),
      ok = emqtt:disconnect(Initial),
      maybe_kill_connection_process(ClientId, Config),

      %% 2.
      ok = publish(PubTopic, BodyWhileAway),

      %% 3.
      {ok, Cleaned} = emqtt:start_link(OptsFor(true)),
      {ok, _} = emqtt:Connect(Cleaned),
      ?assertEqual(0, client_info(session_present, Cleaned)),
      {ok, _, [2]} = emqtt:subscribe(Cleaned, SubFilter, qos2),
      ok = publish(PubTopic, BodyAfterCleanStart),
      [FromCleanSession] = receive_messages(1),
      ?assertEqual(
          {ok, iolist_to_binary(BodyAfterCleanStart)},
          maps:find(payload, FromCleanSession)
      ),
      ok = emqtt:disconnect(Cleaned),
      maybe_kill_connection_process(ClientId, Config),

      %% 4.
      {ok, Resumed} = emqtt:start_link(OptsFor(false)),
      {ok, _} = emqtt:Connect(Resumed),
      ok = publish(PubTopic, BodyAfterResume),
      [FromResumedSession] = receive_messages(1),
      ?assertEqual(
          {ok, iolist_to_binary(BodyAfterResume)},
          maps:find(payload, FromResumedSession)
      ),
      ok = emqtt:disconnect(Resumed).
  %% Subscribes to the testcase filter, checks that the emqtt client lists
  %% exactly one subscription for it, unsubscribes, and checks that the
  %% subscription is gone.
  t_unsubscribe(Config) ->
      Connect = ?config(conn_fun, Config),
      SubFilter = ?config(stopic, Config),
      {ok, Client} = emqtt:start_link([
          {clientid, ?config(client_id, Config)},
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}}
          | Config
      ]),
      %% Subscriptions the client currently reports for our filter.
      OwnSubs = fun() ->
          [Sub || {Filter, _} = Sub <- emqtt:subscriptions(Client), Filter =:= SubFilter]
      end,
      {ok, _} = emqtt:Connect(Client),
      {ok, _, [2]} = emqtt:subscribe(Client, SubFilter, qos2),
      ?assertMatch([_], OwnSubs()),
      {ok, _, _} = emqtt:unsubscribe(Client, SubFilter),
      ?assertMatch([], OwnSubs()),
      ok = emqtt:disconnect(Client).
  %% Setup/teardown hooks: the testcase is skipped for the `ds` store.
  t_multiple_subscription_matches(init, Config) ->
      skip_ds_tc(Config);
  t_multiple_subscription_matches('end', _Config) ->
      ok.

  %% Subscribes with two filters that both match the publish topic, publishes
  %% one message while the client is away, and expects it to be delivered
  %% twice (once per matching subscription) after reconnecting.
  t_multiple_subscription_matches(Config) ->
      Connect = ?config(conn_fun, Config),
      PubTopic = ?config(topic, Config),
      WildcardFilter = ?config(stopic, Config),
      ExactFilter = ?config(stopic_alt, Config),
      ClientId = ?config(client_id, Config),
      Body = <<"test message">>,
      CommonOpts = [
          {clientid, ClientId},
          {proto_ver, v5},
          {properties, #{'Session-Expiry-Interval' => 30}}
      ],

      {ok, Subscriber} = emqtt:start_link(CommonOpts ++ Config),
      {ok, _} = emqtt:Connect(Subscriber),
      {ok, _, [2]} = emqtt:subscribe(Subscriber, WildcardFilter, qos2),
      {ok, _, [2]} = emqtt:subscribe(Subscriber, ExactFilter, qos2),
      ok = emqtt:disconnect(Subscriber),
      maybe_kill_connection_process(ClientId, Config),

      publish(PubTopic, Body),

      {ok, Resumed} = emqtt:start_link(CommonOpts ++ [{clean_start, false} | Config]),
      {ok, _} = emqtt:Connect(Resumed),

      %% We will receive the same message twice because it matches two subscriptions.
      [CopyA, CopyB] = receive_messages(2),
      ExpectedTopic = {ok, iolist_to_binary(PubTopic)},
      ExpectedBody = {ok, iolist_to_binary(Body)},
      ?assertEqual(ExpectedTopic, maps:find(topic, CopyA)),
      ?assertEqual(ExpectedBody, maps:find(payload, CopyA)),
      ?assertEqual(ExpectedTopic, maps:find(topic, CopyB)),
      ?assertEqual(ExpectedBody, maps:find(payload, CopyB)),
      ?assertEqual({ok, 2}, maps:find(qos, CopyA)),
      ?assertEqual({ok, 2}, maps:find(qos, CopyB)),
      ok = emqtt:disconnect(Resumed).
  %% Init hook helper: returns a `{skip, Reason}` tuple when the suite runs
  %% with the `ds` persistent store, otherwise returns Config unchanged.
  skip_ds_tc(Config) ->
      Store = ?config(persistent_store, Config),
      case Store =:= ds of
          true ->
              {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
          false ->
              Config
      end.