emqx_persistent_session_ds_SUITE.erl 24 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_persistent_session_ds_SUITE).
  5. -compile(export_all).
  6. -compile(nowarn_export_all).
  7. -include_lib("stdlib/include/assert.hrl").
  8. -include_lib("common_test/include/ct.hrl").
  9. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  10. -include_lib("emqx/include/asserts.hrl").
  11. -include_lib("emqx/include/emqx_mqtt.hrl").
  12. -import(emqx_common_test_helpers, [on_exit/1]).
  13. %%------------------------------------------------------------------------------
  14. %% CT boilerplate
  15. %%------------------------------------------------------------------------------
  %% Suite-wide Common Test options: a 60 second timetrap.
  suite() ->
      Timetrap = {timetrap, {seconds, 60}},
      [Timetrap].
  %% Returns whatever `emqx_common_test_helpers:all/1' reports for this module.
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Starts the applications described by `app_specs/0' using the suite's work
  %% directory, and stores the result under `tc_apps' in the CT config so that
  %% `end_per_suite/1' can stop them.
  init_per_suite(Config) ->
      StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
      Started = emqx_cth_suite:start(app_specs(), StartOpts),
      [{tc_apps, Started} | Config].
  %% Stops the applications recorded under `tc_apps' by `init_per_suite/1'.
  %% The result of the stop call is deliberately ignored.
  end_per_suite(Config) ->
      _ = emqx_cth_suite:stop(?config(tc_apps, Config)),
      ok.
  %% Per-testcase setup.
  %%
  %% * The two idempotency testcases each get a dedicated single-node cluster.
  %% * `t_session_gc' gets a three-node, all-core cluster with a tuned
  %%   `session_persistence' configuration.
  %% * Every other testcase receives the config unchanged.
  %%
  %% For the clustered cases, the cluster description, node specs, cluster
  %% options and started nodes are prepended to the CT config.
  init_per_testcase(TestCase, Config) when
      TestCase =:= t_session_subscription_idempotency orelse
          TestCase =:= t_session_unsubscription_idempotency
  ->
      Cluster = cluster(#{n => 1}),
      WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
      ClusterOpts = #{work_dir => WorkDir},
      NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
      Nodes = emqx_cth_cluster:start(NodeSpecs),
      ClusterInfo = [
          {cluster, Cluster},
          {node_specs, NodeSpecs},
          {cluster_opts, ClusterOpts},
          {nodes, Nodes}
      ],
      ClusterInfo ++ Config;
  init_per_testcase(t_session_gc = TestCase, Config) ->
      GCConf =
          "\n session_persistence {"
          "\n last_alive_update_interval = 500ms "
          "\n session_gc_interval = 1s "
          "\n session_gc_batch_size = 2 "
          "\n }",
      Cluster = cluster(#{
          n => 3,
          roles => [core, core, core],
          extra_emqx_conf => GCConf
      }),
      WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
      ClusterOpts = #{work_dir => WorkDir},
      NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
      %% NOTE(review): unlike the clause above, this starts the cluster via
      %% `start/2' rather than from `NodeSpecs'; presumably equivalent -- confirm.
      Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
      ClusterInfo = [
          {cluster, Cluster},
          {node_specs, NodeSpecs},
          {cluster_opts, ClusterOpts},
          {nodes, Nodes},
          {gc_interval, timer:seconds(2)}
      ],
      ClusterInfo ++ Config;
  init_per_testcase(_TestCase, Config) ->
      Config.
  %% Per-testcase teardown: always runs the janitor (with a 60_000 argument),
  %% and for the testcases that started their own cluster also stops its nodes.
  end_per_testcase(TestCase, Config) when
      TestCase =:= t_session_subscription_idempotency orelse
          TestCase =:= t_session_unsubscription_idempotency orelse
          TestCase =:= t_session_gc
  ->
      ClusterNodes = ?config(nodes, Config),
      emqx_common_test_helpers:call_janitor(60_000),
      ok = emqx_cth_cluster:stop(ClusterNodes);
  end_per_testcase(_TestCase, _Config) ->
      emqx_common_test_helpers:call_janitor(60_000),
      ok.
  82. %%------------------------------------------------------------------------------
  83. %% Helper functions
  84. %%------------------------------------------------------------------------------
  %% Builds a cluster description: a list of `{NodeName, Spec}' pairs for nodes
  %% `ds_SUITE1' .. `ds_SUITEN'.
  %%
  %% Options:
  %%   n     - number of nodes (required)
  %%   roles - optional list of roles, indexed by node number; every node is
  %%           `core' when absent
  %% The same `Opts' are also passed to `app_specs/1' for each node's apps.
  cluster(#{n := N} = Opts) ->
      Apps = app_specs(Opts),
      RoleOf =
          case maps:get(roles, Opts, undefined) of
              undefined -> fun(_Idx) -> core end;
              Roles -> fun(Idx) -> lists:nth(Idx, Roles) end
          end,
      [
          {
              list_to_atom("ds_SUITE" ++ integer_to_list(Idx)),
              #{role => RoleOf(Idx), apps => Apps}
          }
       || Idx <- lists:seq(1, N)
      ].
  %% Application specs with no extra options (see `app_specs/1').
  app_specs() ->
      NoOpts = #{},
      app_specs(NoOpts).
  %% Application specs for the `emqx' app: the base config enabling session
  %% persistence, followed by the optional `extra_emqx_conf' string from `Opts'.
  app_specs(Opts) ->
      BaseConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
      Conf = lists:append(BaseConf, maps:get(extra_emqx_conf, Opts, "")),
      [{emqx, Conf}].
  %% Reads the `bind' setting of the default listener of the given `Type' on
  %% `Node' and returns its port component.
  get_mqtt_port(Node, Type) ->
      ConfPath = [listeners, Type, default, bind],
      Bind = erpc:call(Node, emqx_config, get, [ConfPath]),
      {_IP, Port} = Bind,
      Port.
  %% Retries pinging `Node' until it answers `pong', using `?retry' with the
  %% arguments 500 and 50 (named sleep and attempts at the call site).
  wait_nodeup(Node) ->
      ?retry(
          _Sleep = 500,
          _Attempts = 50,
          pong = net_adm:ping(Node)
      ).
  %% Starts (but does not connect) an `emqtt' client.
  %%
  %% Caller options are deep-merged over the defaults: port 1883, MQTT v5 and a
  %% `Session-Expiry-Interval' of 300. The client is unlinked from the calling
  %% process, and a best-effort stop is registered via `on_exit/1'.
  start_client(Opts0 = #{}) ->
      BaseOpts = #{
          port => 1883,
          proto_ver => v5,
          properties => #{'Session-Expiry-Interval' => 300}
      },
      Merged = emqx_utils_maps:deep_merge(BaseOpts, Opts0),
      ?tp(notice, "starting client", Merged),
      {ok, Pid} = emqtt:start_link(maps:to_list(Merged)),
      unlink(Pid),
      on_exit(fun() -> catch emqtt:stop(Pid) end),
      Pid.
  %% Starts a client via `start_client/1', asserts that it connects
  %% successfully, and returns the client.
  start_connect_client(Opts = #{}) ->
      Pid = start_client(Opts),
      ?assertMatch({ok, _}, emqtt:connect(Pid)),
      Pid.
  %% Builds a binary client id of the form `Prefix/ID', with both parts
  %% rendered using `~p'.
  mk_clientid(Prefix, ID) ->
      Chars = io_lib:format("~p/~p", [Prefix, ID]),
      iolist_to_binary(Chars).
  %% Restarts `Node' from `NodeSpec' and waits until it answers pings again.
  %% Emits the `will_restart_node' trace point before the restart and
  %% `restarted_node' after it, which testcases use as synchronization points.
  restart_node(Node, NodeSpec) ->
      ?tp(will_restart_node, #{}),
      emqx_cth_cluster:restart(Node, NodeSpec),
      wait_nodeup(Node),
      ?tp(restarted_node, #{}),
      ok.
  %% True when the connect options carry a strictly positive
  %% `Session-Expiry-Interval' property. Options without that property do not
  %% match the function head.
  is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := ExpiryInterval}}) ->
      0 < ExpiryInterval.
  %% Calls `emqx_persistent_session_ds_state:list_sessions/0' on `Node'.
  list_all_sessions(Node) ->
      Mod = emqx_persistent_session_ds_state,
      erpc:call(Node, Mod, list_sessions, []).
  %% Collects the subscriptions of every session listed on `Node' into one flat
  %% list of key/value pairs, taken from the `subscriptions' map in each
  %% session's `print_session' output.
  list_all_subscriptions(Node) ->
      SessionIds = list_all_sessions(Node),
      lists:append([
          begin
              #{s := #{subscriptions := Subs}} = erpc:call(
                  Node, emqx_persistent_session_ds, print_session, [SessionId]
              ),
              maps:to_list(Subs)
          end
       || SessionId <- SessionIds
      ]).
  %% Calls `emqx_persistent_session_ds:list_all_pubranges/0' on `Node'.
  list_all_pubranges(Node) ->
      Mod = emqx_persistent_session_ds,
      erpc:call(Node, Mod, list_all_pubranges, []).
  %% Calls `emqx_persistent_session_ds:session_open/4' on `Node' for `ClientId'
  %% with an empty client info, a minimal MQTT v5 connection info and no will
  %% message.
  session_open(Node, ClientId) ->
      Args = [
          ClientId,
          _ClientInfo = #{},
          _ConnInfo = #{
              peername => {undefined, undefined},
              proto_name => <<"MQTT">>,
              proto_ver => 5
          },
          _WillMsg = undefined
      ],
      erpc:call(Node, emqx_persistent_session_ds, session_open, Args).
  %% Opens the stored session state of `ClientId' on the local node, overwrites
  %% its `last_alive_at' with `Time' and commits it. The commit result is
  %% ignored.
  force_last_alive_at(ClientId, Time) ->
      {ok, State0} = emqx_persistent_session_ds_state:open(ClientId),
      State1 = emqx_persistent_session_ds_state:set_last_alive_at(Time, State0),
      _ = emqx_persistent_session_ds_state:commit(State1),
      ok.
  175. %%------------------------------------------------------------------------------
  176. %% Testcases
  177. %%------------------------------------------------------------------------------
  %% Restarts the node in the middle of a subscribe operation, then reconnects
  %% with the same client id and subscribes again. Afterwards the session must
  %% contain the subscription.
  t_session_subscription_idempotency(Config) ->
      [Node1Spec | _] = ?config(node_specs, Config),
      [Node1] = ?config(nodes, Config),
      Port = get_mqtt_port(Node1, tcp),
      SubTopicFilter = <<"t/+">>,
      ClientId = <<"myclientid">>,
      ?check_trace(
          #{timetrap => 30_000},
          begin
              %% Orders `will_restart_node' after the first
              %% `persistent_session_ds_subscription_added' event, i.e. the
              %% restart is held back until the subscription has been added.
              ?force_ordering(
                  #{?snk_kind := persistent_session_ds_subscription_added},
                  _NEvents0 = 1,
                  #{?snk_kind := will_restart_node},
                  _Guard0 = true
              ),
              %% Orders the start of `persistent_session_ds_open_iterators'
              %% after `restarted_node'.
              ?force_ordering(
                  #{?snk_kind := restarted_node},
                  _NEvents1 = 1,
                  #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start},
                  _Guard1 = true
              ),

              spawn_link(fun() -> restart_node(Node1, Node1Spec) end),

              ?tp(notice, "starting 1", #{}),
              Client0 = start_client(#{port => Port, clientid => ClientId}),
              {ok, _} = emqtt:connect(Client0),
              ?tp(notice, "subscribing 1", #{}),
              process_flag(trap_exit, true),
              %% The node goes down while this call is in flight, so the call
              %% is allowed to fail.
              catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
              receive
                  %% Trapped exit signals arrive as `{'EXIT', Pid, Reason}'; a
                  %% two-element pattern could never match them.
                  {'EXIT', _Pid, {shutdown, _}} ->
                      ok
              after 100 -> ok
              end,
              process_flag(trap_exit, false),

              {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
              ?tp(notice, "starting 2", #{}),
              Client1 = start_client(#{port => Port, clientid => ClientId}),
              {ok, _} = emqtt:connect(Client1),
              ?tp(notice, "subscribing 2", #{}),
              {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),

              ok = emqtt:stop(Client1),

              ok
          end,
          fun(_Trace) ->
              Session = session_open(Node1, ClientId),
              ?assertMatch(
                  #{SubTopicFilter := #{}},
                  emqx_session:info(subscriptions, Session)
              )
          end
      ),
      ok.
  %% Check that we close the iterators before deleting the iterator id entry.
  %%
  %% Restarts the node in the middle of an unsubscribe operation, then
  %% reconnects with the same client id, subscribes and unsubscribes again.
  %% Afterwards the session must have no subscriptions.
  t_session_unsubscription_idempotency(Config) ->
      [Node1Spec | _] = ?config(node_specs, Config),
      [Node1] = ?config(nodes, Config),
      Port = get_mqtt_port(Node1, tcp),
      SubTopicFilter = <<"t/+">>,
      ClientId = <<"myclientid">>,
      ?check_trace(
          #{timetrap => 30_000},
          begin
              %% Orders `will_restart_node' after the first
              %% `persistent_session_ds_subscription_delete' event.
              ?force_ordering(
                  #{
                      ?snk_kind := persistent_session_ds_subscription_delete
                  },
                  _NEvents0 = 1,
                  #{?snk_kind := will_restart_node},
                  _Guard0 = true
              ),
              %% Orders the start of the route deletion after `restarted_node'.
              ?force_ordering(
                  #{?snk_kind := restarted_node},
                  _NEvents1 = 1,
                  #{?snk_kind := persistent_session_ds_subscription_route_delete, ?snk_span := start},
                  _Guard1 = true
              ),

              spawn_link(fun() -> restart_node(Node1, Node1Spec) end),

              ?tp(notice, "starting 1", #{}),
              Client0 = start_client(#{port => Port, clientid => ClientId}),
              {ok, _} = emqtt:connect(Client0),
              ?tp(notice, "subscribing 1", #{}),
              {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
              ?tp(notice, "unsubscribing 1", #{}),
              process_flag(trap_exit, true),
              %% The node goes down while this call is in flight, so the call
              %% is allowed to fail.
              catch emqtt:unsubscribe(Client0, SubTopicFilter),
              receive
                  %% Trapped exit signals arrive as `{'EXIT', Pid, Reason}'; a
                  %% two-element pattern could never match them.
                  {'EXIT', _Pid, {shutdown, _}} ->
                      ok
              after 100 -> ok
              end,
              process_flag(trap_exit, false),

              {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
              ?tp(notice, "starting 2", #{}),
              Client1 = start_client(#{port => Port, clientid => ClientId}),
              {ok, _} = emqtt:connect(Client1),
              ?tp(notice, "subscribing 2", #{}),
              {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
              ?tp(notice, "unsubscribing 2", #{}),
              {{ok, _, [?RC_SUCCESS]}, {ok, _}} =
                  ?wait_async_action(
                      emqtt:unsubscribe(Client1, SubTopicFilter),
                      #{
                          ?snk_kind := persistent_session_ds_subscription_route_delete,
                          ?snk_span := {complete, _}
                      },
                      15_000
                  ),

              ok = emqtt:stop(Client1),

              ok
          end,
          fun(_Trace) ->
              Session = session_open(Node1, ClientId),
              ?assertEqual(
                  #{},
                  emqx_session:info(subscriptions, Session)
              ),
              ok
          end
      ),
      ok.
  %% Runs the session-discard scenario where the client reconnects with
  %% `clean_start' and a zero session expiry interval.
  t_session_discard_persistent_to_non_persistent(_Config) ->
      ReconnectOpts = #{
          clean_start => true,
          %% we set it to zero so that a new session is not created.
          properties => #{'Session-Expiry-Interval' => 0},
          proto_ver => v5
      },
      do_t_session_discard(#{
          client_id => atom_to_binary(?FUNCTION_NAME),
          reconnect_opts => ReconnectOpts
      }).
  %% Runs the session-discard scenario where the client reconnects with
  %% `clean_start' and a 30 second session expiry interval.
  t_session_discard_persistent_to_persistent(_Config) ->
      ReconnectOpts = #{
          clean_start => true,
          properties => #{'Session-Expiry-Interval' => 30},
          proto_ver => v5
      },
      do_t_session_discard(#{
          client_id => atom_to_binary(?FUNCTION_NAME),
          reconnect_opts => ReconnectOpts
      }).
  %% Shared body of the session-discard testcases.
  %%
  %% `Params' must contain `client_id' and `reconnect_opts'. A first client
  %% creates a session with a subscription and some stored messages, then
  %% stops. A second client then connects with `reconnect_opts' (plus the same
  %% client id). After that the client reports no subscriptions, the router has
  %% no topics, and the stored session exists only if the reconnect options are
  %% persistent.
  do_t_session_discard(Params) ->
      #{client_id := ClientId, reconnect_opts := ReconnectOpts0} = Params,
      ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
      SubTopicFilter = <<"t/+">>,
      ?check_trace(
          #{timetrap => 30_000},
          begin
              ?tp(notice, "starting", #{}),
              FirstClient = start_client(#{
                  clientid => ClientId,
                  clean_start => false,
                  properties => #{'Session-Expiry-Interval' => 30},
                  proto_ver => v5
              }),
              {ok, _} = emqtt:connect(FirstClient),
              ?tp(notice, "subscribing", #{}),
              {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(FirstClient, SubTopicFilter, qos2),
              %% Store some matching messages so that streams and iterators are created.
              ok = emqtt:publish(FirstClient, <<"t/1">>, <<"1">>),
              ok = emqtt:publish(FirstClient, <<"t/2">>, <<"2">>),
              %% Wait until the session can be printed as a map.
              ?retry(
                  _Sleep = 100,
                  _Attempts = 50,
                  #{} = emqx_persistent_session_ds_state:print_session(ClientId)
              ),
              ok = emqtt:stop(FirstClient),
              ?tp(notice, "disconnected", #{}),

              ?tp(notice, "reconnecting", #{}),
              %% we still have the session:
              ?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId)),
              SecondClient = start_client(ReconnectOpts),
              {ok, _} = emqtt:connect(SecondClient),
              ?assertEqual([], emqtt:subscriptions(SecondClient)),
              case is_persistent_connect_opts(ReconnectOpts) of
                  false ->
                      ?assertEqual(
                          undefined, emqx_persistent_session_ds_state:print_session(ClientId)
                      );
                  true ->
                      ?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId))
              end,
              ?assertEqual([], emqx_persistent_session_ds_router:topics()),
              ok = emqtt:stop(SecondClient),
              ?tp(notice, "disconnected", #{}),

              ok
          end,
          []
      ),
      ok.
  %% This testcase verifies that the properties passed in the
  %% CONNECT packet are respected by the GC process: the second connection
  %% uses an expiry interval of 1, the others 30, and no DISCONNECT
  %% properties are sent.
  t_session_expiration1(Config) ->
      ConnectWith = fun(ExpiryInterval) ->
          #{clean_start => false, properties => #{'Session-Expiry-Interval' => ExpiryInterval}}
      end,
      Sequence = [
          {ConnectWith(30), #{}},
          {ConnectWith(1), #{}},
          {ConnectWith(30), #{}}
      ],
      do_t_session_expiration(Config, #{
          clientid => atom_to_binary(?FUNCTION_NAME),
          sequence => Sequence
      }).
  %% This testcase updates the expiry interval for the session in
  %% the _DISCONNECT_ packet. This setting should be respected by GC
  %% process. Every connection uses an expiry interval of 30; the second
  %% DISCONNECT lowers it to 1.
  t_session_expiration2(Config) ->
      Connect = #{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}},
      Sequence = [
          {Connect, #{}},
          {Connect, #{'Session-Expiry-Interval' => 1}},
          {Connect, #{}}
      ],
      do_t_session_expiration(Config, #{
          clientid => atom_to_binary(?FUNCTION_NAME),
          sequence => Sequence
      }).
  %% Shared body of the session expiration testcases.
  %%
  %% `Opts' holds the `clientid' and a `sequence' of exactly three pairs
  %% `{ConnectOpts, DisconnectProps}', one per consecutive connection:
  %%   1. creates the session and subscribes;
  %%   2. must see the session as present, then disconnects;
  %%   3. connects after a 2.5 s sleep and must see no session, no
  %%      subscriptions and no delivery for the old topic.
  do_t_session_expiration(_Config, Opts) ->
      #{
          clientid := ClientId,
          sequence := [
              {Conn1, Disconn1},
              {Conn2, Disconn2},
              {Conn3, Disconn3}
          ]
      } = Opts,
      BaseParams = #{proto_ver => v5, clientid => ClientId},
      ?check_trace(
          #{timetrap => 30_000},
          begin
              Topic = <<"some/topic">>,

              %% First connection: fresh session with a single subscription.
              ClientA = start_client(maps:merge(BaseParams, Conn1)),
              {ok, _} = emqtt:connect(ClientA),
              {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(ClientA, Topic, ?QOS_2),
              #{s := #{subscriptions := SubsA}} = emqx_persistent_session_ds:print_session(ClientId),
              ?assertEqual(1, map_size(SubsA), #{subs => SubsA}),
              InfoA = maps:from_list(emqtt:info(ClientA)),
              ?assertEqual(0, maps:get(session_present, InfoA), #{info => InfoA}),
              emqtt:disconnect(ClientA, ?RC_NORMAL_DISCONNECTION, Disconn1),

              %% Second connection: the session is still present.
              ClientB = start_client(maps:merge(BaseParams, Conn2)),
              {ok, _} = emqtt:connect(ClientB),
              InfoB = maps:from_list(emqtt:info(ClientB)),
              ?assertEqual(1, maps:get(session_present, InfoB), #{info => InfoB}),
              SubsB = emqtt:subscriptions(ClientB),
              ?assertEqual([], SubsB),
              emqtt:disconnect(ClientB, ?RC_NORMAL_DISCONNECTION, Disconn2),

              ct:sleep(2_500),

              %% Third connection: the session is expected to be gone.
              ClientC = start_client(maps:merge(BaseParams, Conn3)),
              {ok, _} = emqtt:connect(ClientC),
              InfoC = maps:from_list(emqtt:info(ClientC)),
              ?assertEqual(0, maps:get(session_present, InfoC), #{info => InfoC}),
              SubsC = emqtt:subscriptions(ClientC),
              ?assertEqual([], SubsC),
              emqtt:publish(ClientC, Topic, <<"payload">>),
              ?assertNotReceive({publish, #{topic := Topic}}),
              %% ensure subscriptions are absent from table.
              #{s := #{subscriptions := StoredSubs}} = emqx_persistent_session_ds:print_session(
                  ClientId
              ),
              ?assertEqual([], maps:to_list(StoredSubs)),
              emqtt:disconnect(ClientC, ?RC_NORMAL_DISCONNECTION, Disconn3),
              ok
          end,
          []
      ),
      ok.
  %% Connects one client to each of the three cluster nodes (expiry intervals
  %% 30, 1 and 1), lets a GC pass run while all of them are connected, then
  %% stops the two short-lived clients and checks that exactly their sessions
  %% (and subscriptions) get cleaned up.
  t_session_gc(Config) ->
      [Node1, _Node2, _Node3] = Nodes = ?config(nodes, Config),
      [
          Port1,
          Port2,
          Port3
      ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
      ct:pal("Ports: ~p", [[Port1, Port2, Port3]]),
      CommonParams = #{
          clean_start => false,
          proto_ver => v5
      },
      StartClient = fun(ClientId, Port, ExpiryInterval) ->
          Params = maps:merge(CommonParams, #{
              clientid => ClientId,
              port => Port,
              properties => #{'Session-Expiry-Interval' => ExpiryInterval}
          }),
          Client = start_client(Params),
          {ok, _} = emqtt:connect(Client),
          Client
      end,

      ?check_trace(
          #{timetrap => 30_000},
          begin
              ClientId1 = <<"session_gc1">>,
              Client1 = StartClient(ClientId1, Port1, 30),

              ClientId2 = <<"session_gc2">>,
              Client2 = StartClient(ClientId2, Port2, 1),

              ClientId3 = <<"session_gc3">>,
              Client3 = StartClient(ClientId3, Port3, 1),

              lists:foreach(
                  fun(Client) ->
                      Topic = <<"some/topic">>,
                      Payload = <<"hi">>,
                      {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client, Topic, ?QOS_1),
                      {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
                      ok
                  end,
                  [Client1, Client2, Client3]
              ),

              %% Clients are still alive; no session is garbage collected.
              ?tp(notice, "waiting for gc", #{}),
              ?assertMatch(
                  {ok, _},
                  ?block_until(
                      #{
                          ?snk_kind := ds_session_gc,
                          ?snk_span := {complete, _},
                          ?snk_meta := #{node := N}
                      } when N =/= node()
                  )
              ),
              ?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
              ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
              ?tp(notice, "gc ran", #{}),

              %% Now we disconnect 2 of them; only those should be GC'ed.
              %% The trace labels name the client that is actually being stopped.
              ?tp(notice, "disconnecting client2", #{}),
              ?assertMatch(
                  {ok, {ok, _}},
                  ?wait_async_action(
                      emqtt:stop(Client2),
                      #{?snk_kind := terminate}
                  )
              ),
              ?tp(notice, "disconnected client2", #{}),
              ?assertMatch(
                  {ok, {ok, _}},
                  ?wait_async_action(
                      emqtt:stop(Client3),
                      #{?snk_kind := terminate}
                  )
              ),
              ?tp(notice, "disconnected client3", #{}),
              ?assertMatch(
                  {ok, _},
                  ?block_until(
                      #{
                          ?snk_kind := ds_session_gc_cleaned,
                          session_id := ClientId2
                      }
                  )
              ),
              ?assertMatch(
                  {ok, _},
                  ?block_until(
                      #{
                          ?snk_kind := ds_session_gc_cleaned,
                          session_id := ClientId3
                      }
                  )
              ),
              %% Only the long-lived session of client 1 must remain.
              ?retry(50, 3, [ClientId1] = list_all_sessions(Node1)),
              ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
              ok
          end,
          []
      ),
      ok.
  %% Verify that the session recovers smoothly from transient errors during
  %% replay.
  %%
  %% A subscriber that never acks receives one QoS 1 message from each of ten
  %% publishers and then stops. It reconnects while the mocked RPC layer
  %% reports some shards as unavailable, so only part of the messages can be
  %% replayed; once the mock is removed the remainder must arrive, and the
  %% complete replay must equal the original delivery.
  t_session_replay_retry(_Config) ->
      ok = emqx_ds_test_helpers:mock_rpc(),

      NumPublishers = 10,
      SubscriberOpts = #{
          clientid => mk_clientid(?FUNCTION_NAME, sub),
          auto_ack => never
      },
      Subscriber = start_connect_client(SubscriberOpts),
      ?assertMatch(
          {ok, _, [?RC_GRANTED_QOS_1]},
          emqtt:subscribe(Subscriber, <<"t/#">>, ?QOS_1)
      ),

      Publishers = [
          start_connect_client(#{
              clientid => mk_clientid(?FUNCTION_NAME, I),
              properties => #{'Session-Expiry-Interval' => 0}
          })
       || I <- lists:seq(1, NumPublishers)
      ],
      lists:foreach(
          fun(Publisher) ->
              Index = integer_to_binary(rand:uniform(NumPublishers)),
              Topic = <<"t/", Index/binary>>,
              ?assertMatch({ok, #{}}, emqtt:publish(Publisher, Topic, Index, 1))
          end,
          Publishers
      ),

      Original = emqx_common_test_helpers:wait_publishes(NumPublishers, 5_000),
      NumOriginal = length(Original),
      ?assertEqual(NumPublishers, NumOriginal, ?drainMailbox(1_500)),

      ok = emqtt:stop(Subscriber),

      %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
      ok = emqx_ds_test_helpers:mock_rpc_result(
          fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
              case erlang:phash2(Shard) rem 2 of
                  0 -> unavailable;
                  1 -> passthrough
              end
          end
      ),

      _Resumed = start_connect_client(SubscriberOpts#{clean_start => false}),

      Partial = emqx_common_test_helpers:wait_publishes(NumOriginal, 5_000),
      ?assert(length(Partial) < length(Original), Partial),

      %% "Recover" the shards.
      emqx_ds_test_helpers:unmock_rpc(),

      Remainder = emqx_common_test_helpers:wait_publishes(NumOriginal - length(Partial), 5_000),
      ?assertEqual(
          [maps:with([topic, payload, qos], P) || P <- Original],
          [maps:with([topic, payload, qos], P) || P <- Partial ++ Remainder]
      ).
  %% Check that we send will messages when performing GC without relying on timers set by
  %% the channel process.
  %%
  %% The client registers a will with a 300 s delay and disconnects with an
  %% error reason code. No will message may be delivered at that point; after
  %% `last_alive_at' is forced to 0 and the GC worker checks the session, the
  %% will message must be delivered to the local subscriber.
  t_session_gc_will_message(_Config) ->
      ?check_trace(
          #{timetrap => 10_000},
          begin
              WillTopic = <<"will/t">>,
              ok = emqx:subscribe(WillTopic, #{qos => 2}),
              ClientId = <<"will_msg_client">>,
              WillClient = start_client(#{
                  clientid => ClientId,
                  will_topic => WillTopic,
                  will_payload => <<"will payload">>,
                  will_qos => 0,
                  will_props => #{'Will-Delay-Interval' => 300}
              }),
              {ok, _} = emqtt:connect(WillClient),
              %% Use reason code =/= `?RC_SUCCESS' to allow will message
              {ok, {ok, _}} =
                  ?wait_async_action(
                      emqtt:disconnect(WillClient, ?RC_UNSPECIFIED_ERROR),
                      #{?snk_kind := emqx_cm_clean_down}
                  ),
              ?assertNotReceive({deliver, WillTopic, _}),
              %% Set fake `last_alive_at' to trigger immediate will message.
              force_last_alive_at(ClientId, _Time = 0),
              {ok, {ok, _}} =
                  ?wait_async_action(
                      emqx_persistent_session_ds_gc_worker:check_session(ClientId),
                      #{?snk_kind := session_gc_published_will_msg}
                  ),
              ?assertReceive({deliver, WillTopic, _}),
              ok
          end,
          []
      ),
      ok.