Procházet zdrojové kódy

Merge pull request #13276 from keynslug/fix/ds/inherited-lts-lost

fix(dsstore): persist inherited LTS trie
Andrew Mayorov před 1 rokem
rodič
revize
eede9f349e

+ 16 - 4
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -20,6 +20,7 @@
 -export([
     trie_create/1, trie_create/0,
     destroy/1,
+    trie_dump/2,
     trie_restore/2,
     trie_update/2,
     trie_copy_learned_paths/2,
@@ -76,6 +77,8 @@
         static_key_size => pos_integer()
     }.
 
+-type dump() :: [{_Key, _Val}].
+
 -record(trie, {
     persist :: persist_callback(),
     static_key_size :: pos_integer(),
@@ -125,12 +128,12 @@ destroy(#trie{trie = Trie, stats = Stats}) ->
     ok.
 
 %% @doc Restore trie from a dump
--spec trie_restore(options(), [{_Key, _Val}]) -> trie().
+-spec trie_restore(options(), dump()) -> trie().
 trie_restore(Options, Dump) ->
     trie_update(trie_create(Options), Dump).
 
 %% @doc Update a trie with a dump of operations (used for replication)
--spec trie_update(trie(), [{_Key, _Val}]) -> trie().
+-spec trie_update(trie(), dump()) -> trie().
 trie_update(Trie, Dump) ->
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
@@ -140,14 +143,23 @@ trie_update(Trie, Dump) ->
     ),
     Trie.
 
+-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump().
+trie_dump(Trie, Filter) ->
+    case Filter of
+        all ->
+            Fun = fun(_) -> true end;
+        wildcard ->
+            Fun = fun contains_wildcard/1
+    end,
+    lists:append([P || P <- paths(Trie), Fun(P)]).
+
 -spec trie_copy_learned_paths(trie(), trie()) -> trie().
 trie_copy_learned_paths(OldTrie, NewTrie) ->
-    WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)],
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
             trie_insert(NewTrie, StateFrom, Token, StateTo)
         end,
-        lists:flatten(WildcardPaths)
+        trie_dump(OldTrie, wildcard)
     ),
     NewTrie.
 

+ 25 - 21
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -25,7 +25,7 @@
 
 %% behavior callbacks:
 -export([
-    create/4,
+    create/5,
     open/5,
     drop/5,
     prepare_batch/4,
@@ -37,7 +37,6 @@
     update_iterator/4,
     next/6,
     delete_next/6,
-    post_creation_actions/1,
 
     handle_event/4
 ]).
@@ -179,10 +178,11 @@
     emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
-    options()
+    options(),
+    _PrevGeneration :: s() | undefined
 ) ->
     {schema(), emqx_ds_storage_layer:cf_refs()}.
-create(_ShardId, DBHandle, GenId, Options) ->
+create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
@@ -193,6 +193,14 @@ create(_ShardId, DBHandle, GenId, Options) ->
     TrieCFName = trie_cf(GenId),
     {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
     {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
+    case SPrev of
+        #s{trie = TriePrev} ->
+            ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev),
+            ?tp(bitfield_lts_inherited_trie, #{}),
+            ok;
+        undefined ->
+            ok
+    end,
     %% Create schema:
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
@@ -241,20 +249,6 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
 
--spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
-    s().
-post_creation_actions(
-    #{
-        new_gen_runtime_data := NewGenData,
-        old_gen_runtime_data := OldGenData
-    }
-) ->
-    #s{trie = OldTrie} = OldGenData,
-    #s{trie = NewTrie0} = NewGenData,
-    NewTrie = copy_previous_trie(OldTrie, NewTrie0),
-    ?tp(bitfield_lts_inherited_trie, #{}),
-    NewGenData#s{trie = NewTrie}.
-
 -spec drop(
     emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
@@ -905,9 +899,19 @@ restore_trie(TopicIndexBytes, DB, CF) ->
         rocksdb:iterator_close(IT)
     end.
 
--spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
-copy_previous_trie(OldTrie, NewTrie) ->
-    emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
+-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) ->
+    ok.
+copy_previous_trie(DB, TrieCF, TriePrev) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun({Key, Val}) ->
+            ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
+        end,
+        emqx_ds_lts:trie_dump(TriePrev, wildcard)
+    ),
+    Result = rocksdb:write_batch(DB, Batch, []),
+    rocksdb:release_batch(Batch),
+    Result.
 
 read_persisted_trie(IT, {ok, KeyB, ValB}) ->
     [

+ 67 - 87
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -69,7 +69,6 @@
     shard_id/0,
     options/0,
     prototype/0,
-    post_creation_context/0,
     cooked_batch/0
 ]).
 
@@ -169,11 +168,14 @@
     until := emqx_ds:time() | undefined
 }.
 
+%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function.
+-type generation_data() :: term().
+
 %% Schema for a generation. Persistent term.
 -type generation_schema() :: generation(term()).
 
 %% Runtime view of generation:
--type generation() :: generation(term()).
+-type generation() :: generation(generation_data()).
 
 %%%% Shard:
 
@@ -194,38 +196,32 @@
 
 -type options() :: map().
 
--type post_creation_context() ::
-    #{
-        shard_id := emqx_ds_storage_layer:shard_id(),
-        db := rocksdb:db_handle(),
-        new_gen_id := emqx_ds_storage_layer:gen_id(),
-        old_gen_id := emqx_ds_storage_layer:gen_id(),
-        new_cf_refs := cf_refs(),
-        old_cf_refs := cf_refs(),
-        new_gen_runtime_data := _NewData,
-        old_gen_runtime_data := _OldData
-    }.
-
 %%================================================================================
 %% Generation callbacks
 %%================================================================================
 
 %% Create the new schema given generation id and the options.
 %% Create rocksdb column families.
--callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) ->
+-callback create(
+    shard_id(),
+    rocksdb:db_handle(),
+    gen_id(),
+    Options :: map(),
+    generation_data() | undefined
+) ->
     {_Schema, cf_refs()}.
 
 %% Open the existing schema
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
-    _Data.
+    generation_data().
 
 %% Delete the schema and data
--callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
+-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) ->
     ok | {error, _Reason}.
 
 -callback prepare_batch(
     shard_id(),
-    _Data,
+    generation_data(),
     [{emqx_ds:time(), emqx_types:message()}, ...],
     emqx_ds:message_store_opts()
 ) ->
@@ -233,34 +229,44 @@
 
 -callback commit_batch(
     shard_id(),
-    _Data,
+    generation_data(),
     _CookedBatch
 ) -> ok | emqx_ds:error(_).
 
--callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
+-callback get_streams(
+    shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     [_Stream].
 
--callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
+-callback make_iterator(
+    shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     emqx_ds:make_iterator_result(_Iterator).
 
 -callback make_delete_iterator(
-    shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
+    shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
     emqx_ds:make_delete_iterator_result(_Iterator).
 
--callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) ->
+-callback next(
+    shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()
+) ->
     {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
 
 -callback delete_next(
-    shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
+    shard_id(),
+    generation_data(),
+    DeleteIterator,
+    emqx_ds:delete_selector(),
+    pos_integer(),
+    emqx_ds:time()
 ) ->
     {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
 
--callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
-
--callback post_creation_actions(post_creation_context()) -> _Data.
+-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
+    [CustomEvent].
 
--optional_callbacks([post_creation_actions/1, handle_event/4]).
+-optional_callbacks([handle_event/4]).
 
 %%================================================================================
 %% API for the replication layer
@@ -686,42 +692,14 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     server_state() | {error, overlaps_existing_generations}.
 handle_add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
-
-    #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
-    OldKey = ?GEN_KEY(OldGenId),
-    #{OldKey := OldGenSchema} = Schema0,
-    #{cf_refs := OldCFRefs} = OldGenSchema,
-    #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
-
     Schema1 = update_last_until(Schema0, Since),
     Shard1 = update_last_until(Shard0, Since),
-
     case Schema1 of
         _Updated = #{} ->
-            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Shard0, Since),
             CFRefs = NewCFRefs ++ CFRefs0,
             Key = ?GEN_KEY(GenId),
-            Generation0 =
-                #{data := NewGenData0} =
-                open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
-            %% When the new generation's module is the same as the last one, we might want to
-            %% perform actions like inheriting some of the previous (meta)data.
-            NewGenData =
-                run_post_creation_actions(
-                    #{
-                        shard_id => ShardId,
-                        db => DB,
-                        new_gen_id => GenId,
-                        old_gen_id => OldGenId,
-                        new_cf_refs => NewCFRefs,
-                        old_cf_refs => OldCFRefs,
-                        new_gen_runtime_data => NewGenData0,
-                        old_gen_runtime_data => OldGenData,
-                        new_module => CurrentMod,
-                        old_module => OldMod
-                    }
-                ),
-            Generation = Generation0#{data := NewGenData},
+            Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
             Shard = Shard1#{current_generation := GenId, Key => Generation},
             S0#s{
                 cf_refs = CFRefs,
@@ -834,9 +812,28 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
 -spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) ->
     {gen_id(), shard_schema(), cf_refs()}.
 new_generation(ShardId, DB, Schema0, Since) ->
+    new_generation(ShardId, DB, Schema0, undefined, Since).
+
+-spec new_generation(
+    shard_id(),
+    rocksdb:db_handle(),
+    shard_schema(),
+    shard() | undefined,
+    emqx_ds:time()
+) ->
+    {gen_id(), shard_schema(), cf_refs()}.
+new_generation(ShardId, DB, Schema0, Shard0, Since) ->
     #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
+    case Shard0 of
+        #{?GEN_KEY(PrevGenId) := #{module := Mod} = PrevGen} ->
+            %% When the new generation's module is the same as the last one, we might want
+            %% to perform actions like inheriting some of the previous (meta)data.
+            PrevRuntimeData = maps:get(data, PrevGen);
+        _ ->
+            PrevRuntimeData = undefined
+    end,
     GenId = next_generation_id(PrevGenId),
-    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
+    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf, PrevRuntimeData),
     GenSchema = #{
         module => Mod,
         data => GenData,
@@ -918,23 +915,6 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
             {error, overlaps_existing_generations}
     end.
 
-run_post_creation_actions(
-    #{
-        new_module := Mod,
-        old_module := Mod,
-        new_gen_runtime_data := NewGenData
-    } = Context
-) ->
-    case erlang:function_exported(Mod, post_creation_actions, 1) of
-        true ->
-            Mod:post_creation_actions(Context);
-        false ->
-            NewGenData
-    end;
-run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
-    %% Different implementation modules
-    NewGenData.
-
 handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
     Name = integer_to_list(erlang:system_time(millisecond)),
     Dir = checkpoint_dir(ShardId, Name),
@@ -1007,17 +987,17 @@ generation_get(Shard, GenId) ->
 
 -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
 generations_since(Shard, Since) ->
-    Schema = get_schema_runtime(Shard),
-    maps:fold(
-        fun
-            (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since ->
-                [GenId | Acc];
-            (_K, _V, Acc) ->
-                Acc
-        end,
-        [],
-        Schema
-    ).
+    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
+    list_generations_since(Schema, Current, Since).
+
+list_generations_since(Schema, GenId, Since) ->
+    case Schema of
+        #{?GEN_KEY(GenId) := #{until := Until}} when Until > Since ->
+            [GenId | list_generations_since(Schema, GenId - 1, Since)];
+        #{} ->
+            %% No more live generations.
+            []
+    end.
 
 format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
     #{

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -28,7 +28,7 @@
 
 %% behavior callbacks:
 -export([
-    create/4,
+    create/5,
     open/5,
     drop/5,
     prepare_batch/4,
@@ -88,7 +88,7 @@
 %% behavior callbacks
 %%================================================================================
 
-create(_ShardId, DBHandle, GenId, _Options) ->
+create(_ShardId, DBHandle, GenId, _Options, _SPrev) ->
     CFName = data_cf(GenId),
     {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
     Schema = #schema{},

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},

+ 29 - 13
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) ->
     ?check_trace(
         begin
             %% Create a bunch of topics to be learned in the first generation
-            Timestamps = lists:seq(1, 10_000, 100),
-            Batch = [
-                begin
-                    Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]),
-                    {TS, make_message(TS, Topic, integer_to_binary(TS))}
-                end
+            TS1 = 500,
+            Batch1 = [
+                {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
              || I <- lists:seq(1, 200),
-                TS <- Timestamps,
                 Suffix <- [<<"foo">>, <<"bar">>]
             ],
-            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
             %% Now we create a new generation with the same LTS module.  It should inherit the
             %% learned trie.
-            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000),
+            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
+            %% Restart the shard, to verify that LTS is persisted.
+            ok = application:stop(emqx_durable_storage),
+            ok = application:start(emqx_durable_storage),
+            ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
+            %% Store a batch of messages with the same set of topics.
+            TS2 = 1_500,
+            Batch2 = [
+                {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
+             || I <- lists:seq(1, 200),
+                Suffix <- [<<"foo">>, <<"bar">>]
+            ],
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+            %% We should get only two streams for wildcard query, for "foo" and for "bar".
+            ?assertMatch(
+                [_Foo, _Bar],
+                emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000)
+            ),
             ok
         end,
         fun(Trace) ->
@@ -211,10 +224,7 @@ t_replay(_Config) ->
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
     %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
     Batch2 = [
-        begin
-            Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]),
-            {TS, make_message(TS, Topic, integer_to_binary(TS))}
-        end
+        {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
      || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
@@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
         payload = Payload
     }.
 
+make_topic(Tokens = [_ | _]) ->
+    emqx_topic:join([bin(T) || T <- Tokens]).
+
 payloads(Messages) ->
     lists:map(
         fun(#message{payload = P}) ->
@@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
 parse_topic(Topic) ->
     emqx_topic:words(iolist_to_binary(Topic)).
 
+bin(X) ->
+    emqx_utils_conv:bin(X).
+
 %% CT callbacks
 
 all() -> emqx_common_test_helpers:all(?MODULE).

+ 1 - 0
changes/ce/fix-13276.en.md

@@ -0,0 +1 @@
+Fix an issue with durable message storage where parts of the internal storage state were not persisted during setup of new storage generation, a concept used internally for managing message expiration and cleanup. This could have manifested as messages being lost after a restart of the broker.