Просмотр исходного кода

Merge pull request #12368 from thalesmg/ds-inherit-wildcards-only-lts-m-20240122

perf(ds): inherit only LTS paths containing wildcards when adding a new generation
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
77329209a2

+ 13 - 4
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -438,10 +438,19 @@ t_message_gc(Config) ->
             TopicFilter = emqx_topic:words(<<"#">>),
             StartTime = 0,
             Msgs = consume(TopicFilter, StartTime),
-            %% only "1" and "2" should have been GC'ed
-            ?assertEqual(
-                sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
-                sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
+            %% "1" and "2" should have been GC'ed
+            PresentMessages = sets:from_list(
+                [emqx_message:payload(Msg) || Msg <- Msgs],
+                [{version, 2}]
+            ),
+            ?assert(
+                sets:is_empty(
+                    sets:intersection(
+                        PresentMessages,
+                        sets:from_list([<<"1">>, <<"2">>], [{version, 2}])
+                    )
+                ),
+                #{present_messages => PresentMessages}
             ),
 
             ok

+ 143 - 5
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -20,7 +20,7 @@
 -export([
     trie_create/1, trie_create/0,
     trie_restore/2,
-    trie_restore_existing/2,
+    trie_copy_learned_paths/2,
     topic_key/3,
     match_topics/2,
     lookup_topic_key/2
@@ -120,10 +120,6 @@ trie_create() ->
 -spec trie_restore(options(), [{_Key, _Val}]) -> trie().
 trie_restore(Options, Dump) ->
     Trie = trie_create(Options),
-    trie_restore_existing(Trie, Dump).
-
--spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
-trie_restore_existing(Trie, Dump) ->
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
             trie_insert(Trie, StateFrom, Token, StateTo)
@@ -132,6 +128,17 @@ trie_restore_existing(Trie, Dump) ->
     ),
     Trie.
 
+-spec trie_copy_learned_paths(trie(), trie()) -> trie().
+trie_copy_learned_paths(OldTrie, NewTrie) ->
+    WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)],
+    lists:foreach(
+        fun({{StateFrom, Token}, StateTo}) ->
+            trie_insert(NewTrie, StateFrom, Token, StateTo)
+        end,
+        lists:flatten(WildcardPaths)
+    ),
+    NewTrie.
+
 %% @doc Lookup the topic key. Create a new one, if not found.
 -spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
 topic_key(Trie, ThresholdFun, Tokens) ->
@@ -385,6 +392,41 @@ emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' -
                 ets:lookup(Tab, {State, Token})
     ].
 
+all_emanating(#trie{trie = Tab}, State) ->
+    ets:select(
+        Tab,
+        ets:fun2ms(fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
+            {{S, Edge}, Next}
+        end)
+    ).
+
+paths(#trie{} = T) ->
+    Roots = all_emanating(T, ?PREFIX),
+    lists:flatmap(
+        fun({Segment, Next}) ->
+            follow_path(T, Next, [{Segment, Next}])
+        end,
+        Roots
+    ).
+
+follow_path(#trie{} = T, State, Path) ->
+    lists:flatmap(
+        fun
+            ({{_State, ?EOT}, _Next} = Segment) ->
+                [lists:reverse([Segment | Path])];
+            ({_Edge, Next} = Segment) ->
+                follow_path(T, Next, [Segment | Path])
+        end,
+        all_emanating(T, State)
+    ).
+
+contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) ->
+    true;
+contains_wildcard([_ | Rest]) ->
+    contains_wildcard(Rest);
+contains_wildcard([]) ->
+    false.
+
 %%================================================================================
 %% Tests
 %%================================================================================
@@ -636,4 +678,100 @@ test_key(Trie, Threshold, Topic0) ->
     {ok, Ret} = lookup_topic_key(Trie, Topic),
     Ret.
 
+paths_test() ->
+    T = trie_create(),
+    Threshold = 4,
+    ThresholdFun = fun
+        (0) -> 1000;
+        (_) -> Threshold
+    end,
+    PathsToInsert =
+        [
+            [''],
+            [1],
+            [2, 2],
+            [3, 3, 3],
+            [2, 3, 4]
+        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
+            [['', I, ''] || I <- lists:seq(1, Threshold + 2)],
+    lists:foreach(
+        fun(PathSpec) ->
+            test_key(T, ThresholdFun, PathSpec)
+        end,
+        PathsToInsert
+    ),
+
+    %% Test that the paths we've inserted are produced in the output
+    Paths = paths(T),
+    FormattedPaths = lists:map(fun format_path/1, Paths),
+    ExpectedWildcardPaths =
+        [
+            [4, '+', 4],
+            ['', '+', '']
+        ],
+    ExpectedPaths =
+        [
+            [''],
+            [1],
+            [2, 2],
+            [3, 3, 3]
+        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
+            [['', I, ''] || I <- lists:seq(1, Threshold)] ++
+            ExpectedWildcardPaths,
+    FormatPathSpec =
+        fun(PathSpec) ->
+            lists:map(
+                fun
+                    (I) when is_integer(I) -> integer_to_binary(I);
+                    (A) -> A
+                end,
+                PathSpec
+            ) ++ [?EOT]
+        end,
+    lists:foreach(
+        fun(PathSpec) ->
+            Path = FormatPathSpec(PathSpec),
+            ?assert(
+                lists:member(Path, FormattedPaths),
+                #{
+                    paths => FormattedPaths,
+                    expected_path => Path
+                }
+            )
+        end,
+        ExpectedPaths
+    ),
+
+    %% Test filter function for paths containing wildcards
+    WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
+    FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
+    ?assertEqual(
+        sets:from_list(FormattedWildcardPaths, [{version, 2}]),
+        sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
+        #{
+            expected => ExpectedWildcardPaths,
+            wildcards => FormattedWildcardPaths
+        }
+    ),
+
+    %% Test that we're able to reconstruct the same trie from the paths
+    T2 = trie_create(),
+    [
+        trie_insert(T2, State, Edge, Next)
+     || Path <- Paths,
+        {{State, Edge}, Next} <- Path
+    ],
+    #trie{trie = Tab1} = T,
+    #trie{trie = Tab2} = T2,
+    Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
+    Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
+    ?assertEqual(Dump1, Dump2),
+
+    ok.
+
+format_path([{{_State, Edge}, _Next} | Rest]) ->
+    [Edge | format_path(Rest)];
+format_path([]) ->
+    [].
+
 -endif.

+ 9 - 18
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -205,17 +205,15 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     s().
 post_creation_actions(
     #{
-        db := DBHandle,
-        old_gen_id := OldGenId,
-        old_cf_refs := OldCFRefs,
-        new_gen_runtime_data := NewGenData0
+        new_gen_runtime_data := NewGenData,
+        old_gen_runtime_data := OldGenData
     }
 ) ->
-    {_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
-    #s{trie = NewTrie0} = NewGenData0,
-    NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
+    #s{trie = OldTrie} = OldGenData,
+    #s{trie = NewTrie0} = NewGenData,
+    NewTrie = copy_previous_trie(OldTrie, NewTrie0),
     ?tp(bitfield_lts_inherited_trie, #{}),
-    NewGenData0#s{trie = NewTrie}.
+    NewGenData#s{trie = NewTrie}.
 
 -spec drop(
     emqx_ds_storage_layer:shard_id(),
@@ -533,16 +531,9 @@ restore_trie(TopicIndexBytes, DB, CF) ->
         rocksdb:iterator_close(IT)
     end.
 
--spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
-    emqx_ds_lts:trie().
-copy_previous_trie(DBHandle, NewTrie, OldCF) ->
-    {ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
-    try
-        OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
-        emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
-    after
-        rocksdb:iterator_close(IT)
-    end.
+-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
+copy_previous_trie(OldTrie, NewTrie) ->
+    emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
 
 read_persisted_trie(IT, {ok, KeyB, ValB}) ->
     [