Ver código fonte

feat(persistent_session): Make schema more flexible

ieQu1 3 anos atrás
pai
commit
5221c6b2f9

+ 62 - 13
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -509,22 +509,38 @@ emqx_schema {
     }
   }
 
-  persistent_session_store_storage_type {
+  persistent_session_store_backend {
     desc {
-        en: "Store information about persistent sessions on disc or in ram.\n"
-            "If ram is chosen, all information about persistent sessions remains\n"
-            "as long as at least one node in a cluster is alive to keep the information.\n"
-            "If disc is chosen, the information is persisted on disc and will survive\n"
-            "cluster restart, at the price of more disc usage and less throughput.\n"
-        zh: "将有关持久会话的信息存储在磁盘或内存中。\n"
-            "如果选择了ram,有关持久会话的所有信息将保留\n"
-            "只要群集中至少有一个节点处于活动状态,就可以保留信息。\n"
-            "如果选择了光盘,则信息将保留在光盘上,并且将继续存在\n"
-            "群集重新启动,代价是磁盘使用量增加,吞吐量降低。\n"
+        en: "Database backend used to store information about the persistent sessions.\n"
+            "- `builtin`: Use an embedded database (mria)"
+        zh: "用于存储持久性会话信息的数据库后端\n"
+            "- `builtin`: 使用一个嵌入式数据库(mria)"
     }
     label: {
-        en: "Storage type"
-        zh: "存储类型"
+        en: "Backend"
+        zh: "后台"
+    }
+  }
+
+  persistent_store_on_disc {
+    desc {
+        en: "Persist session data on disc. If `false`, the data will be stored in RAM."
+        zh: "将会话数据保留在磁盘上。如果`false`,数据将被存储在RAM中。"
+    }
+    label: {
+        en: "Persist on disc"
+        zh: "坚持 在光盘上"
+    }
+  }
+
+  persistent_store_ram_cache {
+    desc {
+        en: "Maintain a copy of the data in RAM for faster access."
+        zh: "在RAM中保持一份数据的副本,以便更快地访问。"
+    }
+    label: {
+        en: "RAM cache"
+        zh: "RAM缓存"
     }
   }
 
@@ -569,6 +585,39 @@ emqx_schema {
     }
   }
 
+  persistent_session_builtin_session_table {
+    desc {
+         en: "Tuning for built-in session table."
+         zh: "对内置的会话表进行调整。"
+    }
+    label: {
+         en: "Persistent session."
+         zh: "持久会话。"
+    }
+  }
+
+  persistent_session_builtin_sess_msg_table {
+    desc {
+         en: "Tuning for built-in session messages table."
+         zh: "对内置的会话信息表进行调整。"
+    }
+    label: {
+         en: "Persistent session messages."
+         zh: "持久的会话信息。"
+    }
+  }
+
+  persistent_session_builtin_messages_table {
+    desc {
+         en: "Tuning for built-in messages table."
+         zh: "对内置信息表进行调校。"
+    }
+    label: {
+         en: "Persistent messages."
+         zh: "持久性信息。"
+    }
+  }
+
   stats_enable {
     desc {
         en: "Enable/disable statistic data collection."

+ 0 - 121
apps/emqx/src/emqx_persistent_session_mnesia_disc_backend.erl

@@ -1,121 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_persistent_session_mnesia_disc_backend).
-
--include("emqx.hrl").
--include("emqx_persistent_session.hrl").
-
--export([
-    create_tables/0,
-    first_message_id/0,
-    next_message_id/1,
-    delete_message/1,
-    first_session_message/0,
-    next_session_message/1,
-    delete_session_message/1,
-    put_session_store/1,
-    delete_session_store/1,
-    lookup_session_store/1,
-    put_session_message/1,
-    put_message/1,
-    get_message/1,
-    ro_transaction/1
-]).
-
-create_tables() ->
-    ok = mria:create_table(?SESSION_STORE_DISC, [
-        {type, set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, disc_copies},
-        {record_name, session_store},
-        {attributes, record_info(fields, session_store)},
-        {storage_properties, [{ets, [{read_concurrency, true}]}]}
-    ]),
-
-    ok = mria:create_table(?SESS_MSG_TAB_DISC, [
-        {type, ordered_set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, disc_copies},
-        {record_name, session_msg},
-        {attributes, record_info(fields, session_msg)},
-        {storage_properties, [
-            {ets, [
-                {read_concurrency, true},
-                {write_concurrency, true}
-            ]}
-        ]}
-    ]),
-
-    ok = mria:create_table(?MSG_TAB_DISC, [
-        {type, ordered_set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, disc_copies},
-        {record_name, message},
-        {attributes, record_info(fields, message)},
-        {storage_properties, [
-            {ets, [
-                {read_concurrency, true},
-                {write_concurrency, true}
-            ]}
-        ]}
-    ]).
-
-first_session_message() ->
-    mnesia:dirty_first(?SESS_MSG_TAB_DISC).
-
-next_session_message(Key) ->
-    mnesia:dirty_next(?SESS_MSG_TAB_DISC, Key).
-
-first_message_id() ->
-    mnesia:dirty_first(?MSG_TAB_DISC).
-
-next_message_id(Key) ->
-    mnesia:dirty_next(?MSG_TAB_DISC, Key).
-
-delete_message(Key) ->
-    mria:dirty_delete(?MSG_TAB_DISC, Key).
-
-delete_session_message(Key) ->
-    mria:dirty_delete(?SESS_MSG_TAB_DISC, Key).
-
-put_session_store(SS) ->
-    mria:dirty_write(?SESSION_STORE_DISC, SS).
-
-delete_session_store(ClientID) ->
-    mria:dirty_delete(?SESSION_STORE_DISC, ClientID).
-
-lookup_session_store(ClientID) ->
-    case mnesia:dirty_read(?SESSION_STORE_DISC, ClientID) of
-        [] -> none;
-        [SS] -> {value, SS}
-    end.
-
-put_session_message(SessMsg) ->
-    mria:dirty_write(?SESS_MSG_TAB_DISC, SessMsg).
-
-put_message(Msg) ->
-    mria:dirty_write(?MSG_TAB_DISC, Msg).
-
-get_message(MsgId) ->
-    case mnesia:read(?MSG_TAB_DISC, MsgId) of
-        [] -> error({msg_not_found, MsgId});
-        [Msg] -> Msg
-    end.
-
-ro_transaction(Fun) ->
-    {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
-    Res.

+ 0 - 121
apps/emqx/src/emqx_persistent_session_mnesia_ram_backend.erl

@@ -1,121 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_persistent_session_mnesia_ram_backend).
-
--include("emqx.hrl").
--include("emqx_persistent_session.hrl").
-
--export([
-    create_tables/0,
-    first_message_id/0,
-    next_message_id/1,
-    delete_message/1,
-    first_session_message/0,
-    next_session_message/1,
-    delete_session_message/1,
-    put_session_store/1,
-    delete_session_store/1,
-    lookup_session_store/1,
-    put_session_message/1,
-    put_message/1,
-    get_message/1,
-    ro_transaction/1
-]).
-
-create_tables() ->
-    ok = mria:create_table(?SESSION_STORE_RAM, [
-        {type, set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, ram_copies},
-        {record_name, session_store},
-        {attributes, record_info(fields, session_store)},
-        {storage_properties, [{ets, [{read_concurrency, true}]}]}
-    ]),
-
-    ok = mria:create_table(?SESS_MSG_TAB_RAM, [
-        {type, ordered_set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, ram_copies},
-        {record_name, session_msg},
-        {attributes, record_info(fields, session_msg)},
-        {storage_properties, [
-            {ets, [
-                {read_concurrency, true},
-                {write_concurrency, true}
-            ]}
-        ]}
-    ]),
-
-    ok = mria:create_table(?MSG_TAB_RAM, [
-        {type, ordered_set},
-        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
-        {storage, ram_copies},
-        {record_name, message},
-        {attributes, record_info(fields, message)},
-        {storage_properties, [
-            {ets, [
-                {read_concurrency, true},
-                {write_concurrency, true}
-            ]}
-        ]}
-    ]).
-
-first_session_message() ->
-    mnesia:dirty_first(?SESS_MSG_TAB_RAM).
-
-next_session_message(Key) ->
-    mnesia:dirty_next(?SESS_MSG_TAB_RAM, Key).
-
-first_message_id() ->
-    mnesia:dirty_first(?MSG_TAB_RAM).
-
-next_message_id(Key) ->
-    mnesia:dirty_next(?MSG_TAB_RAM, Key).
-
-delete_message(Key) ->
-    mria:dirty_delete(?MSG_TAB_RAM, Key).
-
-delete_session_message(Key) ->
-    mria:dirty_delete(?SESS_MSG_TAB_RAM, Key).
-
-put_session_store(SS) ->
-    mria:dirty_write(?SESSION_STORE_RAM, SS).
-
-delete_session_store(ClientID) ->
-    mria:dirty_delete(?SESSION_STORE_RAM, ClientID).
-
-lookup_session_store(ClientID) ->
-    case mnesia:dirty_read(?SESSION_STORE_RAM, ClientID) of
-        [] -> none;
-        [SS] -> {value, SS}
-    end.
-
-put_session_message(SessMsg) ->
-    mria:dirty_write(?SESS_MSG_TAB_RAM, SessMsg).
-
-put_message(Msg) ->
-    mria:dirty_write(?MSG_TAB_RAM, Msg).
-
-get_message(MsgId) ->
-    case mnesia:read(?MSG_TAB_RAM, MsgId) of
-        [] -> error({msg_not_found, MsgId});
-        [Msg] -> Msg
-    end.
-
-ro_transaction(Fun) ->
-    {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
-    Res.

+ 59 - 4
apps/emqx/src/emqx_schema.erl

@@ -212,12 +212,36 @@ fields("persistent_session_store") ->
                     desc => ?DESC(persistent_session_store_enabled)
                 }
             )},
-        {"storage_type",
+        {"on_disc",
             sc(
-                hoconsc:union([ram, disc]),
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(persistent_store_on_disc)
+                }
+            )},
+        {"ram_cache",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    desc => ?DESC(persistent_store_ram_cache)
+                }
+            )},
+        {"backend",
+            sc(
+                hoconsc:union([ref("persistent_session_builtin")]),
                 #{
-                    default => disc,
-                    desc => ?DESC(persistent_session_store_storage_type)
+                    default => #{
+                        <<"type">> => <<"builtin">>,
+                        <<"session">> =>
+                            #{<<"ram_cache">> => <<"true">>},
+                        <<"session_messages">> =>
+                            #{<<"ram_cache">> => <<"true">>},
+                        <<"messages">> =>
+                            #{<<"ram_cache">> => <<"false">>}
+                    },
+                    desc => ?DESC(persistent_session_store_backend)
                 }
             )},
         {"max_retain_undelivered",
@@ -245,6 +269,33 @@ fields("persistent_session_store") ->
                 }
             )}
     ];
+fields("persistent_table_mria_opts") ->
+    [
+        {"ram_cache",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(persistent_store_ram_cache)
+                }
+            )}
+    ];
+fields("persistent_session_builtin") ->
+    [
+        {"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})},
+        {"session",
+            sc(ref("persistent_table_mria_opts"), #{
+                desc => ?DESC(persistent_session_builtin_session_table)
+            })},
+        {"session_messages",
+            sc(ref("persistent_table_mria_opts"), #{
+                desc => ?DESC(persistent_session_builtin_sess_msg_table)
+            })},
+        {"messages",
+            sc(ref("persistent_table_mria_opts"), #{
+                desc => ?DESC(persistent_session_builtin_messages_table)
+            })}
+    ];
 fields("stats") ->
     [
         {"enable",
@@ -1526,6 +1577,10 @@ base_listener() ->
 
 desc("persistent_session_store") ->
     "Settings for message persistence.";
+desc("persistent_session_builtin") ->
+    "Settings for the built-in storage engine of persistent messages.";
+desc("persistent_table_mria_opts") ->
+    "Tuning options for the mria table.";
 desc("stats") ->
     "Enable/disable statistic data collection.\n"
     "Statistic data such as message receive/send count/rate etc. "

+ 10 - 27
apps/emqx/src/emqx_trie.erl

@@ -50,8 +50,7 @@
 -compile(nowarn_export_all).
 -endif.
 
--define(SESSION_DISC_TRIE, emqx_session_trie_disc).
--define(SESSION_RAM_TRIE, emqx_session_trie_ram).
+-define(SESSION_TRIE, emqx_session_trie).
 -define(PREFIX(Prefix), {Prefix, 0}).
 -define(TOPIC(Topic), {Topic, 1}).
 
@@ -82,25 +81,12 @@ mnesia(boot) ->
         {storage_properties, StoreProps}
     ]).
 
-create_session_trie(disc) ->
-    StoreProps = [
-        {ets, [
-            {read_concurrency, true},
-            {write_concurrency, true}
-        ]}
-    ],
-    ok = mria:create_table(
-        ?SESSION_DISC_TRIE,
-        [
-            {rlog_shard, ?ROUTE_SHARD},
-            {storage, disc_copies},
-            {record_name, ?TRIE},
-            {attributes, record_info(fields, ?TRIE)},
-            {type, ordered_set},
-            {storage_properties, StoreProps}
-        ]
-    );
-create_session_trie(ram) ->
+create_session_trie(Type) ->
+    Storage =
+        case Type of
+            disc -> disc_copies;
+            ram -> ram_copies
+        end,
     StoreProps = [
         {ets, [
             {read_concurrency, true},
@@ -108,10 +94,10 @@ create_session_trie(ram) ->
         ]}
     ],
     ok = mria:create_table(
-        ?SESSION_RAM_TRIE,
+        ?SESSION_TRIE,
         [
             {rlog_shard, ?ROUTE_SHARD},
-            {storage, ram_copies},
+            {storage, Storage},
             {record_name, ?TRIE},
             {attributes, record_info(fields, ?TRIE)},
             {type, ordered_set},
@@ -204,10 +190,7 @@ lock_session_tables() ->
 %%--------------------------------------------------------------------
 
 session_trie() ->
-    case emqx_persistent_session:storage_type() of
-        disc -> ?SESSION_DISC_TRIE;
-        ram -> ?SESSION_RAM_TRIE
-    end.
+    ?SESSION_TRIE.
 
 make_keys(Topic) ->
     Words = emqx_topic:words(Topic),

+ 19 - 12
apps/emqx/src/emqx_persistent_session.erl

@@ -19,6 +19,7 @@
 -export([
     is_store_enabled/0,
     init_db_backend/0,
+    storage_backend/0,
     storage_type/0
 ]).
 
@@ -81,6 +82,9 @@
 
 -type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok').
 
+%% EMQX configuration keys
+-define(conf_storage_backend, [persistent_session_store, backend, type]).
+
 %%--------------------------------------------------------------------
 %% Init
 %%--------------------------------------------------------------------
@@ -91,27 +95,23 @@ init_db_backend() ->
             StorageType = storage_type(),
             ok = emqx_trie:create_session_trie(StorageType),
             ok = emqx_session_router:create_router_tab(StorageType),
-            case StorageType of
-                disc ->
-                    emqx_persistent_session_mnesia_disc_backend:create_tables(),
-                    persistent_term:put(
-                        ?db_backend_key, emqx_persistent_session_mnesia_disc_backend
-                    );
-                ram ->
-                    emqx_persistent_session_mnesia_ram_backend:create_tables(),
-                    persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_ram_backend)
+            case storage_backend() of
+                builtin ->
+                    emqx_persistent_session_backend_builtin:create_tables(),
+                    persistent_term:put(?db_backend_key, emqx_persistent_session_backend_builtin)
             end,
             ok;
         false ->
-            persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend),
+            persistent_term:put(?db_backend_key, emqx_persistent_session_backend_dummy),
             ok
     end.
 
 is_store_enabled() ->
     emqx_config:get(?is_enabled_key).
 
-storage_type() ->
-    emqx_config:get(?storage_type_key).
+-spec storage_backend() -> builtin.
+storage_backend() ->
+    emqx_config:get(?conf_storage_backend).
 
 %%--------------------------------------------------------------------
 %% Session message ADT API
@@ -557,3 +557,10 @@ gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) -
     %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing.
     NewAbandoned = S =:= SessionID andalso Abandoned,
     gc_traverse(next_session_message(Key), S, NewAbandoned, Fun).
+
+-spec storage_type() -> ram | disc.
+storage_type() ->
+    case emqx_config:get(?on_disc_key) of
+        true -> disc;
+        false -> ram
+    end.

+ 9 - 13
apps/emqx/src/emqx_persistent_session.hrl

@@ -14,15 +14,6 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--define(SESSION_STORE_DISC, emqx_session_store_disc).
--define(SESSION_STORE_RAM, emqx_session_store_ram).
-
--define(SESS_MSG_TAB_DISC, emqx_session_msg_disc).
--define(SESS_MSG_TAB_RAM, emqx_session_msg_ram).
-
--define(MSG_TAB_DISC, emqx_persistent_msg_disc).
--define(MSG_TAB_RAM, emqx_persistent_msg_ram).
-
 -record(session_store, {
     client_id :: binary(),
     expiry_interval :: non_neg_integer(),
@@ -35,9 +26,14 @@
     val = [] :: []
 }).
 
--define(db_backend_key, [persistent_session_store, db_backend]).
--define(is_enabled_key, [persistent_session_store, enabled]).
--define(storage_type_key, [persistent_session_store, storage_type]).
--define(msg_retain, [persistent_session_store, max_retain_undelivered]).
+-define(cfg_root, persistent_session_store).
+-define(db_backend_key, [?cfg_root, db_backend]).
+-define(is_enabled_key, [?cfg_root, enabled]).
+-define(msg_retain, [?cfg_root, max_retain_undelivered]).
+-define(on_disc_key, [?cfg_root, on_disc]).
+
+-define(SESSION_STORE, emqx_session_store).
+-define(SESS_MSG_TAB, emqx_session_msg).
+-define(MSG_TAB, emqx_persistent_msg).
 
 -define(db_backend, (persistent_term:get(?db_backend_key))).

+ 150 - 0
apps/emqx/src/persistent_session/emqx_persistent_session_backend_builtin.erl

@@ -0,0 +1,150 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_backend_builtin).
+
+-include("emqx.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include("emqx_persistent_session.hrl").
+
+-export([
+    create_tables/0,
+    first_message_id/0,
+    next_message_id/1,
+    delete_message/1,
+    first_session_message/0,
+    next_session_message/1,
+    delete_session_message/1,
+    put_session_store/1,
+    delete_session_store/1,
+    lookup_session_store/1,
+    put_session_message/1,
+    put_message/1,
+    get_message/1,
+    ro_transaction/1
+]).
+
+-type mria_table_type() :: ram_copies | disc_copies | rocksdb_copies.
+
+-define(IS_ETS(BACKEND), (BACKEND =:= ram_copies orelse BACKEND =:= disc_copies)).
+
+create_tables() ->
+    SessStoreBackend = table_type(session),
+    ok = mria:create_table(?SESSION_STORE, [
+        {type, set},
+        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+        {storage, SessStoreBackend},
+        {record_name, session_store},
+        {attributes, record_info(fields, session_store)},
+        {storage_properties, storage_properties(?SESSION_STORE, SessStoreBackend)}
+    ]),
+
+    SessMsgBackend = table_type(session_messages),
+    ok = mria:create_table(?SESS_MSG_TAB, [
+        {type, ordered_set},
+        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+        {storage, SessMsgBackend},
+        {record_name, session_msg},
+        {attributes, record_info(fields, session_msg)},
+        {storage_properties, storage_properties(?SESS_MSG_TAB, SessMsgBackend)}
+    ]),
+
+    MsgBackend = table_type(messages),
+    ok = mria:create_table(?MSG_TAB, [
+        {type, ordered_set},
+        {rlog_shard, ?PERSISTENT_SESSION_SHARD},
+        {storage, MsgBackend},
+        {record_name, message},
+        {attributes, record_info(fields, message)},
+        {storage_properties, storage_properties(?MSG_TAB, MsgBackend)}
+    ]).
+
+first_session_message() ->
+    mnesia:dirty_first(?SESS_MSG_TAB).
+
+next_session_message(Key) ->
+    mnesia:dirty_next(?SESS_MSG_TAB, Key).
+
+first_message_id() ->
+    mnesia:dirty_first(?MSG_TAB).
+
+next_message_id(Key) ->
+    mnesia:dirty_next(?MSG_TAB, Key).
+
+delete_message(Key) ->
+    mria:dirty_delete(?MSG_TAB, Key).
+
+delete_session_message(Key) ->
+    mria:dirty_delete(?SESS_MSG_TAB, Key).
+
+put_session_store(SS) ->
+    mria:dirty_write(?SESSION_STORE, SS).
+
+delete_session_store(ClientID) ->
+    mria:dirty_delete(?SESSION_STORE, ClientID).
+
+lookup_session_store(ClientID) ->
+    case mnesia:dirty_read(?SESSION_STORE, ClientID) of
+        [] -> none;
+        [SS] -> {value, SS}
+    end.
+
+put_session_message(SessMsg) ->
+    mria:dirty_write(?SESS_MSG_TAB, SessMsg).
+
+put_message(Msg) ->
+    mria:dirty_write(?MSG_TAB, Msg).
+
+get_message(MsgId) ->
+    case mnesia:read(?MSG_TAB, MsgId) of
+        [] -> error({msg_not_found, MsgId});
+        [Msg] -> Msg
+    end.
+
+ro_transaction(Fun) ->
+    {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
+    Res.
+
+-spec storage_properties(?SESSION_STORE | ?SESS_MSG_TAB | ?MSG_TAB, mria_table_type()) -> term().
+storage_properties(?SESSION_STORE, Backend) when ?IS_ETS(Backend) ->
+    [{ets, [{read_concurrency, true}]}];
+storage_properties(_, Backend) when ?IS_ETS(Backend) ->
+    [
+        {ets, [
+            {read_concurrency, true},
+            {write_concurrency, true}
+        ]}
+    ];
+storage_properties(_, _) ->
+    [].
+
+-spec table_type(atom()) -> mria_table_type().
+table_type(Table) ->
+    DiscPersistence = emqx_config:get([?cfg_root, on_disc]),
+    RamCache = get_overlayed(Table, ram_cache),
+    case {DiscPersistence, RamCache} of
+        {true, true} ->
+            disc_copies;
+        {true, false} ->
+            rocksdb_copies;
+        {false, _} ->
+            ram_copies
+    end.
+
+-spec get_overlayed(atom(), on_disc | ram_cache) -> boolean().
+get_overlayed(Table, Suffix) ->
+    Default = emqx_config:get([?cfg_root, Suffix]),
+    emqx_config:get([?cfg_root, backend, Table, Suffix], Default).

+ 1 - 1
apps/emqx/src/emqx_persistent_session_dummy_backend.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_persistent_session_dummy_backend).
+-module(emqx_persistent_session_backend_dummy).
 
 -include("emqx_persistent_session.hrl").
 

apps/emqx/src/emqx_persistent_session_gc.erl → apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl


apps/emqx/src/emqx_persistent_session_sup.erl → apps/emqx/src/persistent_session/emqx_persistent_session_sup.erl


+ 2 - 2
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -168,8 +168,8 @@ typecheck_apis(
                     [
                         logger:error(
                             "Incompatible RPC call: "
-                            "type of the parameter ~p of RPC call ~s on release ~p "
-                            "is not a subtype of the target function ~s on release ~p.~n"
+                            "type of the parameter ~p of RPC call ~s in release ~p "
+                            "is not a subtype of the target function ~s in release ~p.~n"
                             "Caller type: ~s~nCallee type: ~s~n",
                             [
                                 Var,

+ 16 - 14
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -20,7 +20,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("../include/emqx.hrl").
--include("../src/emqx_persistent_session.hrl").
+-include("../src/persistent_session/emqx_persistent_session.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -69,6 +69,12 @@ groups() ->
             {group, snabbkaffe},
             {group, gc_tests}
         ]},
+        {rocks_tables, [], [
+            {group, no_kill_connection_process},
+            {group, kill_connection_process},
+            {group, snabbkaffe},
+            {group, gc_tests}
+        ]},
         {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
         {kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
         {snabbkaffe, [], [
@@ -101,13 +107,12 @@ init_per_group(Group, Config) when Group =:= ram_tables; Group =:= disc_tables -
     emqx_common_test_helpers:boot_modules(all),
     meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
     meck:expect(emqx_config, get, fun
-        (?storage_type_key) -> Reply;
+        (?on_disc_key) -> Reply =:= disc;
         (?is_enabled_key) -> true;
         (Other) -> meck:passthrough([Other])
     end),
     emqx_common_test_helpers:start_apps([], fun set_special_confs/1),
     ?assertEqual(true, emqx_persistent_session:is_store_enabled()),
-    ?assertEqual(Reply, emqx_persistent_session:storage_type()),
     Config;
 init_per_group(persistent_store_disabled, Config) ->
     %% Start Apps
@@ -159,22 +164,17 @@ init_per_group(gc_tests, Config) ->
     end),
     meck:new(mnesia, [non_strict, passthrough, no_history, no_link]),
     meck:expect(mnesia, dirty_first, fun
-        (?SESS_MSG_TAB_RAM) -> ets:first(SessionMsgEts);
-        (?SESS_MSG_TAB_DISC) -> ets:first(SessionMsgEts);
-        (?MSG_TAB_RAM) -> ets:first(MsgEts);
-        (?MSG_TAB_DISC) -> ets:first(MsgEts);
+        (?SESS_MSG_TAB) -> ets:first(SessionMsgEts);
+        (?MSG_TAB) -> ets:first(MsgEts);
         (X) -> meck:passthrough([X])
     end),
     meck:expect(mnesia, dirty_next, fun
-        (?SESS_MSG_TAB_RAM, X) -> ets:next(SessionMsgEts, X);
-        (?SESS_MSG_TAB_DISC, X) -> ets:next(SessionMsgEts, X);
-        (?MSG_TAB_RAM, X) -> ets:next(MsgEts, X);
-        (?MSG_TAB_DISC, X) -> ets:next(MsgEts, X);
+        (?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X);
+        (?MSG_TAB, X) -> ets:next(MsgEts, X);
         (Tab, X) -> meck:passthrough([Tab, X])
     end),
     meck:expect(mnesia, dirty_delete, fun
-        (?MSG_TAB_RAM, X) -> ets:delete(MsgEts, X);
-        (?MSG_TAB_DISC, X) -> ets:delete(MsgEts, X);
+        (?MSG_TAB, X) -> ets:delete(MsgEts, X);
         (Tab, X) -> meck:passthrough([Tab, X])
     end),
     [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config].
@@ -193,7 +193,9 @@ end_per_group(gc_tests, Config) ->
     meck:unload(mnesia),
     ?config(store_owner, Config) ! stop,
     ok;
-end_per_group(Group, _Config) when Group =:= ram_tables; Group =:= disc_tables ->
+end_per_group(Group, _Config) when
+    Group =:= ram_tables; Group =:= disc_tables; Group =:= rocks_tables
+->
     meck:unload(emqx_config),
     emqx_common_test_helpers:stop_apps([]);
 end_per_group(persistent_store_disabled, _Config) ->