%% File: emqx_ds_replication_SUITE.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ds_replication_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -include_lib("stdlib/include/assert.hrl").
  22. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  23. -define(DB, testdb).
  24. -define(ON(NODE, BODY),
  25. erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
  26. ).
  27. opts() ->
  28. opts(#{}).
  29. opts(Overrides) ->
  30. maps:merge(
  31. #{
  32. backend => builtin,
  33. %% storage => {emqx_ds_storage_reference, #{}},
  34. storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
  35. n_shards => 16,
  36. n_sites => 1,
  37. replication_factor => 3,
  38. replication_options => #{
  39. wal_max_size_bytes => 64,
  40. wal_max_batch_size => 1024,
  41. snapshot_interval => 128
  42. }
  43. },
  44. Overrides
  45. ).
  46. appspec(emqx_durable_storage) ->
  47. {emqx_durable_storage, #{
  48. before_start => fun snabbkaffe:fix_ct_logging/0,
  49. override_env => [{egress_flush_interval, 1}]
  50. }}.
  51. t_replication_transfers_snapshots(init, Config) ->
  52. Apps = [appspec(emqx_durable_storage)],
  53. NodeSpecs = emqx_cth_cluster:mk_nodespecs(
  54. [
  55. {t_replication_transfers_snapshots1, #{apps => Apps}},
  56. {t_replication_transfers_snapshots2, #{apps => Apps}},
  57. {t_replication_transfers_snapshots3, #{apps => Apps}}
  58. ],
  59. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  60. ),
  61. Nodes = emqx_cth_cluster:start(NodeSpecs),
  62. [{nodes, Nodes}, {specs, NodeSpecs} | Config];
  63. t_replication_transfers_snapshots('end', Config) ->
  64. ok = emqx_cth_cluster:stop(?config(nodes, Config)).
  65. t_replication_transfers_snapshots(Config) ->
  66. NMsgs = 400,
  67. NClients = 5,
  68. {Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
  69. ?FUNCTION_NAME, NClients, NMsgs
  70. ),
  71. Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
  72. _Specs = [_, SpecOffline | _] = ?config(specs, Config),
  73. ?check_trace(
  74. begin
  75. %% Initialize DB on all nodes and wait for it to be online.
  76. Opts = opts(#{n_shards => 1, n_sites => 3}),
  77. ?assertEqual(
  78. [{ok, ok} || _ <- Nodes],
  79. erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
  80. ),
  81. ?retry(
  82. 500,
  83. 10,
  84. ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
  85. ),
  86. %% Stop the DB on the "offline" node.
  87. ok = emqx_cth_cluster:stop_node(NodeOffline),
  88. %% Fill the storage with messages and few additional generations.
  89. emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
  90. %% Restart the node.
  91. [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
  92. {ok, SRef} = snabbkaffe:subscribe(
  93. ?match_event(#{
  94. ?snk_kind := dsrepl_snapshot_accepted,
  95. ?snk_meta := #{node := NodeOffline}
  96. })
  97. ),
  98. ?assertEqual(
  99. ok,
  100. erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
  101. ),
  102. %% Trigger storage operation and wait the replica to be restored.
  103. _ = add_generation(Node, ?DB),
  104. ?assertMatch(
  105. {ok, _},
  106. snabbkaffe:receive_events(SRef)
  107. ),
  108. %% Wait until any pending replication activities are finished (e.g. Raft log entries).
  109. ok = timer:sleep(3_000),
  110. %% Check that the DB has been restored:
  111. emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
  112. end,
  113. []
  114. ).
  115. t_rebalance(init, Config) ->
  116. Apps = [appspec(emqx_durable_storage)],
  117. Nodes = emqx_cth_cluster:start(
  118. [
  119. {t_rebalance1, #{apps => Apps}},
  120. {t_rebalance2, #{apps => Apps}},
  121. {t_rebalance3, #{apps => Apps}},
  122. {t_rebalance4, #{apps => Apps}}
  123. ],
  124. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  125. ),
  126. [{nodes, Nodes} | Config];
  127. t_rebalance('end', Config) ->
  128. ok = emqx_cth_cluster:stop(?config(nodes, Config)).
  129. %% This testcase verifies that the storage rebalancing works correctly:
  130. %% 1. Join/leave operations are applied successfully.
  131. %% 2. Message data survives the rebalancing.
  132. %% 3. Shard cluster membership converges to the target replica allocation.
  133. %% 4. Replication factor is respected.
  134. t_rebalance(Config) ->
  135. NMsgs = 50,
  136. NClients = 5,
  137. {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
  138. ?FUNCTION_NAME, NClients, NMsgs
  139. ),
  140. Nodes = [N1, N2 | _] = ?config(nodes, Config),
  141. ?check_trace(
  142. #{timetrap => 30_000},
  143. begin
  144. %% 1. Initialize DB on the first node.
  145. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
  146. ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))),
  147. ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)),
  148. %% 1.1 Open DB on the rest of the nodes:
  149. [
  150. ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
  151. || Node <- Nodes
  152. ],
  153. Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
  154. ct:pal("Sites: ~p~n", [Sites]),
  155. Sequence = [
  156. %% Join the second site to the DB replication sites:
  157. {N1, join_db_site, S2},
  158. %% Should be a no-op:
  159. {N2, join_db_site, S2},
  160. %% Now join the rest of the sites:
  161. {N2, assign_db_sites, Sites}
  162. ],
  163. Stream1 = emqx_utils_stream:interleave(
  164. [
  165. {50, Stream0},
  166. emqx_utils_stream:const(add_generation)
  167. ],
  168. false
  169. ),
  170. Stream = emqx_utils_stream:interleave(
  171. [
  172. {50, Stream0},
  173. emqx_utils_stream:list(Sequence)
  174. ],
  175. true
  176. ),
  177. %% 1.2 Verify that all nodes have the same view of metadata storage:
  178. [
  179. ?defer_assert(
  180. ?assertEqual(
  181. [S1],
  182. ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)),
  183. #{
  184. msg => "Initially, only S1 should be responsible for all shards",
  185. node => Node
  186. }
  187. )
  188. )
  189. || Node <- Nodes
  190. ],
  191. %% 2. Start filling the storage:
  192. emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
  193. timer:sleep(5000),
  194. emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams),
  195. [
  196. ?defer_assert(
  197. ?assertEqual(
  198. 16 * 3 div length(Nodes),
  199. n_shards_online(Node, ?DB),
  200. "Each node is now responsible for 3/4 of the shards"
  201. )
  202. )
  203. || Node <- Nodes
  204. ],
  205. %% Verify that the set of shard servers matches the target allocation.
  206. Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
  207. ShardServers = [
  208. shard_server_info(N, ?DB, Shard, Site, readiness)
  209. || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
  210. Shard <- Shards
  211. ],
  212. ?assert(
  213. lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
  214. ShardServers
  215. ),
  216. %% Scale down the cluster by removing the first node.
  217. ?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
  218. ct:pal("Transitions (~p -> ~p): ~p~n", [
  219. Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
  220. ]),
  221. ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),
  222. %% Verify that at the end each node is now responsible for each shard.
  223. ?defer_assert(
  224. ?assertEqual(
  225. [0, 16, 16, 16],
  226. [n_shards_online(N, ?DB) || N <- Nodes]
  227. )
  228. ),
  229. %% Verify that the messages are once again preserved after the rebalance:
  230. emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
  231. end,
  232. []
  233. ).
  234. t_join_leave_errors(init, Config) ->
  235. Apps = [appspec(emqx_durable_storage)],
  236. Nodes = emqx_cth_cluster:start(
  237. [
  238. {t_join_leave_errors1, #{apps => Apps}},
  239. {t_join_leave_errors2, #{apps => Apps}}
  240. ],
  241. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  242. ),
  243. [{nodes, Nodes} | Config];
  244. t_join_leave_errors('end', Config) ->
  245. ok = emqx_cth_cluster:stop(?config(nodes, Config)).
  246. t_join_leave_errors(Config) ->
  247. %% This testcase verifies that logical errors arising during handling of
  248. %% join/leave operations are reported correctly.
  249. [N1, N2] = ?config(nodes, Config),
  250. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
  251. ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
  252. ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])),
  253. [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
  254. ?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])),
  255. %% Attempts to join a nonexistent DB / site.
  256. ?assertEqual(
  257. {error, {nonexistent_db, boo}},
  258. ds_repl_meta(N1, join_db_site, [_DB = boo, S1])
  259. ),
  260. ?assertEqual(
  261. {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}},
  262. ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>])
  263. ),
  264. %% NOTE: Leaving a non-existent site is not an error.
  265. ?assertEqual(
  266. {ok, unchanged},
  267. ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
  268. ),
  269. %% Should be no-op.
  270. ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])),
  271. ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
  272. %% Impossible to leave the last site.
  273. ?assertEqual(
  274. {error, {too_few_sites, []}},
  275. ds_repl_meta(N1, leave_db_site, [?DB, S1])
  276. ),
  277. %% "Move" the DB to the other node.
  278. ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])),
  279. ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
  280. ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
  281. ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
  282. %% Should be no-op.
  283. ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
  284. ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
  285. t_rebalance_chaotic_converges(init, Config) ->
  286. Apps = [appspec(emqx_durable_storage)],
  287. Nodes = emqx_cth_cluster:start(
  288. [
  289. {t_rebalance_chaotic_converges1, #{apps => Apps}},
  290. {t_rebalance_chaotic_converges2, #{apps => Apps}},
  291. {t_rebalance_chaotic_converges3, #{apps => Apps}}
  292. ],
  293. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  294. ),
  295. [{nodes, Nodes} | Config];
  296. t_rebalance_chaotic_converges('end', Config) ->
  297. ok = emqx_cth_cluster:stop(?config(nodes, Config)).
  298. t_rebalance_chaotic_converges(Config) ->
  299. %% This testcase verifies that even a very chaotic sequence of join/leave
  300. %% operations will still be handled consistently, and that the shard
  301. %% allocation will converge to the target state.
  302. NMsgs = 500,
  303. Nodes = [N1, N2, N3] = ?config(nodes, Config),
  304. NClients = 5,
  305. {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
  306. ?FUNCTION_NAME, NClients, NMsgs
  307. ),
  308. ?check_trace(
  309. #{},
  310. begin
  311. %% Initialize DB on first two nodes.
  312. Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
  313. ?assertEqual(
  314. [{ok, ok}, {ok, ok}],
  315. erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
  316. ),
  317. %% Open DB on the last node.
  318. ?assertEqual(
  319. ok,
  320. erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
  321. ),
  322. %% Find out which sites there are.
  323. Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
  324. ct:pal("Sites: ~p~n", [Sites]),
  325. Sequence = [
  326. {N1, join_db_site, S3},
  327. {N2, leave_db_site, S2},
  328. {N3, leave_db_site, S1},
  329. {N1, join_db_site, S2},
  330. {N2, join_db_site, S1},
  331. {N3, leave_db_site, S3},
  332. {N1, leave_db_site, S1},
  333. {N2, join_db_site, S3}
  334. ],
  335. %% Interleaved list of events:
  336. Stream = emqx_utils_stream:interleave(
  337. [
  338. {50, Stream0},
  339. emqx_utils_stream:list(Sequence)
  340. ],
  341. true
  342. ),
  343. ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
  344. ?assertEqual(
  345. lists:sort([S1, S2]),
  346. ds_repl_meta(N1, db_sites, [?DB]),
  347. "Initially, the DB is assigned to [S1, S2]"
  348. ),
  349. emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
  350. %% Wait for the last transition to complete.
  351. ?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
  352. ?defer_assert(
  353. ?assertEqual(
  354. lists:sort([S2, S3]),
  355. ds_repl_meta(N1, db_sites, [?DB])
  356. )
  357. ),
  358. %% Wait until the LTS timestamp is updated:
  359. timer:sleep(5000),
  360. %% Check that all messages are still there.
  361. emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
  362. end,
  363. []
  364. ).
  365. t_rebalance_offline_restarts(init, Config) ->
  366. Apps = [appspec(emqx_durable_storage)],
  367. Specs = emqx_cth_cluster:mk_nodespecs(
  368. [
  369. {t_rebalance_offline_restarts1, #{apps => Apps}},
  370. {t_rebalance_offline_restarts2, #{apps => Apps}},
  371. {t_rebalance_offline_restarts3, #{apps => Apps}}
  372. ],
  373. #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
  374. ),
  375. Nodes = emqx_cth_cluster:start(Specs),
  376. [{nodes, Nodes}, {nodespecs, Specs} | Config];
  377. t_rebalance_offline_restarts('end', Config) ->
  378. ok = emqx_cth_cluster:stop(?config(nodes, Config)).
  379. t_rebalance_offline_restarts(Config) ->
  380. %% This testcase verifies that rebalancing progresses if nodes restart or
  381. %% go offline and never come back.
  382. Nodes = [N1, N2, N3] = ?config(nodes, Config),
  383. _Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
  384. %% Initialize DB on all 3 nodes.
  385. Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
  386. ?assertEqual(
  387. [{ok, ok} || _ <- Nodes],
  388. erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
  389. ),
  390. ?retry(
  391. 1000,
  392. 5,
  393. ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
  394. ),
  395. %% Find out which sites are there.
  396. Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
  397. ct:pal("Sites: ~p~n", [Sites]),
  398. %% Shut down N3 and then remove it from the DB.
  399. ok = emqx_cth_cluster:stop_node(N3),
  400. ?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
  401. Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
  402. ct:pal("Transitions: ~p~n", [Transitions]),
  403. %% Wait until at least one transition completes.
  404. ?block_until(#{?snk_kind := dsrepl_shard_transition_end}),
  405. %% Restart N1 and N2.
  406. [N1] = emqx_cth_cluster:restart(NS1),
  407. [N2] = emqx_cth_cluster:restart(NS2),
  408. ?assertEqual(
  409. [{ok, ok}, {ok, ok}],
  410. erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
  411. ),
  412. %% Target state should still be reached eventually.
  413. ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
  414. ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
  415. %%
  416. shard_server_info(Node, DB, Shard, Site, Info) ->
  417. Server = shard_server(Node, DB, Shard, Site),
  418. {Server, ds_repl_shard(Node, server_info, [Info, Server])}.
  419. shard_server(Node, DB, Shard, Site) ->
  420. ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
  421. ds_repl_meta(Node, Fun) ->
  422. ds_repl_meta(Node, Fun, []).
  423. ds_repl_meta(Node, Fun, Args) ->
  424. try
  425. erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args)
  426. catch
  427. EC:Err:Stack ->
  428. ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [
  429. Fun, Args, Node, EC, Err, Stack
  430. ]),
  431. error(meta_op_failed)
  432. end.
  433. ds_repl_shard(Node, Fun, Args) ->
  434. erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
  435. shards(Node, DB) ->
  436. erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
  437. shards_online(Node, DB) ->
  438. erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
  439. n_shards_online(Node, DB) ->
  440. length(shards_online(Node, DB)).
  441. add_generation(Node, DB) ->
  442. ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
  443. [].
  444. message(ClientId, Topic, Payload, PublishedAt) ->
  445. #message{
  446. from = ClientId,
  447. topic = Topic,
  448. payload = Payload,
  449. timestamp = PublishedAt,
  450. id = emqx_guid:gen()
  451. }.
  452. compare_message(M1, M2) ->
  453. {M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}.
  454. consume(Node, DB, TopicFilter, StartTime) ->
  455. erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]).
  456. consume_shard(Node, DB, Shard, TopicFilter, StartTime) ->
  457. erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]).
  458. probably(P, Fun) ->
  459. case rand:uniform() of
  460. X when X < P -> Fun();
  461. _ -> []
  462. end.
  463. sample(N, List) ->
  464. L = length(List),
  465. case L =< N of
  466. true ->
  467. L;
  468. false ->
  469. H = N div 2,
  470. Filler = integer_to_list(L - N) ++ " more",
  471. lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L)
  472. end.
  473. %%
  474. suite() -> [{timetrap, {seconds, 60}}].
  475. all() -> emqx_common_test_helpers:all(?MODULE).
  476. init_per_testcase(TCName, Config0) ->
  477. Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
  478. ok = snabbkaffe:start_trace(),
  479. Config.
  480. end_per_testcase(TCName, Config) ->
  481. ok = snabbkaffe:stop(),
  482. emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).