Просмотр исходного кода

feat(lts): inherit previous generation's lts when possible

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
7c0d37fdb9

+ 10 - 1
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -18,7 +18,12 @@
 
 %% API:
 -export([
-    trie_create/1, trie_create/0, trie_restore/2, topic_key/3, match_topics/2, lookup_topic_key/2
+    trie_create/1, trie_create/0,
+    trie_restore/2,
+    trie_restore_existing/2,
+    topic_key/3,
+    match_topics/2,
+    lookup_topic_key/2
 ]).
 
 %% Debug:
@@ -115,6 +120,10 @@ trie_create() ->
 -spec trie_restore(options(), [{_Key, _Val}]) -> trie().
 trie_restore(Options, Dump) ->
     Trie = trie_create(Options),
+    trie_restore_existing(Trie, Dump).
+
+-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
+trie_restore_existing(Trie, Dump) ->
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
             trie_insert(Trie, StateFrom, Token, StateTo)

+ 29 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -32,7 +32,8 @@
     get_streams/4,
     make_iterator/5,
     update_iterator/4,
-    next/4
+    next/4,
+    post_creation_actions/1
 ]).
 
 %% internal exports:
@@ -200,6 +201,22 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         ts_offset = TSOffsetBits
     }.
 
+-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
+    s().
+post_creation_actions(
+    #{
+        db := DBHandle,
+        old_gen_id := OldGenId,
+        old_cf_refs := OldCFRefs,
+        new_gen_runtime_data := NewGenData0
+    }
+) ->
+    {_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
+    #s{trie = NewTrie0} = NewGenData0,
+    NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
+    ?tp(bitfield_lts_inherited_trie, #{}),
+    NewGenData0#s{trie = NewTrie}.
+
 -spec drop(
     emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
@@ -516,6 +533,17 @@ restore_trie(TopicIndexBytes, DB, CF) ->
         rocksdb:iterator_close(IT)
     end.
 
+-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
+    emqx_ds_lts:trie().
+copy_previous_trie(DBHandle, NewTrie, OldCF) ->
+    {ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
+    try
+        OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
+        emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
+    after
+        rocksdb:iterator_close(IT)
+    end.
+
 read_persisted_trie(IT, {ok, KeyB, ValB}) ->
     [
         {binary_to_term(KeyB), binary_to_term(ValB)}

+ 33 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -131,6 +131,39 @@ t_get_streams(_Config) ->
     ?assert(lists:member(A, AllStreams)),
     ok.
 
+t_new_generation_inherit_trie(_Config) ->
+    %% This test checks that we inherit the previous generation's LTS when creating a new
+    %% generation.
+    ?check_trace(
+        begin
+            %% Create a bunch of topics to be learned in the first generation
+            Timestamps = lists:seq(1, 10_000, 100),
+            Batch = [
+                begin
+                    B = integer_to_binary(I),
+                    make_message(
+                        TS,
+                        <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>,
+                        integer_to_binary(TS)
+                    )
+                end
+             || I <- lists:seq(1, 200),
+                TS <- Timestamps,
+                Suffix <- [<<"foo">>, <<"bar">>]
+            ],
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+            %% Now we create a new generation with the same LTS module.  It should inherit the
+            %% learned trie.
+            ok = emqx_ds_storage_layer:add_generation(?SHARD),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)),
+            ok
+        end
+    ),
+    ok.
+
 t_replay(_Config) ->
     %% Create concrete topics:
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],