Browse Source

Merge pull request #11108 from ieQu1/emqx-ds

Durable storage
lafirest 2 years ago
parent
commit
850d642ec2

+ 1 - 0
.github/CODEOWNERS

@@ -18,6 +18,7 @@
 /apps/emqx_rule_engine/    @emqx/emqx-review-board @kjellwinblad
 /apps/emqx_slow_subs/      @emqx/emqx-review-board @lafirest
 /apps/emqx_statsd/         @emqx/emqx-review-board @JimMoen
+/apps/emqx_durable_storage/ @ieQu1
 
 ## CI
 /deploy/  @emqx/emqx-review-board @Rory-Z

+ 1 - 1
apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl

@@ -26,7 +26,7 @@
 -include("bpapi.hrl").
 
 %%================================================================================
-%% behavior callbacks
+%% behaviour callbacks
 %%================================================================================
 
 introduced_in() ->

+ 94 - 0
apps/emqx_durable_storage/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-06-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 75 - 0
apps/emqx_durable_storage/IMPLEMENTATION.md

@@ -0,0 +1,75 @@
+# General concepts
+
+In the logic layer we don't speak about replication.
+This is because we could use an external DB with its own replication logic.
+
+On the other hand, we introduce the notion of a shard right here at the logic layer.
+This is because the shared subscription logic needs to be aware of it to some extent, as it has to split work between subscribers somehow.
+
+# Tables
+
+## Message storage
+
+Data is written every time a message matching a certain pattern is published.
+This pattern is not part of the logic layer spec.
+
+Write throughput: very high
+Data size: very high
+Write pattern: append only
+Read pattern: pseudoserial
+
+Number of records: O(total write throughput * retention time)
+
+## Session storage
+
+Data there is updated when:
+
+- A new client connects with clean session = false
+- Client subscribes to a topic
+- Client unsubscribes from a topic
+- Garbage collection is performed
+
+Write throughput: low
+
+Data is read when a client connects and replay agents are started
+
+Read throughput: low
+
+Data format:
+
+`#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}`
+
+Number of records: O(N clients)
+Size of record: O(N subscriptions per client)
+
+## Iterator storage
+
+Data is written every time a client acks a message.
+Data is read when a client reconnects and we restart replay agents.
+
+`#iterator{key = IterKey, data = Blob}`
+
+Number of records: O(N clients * N subscriptions per client)
+Size of record: O(1)
+Write throughput: high, lots of small updates
+Write pattern: mostly key overwrite
+Read throughput: low
+Read pattern: random
+
+# Push vs. Pull model
+
+In the push model, replay agents iterate over the dataset in the shards.
+
+In the pull model, the client processes work with iterators.
+
+## Push pros:
+- Lower latency: a message can be dispatched to the client as soon as it's persisted
+- Less worry about buffering
+
+## Push cons:
+- Need pushback logic
+- It's not entirely justified when working with an external DB that may not provide a streaming API
+
+## Pull pros:
+- No need for pushback: the client advances iterators at its own tempo
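
Below is a minimal sketch of the pull model in action, assuming the `emqx_ds:iterator_next/1` contract introduced in this PR (`handle_message/1` is a hypothetical consumer callback; `iterator_next/1` itself is still a stub in this commit):

```erlang
%% Hypothetical replay loop: the client process pulls messages and
%% advances the iterator at its own pace, so no pushback is needed.
replay_loop(It0) ->
    case emqx_ds:iterator_next(It0) of
        {value, Message, It} ->
            ok = handle_message(Message),
            replay_loop(It);
        none ->
            done;
        {error, _} = Error ->
            Error
    end.
```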

+ 37 - 0
apps/emqx_durable_storage/README.md

@@ -0,0 +1,37 @@
+# EMQX Replay
+
+`emqx_ds` is durable storage for MQTT messages within EMQX.
+It implements the following scenarios:
+- Persisting messages published by clients
+
+> 0. App overview introduction
+> 1. Let people know what your project can do specifically. Is it a base
+> library dependency, or what kind of functionality is provided to the user?
+> 2. Provide context and add a link to any reference visitors might be
+> unfamiliar with.
+> 3. Design details, implementation technology architecture, Roadmap, etc.
+
+# [Features] - [Optional]
+> A list of features your application provides. If the feature set is quite simple, just
+> list it in the previous section.
+
+# Limitation
+TBD
+
+# Documentation links
+TBD
+
+# Usage
+TBD
+
+# Configurations
+TBD
+
+# HTTP APIs
+
+# Other
+TBD
+
+# Contributing
+Please see our [contributing.md](../../CONTRIBUTING.md).

+ 190 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -0,0 +1,190 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds).
+
+%% API:
+%%   Messages:
+-export([message_store/2, message_store/1, message_stats/0]).
+%%   Iterator:
+-export([iterator_update/2, iterator_next/1, iterator_stats/0]).
+%%   Session:
+-export([
+    session_open/1,
+    session_drop/1,
+    session_suspend/1,
+    session_add_iterator/2,
+    session_del_iterator/2,
+    session_stats/0
+]).
+
+%% internal exports:
+-export([]).
+
+-export_type([
+    message_id/0,
+    message_stats/0,
+    message_store_opts/0,
+    session_id/0,
+    iterator_id/0,
+    iterator/0,
+    shard/0,
+    topic/0,
+    time/0
+]).
+
+-include("emqx_ds_int.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-type session_id() :: emqx_types:clientid().
+
+-type iterator() :: term().
+
+-opaque iterator_id() :: binary().
+
+%%-type session() :: #session{}.
+
+-type message_store_opts() :: #{}.
+
+-type message_stats() :: #{}.
+
+-type message_id() :: binary().
+
+%% Parsed topic:
+-type topic() :: list(binary()).
+
+-type shard() :: binary().
+
+%% Timestamp
+%% Earliest possible timestamp is 0.
+%% TODO granularity?
+-type time() :: non_neg_integer().
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%%--------------------------------------------------------------------------------
+%% Message
+%%--------------------------------------------------------------------------------
+-spec message_store([emqx_types:message()], message_store_opts()) ->
+    {ok, [message_id()]} | {error, _}.
+message_store(_Msg, _Opts) ->
+    %% TODO
+    {error, not_implemented}.
+
+-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
+message_store(Msg) ->
+    %% TODO
+    message_store(Msg, #{}).
+
+-spec message_stats() -> message_stats().
+message_stats() ->
+    #{}.
+
+%%--------------------------------------------------------------------------------
+%% Session
+%%--------------------------------------------------------------------------------
+
+%% @doc Called when a client connects. This function looks up a
+%% session or creates a new one if the previous one couldn't be found.
+%%
+%% This function also spawns replay agents for each iterator.
+%%
+%% Note: session API doesn't handle session takeovers, it's the job of
+%% the broker.
+-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}.
+session_open(ClientID) ->
+    {atomic, Ret} =
+        mria:transaction(
+            ?DS_SHARD,
+            fun() ->
+                case mnesia:read(?SESSION_TAB, ClientID) of
+                    [#session{iterators = Iterators}] ->
+                        {false, ClientID, Iterators};
+                    [] ->
+                        Session = #session{id = ClientID, iterators = []},
+                        mnesia:write(?SESSION_TAB, Session, write),
+                        {true, ClientID, []}
+                end
+            end
+        ),
+    Ret.
+
+%% @doc Called when a client reconnects with `clean session=true' or
+%% during session GC
+-spec session_drop(emqx_types:clientid()) -> ok.
+session_drop(ClientID) ->
+    {atomic, ok} = mria:transaction(
+        ?DS_SHARD,
+        fun() ->
+            mnesia:delete({?SESSION_TAB, ClientID})
+        end
+    ),
+    ok.
+
+%% @doc Called when a client disconnects. This function terminates all
+%% active processes related to the session.
+-spec session_suspend(session_id()) -> ok | {error, session_not_found}.
+session_suspend(_SessionId) ->
+    %% TODO
+    ok.
+
+%% @doc Called when a client subscribes to a topic. Idempotent.
+-spec session_add_iterator(session_id(), emqx_topic:words()) ->
+    {ok, iterator_id()} | {error, session_not_found}.
+session_add_iterator(_SessionId, _TopicFilter) ->
+    %% TODO
+    {ok, <<"">>}.
+
+%% @doc Called when a client unsubscribes from a topic. Returns `true'
+%% if the session contained the subscription or `false' if it wasn't
+%% subscribed.
+-spec session_del_iterator(session_id(), emqx_topic:words()) ->
+    {ok, boolean()} | {error, session_not_found}.
+session_del_iterator(_SessionId, _TopicFilter) ->
+    %% TODO
+    {ok, false}.
+
+-spec session_stats() -> #{}.
+session_stats() ->
+    #{}.
+
+%%--------------------------------------------------------------------------------
+%% Iterator (pull API)
+%%--------------------------------------------------------------------------------
+
+%% @doc Called when a client acks a message
+-spec iterator_update(iterator_id(), iterator()) -> ok.
+iterator_update(_IterId, _Iter) ->
+    %% TODO
+    ok.
+
+%% @doc Called to fetch the next message from the iterator
+-spec iterator_next(iterator()) -> {value, emqx_types:message(), iterator()} | none | {error, _}.
+iterator_next(_Iter) ->
+    %% TODO
+    none.
+
+-spec iterator_stats() -> #{}.
+iterator_stats() ->
+    #{}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
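
As a usage illustration only (several of these functions are still TODO stubs in this commit), a broker-side connect handler might drive the session API like this; the topic filter value is made up:

```erlang
%% Open or create a durable session on connect, then register an
%% iterator for a subscription. Return shapes follow the specs above.
on_client_connect(ClientID) ->
    {_IsNew, SessionId, _IteratorIds} = emqx_ds:session_open(ClientID),
    {ok, _IterId} = emqx_ds:session_add_iterator(SessionId, [<<"foo">>, '#']),
    ok.
```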

+ 25 - 0
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -0,0 +1,25 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_app).
+
+-export([start/2]).
+
+-include("emqx_ds_int.hrl").
+
+start(_Type, _Args) ->
+    init_mnesia(),
+    emqx_ds_sup:start_link().
+
+init_mnesia() ->
+    ok = mria:create_table(
+        ?SESSION_TAB,
+        [
+            {rlog_shard, ?DS_SHARD},
+            {type, set},
+            {storage, rocksdb_copies},
+            {record_name, session},
+            {attributes, record_info(fields, session)}
+        ]
+    ).
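
Once the application is started and `init_mnesia/0` has run, the table can be poked at from a shell. A hedged example (the record layout comes from `emqx_ds_int.hrl`; the output shapes assume the stubs in this commit):

```erlang
1> emqx_ds:session_open(<<"some-client">>).
{true,<<"some-client">>,[]}
2> mnesia:dirty_read(emqx_ds_session, <<"some-client">>).
[{session,<<"some-client">>,[]}]
```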

+ 60 - 0
apps/emqx_durable_storage/src/emqx_ds_conf.erl

@@ -0,0 +1,60 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_conf).
+
+%% TODO: make a proper HOCON schema and all...
+
+%% API:
+-export([shard_config/1, db_options/0]).
+
+-export([shard_iteration_options/1]).
+-export([default_iteration_options/0]).
+
+-type backend_config() ::
+    {emqx_ds_message_storage_bitmask, emqx_ds_message_storage_bitmask:options()}
+    | {module(), _Options}.
+
+-export_type([backend_config/0]).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-define(APP, emqx_ds).
+
+-spec shard_config(emqx_ds:shard()) -> backend_config().
+shard_config(Shard) ->
+    DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()),
+    Shards = application:get_env(?APP, shard_config, #{}),
+    maps:get(Shard, Shards, DefaultShardConfig).
+
+-spec shard_iteration_options(emqx_ds:shard()) ->
+    emqx_ds_message_storage_bitmask:iteration_options().
+shard_iteration_options(Shard) ->
+    case shard_config(Shard) of
+        {emqx_ds_message_storage_bitmask, Config} ->
+            maps:get(iteration, Config, default_iteration_options());
+        {_Module, _} ->
+            default_iteration_options()
+    end.
+
+-spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options().
+default_iteration_options() ->
+    {emqx_ds_message_storage_bitmask, Config} = default_shard_config(),
+    maps:get(iteration, Config).
+
+-spec default_shard_config() -> backend_config().
+default_shard_config() ->
+    {emqx_ds_message_storage_bitmask, #{
+        timestamp_bits => 64,
+        topic_bits_per_level => [8, 8, 8, 32, 16],
+        epoch => 5,
+        iteration => #{
+            iterator_refresh => {every, 100}
+        }
+    }}.
+
+-spec db_options() -> emqx_ds_storage_layer:db_options().
+db_options() ->
+    application:get_env(?APP, db_options, []).
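
There is no HOCON schema yet, so configuration comes from the application environment. A sketch of a `sys.config` fragment overriding the defaults above (the keys match what this module reads; the shard name and RocksDB option are illustrative):

```erlang
{emqx_ds, [
    %% Per-shard override; unlisted shards fall back to default_shard_config.
    {shard_config, #{
        <<"some-shard">> =>
            {emqx_ds_message_storage_bitmask, #{
                timestamp_bits => 64,
                topic_bits_per_level => [8, 8, 8, 32, 16],
                epoch => 5,
                iteration => #{iterator_refresh => {every, 100}}
            }}
    }},
    %% Extra RocksDB options returned by db_options/0.
    {db_options, [{max_open_files, 128}]}
]}
```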

+ 27 - 0
apps/emqx_durable_storage/src/emqx_ds_int.hrl

@@ -0,0 +1,27 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_HRL).
+-define(EMQX_DS_HRL, true).
+
+-define(SESSION_TAB, emqx_ds_session).
+-define(DS_SHARD, emqx_ds_shard).
+
+-record(session, {
+    id :: emqx_ds:session_id(),
+    iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}]
+}).
+
+-endif.
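
For orientation, a populated `#session{}` record under this definition might look like this (hypothetical values, following the one-iterator-per-subscription layout described in IMPLEMENTATION.md):

```erlang
#session{
    id = <<"foobar">>,
    iterators = [{[<<"foo">>, '#'], <<"iterator-id-1">>}]
}
```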

+ 731 - 0
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -0,0 +1,731 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_message_storage_bitmask).
+
+%%================================================================================
+%% @doc Description of the schema
+%%
+%% Let us assume that `T' is a topic and `t' is time. These are the two
+%% dimensions used to index messages. They can be viewed as
+%% "coordinates" of an MQTT message in a 2D space.
+%%
+%% Oftentimes, when wildcard subscription is used, keys must be
+%% scanned in both dimensions simultaneously.
+%%
+%% RocksDB allows iterating over sorted keys very quickly. This means we
+%% need to map our two-dimensional keys to a single index that is
+%% sorted in a way that helps to iterate over both time and topic
+%% without having to do a lot of random seeks.
+%%
+%% == Mapping of 2D keys to rocksdb keys ==
+%%
+%% We use a "zigzag" pattern to store messages, where the rocksdb key is
+%% composed like this:
+%%
+%%              |ttttt|TTTTTTTTT|tttt|
+%%                 ^       ^      ^
+%%                 |       |      |
+%%         +-------+       |      +---------+
+%%         |               |                |
+%% most significant    topic hash   least significant
+%% bits of timestamp                bits of timestamp
+%% (a.k.a epoch)                    (a.k.a time offset)
+%%
+%% Topic hash is level-aware: each topic level is hashed separately
+%% and the resulting hashes are bitwise-concatenated. This allows us
+%% to map topics to fixed-length bitstrings while keeping some degree
+%% of information about the hierarchy.
+%%
+%% The next important concept is what we call an "epoch". The duration of
+%% an epoch is determined by the maximum time offset. The epoch is calculated
+%% by shifting the bits of the timestamp to the right.
+%%
+%% The resulting index is a space-filling curve that looks like
+%% this in the topic-time 2D space:
+%%
+%% T ^ ---->------   |---->------   |---->------
+%%   |       --/     /      --/     /      --/
+%%   |   -<-/       |   -<-/       |   -<-/
+%%   | -/           | -/           | -/
+%%   | ---->------  | ---->------  | ---->------
+%%   |       --/    /       --/    /       --/
+%%   |   ---/      |    ---/      |    ---/
+%%   | -/          ^  -/          ^  -/
+%%   | ---->------ |  ---->------ |  ---->------
+%%   |       --/   /        --/   /        --/
+%%   |   -<-/     |     -<-/     |     -<-/
+%%   | -/         |   -/         |   -/
+%%   | ---->------|   ---->------|   ---------->
+%%   |
+%%  -+------------+-----------------------------> t
+%%        epoch
+%%
+%% This structure allows us to quickly seek to the first message that
+%% was recorded in a certain epoch in a certain topic or a
+%% group of topics matching a filter like `foo/bar/#`.
+%%
+%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
+%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
+%% timestamp(K2).
+%% That is, replay doesn't reorder messages published in each
+%% individual topic.
+%%
+%% This property doesn't hold between different topics, but it's not deemed
+%% a problem right now.
+%%
+%%================================================================================
+
+%% API:
+-export([create_new/3, open/5]).
+-export([make_keymapper/1]).
+
+-export([store/5]).
+-export([make_iterator/2]).
+-export([make_iterator/3]).
+-export([next/1]).
+
+-export([preserve_iterator/1]).
+-export([restore_iterator/3]).
+-export([refresh_iterator/1]).
+
+%% Debug/troubleshooting:
+%% Keymappers
+-export([
+    keymapper_info/1,
+    compute_bitstring/3,
+    compute_topic_bitmask/2,
+    compute_time_bitmask/1,
+    hash/2
+]).
+
+%% Keyspace filters
+-export([
+    make_keyspace_filter/2,
+    compute_initial_seek/1,
+    compute_next_seek/2,
+    compute_time_seek/3,
+    compute_topic_seek/4
+]).
+
+-export_type([db/0, iterator/0, schema/0]).
+
+-export_type([options/0]).
+-export_type([iteration_options/0]).
+
+-compile(
+    {inline, [
+        bitwise_concat/3,
+        ones/1,
+        successor/1,
+        topic_hash_matches/3,
+        time_matches/3
+    ]}
+).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-type topic() :: emqx_ds:topic().
+-type time() :: emqx_ds:time().
+
+%% Number of bits
+-type bits() :: non_neg_integer().
+
+%% Key of a RocksDB record.
+-type key() :: binary().
+
+%% Distribution of entropy among topic levels.
+%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits,
+%% and _rest of levels_ (if any) get 16 bits.
+-type bits_per_level() :: [bits(), ...].
+
+-type options() :: #{
+    %% Number of bits in a message timestamp.
+    timestamp_bits := bits(),
+    %% Number of bits in a key allocated to each level in a message topic.
+    topic_bits_per_level := bits_per_level(),
+    %% Maximum granularity of iteration over time.
+    epoch := time(),
+
+    iteration => iteration_options(),
+
+    cf_options => emqx_ds_storage_layer:db_cf_options()
+}.
+
+-type iteration_options() :: #{
+    %% Request periodic iterator refresh.
+    %% This might be helpful during replays taking a lot of time (e.g. tens of seconds).
+    %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not
+    %% the same as 1000 replayed messages.
+    iterator_refresh => {every, _NumOperations :: pos_integer()}
+}.
+
+%% Persistent configuration of the generation, it is used to create db
+%% record when the database is reopened
+-record(schema, {keymapper :: keymapper()}).
+
+-opaque schema() :: #schema{}.
+
+-record(db, {
+    shard :: emqx_ds:shard(),
+    handle :: rocksdb:db_handle(),
+    cf :: rocksdb:cf_handle(),
+    keymapper :: keymapper(),
+    write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
+    read_options = [] :: emqx_ds_storage_layer:db_write_options()
+}).
+
+-record(it, {
+    handle :: rocksdb:itr_handle(),
+    filter :: keyspace_filter(),
+    cursor :: binary() | undefined,
+    next_action :: {seek, binary()} | next,
+    refresh_counter :: {non_neg_integer(), pos_integer()} | undefined
+}).
+
+-record(filter, {
+    keymapper :: keymapper(),
+    topic_filter :: emqx_topic:words(),
+    start_time :: integer(),
+    hash_bitfilter :: integer(),
+    hash_bitmask :: integer(),
+    time_bitfilter :: integer(),
+    time_bitmask :: integer()
+}).
+
+% NOTE
+% Keymapper decides how to map messages into RocksDB column family keyspace.
+-record(keymapper, {
+    source :: [bitsource(), ...],
+    bitsize :: bits(),
+    epoch :: non_neg_integer()
+}).
+
+-type bitsource() ::
+    %% Consume `_Size` bits from timestamp starting at `_Offset`th bit.
+    %% TODO consistency
+    {timestamp, _Offset :: bits(), _Size :: bits()}
+    %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash.
+    | {hash, level | levels, _Size :: bits()}.
+
+-opaque db() :: #db{}.
+-opaque iterator() :: #it{}.
+-type keymapper() :: #keymapper{}.
+-type keyspace_filter() :: #filter{}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%% Create a new column family for the generation and a serializable representation of the schema
+-spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) ->
+    {schema(), emqx_ds_storage_layer:cf_refs()}.
+create_new(DBHandle, GenId, Options) ->
+    CFName = data_cf(GenId),
+    CFOptions = maps:get(cf_options, Options, []),
+    {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions),
+    Schema = #schema{keymapper = make_keymapper(Options)},
+    {Schema, [{CFName, CFHandle}]}.
+
+%% Reopen the database
+-spec open(
+    emqx_ds:shard(),
+    rocksdb:db_handle(),
+    emqx_ds_storage_layer:gen_id(),
+    emqx_ds_storage_layer:cf_refs(),
+    schema()
+) ->
+    db().
+open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
+    {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
+    #db{
+        shard = Shard,
+        handle = DBHandle,
+        cf = CFHandle,
+        keymapper = Keymapper
+    }.
+
+-spec make_keymapper(options()) -> keymapper().
+make_keymapper(#{
+    timestamp_bits := TimestampBits,
+    topic_bits_per_level := BitsPerLevel,
+    epoch := MaxEpoch
+}) ->
+    TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
+    TimestampMSBs = TimestampBits - TimestampLSBs,
+    NLevels = length(BitsPerLevel),
+    {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
+    Source = lists:flatten([
+        [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
+        [{hash, level, Bits} || Bits <- LevelBits],
+        {hash, levels, TailLevelsBits},
+        [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
+    ]),
+    #keymapper{
+        source = Source,
+        bitsize = lists:sum([S || {_, _, S} <- Source]),
+        epoch = 1 bsl TimestampLSBs
+    }.
+
+-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
+    ok | {error, _TODO}.
+store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
+    Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
+    Value = make_message_value(Topic, MessagePayload),
+    rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
+
+-spec make_iterator(db(), emqx_ds:replay()) ->
+    {ok, iterator()} | {error, _TODO}.
+make_iterator(DB, Replay) ->
+    Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
+    make_iterator(DB, Replay, Options).
+
+-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
+    % {error, invalid_start_time}? might just start from the beginning of time
+    % and call it a day: client violated the contract anyway.
+    {ok, iterator()} | {error, _TODO}.
+make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) ->
+    case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
+        {ok, ITHandle} ->
+            Filter = make_keyspace_filter(Replay, DB#db.keymapper),
+            InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
+            RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
+            {ok, #it{
+                handle = ITHandle,
+                filter = Filter,
+                next_action = {seek, InitialSeek},
+                refresh_counter = RefreshCounter
+            }};
+        Err ->
+            Err
+    end.
+
+-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
+next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
+    It = maybe_refresh_iterator(It0),
+    case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
+        % spec says `{ok, Key}` is also possible but the implementation says it's not
+        {ok, Key, Value} ->
+            % Preserve last seen key in the iterator so it could be restored / refreshed later.
+            ItNext = It#it{cursor = Key},
+            Bitstring = extract(Key, Keymapper),
+            case match_next(Bitstring, Value, It#it.filter) of
+                {_Topic, Payload} ->
+                    {value, Payload, ItNext#it{next_action = next}};
+                next ->
+                    next(ItNext#it{next_action = next});
+                NextBitstring when is_integer(NextBitstring) ->
+                    NextSeek = combine(NextBitstring, <<>>, Keymapper),
+                    next(ItNext#it{next_action = {seek, NextSeek}});
+                none ->
+                    stop_iteration(ItNext)
+            end;
+        {error, invalid_iterator} ->
+            stop_iteration(It);
+        {error, iterator_closed} ->
+            {error, closed}
+    end.
+
+-spec preserve_iterator(iterator()) -> binary().
+preserve_iterator(#it{cursor = Cursor}) ->
+    State = #{
+        v => 1,
+        cursor => Cursor
+    },
+    term_to_binary(State).
+
+-spec restore_iterator(db(), emqx_ds:replay(), binary()) ->
+    {ok, iterator()} | {error, _TODO}.
+restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
+    State = binary_to_term(Serial),
+    restore_iterator(DB, Replay, State);
+restore_iterator(DB, Replay, #{
+    v := 1,
+    cursor := Cursor
+}) ->
+    case make_iterator(DB, Replay) of
+        {ok, It} when Cursor == undefined ->
+            % Iterator was preserved right after it has been made.
+            {ok, It};
+        {ok, It} ->
+            % Iterator was preserved mid-replay, seek right past the last seen key.
+            {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}};
+        Err ->
+            Err
+    end.
+
+-spec refresh_iterator(iterator()) -> iterator().
+refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) ->
+    case rocksdb:iterator_refresh(Handle) of
+        ok when Action =:= next ->
+            % Now the underlying iterator is invalid, need to seek instead.
+            It#it{next_action = {seek, successor(Cursor)}};
+        ok ->
+            % Now the underlying iterator is invalid, but will seek soon anyway.
+            It;
+        {error, _} ->
+            % Implementation could in theory return an {error, ...} tuple.
+            % Supposedly our best bet is to ignore it.
+            % TODO logging?
+            It
+    end.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+-spec keymapper_info(keymapper()) ->
+    #{source := [bitsource()], bitsize := bits(), epoch := time()}.
+keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) ->
+    #{source => Source, bitsize => Bitsize, epoch => Epoch}.
+
+make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
+    combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
+
+make_message_value(Topic, MessagePayload) ->
+    term_to_binary({Topic, MessagePayload}).
+
+unwrap_message_value(Binary) ->
+    binary_to_term(Binary).
+
+-spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) ->
+    key().
+combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
+    <<Bitstring:Size/integer, MessageID/binary>>.
+
+-spec extract(key(), keymapper()) ->
+    _Bitstring :: integer().
+extract(Key, #keymapper{bitsize = Size}) ->
+    <<Bitstring:Size/integer, _MessageID/binary>> = Key,
+    Bitstring.
+
+-spec compute_bitstring(topic(), time(), keymapper()) -> integer().
+compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) ->
+    compute_bitstring(Topic, Timestamp, Source, 0).
+
+-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer().
+compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
+    compute_topic_bitmask(TopicFilter, Source, 0).
+
+-spec compute_time_bitmask(keymapper()) -> integer().
+compute_time_bitmask(#keymapper{source = Source}) ->
+    compute_time_bitmask(Source, 0).
+
+-spec hash(term(), bits()) -> integer().
+hash(Input, Bits) ->
+    % at most 32 bits
+    erlang:phash2(Input, 1 bsl Bits).
+
+-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter().
+make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
+    Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
+    HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
+    TimeBitmask = compute_time_bitmask(Keymapper),
+    HashBitfilter = Bitstring band HashBitmask,
+    TimeBitfilter = Bitstring band TimeBitmask,
+    #filter{
+        keymapper = Keymapper,
+        topic_filter = TopicFilter,
+        start_time = StartTime,
+        hash_bitfilter = HashBitfilter,
+        hash_bitmask = HashBitmask,
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
+    }.
+
+-spec compute_initial_seek(keyspace_filter()) -> integer().
+compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
+    % Should be the same as `compute_initial_seek(0, Filter)`.
+    HashBitfilter bor TimeBitfilter.
+
+-spec compute_next_seek(integer(), keyspace_filter()) -> integer().
+compute_next_seek(
+    Bitstring,
+    Filter = #filter{
+        hash_bitfilter = HashBitfilter,
+        hash_bitmask = HashBitmask,
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
+    }
+) ->
+    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
+    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
+    compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
+    I = (Timestamp bsr Offset) band ones(Size),
+    compute_bitstring(Topic, Timestamp, Rest, bitwise_concat(Acc, I, Size));
+compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
+    I = hash(<<"/">>, Size),
+    compute_bitstring([], Timestamp, Rest, bitwise_concat(Acc, I, Size));
+compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
+    I = hash(Level, Size),
+    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
+compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
+    I = hash(Tail, Size),
+    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
+compute_bitstring(_, _, [], Acc) ->
+    Acc.
+
+compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
+    compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
+compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
+    compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
+compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
+    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
+compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
+    compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
+compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
+    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
+compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
+    Mask =
+        case lists:member('+', Tail) orelse lists:member('#', Tail) of
+            true -> 0;
+            false -> ones(Size)
+        end,
+    compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
+compute_topic_bitmask(_, [], Acc) ->
+    Acc.
+
+compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
+    compute_time_bitmask(Rest, bitwise_concat(Acc, ones(Size), Size));
+compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
+    compute_time_bitmask(Rest, bitwise_concat(Acc, 0, Size));
+compute_time_bitmask([], Acc) ->
+    Acc.
+
+bitwise_concat(Acc, Item, ItemSize) ->
+    (Acc bsl ItemSize) bor Item.
+
+ones(Bits) ->
+    1 bsl Bits - 1.
+
+-spec successor(key()) -> key().
+successor(Key) ->
+    <<Key/binary, 0:8>>.
+
+%% |123|345|678|
+%%  foo bar baz
+
+%% |123|000|678| - |123|fff|678|
+
+%%  foo +   baz
+
+%% |fff|000|fff|
+
+%% |123|000|678|
+
+%% |123|056|678| & |fff|000|fff| = |123|000|678|.
+
+match_next(
+    Bitstring,
+    Value,
+    Filter = #filter{
+        topic_filter = TopicFilter,
+        hash_bitfilter = HashBitfilter,
+        hash_bitmask = HashBitmask,
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
+    }
+) ->
+    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
+    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
+    case HashMatches and TimeMatches of
+        true ->
+            Message = {Topic, _Payload} = unwrap_message_value(Value),
+            case emqx_topic:match(Topic, TopicFilter) of
+                true ->
+                    Message;
+                false ->
+                    next
+            end;
+        false ->
+            compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
+    end.
+
+%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
+compute_next_seek(
+    _HashMatches = false,
+    _TimeMatches,
+    Bitstring,
+    Filter = #filter{
+        keymapper = Keymapper,
+        hash_bitfilter = HashBitfilter,
+        hash_bitmask = HashBitmask,
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
+    }
+) ->
+    NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
+    case NextBitstring of
+        none ->
+            none;
+        _ ->
+            TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
+            compute_next_seek(true, TimeMatches, NextBitstring, Filter)
+    end;
+%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
+compute_next_seek(
+    _HashMatches = true,
+    _TimeMatches = false,
+    Bitstring,
+    #filter{
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
+    }
+) ->
+    compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
+compute_next_seek(true, true, Bitstring, _It) ->
+    Bitstring.
+
+topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) ->
+    (Bitstring band HashBitmask) == HashBitfilter.
+
+time_matches(Bitstring, TimeBitfilter, TimeBitmask) ->
+    (Bitstring band TimeBitmask) >= TimeBitfilter.
+
+compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
+    % Replace the bits of the timestamp in `Bitstring` with bits from `TimeBitfilter`.
+    (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
+
+%% Find the closest bitstring which is:
+%% * greater than `Bitstring`,
+%% * and falls into the hash space defined by `HashBitfilter`.
+%% Note that the result can end up "back" in time and out of the time range.
+compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
+    Sources = Keymapper#keymapper.source,
+    Size = Keymapper#keymapper.bitsize,
+    compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
+
+compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
+    % NOTE
+    % We're iterating through `Bitstring` here, in lockstep with `HashBitfilter`
+    % and `HashBitmask`, starting from the least significant bits. Each bitsource in
+    % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
+    % bits long which we interpret as a "digit". There are 2 flavors of those
+    % "digits":
+    %  * regular digit with 2^S possible values,
+    %  * degenerate digit with exactly 1 possible value U (represented with 0).
+    % Our goal here is to find a successor of `Bitstring` and perform a kind of
+    % digit-by-digit addition operation with carry propagation.
+    NextSeek = zipfoldr3(
+        fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
+            case Source of
+                {hash, _, S} when LBitmask =:= 0 ->
+                    % Regular case
+                    bitwise_add_digit(Substring, Acc, S, Offset);
+                {hash, _, _} when LBitmask =/= 0, Substring < Filter ->
+                    % Degenerate case, I_digit < U, no overflow.
+                    % Successor is `U bsl Offset` which is equivalent to 0.
+                    0;
+                {hash, _, S} when LBitmask =/= 0, Substring > Filter ->
+                    % Degenerate case, I_digit > U, overflow.
+                    % Successor is `(1 bsl Size + U) bsl Offset`.
+                    overflow_digit(S, Offset);
+                {hash, _, S} when LBitmask =/= 0 ->
+                    % Degenerate case, I_digit = U
+                    % Perform digit addition with I_digit = 0, assuming "digit" has
+                    % 0 bits of information (but is `S` bits long at the same time).
+                    % This will overflow only if the result of previous iteration
+                    % was an overflow.
+                    bitwise_add_digit(0, Acc, 0, S, Offset);
+                {timestamp, _, S} ->
+                    % Regular case
+                    bitwise_add_digit(Substring, Acc, S, Offset)
+            end
+        end,
+        0,
+        Bitstring,
+        HashBitfilter,
+        HashBitmask,
+        Size,
+        Sources
+    ),
+    case NextSeek bsr Size of
+        _Carry = 0 ->
+            % Found the successor.
+            % We need to recover values of those degenerate digits which we
+            % represented with 0 during digit-by-digit iteration.
+            NextSeek bor (HashBitfilter band HashBitmask);
+        _Carry = 1 ->
+            % We got "carried away" past the range, time to stop iteration.
+            none
+    end.
+
+bitwise_add_digit(Digit, Number, Width, Offset) ->
+    bitwise_add_digit(Digit, Number, Width, Width, Offset).
+
+%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
+%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
+%% Perform an overflow if the result of addition would not fit into `Bits`
+%% bits.
+bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
+    Sum = (Digit bsl Offset) + Number,
+    case (Sum bsr Offset) < (1 bsl Bits) of
+        true -> Sum;
+        false -> overflow_digit(Width, Offset)
+    end.
+
+%% Construct a number which denotes an overflow of a digit that starts at
+%% `Offset` bits and is `Width` bits long.
+overflow_digit(Width, Offset) ->
+    (1 bsl Width) bsl Offset.
+
+%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
+%% significant bits first.
+%%
+%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
+%% specified in `Sources` list, in order from most significant bits to least
+%% significant. Each iteration calls `FoldFun` with:
+%% * bitsource that was used to extract sub-bitstrings,
+%% * 3 sub-bitstrings in integer representation,
+%% * bit offset into integers,
+%% * current accumulator.
+-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
+    Acc
+when
+    FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
+zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
+    Acc;
+zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
+    OffsetNext = Offset - S,
+    AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
+    FoldFun(
+        Source,
+        substring(I1, OffsetNext, S),
+        substring(I2, OffsetNext, S),
+        substring(I3, OffsetNext, S),
+        OffsetNext,
+        AccNext
+    ).
+
+substring(I, Offset, Size) ->
+    (I bsr Offset) band ones(Size).
+
+%% @doc Generate a column family ID for the MQTT messages
+-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
+data_cf(GenId) ->
+    ?MODULE_STRING ++ integer_to_list(GenId).
+
+make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
+    {0, N};
+make_refresh_counter(undefined) ->
+    undefined.
+
+maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
+    refresh_iterator(It#it{refresh_counter = {0, N}});
+maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
+    It#it{refresh_counter = {M + 1, N}};
+maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
+    It.
+
+stop_iteration(It) ->
+    ok = rocksdb:iterator_close(It#it.handle),
+    none.
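
To make the key layout concrete, here is what the default shard configuration from `emqx_ds_conf` yields when fed to `make_keymapper/1`; the numbers follow directly from the code above:

```erlang
%% timestamp_bits = 64 and epoch = 5 give TimestampLSBs = floor(log2(5)) = 2,
%% so keys look like |62 timestamp MSBs|8+8+8+32+16 topic hash bits|2 timestamp LSBs|,
%% 136 bits in total, with an effective epoch length of 1 bsl 2 = 4 time units.
KM = emqx_ds_message_storage_bitmask:make_keymapper(#{
    timestamp_bits => 64,
    topic_bits_per_level => [8, 8, 8, 32, 16],
    epoch => 5
}),
#{bitsize := 136, epoch := 4} = emqx_ds_message_storage_bitmask:keymapper_info(KM).
```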

+ 36 - 0
apps/emqx_durable_storage/src/emqx_ds_replay.erl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_replay).
+
+%% API:
+-export([]).
+
+-export_type([replay_id/0, replay/0]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-type replay_id() :: binary().
+
+-type replay() :: {
+    _TopicFilter :: emqx_ds:topic(),
+    _StartTime :: emqx_ds:time()
+}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%%================================================================================
+%% behaviour callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
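
Under these types, a `replay()` value is simply a parsed topic filter paired with a start time; for example (illustrative):

```erlang
%% Replay all messages published to foo/bar since the beginning of time.
Replay = {[<<"foo">>, <<"bar">>], 0}.
```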

+ 505 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -0,0 +1,505 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_layer).
+
+-behaviour(gen_server).
+
+%% API:
+-export([start_link/1]).
+-export([create_generation/3]).
+
+-export([store/5]).
+
+-export([make_iterator/2, next/1]).
+
+-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
+
+%% behaviour callbacks:
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
+
+-compile({inline, [meta_lookup/2]}).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% see rocksdb:db_options()
+% -type options() :: proplists:proplist().
+
+-type db_write_options() :: proplists:proplist().
+
+-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
+
+%% Message storage generation
+%% Keep in mind that instances of this type are persisted in long-term storage.
+-type generation() :: #{
+    %% Module that handles data for the generation
+    module := module(),
+    %% Module-specific data defined at generation creation time
+    data := term(),
+    %% When should this generation become active?
+    %% This generation should only contain messages timestamped no earlier than that.
+    %% The very first generation will have `since` equal to 0.
+    since := emqx_ds:time()
+}.
+
+-record(s, {
+    shard :: emqx_ds:shard(),
+    db :: rocksdb:db_handle(),
+    cf_iterator :: rocksdb:cf_handle(),
+    cf_generations :: cf_refs()
+}).
+
+-record(it, {
+    shard :: emqx_ds:shard(),
+    gen :: gen_id(),
+    replay :: emqx_ds:replay(),
+    module :: module(),
+    data :: term()
+}).
+
+-type gen_id() :: 0..16#ffff.
+
+-opaque state() :: #s{}.
+-opaque iterator() :: #it{}.
+
+%% Contents of the default column family:
+%%
+%% [{<<"genNN">>, #generation{}}, ...,
+%%  {<<"current">>, GenID}]
+
+-define(DEFAULT_CF, "default").
+-define(DEFAULT_CF_OPTS, []).
+
+-define(ITERATOR_CF, "$iterators").
+
+%% TODO
+%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
+%% 2. Supposedly might be compressed _very_ effectively.
+%% 3. `inplace_update_support`?
+-define(ITERATOR_CF_OPTS, []).
+
+-define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}).
+
+%%================================================================================
+%% Callbacks
+%%================================================================================
+
+-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
+    {_Schema, cf_refs()}.
+
+-callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
+    term().
+
+-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
+    ok | {error, _}.
+
+-callback make_iterator(_Schema, emqx_ds:replay()) ->
+    {ok, _It} | {error, _}.
+
+-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
+
+-callback preserve_iterator(_Schema, _It) -> term().
+
+-callback next(It) -> {value, binary(), It} | none | {error, closed}.
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec start_link(emqx_ds:shard()) -> {ok, pid()}.
+start_link(Shard) ->
+    gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
+
+-spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
+    {ok, gen_id()} | {error, nonmonotonic}.
+create_generation(Shard, Since, Config = {_Module, _Options}) ->
+    gen_server:call(?REF(Shard), {create_generation, Since, Config}).
+
+-spec store(
+    emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
+) ->
+    ok | {error, _}.
+store(Shard, GUID, Time, Topic, Msg) ->
+    {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
+    Mod:store(Data, GUID, Time, Topic, Msg).
+
+-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
+    {ok, iterator()} | {error, _TODO}.
+make_iterator(Shard, Replay = {_, StartTime}) ->
+    {GenId, Gen} = meta_lookup_gen(Shard, StartTime),
+    open_iterator(Gen, #it{
+        shard = Shard,
+        gen = GenId,
+        replay = Replay
+    }).
+
+-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
+next(It = #it{module = Mod, data = ItData}) ->
+    case Mod:next(ItData) of
+        {value, Val, ItDataNext} ->
+            {value, Val, It#it{data = ItDataNext}};
+        {error, _} = Error ->
+            Error;
+        none ->
+            case open_next_iterator(It) of
+                {ok, ItNext} ->
+                    next(ItNext);
+                {error, _} = Error ->
+                    Error;
+                none ->
+                    none
+            end
+    end.
+
+-spec preserve_iterator(iterator(), emqx_ds:replay_id()) ->
+    ok | {error, _TODO}.
+preserve_iterator(It = #it{}, ReplayID) ->
+    iterator_put_state(ReplayID, It).
+
+-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
+    {ok, iterator()} | {error, _TODO}.
+restore_iterator(Shard, ReplayID) ->
+    case iterator_get_state(Shard, ReplayID) of
+        {ok, Serial} ->
+            restore_iterator_state(Shard, Serial);
+        not_found ->
+            {error, not_found};
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
+    ok | {error, _TODO}.
+discard_iterator(Shard, ReplayID) ->
+    iterator_delete(Shard, ReplayID).
+
+%%================================================================================
+%% behaviour callbacks
+%%================================================================================
+
+init([Shard]) ->
+    process_flag(trap_exit, true),
+    {ok, S0} = open_db(Shard),
+    S = ensure_current_generation(S0),
+    ok = populate_metadata(S),
+    {ok, S}.
+
+handle_call({create_generation, Since, Config}, _From, S) ->
+    case create_new_gen(Since, Config, S) of
+        {ok, GenId, NS} ->
+            {reply, {ok, GenId}, NS};
+        {error, _} = Error ->
+            {reply, Error, S}
+    end;
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, #s{db = DB, shard = Shard}) ->
+    meta_erase(Shard),
+    ok = rocksdb:close(DB).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
+
+-spec populate_metadata(state()) -> ok.
+populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
+    ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
+    Current = schema_get_current(DBHandle),
+    lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
+
+-spec populate_metadata(gen_id(), state()) -> ok.
+populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
+    Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
+    meta_register_gen(Shard, GenId, Gen).
+
+-spec ensure_current_generation(state()) -> state().
+ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) ->
+    case schema_get_current(DBHandle) of
+        undefined ->
+            Config = emqx_ds_conf:shard_config(Shard),
+            {ok, _, NS} = create_new_gen(0, Config, S),
+            NS;
+        _GenId ->
+            S
+    end.
+
+-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
+    {ok, gen_id(), state()} | {error, nonmonotonic}.
+create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
+    GenId = get_next_id(meta_get_current(Shard)),
+    GenId = get_next_id(schema_get_current(DBHandle)),
+    case is_gen_valid(Shard, GenId, Since) of
+        ok ->
+            {ok, Gen, NS} = create_gen(GenId, Since, Config, S),
+            %% TODO: Transaction? Column family creation can't be transactional, anyway.
+            ok = schema_put_gen(DBHandle, GenId, Gen),
+            ok = schema_put_current(DBHandle, GenId),
+            ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
+            {ok, GenId, NS};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
+    {ok, generation(), state()}.
+create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
+    % TODO: Backend implementation should ensure idempotency.
+    {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
+    Gen = #{
+        module => Module,
+        data => Schema,
+        since => Since
+    },
+    {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
+
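+%% Note: `rocksdb:open/3' returns column family handles in the same
+%% order as the descriptors passed to it, which is why the default and
+%% iterator handles can be matched off the front of the list and the
+%% remainder zipped back with `ExistingCFs' below.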
+-spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}.
+open_db(Shard) ->
+    Filename = binary_to_list(Shard),
+    DBOptions = [
+        {create_if_missing, true},
+        {create_missing_column_families, true}
+        | emqx_ds_conf:db_options()
+    ],
+    ExistingCFs =
+        case rocksdb:list_column_families(Filename, DBOptions) of
+            {ok, CFs} ->
+                [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
+            % DB is not present: this is the first start
+            {error, {db_open, _}} ->
+                []
+        end,
+    ColumnFamilies = [
+        {?DEFAULT_CF, ?DEFAULT_CF_OPTS},
+        {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
+        | ExistingCFs
+    ],
+    case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
+        {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
+            {CFNames, _} = lists:unzip(ExistingCFs),
+            {ok, #s{
+                shard = Shard,
+                db = DBHandle,
+                cf_iterator = CFIterator,
+                cf_generations = lists:zip(CFNames, CFRefs)
+            }};
+        Error ->
+            Error
+    end.
+
+-spec open_gen(gen_id(), generation(), state()) -> generation().
+open_gen(
+    GenId,
+    Gen = #{module := Mod, data := Data},
+    #s{shard = Shard, db = DBHandle, cf_generations = CFs}
+) ->
+    DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
+    Gen#{data := DB}.
+
+-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
+open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
+    open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
+
+open_next_iterator(undefined, _It) ->
+    none;
+open_next_iterator(Gen = #{}, It) ->
+    open_iterator(Gen, It).
+
+-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
+open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
+    case Mod:make_iterator(Data, It#it.replay) of
+        {ok, ItData} ->
+            {ok, It#it{module = Mod, data = ItData}};
+        Err ->
+            Err
+    end.
+
+-spec open_restore_iterator(generation(), iterator(), binary()) ->
+    {ok, iterator()} | {error, _Reason}.
+open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
+    case Mod:restore_iterator(Data, Replay, Serial) of
+        {ok, ItData} ->
+            {ok, It#it{module = Mod, data = ItData}};
+        Err ->
+            Err
+    end.
+
+%%
+
+-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
+
+-define(ITERATION_WRITE_OPTS, []).
+-define(ITERATION_READ_OPTS, []).
+
+iterator_get_state(Shard, ReplayID) ->
+    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
+    rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
+
+iterator_put_state(ID, It = #it{shard = Shard}) ->
+    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
+    Serial = preserve_iterator_state(It),
+    rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
+
+iterator_delete(Shard, ID) ->
+    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
+    rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
+
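+%% The iterator runtime state is snapshotted as a versioned map
+%% (`v => 1'), presumably so the serialization format can evolve
+%% without invalidating previously preserved replay state.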
+preserve_iterator_state(#it{
+    gen = Gen,
+    replay = {TopicFilter, StartTime},
+    module = Mod,
+    data = ItData
+}) ->
+    term_to_binary(#{
+        v => 1,
+        gen => Gen,
+        filter => TopicFilter,
+        start => StartTime,
+        st => Mod:preserve_iterator(ItData)
+    }).
+
+restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
+    restore_iterator_state(Shard, binary_to_term(Serial));
+restore_iterator_state(
+    Shard,
+    #{
+        v := 1,
+        gen := Gen,
+        filter := TopicFilter,
+        start := StartTime,
+        st := State
+    }
+) ->
+    It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
+    open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
+
+%% Functions for dealing with the metadata stored persistently in rocksdb
+
+-define(CURRENT_GEN, <<"current">>).
+-define(SCHEMA_WRITE_OPTS, []).
+-define(SCHEMA_READ_OPTS, []).
+
+-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
+schema_get_gen(DBHandle, GenId) ->
+    {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
+    binary_to_term(Bin).
+
+-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
+schema_put_gen(DBHandle, GenId, Gen) ->
+    rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
+
+-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
+schema_get_current(DBHandle) ->
+    case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
+        {ok, Bin} ->
+            binary_to_integer(Bin);
+        not_found ->
+            undefined
+    end.
+
+-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
+schema_put_current(DBHandle, GenId) ->
+    rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
+
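+%% Generation schema entries are keyed by a fixed-width big-endian
+%% generation id, e.g. schema_gen_key(1) =:= <<"gen", 0, 0, 0, 1>>.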
+-spec schema_gen_key(integer()) -> binary().
+schema_gen_key(N) ->
+    <<"gen", N:32>>.
+
+-undef(CURRENT_GEN).
+-undef(SCHEMA_WRITE_OPTS).
+-undef(SCHEMA_READ_OPTS).
+
+%% Functions for dealing with the runtime shard metadata:
+
+-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
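+
+%% Shard metadata is kept in `persistent_term': lookups are effectively
+%% free, while updates trigger a global GC scan, so these terms are
+%% written only on rare events such as generation switches.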
+
+-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
+meta_register_gen(Shard, GenId, Gen) ->
+    Gs =
+        case GenId > 0 of
+            true -> meta_lookup(Shard, GenId - 1);
+            false -> []
+        end,
+    ok = meta_put(Shard, GenId, [Gen | Gs]),
+    ok = meta_put(Shard, current, GenId).
+
+-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
+meta_lookup_gen(Shard, Time) ->
+    % TODO
+    % Is cheaper persistent_term GC on update worth the extra lookup here?
+    % I'm leaning towards a "no".
+    Current = meta_lookup(Shard, current),
+    Gens = meta_lookup(Shard, Current),
+    find_gen(Time, Current, Gens).
+
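+%% Generations are stored newest-first with strictly decreasing `since'
+%% timestamps, so the first entry satisfying `Time >= Since' is the one
+%% covering `Time'; a `Time' older than the oldest generation has no
+%% matching clause.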
+find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
+    {GenId, Gen};
+find_gen(Time, GenId, [_Gen | Rest]) ->
+    find_gen(Time, GenId - 1, Rest).
+
+-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
+meta_get_gen(Shard, GenId) ->
+    case meta_lookup(Shard, GenId, []) of
+        [Gen | _Older] -> Gen;
+        [] -> undefined
+    end.
+
+-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
+meta_get_current(Shard) ->
+    meta_lookup(Shard, current, undefined).
+
+-spec meta_lookup(emqx_ds:shard(), _K) -> _V.
+meta_lookup(Shard, K) ->
+    persistent_term:get(?PERSISTENT_TERM(Shard, K)).
+
+-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
+meta_lookup(Shard, K, Default) ->
+    persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
+
+-spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
+meta_put(Shard, K, V) ->
+    persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
+
+-spec meta_erase(emqx_ds:shard()) -> ok.
+meta_erase(Shard) ->
+    [
+        persistent_term:erase(K)
+     || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
+    ],
+    ok.
+
+-undef(PERSISTENT_TERM).
+
+get_next_id(undefined) -> 0;
+get_next_id(GenId) -> GenId + 1.
+
+is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
+    [GenPrev | _] = meta_lookup(Shard, GenId - 1),
+    case GenPrev of
+        #{since := SincePrev} when Since > SincePrev ->
+            ok;
+        #{} ->
+            {error, nonmonotonic}
+    end;
+is_gen_valid(_Shard, 0, 0) ->
+    ok.
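+
+%% For instance, if the newest generation was created with `Since = 50',
+%% then `create_generation(Shard, 50, Config)' or any earlier `Since'
+%% yields {error, nonmonotonic}, while `Since = 51' is accepted.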
+
+%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
+%% store_cfs(DBHandle, CFRefs) ->
+%%     lists:foreach(
+%%       fun({CFName, CFRef}) ->
+%%               persistent_term:put({self(), CFName}, {DBHandle, CFRef})
+%%       end,
+%%       CFRefs).

+ 62 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -0,0 +1,62 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_layer_sup).
+
+-behaviour(supervisor).
+
+%% API:
+-export([start_link/0, start_shard/1, stop_shard/1]).
+
+%% behaviour callbacks:
+-export([init/1]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(SUP, ?MODULE).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    supervisor:start_link({local, ?SUP}, ?MODULE, []).
+
+-spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret().
+start_shard(Shard) ->
+    supervisor:start_child(?SUP, shard_child_spec(Shard)).
+
+-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
+stop_shard(Shard) ->
+    ok = supervisor:terminate_child(?SUP, Shard),
+    ok = supervisor:delete_child(?SUP, Shard).
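+
+%% Usage sketch (`<<"my_shard">>' is a hypothetical shard name):
+%%   {ok, _Pid} = emqx_ds_storage_layer_sup:start_shard(<<"my_shard">>),
+%%   ok = emqx_ds_storage_layer_sup:stop_shard(<<"my_shard">>).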
+
+%%================================================================================
+%% behaviour callbacks
+%%================================================================================
+
+init([]) ->
+    Children = [],
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    {ok, {SupFlags, Children}}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+-spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec().
+shard_child_spec(Shard) ->
+    #{
+        id => Shard,
+        start => {emqx_ds_storage_layer, start_link, [Shard]},
+        shutdown => 5_000,
+        restart => permanent,
+        type => worker
+    }.

+ 52 - 0
apps/emqx_durable_storage/src/emqx_ds_sup.erl

@@ -0,0 +1,52 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_sup).
+
+-behaviour(supervisor).
+
+%% API:
+-export([start_link/0]).
+
+%% behaviour callbacks:
+-export([init/1]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(SUP, ?MODULE).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    supervisor:start_link({local, ?SUP}, ?MODULE, []).
+
+%%================================================================================
+%% behaviour callbacks
+%%================================================================================
+
+init([]) ->
+    Children = [shard_sup()],
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 0,
+        period => 1
+    },
+    {ok, {SupFlags, Children}}.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+shard_sup() ->
+    #{
+        id => local_store_shard_sup,
+        start => {emqx_ds_storage_layer_sup, start_link, []},
+        restart => permanent,
+        type => supervisor,
+        shutdown => infinity
+    }.

+ 11 - 0
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -0,0 +1,11 @@
+%% -*- mode: erlang -*-
+{application, emqx_durable_storage, [
+    {description, "Message persistence and subscription replays for EMQX"},
+    % strict semver, bump manually!
+    {vsn, "0.1.0"},
+    {modules, []},
+    {registered, []},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria]},
+    {mod, {emqx_ds_app, []}},
+    {env, []}
+]}.

+ 188 - 0
apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl

@@ -0,0 +1,188 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_message_storage_bitmask_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-import(emqx_ds_message_storage_bitmask, [
+    make_keymapper/1,
+    keymapper_info/1,
+    compute_topic_bitmask/2,
+    compute_time_bitmask/1,
+    compute_topic_seek/4
+]).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+t_make_keymapper(_) ->
+    ?assertMatch(
+        #{
+            source := [
+                {timestamp, 9, 23},
+                {hash, level, 2},
+                {hash, level, 4},
+                {hash, levels, 8},
+                {timestamp, 0, 9}
+            ],
+            bitsize := 46,
+            epoch := 512
+        },
+        keymapper_info(
+            make_keymapper(#{
+                timestamp_bits => 32,
+                topic_bits_per_level => [2, 4, 8],
+                epoch => 1000
+            })
+        )
+    ).
+
+t_make_keymapper_single_hash_level(_) ->
+    ?assertMatch(
+        #{
+            source := [
+                {timestamp, 0, 32},
+                {hash, levels, 16}
+            ],
+            bitsize := 48,
+            epoch := 1
+        },
+        keymapper_info(
+            make_keymapper(#{
+                timestamp_bits => 32,
+                topic_bits_per_level => [16],
+                epoch => 1
+            })
+        )
+    ).
+
+t_make_keymapper_no_timestamp(_) ->
+    ?assertMatch(
+        #{
+            source := [
+                {hash, level, 4},
+                {hash, level, 8},
+                {hash, levels, 16}
+            ],
+            bitsize := 28,
+            epoch := 1
+        },
+        keymapper_info(
+            make_keymapper(#{
+                timestamp_bits => 0,
+                topic_bits_per_level => [4, 8, 16],
+                epoch => 42
+            })
+        )
+    ).
+
+t_compute_topic_bitmask(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
+    ?assertEqual(
+        2#111_1111_11111_11,
+        compute_topic_bitmask([<<"foo">>, <<"bar">>], KM)
+    ),
+    ?assertEqual(
+        2#111_0000_11111_11,
+        compute_topic_bitmask([<<"foo">>, '+'], KM)
+    ),
+    ?assertEqual(
+        2#111_0000_00000_11,
+        compute_topic_bitmask([<<"foo">>, '+', '+'], KM)
+    ),
+    ?assertEqual(
+        2#111_0000_11111_00,
+        compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM)
+    ).
+
+t_compute_topic_bitmask_wildcard(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
+    ?assertEqual(
+        2#000_0000_00000_00,
+        compute_topic_bitmask(['#'], KM)
+    ),
+    ?assertEqual(
+        2#111_0000_00000_00,
+        compute_topic_bitmask([<<"foo">>, '#'], KM)
+    ),
+    ?assertEqual(
+        2#111_1111_11111_00,
+        compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM)
+    ).
+
+t_compute_topic_bitmask_wildcard_long_tail(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
+    ?assertEqual(
+        2#111_1111_11111_11,
+        compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM)
+    ),
+    ?assertEqual(
+        2#111_1111_11111_00,
+        compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM)
+    ).
+
+t_compute_time_bitmask(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}),
+    ?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)).
+
+t_compute_time_bitmask_epoch_only(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}),
+    ?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)).
+
+%% Filter = |123|***|678|***|
+%% Mask   = |123|***|678|***|
+%% Key1   = |123|011|108|121| → Seek = 0 |123|011|678|000|
+%% Key2   = |123|011|679|919| → Seek = 0 |123|012|678|000|
+%% Key3   = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
+%% Key4   = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
+
+t_compute_next_topic_seek(_) ->
+    KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}),
+    ?assertMatch(
+        none,
+        compute_topic_seek(
+            16#FD_42_4242_043,
+            16#FD_42_4242_042,
+            16#FF_FF_FFFF_FFF,
+            KM
+        )
+    ),
+    ?assertMatch(
+        16#FD_11_0678_000,
+        compute_topic_seek(
+            16#FD_11_0108_121,
+            16#FD_00_0678_000,
+            16#FF_00_FFFF_000,
+            KM
+        )
+    ),
+    ?assertMatch(
+        16#FD_12_0678_000,
+        compute_topic_seek(
+            16#FD_11_0679_919,
+            16#FD_00_0678_000,
+            16#FF_00_FFFF_000,
+            KM
+        )
+    ),
+    ?assertMatch(
+        none,
+        compute_topic_seek(
+            16#FD_FF_0679_001,
+            16#FD_00_0678_000,
+            16#FF_00_FFFF_000,
+            KM
+        )
+    ),
+    ?assertMatch(
+        none,
+        compute_topic_seek(
+            16#FE_11_0179_017,
+            16#FD_00_0678_000,
+            16#FF_00_FFFF_000,
+            KM
+        )
+    ).

+ 276 - 0
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -0,0 +1,276 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_storage_layer_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(SHARD, shard(?FUNCTION_NAME)).
+
+-define(DEFAULT_CONFIG,
+    {emqx_ds_message_storage_bitmask, #{
+        timestamp_bits => 64,
+        topic_bits_per_level => [8, 8, 32, 16],
+        epoch => 5,
+        iteration => #{
+            iterator_refresh => {every, 5}
+        }
+    }}
+).
+
+-define(COMPACT_CONFIG,
+    {emqx_ds_message_storage_bitmask, #{
+        timestamp_bits => 16,
+        topic_bits_per_level => [16, 16],
+        epoch => 10
+    }}
+).
+
+%% Smoke test for opening and reopening the database
+t_open(_Config) ->
+    ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD).
+
+%% Smoke test of store function
+t_store(_Config) ->
+    MessageID = emqx_guid:gen(),
+    PublishedAt = 1000,
+    Topic = [<<"foo">>, <<"bar">>],
+    Payload = <<"message">>,
+    ?assertMatch(ok, emqx_ds_storage_layer:store(?SHARD, MessageID, PublishedAt, Topic, Payload)).
+
+%% Smoke test for iteration through a concrete topic
+t_iterate(_Config) ->
+    %% Prepare data:
+    Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]],
+    Timestamps = lists:seq(1, 10),
+    [
+        emqx_ds_storage_layer:store(
+            ?SHARD,
+            emqx_guid:gen(),
+            PublishedAt,
+            Topic,
+            integer_to_binary(PublishedAt)
+        )
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    %% Iterate through individual topics:
+    [
+        begin
+            {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {Topic, 0}),
+            Values = iterate(It),
+            ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
+        end
+     || Topic <- Topics
+    ],
+    ok.
+
+%% Smoke test for iteration with wildcard topic filter
+t_iterate_wildcard(_Config) ->
+    %% Prepare data:
+    Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
+    Timestamps = lists:seq(1, 10),
+    _ = [
+        store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ?assertEqual(
+        lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
+    ),
+    ?assertEqual(
+        [],
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
+    ),
+    ?assertEqual(
+        lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
+    ),
+    ?assertEqual(
+        lists:sort([
+            {Topic, PublishedAt}
+         || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
+        ]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
+    ),
+    ?assertEqual(
+        lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
+    ),
+    ?assertEqual(
+        [],
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
+    ),
+    ?assertEqual(
+        lists:sort([
+            {Topic, PublishedAt}
+         || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
+        ]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
+    ),
+    ?assertEqual(
+        lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
+    ),
+    ?assertEqual(
+        [],
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
+    ),
+    ok.
+
+t_iterate_long_tail_wildcard(_Config) ->
+    Topic = "b/c/d/e/f/g",
+    TopicFilter = "b/c/d/e/+/+",
+    Timestamps = lists:seq(1, 100),
+    _ = [
+        store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
+     || PublishedAt <- Timestamps
+    ],
+    ?assertEqual(
+        lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)])
+    ).
+
+t_create_gen(_Config) ->
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
+    ?assertEqual(
+        {error, nonmonotonic},
+        emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
+    ),
+    ?assertEqual(
+        {error, nonmonotonic},
+        emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
+    ),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
+    Topics = ["foo/bar", "foo/bar/baz"],
+    Timestamps = lists:seq(1, 100),
+    [
+        ?assertEqual(ok, store(?SHARD, PublishedAt, Topic, <<>>))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ].
+
+t_iterate_multigen(_Config) ->
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
+    {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
+    Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
+    Timestamps = lists:seq(1, 100),
+    _ = [
+        store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ?assertEqual(
+        lists:sort([
+            {Topic, PublishedAt}
+         || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
+        ]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
+    ),
+    ?assertEqual(
+        lists:sort([
+            {Topic, PublishedAt}
+         || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
+        ]),
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
+    ).
+
+t_iterate_multigen_preserve_restore(_Config) ->
+    ReplayID = atom_to_binary(?FUNCTION_NAME),
+    {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
+    {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
+    {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
+    Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
+    Timestamps = lists:seq(1, 100),
+    TopicFilter = "foo/#",
+    TopicsMatching = ["foo/bar", "foo/bar/baz"],
+    _ = [
+        store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
+     || Topic <- Topics, TS <- Timestamps
+    ],
+    It0 = iterator(?SHARD, TopicFilter, 0),
+    {It1, Res10} = iterate(It0, 10),
+    % preserve mid-generation
+    ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
+    {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
+    {It3, Res100} = iterate(It2, 88),
+    % preserve on the generation boundary
+    ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
+    {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
+    {It5, Res200} = iterate(It4, 1000),
+    ?assertEqual(none, It5),
+    ?assertEqual(
+        lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
+        lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
+    ),
+    ?assertEqual(
+        ok,
+        emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
+    ),
+    ?assertEqual(
+        {error, not_found},
+        emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
+    ).
+
+store(Shard, PublishedAt, Topic, Payload) ->
+    ID = emqx_guid:gen(),
+    emqx_ds_storage_layer:store(Shard, ID, PublishedAt, parse_topic(Topic), Payload).
+
+iterate(DB, TopicFilter, StartTime) ->
+    iterate(iterator(DB, TopicFilter, StartTime)).
+
+iterate(It) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Payload, ItNext} ->
+            [Payload | iterate(ItNext)];
+        none ->
+            []
+    end.
+
+iterate(It, 0) ->
+    {It, []};
+iterate(It, N) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Payload, ItNext} ->
+            {ItFinal, Ps} = iterate(ItNext, N - 1),
+            {ItFinal, [Payload | Ps]};
+        none ->
+            {none, []}
+    end.
+
+iterator(DB, TopicFilter, StartTime) ->
+    {ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
+    It.
+
+parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
+    Topic;
+parse_topic(Topic) ->
+    emqx_topic:words(iolist_to_binary(Topic)).
+
+%% CT callbacks
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_durable_storage),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(emqx_durable_storage).
+
+init_per_testcase(TC, Config) ->
+    ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)),
+    Config.
+
+end_per_testcase(TC, _Config) ->
+    ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
+
+shard(TC) ->
+    list_to_binary(lists:concat([?MODULE, "_", TC])).
+
+set_shard_config(Shard, Config) ->
+    ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}).

+ 46 - 0
apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_message_storage_bitmask_shim).
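+
+%% A minimal ETS-backed reference model of the bitmask message storage;
+%% the property tests store the same messages here and in RocksDB and
+%% cross-check the iteration results.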
+
+-export([open/0]).
+-export([close/1]).
+-export([store/5]).
+-export([iterate/2]).
+
+-type topic() :: list(binary()).
+-type time() :: integer().
+
+-opaque t() :: ets:tid().
+
+-spec open() -> t().
+open() ->
+    ets:new(?MODULE, [ordered_set, {keypos, 1}]).
+
+-spec close(t()) -> ok.
+close(Tab) ->
+    true = ets:delete(Tab),
+    ok.
+
+-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) ->
+    ok | {error, _TODO}.
+store(Tab, MessageID, PublishedAt, Topic, Payload) ->
+    true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
+    ok.
+
+-spec iterate(t(), emqx_ds:replay()) ->
+    [binary()].
+iterate(Tab, {TopicFilter, StartTime}) ->
+    ets:foldr(
+        fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
+            case emqx_topic:match(Topic, TopicFilter) of
+                true when PublishedAt >= StartTime ->
+                    [Payload | Acc];
+                _ ->
+                    Acc
+            end
+        end,
+        [],
+        Tab
+    ).

+ 377 - 0
apps/emqx_durable_storage/test/props/payload_gen.erl

@@ -0,0 +1,377 @@
+%% @doc This module provides lazy, composable producer streams that
+%% can be considered counterparts to Archiver's consumer pipes and
+%% can therefore facilitate testing.
+%%
+%% It also comes with an implementation of a binary data stream that
+%% can produce sufficiently large amounts of plausibly pseudorandom
+%% binary payload in a deterministic way, together with routines to
+%% check binary blobs via sampling.
+-module(payload_gen).
+
+-define(end_of_stream, []).
+
+-dialyzer(no_improper_lists).
+
+%% Generic stream API:
+-export([
+    interleave_streams/1,
+    retransmits/2,
+    next/1,
+    consume/2,
+    consume/1
+]).
+
+%% Binary payload generator API:
+-export([
+    interleave_chunks/2,
+    interleave_chunks/1,
+
+    mb/1,
+
+    generator_fun/2,
+    generate_chunks/3,
+    generate_chunk/2,
+    check_consistency/3,
+    check_file_consistency/3,
+    get_byte/2
+]).
+
+%% List to stream generator API:
+-export([list_to_stream/1]).
+
+%% Proper generators:
+-export([
+    binary_stream_gen/1,
+    interleaved_streams_gen/1,
+    interleaved_binary_gen/1,
+    interleaved_list_gen/1
+]).
+
+-export_type([payload/0, binary_payload/0]).
+
+-define(hash_size, 16).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-type payload() :: {Seed :: term(), Size :: integer()}.
+
+-type binary_payload() :: {
+    binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
+}.
+
+%% For performance reasons we treat regular lists as streams, see `next/1'
+-opaque cont(Data) ::
+    fun(() -> stream(Data))
+    | stream(Data).
+
+-type stream(Data) ::
+    maybe_improper_list(Data, cont(Data))
+    | ?end_of_stream.
+
+-type tagged_binstream() :: stream({Tag :: term(), Payload :: chunk_state()}).
+
+-record(chunk_state, {
+    seed :: term(),
+    payload_size :: non_neg_integer(),
+    offset :: non_neg_integer(),
+    chunk_size :: non_neg_integer()
+}).
+
+-opaque chunk_state() :: #chunk_state{}.
+
+-record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).
+
+-opaque interleave_state() :: #interleave_state{}.
+
+%% =============================================================================
+%% API functions
+%% =============================================================================
+
+%% -----------------------------------------------------------------------------
+%% Proper generators
+%% -----------------------------------------------------------------------------
+
+%% @doc Proper generator that creates a binary stream
+-spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
+binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
+    ?LET(
+        {Seed, Size},
+        {nat(), range(1, 16#100000)},
+        generate_chunk({Seed, Size}, ChunkSize)
+    ).
+
+%% @equiv interleaved_streams_gen(10, Type)
+-spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
+interleaved_streams_gen(Type) ->
+    interleaved_streams_gen(10, Type).
+
+%% @doc Proper generator that creates a term of type
+%% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed
+%% into the `interleave_streams/1' function
+-spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
+    proper_types:type().
+interleaved_streams_gen(MaxNStreams, StreamType) ->
+    ?LET(
+        NStreams,
+        range(1, MaxNStreams),
+        ?LET(
+            Streams,
+            vector(NStreams, StreamType),
+            begin
+                Tags = [<<I/integer>> || I <- lists:seq(1, length(Streams))],
+                lists:zip(Tags, Streams)
+            end
+        )
+    ).
+
+-spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
+interleaved_binary_gen(ChunkSize) ->
+    interleaved_streams_gen(binary_stream_gen(ChunkSize)).
+
+-spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
+interleaved_list_gen(Type) ->
+    interleaved_streams_gen(non_empty(list(Type))).
+
+%% -----------------------------------------------------------------------------
+%% Generic streams
+%% -----------------------------------------------------------------------------
+
+%% @doc Consume one element from the stream.
+-spec next(cont(A)) -> stream(A).
+next(Fun) when is_function(Fun, 0) ->
+    Fun();
+next(L) ->
+    L.
+
+%% @doc Take a list of tagged streams and return a stream where
+%% elements of the streams are tagged and randomly interleaved.
+%%
+%% Note: this function is more or less generic and it's compatible
+%% with this module's `generate_chunks' function family, as well as
+%% `ets:next', lists and what not
+%%
+%% Consider using simplified versions of this function
+-spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
+interleave_streams(Streams) ->
+    do_interleave_streams(
+        #interleave_state{streams = Streams}
+    ).
+
+%% @doc Take an arbitrary stream and add repetitions of the elements
+%% TODO: Make retransmissions of arbitrary length
+-spec retransmits(stream(Data), float()) -> stream(Data).
+retransmits(Stream, Probability) ->
+    case Stream of
+        [Data | Cont0] ->
+            Cont = fun() -> retransmits(next(Cont0), Probability) end,
+            case rand:uniform() < Probability of
+                true -> [Data, Data | Cont];
+                false -> [Data | Cont]
+            end;
+        ?end_of_stream ->
+            ?end_of_stream
+    end.
+
+%% @doc Consume all elements of the stream and feed them into a
+%% callback (e.g. brod:produce)
+-spec consume(
+    stream(A),
+    fun((A) -> Ret)
+) -> [Ret].
+consume(Stream, Callback) ->
+    case Stream of
+        [Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)];
+        ?end_of_stream -> []
+    end.
+
+%% @equiv consume(Stream, fun(A) -> A end)
+-spec consume(stream(A)) -> [A].
+consume(Stream) ->
+    consume(Stream, fun(A) -> A end).
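+
+%% Example (sketch): a 48-byte payload emitted as three 16-byte chunks;
+%% each element carries its chunk number and the total chunk count:
+%%   Stream = generate_chunk({_Seed = 1, _Size = 48}, 16),
+%%   [{_, 1, 3}, {_, 2, 3}, {_, 3, 3}] = consume(Stream).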
+
+%% -----------------------------------------------------------------------------
+%% Misc functions
+%% -----------------------------------------------------------------------------
+
+%% @doc Return number of bytes in `N' megabytes
+-spec mb(integer()) -> integer().
+mb(N) ->
+    N * 1048576.
+
+%% -----------------------------------------------------------------------------
+%% List streams
+%% -----------------------------------------------------------------------------
+-spec list_to_stream([A]) -> stream(A).
+list_to_stream(L) -> L.
+
+%% -----------------------------------------------------------------------------
+%% Binary streams
+%% -----------------------------------------------------------------------------
+
+%% @doc The first argument is a chunk number, the second one is a seed.
+%% This implementation is hardly efficient, but it was chosen for
+%% clarity.
+-spec generator_fun(integer(), binary()) -> binary().
+generator_fun(N, Seed) ->
+    crypto:hash(md5, <<N:32, Seed/binary>>).
+
+%% @doc Get byte at offset `N'
+-spec get_byte(integer(), term()) -> byte().
+get_byte(N, Seed) ->
+    do_get_byte(N, seed_hash(Seed)).
+
+%% @doc Stream of binary chunks. Limitation: both the payload size and
+%% `ChunkSize' should be divisible by `?hash_size'
+-spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
+generate_chunk({Seed, Size}, ChunkSize) when
+    ChunkSize rem ?hash_size =:= 0
+->
+    State = #chunk_state{
+        seed = Seed,
+        payload_size = Size,
+        chunk_size = ChunkSize,
+        offset = 0
+    },
+    generate_chunk(State).
+
+%% @doc Take a list of `payload()'s, each paired with a chunk size, and
+%% produce their chunks in random order. The seed is used as a tag.
+%% @see interleave_streams/1
+-spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
+    tagged_binstream().
+interleave_chunks(Streams0) ->
+    Streams = [
+        {Tag, generate_chunk(Payload, ChunkSize)}
+     || {Payload = {Tag, _}, ChunkSize} <- Streams0
+    ],
+    interleave_streams(Streams).
+
+%% @doc Take a list of `payload()'s and produce their chunks in random
+%% order. The seed is used as a tag. All streams use the same chunk
+%% size.
+%% @see interleave_streams/1
+-spec interleave_chunks(
+    [payload()],
+    non_neg_integer()
+) -> tagged_binstream().
+interleave_chunks(Streams0, ChunkSize) ->
+    Streams = [
+        {Seed, generate_chunk({Seed, Size}, ChunkSize)}
+     || {Seed, Size} <- Streams0
+    ],
+    interleave_streams(Streams).
+
+%% @doc Generate chunks of data and feed them into
+%% `Callback'
+-spec generate_chunks(
+    payload(),
+    integer(),
+    fun((binary()) -> A)
+) -> [A].
+generate_chunks(Payload, ChunkSize, Callback) ->
+    consume(generate_chunk(Payload, ChunkSize), Callback).
+
+-spec check_consistency(
+    payload(),
+    integer(),
+    fun((integer()) -> {ok, binary()} | undefined)
+) -> ok.
+check_consistency({Seed, Size}, SampleSize, Callback) ->
+    SeedHash = seed_hash(Seed),
+    Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
+    %% Always check first and last bytes, and one that should not exist:
+    Samples = [0, Size - 1, Size | Random],
+    lists:foreach(
+        fun
+            (N) when N < Size ->
+                Expected = do_get_byte(N, SeedHash),
+                ?assertEqual(
+                    {N, {ok, Expected}},
+                    {N, Callback(N)}
+                );
+            (N) ->
+                ?assertMatch(undefined, Callback(N))
+        end,
+        Samples
+    ).
+
+-spec check_file_consistency(
+    payload(),
+    integer(),
+    file:filename()
+) -> ok.
+check_file_consistency(Payload, SampleSize, FileName) ->
+    {ok, FD} = file:open(FileName, [read, raw]),
+    try
+        Fun = fun(N) ->
+            case file:pread(FD, [{N, 1}]) of
+                {ok, [[X]]} -> {ok, X};
+                {ok, [eof]} -> undefined
+            end
+        end,
+        check_consistency(Payload, SampleSize, Fun)
+    after
+        file:close(FD)
+    end.
+
+%% =============================================================================
+%% Internal functions
+%% =============================================================================
+
+-spec do_interleave_streams(interleave_state()) -> stream(_Data).
+do_interleave_streams(#interleave_state{streams = []}) ->
+    ?end_of_stream;
+do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
+    %% Not the most efficient implementation (lots of avoidable list
+    %% traversals), but we don't expect the number of streams to be the
+    %% bottleneck
+    N = rand:uniform(length(Streams)),
+    {Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams),
+    case SC of
+        [Payload | SC1] ->
+            State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]},
+            Cont = fun() -> do_interleave_streams(State) end,
+            [{Tag, Payload} | Cont];
+        ?end_of_stream ->
+            State = State0#interleave_state{streams = Hd ++ Tl},
+            do_interleave_streams(State)
+    end.
+
+%% @doc Continue generating chunks
+-spec generate_chunk(chunk_state()) -> stream(binary()).
+generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
+    Offset >= Size
+->
+    ?end_of_stream;
+generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
+    State = State0#chunk_state{offset = Offset + ChunkSize},
+    Payload = generate_chunk(
+        State#chunk_state.seed,
+        Offset,
+        ChunkSize,
+        State#chunk_state.payload_size
+    ),
+    [Payload | fun() -> generate_chunk(State) end].
+
+generate_chunk(Seed, Offset, ChunkSize, Size) ->
+    SeedHash = seed_hash(Seed),
+    To = min(Offset + ChunkSize, Size) - 1,
+    Payload = iolist_to_binary([
+        generator_fun(I, SeedHash)
+     || I <- lists:seq(Offset div 16, To div 16)
+    ]),
+    ChunkNum = Offset div ChunkSize + 1,
+    ChunkCnt = ceil(Size / ChunkSize),
+    {Payload, ChunkNum, ChunkCnt}.
+
+%% @doc Hash any term
+-spec seed_hash(term()) -> binary().
+seed_hash(Seed) ->
+    crypto:hash(md5, term_to_binary(Seed)).
+
+%% @private Get byte at offset `N'
+-spec do_get_byte(integer(), binary()) -> byte().
+do_get_byte(N, Seed) ->
+    Chunk = generator_fun(N div ?hash_size, Seed),
+    binary:at(Chunk, N rem ?hash_size).

+ 464 - 0
apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl

@@ -0,0 +1,464 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(prop_replay_message_storage).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(WORK_DIR, ["_build", "test"]).
+-define(RUN_ID, {?MODULE, testrun_id}).
+
+-define(ZONE, ?MODULE).
+-define(GEN_ID, 42).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_bitstring_computes() ->
+    ?FORALL(
+        Keymapper,
+        keymapper(),
+        ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
+            BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper),
+            is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
+        end)
+    ).
+
+prop_topic_bitmask_computes() ->
+    Keymapper = make_keymapper(16, [8, 12, 16], 100),
+    ?FORALL(TopicFilter, topic_filter(), begin
+        Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper),
+        % topic bits + timestamp LSBs
+        is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
+    end).
+
+prop_next_seek_monotonic() ->
+    ?FORALL(
+        {TopicFilter, StartTime, Keymapper},
+        {topic_filter(), pos_integer(), keymapper()},
+        begin
+            Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter(
+                {TopicFilter, StartTime},
+                Keymapper
+            ),
+            ?FORALL(
+                Bitstring,
+                bitstr(get_keymapper_bitsize(Keymapper)),
+                emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring
+            )
+        end
+    ).
+
+prop_next_seek_eq_initial_seek() ->
+    ?FORALL(
+        Filter,
+        keyspace_filter(),
+        emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:=
+            emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter)
+    ).
+
+prop_iterate_messages() ->
+    TBPL = [4, 8, 12],
+    Options = #{
+        timestamp_bits => 32,
+        topic_bits_per_level => TBPL,
+        epoch => 200
+    },
+    % TODO
+    % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dir.
+    ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
+        Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
+        {DB, Handle} = open_db(Filepath, Options),
+        Shim = emqx_ds_message_storage_bitmask_shim:open(),
+        ok = store_db(DB, Stream),
+        ok = store_shim(Shim, Stream),
+        ?FORALL(
+            {
+                {Topic, _},
+                Pattern,
+                StartTime
+            },
+            {
+                nth(Stream),
+                topic_filter_pattern(),
+                start_time()
+            },
+            begin
+                TopicFilter = make_topic_filter(Pattern, Topic),
+                Iteration = {TopicFilter, StartTime},
+                Messages = iterate_db(DB, Iteration),
+                Reference = iterate_shim(Shim, Iteration),
+                ok = close_db(Handle),
+                ok = emqx_ds_message_storage_bitmask_shim:close(Shim),
+                ?WHENFAIL(
+                    begin
+                        io:format(user, " *** Filepath = ~s~n", [Filepath]),
+                        io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]),
+                        io:format(user, " *** StartTime = ~p~n", [StartTime])
+                    end,
+                    is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages)
+                )
+            end
+        )
+    end).
+
+prop_iterate_eq_iterate_with_preserve_restore() ->
+    TBPL = [4, 8, 16, 12],
+    Options = #{
+        timestamp_bits => 32,
+        topic_bits_per_level => TBPL,
+        epoch => 500
+    },
+    {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
+    ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
+        % TODO
+        % This proptest is impure because messages from testruns that are
+        % assumed to be independent of each other accumulate in the same
+        % storage. This would probably confuse the shrinker if a testrun fails.
+        ok = store_db(DB, Stream),
+        ?FORALL(
+            {
+                {Topic, _},
+                Pat,
+                StartTime,
+                Commands
+            },
+            {
+                nth(Stream),
+                topic_filter_pattern(),
+                start_time(),
+                shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
+            },
+            begin
+                Replay = {make_topic_filter(Pat, Topic), StartTime},
+                Iterator = make_iterator(DB, Replay),
+                Ctx = #{db => DB, replay => Replay},
+                Messages = run_iterator_commands(Commands, Iterator, Ctx),
+                equals(Messages, iterate_db(DB, Replay))
+            end
+        )
+    end).
+
+prop_iterate_eq_iterate_with_refresh() ->
+    TBPL = [4, 8, 16, 12],
+    Options = #{
+        timestamp_bits => 32,
+        topic_bits_per_level => TBPL,
+        epoch => 500
+    },
+    {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
+    ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
+        % TODO
+        % This proptest is also impure, see above.
+        ok = store_db(DB, Stream),
+        ?FORALL(
+            {
+                {Topic, _},
+                Pat,
+                StartTime,
+                RefreshEvery
+            },
+            {
+                nth(Stream),
+                topic_filter_pattern(),
+                start_time(),
+                pos_integer()
+            },
+            ?TIMEOUT(5000, begin
+                Replay = {make_topic_filter(Pat, Topic), StartTime},
+                IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
+                Iterator = make_iterator(DB, Replay, IterationOptions),
+                Messages = iterate_db(Iterator),
+                equals(Messages, iterate_db(DB, Replay))
+            end)
+        )
+    end).
+
+% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
+%     MessageID = emqx_guid:gen(),
+%     PublishedAt = ChunkNum,
+%     ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload),
+%     store_message_stream(DB, payload_gen:next(Rest));
+% store_message_stream(_DB, []) ->
+%     ok.
+
+store_db(DB, Messages) ->
+    lists:foreach(
+        fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
+            Bin = term_to_binary(Payload),
+            emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Bin)
+        end,
+        Messages
+    ).
+
+iterate_db(DB, Iteration) ->
+    iterate_db(make_iterator(DB, Iteration)).
+
+iterate_db(It) ->
+    case emqx_ds_message_storage_bitmask:next(It) of
+        {value, Payload, ItNext} ->
+            [binary_to_term(Payload) | iterate_db(ItNext)];
+        none ->
+            []
+    end.
+
+make_iterator(DB, Replay) ->
+    {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay),
+    It.
+
+make_iterator(DB, Replay, Options) ->
+    {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options),
+    It.
+
+run_iterator_commands([iterate | Rest], It, Ctx) ->
+    case emqx_ds_message_storage_bitmask:next(It) of
+        {value, Payload, ItNext} ->
+            [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
+        none ->
+            []
+    end;
+run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
+    #{
+        db := DB,
+        replay := Replay
+    } = Ctx,
+    Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It),
+    {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Replay, Serial),
+    run_iterator_commands(Rest, ItNext, Ctx);
+run_iterator_commands([], It, _Ctx) ->
+    iterate_db(It).
+
+store_shim(Shim, Messages) ->
+    lists:foreach(
+        fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
+            Bin = term_to_binary(Payload),
+            emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
+        end,
+        Messages
+    ).
+
+iterate_shim(Shim, Iteration) ->
+    lists:map(
+        fun binary_to_term/1,
+        emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration)
+    ).
+
+%%--------------------------------------------------------------------
+%% Setup / teardown
+%%--------------------------------------------------------------------
+
+open_db(Filepath, Options) ->
+    {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
+    {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
+    DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
+    {DB, Handle}.
+
+close_db(Handle) ->
+    rocksdb:close(Handle).
+
+make_filepath(TC) ->
+    make_filepath(TC, 0).
+
+make_filepath(TC, InstID) ->
+    Name = io_lib:format("~0p.~0p", [TC, InstID]),
+    Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]),
+    ok = filelib:ensure_dir(Path),
+    Path.
+
+get_run_id() ->
+    case persistent_term:get(?RUN_ID, undefined) of
+        RunID when RunID /= undefined ->
+            RunID;
+        undefined ->
+            RunID = make_run_id(),
+            ok = persistent_term:put(?RUN_ID, RunID),
+            RunID
+    end.
+
+make_run_id() ->
+    calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]).
+
+%%--------------------------------------------------------------------
+%% Type generators
+%%--------------------------------------------------------------------
+
+topic() ->
+    non_empty(list(topic_level())).
+
+topic(EntropyWeights) ->
+    ?LET(L, scaled(1 / 4, list(1)), begin
+        EWs = lists:sublist(EntropyWeights ++ L, length(L)),
+        ?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs])
+    end).
+
+topic_filter() ->
+    ?SUCHTHAT(
+        L,
+        non_empty(
+            list(
+                frequency([
+                    {5, topic_level()},
+                    {2, '+'},
+                    {1, '#'}
+                ])
+            )
+        ),
+        not lists:member('#', L) orelse lists:last(L) == '#'
+    ).
+
+topic_level_pattern() ->
+    frequency([
+        {5, level},
+        {2, '+'},
+        {1, '#'}
+    ]).
+
+topic_filter_pattern() ->
+    list(topic_level_pattern()).
+
+topic_filter(Topic) ->
+    ?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)).
+
+make_topic_filter([], _) ->
+    [];
+make_topic_filter(_, []) ->
+    [];
+make_topic_filter(['#' | _], _) ->
+    ['#'];
+make_topic_filter(['+' | Rest], [_ | Levels]) ->
+    ['+' | make_topic_filter(Rest, Levels)];
+make_topic_filter([level | Rest], [L | Levels]) ->
+    [L | make_topic_filter(Rest, Levels)].
+
+% topic() ->
+%     ?LAZY(?SIZED(S, frequency([
+%         {S, [topic_level() | topic()]},
+%         {1, []}
+%     ]))).
+
+% topic_filter() ->
+%     ?LAZY(?SIZED(S, frequency([
+%         {round(S / 3 * 2), [topic_level() | topic_filter()]},
+%         {round(S / 3 * 1), ['+' | topic_filter()]},
+%         {1, []},
+%         {1, ['#']}
+%     ]))).
+
+topic_level() ->
+    ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
+
+topic_level(Entropy) ->
+    S = floor(1 + math:log2(Entropy) / 4),
+    ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
+
+topic_level_fixed() ->
+    oneof([
+        <<"foo">>,
+        <<"bar">>,
+        <<"baz">>,
+        <<"xyzzy">>
+    ]).
+
+keymapper() ->
+    ?LET(
+        {TimestampBits, TopicBits, Epoch},
+        {
+            range(0, 128),
+            non_empty(list(range(1, 32))),
+            pos_integer()
+        },
+        make_keymapper(TimestampBits, TopicBits, Epoch * 100)
+    ).
+
+keyspace_filter() ->
+    ?LET(
+        {TopicFilter, StartTime, Keymapper},
+        {topic_filter(), pos_integer(), keymapper()},
+        emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
+    ).
+
+messages(Topic) ->
+    ?LET(
+        Ts,
+        list(Topic),
+        interleaved(
+            ?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages))
+        )
+    ).
+
+message() ->
+    ?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}).
+
+message_streams(Topic) ->
+    ?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]).
+
+timestamp() ->
+    scaled(20, pos_integer()).
+
+start_time() ->
+    scaled(10, pos_integer()).
+
+bitstr(Size) ->
+    ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)).
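+
+%% e.g. bitstr(12) decodes two random bytes and masks them down to the
+%% low 12 bits, yielding values in 0..4095.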
+
+nth(L) ->
+    ?LET(I, range(1, length(L)), lists:nth(I, L)).
+
+scaled(Factor, T) ->
+    ?SIZED(S, resize(ceil(S * Factor), T)).
+
+interleaved(T) ->
+    ?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))).
+
+shuffled(T) ->
+    ?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))).
+
+flat(T) ->
+    ?LET(L, T, lists:flatten(L)).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
+    emqx_ds_message_storage_bitmask:make_keymapper(#{
+        timestamp_bits => TimestampBits,
+        topic_bits_per_level => TopicBits,
+        epoch => MaxEpoch
+    }).
+
+get_keymapper_bitsize(Keymapper) ->
+    maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)).
+
+-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
+interleave(Seqs, Rng) ->
+    interleave(Seqs, length(Seqs), Rng).
+
+interleave(Seqs, L, Rng) when L > 0 ->
+    {N, RngNext} = rand:uniform_s(L, Rng),
+    {SeqHead, SeqTail} = lists:split(N - 1, Seqs),
+    case SeqTail of
+        [{Tag, [M | Rest]} | SeqRest] ->
+            [{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)];
+        [{_, []} | SeqRest] ->
+            interleave(SeqHead ++ SeqRest, L - 1, RngNext)
+    end;
+interleave([], 0, _) ->
+    [].
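+
+%% e.g. interleave([{a, [1, 2]}, {b, [10]}], Rng) yields a random merge
+%% such as [{b, 10}, {a, 1}, {a, 2}], preserving per-tag element order.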
+
+-spec shuffle(list(E), rand:state()) -> list(E).
+shuffle(L, Rng) ->
+    {Rands, _} = randoms(length(L), Rng),
+    [E || {_, E} <- lists:sort(lists:zip(Rands, L))].
+
+randoms(N, Rng) when N > 0 ->
+    {Rand, RngNext} = rand:uniform_s(Rng),
+    {Tail, RngFinal} = randoms(N - 1, RngNext),
+    {[Rand | Tail], RngFinal};
+randoms(_, Rng) ->
+    {[], Rng}.