Просмотр исходного кода

feat(dsrepl): make storage layer unaware of granularity of time

Storage also becomes a bit more pure, depending on the upper layer to
provide the timestamps, which also makes it possible to handle more
operations idempotently.
Andrew Mayorov 1 год назад
Родитель
Сommit
74881e8706

+ 18 - 10
apps/emqx/src/emqx_message.erl

@@ -38,7 +38,8 @@
     from/1,
     topic/1,
     payload/1,
-    timestamp/1
+    timestamp/1,
+    timestamp/2
 ]).
 
 %% Flags
@@ -66,7 +67,6 @@
 
 -export([
     is_expired/2,
-    set_timestamp/2,
     update_expiry/1,
     timestamp_now/0
 ]).
@@ -80,7 +80,10 @@
     estimate_size/1
 ]).
 
--export_type([message_map/0]).
+-export_type([
+    timestamp/0,
+    message_map/0
+]).
 
 -type message_map() :: #{
     id := binary(),
@@ -90,10 +93,14 @@
     headers := emqx_types:headers(),
     topic := emqx_types:topic(),
     payload := emqx_types:payload(),
-    timestamp := integer(),
+    timestamp := timestamp(),
     extra := _
 }.
 
+%% Message timestamp
+%% Granularity: milliseconds.
+-type timestamp() :: non_neg_integer().
+
 -elvis([{elvis_style, god_modules, disable}]).
 
 -spec make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message().
@@ -202,9 +209,14 @@ topic(#message{topic = Topic}) -> Topic.
 -spec payload(emqx_types:message()) -> emqx_types:payload().
 payload(#message{payload = Payload}) -> Payload.
 
--spec timestamp(emqx_types:message()) -> integer().
+-spec timestamp(emqx_types:message()) -> timestamp().
 timestamp(#message{timestamp = TS}) -> TS.
 
+-spec timestamp(emqx_types:message(), second | millisecond | microsecond) -> non_neg_integer().
+timestamp(#message{timestamp = TS}, second) -> TS div 1000;
+timestamp(#message{timestamp = TS}, millisecond) -> TS;
+timestamp(#message{timestamp = TS}, microsecond) -> TS * 1000.
+
 -spec is_sys(emqx_types:message()) -> boolean().
 is_sys(#message{flags = #{sys := true}}) ->
     true;
@@ -289,10 +301,6 @@ is_expired(#message{timestamp = CreatedAt}, Zone) ->
         Interval -> elapsed(CreatedAt) > Interval
     end.
 
--spec set_timestamp(integer(), emqx_types:message()) -> emqx_types:message().
-set_timestamp(Timestamp, Msg) ->
-    Msg#message{timestamp = Timestamp}.
-
 -spec update_expiry(emqx_types:message()) -> emqx_types:message().
 update_expiry(
     Msg = #message{
@@ -421,7 +429,7 @@ from_map(#{
     }.
 
 %% @doc Get current timestamp in milliseconds.
--spec timestamp_now() -> integer().
+-spec timestamp_now() -> timestamp().
 timestamp_now() ->
     erlang:system_time(millisecond).
 

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -323,7 +323,7 @@ subscribe(
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
             {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
             Subscription = #{
-                start_time => emqx_ds:timestamp_us(),
+                start_time => now_ms(),
                 props => SubOpts,
                 id => SubId,
                 deleted => false

+ 0 - 3
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -150,9 +150,6 @@
 %% Timestamp
 %% Each message must have unique timestamp.
 %% Earliest possible timestamp is 0.
-%% Granularity: microsecond.
-%% TODO: Currently, we should always use milliseconds, as that's the unit we
-%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
 -type message_store_opts() ::

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -320,7 +320,7 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
         DimSizeof
     ).
 
--spec key_to_coord(keymapper(), key(), dimension()) -> coord().
+-spec key_to_coord(keymapper(), scalar(), dimension()) -> coord().
 key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) ->
     Actions = lists:nth(Dim, Scanner),
     extract_coord(Actions, Key).

+ 59 - 21
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -371,7 +371,8 @@ do_drop_db_v1(DB) ->
 ) ->
     emqx_ds:store_batch_result().
 do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
-    emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
+    Batch = [{emqx_message:timestamp(Message), Message} || Message <- Messages],
+    emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options).
 
 %% Remove me in EMQX 5.6
 -dialyzer({nowarn_function, do_get_streams_v1/4}).
@@ -462,7 +463,7 @@ do_add_generation_v2(DB) ->
     MyShards = [],
     lists:foreach(
         fun(ShardId) ->
-            emqx_ds_storage_layer:add_generation({DB, ShardId})
+            emqx_ds_storage_layer:add_generation({DB, ShardId}, emqx_ds:timestamp_us())
         end,
         MyShards
     ).
@@ -511,7 +512,10 @@ ra_store_batch(DB, Shard, Messages) ->
     end.
 
 ra_add_generation(DB, Shard) ->
-    Command = #{?tag => add_generation},
+    Command = #{
+        ?tag => add_generation,
+        ?since => emqx_ds:timestamp_us()
+    },
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
     case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
@@ -521,7 +525,11 @@ ra_add_generation(DB, Shard) ->
     end.
 
 ra_update_config(DB, Shard, Opts) ->
-    Command = #{?tag => update_config, ?config => Opts},
+    Command = #{
+        ?tag => update_config,
+        ?config => Opts,
+        ?since => emqx_ds:timestamp_us()
+    },
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
     case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
@@ -541,16 +549,18 @@ ra_drop_generation(DB, Shard, GenId) ->
     end.
 
 ra_get_streams(DB, Shard, TopicFilter, Time) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(Time),
+    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
 
 ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
     emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
 
 ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    TimestampUs = timestamp_to_timeus(StartTime),
+    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
 
 ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@@ -570,7 +580,16 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
 
 ra_list_generations_with_lifetimes(DB, Shard) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
+    Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
+    maps:map(
+        fun(_GenId, Data = #{since := Since, until := Until}) ->
+            Data#{
+                since := timeus_to_timestamp(Since),
+                until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
+            }
+        end,
+        Gens
+    ).
 
 ra_drop_shard(DB, Shard) ->
     LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
@@ -608,18 +627,22 @@ apply(
     {NState, Result, Effect};
 apply(
     _RaftMeta,
-    #{?tag := add_generation},
-    #{db_shard := DBShard} = State
+    #{?tag := add_generation, ?since := Since},
+    #{db_shard := DBShard, latest := Latest} = State
 ) ->
-    Result = emqx_ds_storage_layer:add_generation(DBShard),
-    {State, Result};
+    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
+    NState = State#{latest := NLatest},
+    {NState, Result};
 apply(
     _RaftMeta,
-    #{?tag := update_config, ?config := Opts},
-    #{db_shard := DBShard} = State
+    #{?tag := update_config, ?since := Since, ?config := Opts},
+    #{db_shard := DBShard, latest := Latest} = State
 ) ->
-    Result = emqx_ds_storage_layer:update_config(DBShard, Opts),
-    {State, Result};
+    {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest),
+    Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
+    NState = State#{latest := NLatest},
+    {NState, Result};
 apply(
     _RaftMeta,
     #{?tag := drop_generation, ?generation := GenId},
@@ -632,12 +655,27 @@ assign_timestamps(Latest, Messages) ->
     assign_timestamps(Latest, Messages, []).
 
 assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
-    case emqx_message:timestamp(MessageIn) of
-        Later when Later > Latest ->
-            assign_timestamps(Later, Rest, [MessageIn | Acc]);
+    case emqx_message:timestamp(MessageIn, microsecond) of
+        TimestampUs when TimestampUs > Latest ->
+            Message = assign_timestamp(TimestampUs, MessageIn),
+            assign_timestamps(TimestampUs, Rest, [Message | Acc]);
         _Earlier ->
-            Message = emqx_message:set_timestamp(Latest + 1, MessageIn),
+            Message = assign_timestamp(Latest + 1, MessageIn),
             assign_timestamps(Latest + 1, Rest, [Message | Acc])
     end;
 assign_timestamps(Latest, [], Acc) ->
     {Latest, Acc}.
+
+assign_timestamp(TimestampUs, Message) ->
+    {TimestampUs, Message}.
+
+ensure_monotonic_timestamp(TimestampUs, Latest) when TimestampUs > Latest ->
+    {TimestampUs, TimestampUs + 1};
+ensure_monotonic_timestamp(_TimestampUs, Latest) ->
+    {Latest, Latest + 1}.
+
+timestamp_to_timeus(TimestampMs) ->
+    TimestampMs * 1000.
+
+timeus_to_timestamp(TimestampUs) ->
+    TimestampUs div 1000.

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -34,8 +34,9 @@
 -define(batch_messages, 2).
 -define(timestamp, 3).
 
-%% update_config
+%% add_generation / update_config
 -define(config, 2).
+-define(since, 3).
 
 %% drop_generation
 -define(generation, 2).

+ 3 - 6
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -67,9 +67,8 @@ store_batch(DB, Messages, Opts) ->
     case maps:get(atomic, Opts, false) of
         false ->
             lists:foreach(
-                fun(MessageIn) ->
-                    Shard = emqx_ds_replication_layer:shard_of_message(DB, MessageIn, clientid),
-                    Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
+                fun(Message) ->
+                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
                     gen_server:call(
                         ?via(DB, Shard),
                         #enqueue_req{
@@ -83,9 +82,7 @@ store_batch(DB, Messages, Opts) ->
             );
         true ->
             maps:foreach(
-                fun(Shard, BatchIn) ->
-                    Timestamp = emqx_ds:timestamp_us(),
-                    Batch = [emqx_message:set_timestamp(Timestamp, Message) || Message <- BatchIn],
+                fun(Shard, Batch) ->
                     gen_server:call(
                         ?via(DB, Shard),
                         #enqueue_atomic_req{

+ 9 - 6
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -245,16 +245,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
     ok.
 
 -spec store_batch(
-    emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
+    emqx_ds_storage_layer:shard_id(),
+    s(),
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     {ok, Batch} = rocksdb:batch(),
     lists:foreach(
-        fun(Msg) ->
-            {Key, _} = make_key(S, Msg),
+        fun({Timestamp, Msg}) ->
+            {Key, _} = make_key(S, Timestamp, Msg),
             Val = serialize(Msg),
-            rocksdb:batch_put(Batch, Data, Key, Val)
+            rocksdb:put(DB, Data, Key, Val, [])
         end,
         Messages
     ),
@@ -652,8 +655,8 @@ format_key(KeyMapper, Key) ->
     Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
--spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
+-spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}.
+make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) ->
     Tokens = emqx_topic:words(TopicBin),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],

+ 92 - 70
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -29,8 +29,8 @@
     update_iterator/3,
     next/3,
     delete_next/4,
-    update_config/2,
-    add_generation/1,
+    update_config/3,
+    add_generation/2,
     list_generations_with_lifetimes/1,
     drop_generation/2
 ]).
@@ -133,7 +133,7 @@
     cf_refs := cf_refs(),
     %% Time at which this was created.  Might differ from `since', in particular for the
     %% first generation.
-    created_at := emqx_ds:time(),
+    created_at := emqx_message:timestamp(),
     %% When should this generation become active?
     %% This generation should only contain messages timestamped no earlier than that.
     %% The very first generation will have `since` equal 0.
@@ -194,7 +194,12 @@
 -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
     ok | {error, _Reason}.
 
--callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-callback store_batch(
+    shard_id(),
+    _Data,
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
+) ->
     emqx_ds:store_batch_result().
 
 -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
@@ -219,6 +224,9 @@
 %% API for the replication layer
 %%================================================================================
 
+%% Note: we specify gen_server requests as records to make use of Dialyzer:
+-record(call_add_generation, {since :: emqx_ds:time()}).
+-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
 
@@ -230,7 +238,11 @@ open_shard(Shard, Options) ->
 drop_shard(Shard) ->
     ok = rocksdb:destroy(db_dir(Shard), []).
 
--spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(
+    shard_id(),
+    [{emqx_ds:time(), emqx_types:message()}],
+    emqx_ds:message_store_opts()
+) ->
     emqx_ds:store_batch_result().
 store_batch(Shard, Messages, Options) ->
     %% We always store messages in the current generation:
@@ -398,13 +410,14 @@ delete_next(
             {ok, end_of_stream}
     end.
 
--spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok.
-update_config(ShardId, Options) ->
-    gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity).
+-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> ok.
+update_config(ShardId, Since, Options) ->
+    Call = #call_update_config{since = Since, options = Options},
+    gen_server:call(?REF(ShardId), Call, infinity).
 
--spec add_generation(shard_id()) -> ok.
-add_generation(ShardId) ->
-    gen_server:call(?REF(ShardId), add_generation, infinity).
+-spec add_generation(shard_id(), emqx_ds:time()) -> ok.
+add_generation(ShardId, Since) ->
+    gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
 
 -spec list_generations_with_lifetimes(shard_id()) ->
     #{
@@ -438,9 +451,6 @@ start_link(Shard = {_, _}, Options) ->
     shard :: shard()
 }).
 
-%% Note: we specify gen_server requests as records to make use of Dialyzer:
--record(call_create_generation, {since :: emqx_ds:time()}).
-
 -type server_state() :: #s{}.
 
 -define(DEFAULT_CF, "default").
@@ -470,16 +480,12 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     {ok, S}.
 
-handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) ->
-    Prototype = maps:get(storage, Options),
-    S1 = S0#s{schema = Schema#{prototype := Prototype}},
-    Since = emqx_message:timestamp_now(),
-    S = add_generation(S1, Since),
+handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
+    S = #s{} = handle_update_config(S0, Since, Options),
     commit_metadata(S),
     {reply, ok, S};
-handle_call(add_generation, _From, S0) ->
-    Since = emqx_message:timestamp_now(),
-    S = add_generation(S0, Since),
+handle_call(#call_add_generation{since = Since}, _From, S0) ->
+    S = #s{} = handle_add_generation(S0, Since),
     commit_metadata(S),
     {reply, ok, S};
 handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
@@ -489,10 +495,6 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     commit_metadata(S),
     {reply, Reply, S};
-handle_call(#call_create_generation{since = Since}, _From, S0) ->
-    S = add_generation(S0, Since),
-    commit_metadata(S),
-    {reply, ok, S};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -528,11 +530,10 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
         ShardSchema
     ).
 
--spec add_generation(server_state(), emqx_ds:time()) -> server_state().
-add_generation(S0, Since) ->
+-spec handle_add_generation(server_state(), emqx_ds:time()) ->
+    server_state() | {error, nonmonotonic}.
+handle_add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
-    Schema1 = update_last_until(Schema0, Since),
-    Shard1 = update_last_until(Shard0, Since),
 
     #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
     OldKey = ?GEN_KEY(OldGenId),
@@ -540,39 +541,53 @@ add_generation(S0, Since) ->
     #{cf_refs := OldCFRefs} = OldGenSchema,
     #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
 
-    {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
-
-    CFRefs = NewCFRefs ++ CFRefs0,
-    Key = ?GEN_KEY(GenId),
-    Generation0 =
-        #{data := NewGenData0} =
-        open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
-
-    %% When the new generation's module is the same as the last one, we might want to
-    %% perform actions like inheriting some of the previous (meta)data.
-    NewGenData =
-        run_post_creation_actions(
-            #{
-                shard_id => ShardId,
-                db => DB,
-                new_gen_id => GenId,
-                old_gen_id => OldGenId,
-                new_cf_refs => NewCFRefs,
-                old_cf_refs => OldCFRefs,
-                new_gen_runtime_data => NewGenData0,
-                old_gen_runtime_data => OldGenData,
-                new_module => CurrentMod,
-                old_module => OldMod
-            }
-        ),
-    Generation = Generation0#{data := NewGenData},
-
-    Shard = Shard1#{current_generation := GenId, Key => Generation},
-    S0#s{
-        cf_refs = CFRefs,
-        schema = Schema,
-        shard = Shard
-    }.
+    Schema1 = update_last_until(Schema0, Since),
+    Shard1 = update_last_until(Shard0, Since),
+
+    case Schema1 of
+        _Updated = #{} ->
+            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+            CFRefs = NewCFRefs ++ CFRefs0,
+            Key = ?GEN_KEY(GenId),
+            Generation0 =
+                #{data := NewGenData0} =
+                open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
+            %% When the new generation's module is the same as the last one, we might want to
+            %% perform actions like inheriting some of the previous (meta)data.
+            NewGenData =
+                run_post_creation_actions(
+                    #{
+                        shard_id => ShardId,
+                        db => DB,
+                        new_gen_id => GenId,
+                        old_gen_id => OldGenId,
+                        new_cf_refs => NewCFRefs,
+                        old_cf_refs => OldCFRefs,
+                        new_gen_runtime_data => NewGenData0,
+                        old_gen_runtime_data => OldGenData,
+                        new_module => CurrentMod,
+                        old_module => OldMod
+                    }
+                ),
+            Generation = Generation0#{data := NewGenData},
+            Shard = Shard1#{current_generation := GenId, Key => Generation},
+            S0#s{
+                cf_refs = CFRefs,
+                schema = Schema,
+                shard = Shard
+            };
+        {error, exists} ->
+            S0;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
+    server_state().
+handle_update_config(S0 = #s{schema = Schema}, Since, Options) ->
+    Prototype = maps:get(storage, Options),
+    S = S0#s{schema = Schema#{prototype := Prototype}},
+    handle_add_generation(S, Since).
 
 -spec handle_list_generations_with_lifetimes(server_state()) -> #{gen_id() => map()}.
 handle_list_generations_with_lifetimes(#s{schema = ShardSchema}) ->
@@ -652,7 +667,7 @@ new_generation(ShardId, DB, Schema0, Since) ->
         module => Mod,
         data => GenData,
         cf_refs => NewCFRefs,
-        created_at => emqx_message:timestamp_now(),
+        created_at => erlang:system_time(millisecond),
         since => Since,
         until => undefined
     },
@@ -703,12 +718,19 @@ rocksdb_open(Shard, Options) ->
 db_dir({DB, ShardId}) ->
     filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
 
--spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
-update_last_until(Schema, Until) ->
-    #{current_generation := GenId} = Schema,
-    GenData0 = maps:get(?GEN_KEY(GenId), Schema),
-    GenData = GenData0#{until := Until},
-    Schema#{?GEN_KEY(GenId) := GenData}.
+-spec update_last_until(Schema, emqx_ds:time()) ->
+    Schema | {error, exists | nonmonotonic}
+when
+    Schema :: shard_schema() | shard().
+update_last_until(Schema = #{current_generation := GenId}, Until) ->
+    case maps:get(?GEN_KEY(GenId), Schema) of
+        GenData = #{since := CurrentSince} when CurrentSince < Until ->
+            Schema#{?GEN_KEY(GenId) := GenData#{until := Until}};
+        #{since := Until} ->
+            {error, exists};
+        #{since := CurrentSince} when CurrentSince > Until ->
+            {error, nonmonotonic}
+    end.
 
 run_post_creation_actions(
     #{

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -117,8 +117,8 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
     Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
-        fun(Msg) ->
-            Key = <<(emqx_message:timestamp(Msg)):64>>,
+        fun({Timestamp, Msg}) ->
+            Key = <<Timestamp:64>>,
             Val = term_to_binary(Msg),
             rocksdb:put(DB, CF, Key, Val, [])
         end,
@@ -209,8 +209,8 @@ do_next(_, _, _, _, 0, Key, Acc) ->
     {Key, Acc};
 do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
     case rocksdb:iterator_move(IT, Action) of
-        {ok, Key, Blob} ->
-            Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
+        {ok, Key = <<TS:64>>, Blob} ->
+            Msg = #message{topic = Topic} = binary_to_term(Blob),
             TopicWords = emqx_topic:words(Topic),
             case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
                 true ->

+ 12 - 30
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -56,7 +56,7 @@ t_store(_Config) ->
         payload = Payload,
         timestamp = PublishedAt
     },
-    ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})).
+    ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})).
 
 %% Smoke test for iteration through a concrete topic
 t_iterate(_Config) ->
@@ -64,7 +64,7 @@ t_iterate(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -92,7 +92,7 @@ t_delete(_Config) ->
     Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -123,7 +123,7 @@ t_get_streams(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
@@ -149,7 +149,7 @@ t_get_streams(_Config) ->
     NewBatch = [
         begin
             B = integer_to_binary(I),
-            make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)
+            {100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)}
         end
      || I <- lists:seq(1, 200)
     ],
@@ -178,12 +178,8 @@ t_new_generation_inherit_trie(_Config) ->
             Timestamps = lists:seq(1, 10_000, 100),
             Batch = [
                 begin
-                    B = integer_to_binary(I),
-                    make_message(
-                        TS,
-                        <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>,
-                        integer_to_binary(TS)
-                    )
+                    Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]),
+                    {TS, make_message(TS, Topic, integer_to_binary(TS))}
                 end
              || I <- lists:seq(1, 200),
                 TS <- Timestamps,
@@ -192,7 +188,7 @@ t_new_generation_inherit_trie(_Config) ->
             ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
             %% Now we create a new generation with the same LTS module.  It should inherit the
             %% learned trie.
-            ok = emqx_ds_storage_layer:add_generation(?SHARD),
+            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000),
             ok
         end,
         fun(Trace) ->
@@ -207,23 +203,21 @@ t_replay(_Config) ->
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
     Timestamps = lists:seq(1, 10_000, 100),
     Batch1 = [
-        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
     %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
     Batch2 = [
         begin
-            B = integer_to_binary(I),
-            make_message(
-                TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
-            )
+            Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]),
+            {TS, make_message(TS, Topic, integer_to_binary(TS))}
         end
      || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
     %% Check various topic filters:
-    Messages = Batch1 ++ Batch2,
+    Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
     %% Missing topics (no ghost messages):
     ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
     %% Regular topics:
@@ -481,18 +475,6 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
         payload = Payload
     }.
 
-store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) ->
-    store(Shard, PublishedAt, list_to_binary(TopicL), Payload);
-store(Shard, PublishedAt, Topic, Payload) ->
-    ID = emqx_guid:gen(),
-    Msg = #message{
-        id = ID,
-        topic = Topic,
-        timestamp = PublishedAt,
-        payload = Payload
-    },
-    emqx_ds_storage_layer:message_store(Shard, [Msg], #{}).
-
 payloads(Messages) ->
     lists:map(
         fun(#message{payload = P}) ->