Browse Source

Merge pull request #11574 from thalesmg/ds-topic-filter-m-20230907

feat(ds): add filter for message persistence
Thales Macedo Garitezi 2 years ago
parent
commit
f2346b2e9a

+ 1 - 0
apps/emqx/include/emqx.hrl

@@ -23,6 +23,7 @@
 -define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
 -define(CM_SHARD, emqx_cm_shard).
 -define(ROUTE_SHARD, route_shard).
+-define(PS_ROUTER_SHARD, persistent_session_router_shard).
 
 %% Banner
 %%--------------------------------------------------------------------

+ 72 - 41
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -14,6 +14,8 @@
 -define(DS_SHARD, <<"local">>).
 -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -56,9 +58,11 @@ end_per_testcase(TestCase, Config) when
     TestCase =:= t_session_unsubscription_idempotency
 ->
     Nodes = ?config(nodes, Config),
+    emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
     ok;
 end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(60_000),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -87,9 +91,6 @@ app_specs() ->
     [
         emqx_durable_storage,
         {emqx, #{
-            before_start => fun() ->
-                emqx_app:set_config_loader(?MODULE)
-            end,
             config => #{persistent_session_store => #{ds => true}},
             override_env => [{boot_modules, [broker, listeners]}]
         }}
@@ -124,10 +125,56 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
         false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
     ).
 
+start_client(Opts0 = #{}) ->
+    Defaults = #{
+        proto_ver => v5,
+        properties => #{'Session-Expiry-Interval' => 300}
+    },
+    Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
+    {ok, Client} = emqtt:start_link(Opts),
+    on_exit(fun() -> catch emqtt:stop(Client) end),
+    Client.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
+t_non_persistent_session_subscription(_Config) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    SubTopicFilter = <<"t/#">>,
+    ?check_trace(
+        begin
+            ?tp(notice, "starting", #{}),
+            Client = start_client(#{
+                clientid => ClientId,
+                properties => #{'Session-Expiry-Interval' => 0}
+            }),
+            {ok, _} = emqtt:connect(Client),
+            ?tp(notice, "subscribing", #{}),
+            {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
+            IteratorRefs = get_all_iterator_refs(node()),
+            IteratorIds = get_all_iterator_ids(node()),
+
+            ok = emqtt:stop(Client),
+
+            #{
+                iterator_refs => IteratorRefs,
+                iterator_ids => IteratorIds
+            }
+        end,
+        fun(Res, Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            #{
+                iterator_refs := IteratorRefs,
+                iterator_ids := IteratorIds
+            } = Res,
+            ?assertEqual([], IteratorRefs),
+            ?assertEqual({ok, []}, IteratorIds),
+            ok
+        end
+    ),
+    ok.
+
 t_session_subscription_idempotency(Config) ->
     [Node1Spec | _] = ?config(node_specs, Config),
     [Node1] = ?config(nodes, Config),
@@ -151,7 +198,7 @@ t_session_subscription_idempotency(Config) ->
 
             spawn_link(fun() ->
                 ?tp(will_restart_node, #{}),
-                ct:pal("restarting node ~p", [Node1]),
+                ?tp(notice, "restarting node", #{node => Node1}),
                 true = monitor_node(Node1, true),
                 ok = erpc:call(Node1, init, restart, []),
                 receive
@@ -160,10 +207,10 @@ t_session_subscription_idempotency(Config) ->
                 after 10_000 ->
                     ct:fail("node ~p didn't stop", [Node1])
                 end,
-                ct:pal("waiting for nodeup ~p", [Node1]),
+                ?tp(notice, "waiting for nodeup", #{node => Node1}),
                 wait_nodeup(Node1),
                 wait_gen_rpc_down(Node1Spec),
-                ct:pal("restarting apps on ~p", [Node1]),
+                ?tp(notice, "restarting apps", #{node => Node1}),
                 Apps = maps:get(apps, Node1Spec),
                 ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
                 _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
@@ -171,19 +218,15 @@ t_session_subscription_idempotency(Config) ->
                 %% end....
                 ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
                 ok = snabbkaffe:forward_trace(Node1),
-                ct:pal("node ~p restarted", [Node1]),
+                ?tp(notice, "node restarted", #{node => Node1}),
                 ?tp(restarted_node, #{}),
                 ok
             end),
 
-            ct:pal("starting 1"),
-            {ok, Client0} = emqtt:start_link([
-                {port, Port},
-                {clientid, ClientId},
-                {proto_ver, v5}
-            ]),
+            ?tp(notice, "starting 1", #{}),
+            Client0 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client0),
-            ct:pal("subscribing 1"),
+            ?tp(notice, "subscribing 1", #{}),
             process_flag(trap_exit, true),
             catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
             receive
@@ -194,14 +237,10 @@ t_session_subscription_idempotency(Config) ->
             process_flag(trap_exit, false),
 
             {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
-            ct:pal("starting 2"),
-            {ok, Client1} = emqtt:start_link([
-                {port, Port},
-                {clientid, ClientId},
-                {proto_ver, v5}
-            ]),
+            ?tp(notice, "starting 2", #{}),
+            Client1 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client1),
-            ct:pal("subscribing 2"),
+            ?tp(notice, "subscribing 2", #{}),
             {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
 
             ok = emqtt:stop(Client1),
@@ -247,7 +286,7 @@ t_session_unsubscription_idempotency(Config) ->
 
             spawn_link(fun() ->
                 ?tp(will_restart_node, #{}),
-                ct:pal("restarting node ~p", [Node1]),
+                ?tp(notice, "restarting node", #{node => Node1}),
                 true = monitor_node(Node1, true),
                 ok = erpc:call(Node1, init, restart, []),
                 receive
@@ -256,10 +295,10 @@ t_session_unsubscription_idempotency(Config) ->
                 after 10_000 ->
                     ct:fail("node ~p didn't stop", [Node1])
                 end,
-                ct:pal("waiting for nodeup ~p", [Node1]),
+                ?tp(notice, "waiting for nodeup", #{node => Node1}),
                 wait_nodeup(Node1),
                 wait_gen_rpc_down(Node1Spec),
-                ct:pal("restarting apps on ~p", [Node1]),
+                ?tp(notice, "restarting apps", #{node => Node1}),
                 Apps = maps:get(apps, Node1Spec),
                 ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
                 _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
@@ -267,21 +306,17 @@ t_session_unsubscription_idempotency(Config) ->
                 %% end....
                 ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
                 ok = snabbkaffe:forward_trace(Node1),
-                ct:pal("node ~p restarted", [Node1]),
+                ?tp(notice, "node restarted", #{node => Node1}),
                 ?tp(restarted_node, #{}),
                 ok
             end),
 
-            ct:pal("starting 1"),
-            {ok, Client0} = emqtt:start_link([
-                {port, Port},
-                {clientid, ClientId},
-                {proto_ver, v5}
-            ]),
+            ?tp(notice, "starting 1", #{}),
+            Client0 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client0),
-            ct:pal("subscribing 1"),
+            ?tp(notice, "subscribing 1", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
-            ct:pal("unsubscribing 1"),
+            ?tp(notice, "unsubscribing 1", #{}),
             process_flag(trap_exit, true),
             catch emqtt:unsubscribe(Client0, SubTopicFilter),
             receive
@@ -292,16 +327,12 @@ t_session_unsubscription_idempotency(Config) ->
             process_flag(trap_exit, false),
 
             {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
-            ct:pal("starting 2"),
-            {ok, Client1} = emqtt:start_link([
-                {port, Port},
-                {clientid, ClientId},
-                {proto_ver, v5}
-            ]),
+            ?tp(notice, "starting 2", #{}),
+            Client1 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client1),
-            ct:pal("subscribing 2"),
+            ?tp(notice, "subscribing 2", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
-            ct:pal("unsubscribing 2"),
+            ?tp(notice, "unsubscribing 2", #{}),
             {{ok, _, [?RC_SUCCESS]}, {ok, _}} =
                 ?wait_async_action(
                     emqtt:unsubscribe(Client1, SubTopicFilter),

+ 14 - 8
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_persistent_session_ds).
 
+-include("emqx.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([init/0]).
@@ -56,11 +57,13 @@
 %%
 
 init() ->
-    ?WHEN_ENABLED(
+    ?WHEN_ENABLED(begin
         ok = emqx_ds:ensure_shard(?DS_SHARD, #{
             dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
-        })
-    ).
+        }),
+        ok = emqx_persistent_session_ds_router:init_tables(),
+        ok
+    end).
 
 %%
 
@@ -71,8 +74,8 @@ persist_message(Msg) ->
         case needs_persistence(Msg) andalso find_subscribers(Msg) of
             [_ | _] ->
                 store_message(Msg);
-            % [] ->
-            %     {skipped, no_subscribers};
+            [] ->
+                {skipped, no_subscribers};
             false ->
                 {skipped, needs_no_persistence}
         end
@@ -87,8 +90,8 @@ store_message(Msg) ->
     Topic = emqx_topic:words(emqx_message:topic(Msg)),
     emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
 
-find_subscribers(_Msg) ->
-    [node()].
+find_subscribers(#message{topic = Topic}) ->
+    emqx_persistent_session_ds_router:match_routes(Topic).
 
 open_session(ClientID) ->
     ?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
@@ -98,6 +101,7 @@ open_session(ClientID) ->
 add_subscription(TopicFilterBin, DSSessionID) ->
     ?WHEN_ENABLED(
         begin
+            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
             TopicFilter = emqx_topic:words(TopicFilterBin),
             {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
                 DSSessionID, TopicFilter
@@ -160,7 +164,9 @@ del_subscription(TopicFilterBin, DSSessionID) ->
                 persistent_session_ds_iterator_delete,
                 #{},
                 emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
-            )
+            ),
+            ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID),
+            ok
         end
     ).
 

+ 31 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_PS_DS_HRL).
+-define(EMQX_PS_DS_HRL, true).
+
+-define(PS_ROUTER_TAB, emqx_ds_ps_router).
+-define(PS_FILTERS_TAB, emqx_ds_ps_filters).
+
+-record(ps_route, {
+    topic :: binary(),
+    dest :: emqx_ds:session_id()
+}).
+-record(ps_routeidx, {
+    entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
+    unused = [] :: nil()
+}).
+
+-endif.

+ 230 - 0
apps/emqx/src/emqx_persistent_session_ds_router.erl

@@ -0,0 +1,230 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds_router).
+
+-include("emqx.hrl").
+-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
+
+-export([init_tables/0]).
+
+%% Route APIs
+-export([
+    do_add_route/2,
+    do_delete_route/2,
+    match_routes/1,
+    lookup_routes/1,
+    foldr_routes/2,
+    foldl_routes/2
+]).
+
+-export([cleanup_routes/1]).
+-export([print_routes/1]).
+-export([topics/0]).
+
+-ifdef(TEST).
+-export([has_route/2]).
+-endif.
+
+-type dest() :: emqx_ds:session_id().
+
+-export_type([dest/0]).
+
+%%--------------------------------------------------------------------
+%% Table Initialization
+%%--------------------------------------------------------------------
+
+init_tables() ->
+    mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true),
+    ok = mria:create_table(?PS_ROUTER_TAB, [
+        {type, bag},
+        {rlog_shard, ?PS_ROUTER_SHARD},
+        {storage, ram_copies},
+        {record_name, ps_route},
+        {attributes, record_info(fields, ps_route)},
+        {storage_properties, [
+            {ets, [
+                {read_concurrency, true},
+                {write_concurrency, true}
+            ]}
+        ]}
+    ]),
+    ok = mria:create_table(?PS_FILTERS_TAB, [
+        {type, ordered_set},
+        {rlog_shard, ?PS_ROUTER_SHARD},
+        {storage, ram_copies},
+        {record_name, ps_routeidx},
+        {attributes, record_info(fields, ps_routeidx)},
+        {storage_properties, [
+            {ets, [
+                {read_concurrency, true},
+                {write_concurrency, auto}
+            ]}
+        ]}
+    ]),
+    ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Route APIs
+%%--------------------------------------------------------------------
+
+-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
+do_add_route(Topic, Dest) when is_binary(Topic) ->
+    case has_route(Topic, Dest) of
+        true ->
+            ok;
+        false ->
+            mria_insert_route(Topic, Dest)
+    end.
+
+-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
+do_delete_route(Topic, Dest) ->
+    case emqx_trie_search:filter(Topic) of
+        Words when is_list(Words) ->
+            K = emqx_topic_index:make_key(Words, Dest),
+            mria:dirty_delete(?PS_FILTERS_TAB, K);
+        false ->
+            mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
+    end.
+
+%% @doc Take a real topic (not filter) as input, return the matching topics and topic
+%% filters associated with route destination.
+-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
+match_routes(Topic) when is_binary(Topic) ->
+    lookup_route_tab(Topic) ++
+        [match_to_route(M) || M <- match_filters(Topic)].
+
+%% @doc Take a topic or filter as input, and return the existing routes with exactly
+%% this topic or filter.
+-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
+lookup_routes(Topic) ->
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
+            [Dest || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)];
+        false ->
+            lookup_route_tab(Topic)
+    end.
+
+-spec has_route(emqx_types:topic(), dest()) -> boolean().
+has_route(Topic, Dest) ->
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            ets:member(?PS_FILTERS_TAB, emqx_topic_index:make_key(Topic, Dest));
+        false ->
+            has_route_tab_entry(Topic, Dest)
+    end.
+
+-spec topics() -> list(emqx_types:topic()).
+topics() ->
+    Pat = #ps_routeidx{entry = '$1'},
+    Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?PS_FILTERS_TAB, Pat)],
+    list_route_tab_topics() ++ Filters.
+
+%% @doc Print routes to a topic
+-spec print_routes(emqx_types:topic()) -> ok.
+print_routes(Topic) ->
+    lists:foreach(
+        fun(#ps_route{topic = To, dest = Dest}) ->
+            io:format("~ts -> ~ts~n", [To, Dest])
+        end,
+        match_routes(Topic)
+    ).
+
+-spec cleanup_routes(emqx_ds:session_id()) -> ok.
+cleanup_routes(DSSessionId) ->
+    %% NOTE
+    %% No point in transaction here because all the operations on filters table are dirty.
+    ok = ets:foldl(
+        fun(#ps_routeidx{entry = K}, ok) ->
+            case get_dest_session_id(emqx_topic_index:get_id(K)) of
+                DSSessionId ->
+                    mria:dirty_delete(?PS_FILTERS_TAB, K);
+                _ ->
+                    ok
+            end
+        end,
+        ok,
+        ?PS_FILTERS_TAB
+    ),
+    ok = ets:foldl(
+        fun(#ps_route{dest = Dest} = Route, ok) ->
+            case get_dest_session_id(Dest) of
+                DSSessionId ->
+                    mria:dirty_delete_object(?PS_ROUTER_TAB, Route);
+                _ ->
+                    ok
+            end
+        end,
+        ok,
+        ?PS_ROUTER_TAB
+    ).
+
+-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldl_routes(FoldFun, AccIn) ->
+    fold_routes(foldl, FoldFun, AccIn).
+
+-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldr_routes(FoldFun, AccIn) ->
+    fold_routes(foldr, FoldFun, AccIn).
+
+%%--------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------
+
+mria_insert_route(Topic, Dest) ->
+    case emqx_trie_search:filter(Topic) of
+        Words when is_list(Words) ->
+            K = emqx_topic_index:make_key(Words, Dest),
+            mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = K});
+        false ->
+            mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest})
+    end.
+
+fold_routes(FunName, FoldFun, AccIn) ->
+    FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
+    Acc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB),
+    ets:FunName(FilterFoldFun, Acc, ?PS_FILTERS_TAB).
+
+mk_filtertab_fold_fun(FoldFun) ->
+    fun(#ps_routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
+
+match_filters(Topic) ->
+    emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
+
+get_dest_session_id({_, DSSessionId}) ->
+    DSSessionId;
+get_dest_session_id(DSSessionId) ->
+    DSSessionId.
+
+match_to_route(M) ->
+    #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
+
+mria_route_tab_insert(Route) ->
+    mria:dirty_write(?PS_ROUTER_TAB, Route).
+
+lookup_route_tab(Topic) ->
+    ets:lookup(?PS_ROUTER_TAB, Topic).
+
+has_route_tab_entry(Topic, Dest) ->
+    [] =/= ets:match(?PS_ROUTER_TAB, #ps_route{topic = Topic, dest = Dest}).
+
+list_route_tab_topics() ->
+    mnesia:dirty_all_keys(?PS_ROUTER_TAB).
+
+mria_route_tab_delete(Route) ->
+    mria:dirty_delete_object(?PS_ROUTER_TAB, Route).

+ 2 - 0
apps/emqx/src/emqx_session.erl

@@ -317,6 +317,8 @@ is_subscriptions_full(#session{
 
 -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
     session().
+add_persistent_subscription(_TopicFilterBin, _ClientId, Session = #session{is_persistent = false}) ->
+    Session;
 add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
     _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
     Session.

+ 99 - 34
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -19,10 +19,13 @@
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(DS_SHARD, <<"local">>).
 
 all() ->
@@ -33,29 +36,31 @@ init_per_suite(Config) ->
     %% TODO: remove after other suites start to use `emx_cth_suite'
     application:stop(emqx),
     application:stop(emqx_durable_storage),
-    TCApps = emqx_cth_suite:start(
-        app_specs(),
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    [{tc_apps, TCApps} | Config].
+    Config.
 
-end_per_suite(Config) ->
-    TCApps = ?config(tc_apps, Config),
-    emqx_cth_suite:stop(TCApps),
+end_per_suite(_Config) ->
     ok.
 
-init_per_testcase(t_session_subscription_iterators, Config) ->
+init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
-    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
-init_per_testcase(_TestCase, Config) ->
-    Config.
+init_per_testcase(TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        app_specs(),
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config].
 
 end_per_testcase(t_session_subscription_iterators, Config) ->
     Nodes = ?config(nodes, Config),
+    emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
     ok;
-end_per_testcase(_TestCase, _Config) ->
+end_per_testcase(_TestCase, Config) ->
+    Apps = ?config(apps, Config),
+    emqx_common_test_helpers:call_janitor(60_000),
+    emqx_cth_suite:stop(Apps),
     ok.
 
 t_messages_persisted(_Config) ->
@@ -75,12 +80,12 @@ t_messages_persisted(_Config) ->
     Messages = [
         M1 = {<<"client/1/topic">>, <<"1">>},
         M2 = {<<"client/2/topic">>, <<"2">>},
-        M3 = {<<"client/3/topic/sub">>, <<"3">>},
-        M4 = {<<"client/4">>, <<"4">>},
+        _M3 = {<<"client/3/topic/sub">>, <<"3">>},
+        _M4 = {<<"client/4">>, <<"4">>},
         M5 = {<<"random/5">>, <<"5">>},
-        M6 = {<<"random/6/topic">>, <<"6">>},
+        _M6 = {<<"random/6/topic">>, <<"6">>},
         M7 = {<<"client/7/topic">>, <<"7">>},
-        M8 = {<<"client/8/topic/sub">>, <<"8">>},
+        _M8 = {<<"client/8/topic/sub">>, <<"8">>},
         M9 = {<<"random/9">>, <<"9">>},
         M10 = {<<"random/10">>, <<"10">>}
     ],
@@ -94,8 +99,53 @@ t_messages_persisted(_Config) ->
     ct:pal("Persisted = ~p", [Persisted]),
 
     ?assertEqual(
-        % [M1, M2, M5, M7, M9, M10],
-        [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10],
+        [M1, M2, M5, M7, M9, M10],
+        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+    ),
+
+    ok.
+
+t_messages_persisted_2(_Config) ->
+    Prefix = atom_to_binary(?FUNCTION_NAME),
+    C1 = connect(<<Prefix/binary, "1">>, _CleanStart0 = true, _EI0 = 30),
+    CP = connect(<<Prefix/binary, "-pub">>, _CleanStart1 = true, _EI1 = undefined),
+    T = fun(T0) -> <<Prefix/binary, T0/binary>> end,
+
+    %% won't be persisted
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"random/topic">>), <<"0">>, 1),
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"client/1/topic">>), <<"1">>, 1),
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"client/2/topic">>), <<"2">>, 1),
+
+    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, T(<<"client/+/topic">>), qos1),
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"random/topic">>), <<"3">>, 1),
+    %% will be persisted
+    {ok, #{reason_code := ?RC_SUCCESS}} =
+        emqtt:publish(CP, T(<<"client/1/topic">>), <<"4">>, 1),
+    {ok, #{reason_code := ?RC_SUCCESS}} =
+        emqtt:publish(CP, T(<<"client/2/topic">>), <<"5">>, 1),
+
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C1, T(<<"client/+/topic">>)),
+    %% won't be persisted
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"random/topic">>), <<"6">>, 1),
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"client/1/topic">>), <<"7">>, 1),
+    {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
+        emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1),
+
+    Persisted = consume(?DS_SHARD, {['#'], 0}),
+
+    ct:pal("Persisted = ~p", [Persisted]),
+
+    ?assertEqual(
+        [
+            {T(<<"client/1/topic">>), <<"4">>},
+            {T(<<"client/2/topic">>), <<"5">>}
+        ],
         [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
     ),
 
@@ -121,12 +171,11 @@ t_session_subscription_iterators(Config) ->
                 lists:seq(1, 4)
             ),
             ct:pal("starting"),
-            {ok, Client} = emqtt:start_link([
-                {port, Port},
-                {clientid, ClientId},
-                {proto_ver, v5}
-            ]),
-            {ok, _} = emqtt:connect(Client),
+            Client = connect(#{
+                clientid => ClientId,
+                port => Port,
+                properties => #{'Session-Expiry-Interval' => 300}
+            }),
             ct:pal("publishing 1"),
             Message1 = emqx_message:make(Topic, Payload1),
             publish(Node1, Message1),
@@ -195,15 +244,19 @@ t_session_subscription_iterators(Config) ->
 %%
 
 connect(ClientId, CleanStart, EI) ->
-    {ok, Client} = emqtt:start_link([
-        {clientid, ClientId},
-        {proto_ver, v5},
-        {clean_start, CleanStart},
-        {properties,
-            maps:from_list(
-                [{'Session-Expiry-Interval', EI} || is_integer(EI)]
-            )}
-    ]),
+    connect(#{
+        clientid => ClientId,
+        clean_start => CleanStart,
+        properties => maps:from_list(
+            [{'Session-Expiry-Interval', EI} || is_integer(EI)]
+        )
+    }).
+
+connect(Opts0 = #{}) ->
+    Defaults = #{proto_ver => v5},
+    Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
+    {ok, Client} = emqtt:start_link(Opts),
+    on_exit(fun() -> catch emqtt:stop(Client) end),
     {ok, _} = emqtt:connect(Client),
     Client.
 
@@ -222,6 +275,18 @@ consume(It) ->
             []
     end.
 
+delete_all_messages() ->
+    Persisted = consume(?DS_SHARD, {['#'], 0}),
+    lists:foreach(
+        fun(Msg) ->
+            GUID = emqx_message:id(Msg),
+            Topic = emqx_topic:words(emqx_message:topic(Msg)),
+            Timestamp = emqx_guid:timestamp(GUID),
+            ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic)
+        end,
+        Persisted
+    ).
+
 receive_messages(Count) ->
     receive_messages(Count, []).
 

+ 178 - 0
apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl

@@ -0,0 +1,178 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds_router_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(R, emqx_persistent_session_ds_router).
+-define(DEF_DS_SESSION_ID, <<"some-client-id">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE]),
+    AppSpecs = [
+        emqx_durable_storage,
+        {emqx, #{
+            config => #{persistent_session_store => #{ds => true}},
+            override_env => [{boot_modules, [broker]}]
+        }}
+    ],
+    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    clear_tables(),
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    clear_tables().
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+clear_tables() ->
+    lists:foreach(
+        fun mnesia:clear_table/1,
+        [?PS_ROUTER_TAB, ?PS_FILTERS_TAB]
+    ).
+
+add_route(TopicFilter) ->
+    ?R:do_add_route(TopicFilter, ?DEF_DS_SESSION_ID).
+
+delete_route(TopicFilter) ->
+    ?R:do_delete_route(TopicFilter, ?DEF_DS_SESSION_ID).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+% t_lookup_routes(_) ->
+%     error('TODO').
+
+t_add_delete(_) ->
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/+/b">>),
+    ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
+    delete_route(<<"a/b/c">>),
+    delete_route(<<"a/+/b">>),
+    ?assertEqual([], ?R:topics()).
+
+t_add_delete_incremental(_) ->
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/+/c">>),
+    add_route(<<"a/+/+">>),
+    add_route(<<"a/b/#">>),
+    add_route(<<"#">>),
+    ?assertEqual(
+        [
+            #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    delete_route(<<"a/+/c">>),
+    ?assertEqual(
+        [
+            #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    delete_route(<<"a/+/+">>),
+    ?assertEqual(
+        [
+            #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    delete_route(<<"a/b/#">>),
+    ?assertEqual(
+        [
+            #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    delete_route(<<"a/b/c">>),
+    ?assertEqual(
+        [#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ).
+
+t_do_add_delete(_) ->
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/+/b">>),
+    ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
+
+    delete_route(<<"a/b/c">>),
+    delete_route(<<"a/+/b">>),
+    ?assertEqual([], ?R:topics()).
+
+t_match_routes(_) ->
+    add_route(<<"a/b/c">>),
+    add_route(<<"a/+/c">>),
+    add_route(<<"a/b/#">>),
+    add_route(<<"#">>),
+    ?assertEqual(
+        [
+            #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID},
+            #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    delete_route(<<"a/b/c">>),
+    delete_route(<<"a/+/c">>),
+    delete_route(<<"a/b/#">>),
+    delete_route(<<"#">>),
+    ?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))).
+
+t_print_routes(_) ->
+    add_route(<<"+/#">>),
+    add_route(<<"+/+">>),
+    ?R:print_routes(<<"a/b">>).
+
+t_has_route(_) ->
+    add_route(<<"devices/+/messages">>),
+    ?assert(?R:has_route(<<"devices/+/messages">>, ?DEF_DS_SESSION_ID)),
+    delete_route(<<"devices/+/messages">>).

+ 56 - 49
apps/emqx/test/emqx_sys_mon_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 -define(SYSMON, emqx_sys_mon).
 
@@ -64,60 +65,66 @@
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
-init_per_testcase(t_sys_mon, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps(
-        [],
-        fun
-            (emqx) ->
-                application:set_env(emqx, sysmon, [
-                    {busy_dist_port, true},
-                    {busy_port, false},
-                    {large_heap, 8388608},
-                    {long_schedule, 240},
-                    {long_gc, 0}
-                ]),
-                ok;
-            (_) ->
-                ok
-        end
+init_per_testcase(t_sys_mon = TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                override_env => [
+                    {sys_mon, [
+                        {busy_dist_port, true},
+                        {busy_port, false},
+                        {large_heap, 8388608},
+                        {long_schedule, 240},
+                        {long_gc, 0}
+                    ]}
+                ]
+            }}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config];
+init_per_testcase(t_sys_mon2 = TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                override_env => [
+                    {sys_mon, [
+                        {busy_dist_port, false},
+                        {busy_port, true},
+                        {large_heap, 8388608},
+                        {long_schedule, 0},
+                        {long_gc, 200},
+                        {nothing, 0}
+                    ]}
+                ]
+            }}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
-    Config;
-init_per_testcase(t_sys_mon2, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps(
-        [],
-        fun
-            (emqx) ->
-                application:set_env(emqx, sysmon, [
-                    {busy_dist_port, false},
-                    {busy_port, true},
-                    {large_heap, 8388608},
-                    {long_schedule, 0},
-                    {long_gc, 200},
-                    {nothing, 0}
-                ]),
-                ok;
-            (_) ->
-                ok
-        end
+    [{apps, Apps} | Config];
+init_per_testcase(t_procinfo = TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
-    Config;
-init_per_testcase(t_procinfo, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps([]),
     ok = meck:new(emqx_vm, [passthrough, no_history]),
-    Config;
-init_per_testcase(_, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps([]),
-    Config.
+    [{apps, Apps} | Config];
+init_per_testcase(TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_testcase(t_procinfo, _Config) ->
+end_per_testcase(t_procinfo, Config) ->
+    Apps = ?config(apps, Config),
     ok = meck:unload(emqx_vm),
-    emqx_common_test_helpers:stop_apps([]);
-end_per_testcase(_, _Config) ->
-    emqx_common_test_helpers:stop_apps([]).
+    ok = emqx_cth_suite:stop(Apps),
+    ok;
+end_per_testcase(_, Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
 
 t_procinfo(_) ->
     ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),

+ 7 - 14
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -34,22 +34,15 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    ?check_trace(
-        ?wait_async_action(
-            emqx_common_test_helpers:start_apps([]),
-            #{?snk_kind := listener_started, bind := 1883},
-            timer:seconds(10)
-        ),
-        fun(Trace) ->
-            %% more than one listener
-            ?assertMatch([_ | _], ?of_kind(listener_started, Trace))
-        end
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
-    Config.
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([]),
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
     ok.
 %%--------------------------------------------------------------------
 %% Testcases

+ 10 - 10
apps/emqx/test/emqx_tls_certfile_gc_SUITE.erl

@@ -32,13 +32,10 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    _ = application:load(emqx),
-    ok = application:set_env(emqx, data_dir, ?config(priv_dir, Config)),
-    ok = emqx_config:save_schema_mod_and_names(?MODULE),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_config:erase_all().
+    ok.
 
 init_per_testcase(TC, Config) ->
     TCAbsDir = filename:join(?config(priv_dir, Config), TC),
@@ -46,9 +43,10 @@ init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
     [{tc_name, atom_to_list(TC)}, {tc_absdir, TCAbsDir} | Config].
 
-end_per_testcase(_TC, Config) ->
+end_per_testcase(_TC, _Config) ->
     ok = snabbkaffe:stop(),
-    ok = application:set_env(emqx, data_dir, ?config(priv_dir, Config)),
+    _ = emqx_schema_hooks:erase_injections(),
+    _ = emqx_config:erase_all(),
     ok.
 
 t_no_orphans(Config) ->
@@ -371,16 +369,18 @@ t_gc_spares_symlinked_datadir(Config) ->
 
     ok = proc_lib:stop(Pid).
 
-t_gc_active(_Config) ->
-    ok = emqx_common_test_helpers:boot_modules([]),
-    ok = emqx_common_test_helpers:start_apps([]),
+t_gc_active(Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
     try
         ?assertEqual(
             {ok, []},
             emqx_tls_certfile_gc:run()
         )
     after
-        emqx_common_test_helpers:stop_apps([])
+        emqx_cth_suite:stop(Apps)
     end.
 
 orphans() ->

+ 9 - 4
apps/emqx/test/emqx_trace_SUITE.erl

@@ -34,17 +34,22 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([]),
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
     Listeners = emqx_listeners:list(),
     ct:pal("emqx_listeners:list() = ~p~n", [Listeners]),
     ?assertMatch(
         [_ | _],
         [ID || {ID, #{running := true}} <- Listeners]
     ),
-    Config.
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([]).
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
 
 init_per_testcase(_, Config) ->
     reload(),

+ 9 - 5
apps/emqx/test/emqx_trace_handler_SUITE.erl

@@ -32,12 +32,16 @@
 all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps([]),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([]).
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
 
 init_per_testcase(t_trace_clientid, Config) ->
     init(),

+ 17 - 10
apps/emqx/test/emqx_vm_mon_SUITE.erl

@@ -20,12 +20,15 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
-init_per_testcase(t_too_many_processes_alarm, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps([]),
+init_per_testcase(t_too_many_processes_alarm = TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
     emqx_config:put([sysmon, vm], #{
         process_high_watermark => 0,
         process_low_watermark => 0,
@@ -34,14 +37,18 @@ init_per_testcase(t_too_many_processes_alarm, Config) ->
     }),
     ok = supervisor:terminate_child(emqx_sys_sup, emqx_vm_mon),
     {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_vm_mon),
-    Config;
-init_per_testcase(_, Config) ->
-    emqx_common_test_helpers:boot_modules(all),
-    emqx_common_test_helpers:start_apps([]),
-    Config.
+    [{apps, Apps} | Config];
+init_per_testcase(TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_testcase(_, _Config) ->
-    emqx_common_test_helpers:stop_apps([]).
+end_per_testcase(_, Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
 
 t_too_many_processes_alarm(_) ->
     timer:sleep(500),

+ 26 - 39
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -49,6 +49,10 @@ init_per_testcase(TestCase, Config) when
     TestCase =/= t_ws_non_check_origin
 ->
     add_bucket(),
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
     %% Meck Cm
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
@@ -80,37 +84,32 @@ init_per_testcase(TestCase, Config) when
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
     ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
     ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
-    PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
-    [
-        {prev_config, PrevConfig}
-        | Config
-    ];
-init_per_testcase(t_ws_non_check_origin, Config) ->
+    [{apps, Apps} | Config];
+init_per_testcase(t_ws_non_check_origin = TestCase, Config) ->
     add_bucket(),
-    ok = emqx_common_test_helpers:start_apps([]),
-    PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
     emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false),
     emqx_config:put_listener_conf(ws, default, [websocket, check_origins], []),
-    [
-        {prev_config, PrevConfig}
-        | Config
-    ];
-init_per_testcase(_, Config) ->
+    [{apps, Apps} | Config];
+init_per_testcase(TestCase, Config) ->
     add_bucket(),
-    PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
-    ok = emqx_common_test_helpers:start_apps([]),
-    [
-        {prev_config, PrevConfig}
-        | Config
-    ].
+    Apps = emqx_cth_suite:start(
+        [emqx],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_testcase(TestCase, _Config) when
+end_per_testcase(TestCase, Config) when
     TestCase =/= t_ws_sub_protocols_mqtt_equivalents,
     TestCase =/= t_ws_sub_protocols_mqtt,
     TestCase =/= t_ws_check_origin,
     TestCase =/= t_ws_non_check_origin,
     TestCase =/= t_ws_pingreq_before_connected
 ->
+    Apps = ?config(apps, Config),
     del_bucket(),
     lists:foreach(
         fun meck:unload/1,
@@ -122,32 +121,20 @@ end_per_testcase(TestCase, _Config) when
             emqx_hooks,
             emqx_metrics
         ]
-    );
+    ),
+    ok = emqx_cth_suite:stop(Apps),
+    ok;
 end_per_testcase(t_ws_non_check_origin, Config) ->
+    Apps = ?config(apps, Config),
     del_bucket(),
-    PrevConfig = ?config(prev_config, Config),
-    emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
-    stop_apps(),
+    ok = emqx_cth_suite:stop(Apps),
     ok;
 end_per_testcase(_, Config) ->
+    Apps = ?config(apps, Config),
     del_bucket(),
-    PrevConfig = ?config(prev_config, Config),
-    emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
-    stop_apps(),
-    Config.
-
-init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([]),
+    ok = emqx_cth_suite:stop(Apps),
     Config.
 
-end_per_suite(_) ->
-    emqx_common_test_helpers:stop_apps([]),
-    ok.
-
-%% FIXME: this is a temp fix to tests share configs.
-stop_apps() ->
-    emqx_common_test_helpers:stop_apps([], #{erase_all_configs => false}).
-
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -15,7 +15,6 @@ start(_Type, _Args) ->
     emqx_ds_sup:start_link().
 
 init_mnesia() ->
-    %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
     ok = mria:create_table(
         ?SESSION_TAB,
         [
@@ -39,6 +38,7 @@ init_mnesia() ->
     ok.
 
 storage() ->
+    %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows
     case mria:rocksdb_backend_available() of
         true ->
             rocksdb_copies;

+ 10 - 1
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -77,11 +77,14 @@
 %%
 %%================================================================================
 
+-behaviour(emqx_ds_storage_layer).
+
 %% API:
 -export([create_new/3, open/5]).
 -export([make_keymapper/1]).
 
 -export([store/5]).
+-export([delete/4]).
 -export([make_iterator/2]).
 -export([make_iterator/3]).
 -export([next/1]).
@@ -270,13 +273,19 @@ make_keymapper(#{
         epoch = 1 bsl TimestampLSBs
     }.
 
--spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
+-spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) ->
     ok | {error, _TODO}.
 store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
     Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
     Value = make_message_value(Topic, MessagePayload),
     rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
 
+-spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) ->
+    ok | {error, _TODO}.
+delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) ->
+    Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
+    rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
+
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->

+ 18 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -10,6 +10,7 @@
 -export([create_generation/3]).
 
 -export([store/5]).
+-export([delete/4]).
 
 -export([make_iterator/2, next/1]).
 
@@ -109,7 +110,16 @@
 -callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
     term().
 
--callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
+-callback store(
+    _Schema,
+    _MessageID :: binary(),
+    emqx_ds:time(),
+    emqx_ds:topic(),
+    _Payload :: binary()
+) ->
+    ok | {error, _}.
+
+-callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
     ok | {error, _}.
 
 -callback make_iterator(_Schema, emqx_ds:replay()) ->
@@ -117,7 +127,7 @@
 
 -callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
 
--callback preserve_iterator(_Schema, _It) -> term().
+-callback preserve_iterator(_It) -> term().
 
 -callback next(It) -> {value, binary(), It} | none | {error, closed}.
 
@@ -140,6 +150,12 @@ store(Shard, GUID, Time, Topic, Msg) ->
     {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
     Mod:store(Data, GUID, Time, Topic, Msg).
 
+-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
+    ok | {error, _}.
+delete(Shard, GUID, Time, Topic) ->
+    {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
+    Mod:delete(Data, GUID, Time, Topic).
+
 -spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(Shard, Replay = {_, StartTime}) ->