Просмотр исходного кода

feat(session): move session data to DS

Fixes https://emqx.atlassian.net/browse/EMQX-11841
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
d1b0a986c7

+ 11 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -47,7 +47,7 @@ init() ->
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok = emqx_persistent_session_ds_router:init_tables(),
-        ok = emqx_persistent_session_ds:create_tables(),
+        ok = initialize_session_ds_state(),
         ok
         ok
     end).
     end).
 
 
@@ -69,6 +69,16 @@ get_db_config() ->
 force_ds(Zone) ->
 force_ds(Zone) ->
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
 
 
+-ifdef(STORE_STATE_IN_DS).
+initialize_session_ds_state() ->
+    ok = emqx_persistent_session_ds_state:open_db(get_db_config()).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
+initialize_session_ds_state() ->
+    ok = emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec add_handler() -> ok.
 -spec add_handler() -> ok.

+ 595 - 6
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -27,7 +27,13 @@
 %% for use in the management APIs.
 %% for use in the management APIs.
 -module(emqx_persistent_session_ds_state).
 -module(emqx_persistent_session_ds_state).
 
 
+-ifdef(STORE_STATE_IN_DS).
+-export([open_db/1]).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -export([create_tables/0]).
 -export([create_tables/0]).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]).
 -export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]).
 -export([get_created_at/1, set_created_at/2]).
 -export([get_created_at/1, set_created_at/2]).
@@ -85,20 +91,33 @@
 -include("session_internals.hrl").
 -include("session_internals.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
+-ifdef(TEST).
+-ifdef(STORE_STATE_IN_DS).
+-export([to_domain_msg/4, from_domain_msg/1]).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+%% END ifdef(TEST).
+-endif.
 
 
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
 
 
+-define(DB, ?DURABLE_SESSION_STATE).
+
 -type message() :: emqx_types:message().
 -type message() :: emqx_types:message().
 
 
 -opaque iter(K, V) :: gb_trees:iter(K, V).
 -opaque iter(K, V) :: gb_trees:iter(K, V).
 
 
 -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
 -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
 
 
+-ifndef(STORE_STATE_IN_DS).
 %% Generic key-value wrapper that is used for exporting arbitrary
 %% Generic key-value wrapper that is used for exporting arbitrary
 %% terms to mnesia:
 %% terms to mnesia:
 -record(kv, {k, v}).
 -record(kv, {k, v}).
+-endif.
 
 
 %% Persistent map.
 %% Persistent map.
 %%
 %%
@@ -170,6 +189,83 @@
     ?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer())
     ?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer())
 }.
 }.
 
 
+-ifdef(STORE_STATE_IN_DS).
+-define(session_topic_ns, <<"session">>).
+-define(metadata_domain, metadata).
+-define(metadata_domain_bin, <<"metadata">>).
+-define(subscription_domain, subscription).
+-define(subscription_domain_bin, <<"subscription">>).
+-define(subscription_state_domain, subscription_state).
+-define(subscription_state_domain_bin, <<"subscription_state">>).
+-define(stream_domain, stream).
+-define(rank_domain, rank).
+-define(seqno_domain, seqno).
+-define(awaiting_rel_domain, awaiting_rel).
+-type domain() ::
+    ?metadata_domain
+    | ?subscription_domain
+    | ?subscription_state_domain
+    | ?stream_domain
+    | ?rank_domain
+    | ?seqno_domain
+    | ?awaiting_rel_domain.
+
+-type sub_id() :: nil().
+-type srs() :: #srs{}.
+-type data() ::
+    #{
+        domain := ?metadata_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := any(),
+        val := map()
+    }
+    | #{
+        domain := ?subscription_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := {emqx_types:topic(), sub_id()},
+        val := emqx_persistent_session_ds:subscription()
+    }
+    | #{
+        domain := ?subscription_state_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := emqx_persistent_session_ds_subs:subscription_state_id(),
+        val := emqx_persistent_session_ds_subs:subscription_state()
+    }
+    | #{
+        domain := ?stream_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := {non_neg_integer(), emqx_ds:stream()},
+        val := srs()
+    }
+    | #{
+        domain := ?rank_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := rank_key(),
+        val := non_neg_integer()
+    }
+    | #{
+        domain := ?seqno_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := seqno_type(),
+        val := non_neg_integer()
+    }
+    | #{
+        domain := ?awaiting_rel_domain,
+        session_id := emqx_persistent_session_ds:id(),
+        key := emqx_types:packet_id(),
+        val := _Timestamp :: integer()
+    }.
+
+-define(pmaps, [
+    {?subscriptions, ?subscription_domain},
+    {?subscription_states, ?subscription_state_domain},
+    {?streams, ?stream_domain},
+    {?seqnos, ?seqno_domain},
+    {?ranks, ?rank_domain},
+    {?awaiting_rel, ?awaiting_rel_domain}
+]).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -define(session_tab, emqx_ds_session_tab).
 -define(session_tab, emqx_ds_session_tab).
 -define(subscription_tab, emqx_ds_session_subscriptions).
 -define(subscription_tab, emqx_ds_session_subscriptions).
 -define(subscription_states_tab, emqx_ds_session_subscription_states).
 -define(subscription_states_tab, emqx_ds_session_subscription_states).
@@ -186,6 +282,8 @@
     {?ranks, ?rank_tab},
     {?ranks, ?rank_tab},
     {?awaiting_rel, ?awaiting_rel_tab}
     {?awaiting_rel, ?awaiting_rel_tab}
 ]).
 ]).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %% Enable this flag if you suspect some code breaks the sequence:
 %% Enable this flag if you suspect some code breaks the sequence:
 -ifndef(CHECK_SEQNO).
 -ifndef(CHECK_SEQNO).
@@ -200,6 +298,12 @@
 %% API functions
 %% API functions
 %%================================================================================
 %%================================================================================
 
 
+-ifdef(STORE_STATE_IN_DS).
+-spec open_db(emqx_ds:create_db_opts()) -> ok.
+open_db(Config) ->
+    emqx_ds:open_db(?DB, Config).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -spec create_tables() -> ok.
 -spec create_tables() -> ok.
 create_tables() ->
 create_tables() ->
     ok = mria:create_table(
     ok = mria:create_table(
@@ -215,8 +319,39 @@ create_tables() ->
     {_, PmapTables} = lists:unzip(?pmaps),
     {_, PmapTables} = lists:unzip(?pmaps),
     [create_kv_pmap_table(Table) || Table <- PmapTables],
     [create_kv_pmap_table(Table) || Table <- PmapTables],
     mria:wait_for_tables([?session_tab | PmapTables]).
     mria:wait_for_tables([?session_tab | PmapTables]).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
 -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
+-ifdef(STORE_STATE_IN_DS).
+open(SessionId) ->
+    case session_restore(SessionId) of
+        #{
+            ?metadata_domain := [#{val := Metadata}],
+            ?subscription_domain := Subs,
+            ?subscription_state_domain := SubStates,
+            ?stream_domain := Streams,
+            ?rank_domain := Ranks,
+            ?seqno_domain := Seqnos,
+            ?awaiting_rel_domain := AwaitingRels
+        } ->
+            Rec = #{
+                ?id => SessionId,
+                ?metadata => Metadata,
+                ?subscriptions => pmap_open(?subscription_domain, Subs),
+                ?subscription_states => pmap_open(?subscription_state_domain, SubStates),
+                ?streams => pmap_open(?stream_domain, Streams),
+                ?seqnos => pmap_open(?seqno_domain, Seqnos),
+                ?ranks => pmap_open(?rank_domain, Ranks),
+                ?awaiting_rel => pmap_open(?awaiting_rel_domain, AwaitingRels),
+                ?unset_dirty
+            },
+            {ok, Rec};
+        _ ->
+            undefined
+    end.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 open(SessionId) ->
 open(SessionId) ->
     ro_transaction(fun() ->
     ro_transaction(fun() ->
         case kv_restore(?session_tab, SessionId) of
         case kv_restore(?session_tab, SessionId) of
@@ -226,8 +361,8 @@ open(SessionId) ->
                         pmap_open(Table, SessionId)
                         pmap_open(Table, SessionId)
                     end,
                     end,
                     #{
                     #{
-                        id => SessionId,
-                        metadata => Metadata,
+                        ?id => SessionId,
+                        ?metadata => Metadata,
                         ?unset_dirty
                         ?unset_dirty
                     }
                     }
                 ),
                 ),
@@ -236,6 +371,8 @@ open(SessionId) ->
                 undefined
                 undefined
         end
         end
     end).
     end).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec print_session(emqx_persistent_session_ds:id()) -> map() | undefined.
 -spec print_session(emqx_persistent_session_ds:id()) -> map() | undefined.
 print_session(SessionId) ->
 print_session(SessionId) ->
@@ -256,10 +393,25 @@ format(Rec) ->
     ).
     ).
 
 
 -spec list_sessions() -> [emqx_persistent_session_ds:id()].
 -spec list_sessions() -> [emqx_persistent_session_ds:id()].
+-ifdef(STORE_STATE_IN_DS).
+list_sessions() ->
+    lists:map(
+        fun(#{session_id := Id}) -> Id end,
+        read_iterate('+', [?metadata_domain_bin, ?metadata_domain_bin])
+    ).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 list_sessions() ->
 list_sessions() ->
     mnesia:dirty_all_keys(?session_tab).
     mnesia:dirty_all_keys(?session_tab).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec delete(emqx_persistent_session_ds:id()) -> ok.
 -spec delete(emqx_persistent_session_ds:id()) -> ok.
+-ifdef(STORE_STATE_IN_DS).
+delete(Id) ->
+    delete_iterate(Id, ['#']).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 delete(Id) ->
 delete(Id) ->
     transaction(
     transaction(
         fun() ->
         fun() ->
@@ -267,14 +419,67 @@ delete(Id) ->
             mnesia:delete(?session_tab, Id, write)
             mnesia:delete(?session_tab, Id, write)
         end
         end
     ).
     ).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec commit(t()) -> t().
 -spec commit(t()) -> t().
+-ifdef(STORE_STATE_IN_DS).
+commit(Rec = #{dirty := false}) ->
+    Rec;
+commit(
+    Rec = #{
+        ?id := SessionId,
+        ?metadata := Metadata,
+        ?subscriptions := Subs0,
+        ?subscription_states := SubStates0,
+        ?streams := Streams0,
+        ?seqnos := SeqNos0,
+        ?ranks := Ranks0,
+        ?awaiting_rel := AwaitingRels0
+    }
+) ->
+    check_sequence(Rec),
+    MetadataMsg = to_domain_msg(?metadata_domain, SessionId, _Key = undefined, Metadata),
+    {{SubsMsgs, SubsDel}, Subs} = pmap_commit(SessionId, Subs0),
+    {{SubStatesMsgs, SubStatesDel}, SubStates} = pmap_commit(SessionId, SubStates0),
+    {{StreamsMsgs, StreamsDel}, Streams} = pmap_commit(SessionId, Streams0),
+    {{SeqNosMsgs, SeqNosDel}, SeqNos} = pmap_commit(SessionId, SeqNos0),
+    {{RanksMsgs, RanksDel}, Ranks} = pmap_commit(SessionId, Ranks0),
+    {{AwaitingRelsMsgs, AwaitingRelsDel}, AwaitingRels} = pmap_commit(SessionId, AwaitingRels0),
+    delete_specific_keys(SessionId, ?subscription_domain, SubsDel),
+    delete_specific_keys(SessionId, ?subscription_state_domain, SubStatesDel),
+    delete_specific_keys(SessionId, ?stream_domain, StreamsDel),
+    delete_specific_keys(SessionId, ?seqno_domain, SeqNosDel),
+    delete_specific_keys(SessionId, ?rank_domain, RanksDel),
+    delete_specific_keys(SessionId, ?awaiting_rel_domain, AwaitingRelsDel),
+    ok = store_batch(
+        [MetadataMsg] ++
+            SubsMsgs ++
+            SubStatesMsgs ++
+            StreamsMsgs ++
+            SeqNosMsgs ++ RanksMsgs ++ AwaitingRelsMsgs
+    ),
+    Rec#{
+        ?subscriptions := Subs,
+        ?subscription_states := SubStates,
+        ?streams := Streams,
+        ?seqnos := SeqNos,
+        ?ranks := Ranks,
+        ?awaiting_rel := AwaitingRels,
+        ?unset_dirty
+    }.
+
+store_batch(Batch0) ->
+    Batch = [{emqx_message:timestamp(Msg, microsecond), Msg} || Msg <- Batch0],
+    emqx_ds:store_batch(?DB, Batch, #{atomic => true}).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 commit(Rec = #{dirty := false}) ->
 commit(Rec = #{dirty := false}) ->
     Rec;
     Rec;
 commit(
 commit(
     Rec = #{
     Rec = #{
-        id := SessionId,
-        metadata := Metadata
+        ?id := SessionId,
+        ?metadata := Metadata
     }
     }
 ) ->
 ) ->
     check_sequence(Rec),
     check_sequence(Rec),
@@ -287,8 +492,26 @@ commit(
             Rec#{?unset_dirty}
             Rec#{?unset_dirty}
         )
         )
     end).
     end).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec create_new(emqx_persistent_session_ds:id()) -> t().
 -spec create_new(emqx_persistent_session_ds:id()) -> t().
+-ifdef(STORE_STATE_IN_DS).
+create_new(SessionId) ->
+    delete(SessionId),
+    #{
+        ?id => SessionId,
+        ?metadata => #{},
+        ?subscriptions => pmap_open(?subscription_domain, []),
+        ?subscription_states => pmap_open(?subscription_state_domain, []),
+        ?streams => pmap_open(?stream_domain, []),
+        ?seqnos => pmap_open(?seqno_domain, []),
+        ?ranks => pmap_open(?rank_domain, []),
+        ?awaiting_rel => pmap_open(?awaiting_rel_domain, []),
+        ?set_dirty
+    }.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 create_new(SessionId) ->
 create_new(SessionId) ->
     transaction(fun() ->
     transaction(fun() ->
         delete(SessionId),
         delete(SessionId),
@@ -297,12 +520,14 @@ create_new(SessionId) ->
                 pmap_open(Table, SessionId)
                 pmap_open(Table, SessionId)
             end,
             end,
             #{
             #{
-                id => SessionId,
-                metadata => #{},
+                ?id => SessionId,
+                ?metadata => #{},
                 ?set_dirty
                 ?set_dirty
             }
             }
         )
         )
     end).
     end).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %%
 %%
 
 
@@ -363,6 +588,19 @@ set_will_message(Val, Rec) ->
     set_meta(?will_message, Val, Rec).
     set_meta(?will_message, Val, Rec).
 
 
 -spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok.
 -spec clear_will_message_now(emqx_persistent_session_ds:id()) -> ok.
+-ifdef(STORE_STATE_IN_DS).
+clear_will_message_now(SessionId) when is_binary(SessionId) ->
+    case session_restore(SessionId) of
+        #{?metadata_domain := [#{val := Metadata0}]} ->
+            Metadata = Metadata0#{?will_message => undefined},
+            MetadataMsg = to_domain_msg(?metadata_domain, SessionId, _Key = undefined, Metadata),
+            ok = store_batch([MetadataMsg]),
+            ok;
+        _ ->
+            ok
+    end.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 clear_will_message_now(SessionId) when is_binary(SessionId) ->
 clear_will_message_now(SessionId) when is_binary(SessionId) ->
     transaction(fun() ->
     transaction(fun() ->
         case kv_restore(?session_tab, SessionId) of
         case kv_restore(?session_tab, SessionId) of
@@ -374,6 +612,8 @@ clear_will_message_now(SessionId) when is_binary(SessionId) ->
                 ok
                 ok
         end
         end
     end).
     end).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec clear_will_message(t()) -> t().
 -spec clear_will_message(t()) -> t().
 clear_will_message(Rec) ->
 clear_will_message(Rec) ->
@@ -403,8 +643,18 @@ get_subscription(TopicFilter, Rec) ->
     emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
     emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
 ) ->
 ) ->
     [emqx_persistent_session_ds_subs:subscription()].
     [emqx_persistent_session_ds_subs:subscription()].
+-ifdef(STORE_STATE_IN_DS).
+cold_get_subscription(SessionId, Topic) ->
+    Data = read_iterate(SessionId, [
+        ?subscription_domain_bin, key_encode(?subscription_domain, Topic)
+    ]),
+    lists:map(fun(#{val := V}) -> V end, Data).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 cold_get_subscription(SessionId, Topic) ->
 cold_get_subscription(SessionId, Topic) ->
     kv_pmap_read(?subscription_tab, SessionId, Topic).
     kv_pmap_read(?subscription_tab, SessionId, Topic).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec fold_subscriptions(fun(), Acc, t()) -> Acc.
 -spec fold_subscriptions(fun(), Acc, t()) -> Acc.
 fold_subscriptions(Fun, Acc, Rec) ->
 fold_subscriptions(Fun, Acc, Rec) ->
@@ -415,10 +665,18 @@ n_subscriptions(Rec) ->
     gen_size(?subscriptions, Rec).
     gen_size(?subscriptions, Rec).
 
 
 -spec total_subscription_count() -> non_neg_integer().
 -spec total_subscription_count() -> non_neg_integer().
+-ifdef(STORE_STATE_IN_DS).
+total_subscription_count() ->
+    Fun = fun(Data, Acc) -> length(Data) + Acc end,
+    read_fold(Fun, 0, '+', [?subscription_domain_bin, '+']).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 total_subscription_count() ->
 total_subscription_count() ->
     mria:async_dirty(?DS_MRIA_SHARD, fun() ->
     mria:async_dirty(?DS_MRIA_SHARD, fun() ->
         mnesia:foldl(fun(#kv{}, Acc) -> Acc + 1 end, 0, ?subscription_tab)
         mnesia:foldl(fun(#kv{}, Acc) -> Acc + 1 end, 0, ?subscription_tab)
     end).
     end).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec put_subscription(
 -spec put_subscription(
     emqx_persistent_session_ds:topic_filter(),
     emqx_persistent_session_ds:topic_filter(),
@@ -443,8 +701,18 @@ get_subscription_state(SStateId, Rec) ->
     emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id()
     emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id()
 ) ->
 ) ->
     [emqx_persistent_session_ds_subs:subscription_state()].
     [emqx_persistent_session_ds_subs:subscription_state()].
+-ifdef(STORE_STATE_IN_DS).
+cold_get_subscription_state(SessionId, SStateId) ->
+    Data = read_iterate(SessionId, [
+        ?subscription_state_domain_bin, key_encode(?subscription_state_domain, SStateId)
+    ]),
+    lists:map(fun(#{val := V}) -> V end, Data).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 cold_get_subscription_state(SessionId, SStateId) ->
 cold_get_subscription_state(SessionId, SStateId) ->
     kv_pmap_read(?subscription_states_tab, SessionId, SStateId).
     kv_pmap_read(?subscription_states_tab, SessionId, SStateId).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec fold_subscription_states(fun(), Acc, t()) -> Acc.
 -spec fold_subscription_states(fun(), Acc, t()) -> Acc.
 fold_subscription_states(Fun, Acc, Rec) ->
 fold_subscription_states(Fun, Acc, Rec) ->
@@ -556,11 +824,77 @@ iter_next(It0) ->
 %%
 %%
 
 
 -spec make_session_iterator() -> session_iterator().
 -spec make_session_iterator() -> session_iterator().
+-ifdef(STORE_STATE_IN_DS).
+make_session_iterator() ->
+    %% NB. This hides the existence of streams.  Users will need to start iteration
+    %% again to see new streams, if any.
+    %% NB. This is not assumed to be stored permanently anywhere.
+    Time = 0,
+    TopicFilter = [
+        <<"session">>,
+        '+',
+        ?metadata_domain_bin,
+        ?metadata_domain_bin
+    ],
+    Iterators = lists:map(
+        fun({_Rank, Stream}) ->
+            {ok, Iterator} = emqx_ds:make_iterator(?DB, Stream, TopicFilter, Time),
+            Iterator
+        end,
+        emqx_ds:get_streams(?DB, TopicFilter, Time)
+    ),
+    #{its => Iterators}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 make_session_iterator() ->
 make_session_iterator() ->
     mnesia:dirty_first(?session_tab).
     mnesia:dirty_first(?session_tab).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec session_iterator_next(session_iterator(), pos_integer()) ->
 -spec session_iterator_next(session_iterator(), pos_integer()) ->
     {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
     {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
+-ifdef(STORE_STATE_IN_DS).
+session_iterator_next(Cursor, N) ->
+    session_iterator_next(Cursor, N, []).
+
+%% Note: ordering is not respected here.
+session_iterator_next(#{its := [It | Rest]} = Cursor, 0, Acc) ->
+    %% Peek the next item to detect end of table.
+    case emqx_ds:next(?DB, It, 1) of
+        {ok, end_of_stream} ->
+            session_iterator_next(Cursor#{its := Rest}, 0, Acc);
+        {ok, _NewIt, []} ->
+            session_iterator_next(Cursor#{its := Rest}, 0, Acc);
+        {ok, _NewIt, _Batch} ->
+            {Acc, Cursor}
+    end;
+session_iterator_next(_Cursor, 0, Acc) ->
+    {Acc, '$end_of_table'};
+session_iterator_next('$end_of_table', _N, Acc) ->
+    {Acc, '$end_of_table'};
+session_iterator_next(#{its := []}, _N, Acc) ->
+    {Acc, '$end_of_table'};
+session_iterator_next(#{its := [It | Rest]} = Cursor0, N, Acc) ->
+    case emqx_ds:next(?DB, It, N) of
+        {ok, end_of_stream} ->
+            session_iterator_next(Cursor0#{its := Rest}, N, Acc);
+        {ok, _NewIt, []} ->
+            session_iterator_next(Cursor0#{its := Rest}, N, Acc);
+        {ok, NewIt, Batch} ->
+            NumBatch = length(Batch),
+            SessionIds = lists:map(
+                fun({_DSKey, Msg}) ->
+                    #{session_id := Id, val := Val} = from_domain_msg(Msg),
+                    {Id, Val}
+                end,
+                Batch
+            ),
+            session_iterator_next(
+                Cursor0#{its := [NewIt | Rest]}, N - NumBatch, SessionIds ++ Acc
+            )
+    end.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 session_iterator_next(Cursor, 0) ->
 session_iterator_next(Cursor, 0) ->
     {[], Cursor};
     {[], Cursor};
 session_iterator_next('$end_of_table', _N) ->
 session_iterator_next('$end_of_table', _N) ->
@@ -572,6 +906,8 @@ session_iterator_next(Cursor0, N) ->
     ],
     ],
     {NextVals, Cursor} = session_iterator_next(mnesia:dirty_next(?session_tab, Cursor0), N - 1),
     {NextVals, Cursor} = session_iterator_next(mnesia:dirty_next(?session_tab, Cursor0), N - 1),
     {ThisVal ++ NextVals, Cursor}.
     {ThisVal ++ NextVals, Cursor}.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %%================================================================================
 %%================================================================================
 %% Internal functions
 %% Internal functions
@@ -579,10 +915,58 @@ session_iterator_next(Cursor0, N) ->
 
 
 %% All mnesia reads and writes are passed through this function.
 %% All mnesia reads and writes are passed through this function.
 %% Backward compatiblity issues can be handled here.
 %% Backward compatiblity issues can be handled here.
+-ifdef(STORE_STATE_IN_DS).
+val_encode(_Domain, Term) ->
+    term_to_binary(Term).
+
+val_decode(_Domain, Bin) ->
+    binary_to_term(Bin).
+
+-spec key_encode(domain(), term()) -> binary().
+key_encode(?metadata_domain, _Key) ->
+    ?metadata_domain_bin;
+key_encode(?subscription_domain, TopicFilterAndSubId) ->
+    term_to_topic_level(TopicFilterAndSubId);
+key_encode(?subscription_state_domain, SubStateId) ->
+    integer_to_binary(SubStateId);
+key_encode(?stream_domain, StreamKey) ->
+    term_to_topic_level(StreamKey);
+key_encode(?rank_domain, RankKey) ->
+    term_to_topic_level(RankKey);
+key_encode(?seqno_domain, SeqnoType) ->
+    integer_to_binary(SeqnoType);
+key_encode(?awaiting_rel_domain, PacketId) ->
+    integer_to_binary(PacketId).
+
+-spec key_decode(domain(), binary()) -> term().
+key_decode(?metadata_domain, Bin) ->
+    Bin;
+key_decode(?subscription_domain, Bin) ->
+    topic_level_to_term(Bin);
+key_decode(?subscription_state_domain, Bin) ->
+    binary_to_integer(Bin);
+key_decode(?stream_domain, Bin) ->
+    topic_level_to_term(Bin);
+key_decode(?rank_domain, Bin) ->
+    topic_level_to_term(Bin);
+key_decode(?seqno_domain, Bin) ->
+    binary_to_integer(Bin);
+key_decode(?awaiting_rel_domain, Bin) ->
+    binary_to_integer(Bin).
+
+term_to_topic_level(Term) ->
+    base64:encode(term_to_binary(Term), #{mode => urlsafe, padding => false}).
+
+topic_level_to_term(Bin) ->
+    binary_to_term(base64:decode(Bin, #{mode => urlsafe, padding => false})).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 encoder(encode, _Table, Term) ->
 encoder(encode, _Table, Term) ->
     Term;
     Term;
 encoder(decode, _Table, Term) ->
 encoder(decode, _Table, Term) ->
     Term.
     Term.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %%
 %%
 
 
@@ -646,6 +1030,18 @@ update_pmaps(Fun, Map) ->
 
 
 %% @doc Open a PMAP and fill the clean area with the data from DB.
 %% @doc Open a PMAP and fill the clean area with the data from DB.
 %% This functtion should be ran in a transaction.
 %% This functtion should be ran in a transaction.
+-ifdef(STORE_STATE_IN_DS).
+-spec pmap_open(domain(), [data()]) -> pmap(_K, _V).
+pmap_open(Domain, Data0) ->
+    Data = lists:map(fun(#{key := K, val := V}) -> {K, V} end, Data0),
+    Clean = cache_from_list(Domain, Data),
+    #pmap{
+        table = Domain,
+        cache = Clean,
+        dirty = #{}
+    }.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
 -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
 pmap_open(Table, SessionId) ->
 pmap_open(Table, SessionId) ->
     Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
     Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
@@ -654,6 +1050,8 @@ pmap_open(Table, SessionId) ->
         cache = Clean,
         cache = Clean,
         dirty = #{}
         dirty = #{}
     }.
     }.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec pmap_get(K, pmap(K, V)) -> V | undefined.
 -spec pmap_get(K, pmap(K, V)) -> V | undefined.
 pmap_get(K, #pmap{table = Table, cache = Cache}) ->
 pmap_get(K, #pmap{table = Table, cache = Cache}) ->
@@ -680,6 +1078,30 @@ pmap_del(
 pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
 pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
     cache_fold(Table, Fun, Acc, Cache).
     cache_fold(Table, Fun, Acc, Cache).
 
 
+-ifdef(STORE_STATE_IN_DS).
+-spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) ->
+    {{[emqx_types:message()], [term()]}, pmap(K, V)}.
+pmap_commit(
+    SessionId, Pmap = #pmap{table = Domain, dirty = Dirty, cache = Cache}
+) ->
+    Out =
+        maps:fold(
+            fun
+                (K, del, {AccPersist, AccDel}) ->
+                    {AccPersist, [K | AccDel]};
+                (K, dirty, {AccPersist, AccDel}) ->
+                    V = cache_get(Domain, K, Cache),
+                    Msg = to_domain_msg(Domain, SessionId, K, V),
+                    {[Msg | AccPersist], AccDel}
+            end,
+            {[], []},
+            Dirty
+        ),
+    {Out, Pmap#pmap{
+        dirty = #{}
+    }}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
 -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
 pmap_commit(
 pmap_commit(
     SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, cache = Cache}
     SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, cache = Cache}
@@ -697,6 +1119,8 @@ pmap_commit(
     Pmap#pmap{
     Pmap#pmap{
         dirty = #{}
         dirty = #{}
     }.
     }.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec pmap_format(pmap(_K, _V)) -> map().
 -spec pmap_format(pmap(_K, _V)) -> map().
 pmap_format(#pmap{table = Table, cache = Cache}) ->
 pmap_format(#pmap{table = Table, cache = Cache}) ->
@@ -713,6 +1137,10 @@ pmap_iter_after(After, #pmap{table = Table, cache = Cache}) ->
 
 
 %%
 %%
 
 
+-ifdef(STORE_STATE_IN_DS).
+-define(stream_tab, ?stream_domain).
+-endif.
+
 cache_data_type(?stream_tab) -> gbt;
 cache_data_type(?stream_tab) -> gbt;
 cache_data_type(_Table) -> map.
 cache_data_type(_Table) -> map.
 
 
@@ -803,6 +1231,30 @@ gbt_iter_after(After, Cache) ->
 gbt_iter_next(It) ->
 gbt_iter_next(It) ->
     gb_trees:next(It).
     gb_trees:next(It).
 
 
+-ifdef(STORE_STATE_IN_DS).
+session_restore(SessionId) ->
+    Empty = maps:from_keys(
+        [
+            ?metadata_domain,
+            ?subscription_domain,
+            ?subscription_state_domain,
+            ?stream_domain,
+            ?rank_domain,
+            ?seqno_domain,
+            ?awaiting_rel_domain
+        ],
+        []
+    ),
+    Data = maps:groups_from_list(
+        fun(#{domain := Domain}) ->
+            Domain
+        end,
+        read_iterate(SessionId, ['#'])
+    ),
+    maps:merge(Empty, Data).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
+
 %% Functions dealing with set tables:
 %% Functions dealing with set tables:
 
 
 kv_persist(Tab, SessionId, Val0) ->
 kv_persist(Tab, SessionId, Val0) ->
@@ -876,8 +1328,145 @@ ro_transaction(Fun) ->
 %%     {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
 %%     {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
 %%     Res.
 %%     Res.
 
 
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%
 %%
 
 
+-ifdef(STORE_STATE_IN_DS).
+to_domain_msg(Domain, SessionId, Key, Val) ->
+    #message{
+        %% unused; empty binary to satisfy dialyzer
+        id = <<>>,
+        timestamp = 0,
+        from = SessionId,
+        topic = to_topic(Domain, SessionId, key_encode(Domain, Key)),
+        payload = val_encode(Domain, Val)
+    }.
+
+from_domain_msg(#message{topic = Topic, payload = Bin}) ->
+    #{
+        domain := Domain,
+        session_id := _SessionId,
+        key := _Key
+    } = Data = domain_topic_decode(Topic),
+    Data#{val => val_decode(Domain, Bin)}.
+
+to_topic(Domain, SessionId0, BinKey) when is_binary(BinKey) ->
+    SessionId = emqx_http_lib:uri_encode(SessionId0),
+    emqx_topic:join([
+        ?session_topic_ns,
+        SessionId,
+        atom_to_binary(Domain),
+        BinKey
+    ]).
+
+domain_topic_decode(Topic) ->
+    [<<"session">>, SessionId | Rest] = emqx_topic:tokens(Topic),
+    case parse_domain(Rest) of
+        [Domain, Bin] when
+            Domain =:= ?metadata_domain;
+            Domain =:= ?subscription_domain;
+            Domain =:= ?subscription_state_domain;
+            Domain =:= ?stream_domain;
+            Domain =:= ?rank_domain;
+            Domain =:= ?seqno_domain;
+            Domain =:= ?awaiting_rel_domain
+        ->
+            #{
+                domain => Domain,
+                session_id => emqx_http_lib:uri_decode(SessionId),
+                key => key_decode(Domain, Bin)
+            }
+    end.
+
+parse_domain([DomainBin | Rest]) ->
+    [binary_to_existing_atom(DomainBin) | Rest].
+
+-spec read_iterate(emqx_persistent_session_ds:id() | '#' | '+', emqx_ds:topic_filter()) ->
+    [data()].
+read_iterate(SessionId0, TopicFilterWords0) ->
+    Fun = fun(Data, Acc) -> Data ++ Acc end,
+    Acc = [],
+    read_fold(Fun, Acc, SessionId0, TopicFilterWords0).
+
+read_fold(Fun, Acc, SessionId0, TopicFilterWords0) ->
+    Time = 0,
+    SessionId =
+        case is_binary(SessionId0) of
+            true -> emqx_http_lib:uri_encode(SessionId0);
+            false -> SessionId0
+        end,
+    TopicFilter = [?session_topic_ns, SessionId | TopicFilterWords0],
+    Iterators = lists:map(
+        fun({_Rank, Stream}) ->
+            {ok, Iterator} = emqx_ds:make_iterator(?DB, Stream, TopicFilter, Time),
+            Iterator
+        end,
+        emqx_ds:get_streams(?DB, TopicFilter, Time)
+    ),
+    do_read_fold(Fun, Iterators, Acc).
+
+%% Note: no ordering.
+do_read_fold(_Fun, [], Acc) ->
+    Acc;
+do_read_fold(Fun, [Iterator | Rest], Acc) ->
+    %% TODO: config?
+    BatchSize = 100,
+    case emqx_ds:next(?DB, Iterator, BatchSize) of
+        {ok, end_of_stream} ->
+            do_read_fold(Fun, Rest, Acc);
+        {ok, _NewIterator, []} ->
+            do_read_fold(Fun, Rest, Acc);
+        {ok, NewIterator, Msgs} ->
+            Data = lists:map(
+                fun({_DSKey, Msg}) -> from_domain_msg(Msg) end,
+                Msgs
+            ),
+            do_read_fold(Fun, [NewIterator | Rest], Fun(Data, Acc))
+    end.
+
+delete_specific_keys(SessionId, Domain, Keys) when is_list(Keys) ->
+    lists:foreach(
+        fun(Key) ->
+            delete_specific_key(SessionId, Domain, Key)
+        end,
+        Keys
+    ).
+
+delete_specific_key(SessionId, Domain, Key) ->
+    KeyBin = key_encode(Domain, Key),
+    delete_iterate(SessionId, [atom_to_binary(Domain), KeyBin]).
+
+delete_iterate(SessionId, TopicFilterWords0) ->
+    Time = 0,
+    TopicFilter = [?session_topic_ns, emqx_http_lib:uri_encode(SessionId) | TopicFilterWords0],
+    Iterators = lists:map(
+        fun(Stream) ->
+            {ok, Iterator} = emqx_ds:make_delete_iterator(?DB, Stream, TopicFilter, Time),
+            Iterator
+        end,
+        emqx_ds:get_delete_streams(?DB, TopicFilter, Time)
+    ),
+    Selector = fun(_) -> true end,
+    do_delete_iterate(Iterators, Selector).
+
+do_delete_iterate([], _Selector) ->
+    ok;
+do_delete_iterate([Iterator | Rest], Selector) ->
+    %% TODO: config?
+    BatchSize = 100,
+    case emqx_ds:delete_next(?DB, Iterator, Selector, BatchSize) of
+        {ok, end_of_stream} ->
+            do_delete_iterate(Rest, Selector);
+        {ok, _NewIterator, 0} ->
+            do_delete_iterate(Rest, Selector);
+        {ok, NewIterator, _NDeleted} ->
+            do_delete_iterate([NewIterator | Rest], Selector)
+    end.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+
 -compile({inline, check_sequence/1}).
 -compile({inline, check_sequence/1}).
 
 
 -ifdef(CHECK_SEQNO).
 -ifdef(CHECK_SEQNO).

+ 2 - 0
apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl

@@ -19,6 +19,8 @@
 -include("emqx_persistent_message.hrl").
 -include("emqx_persistent_message.hrl").
 -include("emqx_durable_session_metadata.hrl").
 -include("emqx_durable_session_metadata.hrl").
 
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).

+ 148 - 2
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -21,12 +21,13 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 
--define(tab, ?MODULE).
-
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
 
 
+-define(tab, ?MODULE).
+-define(DB, emqx_persistent_session).
+
 %% Note: here `committed' != `dirty'. It means "has been committed at
 %% Note: here `committed' != `dirty'. It means "has been committed at
 %% least once since the creation", and it's used by the iteration
 %% least once since the creation", and it's used by the iteration
 %% test.
 %% test.
@@ -34,6 +35,15 @@
 
 
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
 
 
+-define(metadata_domain, metadata).
+-define(metadata_domain_bin, <<"metadata">>).
+-define(subscription_domain, subscription).
+-define(subscription_state_domain, subscription_state).
+-define(stream_domain, stream).
+-define(rank_domain, rank).
+-define(seqno_domain, seqno).
+-define(awaiting_rel_domain, awaiting_rel).
+
 %%================================================================================
 %%================================================================================
 %% Properties
 %% Properties
 %%================================================================================
 %%================================================================================
@@ -62,6 +72,42 @@ prop_consistency() ->
         end
         end
     ).
     ).
 
 
+-ifdef(STORE_STATE_IN_DS).
+domain_msg_roundtrip_proper_test_() ->
+    Props = [prop_domain_msg_roundtrip()],
+    Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
+    {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
+
+prop_domain_msg_roundtrip() ->
+    ?FORALL(
+        {SessionId, Domain, Val},
+        {session_id_gen(), domain_gen(), value_gen()},
+        ?FORALL(
+            Key,
+            key_gen(Domain),
+            begin
+                Msg = emqx_persistent_session_ds_state:to_domain_msg(Domain, SessionId, Key, Val),
+                Parsed = emqx_persistent_session_ds_state:from_domain_msg(Msg),
+                Expected = #{
+                    domain => Domain,
+                    session_id => SessionId,
+                    key => Key,
+                    val => Val
+                },
+                ?WHENFAIL(
+                    begin
+                        io:format(user, " *** Expected =~n       ~p~n", [Expected]),
+                        io:format(user, " *** Got =~n       ~p~n", [Parsed]),
+                        ok
+                    end,
+                    Expected =:= Parsed
+                )
+            end
+        )
+    ).
+%% -ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%================================================================================
 %%================================================================================
 %% Generators
 %% Generators
 %%================================================================================
 %%================================================================================
@@ -147,6 +193,61 @@ del_req() ->
         {#s.subs, del_subscription, topic()}
         {#s.subs, del_subscription, topic()}
     ]).
     ]).
 
 
+value_gen() ->
+    oneof([proper_types:map(), tuple()]).
+
+session_id_gen() ->
+    frequency([
+        {5, clientid()},
+        {1, <<"a/">>},
+        {1, <<"a/b">>},
+        {1, <<"a/+">>},
+        {1, <<"a/+/#">>},
+        {1, <<"#">>},
+        {1, <<"+">>},
+        {1, <<"/">>}
+    ]).
+
+clientid() ->
+    %% empty string is not valid...
+    ?SUCHTHAT(ClientId, emqx_proper_types:clientid(), ClientId =/= <<>>).
+
+domain_gen() ->
+    oneof([
+        ?metadata_domain,
+        ?subscription_domain,
+        ?subscription_state_domain,
+        ?stream_domain,
+        ?rank_domain,
+        ?seqno_domain,
+        ?awaiting_rel_domain
+    ]).
+
+key_gen(?metadata_domain) ->
+    <<"metadata">>;
+key_gen(?subscription_domain) ->
+    {emqx_proper_types:normal_topic_filter(), []};
+key_gen(?subscription_state_domain) ->
+    integer();
+key_gen(?stream_domain) ->
+    ?LET(
+        {Id, X, Y, Z, T},
+        {
+            integer(),
+            oneof([integer(), binary()]),
+            oneof([integer(), binary()]),
+            oneof([integer(), binary()]),
+            tuple()
+        },
+        {Id, [X, Y, Z | T]}
+    );
+key_gen(?rank_domain) ->
+    {integer(), binary()};
+key_gen(?seqno_domain) ->
+    integer();
+key_gen(?awaiting_rel_domain) ->
+    range(1, 16#FFFF).
+
 command(S) ->
 command(S) ->
     case maps:size(S) > 0 of
     case maps:size(S) > 0 of
         true ->
         true ->
@@ -316,12 +417,57 @@ get_state(SessionId) ->
 put_state(SessionId, S) ->
 put_state(SessionId, S) ->
     ets:insert(?tab, {SessionId, S}).
     ets:insert(?tab, {SessionId, S}).
 
 
+-ifdef(STORE_STATE_IN_DS).
+init() ->
+    _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
+    mria:start(),
+    {ok, _} = application:ensure_all_started(emqx_ds_backends),
+    Dir = binary_to_list(filename:join(["/tmp", emqx_guid:to_hexstr(emqx_guid:gen())])),
+    persistent_term:put({?MODULE, data_dir}, Dir),
+    application:set_env(emqx_durable_storage, db_data_dir, Dir),
+    Defaults = #{
+        backend => builtin_local,
+        force_monotonic_timestamps => false,
+        atomic_batches => true,
+        storage =>
+            {emqx_ds_storage_bitfield_lts, #{
+                topic_index_bytes => 4,
+                epoch_bits => 10,
+                bits_per_topic_level => 64
+            }},
+        n_shards => 16,
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
+    },
+    ok = emqx_persistent_session_ds_state:open_db(Defaults),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 init() ->
 init() ->
     _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
     _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
     mria:start(),
     mria:start(),
     emqx_persistent_session_ds_state:create_tables().
     emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
+-ifdef(STORE_STATE_IN_DS).
+clean() ->
+    ets:delete(?tab),
+    emqx_ds:drop_db(?DB),
+    application:stop(emqx_ds_backends),
+    application:stop(emqx_ds_builtin_local),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    Dir = persistent_term:get({?MODULE, data_dir}),
+    persistent_term:erase({?MODULE, data_dir}),
+    ok = file:del_dir_r(Dir),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 clean() ->
 clean() ->
     ets:delete(?tab),
     ets:delete(?tab),
     mria:stop(),
     mria:stop(),
     mria_mnesia:delete_schema().
     mria_mnesia:delete_schema().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.

+ 2 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -50,7 +50,8 @@
     printable_utf8/0,
     printable_utf8/0,
     printable_codepoint/0,
     printable_codepoint/0,
     raw_duration/0,
     raw_duration/0,
-    large_raw_duration/0
+    large_raw_duration/0,
+    clientid/0
 ]).
 ]).
 
 
 %% Generic Types
 %% Generic Types

+ 20 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -101,6 +101,8 @@
 -define(stream(SHARD, INNER), [2, SHARD | INNER]).
 -define(stream(SHARD, INNER), [2, SHARD | INNER]).
 -define(delete_stream(SHARD, INNER), [3, SHARD | INNER]).
 -define(delete_stream(SHARD, INNER), [3, SHARD | INNER]).
 
 
+-type message() :: emqx_types:message() | {emqx_ds:time(), emqx_types:message()}.
+
 %%================================================================================
 %%================================================================================
 %% API functions
 %% API functions
 %%================================================================================
 %%================================================================================
@@ -274,7 +276,7 @@ make_batch(_ForceMonotonic = true, Latest, Messages) ->
 make_batch(false, Latest, Messages) ->
 make_batch(false, Latest, Messages) ->
     assign_operation_timestamps(Latest, Messages, []).
     assign_operation_timestamps(Latest, Messages, []).
 
 
-assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
+assign_monotonic_timestamps(Latest0, [#message{} = Message | Rest], Acc0) ->
     case emqx_message:timestamp(Message, microsecond) of
     case emqx_message:timestamp(Message, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs;
             Latest = TimestampUs;
@@ -283,6 +285,15 @@ assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
     end,
     end,
     Acc = [assign_timestamp(Latest, Message) | Acc0],
     Acc = [assign_timestamp(Latest, Message) | Acc0],
     assign_monotonic_timestamps(Latest, Rest, Acc);
     assign_monotonic_timestamps(Latest, Rest, Acc);
+assign_monotonic_timestamps(Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc0) ->
+    case Timestamp > Latest0 of
+        true ->
+            Latest = Timestamp;
+        false ->
+            Latest = Latest0 + 1
+    end,
+    Acc = [assign_timestamp(Timestamp, Message0) | Acc0],
+    assign_monotonic_timestamps(Latest, Rest, Acc);
 assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
 assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
     Acc = [Operation | Acc0],
     Acc = [Operation | Acc0],
     assign_monotonic_timestamps(Latest, Rest, Acc);
     assign_monotonic_timestamps(Latest, Rest, Acc);
@@ -294,6 +305,10 @@ assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
     Latest = max(TimestampUs, Latest0),
     Latest = max(TimestampUs, Latest0),
     Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
     Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
     assign_operation_timestamps(Latest, Rest, Acc);
     assign_operation_timestamps(Latest, Rest, Acc);
+assign_operation_timestamps(Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc0) ->
+    Latest = max(Timestamp, Latest0),
+    Acc = [assign_timestamp(Timestamp, Message0) | Acc0],
+    assign_operation_timestamps(Latest, Rest, Acc);
 assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
 assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
     Acc = [Operation | Acc0],
     Acc = [Operation | Acc0],
     assign_operation_timestamps(Latest, Rest, Acc);
     assign_operation_timestamps(Latest, Rest, Acc);
@@ -304,6 +319,10 @@ assign_timestamp(TimestampUs, Message) ->
     {TimestampUs, Message}.
     {TimestampUs, Message}.
 
 
 -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
 -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
+shard_of_operation(DB, {Timestamp, #message{} = Message}, SerializeBy, Options) when
+    is_integer(Timestamp)
+->
+    shard_of_operation(DB, Message, SerializeBy, Options);
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
     case SerializeBy of
     case SerializeBy of
         clientid -> Key = From;
         clientid -> Key = From;

+ 13 - 2
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -94,6 +94,8 @@
 
 
 -type shard_id() :: binary().
 -type shard_id() :: binary().
 
 
+-type message() :: emqx_types:message() | {emqx_ds:time(), emqx_types:message()}.
+
 -type builtin_db_opts() ::
 -type builtin_db_opts() ::
     #{
     #{
         backend := builtin,
         backend := builtin,
@@ -415,7 +417,7 @@ current_timestamp(DB, Shard) ->
 init_buffer(_DB, _Shard, _Options) ->
 init_buffer(_DB, _Shard, _Options) ->
     {ok, #bs{}}.
     {ok, #bs{}}.
 
 
--spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) ->
+-spec flush_buffer(emqx_ds:db(), shard_id(), [message()], egress_state()) ->
     {egress_state(), ok | emqx_ds:error(_)}.
     {egress_state(), ok | emqx_ds:error(_)}.
 flush_buffer(DB, Shard, Messages, State) ->
 flush_buffer(DB, Shard, Messages, State) ->
     case ra_store_batch(DB, Shard, Messages) of
     case ra_store_batch(DB, Shard, Messages) of
@@ -433,6 +435,10 @@ flush_buffer(DB, Shard, Messages, State) ->
     _Options
     _Options
 ) ->
 ) ->
     emqx_ds_replication_layer:shard_id().
     emqx_ds_replication_layer:shard_id().
+shard_of_operation(DB, {Timestamp, #message{} = Message}, SerializeBy, Options) when
+    is_integer(Timestamp)
+->
+    shard_of_operation(DB, Message, SerializeBy, Options);
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
     case SerializeBy of
     case SerializeBy of
         clientid -> Key = From;
         clientid -> Key = From;
@@ -1031,7 +1037,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     handle_custom_event(DBShard, Timestamp, tick).
     handle_custom_event(DBShard, Timestamp, tick).
 
 
-assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
+assign_timestamps(true, Latest0, [#message{} = Message0 | Rest], Acc, N, Sz) ->
     case emqx_message:timestamp(Message0, microsecond) of
     case emqx_message:timestamp(Message0, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs,
             Latest = TimestampUs,
@@ -1048,6 +1054,11 @@ assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
     Message = assign_timestamp(TimestampUs, Message0),
     Message = assign_timestamp(TimestampUs, Message0),
     MSize = approx_message_size(Message0),
     MSize = approx_message_size(Message0),
     assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
     assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
+assign_timestamps(ForceMonotonic, Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc, N, Sz) ->
+    Latest = max(Latest0, Timestamp),
+    Message = assign_timestamp(Timestamp, Message0),
+    MSize = approx_message_size(Message0),
+    assign_timestamps(ForceMonotonic, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
 assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
 assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
     assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
     assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
 assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->
 assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -112,6 +112,7 @@
 -type operation() ::
 -type operation() ::
     %% Store a message.
     %% Store a message.
     message()
     message()
+    | {time(), message()}
     %% Delete a message.
     %% Delete a message.
     %% Does nothing if the message does not exist.
     %% Does nothing if the message does not exist.
     | deletion().
     | deletion().

+ 4 - 0
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -45,6 +45,8 @@
 
 
 -define(cbm(DB), {?MODULE, DB}).
 -define(cbm(DB), {?MODULE, DB}).
 
 
+-type message() :: emqx_types:message() | {emqx_ds:time(), emqx_types:message()}.
+
 -record(enqueue_req, {
 -record(enqueue_req, {
     operations :: [emqx_ds:operation()],
     operations :: [emqx_ds:operation()],
     sync :: boolean(),
     sync :: boolean(),
@@ -416,6 +418,8 @@ cancel_timer(S = #s{tref = TRef}) ->
 
 
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% all things into account, for example headers and extras)
 %% all things into account, for example headers and extras)
+payload_size({_TS, #message{} = Message}) ->
+    payload_size(Message);
 payload_size(#message{payload = P, topic = T}) ->
 payload_size(#message{payload = P, topic = T}) ->
     size(P) + size(T);
     size(P) + size(T);
 payload_size({_OpName, _}) ->
 payload_size({_OpName, _}) ->

+ 2 - 5
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -326,11 +326,8 @@ t_persistent_sessions1(Config) ->
             C2 = connect_client(#{port => Port1, clientid => ClientId}),
             C2 = connect_client(#{port => Port1, clientid => ClientId}),
             assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config),
             assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config),
             %% 4) Client disconnects.
             %% 4) Client disconnects.
-            ok = emqtt:stop(C2),
             %% 5) Session is GC'ed, client is removed from list.
             %% 5) Session is GC'ed, client is removed from list.
-            ?tp(notice, "gc", #{}),
-            %% simulate GC
-            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+            disconnect_and_destroy_session(C2),
             ?retry(
             ?retry(
                 100,
                 100,
                 20,
                 20,
@@ -510,7 +507,7 @@ t_persistent_sessions5(Config) ->
                 list_request(#{limit => 2, page => 1}, Config)
                 list_request(#{limit => 2, page => 1}, Config)
             ),
             ),
             %% Disconnect persistent sessions
             %% Disconnect persistent sessions
-            lists:foreach(fun emqtt:stop/1, [C1, C2]),
+            lists:foreach(fun stop_and_commit/1, [C1, C2]),
 
 
             P3 =
             P3 =
                 ?retry(200, 10, begin
                 ?retry(200, 10, begin

+ 6 - 1
mix.exs

@@ -462,7 +462,12 @@ defmodule EMQXUmbrella.MixProject do
       {:d, :snk_kind, :msg}
       {:d, :snk_kind, :msg}
     ] ++
     ] ++
       singleton(test_env?(), {:d, :TEST}) ++
       singleton(test_env?(), {:d, :TEST}) ++
-      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC})
+      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC}) ++
+      singleton(store_state_in_ds?(), {:d, :STORE_STATE_IN_DS, true})
+  end
+
+  defp store_state_in_ds?() do
+    "1" == System.get_env("STORE_STATE_IN_DS")
   end
   end
 
 
   defp singleton(false, _value), do: []
   defp singleton(false, _value), do: []

+ 1 - 0
rebar.config.erl

@@ -222,6 +222,7 @@ common_compile_opts(Edition, _RelType, Vsn) ->
         {d, 'EMQX_RELEASE_EDITION', Edition}
         {d, 'EMQX_RELEASE_EDITION', Edition}
     ] ++
     ] ++
         [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
         [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
+        [{d, 'STORE_STATE_IN_DS'} || os:getenv("STORE_STATE_IN_DS") =:= "1"] ++
         [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
         [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
 
 
 warn_profile_env() ->
 warn_profile_env() ->