emqx_persistent_session_SUITE.erl 45 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  20. -include_lib("../include/emqx.hrl").
  21. -include("../src/persistent_session/emqx_persistent_session.hrl").
  22. -compile(export_all).
  23. -compile(nowarn_export_all).
  24. %%--------------------------------------------------------------------
  25. %% SUITE boilerplate
  26. %%--------------------------------------------------------------------
  27. all() ->
  28. [
  29. % NOTE
  30. % Tests are disabled while existing session persistence impl is being
  31. % phased out.
  32. % {group, persistent_store_enabled},
  33. {group, persistent_store_disabled}
  34. ].
  35. %% A persistent session can be resumed in two ways:
  36. %% 1. The old connection process is still alive, and the session is taken
  37. %% over by the new connection.
  38. %% 2. The old session process has died (e.g., because of node down).
  39. %% The new process resumes the session from the stored state, and finds
  40. %% any subscribed messages from the persistent message store.
  41. %%
  42. %% We want to test both ways, both with the db backend enabled and disabled.
  43. %%
  44. %% In addition, we test both tcp and quic connections.
  45. groups() ->
  46. TCs = emqx_common_test_helpers:all(?MODULE),
  47. SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
  48. GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
  49. OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
  50. [
  51. {persistent_store_enabled, [
  52. {group, ram_tables},
  53. {group, disc_tables}
  54. ]},
  55. {persistent_store_disabled, [{group, no_kill_connection_process}]},
  56. {ram_tables, [], [
  57. {group, no_kill_connection_process},
  58. {group, kill_connection_process},
  59. {group, snabbkaffe},
  60. {group, gc_tests}
  61. ]},
  62. {disc_tables, [], [
  63. {group, no_kill_connection_process},
  64. {group, kill_connection_process},
  65. {group, snabbkaffe},
  66. {group, gc_tests}
  67. ]},
  68. {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
  69. {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
  70. {snabbkaffe, [], [
  71. {group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}
  72. ]},
  73. {tcp, [], OtherTCs},
  74. {quic, [], OtherTCs},
  75. {ws, [], OtherTCs},
  76. {tcp_snabbkaffe, [], SnabbkaffeTCs},
  77. {quic_snabbkaffe, [], SnabbkaffeTCs},
  78. {ws_snabbkaffe, [], SnabbkaffeTCs},
  79. {gc_tests, [], GCTests}
  80. ].
  81. is_snabbkaffe_tc(TC) ->
  82. re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch.
  83. is_gc_tc(TC) ->
  84. re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
  85. init_per_group(persistent_store_enabled, Config) ->
  86. [{persistent_store_enabled, true} | Config];
  87. init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables ->
  88. %% Start Apps
  89. Reply =
  90. case Group =:= ram_tables of
  91. true -> ram;
  92. false -> disc
  93. end,
  94. emqx_common_test_helpers:boot_modules(all),
  95. meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
  96. meck:expect(emqx_config, get, fun
  97. (?on_disc_key) -> Reply =:= disc;
  98. (?is_enabled_key) -> true;
  99. (Other) -> meck:passthrough([Other])
  100. end),
  101. emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
  102. ?assertEqual(true, emqx_persistent_session:is_store_enabled()),
  103. Config;
  104. init_per_group(persistent_store_disabled, Config) ->
  105. %% Start Apps
  106. emqx_common_test_helpers:boot_modules(all),
  107. meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
  108. meck:expect(emqx_config, get, fun
  109. (?is_enabled_key) -> false;
  110. (Other) -> meck:passthrough([Other])
  111. end),
  112. emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
  113. ?assertEqual(false, emqx_persistent_session:is_store_enabled()),
  114. [{persistent_store_enabled, false} | Config];
  115. init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe ->
  116. [
  117. {ssl, false},
  118. {host, "localhost"},
  119. {enable_websocket, true},
  120. {port, 8083},
  121. {conn_fun, ws_connect}
  122. | Config
  123. ];
  124. init_per_group(Group, Config) when Group == tcp; Group == tcp_snabbkaffe ->
  125. [{port, 1883}, {conn_fun, connect} | Config];
  126. init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe ->
  127. UdpPort = 1883,
  128. emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort),
  129. [{port, UdpPort}, {conn_fun, quic_connect} | Config];
  130. init_per_group(no_kill_connection_process, Config) ->
  131. [{kill_connection_process, false} | Config];
  132. init_per_group(kill_connection_process, Config) ->
  133. [{kill_connection_process, true} | Config];
  134. init_per_group(snabbkaffe, Config) ->
  135. [{kill_connection_process, true} | Config];
  136. init_per_group(gc_tests, Config) ->
  137. %% We need to make sure the system does not interfere with this test group.
  138. lists:foreach(
  139. fun(ClientId) ->
  140. maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
  141. end,
  142. emqx_cm:all_client_ids()
  143. ),
  144. emqx_common_test_helpers:stop_apps([]),
  145. SessionMsgEts = gc_tests_session_store,
  146. MsgEts = gc_tests_msg_store,
  147. Pid = spawn(fun() ->
  148. ets:new(SessionMsgEts, [named_table, public, ordered_set]),
  149. ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]),
  150. receive
  151. stop -> ok
  152. end
  153. end),
  154. meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
  155. meck:expect(mnesia, dirty_first, fun
  156. (?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
  157. (?MSG_TAB) -> ets:first(MsgEts);
  158. (X) -> meck:passthrough([X])
  159. end),
  160. meck:expect(mnesia, dirty_next, fun
  161. (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
  162. (?MSG_TAB, X) -> ets:next(MsgEts, X);
  163. (Tab, X) -> meck:passthrough([Tab, X])
  164. end),
  165. meck:expect(mnesia, dirty_delete, fun
  166. (?MSG_TAB, X) -> ets:delete(MsgEts, X);
  167. (Tab, X) -> meck:passthrough([Tab, X])
  168. end),
  169. [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
  170. init_per_suite(Config) ->
  171. Config.
  172. set_special_confs(_) ->
  173. ok.
  174. end_per_suite(_Config) ->
  175. emqx_common_test_helpers:ensure_mnesia_stopped(),
  176. ok.
  177. end_per_group(gc_tests, Config) ->
  178. meck:unload(mnesia),
  179. ?config(store_owner, Config) ! stop,
  180. ok;
  181. end_per_group(Group, _Config) when
  182. Group =:= ram_tables; Group =:= disc_tables
  183. ->
  184. meck:unload(emqx_config),
  185. emqx_common_test_helpers:stop_apps([]);
  186. end_per_group(persistent_store_disabled, _Config) ->
  187. meck:unload(emqx_config),
  188. emqx_common_test_helpers:stop_apps([]);
  189. end_per_group(_Group, _Config) ->
  190. ok.
  191. init_per_testcase(TestCase, Config) ->
  192. Config1 = preconfig_per_testcase(TestCase, Config),
  193. case is_gc_tc(TestCase) of
  194. true ->
  195. ets:delete_all_objects(?config(msg_store, Config)),
  196. ets:delete_all_objects(?config(session_msg_store, Config));
  197. false ->
  198. skip
  199. end,
  200. case erlang:function_exported(?MODULE, TestCase, 2) of
  201. true -> ?MODULE:TestCase(init, Config1);
  202. _ -> Config1
  203. end.
  204. end_per_testcase(TestCase, Config) ->
  205. case is_snabbkaffe_tc(TestCase) of
  206. true -> snabbkaffe:stop();
  207. false -> skip
  208. end,
  209. case erlang:function_exported(?MODULE, TestCase, 2) of
  210. true -> ?MODULE:TestCase('end', Config);
  211. false -> ok
  212. end,
  213. Config.
  214. preconfig_per_testcase(TestCase, Config) ->
  215. {BaseName, Config1} =
  216. case ?config(tc_group_properties, Config) of
  217. [] ->
  218. %% We are running a single testcase
  219. {
  220. atom_to_binary(TestCase),
  221. init_per_group(tcp, init_per_group(kill_connection_process, Config))
  222. };
  223. [_ | _] = Props ->
  224. Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
  225. Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
  226. Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
  227. {iolist_to_binary(Pre1), Config}
  228. end,
  229. [
  230. {topic, iolist_to_binary([BaseName, "/foo"])},
  231. {stopic, iolist_to_binary([BaseName, "/+"])},
  232. {stopic_alt, iolist_to_binary([BaseName, "/foo"])},
  233. {client_id, BaseName}
  234. | Config1
  235. ].
  236. %%--------------------------------------------------------------------
  237. %% Helpers
  238. %%--------------------------------------------------------------------
  239. client_info(Key, Client) ->
  240. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  241. receive_messages(Count) ->
  242. receive_messages(Count, []).
  243. receive_messages(0, Msgs) ->
  244. Msgs;
  245. receive_messages(Count, Msgs) ->
  246. receive
  247. {publish, Msg} ->
  248. receive_messages(Count - 1, [Msg | Msgs]);
  249. _Other ->
  250. receive_messages(Count, Msgs)
  251. after 5000 ->
  252. Msgs
  253. end.
  254. maybe_kill_connection_process(ClientId, Config) ->
  255. case ?config(kill_connection_process, Config) of
  256. true ->
  257. case emqx_cm:lookup_channels(ClientId) of
  258. [] ->
  259. ok;
  260. [ConnectionPid] ->
  261. ?assert(is_pid(ConnectionPid)),
  262. Ref = monitor(process, ConnectionPid),
  263. ConnectionPid ! die_if_test,
  264. receive
  265. {'DOWN', Ref, process, ConnectionPid, normal} -> ok
  266. after 3000 -> error(process_did_not_die)
  267. end,
  268. wait_for_cm_unregister(ClientId)
  269. end;
  270. false ->
  271. ok
  272. end.
  273. wait_for_cm_unregister(ClientId) ->
  274. wait_for_cm_unregister(ClientId, 100).
  275. wait_for_cm_unregister(_ClientId, 0) ->
  276. error(cm_did_not_unregister);
  277. wait_for_cm_unregister(ClientId, N) ->
  278. case emqx_cm:lookup_channels(ClientId) of
  279. [] ->
  280. ok;
  281. [_] ->
  282. timer:sleep(100),
  283. wait_for_cm_unregister(ClientId, N - 1)
  284. end.
  285. snabbkaffe_sync_publish(Topic, Payloads) ->
  286. Fun = fun(Client, Payload) ->
  287. ?check_trace(
  288. begin
  289. ?wait_async_action(
  290. {ok, _} = emqtt:publish(Client, Topic, Payload, 2),
  291. #{?snk_kind := ps_persist_msg, payload := Payload}
  292. )
  293. end,
  294. fun(_, _Trace) -> ok end
  295. )
  296. end,
  297. do_publish(Payloads, Fun, true).
  298. publish(Topic, Payloads) ->
  299. publish(Topic, Payloads, false).
  300. publish(Topic, Payloads, WaitForUnregister) ->
  301. Fun = fun(Client, Payload) ->
  302. {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
  303. end,
  304. do_publish(Payloads, Fun, WaitForUnregister).
  305. do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
  306. %% Publish from another process to avoid connection confusion.
  307. {Pid, Ref} =
  308. spawn_monitor(
  309. fun() ->
  310. %% For convenience, always publish using tcp.
  311. %% The publish path is not what we are testing.
  312. ClientID = <<"ps_SUITE_publisher">>,
  313. {ok, Client} = emqtt:start_link([
  314. {proto_ver, v5},
  315. {clientid, ClientID},
  316. {port, 1883}
  317. ]),
  318. {ok, _} = emqtt:connect(Client),
  319. lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
  320. ok = emqtt:disconnect(Client),
  321. %% Snabbkaffe sometimes fails unless all processes are gone.
  322. case WaitForUnregister of
  323. false ->
  324. ok;
  325. true ->
  326. case emqx_cm:lookup_channels(ClientID) of
  327. [] ->
  328. ok;
  329. [ConnectionPid] ->
  330. ?assert(is_pid(ConnectionPid)),
  331. Ref1 = monitor(process, ConnectionPid),
  332. receive
  333. {'DOWN', Ref1, process, ConnectionPid, _} -> ok
  334. after 3000 -> error(process_did_not_die)
  335. end,
  336. wait_for_cm_unregister(ClientID)
  337. end
  338. end
  339. end
  340. ),
  341. receive
  342. {'DOWN', Ref, process, Pid, normal} -> ok;
  343. {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
  344. end;
  345. do_publish(Payload, PublishFun, WaitForUnregister) ->
  346. do_publish([Payload], PublishFun, WaitForUnregister).
  347. %%--------------------------------------------------------------------
  348. %% Test Cases
  349. %%--------------------------------------------------------------------
  350. %% [MQTT-3.1.2-23]
  351. t_connect_session_expiry_interval(Config) ->
  352. ConnFun = ?config(conn_fun, Config),
  353. Topic = ?config(topic, Config),
  354. STopic = ?config(stopic, Config),
  355. Payload = <<"test message">>,
  356. ClientId = ?config(client_id, Config),
  357. {ok, Client1} = emqtt:start_link([
  358. {clientid, ClientId},
  359. {proto_ver, v5},
  360. {properties, #{'Session-Expiry-Interval' => 30}}
  361. | Config
  362. ]),
  363. {ok, _} = emqtt:ConnFun(Client1),
  364. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  365. ok = emqtt:disconnect(Client1),
  366. maybe_kill_connection_process(ClientId, Config),
  367. publish(Topic, Payload),
  368. {ok, Client2} = emqtt:start_link([
  369. {clientid, ClientId},
  370. {proto_ver, v5},
  371. {properties, #{'Session-Expiry-Interval' => 30}},
  372. {clean_start, false}
  373. | Config
  374. ]),
  375. {ok, _} = emqtt:ConnFun(Client2),
  376. [Msg | _] = receive_messages(1),
  377. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  378. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  379. ?assertEqual({ok, 2}, maps:find(qos, Msg)),
  380. ok = emqtt:disconnect(Client2).
  381. t_without_client_id(Config) ->
  382. %% Emqtt client dies
  383. process_flag(trap_exit, true),
  384. ConnFun = ?config(conn_fun, Config),
  385. {ok, Client0} = emqtt:start_link([
  386. {proto_ver, v5},
  387. {properties, #{'Session-Expiry-Interval' => 30}},
  388. {clean_start, false}
  389. | Config
  390. ]),
  391. {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
  392. ok.
  393. t_assigned_clientid_persistent_session(Config) ->
  394. ConnFun = ?config(conn_fun, Config),
  395. {ok, Client1} = emqtt:start_link([
  396. {proto_ver, v5},
  397. {properties, #{'Session-Expiry-Interval' => 30}},
  398. {clean_start, true}
  399. | Config
  400. ]),
  401. {ok, _} = emqtt:ConnFun(Client1),
  402. AssignedClientId = client_info(clientid, Client1),
  403. ok = emqtt:disconnect(Client1),
  404. maybe_kill_connection_process(AssignedClientId, Config),
  405. {ok, Client2} = emqtt:start_link([
  406. {clientid, AssignedClientId},
  407. {proto_ver, v5},
  408. {clean_start, false}
  409. | Config
  410. ]),
  411. {ok, _} = emqtt:ConnFun(Client2),
  412. ?assertEqual(1, client_info(session_present, Client2)),
  413. ok = emqtt:disconnect(Client2).
  414. t_cancel_on_disconnect(Config) ->
  415. %% Open a persistent session, but cancel the persistence when
  416. %% shutting down the connection.
  417. ConnFun = ?config(conn_fun, Config),
  418. ClientId = ?config(client_id, Config),
  419. {ok, Client1} = emqtt:start_link([
  420. {proto_ver, v5},
  421. {clientid, ClientId},
  422. {properties, #{'Session-Expiry-Interval' => 30}},
  423. {clean_start, true}
  424. | Config
  425. ]),
  426. {ok, _} = emqtt:ConnFun(Client1),
  427. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
  428. wait_for_cm_unregister(ClientId),
  429. {ok, Client2} = emqtt:start_link([
  430. {clientid, ClientId},
  431. {proto_ver, v5},
  432. {clean_start, false},
  433. {properties, #{'Session-Expiry-Interval' => 30}}
  434. | Config
  435. ]),
  436. {ok, _} = emqtt:ConnFun(Client2),
  437. ?assertEqual(0, client_info(session_present, Client2)),
  438. ok = emqtt:disconnect(Client2).
  439. t_persist_on_disconnect(Config) ->
  440. %% Open a non-persistent session, but add the persistence when
  441. %% shutting down the connection. This is a protocol error, and
  442. %% should not convert the session into a persistent session.
  443. ConnFun = ?config(conn_fun, Config),
  444. ClientId = ?config(client_id, Config),
  445. {ok, Client1} = emqtt:start_link([
  446. {proto_ver, v5},
  447. {clientid, ClientId},
  448. {properties, #{'Session-Expiry-Interval' => 0}},
  449. {clean_start, true}
  450. | Config
  451. ]),
  452. {ok, _} = emqtt:ConnFun(Client1),
  453. %% Strangely enough, the disconnect is reported as successful by emqtt.
  454. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
  455. wait_for_cm_unregister(ClientId),
  456. {ok, Client2} = emqtt:start_link([
  457. {clientid, ClientId},
  458. {proto_ver, v5},
  459. {clean_start, false},
  460. {properties, #{'Session-Expiry-Interval' => 30}}
  461. | Config
  462. ]),
  463. {ok, _} = emqtt:ConnFun(Client2),
  464. %% The session should not be known, since it wasn't persisted because of the
  465. %% changed expiry interval in the disconnect call.
  466. ?assertEqual(0, client_info(session_present, Client2)),
  467. ok = emqtt:disconnect(Client2).
  468. wait_for_pending(SId) ->
  469. wait_for_pending(SId, 100).
  470. wait_for_pending(_SId, 0) ->
  471. error(exhausted_wait_for_pending);
  472. wait_for_pending(SId, N) ->
  473. case emqx_persistent_session:pending(SId) of
  474. [] ->
  475. timer:sleep(1),
  476. wait_for_pending(SId, N - 1);
  477. [_ | _] = Pending ->
  478. Pending
  479. end.
  480. t_process_dies_session_expires(Config) ->
  481. %% Emulate an error in the connect process,
  482. %% or that the node of the process goes down.
  483. %% A persistent session should eventually expire.
  484. ConnFun = ?config(conn_fun, Config),
  485. ClientId = ?config(client_id, Config),
  486. Topic = ?config(topic, Config),
  487. STopic = ?config(stopic, Config),
  488. Payload = <<"test">>,
  489. {ok, Client1} = emqtt:start_link([
  490. {proto_ver, v5},
  491. {clientid, ClientId},
  492. {properties, #{'Session-Expiry-Interval' => 1}},
  493. {clean_start, true}
  494. | Config
  495. ]),
  496. {ok, _} = emqtt:ConnFun(Client1),
  497. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  498. ok = emqtt:disconnect(Client1),
  499. maybe_kill_connection_process(ClientId, Config),
  500. ok = publish(Topic, [Payload]),
  501. SessionId =
  502. case ?config(persistent_store_enabled, Config) of
  503. false ->
  504. undefined;
  505. true ->
  506. %% The session should not be marked as expired.
  507. {Tag, Session} = emqx_persistent_session:lookup(ClientId),
  508. ?assertEqual(persistent, Tag),
  509. SId = emqx_session:info(id, Session),
  510. case ?config(kill_connection_process, Config) of
  511. true ->
  512. %% The session should have a pending message
  513. ?assertMatch([_], wait_for_pending(SId));
  514. false ->
  515. skip
  516. end,
  517. SId
  518. end,
  519. timer:sleep(1100),
  520. %% The session should now be marked as expired.
  521. case
  522. (?config(kill_connection_process, Config) andalso
  523. ?config(persistent_store_enabled, Config))
  524. of
  525. true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId));
  526. false -> skip
  527. end,
  528. {ok, Client2} = emqtt:start_link([
  529. {proto_ver, v5},
  530. {clientid, ClientId},
  531. {properties, #{'Session-Expiry-Interval' => 30}},
  532. {clean_start, false}
  533. | Config
  534. ]),
  535. {ok, _} = emqtt:ConnFun(Client2),
  536. ?assertEqual(0, client_info(session_present, Client2)),
  537. case
  538. (?config(kill_connection_process, Config) andalso
  539. ?config(persistent_store_enabled, Config))
  540. of
  541. true ->
  542. %% The session should be a fresh one
  543. {persistent, NewSession} = emqx_persistent_session:lookup(ClientId),
  544. ?assertNotEqual(SessionId, emqx_session:info(id, NewSession)),
  545. %% The old session should now either
  546. %% be marked as abandoned or already be garbage collected.
  547. ?assertMatch([], emqx_persistent_session:pending(SessionId));
  548. false ->
  549. skip
  550. end,
  551. %% We should not receive the pending message
  552. ?assertEqual([], receive_messages(1)),
  553. emqtt:disconnect(Client2).
  554. t_publish_while_client_is_gone(Config) ->
  555. %% A persistent session should receive messages in its
  556. %% subscription even if the process owning the session dies.
  557. ConnFun = ?config(conn_fun, Config),
  558. Topic = ?config(topic, Config),
  559. STopic = ?config(stopic, Config),
  560. Payload1 = <<"hello1">>,
  561. Payload2 = <<"hello2">>,
  562. ClientId = ?config(client_id, Config),
  563. {ok, Client1} = emqtt:start_link([
  564. {proto_ver, v5},
  565. {clientid, ClientId},
  566. {properties, #{'Session-Expiry-Interval' => 30}},
  567. {clean_start, true}
  568. | Config
  569. ]),
  570. {ok, _} = emqtt:ConnFun(Client1),
  571. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  572. ok = emqtt:disconnect(Client1),
  573. maybe_kill_connection_process(ClientId, Config),
  574. ok = publish(Topic, [Payload1, Payload2]),
  575. {ok, Client2} = emqtt:start_link([
  576. {proto_ver, v5},
  577. {clientid, ClientId},
  578. {properties, #{'Session-Expiry-Interval' => 30}},
  579. {clean_start, false}
  580. | Config
  581. ]),
  582. {ok, _} = emqtt:ConnFun(Client2),
  583. Msgs = receive_messages(2),
  584. ?assertMatch([_, _], Msgs),
  585. [Msg2, Msg1] = Msgs,
  586. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  587. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  588. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  589. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  590. ok = emqtt:disconnect(Client2).
  591. t_clean_start_drops_subscriptions(Config) ->
  592. %% 1. A persistent session is started and disconnected.
  593. %% 2. While disconnected, a message is published and persisted.
  594. %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
  595. %% then we disconnect again.
  596. %% 4. Finally, a new connection is made with clean start set to false.
  597. %% The original message should not be delivered.
  598. ConnFun = ?config(conn_fun, Config),
  599. Topic = ?config(topic, Config),
  600. STopic = ?config(stopic, Config),
  601. Payload1 = <<"hello1">>,
  602. Payload2 = <<"hello2">>,
  603. Payload3 = <<"hello3">>,
  604. ClientId = ?config(client_id, Config),
  605. %% 1.
  606. {ok, Client1} = emqtt:start_link([
  607. {proto_ver, v5},
  608. {clientid, ClientId},
  609. {properties, #{'Session-Expiry-Interval' => 30}},
  610. {clean_start, true}
  611. | Config
  612. ]),
  613. {ok, _} = emqtt:ConnFun(Client1),
  614. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  615. ok = emqtt:disconnect(Client1),
  616. maybe_kill_connection_process(ClientId, Config),
  617. %% 2.
  618. ok = publish(Topic, Payload1),
  619. %% 3.
  620. {ok, Client2} = emqtt:start_link([
  621. {proto_ver, v5},
  622. {clientid, ClientId},
  623. {properties, #{'Session-Expiry-Interval' => 30}},
  624. {clean_start, true}
  625. | Config
  626. ]),
  627. {ok, _} = emqtt:ConnFun(Client2),
  628. ?assertEqual(0, client_info(session_present, Client2)),
  629. {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
  630. ok = publish(Topic, Payload2),
  631. [Msg1] = receive_messages(1),
  632. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
  633. ok = emqtt:disconnect(Client2),
  634. maybe_kill_connection_process(ClientId, Config),
  635. %% 4.
  636. {ok, Client3} = emqtt:start_link([
  637. {proto_ver, v5},
  638. {clientid, ClientId},
  639. {properties, #{'Session-Expiry-Interval' => 30}},
  640. {clean_start, false}
  641. | Config
  642. ]),
  643. {ok, _} = emqtt:ConnFun(Client3),
  644. ok = publish(Topic, Payload3),
  645. [Msg2] = receive_messages(1),
  646. ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
  647. ok = emqtt:disconnect(Client3).
  648. t_unsubscribe(Config) ->
  649. ConnFun = ?config(conn_fun, Config),
  650. Topic = ?config(topic, Config),
  651. STopic = ?config(stopic, Config),
  652. ClientId = ?config(client_id, Config),
  653. {ok, Client} = emqtt:start_link([
  654. {clientid, ClientId},
  655. {proto_ver, v5},
  656. {properties, #{'Session-Expiry-Interval' => 30}}
  657. | Config
  658. ]),
  659. {ok, _} = emqtt:ConnFun(Client),
  660. {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
  661. case emqx_persistent_session:is_store_enabled() of
  662. true ->
  663. {persistent, Session} = emqx_persistent_session:lookup(ClientId),
  664. SessionID = emqx_session:info(id, Session),
  665. SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
  666. ?assert(lists:member(SessionID, SessionIDs)),
  667. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  668. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  669. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  670. SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
  671. ?assert(not lists:member(SessionID, SessionIDs2));
  672. false ->
  673. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  674. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  675. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic])
  676. end,
  677. ok = emqtt:disconnect(Client).
  678. t_multiple_subscription_matches(Config) ->
  679. ConnFun = ?config(conn_fun, Config),
  680. Topic = ?config(topic, Config),
  681. STopic1 = ?config(stopic, Config),
  682. STopic2 = ?config(stopic_alt, Config),
  683. Payload = <<"test message">>,
  684. ClientId = ?config(client_id, Config),
  685. {ok, Client1} = emqtt:start_link([
  686. {clientid, ClientId},
  687. {proto_ver, v5},
  688. {properties, #{'Session-Expiry-Interval' => 30}}
  689. | Config
  690. ]),
  691. {ok, _} = emqtt:ConnFun(Client1),
  692. {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
  693. {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
  694. ok = emqtt:disconnect(Client1),
  695. maybe_kill_connection_process(ClientId, Config),
  696. publish(Topic, Payload),
  697. {ok, Client2} = emqtt:start_link([
  698. {clientid, ClientId},
  699. {proto_ver, v5},
  700. {properties, #{'Session-Expiry-Interval' => 30}},
  701. {clean_start, false}
  702. | Config
  703. ]),
  704. {ok, _} = emqtt:ConnFun(Client2),
  705. %% We will receive the same message twice because it matches two subscriptions.
  706. [Msg1, Msg2] = receive_messages(2),
  707. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
  708. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
  709. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
  710. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
  711. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  712. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  713. ok = emqtt:disconnect(Client2).
  714. t_lost_messages_because_of_gc(init, Config) ->
  715. case
  716. (emqx_persistent_session:is_store_enabled() andalso
  717. ?config(kill_connection_process, Config))
  718. of
  719. true ->
  720. Retain = 1000,
  721. OldRetain = emqx_config:get(?msg_retain, Retain),
  722. emqx_config:put(?msg_retain, Retain),
  723. [{retain, Retain}, {old_retain, OldRetain} | Config];
  724. false ->
  725. {skip, only_relevant_with_store_and_kill_process}
  726. end;
  727. t_lost_messages_because_of_gc('end', Config) ->
  728. OldRetain = ?config(old_retain, Config),
  729. emqx_config:put(?msg_retain, OldRetain),
  730. ok.
  731. t_lost_messages_because_of_gc(Config) ->
  732. ConnFun = ?config(conn_fun, Config),
  733. Topic = ?config(topic, Config),
  734. STopic = ?config(stopic, Config),
  735. ClientId = ?config(client_id, Config),
  736. Retain = ?config(retain, Config),
  737. Payload1 = <<"hello1">>,
  738. Payload2 = <<"hello2">>,
  739. {ok, Client1} = emqtt:start_link([
  740. {clientid, ClientId},
  741. {proto_ver, v5},
  742. {properties, #{'Session-Expiry-Interval' => 30}}
  743. | Config
  744. ]),
  745. {ok, _} = emqtt:ConnFun(Client1),
  746. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  747. emqtt:disconnect(Client1),
  748. maybe_kill_connection_process(ClientId, Config),
  749. publish(Topic, Payload1),
  750. timer:sleep(2 * Retain),
  751. publish(Topic, Payload2),
  752. emqx_persistent_session_gc:message_gc_worker(),
  753. {ok, Client2} = emqtt:start_link([
  754. {clientid, ClientId},
  755. {clean_start, false},
  756. {proto_ver, v5},
  757. {properties, #{'Session-Expiry-Interval' => 30}}
  758. | Config
  759. ]),
  760. {ok, _} = emqtt:ConnFun(Client2),
  761. Msgs = receive_messages(2),
  762. ?assertMatch([_], Msgs),
  763. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))),
  764. emqtt:disconnect(Client2),
  765. ok.
  766. %%--------------------------------------------------------------------
  767. %% Snabbkaffe helpers
  768. %%--------------------------------------------------------------------
%% Asserts that a snabbkaffe trace contains the expected chain of
%% session-resume events (event kinds prefixed with "ps_").
%%
%% Every step is checked with snabbkaffe's ?strict_causality macro on a
%% (cause kind, effect kind) pair; a failed check raises through ?assert.
%% `Trace' is the event list handed to the check stage of ?check_trace.
check_snabbkaffe_vanilla(Trace) ->
    %% Keep only the events whose kind starts with "ps_".
    ResumeTrace = [
        T
     || #{?snk_kind := K} = T <- Trace,
        re:run(to_list(K), "^ps_") /= nomatch
    ],
    %% There must be at least one such event ...
    ?assertMatch([_ | _], ResumeTrace),
    %% ... and the `sid' values projected from them must collapse to a
    %% single distinct value (badmatch otherwise).
    [_Sid] = lists:usort(?projection(sid, ResumeTrace)),
    %% Check internal flow of the emqx_cm resuming
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_resuming},
            #{?snk_kind := ps_initial_pendings},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_initial_pendings},
            #{?snk_kind := ps_persist_pendings},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_persist_pendings},
            #{?snk_kind := ps_notify_writers},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_notify_writers},
            #{?snk_kind := ps_node_markers},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_node_markers},
            #{?snk_kind := ps_resume_session},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_resume_session},
            #{?snk_kind := ps_marker_pendings},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_marker_pendings},
            #{?snk_kind := ps_marker_pendings_msgs},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_marker_pendings_msgs},
            #{?snk_kind := ps_resume_end},
            ResumeTrace
        )
    ),
    %% Check flow between worker and emqx_cm
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_notify_writers},
            #{?snk_kind := ps_worker_started},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_marker_pendings},
            #{?snk_kind := ps_worker_resume_end},
            ResumeTrace
        )
    ),
    ?assert(
        ?strict_causality(
            #{?snk_kind := ps_worker_resume_end},
            #{?snk_kind := ps_worker_shutdown},
            ResumeTrace
        )
    ),
    %% Exactly one ps_node_markers event is expected in the full trace,
    %% and its `markers' field must be a one-element list.
    [Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
    ?assertMatch([_], Markers).
  858. to_list(L) when is_list(L) -> L;
  859. to_list(A) when is_atom(A) -> atom_to_list(A);
  860. to_list(B) when is_binary(B) -> binary_to_list(B).
  861. %%--------------------------------------------------------------------
  862. %% Snabbkaffe tests
  863. %%--------------------------------------------------------------------
  864. t_snabbkaffe_vanilla_stages(Config) ->
  865. %% Test that all stages of session resume works ok in the simplest case
  866. ConnFun = ?config(conn_fun, Config),
  867. ClientId = ?config(client_id, Config),
  868. EmqttOpts = [
  869. {proto_ver, v5},
  870. {clientid, ClientId},
  871. {properties, #{'Session-Expiry-Interval' => 30}}
  872. | Config
  873. ],
  874. {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
  875. {ok, _} = emqtt:ConnFun(Client1),
  876. ok = emqtt:disconnect(Client1),
  877. maybe_kill_connection_process(ClientId, Config),
  878. ?check_trace(
  879. begin
  880. {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
  881. {ok, _} = emqtt:ConnFun(Client2),
  882. ok = emqtt:disconnect(Client2)
  883. end,
  884. fun(ok, Trace) ->
  885. check_snabbkaffe_vanilla(Trace)
  886. end
  887. ),
  888. ok.
  889. t_snabbkaffe_pending_messages(Config) ->
  890. %% Make sure there are pending messages are fetched during the init stage.
  891. ConnFun = ?config(conn_fun, Config),
  892. ClientId = ?config(client_id, Config),
  893. Topic = ?config(topic, Config),
  894. STopic = ?config(stopic, Config),
  895. Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3, 4, 5]],
  896. EmqttOpts = [
  897. {proto_ver, v5},
  898. {clientid, ClientId},
  899. {properties, #{'Session-Expiry-Interval' => 30}}
  900. | Config
  901. ],
  902. {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
  903. {ok, _} = emqtt:ConnFun(Client1),
  904. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  905. ok = emqtt:disconnect(Client1),
  906. maybe_kill_connection_process(ClientId, Config),
  907. ?check_trace(
  908. begin
  909. snabbkaffe_sync_publish(Topic, Payloads),
  910. {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
  911. {ok, _} = emqtt:ConnFun(Client2),
  912. Msgs = receive_messages(length(Payloads)),
  913. ReceivedPayloads = [P || #{payload := P} <- Msgs],
  914. ?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)),
  915. ok = emqtt:disconnect(Client2)
  916. end,
  917. fun(ok, Trace) ->
  918. check_snabbkaffe_vanilla(Trace),
  919. %% Check that all messages was delivered from the DB
  920. [Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)),
  921. [Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)),
  922. Delivers = Delivers1 ++ Delivers2,
  923. ?assertEqual(length(Payloads), length(Delivers)),
  924. %% Check for no duplicates
  925. ?assertEqual(lists:usort(Delivers), lists:sort(Delivers))
  926. end
  927. ),
  928. ok.
%% Checks that messages published while a session resume is in progress
%% are buffered and still reach the resuming client.
%%
%% Payloads1 is published while the client is offline; Payloads2 is
%% published from a helper process once the resume has emitted a
%% ps_marker_pendings_msgs event. The client must end up with both batches.
t_snabbkaffe_buffered_messages(Config) ->
    %% Make sure to buffer messages during startup.
    ConnFun = ?config(conn_fun, Config),
    ClientId = ?config(client_id, Config),
    Topic = ?config(topic, Config),
    STopic = ?config(stopic, Config),
    Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]],
    Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]],
    EmqttOpts = [
        {proto_ver, v5},
        {clientid, ClientId},
        {properties, #{'Session-Expiry-Interval' => 30}}
        | Config
    ],
    %% First connection: subscribe with QoS 2, then go offline.
    {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
    {ok, _} = emqtt:ConnFun(Client1),
    {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
    ok = emqtt:disconnect(Client1),
    maybe_kill_connection_process(ClientId, Config),
    %% First batch is published before the trace starts.
    publish(Topic, Payloads1),
    ?check_trace(
        begin
            %% Make the resume init phase wait until the first message is delivered.
            ?force_ordering(
                #{?snk_kind := ps_worker_deliver},
                #{?snk_kind := ps_resume_end}
            ),
            Parent = self(),
            %% Helper process: waits for the ps_marker_pendings_msgs event,
            %% publishes the second batch and then tells the test process.
            spawn_link(fun() ->
                ?block_until(#{?snk_kind := ps_marker_pendings_msgs}, infinity, 5000),
                publish(Topic, Payloads2, true),
                Parent ! publish_done,
                ok
            end),
            {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
            {ok, _} = emqtt:ConnFun(Client2),
            %% Fail instead of hanging if the helper never reports back.
            receive
                publish_done -> ok
            after 10000 -> error(too_long_to_publish)
            end,
            %% NOTE(review): one message more than expected is requested,
            %% presumably so that an unexpected extra delivery would show up
            %% in the comparison below -- confirm against receive_messages/1.
            Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
            ReceivedPayloads = [P || #{payload := P} <- Msgs],
            ?assertEqual(
                lists:sort(Payloads1 ++ Payloads2),
                lists:sort(ReceivedPayloads)
            ),
            ok = emqtt:disconnect(Client2)
        end,
        fun(ok, Trace) ->
            check_snabbkaffe_vanilla(Trace),
            %% Check that some messages were buffered in the writer process
            [Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)),
            %% At least one, and at most all, of the second batch.
            ?assertMatch(
                X when 0 < X andalso X =< length(Payloads2),
                length(Msgs)
            )
        end
    ),
    ok.
  988. %%--------------------------------------------------------------------
  989. %% GC tests
  990. %%--------------------------------------------------------------------
%% Tag values placed in the last element of the session message store keys
%% built by marker_msg/2, delivered_msg/3, undelivered_msg/3 and
%% abandoned_session_msg/2 respectively.
-define(MARKER, 3).
-define(DELIVERED, 2).
-define(UNDELIVERED, 1).
-define(ABANDONED, 0).
%% Returns a fresh message id obtained from emqx_guid:gen/0.
msg_id() ->
    emqx_guid:gen().
%% Builds a session message store key tagged ?DELIVERED for the given
%% message id, session id and topic.
delivered_msg(MsgId, SessionID, STopic) ->
    {SessionID, MsgId, STopic, ?DELIVERED}.
%% Builds a session message store key tagged ?UNDELIVERED for the given
%% message id, session id and topic.
undelivered_msg(MsgId, SessionID, STopic) ->
    {SessionID, MsgId, STopic, ?UNDELIVERED}.
%% Builds a session message store key tagged ?MARKER; the topic slot is
%% the empty binary.
marker_msg(MarkerID, SessionID) ->
    {SessionID, MarkerID, <<>>, ?MARKER}.
  1003. guid(MicrosecondsAgo) ->
  1004. %% Make a fake GUID and set a timestamp.
  1005. <<TS:64, Tail:64>> = emqx_guid:gen(),
  1006. <<(TS - MicrosecondsAgo):64, Tail:64>>.
%% Builds an abandoned-session key stamped with the current time
%% (no shift into the past).
abandoned_session_msg(SessionID) ->
    abandoned_session_msg(SessionID, 0).
  1009. abandoned_session_msg(SessionID, MicrosecondsAgo) ->
  1010. TS = erlang:system_time(microsecond),
  1011. {SessionID, <<>>, <<(TS - MicrosecondsAgo):64>>, ?ABANDONED}.
  1012. fresh_gc_delete_fun() ->
  1013. Ets = ets:new(gc_collect, [ordered_set]),
  1014. fun
  1015. (delete, Key) ->
  1016. ets:insert(Ets, {Key}),
  1017. ok;
  1018. (collect, <<>>) ->
  1019. List = ets:match(Ets, {'$1'}),
  1020. ets:delete(Ets),
  1021. lists:append(List);
  1022. (_, _Key) ->
  1023. ok
  1024. end.
  1025. fresh_gc_callbacks_fun() ->
  1026. Ets = ets:new(gc_collect, [ordered_set]),
  1027. fun
  1028. (collect, <<>>) ->
  1029. List = ets:match(Ets, {'$1'}),
  1030. ets:delete(Ets),
  1031. lists:append(List);
  1032. (Tag, Key) ->
  1033. ets:insert(Ets, {{Key, Tag}}),
  1034. ok
  1035. end.
%% Runs emqx_persistent_session:gc_session_messages/1 with a fresh
%% callback from fresh_gc_delete_fun/0 and returns the keys that the
%% callback recorded under the `delete' tag.
get_gc_delete_messages() ->
    Fun = fresh_gc_delete_fun(),
    emqx_persistent_session:gc_session_messages(Fun),
    Fun(collect, <<>>).
%% Runs emqx_persistent_session:gc_session_messages/1 with a fresh
%% callback from fresh_gc_callbacks_fun/0 and returns every {Key, Tag}
%% pair that the callback recorded.
get_gc_callbacks() ->
    Fun = fresh_gc_callbacks_fun(),
    emqx_persistent_session:gc_session_messages(Fun),
    Fun(collect, <<>>).
  1044. t_gc_all_delivered(Config) ->
  1045. Store = ?config(session_msg_store, Config),
  1046. STopic = ?config(stopic, Config),
  1047. SessionId = emqx_guid:gen(),
  1048. MsgIds = [msg_id() || _ <- lists:seq(1, 5)],
  1049. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1050. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1051. SortedContent = lists:usort(Delivered ++ Undelivered),
  1052. ets:insert(Store, [{X, <<>>} || X <- SortedContent]),
  1053. GCMessages = get_gc_delete_messages(),
  1054. ?assertEqual(SortedContent, GCMessages),
  1055. ok.
  1056. t_gc_some_undelivered(Config) ->
  1057. Store = ?config(session_msg_store, Config),
  1058. STopic = ?config(stopic, Config),
  1059. SessionId = emqx_guid:gen(),
  1060. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  1061. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1062. {Delivered1, _Delivered2} = split(Delivered),
  1063. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1064. {Undelivered1, Undelivered2} = split(Undelivered),
  1065. Content = Delivered1 ++ Undelivered1 ++ Undelivered2,
  1066. ets:insert(Store, [{X, <<>>} || X <- Content]),
  1067. Expected = lists:usort(Delivered1 ++ Undelivered1),
  1068. GCMessages = get_gc_delete_messages(),
  1069. ?assertEqual(Expected, GCMessages),
  1070. ok.
  1071. t_gc_with_markers(Config) ->
  1072. Store = ?config(session_msg_store, Config),
  1073. STopic = ?config(stopic, Config),
  1074. SessionId = emqx_guid:gen(),
  1075. MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)],
  1076. MarkerId = msg_id(),
  1077. MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1,
  1078. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1079. {Delivered1, _Delivered2} = split(Delivered),
  1080. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1081. {Undelivered1, Undelivered2} = split(Undelivered),
  1082. Markers = [marker_msg(MarkerId, SessionId)],
  1083. Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers,
  1084. ets:insert(Store, [{X, <<>>} || X <- Content]),
  1085. Expected = lists:usort(Delivered1 ++ Undelivered1),
  1086. GCMessages = get_gc_delete_messages(),
  1087. ?assertEqual(Expected, GCMessages),
  1088. ok.
  1089. t_gc_abandoned_some_undelivered(Config) ->
  1090. Store = ?config(session_msg_store, Config),
  1091. STopic = ?config(stopic, Config),
  1092. SessionId = emqx_guid:gen(),
  1093. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  1094. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1095. {Delivered1, _Delivered2} = split(Delivered),
  1096. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1097. {Undelivered1, Undelivered2} = split(Undelivered),
  1098. Abandoned = abandoned_session_msg(SessionId),
  1099. Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned],
  1100. ets:insert(Store, [{X, <<>>} || X <- Content]),
  1101. Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2),
  1102. GCMessages = get_gc_delete_messages(),
  1103. ?assertEqual(Expected, GCMessages),
  1104. ok.
  1105. t_gc_abandoned_only_called_on_empty_session(Config) ->
  1106. Store = ?config(session_msg_store, Config),
  1107. STopic = ?config(stopic, Config),
  1108. SessionId = emqx_guid:gen(),
  1109. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  1110. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1111. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  1112. Abandoned = abandoned_session_msg(SessionId),
  1113. Content = Delivered ++ Undelivered ++ [Abandoned],
  1114. ets:insert(Store, [{X, <<>>} || X <- Content]),
  1115. GCMessages = get_gc_callbacks(),
  1116. %% Since we had messages to delete, we don't expect to get the
  1117. %% callback on the abandoned session
  1118. ?assertEqual([], [X || {X, abandoned} <- GCMessages]),
  1119. %% But if we have only the abandoned session marker for this
  1120. %% session, it should be called.
  1121. ets:delete_all_objects(Store),
  1122. UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>),
  1123. ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]),
  1124. GCMessages2 = get_gc_callbacks(),
  1125. ?assertEqual([Abandoned], [X || {X, abandoned} <- GCMessages2]),
  1126. ok.
%% Setup/teardown clauses for t_gc_session_gc_worker/1.
%% NOTE(review): presumably dispatched from the suite's per-testcase
%% init/end hooks -- confirm against init_per_testcase/end_per_testcase.
%%
%% `init' mocks emqx_persistent_session with meck (passthrough, not linked
%% to the calling process) and returns Config unchanged.
t_gc_session_gc_worker(init, Config) ->
    meck:new(emqx_persistent_session, [passthrough, no_link]),
    Config;
%% 'end' removes the mock again.
t_gc_session_gc_worker('end', _Config) ->
    meck:unload(emqx_persistent_session),
    ok.
  1133. t_gc_session_gc_worker(Config) ->
  1134. STopic = ?config(stopic, Config),
  1135. SessionID = emqx_guid:gen(),
  1136. MsgDeleted = delivered_msg(msg_id(), SessionID, STopic),
  1137. MarkerNotDeleted = marker_msg(msg_id(), SessionID),
  1138. MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID),
  1139. AbandonedNotDeleted = abandoned_session_msg(SessionID),
  1140. AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000),
  1141. meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end),
  1142. emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted),
  1143. emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted),
  1144. emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted),
  1145. emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted),
  1146. emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted),
  1147. History = meck:history(emqx_persistent_session, self()),
  1148. DeleteCalls = [
  1149. Key
  1150. || {_Pid, {_, delete_session_message, [Key]}, _Result} <-
  1151. History
  1152. ],
  1153. ?assertEqual(
  1154. lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]),
  1155. lists:sort(DeleteCalls)
  1156. ),
  1157. ok.
  1158. t_gc_message_gc(Config) ->
  1159. Topic = ?config(topic, Config),
  1160. ClientID = ?config(client_id, Config),
  1161. Store = ?config(msg_store, Config),
  1162. NewMsgs = [
  1163. emqx_message:make(ClientID, Topic, integer_to_binary(P))
  1164. || P <- lists:seq(6, 10)
  1165. ],
  1166. Retain = 60 * 1000,
  1167. emqx_config:put(?msg_retain, Retain),
  1168. Msgs1 = [
  1169. emqx_message:make(ClientID, Topic, integer_to_binary(P))
  1170. || P <- lists:seq(1, 5)
  1171. ],
  1172. OldMsgs = [M#message{id = guid(Retain * 1000)} || M <- Msgs1],
  1173. ets:insert(Store, NewMsgs ++ OldMsgs),
  1174. ?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)),
  1175. ok = emqx_persistent_session_gc:message_gc_worker(),
  1176. ?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)),
  1177. ok.
  1178. split(List) ->
  1179. split(List, [], []).
  1180. split([], L1, L2) ->
  1181. {L1, L2};
  1182. split([H], L1, L2) ->
  1183. {[H | L1], L2};
  1184. split([H1, H2 | Left], L1, L2) ->
  1185. split(Left, [H1 | L1], [H2 | L2]).