emqx_persistent_messages_SUITE.erl 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
  16. -module(emqx_persistent_messages_SUITE).
  17. -include_lib("stdlib/include/assert.hrl").
  18. -include_lib("common_test/include/ct.hrl").
  19. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  20. -include_lib("emqx/include/emqx.hrl").
  21. -include_lib("emqx/include/emqx_mqtt.hrl").
  22. -compile(export_all).
  23. -compile(nowarn_export_all).
  24. -import(emqx_common_test_helpers, [on_exit/1]).
  25. -include("emqx_persistent_message.hrl").
  26. all() ->
  27. emqx_common_test_helpers:all(?MODULE).
  28. init_per_suite(Config) ->
  29. Config.
  30. end_per_suite(_Config) ->
  31. ok.
  32. init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
  33. Cluster = cluster(),
  34. Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
  35. _ = wait_shards_online(Nodes),
  36. [{nodes, Nodes} | Config];
  37. init_per_testcase(t_message_gc = TestCase, Config) ->
  38. Opts = #{
  39. extra_emqx_conf =>
  40. "\n durable_sessions.message_retention_period = 3s"
  41. "\n durable_storage.messages.n_shards = 3"
  42. },
  43. common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
  44. init_per_testcase(t_replication_options = TestCase, Config) ->
  45. Opts = #{
  46. extra_emqx_conf =>
  47. "\n durable_storage.messages.replication_options {"
  48. "\n wal_max_size_bytes = 16000000"
  49. "\n wal_max_batch_size = 1024"
  50. "\n wal_write_strategy = o_sync"
  51. "\n wal_sync_method = datasync"
  52. "\n wal_compute_checksums = false"
  53. "\n snapshot_interval = 64"
  54. "\n resend_window = 60"
  55. "\n}"
  56. },
  57. common_init_per_testcase(TestCase, Config, Opts);
  58. init_per_testcase(TestCase, Config) ->
  59. common_init_per_testcase(TestCase, Config, _Opts = #{}).
  60. common_init_per_testcase(TestCase, Config, Opts) ->
  61. Apps = emqx_cth_suite:start(
  62. app_specs(Opts),
  63. #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
  64. ),
  65. [{apps, Apps} | Config].
  66. end_per_testcase(t_session_subscription_iterators, Config) ->
  67. Nodes = ?config(nodes, Config),
  68. emqx_common_test_helpers:call_janitor(60_000),
  69. ok = emqx_cth_cluster:stop(Nodes);
  70. end_per_testcase(_TestCase, Config) ->
  71. Apps = proplists:get_value(apps, Config, []),
  72. emqx_common_test_helpers:call_janitor(60_000),
  73. ok = emqx_cth_suite:stop(Apps).
  74. t_messages_persisted(_Config) ->
  75. C1 = connect(<<?MODULE_STRING "1">>, true, 30),
  76. C2 = connect(<<?MODULE_STRING "2">>, false, 60),
  77. C3 = connect(<<?MODULE_STRING "3">>, false, undefined),
  78. C4 = connect(<<?MODULE_STRING "4">>, false, 0),
  79. CP = connect(<<?MODULE_STRING "-pub">>, true, undefined),
  80. {ok, _, [1]} = emqtt:subscribe(C1, <<"client/+/topic">>, qos1),
  81. {ok, _, [0]} = emqtt:subscribe(C2, <<"client/+/topic">>, qos0),
  82. {ok, _, [1]} = emqtt:subscribe(C2, <<"random/+">>, qos1),
  83. {ok, _, [2]} = emqtt:subscribe(C3, <<"client/#">>, qos2),
  84. {ok, _, [0]} = emqtt:subscribe(C4, <<"random/#">>, qos0),
  85. Messages = [
  86. M1 = {<<"client/1/topic">>, <<"1">>},
  87. M2 = {<<"client/2/topic">>, <<"2">>},
  88. _M3 = {<<"client/3/topic/sub">>, <<"3">>},
  89. _M4 = {<<"client/4">>, <<"4">>},
  90. M5 = {<<"random/5">>, <<"5">>},
  91. _M6 = {<<"random/6/topic">>, <<"6">>},
  92. M7 = {<<"client/7/topic">>, <<"7">>},
  93. _M8 = {<<"client/8/topic/sub">>, <<"8">>},
  94. M9 = {<<"random/9">>, <<"9">>},
  95. M10 = {<<"random/10">>, <<"10">>}
  96. ],
  97. Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages],
  98. ct:pal("Results = ~p", [Results]),
  99. timer:sleep(2000),
  100. Persisted = consume(['#'], 0),
  101. ct:pal("Persisted = ~p", [Persisted]),
  102. ?assertEqual(
  103. lists:sort([M1, M2, M5, M7, M9, M10]),
  104. lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted])
  105. ),
  106. ok.
  107. t_messages_persisted_2(_Config) ->
  108. Prefix = atom_to_binary(?FUNCTION_NAME),
  109. C1 = connect(<<Prefix/binary, "1">>, _CleanStart0 = true, _EI0 = 30),
  110. CP = connect(<<Prefix/binary, "-pub">>, _CleanStart1 = true, _EI1 = undefined),
  111. T = fun(T0) -> <<Prefix/binary, T0/binary>> end,
  112. %% won't be persisted
  113. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  114. emqtt:publish(CP, T(<<"random/topic">>), <<"0">>, 1),
  115. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  116. emqtt:publish(CP, T(<<"client/1/topic">>), <<"1">>, 1),
  117. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  118. emqtt:publish(CP, T(<<"client/2/topic">>), <<"2">>, 1),
  119. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, T(<<"client/+/topic">>), qos1),
  120. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  121. emqtt:publish(CP, T(<<"random/topic">>), <<"3">>, 1),
  122. %% will be persisted
  123. {ok, #{reason_code := ?RC_SUCCESS}} =
  124. emqtt:publish(CP, T(<<"client/1/topic">>), <<"4">>, 1),
  125. {ok, #{reason_code := ?RC_SUCCESS}} =
  126. emqtt:publish(CP, T(<<"client/2/topic">>), <<"5">>, 1),
  127. {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C1, T(<<"client/+/topic">>)),
  128. %% won't be persisted
  129. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  130. emqtt:publish(CP, T(<<"random/topic">>), <<"6">>, 1),
  131. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  132. emqtt:publish(CP, T(<<"client/1/topic">>), <<"7">>, 1),
  133. {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
  134. emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1),
  135. timer:sleep(2000),
  136. Persisted = consume(['#'], 0),
  137. ct:pal("Persisted = ~p", [Persisted]),
  138. ?assertEqual(
  139. lists:sort([
  140. {T(<<"client/1/topic">>), <<"4">>},
  141. {T(<<"client/2/topic">>), <<"5">>}
  142. ]),
  143. lists:sort([{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted])
  144. ),
  145. ok.
  146. %% TODO: test quic and ws too
  147. t_session_subscription_iterators(Config) ->
  148. [Node1, _Node2] = ?config(nodes, Config),
  149. Port = get_mqtt_port(Node1, tcp),
  150. Topic = <<"t/topic">>,
  151. SubTopicFilter = <<"t/+">>,
  152. AnotherTopic = <<"u/another-topic">>,
  153. ClientId = <<"myclientid">>,
  154. ?check_trace(
  155. begin
  156. [
  157. Payload1,
  158. Payload2,
  159. Payload3,
  160. Payload4
  161. ] = lists:map(
  162. fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end,
  163. lists:seq(1, 4)
  164. ),
  165. ct:pal("starting"),
  166. Client = connect(#{
  167. clientid => ClientId,
  168. port => Port,
  169. properties => #{'Session-Expiry-Interval' => 300}
  170. }),
  171. ct:pal("publishing 1"),
  172. Message1 = emqx_message:make(Topic, Payload1),
  173. publish(Node1, Message1),
  174. ct:pal("subscribing 1"),
  175. {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
  176. ct:pal("publishing 2"),
  177. Message2 = emqx_message:make(Topic, Payload2),
  178. publish(Node1, Message2),
  179. % TODO: no incoming publishes at the moment
  180. % [_] = receive_messages(1),
  181. ct:pal("subscribing 2"),
  182. {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1),
  183. ct:pal("publishing 3"),
  184. Message3 = emqx_message:make(Topic, Payload3),
  185. publish(Node1, Message3),
  186. % [_] = receive_messages(1),
  187. ct:pal("publishing 4"),
  188. Message4 = emqx_message:make(AnotherTopic, Payload4),
  189. publish(Node1, Message4),
  190. emqtt:stop(Client),
  191. #{
  192. messages => [Message1, Message2, Message3, Message4]
  193. }
  194. end,
  195. []
  196. ),
  197. ok.
  198. t_qos0(_Config) ->
  199. Sub = connect(<<?MODULE_STRING "1">>, true, 30),
  200. Pub = connect(<<?MODULE_STRING "2">>, true, 0),
  201. try
  202. {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
  203. Messages = [
  204. {<<"t/1">>, <<"1">>, 0},
  205. {<<"t/1">>, <<"2">>, 1},
  206. {<<"t/1">>, <<"3">>, 0}
  207. ],
  208. [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages],
  209. ?assertMatch(
  210. [
  211. #{qos := 0, topic := <<"t/1">>, payload := <<"1">>},
  212. #{qos := 1, topic := <<"t/1">>, payload := <<"2">>},
  213. #{qos := 0, topic := <<"t/1">>, payload := <<"3">>}
  214. ],
  215. receive_messages(3)
  216. )
  217. after
  218. emqtt:stop(Sub),
  219. emqtt:stop(Pub)
  220. end.
  221. t_qos0_only_many_streams(_Config) ->
  222. ClientId = <<?MODULE_STRING "_sub">>,
  223. Sub = connect(ClientId, true, 30),
  224. Pub = connect(<<?MODULE_STRING "_pub">>, true, 0),
  225. [ConnPid] = emqx_cm:lookup_channels(ClientId),
  226. try
  227. {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
  228. [
  229. emqtt:publish(Pub, Topic, Payload, ?QOS_0)
  230. || {Topic, Payload} <- [
  231. {<<"t/1">>, <<"foo">>},
  232. {<<"t/2">>, <<"bar">>},
  233. {<<"t/3">>, <<"baz">>}
  234. ]
  235. ],
  236. ?assertMatch(
  237. [_, _, _],
  238. receive_messages(3)
  239. ),
  240. Inflight0 = get_session_inflight(ConnPid),
  241. [
  242. emqtt:publish(Pub, Topic, Payload, ?QOS_0)
  243. || {Topic, Payload} <- [
  244. {<<"t/2">>, <<"foo">>},
  245. {<<"t/2">>, <<"bar">>},
  246. {<<"t/1">>, <<"baz">>}
  247. ]
  248. ],
  249. ?assertMatch(
  250. [_, _, _],
  251. receive_messages(3)
  252. ),
  253. [
  254. emqtt:publish(Pub, Topic, Payload, ?QOS_0)
  255. || {Topic, Payload} <- [
  256. {<<"t/3">>, <<"foo">>},
  257. {<<"t/3">>, <<"bar">>},
  258. {<<"t/2">>, <<"baz">>}
  259. ]
  260. ],
  261. ?assertMatch(
  262. [_, _, _],
  263. receive_messages(3)
  264. ),
  265. Inflight1 = get_session_inflight(ConnPid),
  266. %% TODO: Kinda stupid way to verify that the runtime state is not growing.
  267. ?assert(
  268. erlang:external_size(Inflight1) - erlang:external_size(Inflight0) < 16,
  269. Inflight1
  270. )
  271. after
  272. emqtt:stop(Sub),
  273. emqtt:stop(Pub)
  274. end.
  275. get_session_inflight(ConnPid) ->
  276. emqx_connection:info({channel, {session, inflight}}, sys:get_state(ConnPid)).
  277. t_publish_as_persistent(_Config) ->
  278. Sub = connect(<<?MODULE_STRING "1">>, true, 30),
  279. Pub = connect(<<?MODULE_STRING "2">>, true, 30),
  280. try
  281. {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Sub, <<"t/#">>, qos2),
  282. Messages = [
  283. {<<"t/1">>, <<"1">>, 0},
  284. {<<"t/1">>, <<"2">>, 1},
  285. {<<"t/1">>, <<"3">>, 2}
  286. ],
  287. [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages],
  288. ?assertMatch(
  289. [
  290. #{qos := 0, topic := <<"t/1">>, payload := <<"1">>},
  291. #{qos := 1, topic := <<"t/1">>, payload := <<"2">>},
  292. #{qos := 2, topic := <<"t/1">>, payload := <<"3">>}
  293. ],
  294. receive_messages(3)
  295. )
  296. after
  297. emqtt:stop(Sub),
  298. emqtt:stop(Pub)
  299. end.
  300. t_publish_empty_topic_levels(_Config) ->
  301. Sub = connect(<<?MODULE_STRING "1">>, true, 30),
  302. Pub = connect(<<?MODULE_STRING "2">>, true, 30),
  303. try
  304. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t//+//#">>, qos1),
  305. Messages = [
  306. {<<"t//1">>, <<"1">>},
  307. {<<"t//1/">>, <<"2">>},
  308. {<<"t//2//">>, <<"3">>},
  309. {<<"t//2//foo">>, <<"4">>},
  310. {<<"t//2/foo">>, <<"5">>},
  311. {<<"t/3/bar">>, <<"6">>}
  312. ],
  313. [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
  314. Received = receive_messages(length(Messages)),
  315. ?assertMatch(
  316. [
  317. #{topic := <<"t//1/">>, payload := <<"2">>},
  318. #{topic := <<"t//2//">>, payload := <<"3">>},
  319. #{topic := <<"t//2//foo">>, payload := <<"4">>}
  320. ],
  321. lists:sort(emqx_utils_maps:key_comparer(payload), Received)
  322. )
  323. after
  324. emqtt:stop(Sub),
  325. emqtt:stop(Pub)
  326. end.
  327. t_message_gc_too_young(_Config) ->
  328. %% Check that GC doesn't attempt to create a new generation if there are fresh enough
  329. %% generations around. The stability of this test relies on the default value for
  330. %% message retention being long enough. Currently, the default is 1 hour.
  331. ?check_trace(
  332. ok = emqx_persistent_message_ds_gc_worker:gc(),
  333. fun(Trace) ->
  334. ?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
  335. ok
  336. end
  337. ),
  338. ok.
  339. t_message_gc(Config) ->
  340. %% Check that, after GC runs, a new generation is created, retaining messages, and
  341. %% older messages no longer are accessible.
  342. NShards = ?config(n_shards, Config),
  343. ?check_trace(
  344. #{timetrap => 10_000},
  345. begin
  346. %% ensure some messages are in the first generation
  347. ?force_ordering(
  348. #{?snk_kind := inserted_batch},
  349. #{?snk_kind := ps_message_gc_added_gen}
  350. ),
  351. Msgs0 = [
  352. message(<<"foo/bar">>, <<"1">>, 0),
  353. message(<<"foo/baz">>, <<"2">>, 1)
  354. ],
  355. ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0, #{sync => true}),
  356. ?tp(inserted_batch, #{}),
  357. {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
  358. Now = emqx_message:timestamp_now(),
  359. Msgs1 = [
  360. message(<<"foo/bar">>, <<"3">>, Now + 100),
  361. message(<<"foo/baz">>, <<"4">>, Now + 101)
  362. ],
  363. ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1, #{sync => true}),
  364. {ok, _} = snabbkaffe:block_until(
  365. ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
  366. infinity
  367. ),
  368. TopicFilter = emqx_topic:words(<<"#">>),
  369. StartTime = 0,
  370. Msgs = consume(TopicFilter, StartTime),
  371. %% "1" and "2" should have been GC'ed
  372. PresentMessages = sets:from_list(
  373. [emqx_message:payload(Msg) || Msg <- Msgs],
  374. [{version, 2}]
  375. ),
  376. ?assert(
  377. sets:is_empty(
  378. sets:intersection(
  379. PresentMessages,
  380. sets:from_list([<<"1">>, <<"2">>], [{version, 2}])
  381. )
  382. ),
  383. #{present_messages => PresentMessages}
  384. ),
  385. ok
  386. end,
  387. []
  388. ),
  389. ok.
  390. t_metrics_not_dropped(_Config) ->
  391. %% Asserts that, if only persisted sessions are subscribed to a topic being published
  392. %% to, we don't bump the `message.dropped' metric, nor we run the equivalent hook.
  393. Sub = connect(<<?MODULE_STRING "1">>, true, 30),
  394. on_exit(fun() -> emqtt:stop(Sub) end),
  395. Pub = connect(<<?MODULE_STRING "2">>, true, 30),
  396. on_exit(fun() -> emqtt:stop(Pub) end),
  397. Hookpoint = 'message.dropped',
  398. emqx_hooks:add(Hookpoint, {?MODULE, on_message_dropped, [self()]}, 1_000),
  399. on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, on_message_dropped}) end),
  400. DroppedBefore = emqx_metrics:val('messages.dropped'),
  401. DroppedNoSubBefore = emqx_metrics:val('messages.dropped.no_subscribers'),
  402. {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
  403. emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
  404. ?assertMatch([_], receive_messages(1)),
  405. DroppedAfter = emqx_metrics:val('messages.dropped'),
  406. DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
  407. ?assertEqual(DroppedBefore, DroppedAfter),
  408. ?assertEqual(DroppedNoSubBefore, DroppedNoSubAfter),
  409. ok.
  410. t_replication_options(_Config) ->
  411. ?assertMatch(
  412. #{
  413. backend := builtin,
  414. replication_options := #{
  415. wal_max_size_bytes := 16000000,
  416. wal_max_batch_size := 1024,
  417. wal_write_strategy := o_sync,
  418. wal_sync_method := datasync,
  419. wal_compute_checksums := false,
  420. snapshot_interval := 64,
  421. resend_window := 60
  422. }
  423. },
  424. emqx_ds_replication_layer_meta:db_config(?PERSISTENT_MESSAGE_DB)
  425. ),
  426. ?assertMatch(
  427. #{
  428. wal_max_size_bytes := 16000000,
  429. wal_max_batch_size := 1024,
  430. wal_write_strategy := o_sync,
  431. wal_compute_checksums := false,
  432. wal_sync_method := datasync
  433. },
  434. ra_system:fetch(?PERSISTENT_MESSAGE_DB)
  435. ).
  436. %%
  437. connect(ClientId, CleanStart, EI) ->
  438. connect(#{
  439. clientid => ClientId,
  440. clean_start => CleanStart,
  441. properties => maps:from_list(
  442. [{'Session-Expiry-Interval', EI} || is_integer(EI)]
  443. )
  444. }).
  445. connect(Opts0 = #{}) ->
  446. Defaults = #{proto_ver => v5},
  447. Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
  448. {ok, Client} = emqtt:start_link(Opts),
  449. on_exit(fun() -> catch emqtt:stop(Client) end),
  450. {ok, _} = emqtt:connect(Client),
  451. Client.
  452. consume(TopicFilter, StartMS) ->
  453. Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS),
  454. lists:flatmap(
  455. fun({_Rank, Stream}) ->
  456. {ok, It} = emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartMS),
  457. consume(It)
  458. end,
  459. Streams
  460. ).
  461. consume(It) ->
  462. case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
  463. {ok, _NIt, _Msgs = []} ->
  464. [];
  465. {ok, NIt, MsgsAndKeys} ->
  466. [Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt);
  467. {ok, end_of_stream} ->
  468. []
  469. end.
  470. receive_messages(Count) ->
  471. receive_messages(Count, 10_000).
  472. receive_messages(Count, Timeout) ->
  473. lists:reverse(receive_messages(Count, [], Timeout)).
  474. receive_messages(0, Msgs, _Timeout) ->
  475. Msgs;
  476. receive_messages(Count, Msgs, Timeout) ->
  477. receive
  478. {publish, Msg} ->
  479. receive_messages(Count - 1, [Msg | Msgs], Timeout)
  480. after Timeout ->
  481. Msgs
  482. end.
  483. publish(Node, Message) ->
  484. erpc:call(Node, emqx, publish, [Message]).
  485. app_specs() ->
  486. app_specs(_Opts = #{}).
  487. app_specs(Opts) ->
  488. ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
  489. [
  490. emqx_durable_storage,
  491. {emqx, "durable_sessions {enable = true}" ++ ExtraEMQXConf}
  492. ].
  493. cluster() ->
  494. ExtraConf = "\n durable_storage.messages.n_sites = 2",
  495. Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
  496. [
  497. {persistent_messages_SUITE1, Spec},
  498. {persistent_messages_SUITE2, Spec}
  499. ].
  500. wait_shards_online(Nodes = [Node | _]) ->
  501. NShards = erpc:call(Node, emqx_ds_replication_layer_meta, n_shards, [?PERSISTENT_MESSAGE_DB]),
  502. ?retry(500, 10, [?assertEqual(NShards, shards_online(N)) || N <- Nodes]).
  503. shards_online(Node) ->
  504. length(erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB])).
  505. get_mqtt_port(Node, Type) ->
  506. {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
  507. Port.
  508. message(Topic, Payload, PublishedAt) ->
  509. #message{
  510. topic = Topic,
  511. payload = Payload,
  512. timestamp = PublishedAt,
  513. id = emqx_guid:gen()
  514. }.
  515. on_message_dropped(#message{flags = #{sys := true}}, _Context, _Res, _TestPid) ->
  516. ok;
  517. on_message_dropped(Msg, Context, Res, TestPid) ->
  518. ErrCtx = #{msg => Msg, ctx => Context, res => Res},
  519. ct:pal("this hook should not be called.\n ~p", [ErrCtx]),
  520. exit(TestPid, {hookpoint_called, ErrCtx}).