Просмотр исходного кода

feat(sessds): Index streams by a unique subid

ieQu1 2 лет назад
Родитель
Сommit
82ef34998a

+ 114 - 63
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -93,6 +93,7 @@
     seqno/0,
     timestamp/0,
     topic_filter/0,
+    subscription_id/0,
     subscription/0,
     session/0,
     stream_state/0
@@ -105,7 +106,10 @@
 -type id() :: binary().
 -type topic_filter() :: emqx_types:topic().
 
+-type subscription_id() :: integer().
+
 -type subscription() :: #{
+    id := subscription_id(),
     start_time := emqx_ds:time(),
     props := map(),
     extra := map()
@@ -286,16 +290,19 @@ subscribe(
             %% router and iterator information can be reconstructed
             %% from this table, if needed.
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
+            {SubId, S1} = emqx_persistent_session_ds_state:new_subid(S0),
             Subscription = #{
                 start_time => now_ms(),
-                props => SubOpts
+                props => SubOpts,
+                id => SubId
             },
             IsNew = true;
         Subscription0 = #{} ->
             Subscription = Subscription0#{props => SubOpts},
-            IsNew = false
+            IsNew = false,
+            S1 = S0
     end,
-    S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0),
+    S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1),
     ?tp(persistent_session_ds_subscription_added, #{
         topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
     }),
@@ -309,7 +316,7 @@ unsubscribe(
 ) ->
     %% TODO: drop streams and messages from the buffer
     case subs_lookup(TopicFilter, S0) of
-        #{props := SubOpts} ->
+        #{props := SubOpts, id := _SubId} ->
             S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
             ?tp_span(
                 persistent_session_ds_subscription_route_delete,
@@ -477,7 +484,7 @@ disconnect(Session = #{s := S0}, _ConnInfo) ->
 
 -spec terminate(Reason :: term(), session()) -> ok.
 terminate(_Reason, _Session = #{s := S}) ->
-    emqx_persistent_session_ds_state:commit(S),
+    _ = emqx_persistent_session_ds_state:commit(S),
     ok.
 
 %%--------------------------------------------------------------------
@@ -584,7 +591,9 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
 %%--------------------------------------------------------------------
 
 fill_buffer(Session = #{s := S}, ClientInfo) ->
-    fill_buffer(shuffle(find_new_streams(S)), Session, ClientInfo).
+    Streams = shuffle(find_new_streams(S)),
+    ?SLOG(error, #{msg => "fill_buffer", streams => Streams}),
+    fill_buffer(Streams, Session, ClientInfo).
 
 -spec shuffle([A]) -> [A].
 shuffle(L0) ->
@@ -827,82 +836,124 @@ find_new_streams(S) ->
 
 -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
 renew_streams(S0) ->
-    CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
-    CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
+    S1 = remove_old_streams(S0),
     subs_fold(
-        fun(TopicFilterBin, _Subscription = #{start_time := StartTime}, S1) ->
-            SubId = [],
+        fun(TopicFilterBin, _Subscription = #{start_time := StartTime, id := SubId}, S2) ->
             TopicFilter = emqx_topic:words(TopicFilterBin),
-            TopicStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
-            TopicStreamGroups = maps:groups_from_list(fun({{X, _}, _}) -> X end, TopicStreams),
-            %% Iterate over groups of streams with the same rank X,
-            %% finding the first eligible stream to replay:
-            maps:fold(
-                fun(RankX, Streams, S2) ->
-                    Key = {RankX, SubId},
-                    case emqx_persistent_session_ds_state:get_stream(Key, S2) of
-                        undefined ->
-                            MinRankY = emqx_persistent_session_ds_state:get_rank(RankX, S2),
-                            start_stream_replay(
-                                TopicFilter, StartTime, Key, MinRankY, Streams, S2
-                            );
-                        Stream = #ifs{it_end = end_of_stream, rank_y = MinRankY} when
-                            ?fully_replayed(Stream, CommQos1, CommQos2)
-                        ->
-                            %% We have fully replayed the stream with
-                            %% the given rank X, and the client acked
-                            %% all messages:
-                            S3 = emqx_persistent_session_ds_state:del_stream(Key, S2),
-                            S4 = emqx_persistent_session_ds_state:put_rank(RankX, MinRankY, S3),
-                            start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S4);
-                        #ifs{} ->
-                            %% Stream replay is currently in progress, leave it as is:
-                            S2
-                    end
+            Streams = select_streams(
+                SubId,
+                emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
+                S2
+            ),
+            lists:foldl(
+                fun(I, Acc) ->
+                    ensure_iterator(TopicFilter, StartTime, SubId, I, Acc)
                 end,
-                S1,
-                TopicStreamGroups
+                S2,
+                Streams
             )
         end,
-        S0,
-        S0
+        S1,
+        S1
     ).
 
-start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S0) ->
-    case find_first_stream(MinRankY, Streams) of
-        {RankY, Stream} ->
+ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
+    Key = {SubId, Stream},
+    case emqx_persistent_session_ds_state:get_stream(Key, S) of
+        undefined ->
             {ok, Iterator} = emqx_ds:make_iterator(
                 ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
             ),
             NewStreamState = #ifs{
+                rank_x = RankX,
                 rank_y = RankY,
                 it_end = Iterator
             },
-            emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S0);
-        undefined ->
-            S0
+            emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
+        #ifs{} ->
+            S
+    end.
+
+select_streams(SubId, Streams0, S) ->
+    TopicStreamGroups = maps:groups_from_list(fun({{X, _}, _}) -> X end, Streams0),
+    maps:fold(
+        fun(RankX, Streams, Acc) ->
+            select_streams(SubId, RankX, Streams, S) ++ Acc
+        end,
+        [],
+        TopicStreamGroups
+    ).
+
+select_streams(SubId, RankX, Streams0, S) ->
+    %% 1. Find the streams with the rank Y greater than the recorded one:
+    Streams1 =
+        case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, S) of
+            undefined ->
+                Streams0;
+            ReplayedY ->
+                [I || I = {{_, Y}, _} <- Streams0, Y > ReplayedY]
+        end,
+    %% 2. Sort streams by rank Y:
+    Streams = lists:sort(
+        fun({{_, Y1}, _}, {{_, Y2}, _}) ->
+            Y1 =< Y2
+        end,
+        Streams1
+    ),
+    %% 3. Select streams with the least rank Y:
+    case Streams of
+        [] ->
+            [];
+        [{{_, MinRankY}, _} | _] ->
+            lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
     end.
 
-%% @doc Find the first stream with rank Y greater than the one given as the first argument.
--spec find_first_stream(emqx_ds:rank_y() | undefined, [
-    {emqx_ds:stream_rank(), emqx_ds:ds_specific_stream()}
-]) ->
-    {emqx_ds:rank_y(), emqx_ds:ds_specific_stream()} | undefined.
-find_first_stream(MinRankY, Streams) ->
-    lists:foldl(
+-spec remove_old_streams(emqx_persistent_session_ds_state:t()) ->
+    emqx_persistent_session_ds_state:t().
+remove_old_streams(S0) ->
+    CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
+    CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
+    %% 1. For each subscription, find the X ranks that were fully replayed:
+    Groups = emqx_persistent_session_ds_state:fold_streams(
+        fun({SubId, _Stream}, StreamState = #ifs{rank_x = RankX, rank_y = RankY, it_end = It}, Acc) ->
+            Key = {SubId, RankX},
+            IsComplete =
+                It =:= end_of_stream andalso ?fully_replayed(StreamState, CommQos1, CommQos2),
+            case {maps:get(Key, Acc, undefined), IsComplete} of
+                {undefined, true} ->
+                    Acc#{Key => {true, RankY}};
+                {_, false} ->
+                    Acc#{Key => false};
+                _ ->
+                    Acc
+            end
+        end,
+        #{},
+        S0
+    ),
+    %% 2. Advance rank y for each fully replayed set of streams:
+    S1 = maps:fold(
         fun
-            ({{_RankX, RankY}, Stream}, Acc) when RankY > MinRankY; MinRankY =:= undefined ->
-                case Acc of
-                    {AccY, _} when AccY < RankY ->
-                        Acc;
-                    _ ->
-                        {RankY, Stream}
-                end;
-            (_, Acc) ->
+            (Key, {true, RankY}, Acc) ->
+                emqx_persistent_session_ds_state:put_rank(Key, RankY, Acc);
+            (_, _, Acc) ->
                 Acc
         end,
-        undefined,
-        Streams
+        S0,
+        Groups
+    ),
+    %% 3. Remove the fully replayed streams:
+    emqx_persistent_session_ds_state:fold_streams(
+        fun(Key = {SubId, _Stream}, #ifs{rank_x = RankX, rank_y = RankY}, Acc) ->
+            case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of
+                MinRankY when RankY < MinRankY ->
+                    emqx_persistent_session_ds_state:del_stream(Key, Acc);
+                _ ->
+                    Acc
+            end
+        end,
+        S1,
+        S1
     ).
 
 %%--------------------------------------------------------------------------------

+ 1 - 0
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -27,6 +27,7 @@
 
 %% State of the stream:
 -record(ifs, {
+    rank_x :: emqx_ds:rank_x(),
     rank_y :: emqx_ds:rank_y(),
     %% Iterator at the end of the last batch:
     it_end :: emqx_ds:iterator() | undefined | end_of_stream,

+ 77 - 55
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -26,10 +26,11 @@
 
 -export([create_tables/0]).
 
--export([open/1, create_new/1, delete/1, commit/1, print_session/1, list_sessions/0]).
+-export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]).
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_conninfo/1, set_conninfo/2]).
+-export([new_subid/1]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
@@ -38,7 +39,7 @@
 %% internal exports:
 -export([]).
 
--export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0]).
+-export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0]).
 
 -include("emqx_persistent_session_ds.hrl").
 
@@ -78,18 +79,18 @@
 -define(created_at, created_at).
 -define(last_alive_at, last_alive_at).
 -define(conninfo, conninfo).
+-define(last_subid, last_subid).
 
 -type metadata() ::
     #{
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
-        ?conninfo => emqx_types:conninfo()
+        ?conninfo => emqx_types:conninfo(),
+        ?last_subid => integer()
     }.
 
 -type seqno_type() :: term().
 
--type stream_key() :: {emqx_ds:rank_x(), _SubId}.
-
 -opaque t() :: #{
     id := emqx_persistent_session_ds:id(),
     dirty := boolean(),
@@ -151,27 +152,31 @@ print_session(SessionId) ->
     case open(SessionId) of
         undefined ->
             undefined;
-        {ok, #{
-            metadata := Metadata,
-            subscriptions := SubsGBT,
-            streams := Streams,
-            seqnos := Seqnos,
-            ranks := Ranks
-        }} ->
-            Subs = emqx_topic_gbt:fold(
-                fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
-                #{},
-                SubsGBT
-            ),
-            #{
-                session => Metadata,
-                subscriptions => Subs,
-                streams => Streams#pmap.clean,
-                seqnos => Seqnos#pmap.clean,
-                ranks => Ranks#pmap.clean
-            }
+        {ok, Session} ->
+            format(Session)
     end.
 
+-spec format(t()) -> map().
+format(#{
+    metadata := Metadata,
+    subscriptions := SubsGBT,
+    streams := Streams,
+    seqnos := Seqnos,
+    ranks := Ranks
+}) ->
+    Subs = emqx_topic_gbt:fold(
+        fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
+        #{},
+        SubsGBT
+    ),
+    #{
+        metadata => Metadata,
+        subscriptions => Subs,
+        streams => pmap_format(Streams),
+        seqnos => pmap_format(Seqnos),
+        ranks => pmap_format(Ranks)
+    }.
+
 -spec list_sessions() -> [emqx_persistent_session_ds:id()].
 list_sessions() ->
     mnesia:dirty_all_keys(?session_tab).
@@ -248,8 +253,46 @@ get_conninfo(Rec) ->
 set_conninfo(Val, Rec) ->
     set_meta(?conninfo, Val, Rec).
 
+-spec new_subid(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
+new_subid(Rec) ->
+    LastSubId =
+        case get_meta(?last_subid, Rec) of
+            undefined -> 0;
+            N when is_integer(N) -> N
+        end,
+    {LastSubId, set_meta(?last_subid, LastSubId + 1, Rec)}.
+
+%%
+
+-spec get_subscriptions(t()) -> subscriptions().
+get_subscriptions(#{subscriptions := Subs}) ->
+    Subs.
+
+-spec put_subscription(
+    emqx_persistent_session_ds:topic_filter(),
+    _SubId,
+    emqx_persistent_session_ds:subscription(),
+    t()
+) -> t().
+put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
+    %% Note: currently changes to the subscriptions are persisted immediately.
+    Key = {TopicFilter, SubId},
+    transaction(fun() -> kv_bag_persist(?subscription_tab, Id, Key, Subscription) end),
+    Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
+    Rec#{subscriptions => Subs}.
+
+-spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t().
+del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
+    %% Note: currently the subscriptions are persisted immediately.
+    Key = {TopicFilter, SubId},
+    transaction(fun() -> kv_bag_delete(?subscription_tab, Id, Key) end),
+    Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
+    Rec#{subscriptions => Subs}.
+
 %%
 
+-type stream_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}.
+
 -spec get_stream(stream_key(), t()) ->
     emqx_persistent_session_ds:stream_state() | undefined.
 get_stream(Key, Rec) ->
@@ -261,7 +304,7 @@ put_stream(Key, Val, Rec) ->
 
 -spec del_stream(stream_key(), t()) -> t().
 del_stream(Key, Rec) ->
-    gen_del(stream, Key, Rec).
+    gen_del(streams, Key, Rec).
 
 -spec fold_streams(fun(), Acc, t()) -> Acc.
 fold_streams(Fun, Acc, Rec) ->
@@ -279,15 +322,17 @@ put_seqno(Key, Val, Rec) ->
 
 %%
 
--spec get_rank(term(), t()) -> integer() | undefined.
+-type rank_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:rank_x()}.
+
+-spec get_rank(rank_key(), t()) -> integer() | undefined.
 get_rank(Key, Rec) ->
     gen_get(ranks, Key, Rec).
 
--spec put_rank(term(), integer(), t()) -> t().
+-spec put_rank(rank_key(), integer(), t()) -> t().
 put_rank(Key, Val, Rec) ->
     gen_put(ranks, Key, Val, Rec).
 
--spec del_rank(term(), t()) -> t().
+-spec del_rank(rank_key(), t()) -> t().
 del_rank(Key, Rec) ->
     gen_del(ranks, Key, Rec).
 
@@ -295,33 +340,6 @@ del_rank(Key, Rec) ->
 fold_ranks(Fun, Acc, Rec) ->
     gen_fold(ranks, Fun, Acc, Rec).
 
-%%
-
--spec get_subscriptions(t()) -> subscriptions().
-get_subscriptions(#{subscriptions := Subs}) ->
-    Subs.
-
--spec put_subscription(
-    emqx_persistent_session_ds:topic_filter(),
-    _SubId,
-    emqx_persistent_session_ds:subscription(),
-    t()
-) -> t().
-put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
-    %% Note: currently changes to the subscriptions are persisted immediately.
-    Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_bag_persist(?subscription_tab, Id, Key, Subscription) end),
-    Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
-    Rec#{subscriptions => Subs}.
-
--spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t().
-del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
-    %% Note: currently the subscriptions are persisted immediately.
-    Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_bag_delete(?subscription_tab, Id, Key) end),
-    Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
-    Rec#{subscriptions => Subs}.
-
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -445,6 +463,10 @@ pmap_commit(
         clean = maps:merge(Clean, Dirty)
     }.
 
+-spec pmap_format(pmap(_K, _V)) -> map().
+pmap_format(#pmap{clean = Clean, dirty = Dirty}) ->
+    maps:merge(Clean, Dirty).
+
 %% Functions dealing with set tables:
 
 kv_persist(Tab, SessionId, Val0) ->

+ 14 - 9
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -811,7 +811,8 @@ t_publish_many_while_client_is_gone(Config) ->
     Msgs1 = receive_messages(NPubs1),
     ct:pal("Msgs1 = ~p", [Msgs1]),
     NMsgs1 = length(Msgs1),
-    ?assertEqual(NPubs1, NMsgs1, debug_info(ClientId)),
+    NPubs1 =:= NMsgs1 orelse
+        throw_with_debug_info({NPubs1, '==', NMsgs1}, ClientId),
 
     ?assertEqual(
         get_topicwise_order(Pubs1),
@@ -1086,11 +1087,15 @@ skip_ds_tc(Config) ->
             Config
     end.
 
-fail_with_debug_info(Exception, ClientId) ->
-    case emqx_cm:lookup_channels(ClientId) of
-        [Chan] ->
-            sys:get_state(Chan, 1000);
-        [] ->
-            no_channel
-    end,
-    exit(Exception).
+throw_with_debug_info(Error, ClientId) ->
+    Info =
+        case emqx_cm:lookup_channels(ClientId) of
+            [Pid] ->
+                #{channel := ChanState} = emqx_connection:get_state(Pid),
+                SessionState = emqx_channel:info(session_state, ChanState),
+                maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, SessionState);
+            [] ->
+                no_channel
+        end,
+    ct:pal("!!! Assertion failed: ~p~nState:~n~p", [Error, Info]),
+    exit(Error).