William Yang пре 1 година
родитељ
комит
b14c8dde14

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -216,7 +216,7 @@ init(S) ->
         S
     ).
 
-%% @doc Find the streams that have uncommitted (in-flight) messages.
+%% @doc Find the streams that have unacked (in-flight) messages.
 %% Return them in the order they were previously replayed.
 -spec find_replay_streams(emqx_persistent_session_ds_state:t()) ->
     [{stream_key(), emqx_persistent_session_ds:stream_state()}].

+ 11 - 6
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -379,7 +379,8 @@ trie_next(#trie{trie = Trie}, State, Token) ->
     NChildren :: non_neg_integer(),
     Updated :: false | NChildren.
 trie_insert(Trie, State, Token) ->
-    trie_insert(Trie, State, Token, get_id_for_key(Trie, State, Token)).
+    NextState = get_id_for_key(Trie, State, Token),
+    trie_insert(Trie, State, Token, NextState).
 
 %%================================================================================
 %% Internal functions
@@ -413,6 +414,7 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
             {false, NextState}
     end.
 
+%% @doc Get storage static key
 -spec get_id_for_key(trie(), state(), edge()) -> static_key().
 get_id_for_key(#trie{is_binary_key = IsBin, static_key_size = Size}, State, Token) ->
     %% Requirements for the return value:
@@ -516,8 +518,10 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) -
     Varying =
         case IsWildcard of
             _ when is_integer(NChildren), NChildren >= Threshold ->
+                %% Topic structure learnt!
                 %% Number of children for the trie node reached the
                 %% threshold, we need to insert wildcard here.
+                %% Next new children from next call will resue the WildcardState.
                 {_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
                 Varying0;
             false ->
@@ -534,15 +538,16 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Tokens, Varying0) -
         end,
     do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, [TokOrWildcard | Tokens], Varying).
 
-%% @doc Has side effects! Inserts missing elements
--spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
+%% @doc Has side effects! Inserts missing elements.
+-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, IsWildcard, state()} when
     New :: false | non_neg_integer(),
-    Wildcard :: boolean().
+    IsWildcard :: boolean().
 trie_next_(Trie, State, Token) ->
     case trie_next(Trie, State, Token) of
-        {Wildcard, NextState} ->
-            {false, Wildcard, NextState};
+        {IsWildcard, NextState} ->
+            {false, IsWildcard, NextState};
         undefined ->
+            %% No exists, create new static key for return
             {Updated, NextState} = trie_insert(Trie, State, Token),
             {Updated, false, NextState}
     end.

+ 22 - 18
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -370,19 +370,19 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
             {ok, OldIter#it{ts = TS}}
     end.
 
-next(ShardId = {_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) ->
+next(ShardId = {_DB, Shard}, S, ItSeed, BatchSize, TMax, IsCurrent) ->
     init_counters(),
-    Iterators = init_iterators(S, It),
+    PerLevelIterators = init_iterators(S, ItSeed),
     %% ?tp(notice, skipstream_init_iters, #{it => It, its => Iterators}),
     try
-        case next_loop(Shard, S, It, Iterators, BatchSize, TMax) of
+        case start_next_loop(Shard, S, ItSeed, PerLevelIterators, BatchSize, TMax) of
             {ok, _, []} when not IsCurrent ->
                 {ok, end_of_stream};
             Result ->
                 Result
         end
     after
-        free_iterators(Iterators),
+        free_iterators(PerLevelIterators),
         collect_counters(ShardId)
     end.
 
@@ -544,11 +544,12 @@ do_delete(CF, Batch, Static, KeyFamily, MsgKey) ->
 
 %%%%%%%% Iteration %%%%%%%%%%
 
+%% @doc Init iterators per level, order is important: e.g. [L1, L2, L3, L0].
 init_iterators(S, #it{static_index = Static, compressed_tf = CompressedTF}) ->
     do_init_iterators(S, Static, words(CompressedTF), 1).
 
 do_init_iterators(S, Static, ['+' | TopicFilter], WildcardLevel) ->
-    %% Ignore wildcard levels in the topic filter:
+    %% Ignore wildcard levels in the topic filter because it has no value to index '+'
     do_init_iterators(S, Static, TopicFilter, WildcardLevel + 1);
 do_init_iterators(S, Static, [Constraint | TopicFilter], WildcardLevel) ->
     %% Create iterator for the index stream:
@@ -574,24 +575,25 @@ do_init_iterators(S, Static, [], _WildcardLevel) ->
         }
     ].
 
-next_loop(
+start_next_loop(
     Shard,
     S = #s{trie = Trie},
-    It = #it{static_index = StaticIdx, ts = LastTS, compressed_tf = CompressedTF},
-    Iterators,
+    ItSeed = #it{static_index = StaticIdx, ts = LastTS, compressed_tf = CompressedTF},
+    PerLevelIterators,
     BatchSize,
     TMax
 ) ->
+    %% cache it?
     TopicStructure = get_topic_structure(Trie, StaticIdx),
     Ctx = #ctx{
         shard = Shard,
         s = S,
-        iters = Iterators,
+        iters = PerLevelIterators,
         topic_structure = TopicStructure,
         filter = words(CompressedTF),
         tmax = TMax
     },
-    next_loop(Ctx, It, BatchSize, {seek, inc_ts(LastTS)}, []).
+    next_loop(Ctx, ItSeed, BatchSize, {seek, inc_ts(LastTS)}, []).
 
 get_topic_structure(Trie, StaticIdx) ->
     case emqx_ds_lts:reverse_lookup(Trie, StaticIdx) of
@@ -612,7 +614,7 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
     %% }),
     #ctx{s = S, tmax = TMax, iters = Iterators} = Ctx,
     #it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0,
-    case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of
+    case next_step(S, StaticIdx, CompressedTF, Iterators, any, Op) of
         none ->
             %% ?tp(notice, skipstream_loop_result, #{r => none}),
             inc_counter(?DS_SKIPSTREAM_LTS_EOS),
@@ -655,14 +657,14 @@ next_step(
         {error, invalid_iterator} ->
             none;
         {ok, Key, Blob} ->
-            case match_key(StaticIdx, N, Hash, Key) of
+            case match_key_prefix(StaticIdx, N, Hash, Key) of
                 false ->
                     %% This should not happen, since we set boundaries
                     %% to the iterators, and overflow to a different
                     %% key prefix should be caught by the previous
                     %% clause:
                     none;
-                NextTS when ExpectedTS =:= undefined; NextTS =:= ExpectedTS ->
+                NextTS when ExpectedTS =:= any; NextTS =:= ExpectedTS ->
                     %% We found a key that corresponds to the
                     %% timestamp we expect.
                     %% ?tp(notice, ?MODULE_STRING "_step_hit", #{
@@ -670,6 +672,8 @@ next_step(
                     %% }),
                     case Iterators of
                         [] ->
+                            %% Last one in PerLevelIterators is the one and the only one data stream.
+                            0 = N,
                             %% This is data stream as well. Check
                             %% message for hash collisions and return
                             %% value:
@@ -685,7 +689,7 @@ next_step(
                                     {seek, NextTS + 1}
                             end;
                         _ ->
-                            %% This is index stream. Keep going:
+                            %% This is index stream. Keep matching NextTS in other levels.
                             next_step(S, StaticIdx, CompressedTF, Iterators, NextTS, {seek, NextTS})
                     end;
                 NextTS when NextTS > ExpectedTS, N > 0 ->
@@ -743,11 +747,11 @@ get_key_range(StaticIdx, WildcardIdx, Hash) ->
 
 -spec match_ds_key(emqx_ds_lts:static_key(), binary()) -> ts() | false.
 match_ds_key(StaticIdx, Key) ->
-    match_key(StaticIdx, 0, <<>>, Key).
+    match_key_prefix(StaticIdx, 0, <<>>, Key).
 
--spec match_key(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), binary()) ->
+-spec match_key_prefix(emqx_ds_lts:static_key(), wildcard_idx(), wildcard_hash(), binary()) ->
     ts() | false.
-match_key(StaticIdx, 0, <<>>, Key) ->
+match_key_prefix(StaticIdx, 0, <<>>, Key) ->
     TSz = size(StaticIdx),
     case Key of
         <<StaticIdx:TSz/binary, 0:?wcb, Timestamp:?tsb>> ->
@@ -755,7 +759,7 @@ match_key(StaticIdx, 0, <<>>, Key) ->
         _ ->
             false
     end;
-match_key(StaticIdx, Idx, Hash, Key) when Idx > 0 ->
+match_key_prefix(StaticIdx, Idx, Hash, Key) when Idx > 0 ->
     Tsz = size(StaticIdx),
     Hsz = size(Hash),
     case Key of