Browse Source

Merge pull request #13391 from keynslug/perf/EMQX-12611/avoid-stream-shuffle

perf(sessds): rotate through streams with iterators when fetching
Andrew Mayorov 1 year ago
parent
commit
187f5e5936

+ 18 - 9
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -182,6 +182,9 @@
     shared_sub_s := shared_sub_state(),
     %% Buffer:
     inflight := emqx_persistent_session_ds_inflight:t(),
+    %% Last fetched stream:
+    %% Used as a continuation point for fair stream scheduling.
+    last_fetched_stream => emqx_persistent_session_ds_state:stream_key(),
     %% In-progress replay:
     %% List of stream replay states to be added to the inflight buffer.
     replay => [{_StreamKey, stream_state()}, ...],
@@ -984,26 +987,32 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
 %%--------------------------------------------------------------------
 
 fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
-    Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0),
-    Session1 = fetch_new_messages(Streams, Session0, ClientInfo),
+    LFS = maps:get(last_fetched_stream, Session0, beginning),
+    ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
+    BatchSize = get_config(ClientInfo, [batch_size]),
+    Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
     #{s := S1, shared_sub_s := SharedSubS0} = Session1,
     {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
     Session1#{s => S2, shared_sub_s => SharedSubS1}.
 
-fetch_new_messages([], Session, _ClientInfo) ->
-    Session;
-fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
-    BatchSize = get_config(ClientInfo, [batch_size]),
+fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
+    #{inflight := Inflight} = Session0,
     case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
         true ->
             %% Buffer is full:
             Session0;
         false ->
-            Session = new_batch(I, BatchSize, Session0, ClientInfo),
-            fetch_new_messages(Streams, Session, ClientInfo)
+            case emqx_persistent_session_ds_stream_scheduler:next_stream(ItStream0) of
+                {StreamKey, Srs, ItStream} ->
+                    Session1 = new_batch(StreamKey, Srs, BatchSize, Session0, ClientInfo),
+                    Session = Session1#{last_fetched_stream => StreamKey},
+                    fetch_new_messages(ItStream, BatchSize, Session, ClientInfo);
+                none ->
+                    Session0
+            end
     end.
 
-new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
+new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
     SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
     SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
     Srs1 = Srs0#srs{

+ 139 - 15
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -39,7 +39,7 @@
 -export([get_peername/1, set_peername/2]).
 -export([get_protocol/1, set_protocol/2]).
 -export([new_id/1]).
--export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
+-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, iter_streams/2, n_streams/1]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
 -export([
@@ -66,11 +66,14 @@
     n_awaiting_rel/1
 ]).
 
+-export([iter_next/1]).
+
 -export([make_session_iterator/0, session_iterator_next/2]).
 
 -export_type([
     t/0,
     metadata/0,
+    iter/2,
     seqno_type/0,
     stream_key/0,
     rank_key/0,
@@ -89,6 +92,8 @@
 
 -type message() :: emqx_types:message().
 
+-opaque iter(K, V) :: gb_trees:iter(K, V).
+
 -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
 
 %% Generic key-value wrapper that is used for exporting arbitrary
@@ -113,7 +118,7 @@
 -type pmap(K, V) ::
     #pmap{
         table :: atom(),
-        cache :: #{K => V},
+        cache :: #{K => V} | gb_trees:tree(K, V),
         dirty :: #{K => dirty | del}
     }.
 
@@ -476,6 +481,14 @@ del_stream(Key, Rec) ->
 fold_streams(Fun, Acc, Rec) ->
     gen_fold(?streams, Fun, Acc, Rec).
 
+-spec iter_streams(_StartAfter :: stream_key() | beginning, t()) ->
+    iter(stream_key(), emqx_persistent_session_ds:stream_state()).
+iter_streams(After, Rec) ->
+    %% NOTE
+    %% No special handling for `beginning', as it always compares less
+    %% than any `stream_key()'.
+    gen_iter_after(?streams, After, Rec).
+
 -spec n_streams(t()) -> non_neg_integer().
 n_streams(Rec) ->
     gen_size(?streams, Rec).
@@ -534,6 +547,12 @@ n_awaiting_rel(Rec) ->
 
 %%
 
+-spec iter_next(iter(K, V)) -> {K, V, iter(K, V)} | none.
+iter_next(It0) ->
+    gen_iter_next(It0).
+
+%%
+
 -spec make_session_iterator() -> session_iterator().
 make_session_iterator() ->
     mnesia:dirty_first(?session_tab).
@@ -601,6 +620,14 @@ gen_size(Field, Rec) ->
     check_sequence(Rec),
     pmap_size(maps:get(Field, Rec)).
 
+gen_iter_after(Field, After, Rec) ->
+    check_sequence(Rec),
+    pmap_iter_after(After, maps:get(Field, Rec)).
+
+gen_iter_next(It) ->
+    %% NOTE: Currently, gbt iterators is the only type of iterators.
+    gbt_iter_next(It).
+
 -spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map().
 update_pmaps(Fun, Map) ->
     lists:foldl(
@@ -619,7 +646,7 @@ update_pmaps(Fun, Map) ->
 %% This functtion should be ran in a transaction.
 -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
 pmap_open(Table, SessionId) ->
-    Clean = maps:from_list(kv_pmap_restore(Table, SessionId)),
+    Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
     #pmap{
         table = Table,
         cache = Clean,
@@ -627,29 +654,29 @@ pmap_open(Table, SessionId) ->
     }.
 
 -spec pmap_get(K, pmap(K, V)) -> V | undefined.
-pmap_get(K, #pmap{cache = Cache}) ->
-    maps:get(K, Cache, undefined).
+pmap_get(K, #pmap{table = Table, cache = Cache}) ->
+    cache_get(Table, K, Cache).
 
 -spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V).
-pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) ->
+pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->
     Pmap#pmap{
-        cache = maps:put(K, V, Cache),
+        cache = cache_put(Table, K, V, Cache),
         dirty = Dirty#{K => dirty}
     }.
 
 -spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
 pmap_del(
     Key,
-    Pmap = #pmap{dirty = Dirty, cache = Cache}
+    Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}
 ) ->
     Pmap#pmap{
-        cache = maps:remove(Key, Cache),
+        cache = cache_remove(Table, Key, Cache),
         dirty = Dirty#{Key => del}
     }.
 
 -spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
-pmap_fold(Fun, Acc, #pmap{cache = Cache}) ->
-    maps:fold(Fun, Acc, Cache).
+pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
+    cache_fold(Table, Fun, Acc, Cache).
 
 -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
 pmap_commit(
@@ -660,7 +687,7 @@ pmap_commit(
             (K, del) ->
                 kv_pmap_delete(Tab, SessionId, K);
             (K, dirty) ->
-                V = maps:get(K, Cache),
+                V = cache_get(Tab, K, Cache),
                 kv_pmap_persist(Tab, SessionId, K, V)
         end,
         Dirty
@@ -670,13 +697,110 @@ pmap_commit(
     }.
 
 -spec pmap_format(pmap(_K, _V)) -> map().
-pmap_format(#pmap{cache = Cache}) ->
-    Cache.
+pmap_format(#pmap{table = Table, cache = Cache}) ->
+    cache_format(Table, Cache).
 
 -spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
-pmap_size(#pmap{cache = Cache}) ->
+pmap_size(#pmap{table = Table, cache = Cache}) ->
+    cache_size(Table, Cache).
+
+pmap_iter_after(After, #pmap{table = Table, cache = Cache}) ->
+    %% NOTE: Only valid for gbt-backed PMAPs.
+    gbt = cache_data_type(Table),
+    gbt_iter_after(After, Cache).
+
+%%
+
+cache_data_type(?stream_tab) -> gbt;
+cache_data_type(_Table) -> map.
+
+cache_from_list(?stream_tab, L) ->
+    gbt_from_list(L);
+cache_from_list(_Table, L) ->
+    maps:from_list(L).
+
+cache_get(?stream_tab, K, Cache) ->
+    gbt_get(K, Cache, undefined);
+cache_get(_Table, K, Cache) ->
+    maps:get(K, Cache, undefined).
+
+cache_put(?stream_tab, K, V, Cache) ->
+    gbt_put(K, V, Cache);
+cache_put(_Table, K, V, Cache) ->
+    maps:put(K, V, Cache).
+
+cache_remove(?stream_tab, K, Cache) ->
+    gbt_remove(K, Cache);
+cache_remove(_Table, K, Cache) ->
+    maps:remove(K, Cache).
+
+cache_fold(?stream_tab, Fun, Acc, Cache) ->
+    gbt_fold(Fun, Acc, Cache);
+cache_fold(_Table, Fun, Acc, Cache) ->
+    maps:fold(Fun, Acc, Cache).
+
+cache_format(?stream_tab, Cache) ->
+    gbt_format(Cache);
+cache_format(_Table, Cache) ->
+    Cache.
+
+cache_size(?stream_tab, Cache) ->
+    gbt_size(Cache);
+cache_size(_Table, Cache) ->
     maps:size(Cache).
 
+%% PMAP Cache implementation backed by `gb_trees'.
+%% Supports iteration starting from specific key.
+
+gbt_from_list(L) ->
+    lists:foldl(
+        fun({K, V}, Acc) -> gb_trees:insert(K, V, Acc) end,
+        gb_trees:empty(),
+        L
+    ).
+
+gbt_get(K, Cache, undefined) ->
+    case gb_trees:lookup(K, Cache) of
+        none -> undefined;
+        {_, V} -> V
+    end.
+
+gbt_put(K, V, Cache) ->
+    gb_trees:enter(K, V, Cache).
+
+gbt_remove(K, Cache) ->
+    gb_trees:delete_any(K, Cache).
+
+gbt_format(Cache) ->
+    gb_trees:to_list(Cache).
+
+gbt_fold(Fun, Acc, Cache) ->
+    It = gb_trees:iterator(Cache),
+    gbt_fold_iter(Fun, Acc, It).
+
+gbt_fold_iter(Fun, Acc, It0) ->
+    case gb_trees:next(It0) of
+        {K, V, It} ->
+            gbt_fold_iter(Fun, Fun(K, V, Acc), It);
+        _ ->
+            Acc
+    end.
+
+gbt_size(Cache) ->
+    gb_trees:size(Cache).
+
+gbt_iter_after(After, Cache) ->
+    It0 = gb_trees:iterator_from(After, Cache),
+    case gb_trees:next(It0) of
+        {After, _, It} ->
+            It;
+        _ ->
+            It0
+    end.
+
+gbt_iter_next(It) ->
+    gb_trees:next(It).
+
 %% Functions dealing with set tables:
 
 kv_persist(Tab, SessionId, Val0) ->

+ 66 - 34
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -16,7 +16,8 @@
 -module(emqx_persistent_session_ds_stream_scheduler).
 
 %% API:
--export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
+-export([iter_next_streams/2, next_stream/1]).
+-export([find_replay_streams/1, is_fully_acked/2]).
 -export([renew_streams/1, on_unsubscribe/2]).
 
 %% behavior callbacks:
@@ -35,6 +36,29 @@
 %% Type declarations
 %%================================================================================
 
+-type stream_key() :: emqx_persistent_session_ds_state:stream_key().
+-type stream_state() :: emqx_persistent_session_ds:stream_state().
+
+%% Restartable iterator with a filter and an iteration limit.
+-record(iter, {
+    limit :: non_neg_integer(),
+    filter,
+    it,
+    it_cont
+}).
+
+-type iter(K, V, IterInner) :: #iter{
+    filter :: fun((K, V) -> boolean()),
+    it :: IterInner,
+    it_cont :: IterInner
+}.
+
+-type iter_stream() :: iter(
+    stream_key(),
+    stream_state(),
+    emqx_persistent_session_ds_state:iter(stream_key(), stream_state())
+).
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -70,9 +94,9 @@ find_replay_streams(S) ->
 %%
 %% This function is non-detereministic: it randomizes the order of
 %% streams to ensure fair replay of different topics.
--spec find_new_streams(emqx_persistent_session_ds_state:t()) ->
-    [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}].
-find_new_streams(S) ->
+-spec iter_next_streams(_LastVisited :: stream_key(), emqx_persistent_session_ds_state:t()) ->
+    iter_stream().
+iter_next_streams(LastVisited, S) ->
     %% FIXME: this function is currently very sensitive to the
     %% consistency of the packet IDs on both broker and client side.
     %%
@@ -87,23 +111,44 @@ find_new_streams(S) ->
     %% after timeout?)
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    shuffle(
-        emqx_persistent_session_ds_state:fold_streams(
-            fun
-                (_Key, #srs{it_end = end_of_stream}, Acc) ->
-                    Acc;
-                (Key, Stream, Acc) ->
-                    case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
-                        true ->
-                            [{Key, Stream} | Acc];
-                        false ->
-                            Acc
-                    end
-            end,
-            [],
-            S
-        )
-    ).
+    Filter = fun(_Key, Stream) -> is_fetchable(Comm1, Comm2, Stream) end,
+    #iter{
+        %% Limit the iteration to one round over all streams:
+        limit = emqx_persistent_session_ds_state:n_streams(S),
+        %% Filter out the streams not eligible for fetching:
+        filter = Filter,
+        %% Start the iteration right after the last visited stream:
+        it = emqx_persistent_session_ds_state:iter_streams(LastVisited, S),
+        %% Restart the iteration from the beginning:
+        it_cont = emqx_persistent_session_ds_state:iter_streams(beginning, S)
+    }.
+
+-spec next_stream(iter_stream()) -> {stream_key(), stream_state(), iter_stream()} | none.
+next_stream(#iter{limit = 0}) ->
+    none;
+next_stream(ItStream0 = #iter{limit = N, filter = Filter, it = It0, it_cont = ItCont}) ->
+    case emqx_persistent_session_ds_state:iter_next(It0) of
+        {Key, Stream, It} ->
+            ItStream = ItStream0#iter{it = It, limit = N - 1},
+            case Filter(Key, Stream) of
+                true ->
+                    {Key, Stream, ItStream};
+                false ->
+                    next_stream(ItStream)
+            end;
+        none when It0 =/= ItCont ->
+            %% Restart the iteration from the beginning:
+            ItStream = ItStream0#iter{it = ItCont},
+            next_stream(ItStream);
+        none ->
+            %% No point in restarting the iteration, `ItCont` is empty:
+            none
+    end.
+
+is_fetchable(_Comm1, _Comm2, #srs{it_end = end_of_stream}) ->
+    false;
+is_fetchable(Comm1, Comm2, #srs{unsubscribed = Unsubscribed} = Stream) ->
+    is_fully_acked(Comm1, Comm2, Stream) andalso not Unsubscribed.
 
 %% @doc This function makes the session aware of the new streams.
 %%
@@ -410,19 +455,6 @@ is_fully_acked(_, _, #srs{
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
 
--spec shuffle([A]) -> [A].
-shuffle(L0) ->
-    L1 = lists:map(
-        fun(A) ->
-            %% maybe topic/stream prioritization could be introduced here?
-            {rand:uniform(), A}
-        end,
-        L0
-    ),
-    L2 = lists:sort(L1),
-    {_, L} = lists:unzip(L2),
-    L.
-
 fold_proper_subscriptions(Fun, Acc, S) ->
     emqx_persistent_session_ds_state:fold_subscriptions(
         fun