Просмотр исходного кода

refactor(ds): Create a CRUD module for the persistent session

ieQu1 2 лет назад
Родитель
Сommit
2d08aa88d8
1 измененных файлов с 508 добавлено и 0 удалено
  1. 508 0
      apps/emqx/src/emqx_persistent_session_ds_state.erl

+ 508 - 0
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -0,0 +1,508 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc CRUD interface for the persistent session
+%%
+%% This module encapsulates the data related to the state of the
+%% inflight messages for the persistent session based on DS.
+%%
+%% It is responsible for saving, caching, and restoring session state.
+%% It is completely devoid of business logic. Not even the default
+%% values should be set in this module.
+-module(emqx_persistent_session_ds_state).
+
+-export([create_tables/0]).
+
+-export([open/1, create_new/1, delete/1, commit/1, print_session/1]).
+-export([get_created_at/1, set_created_at/2]).
+-export([get_last_alive_at/1, set_last_alive_at/2]).
+-export([get_conninfo/1, set_conninfo/2]).
+-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
+-export([get_seqno/2, put_seqno/3]).
+-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
+-export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
+
+%% internal exports:
+-export([]).
+
+-export_type([t/0, seqno_type/0]).
+
+-include("emqx_persistent_session_ds.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% Generic key-value wrapper that is used for exporting arbitrary
+%% terms to mnesia:
+-record(kv, {
+    k :: term(),
+    v :: map()
+}).
+
+%% Persistent map.
+%%
+%% Pmap accumulates the updates in a term stored in the heap of a
+%% process, so they can be committed all at once in a single
+%% transaction.
+%%
+%% It should be possible to make frequent changes to the pmap without
+%% stressing Mria.
+%%
+%% It's implemented as two maps: `clean' and `dirty'. Updates are made
+%% to the `dirty' area. `pmap_commit' function saves the updated
+%% entries to Mnesia and moves them to the `clean' area.
+-record(pmap, {table, clean, dirty, tombstones}).
+
+-type pmap(K, V) ::
+    #pmap{
+        table :: atom(),
+        clean :: #{K => V},
+        dirty :: #{K => V},
+        tombstones :: #{K => _}
+    }.
+
+%% Session metadata:
+-define(created_at, created_at).
+-define(last_alive_at, last_alive_at).
+-define(conninfo, conninfo).
+
+-type metadata() ::
+    #{
+        ?created_at => emqx_persistent_session_ds:timestamp(),
+        ?last_alive_at => emqx_persistent_session_ds:timestamp(),
+        ?conninfo => emqx_types:conninfo()
+    }.
+
+-type seqno_type() :: next | acked | pubrel.
+
+-opaque t() :: #{
+    id := emqx_persistent_session_ds:id(),
+    dirty := boolean(),
+    metadata := metadata(),
+    subscriptions := emqx_persistent_session_ds:subscriptions(),
+    seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()),
+    streams := pmap(emqx_ds:stream(), emqx_persistent_message_ds_replayer:stream_state()),
+    ranks := pmap(term(), integer())
+}.
+
+-define(session_tab, emqx_ds_session_tab).
+-define(subscription_tab, emqx_ds_session_subscriptions).
+-define(stream_tab, emqx_ds_session_streams).
+-define(seqno_tab, emqx_ds_session_seqnos).
+-define(rank_tab, emqx_ds_session_ranks).
+-define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab]).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec create_tables() -> ok.
+create_tables() ->
+    ok = mria:create_table(
+        ?session_tab,
+        [
+            {rlog_shard, ?DS_MRIA_SHARD},
+            {type, set},
+            {storage, rocksdb_copies},
+            {record_name, kv},
+            {attributes, record_info(fields, kv)}
+        ]
+    ),
+    [create_kv_bag_table(Table) || Table <- ?bag_tables],
+    mria:wait_for_tables([?session_tab | ?bag_tables]).
+
+-spec open(emqx_persistent_session_ds:session_id()) -> {ok, t()} | undefined.
+open(SessionId) ->
+    ro_transaction(fun() ->
+        case kv_restore(?session_tab, SessionId) of
+            [Metadata] ->
+                Rec = #{
+                    id => SessionId,
+                    metadata => Metadata,
+                    subscriptions => read_subscriptions(SessionId),
+                    streams => pmap_open(?stream_tab, SessionId),
+                    seqnos => pmap_open(?seqno_tab, SessionId),
+                    ranks => pmap_open(?rank_tab, SessionId),
+                    dirty => false
+                },
+                {ok, Rec};
+            [] ->
+                undefined
+        end
+    end).
+
+-spec print_session(emqx_persistent_session_ds:id()) -> map() | undefined.
+print_session(SessionId) ->
+    case open(SessionId) of
+        undefined ->
+            undefined;
+        #{
+            metadata := Metadata,
+            subscriptions := SubsGBT,
+            streams := Streams,
+            seqnos := Seqnos,
+            ranks := Ranks
+        } ->
+            Subs = emqx_topic_gbt:fold(
+                fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
+                #{},
+                SubsGBT
+            ),
+            #{
+                session => Metadata,
+                subscriptions => Subs,
+                streams => Streams#pmap.clean,
+                seqnos => Seqnos#pmap.clean,
+                ranks => Ranks#pmap.clean
+            }
+    end.
+
+-spec delete(emqx_persistent_session_ds:id()) -> ok.
+delete(Id) ->
+    transaction(
+        fun() ->
+            [kv_delete(Table, Id) || Table <- ?bag_tables],
+            mnesia:delete(?session_tab, Id, write)
+        end
+    ).
+
+-spec commit(t()) -> t().
+commit(Rec = #{dirty := false}) ->
+    Rec;
+commit(
+    Rec = #{
+        id := SessionId,
+        metadata := Metadata,
+        subscriptions := Subs,
+        streams := Streams,
+        seqnos := SeqNos,
+        ranks := Ranks
+    }
+) ->
+    transaction(fun() ->
+        kv_persist(?session_tab, SessionId, Metadata),
+        Rec#{
+            subscriptions => pmap_commit(SessionId, Subs),
+            streams => pmap_commit(SessionId, Streams),
+            seqnos => pmap_commit(SessionId, SeqNos),
+            ranksz => pmap_commit(SessionId, Ranks),
+            dirty => false
+        }
+    end).
+
+-spec create_new(emqx_persistent_session_ds:id()) -> t().
+create_new(SessionId) ->
+    transaction(fun() ->
+        delete(SessionId),
+        #{
+            id => SessionId,
+            metadata => #{},
+            subscriptions => emqx_topic_gbt:new(),
+            streams => pmap_open(?stream_tab, SessionId),
+            seqnos => pmap_open(?seqno_tab, SessionId),
+            ranks => pmap_open(?rank_tab, SessionId),
+            dirty => true
+        }
+    end).
+
+%%
+
+-spec get_created_at(t()) -> emqx_persistent_session_ds:timestamp() | undefined.
+get_created_at(Rec) ->
+    get_meta(?created_at, Rec).
+
+-spec set_created_at(emqx_persistent_session_ds:timestamp(), t()) -> t().
+set_created_at(Val, Rec) ->
+    set_meta(?created_at, Val, Rec).
+
+-spec get_last_alive_at(t()) -> emqx_persistent_session_ds:timestamp() | undefined.
+get_last_alive_at(Rec) ->
+    get_meta(?last_alive_at, Rec).
+
+-spec set_last_alive_at(emqx_persistent_session_ds:timestamp(), t()) -> t().
+set_last_alive_at(Val, Rec) ->
+    set_meta(?last_alive_at, Val, Rec).
+
+-spec get_conninfo(t()) -> emqx_types:conninfo() | undefined.
+get_conninfo(Rec) ->
+    get_meta(?conninfo, Rec).
+
+-spec set_conninfo(emqx_types:conninfo(), t()) -> t().
+set_conninfo(Val, Rec) ->
+    set_meta(?conninfo, Val, Rec).
+
+%%
+
+-spec get_stream(emqx_persistent_session_ds:stream(), t()) ->
+    emqx_persistent_message_ds_replayer:stream_state() | undefined.
+get_stream(Key, Rec) ->
+    gen_get(streams, Key, Rec).
+
+-spec put_stream(
+    emqx_persistent_session_ds:stream(), emqx_persistent_message_ds_replayer:stream_state(), t()
+) -> t().
+put_stream(Key, Val, Rec) ->
+    gen_put(streams, Key, Val, Rec).
+
+-spec del_stream(emqx_persistent_session_ds:stream(), t()) -> t().
+del_stream(Key, Rec) ->
+    gen_del(stream, Key, Rec).
+
+-spec fold_streams(fun(), Acc, t()) -> Acc.
+fold_streams(Fun, Acc, Rec) ->
+    gen_fold(streams, Fun, Acc, Rec).
+
+%%
+
+-spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined.
+get_seqno(Key, Rec) ->
+    gen_get(seqnos, Key, Rec).
+
+-spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t().
+put_seqno(Key, Val, Rec) ->
+    gen_put(seqnos, Key, Val, Rec).
+
+%%
+
+-spec get_rank(term(), t()) -> integer() | undefined.
+get_rank(Key, Rec) ->
+    gen_get(ranks, Key, Rec).
+
+-spec put_rank(term(), integer(), t()) -> t().
+put_rank(Key, Val, Rec) ->
+    gen_put(ranks, Key, Val, Rec).
+
+-spec del_rank(term(), t()) -> t().
+del_rank(Key, Rec) ->
+    gen_del(ranks, Key, Rec).
+
+-spec fold_ranks(fun(), Acc, t()) -> Acc.
+fold_ranks(Fun, Acc, Rec) ->
+    gen_fold(ranks, Fun, Acc, Rec).
+
+%%
+
+-spec get_subscriptions(t()) -> emqx_persistent_session_ds:subscriptions().
+get_subscriptions(#{subscriptions := Subs}) ->
+    Subs.
+
+-spec put_subscription(
+    emqx_persistent_session_ds:subscription_id(),
+    _SubId,
+    emqx_persistent_session_ds:subscription(),
+    t()
+) -> t().
+put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
+    %% Note: currently changes to the subscriptions are persisted immediately.
+    Key = {TopicFilter, SubId},
+    transaction(fun() -> kv_bag_persist(?subscription_tab, Id, Key, Subscription) end),
+    Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
+    Rec#{subscriptions => Subs}.
+
+-spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t().
+del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
+    %% Note: currently the subscriptions are persisted immediately.
+    Key = {TopicFilter, SubId},
+    transaction(fun() -> kv_bag_delete(?subscription_tab, Id, Key) end),
+    Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
+    Rec#{subscriptions => Subs}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% All mnesia reads and writes are passed through this function.
+%% Backward compatiblity issues can be handled here.
+encoder(encode, _Table, Term) ->
+    Term;
+encoder(decode, _Table, Term) ->
+    Term.
+
+%%
+
+get_meta(K, #{metadata := Meta}) ->
+    maps:get(K, Meta, undefined).
+
+set_meta(K, V, Rec = #{metadata := Meta}) ->
+    Rec#{metadata => maps:put(K, V, Meta), dirty => true}.
+
+%%
+
+gen_get(Field, Key, Rec) ->
+    pmap_get(Key, maps:get(Field, Rec)).
+
+gen_fold(Field, Fun, Acc, Rec) ->
+    pmap_fold(Fun, Acc, maps:get(Field, Rec)).
+
+gen_put(Field, Key, Val, Rec) ->
+    maps:update_with(
+        Field,
+        fun(PMap) -> pmap_put(Key, Val, PMap) end,
+        Rec#{dirty => true}
+    ).
+
+gen_del(Field, Key, Rec) ->
+    maps:update_with(
+        Field,
+        fun(PMap) -> pmap_del(Key, PMap) end,
+        Rec#{dirty => true}
+    ).
+
+%%
+
+read_subscriptions(SessionId) ->
+    Records = kv_bag_restore(?subscription_tab, SessionId),
+    lists:foldl(
+        fun({{TopicFilter, SubId}, Subscription}, Acc) ->
+            emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc)
+        end,
+        emqx_topic_gbt:new(),
+        Records
+    ).
+
+%%
+
+%% @doc Open a PMAP and fill the clean area with the data from DB.
+%% This functtion should be ran in a transaction.
+-spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
+pmap_open(Table, SessionId) ->
+    Clean = maps:from_list(kv_bag_restore(Table, SessionId)),
+    #pmap{
+        table = Table,
+        clean = Clean,
+        dirty = #{},
+        tombstones = #{}
+    }.
+
+-spec pmap_get(K, pmap(K, V)) -> V | undefined.
+pmap_get(K, #pmap{dirty = Dirty, clean = Clean}) ->
+    case Dirty of
+        #{K := V} ->
+            V;
+        _ ->
+            case Clean of
+                #{K := V} -> V;
+                _ -> undefined
+            end
+    end.
+
+-spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V).
+pmap_put(K, V, Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones}) ->
+    Pmap#pmap{
+        dirty = maps:put(K, V, Dirty),
+        clean = maps:remove(K, Clean),
+        tombstones = maps:remove(K, Tombstones)
+    }.
+
+-spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
+pmap_del(
+    Key,
+    Pmap = #pmap{dirty = Dirty, clean = Clean, tombstones = Tombstones}
+) ->
+    %% Update the caches:
+    Pmap#pmap{
+        dirty = maps:remove(Key, Dirty),
+        clean = maps:remove(Key, Clean),
+        tombstones = Tombstones#{Key => del}
+    }.
+
+-spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
+pmap_fold(Fun, Acc0, #pmap{clean = Clean, dirty = Dirty}) ->
+    Acc1 = maps:fold(Fun, Acc0, Dirty),
+    maps:fold(Fun, Acc1, Clean).
+
+-spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
+pmap_commit(
+    SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, clean = Clean, tombstones = Tombstones}
+) ->
+    %% Commit deletions:
+    maps:foreach(fun(K, _) -> kv_bag_delete(Tab, SessionId, K) end, Tombstones),
+    %% Replace all records in the bag with the entries from the dirty area:
+    maps:foreach(
+        fun(K, V) ->
+            kv_bag_persist(Tab, SessionId, K, V)
+        end,
+        Dirty
+    ),
+    Pmap#pmap{
+        dirty = #{},
+        tombstones = #{},
+        clean = maps:merge(Clean, Dirty)
+    }.
+
+%% Functions dealing with set tables:
+
+kv_persist(Tab, SessionId, Val0) ->
+    Val = encoder(encode, Tab, Val0),
+    mnesia:write(Tab, #kv{k = SessionId, v = Val}, write).
+
+kv_delete(Table, Namespace) ->
+    mnesia:delete({Table, Namespace}).
+
+kv_restore(Tab, SessionId) ->
+    [encoder(decode, Tab, V) || #kv{v = V} <- mnesia:read(Tab, SessionId)].
+
+%% Functions dealing with bags:
+
+%% @doc Create a mnesia table for the PMAP:
+-spec create_kv_bag_table(atom()) -> ok.
+create_kv_bag_table(Table) ->
+    mria:create_table(Table, [
+        {type, bag},
+        {rlog_shard, ?DS_MRIA_SHARD},
+        {storage, rocksdb_copies},
+        {record_name, kv},
+        {attributes, record_info(fields, kv)}
+    ]).
+
+kv_bag_persist(Tab, SessionId, Key, Val0) ->
+    %% Remove the previous entry corresponding to the key:
+    kv_bag_delete(Tab, SessionId, Key),
+    %% Write data to mnesia:
+    Val = encoder(encode, Tab, Val0),
+    mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}).
+
+kv_bag_restore(Tab, SessionId) ->
+    [{K, encoder(decode, Tab, V)} || #kv{v = {K, V}} <- mnesia:read(Tab, SessionId)].
+
+kv_bag_delete(Table, SessionId, Key) ->
+    %% Note: this match spec uses a fixed primary key, so it doesn't
+    %% require a table scan, and the transaction doesn't grab the
+    %% whole table lock:
+    MS = [{#kv{k = SessionId, v = {Key, '_'}}, [], ['$_']}],
+    Objs = mnesia:select(Table, MS, write),
+    lists:foreach(
+        fun(Obj) ->
+            mnesia:delete_object(Table, Obj, write)
+        end,
+        Objs
+    ).
+
+%%
+
+transaction(Fun) ->
+    case mnesia:is_transaction() of
+        true ->
+            Fun();
+        false ->
+            {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun),
+            Res
+    end.
+
+ro_transaction(Fun) ->
+    {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
+    Res.