emqx_persistent_session_SUITE.erl 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  20. -include_lib("../include/emqx.hrl").
  21. -include("../src/emqx_persistent_session.hrl").
  22. -compile(export_all).
  23. -compile(nowarn_export_all).
  24. %%--------------------------------------------------------------------
  25. %% SUITE boilerplate
  26. %%--------------------------------------------------------------------
  27. all() ->
  28. [ {group, persistent_store_enabled}
  29. , {group, persistent_store_disabled}
  30. ].
  31. %% A persistent session can be resumed in two ways:
  32. %% 1. The old connection process is still alive, and the session is taken
  33. %% over by the new connection.
  34. %% 2. The old session process has died (e.g., because of node down).
  35. %% The new process resumes the session from the stored state, and finds
  36. %% any subscribed messages from the persistent message store.
  37. %%
  38. %% We want to test both ways, both with the db backend enabled and disabled.
  39. %%
  40. %% In addition, we test both tcp and quic connections.
  41. groups() ->
  42. TCs = emqx_common_test_helpers:all(?MODULE),
  43. SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
  44. GCTests = [TC || TC <- TCs, is_gc_tc(TC)],
  45. OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
  46. [ {persistent_store_enabled, [ {group, ram_tables}
  47. , {group, disc_tables}
  48. ]}
  49. , {persistent_store_disabled, [ {group, no_kill_connection_process}
  50. ]}
  51. , { ram_tables, [], [ {group, no_kill_connection_process}
  52. , {group, kill_connection_process}
  53. , {group, snabbkaffe}
  54. , {group, gc_tests}]}
  55. , { disc_tables, [], [ {group, no_kill_connection_process}
  56. , {group, kill_connection_process}
  57. , {group, snabbkaffe}
  58. , {group, gc_tests}]}
  59. , {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
  60. , { kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
  61. , {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]}
  62. , {tcp, [], OtherTCs}
  63. , {quic, [], OtherTCs}
  64. , {ws, [], OtherTCs}
  65. , {tcp_snabbkaffe, [], SnabbkaffeTCs}
  66. , {quic_snabbkaffe, [], SnabbkaffeTCs}
  67. , {ws_snabbkaffe, [], SnabbkaffeTCs}
  68. , {gc_tests, [], GCTests}
  69. ].
  70. is_snabbkaffe_tc(TC) ->
  71. re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch.
  72. is_gc_tc(TC) ->
  73. re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
  74. init_per_group(persistent_store_enabled, Config) ->
  75. [{persistent_store_enabled, true}|Config];
  76. init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables ->
  77. %% Start Apps
  78. Reply = case Group =:= ram_tables of
  79. true -> ram;
  80. false -> disc
  81. end,
  82. emqx_common_test_helpers:boot_modules(all),
  83. meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
  84. meck:expect(emqx_config, get, fun(?storage_type_key) -> Reply;
  85. (?is_enabled_key) -> true;
  86. (Other) -> meck:passthrough([Other])
  87. end),
  88. emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
  89. ?assertEqual(true, emqx_persistent_session:is_store_enabled()),
  90. ?assertEqual(Reply, emqx_persistent_session:storage_type()),
  91. Config;
  92. init_per_group(persistent_store_disabled, Config) ->
  93. %% Start Apps
  94. emqx_common_test_helpers:boot_modules(all),
  95. meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
  96. meck:expect(emqx_config, get, fun(?is_enabled_key) -> false;
  97. (Other) -> meck:passthrough([Other])
  98. end),
  99. emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
  100. ?assertEqual(false, emqx_persistent_session:is_store_enabled()),
  101. [{persistent_store_enabled, false}|Config];
  102. init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe ->
  103. [{ssl,false},
  104. {host,"localhost"},
  105. {enable_websocket,true},
  106. {port, 8083},
  107. {conn_fun, ws_connect}| Config];
  108. init_per_group(Group, Config) when Group == tcp; Group == tcp_snabbkaffe ->
  109. [ {port, 1883}, {conn_fun, connect}| Config];
  110. init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe ->
  111. [ {port, 14567}, {conn_fun, quic_connect} | Config];
  112. init_per_group(no_kill_connection_process, Config) ->
  113. [ {kill_connection_process, false} | Config];
  114. init_per_group(kill_connection_process, Config) ->
  115. [ {kill_connection_process, true} | Config];
  116. init_per_group(snabbkaffe, Config) ->
  117. [ {kill_connection_process, true} | Config];
  118. init_per_group(gc_tests, Config) ->
  119. %% We need to make sure the system does not interfere with this test group.
  120. lists:foreach(fun(ClientId) ->
  121. maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
  122. end, emqx_cm:all_client_ids()),
  123. emqx_common_test_helpers:stop_apps([]),
  124. SessionMsgEts = gc_tests_session_store,
  125. MsgEts = gc_tests_msg_store,
  126. Pid = spawn(fun() ->
  127. ets:new(SessionMsgEts, [named_table, public, ordered_set]),
  128. ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]),
  129. receive stop -> ok end
  130. end),
  131. meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
  132. meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts);
  133. (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts);
  134. (?MSG_TAB_RAM) -> ets:first(MsgEts);
  135. (?MSG_TAB_DISC) -> ets:first(MsgEts);
  136. (X) -> meck:passthrough([X])
  137. end),
  138. meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X);
  139. (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X);
  140. (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X);
  141. (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X);
  142. (Tab, X) -> meck:passthrough([Tab, X])
  143. end),
  144. meck:expect(mnesia, dirty_delete, fun(?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X);
  145. (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X);
  146. (Tab, X) -> meck:passthrough([Tab, X])
  147. end),
  148. [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
  149. init_per_suite(Config) ->
  150. Config.
  151. set_special_confs(_) ->
  152. ok.
  153. end_per_suite(_Config) ->
  154. ok.
  155. end_per_group(gc_tests, Config) ->
  156. meck:unload(mnesia),
  157. ?config(store_owner, Config) ! stop,
  158. ok;
  159. end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables ->
  160. meck:unload(emqx_config),
  161. emqx_common_test_helpers:stop_apps([]);
  162. end_per_group(persistent_store_disabled, _Config) ->
  163. meck:unload(emqx_config),
  164. emqx_common_test_helpers:stop_apps([]);
  165. end_per_group(_Group, _Config) ->
  166. ok.
  167. init_per_testcase(TestCase, Config) ->
  168. Config1 = preconfig_per_testcase(TestCase, Config),
  169. case is_gc_tc(TestCase) of
  170. true ->
  171. ets:delete_all_objects(?config(msg_store, Config)),
  172. ets:delete_all_objects(?config(session_msg_store, Config));
  173. false ->
  174. skip
  175. end,
  176. case erlang:function_exported(?MODULE, TestCase, 2) of
  177. true -> ?MODULE:TestCase(init, Config1);
  178. _ -> Config1
  179. end.
  180. end_per_testcase(TestCase, Config) ->
  181. case is_snabbkaffe_tc(TestCase) of
  182. true -> snabbkaffe:stop();
  183. false -> skip
  184. end,
  185. case erlang:function_exported(?MODULE, TestCase, 2) of
  186. true -> ?MODULE:TestCase('end', Config);
  187. false -> ok
  188. end,
  189. Config.
  190. preconfig_per_testcase(TestCase, Config) ->
  191. {BaseName, Config1} =
  192. case ?config(tc_group_properties, Config) of
  193. [] ->
  194. %% We are running a single testcase
  195. {atom_to_binary(TestCase),
  196. init_per_group(tcp, init_per_group(kill_connection_process, Config))};
  197. [_|_] = Props->
  198. Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
  199. Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
  200. Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
  201. {iolist_to_binary(Pre1),
  202. Config}
  203. end,
  204. [ {topic, iolist_to_binary([BaseName, "/foo"])}
  205. , {stopic, iolist_to_binary([BaseName, "/+"])}
  206. , {stopic_alt, iolist_to_binary([BaseName, "/foo"])}
  207. , {client_id, BaseName}
  208. | Config1].
  209. %%--------------------------------------------------------------------
  210. %% Helpers
  211. %%--------------------------------------------------------------------
  212. client_info(Key, Client) ->
  213. maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
  214. receive_messages(Count) ->
  215. receive_messages(Count, []).
  216. receive_messages(0, Msgs) ->
  217. Msgs;
  218. receive_messages(Count, Msgs) ->
  219. receive
  220. {publish, Msg} ->
  221. receive_messages(Count-1, [Msg|Msgs]);
  222. _Other ->
  223. receive_messages(Count, Msgs)
  224. after 5000 ->
  225. Msgs
  226. end.
  227. maybe_kill_connection_process(ClientId, Config) ->
  228. case ?config(kill_connection_process, Config) of
  229. true ->
  230. case emqx_cm:lookup_channels(ClientId) of
  231. [] ->
  232. ok;
  233. [ConnectionPid] ->
  234. ?assert(is_pid(ConnectionPid)),
  235. Ref = monitor(process, ConnectionPid),
  236. ConnectionPid ! die_if_test,
  237. receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
  238. after 3000 -> error(process_did_not_die)
  239. end,
  240. wait_for_cm_unregister(ClientId)
  241. end;
  242. false ->
  243. ok
  244. end.
  245. wait_for_cm_unregister(ClientId) ->
  246. wait_for_cm_unregister(ClientId, 100).
  247. wait_for_cm_unregister(_ClientId, 0) ->
  248. error(cm_did_not_unregister);
  249. wait_for_cm_unregister(ClientId, N) ->
  250. case emqx_cm:lookup_channels(ClientId) of
  251. [] -> ok;
  252. [_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1)
  253. end.
  254. snabbkaffe_sync_publish(Topic, Payloads) ->
  255. Fun = fun(Client, Payload) ->
  256. ?check_trace(
  257. begin
  258. ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
  259. , #{?snk_kind := ps_persist_msg, payload := Payload}
  260. )
  261. end,
  262. fun(_, _Trace) -> ok end)
  263. end,
  264. do_publish(Payloads, Fun, true).
  265. publish(Topic, Payloads) ->
  266. publish(Topic, Payloads, false).
  267. publish(Topic, Payloads, WaitForUnregister) ->
  268. Fun = fun(Client, Payload) ->
  269. {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
  270. end,
  271. do_publish(Payloads, Fun, WaitForUnregister).
  272. do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) ->
  273. %% Publish from another process to avoid connection confusion.
  274. {Pid, Ref} =
  275. spawn_monitor(
  276. fun() ->
  277. %% For convenience, always publish using tcp.
  278. %% The publish path is not what we are testing.
  279. ClientID = <<"ps_SUITE_publisher">>,
  280. {ok, Client} = emqtt:start_link([ {proto_ver, v5}
  281. , {clientid, ClientID}
  282. , {port, 1883} ]),
  283. {ok, _} = emqtt:connect(Client),
  284. lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
  285. ok = emqtt:disconnect(Client),
  286. %% Snabbkaffe sometimes fails unless all processes are gone.
  287. case WaitForUnregister of
  288. false ->
  289. ok;
  290. true ->
  291. case emqx_cm:lookup_channels(ClientID) of
  292. [] ->
  293. ok;
  294. [ConnectionPid] ->
  295. ?assert(is_pid(ConnectionPid)),
  296. Ref1 = monitor(process, ConnectionPid),
  297. receive {'DOWN', Ref1, process, ConnectionPid, _} -> ok
  298. after 3000 -> error(process_did_not_die)
  299. end,
  300. wait_for_cm_unregister(ClientID)
  301. end
  302. end
  303. end),
  304. receive
  305. {'DOWN', Ref, process, Pid, normal} -> ok;
  306. {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
  307. end;
  308. do_publish(Payload, PublishFun, WaitForUnregister) ->
  309. do_publish([Payload], PublishFun, WaitForUnregister).
  310. %%--------------------------------------------------------------------
  311. %% Test Cases
  312. %%--------------------------------------------------------------------
  313. %% [MQTT-3.1.2-23]
  314. t_connect_session_expiry_interval(Config) ->
  315. ConnFun = ?config(conn_fun, Config),
  316. Topic = ?config(topic, Config),
  317. STopic = ?config(stopic, Config),
  318. Payload = <<"test message">>,
  319. ClientId = ?config(client_id, Config),
  320. {ok, Client1} = emqtt:start_link([ {clientid, ClientId},
  321. {proto_ver, v5},
  322. {properties, #{'Session-Expiry-Interval' => 30}}
  323. | Config]),
  324. {ok, _} = emqtt:ConnFun(Client1),
  325. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  326. ok = emqtt:disconnect(Client1),
  327. maybe_kill_connection_process(ClientId, Config),
  328. publish(Topic, Payload),
  329. {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
  330. {proto_ver, v5},
  331. {properties, #{'Session-Expiry-Interval' => 30}},
  332. {clean_start, false}
  333. | Config]),
  334. {ok, _} = emqtt:ConnFun(Client2),
  335. [Msg | _ ] = receive_messages(1),
  336. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
  337. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
  338. ?assertEqual({ok, 2}, maps:find(qos, Msg)),
  339. ok = emqtt:disconnect(Client2).
  340. t_without_client_id(Config) ->
  341. process_flag(trap_exit, true), %% Emqtt client dies
  342. ConnFun = ?config(conn_fun, Config),
  343. {ok, Client0} = emqtt:start_link([ {proto_ver, v5},
  344. {properties, #{'Session-Expiry-Interval' => 30}},
  345. {clean_start, false}
  346. | Config]),
  347. {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
  348. ok.
  349. t_assigned_clientid_persistent_session(Config) ->
  350. ConnFun = ?config(conn_fun, Config),
  351. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  352. {properties, #{'Session-Expiry-Interval' => 30}},
  353. {clean_start, true}
  354. | Config]),
  355. {ok, _} = emqtt:ConnFun(Client1),
  356. AssignedClientId = client_info(clientid, Client1),
  357. ok = emqtt:disconnect(Client1),
  358. maybe_kill_connection_process(AssignedClientId, Config),
  359. {ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId},
  360. {proto_ver, v5},
  361. {clean_start, false}
  362. | Config]),
  363. {ok, _} = emqtt:ConnFun(Client2),
  364. ?assertEqual(1, client_info(session_present, Client2)),
  365. ok = emqtt:disconnect(Client2).
  366. t_cancel_on_disconnect(Config) ->
  367. %% Open a persistent session, but cancel the persistence when
  368. %% shutting down the connection.
  369. ConnFun = ?config(conn_fun, Config),
  370. ClientId = ?config(client_id, Config),
  371. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  372. {clientid, ClientId},
  373. {properties, #{'Session-Expiry-Interval' => 30}},
  374. {clean_start, true}
  375. | Config]),
  376. {ok, _} = emqtt:ConnFun(Client1),
  377. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
  378. wait_for_cm_unregister(ClientId),
  379. {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
  380. {proto_ver, v5},
  381. {clean_start, false},
  382. {properties, #{'Session-Expiry-Interval' => 30}}
  383. | Config]),
  384. {ok, _} = emqtt:ConnFun(Client2),
  385. ?assertEqual(0, client_info(session_present, Client2)),
  386. ok = emqtt:disconnect(Client2).
  387. t_persist_on_disconnect(Config) ->
  388. %% Open a non-persistent session, but add the persistence when
  389. %% shutting down the connection. This is a protocol error, and
  390. %% should not convert the session into a persistent session.
  391. ConnFun = ?config(conn_fun, Config),
  392. ClientId = ?config(client_id, Config),
  393. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  394. {clientid, ClientId},
  395. {properties, #{'Session-Expiry-Interval' => 0}},
  396. {clean_start, true}
  397. | Config]),
  398. {ok, _} = emqtt:ConnFun(Client1),
  399. %% Strangely enough, the disconnect is reported as successful by emqtt.
  400. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
  401. wait_for_cm_unregister(ClientId),
  402. {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
  403. {proto_ver, v5},
  404. {clean_start, false},
  405. {properties, #{'Session-Expiry-Interval' => 30}}
  406. | Config]),
  407. {ok, _} = emqtt:ConnFun(Client2),
  408. %% The session should not be known, since it wasn't persisted because of the
  409. %% changed expiry interval in the disconnect call.
  410. ?assertEqual(0, client_info(session_present, Client2)),
  411. ok = emqtt:disconnect(Client2).
  412. wait_for_pending(SId) ->
  413. wait_for_pending(SId, 100).
  414. wait_for_pending(_SId, 0) ->
  415. error(exhausted_wait_for_pending);
  416. wait_for_pending(SId, N) ->
  417. case emqx_persistent_session:pending(SId) of
  418. [] -> timer:sleep(1), wait_for_pending(SId, N - 1);
  419. [_|_] = Pending -> Pending
  420. end.
  421. t_process_dies_session_expires(Config) ->
  422. %% Emulate an error in the connect process,
  423. %% or that the node of the process goes down.
  424. %% A persistent session should eventually expire.
  425. ConnFun = ?config(conn_fun, Config),
  426. ClientId = ?config(client_id, Config),
  427. Topic = ?config(topic, Config),
  428. STopic = ?config(stopic, Config),
  429. Payload = <<"test">>,
  430. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  431. {clientid, ClientId},
  432. {properties, #{'Session-Expiry-Interval' => 1}},
  433. {clean_start, true}
  434. | Config]),
  435. {ok, _} = emqtt:ConnFun(Client1),
  436. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  437. ok = emqtt:disconnect(Client1),
  438. maybe_kill_connection_process(ClientId, Config),
  439. ok = publish(Topic, [Payload]),
  440. SessionId =
  441. case ?config(persistent_store_enabled, Config) of
  442. false -> undefined;
  443. true ->
  444. %% The session should not be marked as expired.
  445. {Tag, Session} = emqx_persistent_session:lookup(ClientId),
  446. ?assertEqual(persistent, Tag),
  447. SId = emqx_session:info(id, Session),
  448. case ?config(kill_connection_process, Config) of
  449. true ->
  450. %% The session should have a pending message
  451. ?assertMatch([_], wait_for_pending(SId));
  452. false ->
  453. skip
  454. end,
  455. SId
  456. end,
  457. timer:sleep(1100),
  458. %% The session should now be marked as expired.
  459. case (?config(kill_connection_process, Config) andalso
  460. ?config(persistent_store_enabled, Config)) of
  461. true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId));
  462. false -> skip
  463. end,
  464. {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
  465. {clientid, ClientId},
  466. {properties, #{'Session-Expiry-Interval' => 30}},
  467. {clean_start, false}
  468. | Config]),
  469. {ok, _} = emqtt:ConnFun(Client2),
  470. ?assertEqual(0, client_info(session_present, Client2)),
  471. case (?config(kill_connection_process, Config) andalso
  472. ?config(persistent_store_enabled, Config)) of
  473. true ->
  474. %% The session should be a fresh one
  475. {persistent, NewSession} = emqx_persistent_session:lookup(ClientId),
  476. ?assertNotEqual(SessionId, emqx_session:info(id, NewSession)),
  477. %% The old session should now either
  478. %% be marked as abandoned or already be garbage collected.
  479. ?assertMatch([], emqx_persistent_session:pending(SessionId));
  480. false ->
  481. skip
  482. end,
  483. %% We should not receive the pending message
  484. ?assertEqual([], receive_messages(1)),
  485. emqtt:disconnect(Client2).
  486. t_publish_while_client_is_gone(Config) ->
  487. %% A persistent session should receive messages in its
  488. %% subscription even if the process owning the session dies.
  489. ConnFun = ?config(conn_fun, Config),
  490. Topic = ?config(topic, Config),
  491. STopic = ?config(stopic, Config),
  492. Payload1 = <<"hello1">>,
  493. Payload2 = <<"hello2">>,
  494. ClientId = ?config(client_id, Config),
  495. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  496. {clientid, ClientId},
  497. {properties, #{'Session-Expiry-Interval' => 30}},
  498. {clean_start, true}
  499. | Config]),
  500. {ok, _} = emqtt:ConnFun(Client1),
  501. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  502. ok = emqtt:disconnect(Client1),
  503. maybe_kill_connection_process(ClientId, Config),
  504. ok = publish(Topic, [Payload1, Payload2]),
  505. {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
  506. {clientid, ClientId},
  507. {properties, #{'Session-Expiry-Interval' => 30}},
  508. {clean_start, false}
  509. | Config]),
  510. {ok, _} = emqtt:ConnFun(Client2),
  511. Msgs = receive_messages(2),
  512. ?assertMatch([_, _], Msgs),
  513. [Msg2, Msg1] = Msgs,
  514. ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
  515. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  516. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
  517. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  518. ok = emqtt:disconnect(Client2).
  519. t_clean_start_drops_subscriptions(Config) ->
  520. %% 1. A persistent session is started and disconnected.
  521. %% 2. While disconnected, a message is published and persisted.
  522. %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
  523. %% then we disconnect again.
  524. %% 4. Finally, a new connection is made with clean start set to false.
  525. %% The original message should not be delivered.
  526. ConnFun = ?config(conn_fun, Config),
  527. Topic = ?config(topic, Config),
  528. STopic = ?config(stopic, Config),
  529. Payload1 = <<"hello1">>,
  530. Payload2 = <<"hello2">>,
  531. Payload3 = <<"hello3">>,
  532. ClientId = ?config(client_id, Config),
  533. %% 1.
  534. {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
  535. {clientid, ClientId},
  536. {properties, #{'Session-Expiry-Interval' => 30}},
  537. {clean_start, true}
  538. | Config]),
  539. {ok, _} = emqtt:ConnFun(Client1),
  540. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  541. ok = emqtt:disconnect(Client1),
  542. maybe_kill_connection_process(ClientId, Config),
  543. %% 2.
  544. ok = publish(Topic, Payload1),
  545. %% 3.
  546. {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
  547. {clientid, ClientId},
  548. {properties, #{'Session-Expiry-Interval' => 30}},
  549. {clean_start, true}
  550. | Config]),
  551. {ok, _} = emqtt:ConnFun(Client2),
  552. ?assertEqual(0, client_info(session_present, Client2)),
  553. {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
  554. ok = publish(Topic, Payload2),
  555. [Msg1] = receive_messages(1),
  556. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
  557. ok = emqtt:disconnect(Client2),
  558. maybe_kill_connection_process(ClientId, Config),
  559. %% 4.
  560. {ok, Client3} = emqtt:start_link([ {proto_ver, v5},
  561. {clientid, ClientId},
  562. {properties, #{'Session-Expiry-Interval' => 30}},
  563. {clean_start, false}
  564. | Config]),
  565. {ok, _} = emqtt:ConnFun(Client3),
  566. ok = publish(Topic, Payload3),
  567. [Msg2] = receive_messages(1),
  568. ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
  569. ok = emqtt:disconnect(Client3).
  570. t_unsubscribe(Config) ->
  571. ConnFun = ?config(conn_fun, Config),
  572. Topic = ?config(topic, Config),
  573. STopic = ?config(stopic, Config),
  574. ClientId = ?config(client_id, Config),
  575. {ok, Client} = emqtt:start_link([ {clientid, ClientId},
  576. {proto_ver, v5},
  577. {properties, #{'Session-Expiry-Interval' => 30}}
  578. | Config]),
  579. {ok, _} = emqtt:ConnFun(Client),
  580. {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
  581. case emqx_persistent_session:is_store_enabled() of
  582. true ->
  583. {persistent, Session} = emqx_persistent_session:lookup(ClientId),
  584. SessionID = emqx_session:info(id, Session),
  585. SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
  586. ?assert(lists:member(SessionID, SessionIDs)),
  587. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  588. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  589. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  590. SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)],
  591. ?assert(not lists:member(SessionID, SessionIDs2));
  592. false ->
  593. ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
  594. {ok, _, _} = emqtt:unsubscribe(Client, STopic),
  595. ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic])
  596. end,
  597. ok = emqtt:disconnect(Client).
  598. t_multiple_subscription_matches(Config) ->
  599. ConnFun = ?config(conn_fun, Config),
  600. Topic = ?config(topic, Config),
  601. STopic1 = ?config(stopic, Config),
  602. STopic2 = ?config(stopic_alt, Config),
  603. Payload = <<"test message">>,
  604. ClientId = ?config(client_id, Config),
  605. {ok, Client1} = emqtt:start_link([ {clientid, ClientId},
  606. {proto_ver, v5},
  607. {properties, #{'Session-Expiry-Interval' => 30}}
  608. | Config]),
  609. {ok, _} = emqtt:ConnFun(Client1),
  610. {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
  611. {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
  612. ok = emqtt:disconnect(Client1),
  613. maybe_kill_connection_process(ClientId, Config),
  614. publish(Topic, Payload),
  615. {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
  616. {proto_ver, v5},
  617. {properties, #{'Session-Expiry-Interval' => 30}},
  618. {clean_start, false}
  619. | Config]),
  620. {ok, _} = emqtt:ConnFun(Client2),
  621. %% We will receive the same message twice because it matches two subscriptions.
  622. [Msg1, Msg2] = receive_messages(2),
  623. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
  624. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
  625. ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
  626. ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
  627. ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
  628. ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
  629. ok = emqtt:disconnect(Client2).
  630. t_lost_messages_because_of_gc(init, Config) ->
  631. case (emqx_persistent_session:is_store_enabled()
  632. andalso ?config(kill_connection_process, Config)) of
  633. true ->
  634. Retain = 1000,
  635. OldRetain = emqx_config:get(?msg_retain, Retain),
  636. emqx_config:put(?msg_retain, Retain),
  637. [{retain, Retain}, {old_retain, OldRetain}|Config];
  638. false -> {skip, only_relevant_with_store_and_kill_process}
  639. end;
  640. t_lost_messages_because_of_gc('end', Config) ->
  641. OldRetain = ?config(old_retain, Config),
  642. emqx_config:put(?msg_retain, OldRetain),
  643. ok.
  644. t_lost_messages_because_of_gc(Config) ->
  645. ConnFun = ?config(conn_fun, Config),
  646. Topic = ?config(topic, Config),
  647. STopic = ?config(stopic, Config),
  648. ClientId = ?config(client_id, Config),
  649. Retain = ?config(retain, Config),
  650. Payload1 = <<"hello1">>,
  651. Payload2 = <<"hello2">>,
  652. {ok, Client1} = emqtt:start_link([ {clientid, ClientId},
  653. {proto_ver, v5},
  654. {properties, #{'Session-Expiry-Interval' => 30}}
  655. | Config]),
  656. {ok, _} = emqtt:ConnFun(Client1),
  657. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  658. emqtt:disconnect(Client1),
  659. maybe_kill_connection_process(ClientId, Config),
  660. publish(Topic, Payload1),
  661. timer:sleep(2 * Retain),
  662. publish(Topic, Payload2),
  663. emqx_persistent_session_gc:message_gc_worker(),
  664. {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
  665. {clean_start, false},
  666. {proto_ver, v5},
  667. {properties, #{'Session-Expiry-Interval' => 30}}
  668. | Config]),
  669. {ok, _} = emqtt:ConnFun(Client2),
  670. Msgs = receive_messages(2),
  671. ?assertMatch([_], Msgs),
  672. ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))),
  673. emqtt:disconnect(Client2),
  674. ok.
  675. %%--------------------------------------------------------------------
  676. %% Snabbkaffe helpers
  677. %%--------------------------------------------------------------------
  678. check_snabbkaffe_vanilla(Trace) ->
  679. ResumeTrace = [T || #{?snk_kind := K} = T <- Trace,
  680. re:run(to_list(K), "^ps_") /= nomatch],
  681. ?assertMatch([_|_], ResumeTrace),
  682. [_Sid] = lists:usort(?projection(sid, ResumeTrace)),
  683. %% Check internal flow of the emqx_cm resuming
  684. ?assert(?strict_causality(#{ ?snk_kind := ps_resuming },
  685. #{ ?snk_kind := ps_initial_pendings },
  686. ResumeTrace)),
  687. ?assert(?strict_causality(#{ ?snk_kind := ps_initial_pendings },
  688. #{ ?snk_kind := ps_persist_pendings },
  689. ResumeTrace)),
  690. ?assert(?strict_causality(#{ ?snk_kind := ps_persist_pendings },
  691. #{ ?snk_kind := ps_notify_writers },
  692. ResumeTrace)),
  693. ?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
  694. #{ ?snk_kind := ps_node_markers },
  695. ResumeTrace)),
  696. ?assert(?strict_causality(#{ ?snk_kind := ps_node_markers },
  697. #{ ?snk_kind := ps_resume_session },
  698. ResumeTrace)),
  699. ?assert(?strict_causality(#{ ?snk_kind := ps_resume_session },
  700. #{ ?snk_kind := ps_marker_pendings },
  701. ResumeTrace)),
  702. ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
  703. #{ ?snk_kind := ps_marker_pendings_msgs },
  704. ResumeTrace)),
  705. ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings_msgs },
  706. #{ ?snk_kind := ps_resume_end },
  707. ResumeTrace)),
  708. %% Check flow between worker and emqx_cm
  709. ?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers },
  710. #{ ?snk_kind := ps_worker_started },
  711. ResumeTrace)),
  712. ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings },
  713. #{ ?snk_kind := ps_worker_resume_end },
  714. ResumeTrace)),
  715. ?assert(?strict_causality(#{ ?snk_kind := ps_worker_resume_end },
  716. #{ ?snk_kind := ps_worker_shutdown },
  717. ResumeTrace)),
  718. [Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
  719. ?assertMatch([_], Markers).
  720. to_list(L) when is_list(L) -> L;
  721. to_list(A) when is_atom(A) -> atom_to_list(A);
  722. to_list(B) when is_binary(B) -> binary_to_list(B).
  723. %%--------------------------------------------------------------------
  724. %% Snabbkaffe tests
  725. %%--------------------------------------------------------------------
  726. t_snabbkaffe_vanilla_stages(Config) ->
  727. %% Test that all stages of session resume works ok in the simplest case
  728. ConnFun = ?config(conn_fun, Config),
  729. ClientId = ?config(client_id, Config),
  730. EmqttOpts = [ {proto_ver, v5},
  731. {clientid, ClientId},
  732. {properties, #{'Session-Expiry-Interval' => 30}}
  733. | Config],
  734. {ok, Client1} = emqtt:start_link([{clean_start, true}|EmqttOpts]),
  735. {ok, _} = emqtt:ConnFun(Client1),
  736. ok = emqtt:disconnect(Client1),
  737. maybe_kill_connection_process(ClientId, Config),
  738. ?check_trace(
  739. begin
  740. {ok, Client2} = emqtt:start_link([{clean_start, false}|EmqttOpts]),
  741. {ok, _} = emqtt:ConnFun(Client2),
  742. ok = emqtt:disconnect(Client2)
  743. end,
  744. fun(ok, Trace) ->
  745. check_snabbkaffe_vanilla(Trace)
  746. end),
  747. ok.
  748. t_snabbkaffe_pending_messages(Config) ->
  749. %% Make sure there are pending messages are fetched during the init stage.
  750. ConnFun = ?config(conn_fun, Config),
  751. ClientId = ?config(client_id, Config),
  752. Topic = ?config(topic, Config),
  753. STopic = ?config(stopic, Config),
  754. Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1,2,3,4,5]],
  755. EmqttOpts = [ {proto_ver, v5},
  756. {clientid, ClientId},
  757. {properties, #{'Session-Expiry-Interval' => 30}}
  758. | Config],
  759. {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
  760. {ok, _} = emqtt:ConnFun(Client1),
  761. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  762. ok = emqtt:disconnect(Client1),
  763. maybe_kill_connection_process(ClientId, Config),
  764. ?check_trace(
  765. begin
  766. snabbkaffe_sync_publish(Topic, Payloads),
  767. {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
  768. {ok, _} = emqtt:ConnFun(Client2),
  769. Msgs = receive_messages(length(Payloads)),
  770. ReceivedPayloads = [P || #{ payload := P } <- Msgs],
  771. ?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)),
  772. ok = emqtt:disconnect(Client2)
  773. end,
  774. fun(ok, Trace) ->
  775. check_snabbkaffe_vanilla(Trace),
  776. %% Check that all messages was delivered from the DB
  777. [Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)),
  778. [Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)),
  779. Delivers = Delivers1 ++ Delivers2,
  780. ?assertEqual(length(Payloads), length(Delivers)),
  781. %% Check for no duplicates
  782. ?assertEqual(lists:usort(Delivers), lists:sort(Delivers))
  783. end),
  784. ok.
  785. t_snabbkaffe_buffered_messages(Config) ->
  786. %% Make sure to buffer messages during startup.
  787. ConnFun = ?config(conn_fun, Config),
  788. ClientId = ?config(client_id, Config),
  789. Topic = ?config(topic, Config),
  790. STopic = ?config(stopic, Config),
  791. Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]],
  792. Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]],
  793. EmqttOpts = [ {proto_ver, v5},
  794. {clientid, ClientId},
  795. {properties, #{'Session-Expiry-Interval' => 30}}
  796. | Config],
  797. {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]),
  798. {ok, _} = emqtt:ConnFun(Client1),
  799. {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
  800. ok = emqtt:disconnect(Client1),
  801. maybe_kill_connection_process(ClientId, Config),
  802. publish(Topic, Payloads1),
  803. ?check_trace(
  804. begin
  805. %% Make the resume init phase wait until the first message is delivered.
  806. ?force_ordering( #{ ?snk_kind := ps_worker_deliver },
  807. #{ ?snk_kind := ps_resume_end }),
  808. Parent = self(),
  809. spawn_link(fun() ->
  810. ?block_until(#{?snk_kind := ps_marker_pendings_msgs}, infinity, 5000),
  811. publish(Topic, Payloads2, true),
  812. Parent ! publish_done,
  813. ok
  814. end),
  815. {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
  816. {ok, _} = emqtt:ConnFun(Client2),
  817. receive publish_done -> ok after 10000 -> error(too_long_to_publish) end,
  818. Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
  819. ReceivedPayloads = [P || #{ payload := P } <- Msgs],
  820. ?assertEqual(lists:sort(Payloads1 ++ Payloads2),
  821. lists:sort(ReceivedPayloads)),
  822. ok = emqtt:disconnect(Client2)
  823. end,
  824. fun(ok, Trace) ->
  825. check_snabbkaffe_vanilla(Trace),
  826. %% Check that some messages was buffered in the writer process
  827. [Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)),
  828. ?assertMatch(X when 0 < X andalso X =< length(Payloads2),
  829. length(Msgs))
  830. end),
  831. ok.
  832. %%--------------------------------------------------------------------
  833. %% GC tests
  834. %%--------------------------------------------------------------------
  835. -define(MARKER, 3).
  836. -define(DELIVERED, 2).
  837. -define(UNDELIVERED, 1).
  838. -define(ABANDONED, 0).
  839. msg_id() ->
  840. emqx_guid:gen().
  841. delivered_msg(MsgId, SessionID, STopic) ->
  842. {SessionID, MsgId, STopic, ?DELIVERED}.
  843. undelivered_msg(MsgId, SessionID, STopic) ->
  844. {SessionID, MsgId, STopic, ?UNDELIVERED}.
  845. marker_msg(MarkerID, SessionID) ->
  846. {SessionID, MarkerID, <<>>, ?MARKER}.
  847. guid(MicrosecondsAgo) ->
  848. %% Make a fake GUID and set a timestamp.
  849. << TS:64, Tail:64 >> = emqx_guid:gen(),
  850. << (TS - MicrosecondsAgo) : 64, Tail:64 >>.
  851. abandoned_session_msg(SessionID) ->
  852. abandoned_session_msg(SessionID, 0).
  853. abandoned_session_msg(SessionID, MicrosecondsAgo) ->
  854. TS = erlang:system_time(microsecond),
  855. {SessionID, <<>>, <<(TS - MicrosecondsAgo) : 64>>, ?ABANDONED}.
  856. fresh_gc_delete_fun() ->
  857. Ets = ets:new(gc_collect, [ordered_set]),
  858. fun(delete, Key) -> ets:insert(Ets, {Key}), ok;
  859. (collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
  860. (_, _Key) -> ok
  861. end.
  862. fresh_gc_callbacks_fun() ->
  863. Ets = ets:new(gc_collect, [ordered_set]),
  864. fun(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List);
  865. (Tag, Key) -> ets:insert(Ets, {{Key, Tag}}), ok
  866. end.
  867. get_gc_delete_messages() ->
  868. Fun = fresh_gc_delete_fun(),
  869. emqx_persistent_session:gc_session_messages(Fun),
  870. Fun(collect, <<>>).
  871. get_gc_callbacks() ->
  872. Fun = fresh_gc_callbacks_fun(),
  873. emqx_persistent_session:gc_session_messages(Fun),
  874. Fun(collect, <<>>).
  875. t_gc_all_delivered(Config) ->
  876. Store = ?config(session_msg_store, Config),
  877. STopic = ?config(stopic, Config),
  878. SessionId = emqx_guid:gen(),
  879. MsgIds = [msg_id() || _ <- lists:seq(1, 5)],
  880. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  881. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  882. SortedContent = lists:usort(Delivered ++ Undelivered),
  883. ets:insert(Store, [{X, <<>>} || X <- SortedContent]),
  884. GCMessages = get_gc_delete_messages(),
  885. ?assertEqual(SortedContent, GCMessages),
  886. ok.
  887. t_gc_some_undelivered(Config) ->
  888. Store = ?config(session_msg_store, Config),
  889. STopic = ?config(stopic, Config),
  890. SessionId = emqx_guid:gen(),
  891. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  892. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  893. {Delivered1,_Delivered2} = split(Delivered),
  894. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  895. {Undelivered1, Undelivered2} = split(Undelivered),
  896. Content = Delivered1 ++ Undelivered1 ++ Undelivered2,
  897. ets:insert(Store, [{X, <<>>} || X <- Content]),
  898. Expected = lists:usort(Delivered1 ++ Undelivered1),
  899. GCMessages = get_gc_delete_messages(),
  900. ?assertEqual(Expected, GCMessages),
  901. ok.
  902. t_gc_with_markers(Config) ->
  903. Store = ?config(session_msg_store, Config),
  904. STopic = ?config(stopic, Config),
  905. SessionId = emqx_guid:gen(),
  906. MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)],
  907. MarkerId = msg_id(),
  908. MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1,
  909. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  910. {Delivered1,_Delivered2} = split(Delivered),
  911. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  912. {Undelivered1, Undelivered2} = split(Undelivered),
  913. Markers = [marker_msg(MarkerId, SessionId)],
  914. Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers,
  915. ets:insert(Store, [{X, <<>>} || X <- Content]),
  916. Expected = lists:usort(Delivered1 ++ Undelivered1),
  917. GCMessages = get_gc_delete_messages(),
  918. ?assertEqual(Expected, GCMessages),
  919. ok.
  920. t_gc_abandoned_some_undelivered(Config) ->
  921. Store = ?config(session_msg_store, Config),
  922. STopic = ?config(stopic, Config),
  923. SessionId = emqx_guid:gen(),
  924. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  925. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  926. {Delivered1,_Delivered2} = split(Delivered),
  927. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  928. {Undelivered1, Undelivered2} = split(Undelivered),
  929. Abandoned = abandoned_session_msg(SessionId),
  930. Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned],
  931. ets:insert(Store, [{X, <<>>} || X <- Content]),
  932. Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2),
  933. GCMessages = get_gc_delete_messages(),
  934. ?assertEqual(Expected, GCMessages),
  935. ok.
  936. t_gc_abandoned_only_called_on_empty_session(Config) ->
  937. Store = ?config(session_msg_store, Config),
  938. STopic = ?config(stopic, Config),
  939. SessionId = emqx_guid:gen(),
  940. MsgIds = [msg_id() || _ <- lists:seq(1, 10)],
  941. Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds],
  942. Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds],
  943. Abandoned = abandoned_session_msg(SessionId),
  944. Content = Delivered ++ Undelivered ++ [Abandoned],
  945. ets:insert(Store, [{X, <<>>} || X <- Content]),
  946. GCMessages = get_gc_callbacks(),
  947. %% Since we had messages to delete, we don't expect to get the
  948. %% callback on the abandoned session
  949. ?assertEqual([], [ X || {X, abandoned} <- GCMessages]),
  950. %% But if we have only the abandoned session marker for this
  951. %% session, it should be called.
  952. ets:delete_all_objects(Store),
  953. UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>),
  954. ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]),
  955. GCMessages2 = get_gc_callbacks(),
  956. ?assertEqual([Abandoned], [ X || {X, abandoned} <- GCMessages2]),
  957. ok.
  958. t_gc_session_gc_worker(init, Config) ->
  959. meck:new(emqx_persistent_session, [passthrough, no_link]),
  960. Config;
  961. t_gc_session_gc_worker('end',_Config) ->
  962. meck:unload(emqx_persistent_session),
  963. ok.
  964. t_gc_session_gc_worker(Config) ->
  965. STopic = ?config(stopic, Config),
  966. SessionID = emqx_guid:gen(),
  967. MsgDeleted = delivered_msg(msg_id(), SessionID, STopic),
  968. MarkerNotDeleted = marker_msg(msg_id(), SessionID),
  969. MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID),
  970. AbandonedNotDeleted = abandoned_session_msg(SessionID),
  971. AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000),
  972. meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end),
  973. emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted),
  974. emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted),
  975. emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted),
  976. emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted),
  977. emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted),
  978. History = meck:history(emqx_persistent_session, self()),
  979. DeleteCalls = [ Key || {_Pid, {_, delete_session_message, [Key]}, _Result}
  980. <- History],
  981. ?assertEqual(lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]),
  982. lists:sort(DeleteCalls)),
  983. ok.
  984. t_gc_message_gc(Config) ->
  985. Topic = ?config(topic, Config),
  986. ClientID = ?config(client_id, Config),
  987. Store = ?config(msg_store, Config),
  988. NewMsgs = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
  989. || P <- lists:seq(6, 10)],
  990. Retain = 60 * 1000,
  991. emqx_config:put(?msg_retain, Retain),
  992. Msgs1 = [emqx_message:make(ClientID, Topic, integer_to_binary(P))
  993. || P <- lists:seq(1, 5)],
  994. OldMsgs = [M#message{id = guid(Retain*1000)} || M <- Msgs1],
  995. ets:insert(Store, NewMsgs ++ OldMsgs),
  996. ?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)),
  997. ok = emqx_persistent_session_gc:message_gc_worker(),
  998. ?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)),
  999. ok.
  1000. split(List) ->
  1001. split(List, [], []).
  1002. split([], L1, L2) ->
  1003. {L1, L2};
  1004. split([H], L1, L2) ->
  1005. {[H|L1], L2};
  1006. split([H1, H2|Left], L1, L2) ->
  1007. split(Left, [H1|L1], [H2|L2]).