Просмотр исходного кода

Merge pull request #6274 from emqx/persistent-sessions-ram-backend

feat(persistent_sessions): add choice between ram or disc backends in…
Tobias Lindahl 4 лет назад
Родитель
Сommit
ca89a8da61

+ 15 - 3
apps/emqx/src/emqx_persistent_session.erl

@@ -18,6 +18,7 @@
 
 -export([ is_store_enabled/0
         , init_db_backend/0
+        , storage_type/0
         ]).
 
 -export([ discard/2
@@ -82,9 +83,17 @@
 init_db_backend() ->
     case is_store_enabled() of
         true  ->
-            ok = emqx_trie:create_session_trie(),
-            emqx_persistent_session_mnesia_backend:create_tables(),
-            persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend),
+            StorageType = storage_type(),
+            ok = emqx_trie:create_session_trie(StorageType),
+            ok = emqx_session_router:create_router_tab(StorageType),
+            case StorageType of
+                disc ->
+                    emqx_persistent_session_mnesia_disc_backend:create_tables(),
+                    persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_disc_backend);
+                ram ->
+                    emqx_persistent_session_mnesia_ram_backend:create_tables(),
+                    persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend)
+            end,
             ok;
         false ->
             persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
@@ -94,6 +103,9 @@ init_db_backend() ->
 is_store_enabled() ->
     emqx_config:get(?is_enabled_key).
 
+storage_type() ->
+    emqx_config:get(?storage_type_key).
+
 %%--------------------------------------------------------------------
 %% Session message ADT API
 %%--------------------------------------------------------------------

+ 9 - 3
apps/emqx/src/emqx_persistent_session.hrl

@@ -14,9 +14,14 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--define(SESSION_STORE, emqx_session_store).
--define(SESS_MSG_TAB, emqx_session_msg).
--define(MSG_TAB, emqx_persistent_msg).
+-define(SESSION_STORE_DISC, emqx_session_store_disc).
+-define(SESSION_STORE_RAM, emqx_session_store_ram).
+
+-define(SESS_MSG_TAB_DISC, emqx_session_msg_disc).
+-define(SESS_MSG_TAB_RAM, emqx_session_msg_ram).
+
+-define(MSG_TAB_DISC, emqx_persistent_msg_disc).
+-define(MSG_TAB_RAM, emqx_persistent_msg_ram).
 
 -record(session_store, { client_id        :: binary()
                        , expiry_interval  :: non_neg_integer()
@@ -28,6 +33,7 @@
 
 -define(db_backend_key, [persistent_session_store, db_backend]).
 -define(is_enabled_key, [persistent_session_store, enabled]).
+-define(storage_type_key, [persistent_session_store, storage_type]).
 -define(msg_retain, [persistent_session_store, max_retain_undelivered]).
 
 -define(db_backend, (persistent_term:get(?db_backend_key))).

+ 16 - 16
apps/emqx/src/emqx_persistent_session_mnesia_backend.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_persistent_session_mnesia_backend).
+-module(emqx_persistent_session_mnesia_disc_backend).
 
 -include("emqx.hrl").
 -include("emqx_persistent_session.hrl").
@@ -36,7 +36,7 @@
         ]).
 
 create_tables() ->
-    ok = mria:create_table(?SESSION_STORE, [
+    ok = mria:create_table(?SESSION_STORE_DISC, [
                 {type, set},
                 {rlog_shard, ?PERSISTENT_SESSION_SHARD},
                 {storage, disc_copies},
@@ -44,7 +44,7 @@ create_tables() ->
                 {attributes, record_info(fields, session_store)},
                 {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
 
-    ok = mria:create_table(?SESS_MSG_TAB, [
+    ok = mria:create_table(?SESS_MSG_TAB_DISC, [
                 {type, ordered_set},
                 {rlog_shard, ?PERSISTENT_SESSION_SHARD},
                 {storage, disc_copies},
@@ -53,7 +53,7 @@ create_tables() ->
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]),
 
-    ok = mria:create_table(?MSG_TAB, [
+    ok = mria:create_table(?MSG_TAB_DISC, [
                 {type, ordered_set},
                 {rlog_shard, ?PERSISTENT_SESSION_SHARD},
                 {storage, disc_copies},
@@ -63,43 +63,43 @@ create_tables() ->
                                              {write_concurrency, true}]}]}]).
 
 first_session_message() ->
-    mnesia:dirty_first(?SESS_MSG_TAB).
+    mnesia:dirty_first(?SESS_MSG_TAB_DISC).
 
 next_session_message(Key) ->
-    mnesia:dirty_next(?SESS_MSG_TAB, Key).
+    mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key).
 
 first_message_id() ->
-    mnesia:dirty_first(?MSG_TAB).
+    mnesia:dirty_first(?MSG_TAB_DISC).
 
 next_message_id(Key) ->
-    mnesia:dirty_next(?MSG_TAB, Key).
+    mnesia:dirty_next(?MSG_TAB_DISC, Key).
 
 delete_message(Key) ->
-    mria:dirty_delete(?MSG_TAB, Key).
+    mria:dirty_delete(?MSG_TAB_DISC, Key).
 
 delete_session_message(Key) ->
-    mria:dirty_delete(?SESS_MSG_TAB, Key).
+    mria:dirty_delete(?SESS_MSG_TAB_DISC, Key).
 
 put_session_store(SS) ->
-    mria:dirty_write(?SESSION_STORE, SS).
+    mria:dirty_write(?SESSION_STORE_DISC, SS).
 
 delete_session_store(ClientID) ->
-    mria:dirty_delete(?SESSION_STORE, ClientID).
+    mria:dirty_delete(?SESSION_STORE_DISC, ClientID).
 
 lookup_session_store(ClientID) ->
-    case mnesia:dirty_read(?SESSION_STORE, ClientID) of
+    case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of
         [] -> none;
         [SS] -> {value, SS}
     end.
 
 put_session_message(SessMsg) ->
-    mria:dirty_write(?SESS_MSG_TAB, SessMsg).
+    mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg).
 
 put_message(Msg) ->
-    mria:dirty_write(?MSG_TAB, Msg).
+    mria:dirty_write(?MSG_TAB_DISC, Msg).
 
 get_message(MsgId) ->
-    case mnesia:read(?MSG_TAB, MsgId) of
+    case mnesia:read(?MSG_TAB_DISC, MsgId) of
         [] -> error({msg_not_found, MsgId});
         [Msg] -> Msg
     end.

+ 110 - 0
apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl

@@ -0,0 +1,110 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_mnesia_ram_backend).
+
+-include("emqx.hrl").
+-include("emqx_persistent_session.hrl").
+
+-export([ create_tables/0
+        , first_message_id/0
+        , next_message_id/1
+        , delete_message/1
+        , first_session_message/0
+        , next_session_message/1
+        , delete_session_message/1
+        , put_session_store/1
+        , delete_session_store/1
+        , lookup_session_store/1
+        , put_session_message/1
+        , put_message/1
+        , get_message/1
+        , ro_transaction/1
+        ]).
+
+create_tables() ->
+    ok = mria:create_table(?SESSION_STORE_RAM, [
+                {type, set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, ram_copies},
+                {record_name, session_store},
+                {attributes, record_info(fields, session_store)},
+                {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
+
+    ok = mria:create_table(?SESS_MSG_TAB_RAM, [
+                {type, ordered_set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, ram_copies},
+                {record_name, session_msg},
+                {attributes, record_info(fields, session_msg)},
+                {storage_properties, [{ets, [{read_concurrency, true},
+                                             {write_concurrency, true}]}]}]),
+
+    ok = mria:create_table(?MSG_TAB_RAM, [
+                {type, ordered_set},
+                {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+                {storage, ram_copies},
+                {record_name, message},
+                {attributes, record_info(fields, message)},
+                {storage_properties, [{ets, [{read_concurrency, true},
+                                             {write_concurrency, true}]}]}]).
+
+first_session_message() ->
+    mnesia:dirty_first(?SESS_MSG_TAB_RAM).
+
+next_session_message(Key) ->
+    mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key).
+
+first_message_id() ->
+    mnesia:dirty_first(?MSG_TAB_RAM).
+
+next_message_id(Key) ->
+    mnesia:dirty_next(?MSG_TAB_RAM, Key).
+
+delete_message(Key) ->
+    mria:dirty_delete(?MSG_TAB_RAM, Key).
+
+delete_session_message(Key) ->
+    mria:dirty_delete(?SESS_MSG_TAB_RAM, Key).
+
+put_session_store(SS) ->
+    mria:dirty_write(?SESSION_STORE_RAM, SS).
+
+delete_session_store(ClientID) ->
+    mria:dirty_delete(?SESSION_STORE_RAM, ClientID).
+
+lookup_session_store(ClientID) ->
+    case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of
+        [] -> none;
+        [SS] -> {value, SS}
+    end.
+
+put_session_message(SessMsg) ->
+    mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg).
+
+put_message(Msg) ->
+    mria:dirty_write(?MSG_TAB_RAM, Msg).
+
+get_message(MsgId) ->
+    case mnesia:read(?MSG_TAB_RAM, MsgId) of
+        [] -> error({msg_not_found, MsgId});
+        [Msg] -> Msg
+    end.
+
+ro_transaction(Fun) ->
+    {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
+    Res.
+

+ 20 - 6
apps/emqx/src/emqx_router_utils.erl

@@ -20,8 +20,10 @@
 
 -export([ delete_direct_route/2
         , delete_trie_route/2
+        , delete_session_trie_route/2
         , insert_direct_route/2
         , insert_trie_route/2
+        , insert_session_trie_route/2
         , maybe_trans/3
         ]).
 
@@ -30,8 +32,14 @@ insert_direct_route(Tab, Route) ->
 
 insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
     case mnesia:wread({RouteTab, Topic}) of
-        [] when RouteTab =:= emqx_route         -> emqx_trie:insert(Topic);
-        [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic);
+        [] -> emqx_trie:insert(Topic);
+        _  -> ok
+    end,
+    mnesia:write(RouteTab, Route, sticky_write).
+
+insert_session_trie_route(RouteTab, Route = #route{topic = Topic}) ->
+    case mnesia:wread({RouteTab, Topic}) of
+        []  -> emqx_trie:insert_session(Topic);
         _  -> ok
     end,
     mnesia:write(RouteTab, Route, sticky_write).
@@ -39,14 +47,20 @@ insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
 delete_direct_route(RouteTab, Route) ->
     mria:dirty_delete_object(RouteTab, Route).
 
-delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
+delete_trie_route(RouteTab, Route) ->
+    delete_trie_route(RouteTab, Route, normal).
+
+delete_session_trie_route(RouteTab, Route) ->
+    delete_trie_route(RouteTab, Route, session).
+
+delete_trie_route(RouteTab, Route = #route{topic = Topic}, Type) ->
     case mnesia:wread({RouteTab, Topic}) of
         [R] when R =:= Route ->
             %% Remove route and trie
             ok = mnesia:delete_object(RouteTab, Route, sticky_write),
-            case RouteTab of
-                emqx_route         -> emqx_trie:delete(Topic);
-                emqx_session_route -> emqx_trie:delete_session(Topic)
+            case Type of
+                normal  -> emqx_trie:delete(Topic);
+                session -> emqx_trie:delete_session(Topic)
             end;
         [_|_]   ->
             %% Remove route only

+ 32 - 1
apps/emqx/src/emqx_schema.erl

@@ -161,19 +161,50 @@ roots(low) ->
 fields("persistent_session_store") ->
     [ {"enabled",
        sc(boolean(),
-          #{ default => "false"
+          #{ default => false
+           , description => """
+Use the database to store information about persistent sessions.
+This makes it possible to migrate a client connection to another
+cluster node if a node is stopped.
+"""
+           })},
+      {"storage_type",
+       sc(hoconsc:union([ram, disc]),
+          #{ default => disc
+           , description => """
+Store information about persistent sessions on disc or in ram.
+If ram is chosen, all information about persistent sessions remains
+as long as at least one node in a cluster is alive to keep the information.
+If disc is chosen, the information is persisted on disc and will survive
+cluster restart, at the price of more disc usage and less throughput.
+"""
            })},
       {"max_retain_undelivered",
        sc(duration(),
           #{ default => "1h"
+           , description => """
+The time messages that was not delivered to a persistent session
+is stored before being garbage collected if the node the previous
+session was handled on restarts of is stopped.
+"""
            })},
       {"message_gc_interval",
        sc(duration(),
           #{ default => "1h"
+           , description => """
+The starting interval for garbage collection of undelivered messages to
+a persistent session. This affects how often the \"max_retain_undelivered\"
+is checked for removal.
+"""
            })},
       {"session_message_gc_interval",
        sc(duration(),
           #{ default => "1m"
+           , description => """
+The starting interval for garbage collection of transient data for
+persistent session messages. This does not affect the life time length
+of persistent session messages.
+"""
            })}
     ];
 

+ 27 - 15
apps/emqx/src/emqx_session_router.erl

@@ -24,12 +24,8 @@
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
-%% Mnesia bootstrap
--export([mnesia/1]).
-
--boot_mnesia({mnesia, [boot]}).
-
 -export([ create_init_tab/0
+        , create_router_tab/1
         , start_link/2]).
 
 %% Route APIs
@@ -60,7 +56,8 @@
 
 -type(dest() :: node() | {group(), node()}).
 
--define(ROUTE_TAB, emqx_session_route).
+-define(ROUTE_RAM_TAB, emqx_session_route_ram).
+-define(ROUTE_DISC_TAB, emqx_session_route_disc).
 
 -define(SESSION_INIT_TAB, session_init_tab).
 
@@ -68,13 +65,22 @@
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
 
-mnesia(boot) ->
-    ok = mria:create_table(?ROUTE_TAB, [
+create_router_tab(disc) ->
+    ok = mria:create_table(?ROUTE_DISC_TAB, [
                 {type, bag},
                 {rlog_shard, ?ROUTE_SHARD},
                 {storage, disc_copies},
                 {record_name, route},
                 {attributes, record_info(fields, route)},
+                {storage_properties, [{ets, [{read_concurrency, true},
+                                             {write_concurrency, true}]}]}]);
+create_router_tab(ram) ->
+    ok = mria:create_table(?ROUTE_RAM_TAB, [
+                {type, bag},
+                {rlog_shard, ?ROUTE_SHARD},
+                {storage, ram_copies},
+                {record_name, route},
+                {attributes, record_info(fields, route)},
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]).
 
@@ -103,11 +109,11 @@ do_add_route(Topic, SessionID) when is_binary(Topic) ->
         false ->
             case emqx_topic:wildcard(Topic) of
                 true  ->
-                    Fun = fun emqx_router_utils:insert_trie_route/2,
-                    emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route],
+                    Fun = fun emqx_router_utils:insert_session_trie_route/2,
+                    emqx_router_utils:maybe_trans(Fun, [route_tab(), Route],
                                                   ?PERSISTENT_SESSION_SHARD);
                 false ->
-                    emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
+                    emqx_router_utils:insert_direct_route(route_tab(), Route)
             end
     end.
 
@@ -136,10 +142,10 @@ do_delete_route(Topic, SessionID) ->
     Route = #route{topic = Topic, dest = SessionID},
     case emqx_topic:wildcard(Topic) of
         true  ->
-            Fun = fun emqx_router_utils:delete_trie_route/2,
-            emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD);
+            Fun = fun emqx_router_utils:delete_session_trie_route/2,
+            emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD);
         false ->
-            emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
+            emqx_router_utils:delete_direct_route(route_tab(), Route)
     end.
 
 %% @doc Print routes to a topic
@@ -273,4 +279,10 @@ init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
 %%--------------------------------------------------------------------
 
 lookup_routes(Topic) ->
-    ets:lookup(?ROUTE_TAB, Topic).
+    ets:lookup(route_tab(), Topic).
+
+route_tab() ->
+    case emqx_persistent_session:storage_type() of
+        disc -> ?ROUTE_DISC_TAB;
+        ram  -> ?ROUTE_RAM_TAB
+    end.

+ 28 - 9
apps/emqx/src/emqx_trie.erl

@@ -20,7 +20,7 @@
 
 %% Mnesia bootstrap
 -export([ mnesia/1
-        , create_session_trie/0
+        , create_session_trie/1
         ]).
 
 -boot_mnesia({mnesia, [boot]}).
@@ -48,7 +48,8 @@
 -endif.
 
 -define(TRIE, emqx_trie).
--define(SESSION_TRIE, emqx_session_trie).
+-define(SESSION_DISC_TRIE, emqx_session_trie_disc).
+-define(SESSION_RAM_TRIE, emqx_session_trie_ram).
 -define(PREFIX(Prefix), {Prefix, 0}).
 -define(TOPIC(Topic), {Topic, 1}).
 
@@ -75,16 +76,27 @@ mnesia(boot) ->
                 {type, ordered_set},
                 {storage_properties, StoreProps}]).
 
-create_session_trie() ->
+create_session_trie(disc) ->
     StoreProps = [{ets, [{read_concurrency, true},
                          {write_concurrency, true}
                         ]}],
-    ok = mria:create_table(?SESSION_TRIE,
+    ok = mria:create_table(?SESSION_DISC_TRIE,
                            [{rlog_shard, ?ROUTE_SHARD},
                             {storage, disc_copies},
                             {record_name, ?TRIE},
                             {attributes, record_info(fields, ?TRIE)},
                             {type, ordered_set},
+                            {storage_properties, StoreProps}]);
+create_session_trie(ram) ->
+    StoreProps = [{ets, [{read_concurrency, true},
+                         {write_concurrency, true}
+                        ]}],
+    ok = mria:create_table(?SESSION_RAM_TRIE,
+                           [{rlog_shard, ?ROUTE_SHARD},
+                            {storage, ram_copies},
+                            {record_name, ?TRIE},
+                            {attributes, record_info(fields, ?TRIE)},
+                            {type, ordered_set},
                             {storage_properties, StoreProps}]).
 
 %%--------------------------------------------------------------------
@@ -98,7 +110,7 @@ insert(Topic) when is_binary(Topic) ->
 
 -spec(insert_session(emqx_topic:topic()) -> ok).
 insert_session(Topic) when is_binary(Topic) ->
-    insert(Topic, ?SESSION_TRIE).
+    insert(Topic, session_trie()).
 
 insert(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
@@ -115,7 +127,7 @@ delete(Topic) when is_binary(Topic) ->
 %% @doc Delete a topic filter from the trie.
 -spec(delete_session(emqx_topic:topic()) -> ok).
 delete_session(Topic) when is_binary(Topic) ->
-    delete(Topic, ?SESSION_TRIE).
+    delete(Topic, session_trie()).
 
 delete(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
@@ -131,7 +143,7 @@ match(Topic) when is_binary(Topic) ->
 
 -spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
 match_session(Topic) when is_binary(Topic) ->
-    match(Topic, ?SESSION_TRIE).
+    match(Topic, session_trie()).
 
 match(Topic, Trie) when is_binary(Topic) ->
     Words = emqx_topic:words(Topic),
@@ -154,7 +166,7 @@ match(Topic, Trie) when is_binary(Topic) ->
 empty() -> empty(?TRIE).
 
 empty_session() ->
-    empty(?SESSION_TRIE).
+    empty(session_trie()).
 
 empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
 
@@ -164,12 +176,19 @@ lock_tables() ->
 
 -spec lock_session_tables() -> ok.
 lock_session_tables() ->
-    mnesia:write_lock_table(?SESSION_TRIE).
+    mnesia:write_lock_table(session_trie()).
 
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 
+session_trie() ->
+    case emqx_persistent_session:storage_type() of
+        disc -> ?SESSION_DISC_TRIE;
+        ram  -> ?SESSION_RAM_TRIE
+    end.
+
+
 make_keys(Topic) ->
     Words = emqx_topic:words(Topic),
     {?TOPIC(Topic), [?PREFIX(Prefix) || Prefix <- make_prefixes(Words)]}.

+ 39 - 16
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -50,13 +50,19 @@ groups() ->
     SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)],
     GCTests  = [TC || TC <- TCs, is_gc_tc(TC)],
     OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests,
-    [ {persistent_store_enabled, [ {group, no_kill_connection_process}
-                                 , {group,    kill_connection_process}
-                                 , {group, snabbkaffe}
-                                 , {group, gc_tests}
+    [ {persistent_store_enabled, [ {group, ram_tables}
+                                 , {group, disc_tables}
                                  ]}
     , {persistent_store_disabled, [ {group, no_kill_connection_process}
                                   ]}
+    , {  ram_tables, [], [ {group, no_kill_connection_process}
+                         , {group,    kill_connection_process}
+                         , {group, snabbkaffe}
+                         , {group, gc_tests}]}
+    , { disc_tables, [], [ {group, no_kill_connection_process}
+                         , {group,    kill_connection_process}
+                         , {group, snabbkaffe}
+                         , {group, gc_tests}]}
     , {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
     , {   kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}
     , {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]}
@@ -76,15 +82,23 @@ is_gc_tc(TC) ->
     re:run(atom_to_list(TC), "^t_gc_") /= nomatch.
 
 init_per_group(persistent_store_enabled, Config) ->
+    [{persistent_store_enabled, true}|Config];
+init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables ->
     %% Start Apps
+    Reply = case Group =:= ram_tables of
+                true  -> ram;
+                false -> disc
+            end,
     emqx_common_test_helpers:boot_modules(all),
     meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_config, get, fun(?is_enabled_key) -> true;
-                                   (Other) -> meck:passthrough([Other])
+    meck:expect(emqx_config, get, fun(?storage_type_key) -> Reply;
+                                     (?is_enabled_key) -> true;
+                                     (Other) -> meck:passthrough([Other])
                                 end),
     emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
     ?assertEqual(true, emqx_persistent_session:is_store_enabled()),
-    [{persistent_store_enabled, true}|Config];
+    ?assertEqual(Reply, emqx_persistent_session:storage_type()),
+    Config;
 init_per_group(persistent_store_disabled, Config) ->
     %% Start Apps
     emqx_common_test_helpers:boot_modules(all),
@@ -125,15 +139,20 @@ init_per_group(gc_tests, Config) ->
                         receive stop -> ok end
                 end),
     meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
-                                        (?MSG_TAB) -> ets:first(MsgEts);
-                                        (X) -> meck:passthrough(X)
+    meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts);
+                                        (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts);
+                                        (?MSG_TAB_RAM) -> ets:first(MsgEts);
+                                        (?MSG_TAB_DISC) -> ets:first(MsgEts);
+                                        (X) -> meck:passthrough([X])
                                      end),
-    meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
-                                       (?MSG_TAB, X) -> ets:next(MsgEts, X);
+    meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X);
+                                       (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X);
+                                       (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X);
+                                       (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X);
                                        (Tab, X) -> meck:passthrough([Tab, X])
                                     end),
-    meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X);
+    meck:expect(mnesia, dirty_delete, fun(?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X);
+                                         (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X);
                                          (Tab, X) -> meck:passthrough([Tab, X])
                                       end),
     [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
@@ -154,7 +173,7 @@ end_per_group(gc_tests, Config) ->
     meck:unload(mnesia),
     ?config(store_owner, Config) ! stop,
     ok;
-end_per_group(persistent_store_enabled, _Config) ->
+end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables ->
     meck:unload(emqx_config),
     emqx_common_test_helpers:stop_apps([]);
 end_per_group(persistent_store_disabled, _Config) ->
@@ -250,7 +269,7 @@ maybe_kill_connection_process(ClientId, Config) ->
     end.
 
 wait_for_cm_unregister(ClientId) ->
-    wait_for_cm_unregister(ClientId, 10).
+    wait_for_cm_unregister(ClientId, 100).
 
 wait_for_cm_unregister(_ClientId, 0) ->
     error(cm_did_not_unregister);
@@ -884,12 +903,16 @@ t_snabbkaffe_buffered_messages(Config) ->
            %% Make the resume init phase wait until the first message is delivered.
            ?force_ordering( #{ ?snk_kind := ps_worker_deliver },
                             #{ ?snk_kind := ps_resume_end }),
+           Parent = self(),
            spawn_link(fun() ->
                               ?block_until(#{?snk_kind := ps_marker_pendings_msgs}, infinity, 5000),
-                              publish(Topic, Payloads2, true)
+                              publish(Topic, Payloads2, true),
+                              Parent ! publish_done,
+                              ok
                       end),
            {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
            {ok, _} = emqtt:ConnFun(Client2),
+           receive publish_done -> ok after 10000 -> error(too_long_to_publish) end,
            Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1),
            ReceivedPayloads = [P || #{ payload := P } <- Msgs],
            ?assertEqual(lists:sort(Payloads1 ++ Payloads2),