Просмотр исходного кода

feat(ds): Implement ratchet function for bitmask keymapper

ieQu1 2 года назад
Родитель
Commit
ef46c09caf

+ 4 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -40,7 +40,10 @@
 
 init() ->
     ?WHEN_ENABLED(begin
-        ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}),
+        ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{
+            backend => builtin,
+            storage => {emqx_ds_storage_bitfield_lts, #{}}
+        }),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok = emqx_persistent_session_ds:create_tables(),
         ok

+ 36 - 0
apps/emqx_durable_storage/src/emqx_ds_bitmask.hrl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_BITMASK_HRL).
+-define(EMQX_DS_BITMASK_HRL, true).
+
+-record(filter_scan_action, {
+    offset :: emqx_ds_bitmask_keymapper:offset(),
+    size :: emqx_ds_bitmask_keymapper:bitsize(),
+    min :: non_neg_integer(),
+    max :: non_neg_integer()
+}).
+
+-record(filter, {
+    size :: non_neg_integer(),
+    bitfilter :: non_neg_integer(),
+    bitmask :: non_neg_integer(),
+    %% Ranges (in _bitsource_ basis):
+    bitsource_ranges :: array:array(#filter_scan_action{}),
+    range_min :: non_neg_integer(),
+    range_max :: non_neg_integer()
+}).
+
+-endif.

+ 289 - 300
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -86,9 +86,12 @@
     bin_vector_to_key/2,
     key_to_vector/2,
     bin_key_to_vector/2,
-    next_range/3,
     key_to_bitstring/2,
     bitstring_to_key/2,
+    make_filter/2,
+    ratchet/2,
+    bin_increment/2,
+    bin_checkmask/2,
     bitsize/1
 ]).
 
@@ -149,6 +152,10 @@
 
 -type scalar_range() :: any | {'=', scalar() | infinity} | {'>=', scalar()}.
 
+-include("emqx_ds_bitmask.hrl").
+
+-type filter() :: #filter{}.
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -237,36 +244,6 @@ bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, B
         lists:zip(Vector, DimSizeof)
     ).
 
-%% @doc Given a keymapper, a filter, and a key, return a triple containing:
-%%
-%% 1. `NextKey', a key that is greater than the given one, and is
-%% within the given range.
-%%
-%% 2. `Bitmask'
-%%
-%% 3. `Bitfilter'
-%%
-%% Bitmask and bitfilter can be used to verify that key any K is in
-%% the range using the following inequality:
-%%
-%% K >= NextKey && (K band Bitmask) =:= Bitfilter.
-%%
-%% ...or `undefined' if the next key is outside the range.
--spec next_range(keymapper(), [scalar_range()], key()) -> {key(), integer(), integer()} | undefined.
-next_range(Keymapper, Filter0, PrevKey) ->
-    %% Key -> Vector -> +1 on vector -> Key
-    Filter = desugar_filter(Keymapper, Filter0),
-    PrevVec = key_to_vector(Keymapper, PrevKey),
-    case inc_vector(Filter, PrevVec) of
-        overflow ->
-            undefined;
-        NextVec ->
-            NewKey = vector_to_key(Keymapper, NextVec),
-            Bitmask = make_bitmask(Keymapper, Filter),
-            Bitfilter = NewKey band Bitmask,
-            {NewKey, Bitmask, Bitfilter}
-    end.
-
 -spec bitstring_to_key(keymapper(), bitstring()) -> key().
 bitstring_to_key(#keymapper{size = Size}, Bin) ->
     case Bin of
@@ -280,60 +257,208 @@ bitstring_to_key(#keymapper{size = Size}, Bin) ->
 key_to_bitstring(#keymapper{size = Size}, Key) ->
     <<Key:Size>>.
 
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
--spec make_bitmask(keymapper(), [{non_neg_integer(), non_neg_integer()}]) -> non_neg_integer().
-make_bitmask(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) ->
-    BitmaskVector = lists:map(
+%% @doc Create a filter object that facilitates range scans.
+-spec make_filter(keymapper(), [scalar_range()]) -> filter().
+make_filter(KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = Size}, Filter0) ->
+    NDim = length(DimSizeof),
+    %% Transform "symbolic" inequations to ranges:
+    Filter1 = inequations_to_ranges(KeyMapper, Filter0),
+    {Bitmask, Bitfilter} = make_bitfilter(KeyMapper, Filter1),
+    %% Calculate maximum source offset as per bitsource specification:
+    MaxOffset = lists:foldl(
+        fun({Dim, Offset, _Size}, Acc) ->
+            maps:update_with(Dim, fun(OldVal) -> max(OldVal, Offset) end, 0, Acc)
+        end,
+        #{},
+        Schema
+    ),
+    %% Adjust minimum and maximum values for each interval like this:
+    %%
+    %% Min: 110100|101011 -> 110100|00000
+    %% Max: 110101|001011 -> 110101|11111
+    %%            ^
+    %%            |
+    %%       max offset
+    %%
+    %% This is needed so when we increment the vector, we always scan
+    %% the full range of least significant bits.
+    Filter2 = lists:map(
         fun
-            ({{N, N}, Bits}) ->
-                %% For strict equality we can employ bitmask:
-                ones(Bits);
-            (_) ->
-                0
+            ({{Val, Val}, _Dim}) ->
+                {Val, Val};
+            ({{Min0, Max0}, Dim}) ->
+                Offset = maps:get(Dim, MaxOffset, 0),
+                %% Set least significant bits of Min to 0:
+                Min = (Min0 bsr Offset) bsl Offset,
+                %% Set least significant bits of Max to 1:
+                Max = Max0 bor ones(Offset),
+                {Min, Max}
         end,
-        lists:zip(Ranges, DimSizeof)
+        lists:zip(Filter1, lists:seq(1, NDim))
+    ),
+    %% Project the vector into "bitsource coordinate system":
+    {_, Filter} = fold_bitsources(
+        fun(DstOffset, {Dim, SrcOffset, Size}, Acc) ->
+            {Min0, Max0} = lists:nth(Dim, Filter2),
+            Min = (Min0 bsr SrcOffset) band ones(Size),
+            Max = (Max0 bsr SrcOffset) band ones(Size),
+            Action = #filter_scan_action{
+                offset = DstOffset,
+                size = Size,
+                min = Min,
+                max = Max
+            },
+            [Action | Acc]
+        end,
+        [],
+        Schema
     ),
-    vector_to_key(Keymapper, BitmaskVector).
-
--spec inc_vector([{non_neg_integer(), non_neg_integer()}], vector()) -> vector() | overflow.
-inc_vector(Filter, Vec0) ->
-    case normalize_vector(Filter, Vec0) of
-        {true, Vec} ->
-            Vec;
-        {false, Vec} ->
-            do_inc_vector(Filter, Vec, [])
+    Ranges = array:from_list(lists:reverse(Filter)),
+    %% Compute estimated upper and lower bounds of a _continuous_
+    %% interval where all keys lie:
+    case Filter of
+        [] ->
+            RangeMin = 0,
+            RangeMax = 0;
+        [#filter_scan_action{offset = MSBOffset, min = MSBMin, max = MSBMax} | _] ->
+            RangeMin = MSBMin bsl MSBOffset,
+            RangeMax = MSBMax bsl MSBOffset bor ones(MSBOffset)
+    end,
+    %% Final value
+    #filter{
+        size = Size,
+        bitmask = Bitmask,
+        bitfilter = Bitfilter,
+        bitsource_ranges = Ranges,
+        range_min = RangeMin,
+        range_max = RangeMax
+    }.
+
+-spec ratchet(filter(), key()) -> key() | overflow.
+ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Max ->
+    NDim = array:size(Ranges),
+    case ratchet_scan(Ranges, NDim, Key, 0, _Pivot = {-1, 0}, _Carry = 0) of
+        overflow ->
+            overflow;
+        {Pivot, Increment} ->
+            ratchet_do(Ranges, Key, NDim - 1, Pivot, Increment)
+    end;
+ratchet(_, _) ->
+    overflow.
+
+-spec bin_increment(filter(), binary()) -> binary() | overflow.
+bin_increment(Filter = #filter{size = Size}, <<>>) ->
+    Key = ratchet(Filter, 0),
+    <<Key:Size>>;
+bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, KeyBin) ->
+    <<Key0:Size>> = KeyBin,
+    Key1 = Key0 + 1,
+    if
+        Key1 band Bitmask =:= Bitfilter ->
+            %% TODO: check overflow
+            <<Key1:Size>>;
+        true ->
+            case ratchet(Filter, Key1) of
+                overflow ->
+                    overflow;
+                Key ->
+                    <<Key:Size>>
+            end
     end.
 
-do_inc_vector([], [], _Acc) ->
+-spec bin_checkmask(filter(), binary()) -> boolean().
+bin_checkmask(#filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, Key) ->
+    case Key of
+        <<Int:Size>> ->
+            Int band Bitmask =:= Bitfilter;
+        _ ->
+            false
+    end.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Note: this function operates in bitsource basis, scanning it from 0
+%% to NDim (i.e. from the least significant bits to the most
+%% significant bits)
+ratchet_scan(_Ranges, NDim, _Key, NDim, Pivot, 0) ->
+    %% We've reached the end:
+    Pivot;
+ratchet_scan(_Ranges, NDim, _Key, NDim, _Pivot, 1) ->
+    %% We've reached the end, but key is still not large enough:
     overflow;
-do_inc_vector([{Min, Max} | Intervals], [Elem | Vec], Acc) ->
-    case Elem of
-        Max ->
-            do_inc_vector(Intervals, Vec, [Min | Acc]);
-        _ when Elem < Max ->
-            lists:reverse(Acc) ++ [Elem + 1 | Vec]
+ratchet_scan(Ranges, NDim, Key, I, Pivot0, Carry) ->
+    #filter_scan_action{offset = Offset, size = Size, min = Min, max = Max} = array:get(I, Ranges),
+    %% Extract I-th element of the vector from the original key:
+    Elem = ((Key bsr Offset) band ones(Size)) + Carry,
+    if
+        Elem < Min ->
+            %% I-th coordinate is less than the specified minimum.
+            %%
+            %% We reset this coordinate to the minimum value. It means
+            %% we incremented this bitposition, the less significant
+            %% bits have to be reset to their respective minimum
+            %% values:
+            Pivot = {I + 1, 0},
+            ratchet_scan(Ranges, NDim, Key, I + 1, Pivot, 0);
+        Elem > Max ->
+            %% I-th coordinate is larger than the specified
+            %% maximum. We can only fix this problem by incrementing
+            %% the next coordinate (i.e. more significant bits).
+            %%
+            %% We reset this coordinate to the minimum value, and
+            %% increment the next coordinate (by setting `Carry' to
+            %% 1).
+            Pivot = {I + 1, 1},
+            ratchet_scan(Ranges, NDim, Key, I + 1, Pivot, 1);
+        true ->
+            %% Coordinate is within range:
+            ratchet_scan(Ranges, NDim, Key, I + 1, Pivot0, 0)
     end.
 
-normalize_vector(Intervals, Vec0) ->
-    Vec = lists:map(
+%% Note: this function operates in bitsource basis, scanning it from
+%% NDim to 0. It applies the transformation specified by
+%% `ratchet_scan'.
+ratchet_do(Ranges, Key, I, _Pivot, _Increment) when I < 0 ->
+    0;
+ratchet_do(Ranges, Key, I, Pivot, Increment) ->
+    #filter_scan_action{offset = Offset, size = Size, min = Min} = array:get(I, Ranges),
+    Mask = ones(Offset + Size) bxor ones(Offset),
+    Elem =
+        if
+            I > Pivot ->
+                Mask band Key;
+            I =:= Pivot ->
+                (Mask band Key) + (Increment bsl Offset);
+            true ->
+                Min bsl Offset
+        end,
+    %% erlang:display(
+    %%     {ratchet_do, I, integer_to_list(Key, 16), integer_to_list(Mask, 2),
+    %%         integer_to_list(Elem, 16)}
+    %% ),
+    Elem bor ratchet_do(Ranges, Key, I - 1, Pivot, Increment).
+
+-spec make_bitfilter(keymapper(), [{non_neg_integer(), non_neg_integer()}]) ->
+    {non_neg_integer(), non_neg_integer()}.
+make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) ->
+    L = lists:map(
         fun
-            ({{Min, _Max}, Elem}) when Min > Elem ->
-                Min;
-            ({{_Min, Max}, Elem}) when Max < Elem ->
-                Max;
-            ({_, Elem}) ->
-                Elem
+            ({{N, N}, Bits}) ->
+                %% For strict equality we can employ bitmask:
+                {ones(Bits), N};
+            (_) ->
+                {0, 0}
         end,
-        lists:zip(Intervals, Vec0)
+        lists:zip(Ranges, DimSizeof)
     ),
-    {Vec > Vec0, Vec}.
+    {Bitmask, Bitfilter} = lists:unzip(L),
+    {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}.
 
 %% Transform inequalities into a list of closed intervals that the
 %% vector elements should lie in.
-desugar_filter(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
+inequations_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
     lists:map(
         fun
             ({any, Bitsize}) ->
@@ -390,24 +515,6 @@ ones(Bits) ->
 
 -ifdef(TEST).
 
-%% %% Create a bitmask that is sufficient to cover a given number. E.g.:
-%% %%
-%% %% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111
-%% bitmask_of(N) ->
-%%     %% FIXME: avoid floats
-%%     NBits = ceil(math:log2(N + 1)),
-%%     ones(NBits).
-
-%% bitmask_of_test() ->
-%%     ?assertEqual(2#0, bitmask_of(0)),
-%%     ?assertEqual(2#1, bitmask_of(1)),
-%%     ?assertEqual(2#11, bitmask_of(2#10)),
-%%     ?assertEqual(2#11, bitmask_of(2#11)),
-%%     ?assertEqual(2#1111, bitmask_of(2#1000)),
-%%     ?assertEqual(2#1111, bitmask_of(2#1111)),
-%%     ?assertEqual(ones(128), bitmask_of(ones(128))),
-%%     ?assertEqual(ones(256), bitmask_of(ones(256))).
-
 make_keymapper0_test() ->
     Schema = [],
     ?assertEqual(
@@ -510,235 +617,117 @@ key_to_vector2_test() ->
     key2vec(Schema, [0, 1]),
     key2vec(Schema, [255, 0]).
 
-inc_vector0_test() ->
-    Keymapper = make_keymapper([]),
-    ?assertMatch(overflow, incvec(Keymapper, [], [])).
-
-inc_vector1_test() ->
-    Keymapper = make_keymapper([{1, 0, 8}]),
-    ?assertMatch([3], incvec(Keymapper, [{'=', 3}], [1])),
-    ?assertMatch([3], incvec(Keymapper, [{'=', 3}], [2])),
-    ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [3])),
-    ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [4])),
-    ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [255])),
-    %% Now with >=:
-    ?assertMatch([1], incvec(Keymapper, [{'>=', 0}], [0])),
-    ?assertMatch([255], incvec(Keymapper, [{'>=', 0}], [254])),
-    ?assertMatch(overflow, incvec(Keymapper, [{'>=', 0}], [255])),
-
-    ?assertMatch([100], incvec(Keymapper, [{'>=', 100}], [0])),
-    ?assertMatch([100], incvec(Keymapper, [{'>=', 100}], [99])),
-    ?assertMatch([255], incvec(Keymapper, [{'>=', 100}], [254])),
-    ?assertMatch(overflow, incvec(Keymapper, [{'>=', 100}], [255])).
-
-inc_vector2_test() ->
-    Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}, {3, 0, 8}]),
-    Filter = [{'>=', 0}, {'=', 100}, {'>=', 30}],
-    ?assertMatch([0, 100, 30], incvec(Keymapper, Filter, [0, 0, 0])),
-    ?assertMatch([1, 100, 30], incvec(Keymapper, Filter, [0, 100, 30])),
-    ?assertMatch([255, 100, 30], incvec(Keymapper, Filter, [254, 100, 30])),
-    ?assertMatch([0, 100, 31], incvec(Keymapper, Filter, [255, 100, 30])),
-    ?assertMatch([0, 100, 30], incvec(Keymapper, Filter, [0, 100, 29])),
-    ?assertMatch(overflow, incvec(Keymapper, Filter, [255, 100, 255])),
-    ?assertMatch([255, 100, 255], incvec(Keymapper, Filter, [254, 100, 255])),
-    ?assertMatch([0, 100, 255], incvec(Keymapper, Filter, [255, 100, 254])),
-    %% Nasty cases (shouldn't happen, hopefully):
-    ?assertMatch([1, 100, 30], incvec(Keymapper, Filter, [0, 101, 0])),
-    ?assertMatch([1, 100, 33], incvec(Keymapper, Filter, [0, 101, 33])),
-    ?assertMatch([0, 100, 255], incvec(Keymapper, Filter, [255, 101, 254])),
-    ?assertMatch(overflow, incvec(Keymapper, Filter, [255, 101, 255])).
-
 make_bitmask0_test() ->
     Keymapper = make_keymapper([]),
-    ?assertMatch(0, mkbmask(Keymapper, [])).
+    ?assertMatch({0, 0}, mkbmask(Keymapper, [])).
 
 make_bitmask1_test() ->
     Keymapper = make_keymapper([{1, 0, 8}]),
-    ?assertEqual(0, mkbmask(Keymapper, [any])),
-    ?assertEqual(16#ff, mkbmask(Keymapper, [{'=', 1}])),
-    ?assertEqual(16#ff, mkbmask(Keymapper, [{'=', 255}])),
-    ?assertEqual(0, mkbmask(Keymapper, [{'>=', 0}])),
-    ?assertEqual(0, mkbmask(Keymapper, [{'>=', 1}])),
-    ?assertEqual(0, mkbmask(Keymapper, [{'>=', 16#f}])).
+    ?assertEqual({0, 0}, mkbmask(Keymapper, [any])),
+    ?assertEqual({16#ff, 1}, mkbmask(Keymapper, [{'=', 1}])),
+    ?assertEqual({16#ff, 255}, mkbmask(Keymapper, [{'=', 255}])),
+    ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 0}])),
+    ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 1}])),
+    ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 16#f}])).
 
 make_bitmask2_test() ->
     Keymapper = make_keymapper([{1, 0, 3}, {2, 0, 4}, {3, 0, 2}]),
-    ?assertEqual(2#00_0000_000, mkbmask(Keymapper, [any, any, any])),
-    ?assertEqual(2#11_0000_000, mkbmask(Keymapper, [any, any, {'=', 0}])),
-    ?assertEqual(2#00_1111_000, mkbmask(Keymapper, [any, {'=', 0}, any])),
-    ?assertEqual(2#00_0000_111, mkbmask(Keymapper, [{'=', 0}, any, any])).
+    ?assertEqual({2#00_0000_000, 2#00_0000_000}, mkbmask(Keymapper, [any, any, any])),
+    ?assertEqual({2#11_0000_000, 2#00_0000_000}, mkbmask(Keymapper, [any, any, {'=', 0}])),
+    ?assertEqual({2#00_1111_000, 2#00_0000_000}, mkbmask(Keymapper, [any, {'=', 0}, any])),
+    ?assertEqual({2#00_0000_111, 2#00_0000_000}, mkbmask(Keymapper, [{'=', 0}, any, any])).
 
 make_bitmask3_test() ->
     %% Key format of type |TimeOffset|Topic|Epoch|:
-    Keymapper = make_keymapper([{1, 8, 8}, {2, 0, 8}, {1, 0, 8}]),
-    ?assertEqual(2#00000000_00000000_00000000, mkbmask(Keymapper, [any, any])),
-    ?assertEqual(2#11111111_11111111_11111111, mkbmask(Keymapper, [{'=', 33}, {'=', 22}])),
-    ?assertEqual(2#11111111_11111111_11111111, mkbmask(Keymapper, [{'=', 33}, {'=', 22}])),
-    ?assertEqual(2#00000000_11111111_00000000, mkbmask(Keymapper, [{'>=', 255}, {'=', 22}])).
+    Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}, {1, 8, 8}]),
+    ?assertEqual({2#00000000_00000000_00000000, 16#00_00_00}, mkbmask(Keymapper, [any, any])),
+    ?assertEqual(
+        {2#11111111_11111111_11111111, 16#aa_cc_bb},
+        mkbmask(Keymapper, [{'=', 16#aabb}, {'=', 16#cc}])
+    ),
+    ?assertEqual(
+        {2#00000000_11111111_00000000, 16#00_bb_00}, mkbmask(Keymapper, [{'>=', 255}, {'=', 16#bb}])
+    ).
 
-next_range0_test() ->
-    Keymapper = make_keymapper([]),
+make_filter_test() ->
+    KeyMapper = make_keymapper([]),
     Filter = [],
-    PrevKey = 0,
-    ?assertMatch(undefined, next_range(Keymapper, Filter, PrevKey)).
-
-next_range1_test() ->
-    Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}]),
-    ?assertMatch(undefined, next_range(Keymapper, [{'=', 0}, {'=', 0}], 0)),
-    ?assertMatch({1, 16#ffff, 1}, next_range(Keymapper, [{'=', 1}, {'=', 0}], 0)),
-    ?assertMatch({16#100, 16#ffff, 16#100}, next_range(Keymapper, [{'=', 0}, {'=', 1}], 0)),
-    %% Now with any:
-    ?assertMatch({1, 0, 0}, next_range(Keymapper, [any, any], 0)),
-    ?assertMatch({2, 0, 0}, next_range(Keymapper, [any, any], 1)),
-    ?assertMatch({16#fffb, 0, 0}, next_range(Keymapper, [any, any], 16#fffa)),
-    %% Now with >=:
-    ?assertMatch(
-        {16#42_30, 16#ff00, 16#42_00}, next_range(Keymapper, [{'>=', 16#30}, {'=', 16#42}], 0)
-    ),
-    ?assertMatch(
-        {16#42_31, 16#ff00, 16#42_00},
-        next_range(Keymapper, [{'>=', 16#30}, {'=', 16#42}], 16#42_30)
-    ),
+    ?assertMatch(#filter{size = 0, bitmask = 0, bitfilter = 0}, make_filter(KeyMapper, Filter)).
 
+ratchet1_test() ->
+    Bitsources = [{1, 0, 8}],
+    M = make_keymapper(Bitsources),
+    F = make_filter(M, [any]),
+    #filter{bitsource_ranges = Rarr} = F,
     ?assertMatch(
-        {16#30_42, 16#00ff, 16#00_42}, next_range(Keymapper, [{'=', 16#42}, {'>=', 16#30}], 0)
+        [
+            #filter_scan_action{
+                offset = 0,
+                size = 8,
+                min = 0,
+                max = 16#ff
+            }
+        ],
+        array:to_list(Rarr)
     ),
-    ?assertMatch(
-        {16#31_42, 16#00ff, 16#00_42},
-        next_range(Keymapper, [{'=', 16#42}, {'>=', 16#30}], 16#00_43)
-    ).
-
-%% Bunch of tests that verifying that next_range doesn't skip over keys:
-
--define(assertIterComplete(A, B),
-    ?assertEqual(A -- [0], B)
-).
-
--define(assertSameSet(A, B),
-    ?assertIterComplete(lists:sort(A), lists:sort(B))
-).
-
-iterate1_test() ->
-    SizeX = 3,
-    SizeY = 3,
-    Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]),
-    Keys = test_iteration(Keymapper, [any, any]),
-    Expected = [
-        X bor (Y bsl SizeX)
-     || Y <- lists:seq(0, ones(SizeY)), X <- lists:seq(0, ones(SizeX))
-    ],
-    ?assertIterComplete(Expected, Keys).
-
-iterate2_test() ->
-    SizeX = 64,
-    SizeY = 3,
-    Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]),
-    X = 123456789,
-    Keys = test_iteration(Keymapper, [{'=', X}, any]),
-    Expected = [
-        X bor (Y bsl SizeX)
-     || Y <- lists:seq(0, ones(SizeY))
-    ],
-    ?assertIterComplete(Expected, Keys).
-
-iterate3_test() ->
-    SizeX = 3,
-    SizeY = 64,
-    Y = 42,
-    Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]),
-    Keys = test_iteration(Keymapper, [any, {'=', Y}]),
-    Expected = [
-        X bor (Y bsl SizeX)
-     || X <- lists:seq(0, ones(SizeX))
-    ],
-    ?assertIterComplete(Expected, Keys).
-
-iterate4_test() ->
-    SizeX = 8,
-    SizeY = 4,
-    MinX = 16#fa,
-    MinY = 16#a,
-    Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]),
-    Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]),
-    Expected = [
-        X bor (Y bsl SizeX)
-     || Y <- lists:seq(MinY, ones(SizeY)), X <- lists:seq(MinX, ones(SizeX))
-    ],
-    ?assertIterComplete(Expected, Keys).
-
-iterate1_prop() ->
-    Size = 4,
-    ?FORALL(
-        {SizeX, SizeY},
-        {integer(1, Size), integer(1, Size)},
-        ?FORALL(
-            {SplitX, MinX, MinY},
-            {integer(0, SizeX), integer(0, SizeX), integer(0, SizeY)},
-            begin
-                Keymapper = make_keymapper([
-                    {1, 0, SplitX}, {2, 0, SizeY}, {1, SplitX, SizeX - SplitX}
-                ]),
-                Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]),
-                Expected = [
-                    vector_to_key(Keymapper, [X, Y])
-                 || X <- lists:seq(MinX, ones(SizeX)),
-                    Y <- lists:seq(MinY, ones(SizeY))
-                ],
-                ?assertSameSet(Expected, Keys),
-                true
-            end
-        )
-    ).
-
-iterate5_test() ->
-    ?assert(proper:quickcheck(iterate1_prop(), 100)).
-
-iterate2_prop() ->
-    Size = 4,
-    ?FORALL(
-        {SizeX, SizeY},
-        {integer(1, Size), integer(1, Size)},
-        ?FORALL(
-            {SplitX, MinX, MinY},
-            {integer(0, SizeX), integer(0, SizeX), integer(0, SizeY)},
-            begin
-                Keymapper = make_keymapper([
-                    {1, SplitX, SizeX - SplitX}, {2, 0, SizeY}, {1, 0, SplitX}
-                ]),
-                Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]),
-                Expected = [
-                    vector_to_key(Keymapper, [X, Y])
-                 || X <- lists:seq(MinX, ones(SizeX)),
-                    Y <- lists:seq(MinY, ones(SizeY))
-                ],
-                ?assertSameSet(Expected, Keys),
-                true
-            end
-        )
-    ).
-
-iterate6_test() ->
-    ?assert(proper:quickcheck(iterate2_prop(), 1000)).
-
-test_iteration(Keymapper, Filter) ->
-    test_iteration(Keymapper, Filter, 0).
-
-test_iteration(Keymapper, Filter, PrevKey) ->
-    case next_range(Keymapper, Filter, PrevKey) of
-        undefined ->
-            [];
-        {Key, Bitmask, Bitfilter} ->
-            ?assert((Key band Bitmask) =:= Bitfilter),
-            [Key | test_iteration(Keymapper, Filter, Key)]
-    end.
+    ?assertEqual(0, ratchet(F, 0)),
+    ?assertEqual(16#fa, ratchet(F, 16#fa)),
+    ?assertEqual(16#ff, ratchet(F, 16#ff)),
+    ?assertEqual(overflow, ratchet(F, 16#100), "TBD: filter must store the upper bound").
+
+%% erlfmt-ignore
+ratchet2_test() ->
+    Bitsources = [{1, 0, 8},  %% Static topic index
+                  {2, 8, 8},  %% Epoch
+                  {3, 0, 8},  %% Varying topic hash
+                  {2, 0, 8}], %% Timestamp offset
+    M = make_keymapper(lists:reverse(Bitsources)),
+    F1 = make_filter(M, [{'=', 16#aa}, any, {'=', 16#cc}]),
+    ?assertEqual(16#aa00cc00, ratchet(F1, 0)),
+    ?assertEqual(16#aa01cc00, ratchet(F1, 16#aa00cd00)),
+    ?assertEqual(16#aa01cc11, ratchet(F1, 16#aa01cc11)),
+    ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10cd00)),
+    ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10dc11)),
+    ?assertEqual(overflow,    ratchet(F1, 16#ab000000)),
+    F2 = make_filter(M, [{'=', 16#aa}, {'>=', 16#dddd}, {'=', 16#cc}]),
+    ?assertEqual(16#aaddcc00, ratchet(F2, 0)),
+    ?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)).
+
+ratchet3_test() ->
+    ?assert(proper:quickcheck(ratchet1_prop(), 100)).
+
+%% erlfmt-ignore
+ratchet1_prop() ->
+    EpochBits = 4,
+    Bitsources = [{1, 0, 2},  %% Static topic index
+                  {2, EpochBits, 4},  %% Epoch
+                  {3, 0, 2},  %% Varying topic hash
+                  {2, 0, EpochBits}], %% Timestamp offset
+    M = make_keymapper(lists:reverse(Bitsources)),
+    F1 = make_filter(M, [{'=', 2#10}, any, {'=', 2#01}]),
+    ?FORALL(N, integer(0, ones(12)),
+             ratchet_prop(F1, N)).
+
+ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0) ->
+    Key = ratchet(Filter, Key0),
+    ?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)),
+    ?assert(Key >= Key0, {Key, '>=', Key}),
+    IMax = ones(Size),
+    CheckGaps = fun
+        F(I) when I >= Key; I > IMax ->
+            true;
+        F(I) ->
+            ?assertNot(
+                I band Bitmask =:= Bitfilter,
+                {found_gap, Key0, I, Key}
+            ),
+            F(I + 1)
+    end,
+    CheckGaps(Key0).
 
 mkbmask(Keymapper, Filter0) ->
-    Filter = desugar_filter(Keymapper, Filter0),
-    make_bitmask(Keymapper, Filter).
-
-incvec(Keymapper, Filter0, Vector) ->
-    Filter = desugar_filter(Keymapper, Filter0),
-    inc_vector(Filter, Vector).
+    Filter = inequations_to_ranges(Keymapper, Filter0),
+    make_bitfilter(Keymapper, Filter).
 
 key2vec(Schema, Vector) ->
     Keymapper = make_keymapper(Schema),

+ 73 - 114
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -30,7 +30,7 @@
 -export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
 
 %% internal exports:
--export([]).
+-export([format_key/2, format_keyfilter/1]).
 
 -export_type([options/0]).
 
@@ -73,8 +73,7 @@
     topic_filter :: emqx_ds:topic_filter(),
     start_time :: emqx_ds:time(),
     storage_key :: emqx_ds_lts:msg_storage_key(),
-    last_seen_key = 0 :: emqx_ds_bitmask_keymapper:key(),
-    key_filter :: [emqx_ds_bitmask_keymapper:scalar_range()]
+    last_seen_key = <<>> :: binary()
 }).
 
 -define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER),
@@ -83,6 +82,8 @@
 
 -define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
 
+-include("emqx_ds_bitmask.hrl").
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -95,7 +96,8 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    TSOffsetBits = maps:get(epoch_bits, Options, 8), %% TODO: change to 10 to make it around ~1 sec
+    %% 10 bits -> 1024 ms -> ~1 sec
+    TSOffsetBits = maps:get(epoch_bits, Options, 10),
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -120,17 +122,17 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF),
-    %% If user's topics have more than learned 10 wildcard levels,
-    %% then it's total carnage; learned topic structure won't help
-    %% much:
+    %% If user's topics have more than learned 10 wildcard levels
+    %% (more than 2, really), then it's total carnage; learned topic
+    %% structure won't help.
     MaxWildcardLevels = 10,
-    Keymappers = array:from_list(
+    KeymapperCache = array:from_list(
         [
             make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N)
          || N <- lists:seq(0, MaxWildcardLevels)
         ]
     ),
-    #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = Keymappers}.
+    #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}.
 
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     lists:foreach(
@@ -144,16 +146,26 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
 
 get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
-    [
-        #stream{
-            storage_key = I
-        }
-     || I <- Indexes
-    ].
+    [#stream{storage_key = I} || I <- Indexes].
 
 make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) ->
+    %% Note: it's a good idea to keep the iterator structure lean,
+    %% since it can be stored on a remote node that could update its
+    %% code independently from us.
+    {ok, #it{
+        topic_filter = TopicFilter,
+        start_time = StartTime,
+        storage_key = StorageKey
+    }}.
+
+next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
+    #it{
+        start_time = StartTime,
+        storage_key = StorageKey
+    } = It0,
+    %% Make filter:
     {TopicIndex, Varying} = StorageKey,
-    Filter = [
+    Inequations = [
         {'=', TopicIndex},
         {'>=', StartTime}
         | lists:map(
@@ -166,29 +178,22 @@ make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, Sta
             Varying
         )
     ],
-    {ok, #it{
-        topic_filter = TopicFilter,
-        start_time = StartTime,
-        storage_key = StorageKey,
-        key_filter = Filter
-    }}.
-
-next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
-    #it{
-        key_filter = KeyFilter
-    } = It0,
-    % TODO: ugh, so ugly
-    NVarying = length(KeyFilter) - 2,
+    %% Obtain a keymapper for the current number of varying
+    %% levels. Magic constant 2: we have two extra dimensions of topic
+    %% index and time; the rest of dimensions are varying levels.
+    NVarying = length(Inequations) - 2,
     Keymapper = array:get(NVarying, Keymappers),
-    %% Calculate lower and upper bounds for iteration:
-    LowerBound = lower_bound(Keymapper, KeyFilter),
-    UpperBound = upper_bound(Keymapper, KeyFilter),
+    Filter =
+        #filter{range_min = LowerBound, range_max = UpperBound} = emqx_ds_bitmask_keymapper:make_filter(
+            Keymapper, Inequations
+        ),
     {ok, ITHandle} = rocksdb:iterator(DB, CF, [
-        {iterate_lower_bound, LowerBound}, {iterate_upper_bound, UpperBound}
+        {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)},
+        {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound)}
     ]),
     try
         put(?COUNTER, 0),
-        next_loop(ITHandle, Keymapper, It0, [], BatchSize)
+        next_loop(ITHandle, Keymapper, Filter, It0, [], BatchSize)
     after
         rocksdb:iterator_close(ITHandle),
         erase(?COUNTER)
@@ -198,100 +203,64 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
 %% Internal functions
 %%================================================================================
 
-next_loop(_, _, It, Acc, 0) ->
+next_loop(ITHandle, KeyMapper, Filter, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
-next_loop(ITHandle, KeyMapper, It0 = #it{last_seen_key = Key0, key_filter = KeyFilter}, Acc0, N0) ->
+next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) ->
     inc_counter(),
-    case next_range(KeyMapper, It0) of
-        {Key1, Bitmask, Bitfilter} when Key1 > Key0 ->
-            case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
-                {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
-                    assert_progress(bitmask_match, KeyMapper, KeyFilter, Key0, Key1),
-                    Msg = deserialize(Val),
+    #it{last_seen_key = Key0} = It0,
+    case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
+        overflow ->
+            {ok, It0, lists:reverse(Acc0)};
+        Key1 ->
+            %% assert
+            true = Key1 > Key0,
+            case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
+                {ok, Key, Val} ->
                     It1 = It0#it{last_seen_key = Key},
-                    case check_message(It1, Msg) of
-                        true ->
+                    case check_message(Filter, It1, Val) of
+                        {true, Msg} ->
                             N1 = N0 - 1,
                             Acc1 = [Msg | Acc0];
                         false ->
                             N1 = N0,
                             Acc1 = Acc0
                     end,
-                    {N, It, Acc} = traverse_interval(
-                        ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
-                    ),
-                    next_loop(ITHandle, KeyMapper, It, Acc, N);
-                {ok, Key, _Val} ->
-                    assert_progress(bitmask_miss, KeyMapper, KeyFilter, Key0, Key1),
-                    It = It0#it{last_seen_key = Key},
-                    next_loop(ITHandle, KeyMapper, It, Acc0, N0);
+                    {N, It, Acc} = traverse_interval(ITHandle, KeyMapper, Filter, It1, Acc1, N1),
+                    next_loop(ITHandle, KeyMapper, Filter, It, Acc, N);
                 {error, invalid_iterator} ->
                     {ok, It0, lists:reverse(Acc0)}
-            end;
-        _ ->
-            {ok, It0, lists:reverse(Acc0)}
+            end
     end.
 
-traverse_interval(_, _, _, _, It, Acc, 0) ->
+traverse_interval(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) ->
     {0, It, Acc};
-traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) ->
+traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) ->
     inc_counter(),
-    case iterator_move(KeyMapper, ITHandle, next) of
-        {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
-            Msg = deserialize(Val),
+    case rocksdb:iterator_move(ITHandle, next) of
+        {ok, Key, Val} ->
             It = It0#it{last_seen_key = Key},
-            case check_message(It, Msg) of
-                true ->
-                    traverse_interval(
-                        ITHandle, KeyMapper, Bitmask, Bitfilter, It, [Msg | Acc], N - 1
-                    );
+            case check_message(Filter, It, Val) of
+                {true, Msg} ->
+                    traverse_interval(ITHandle, KeyMapper, Filter, It, [Msg | Acc], N - 1);
                 false ->
-                    traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It, Acc, N)
+                    traverse_interval(ITHandle, KeyMapper, Filter, It, Acc, N)
             end;
-        {ok, Key, _Val} ->
-            It = It0#it{last_seen_key = Key},
-            {N, It, Acc};
         {error, invalid_iterator} ->
             {0, It0, Acc}
     end.
 
-next_range(KeyMapper, #it{key_filter = KeyFilter, last_seen_key = PrevKey}) ->
-    emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, PrevKey).
-
-check_message(_Iterator, _Msg) ->
-    %% TODO.
-    true.
-
-iterator_move(KeyMapper, ITHandle, Action0) ->
-    Action =
-        case Action0 of
-            next ->
-                next;
-            {seek, Int} ->
-                {seek, emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Int)}
-        end,
-    case rocksdb:iterator_move(ITHandle, Action) of
-        {ok, KeyBin, Val} ->
-            {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin), Val};
-        {ok, KeyBin} ->
-            {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin)};
-        Other ->
-            Other
+-spec check_message(emqx_ds_bitmask_keymapper:filter(), #it{}, binary()) ->
+    {true, #message{}} | false.
+check_message(Filter, #it{last_seen_key = Key}, Val) ->
+    case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
+        true ->
+            Msg = deserialize(Val),
+            %% TODO: check strict time and hash collisions
+            {true, Msg};
+        false ->
+            false
     end.
 
-assert_progress(_Msg, _KeyMapper, _KeyFilter, Key0, Key1) when Key1 > Key0 ->
-    ?tp_ignore_side_effects_in_prod(
-       emqx_ds_storage_bitfield_lts_iter_move,
-       #{ location => _Msg
-        , key0 => format_key(_KeyMapper, Key0)
-        , key1 => format_key(_KeyMapper, Key1)
-        }),
-    ok;
-assert_progress(Msg, KeyMapper, KeyFilter, Key0, Key1) ->
-    Str0 = format_key(KeyMapper, Key0),
-    Str1 = format_key(KeyMapper, Key1),
-    error(#{'$msg' => Msg, key0 => Str0, key1 => Str1, step => get(?COUNTER), keyfilter => lists:map(fun format_keyfilter/1, KeyFilter)}).
-
 format_key(KeyMapper, Key) ->
     Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
@@ -357,16 +326,6 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     end,
     Keymapper.
 
-upper_bound(Keymapper, [TopicIndex | Rest]) ->
-    filter_to_key(Keymapper, [TopicIndex | [{'=', infinity} || _ <- Rest]]).
-
-lower_bound(Keymapper, [TopicIndex | Rest]) ->
-    filter_to_key(Keymapper, [TopicIndex | [{'=', 0} || _ <- Rest]]).
-
-filter_to_key(KeyMapper, KeyFilter) ->
-    {Key, _, _} = emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, 0),
-    emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Key).
-
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(TopicIndexBytes, DB, CF) ->
     PersistCallback = fun(Key, Val) ->

+ 0 - 714
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_

@@ -1,714 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_ds_storage_layer).
-
--behaviour(gen_server).
-
-%% API:
--export([start_link/2]).
--export([create_generation/3]).
-
--export([open_shard/2, get_streams/3]).
--export([message_store/3]).
--export([delete/4]).
-
--export([make_iterator/3, next/1, next/2]).
-
--export([
-    preserve_iterator/2,
-    restore_iterator/2,
-    discard_iterator/2,
-    ensure_iterator/3,
-    discard_iterator_prefix/2,
-    list_iterator_prefix/2,
-    foldl_iterator_prefix/4
-]).
-
-%% gen_server callbacks:
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-
--export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
--export_type([db_options/0, db_write_options/0, db_read_options/0]).
-
--compile({inline, [meta_lookup/2]}).
-
--include_lib("emqx/include/emqx.hrl").
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--type options() :: #{
-    dir => file:filename()
-}.
-
-%% see rocksdb:db_options()
--type db_options() :: proplists:proplist().
-%% see rocksdb:write_options()
--type db_write_options() :: proplists:proplist().
-%% see rocksdb:read_options()
--type db_read_options() :: proplists:proplist().
-
--type cf_refs() :: [{string(), rocksdb:cf_handle()}].
-
-%% Message storage generation
-%% Keep in mind that instances of this type are persisted in long-term storage.
--type generation() :: #{
-    %% Module that handles data for the generation
-    module := module(),
-    %% Module-specific data defined at generation creation time
-    data := term(),
-    %% When should this generation become active?
-    %% This generation should only contain messages timestamped no earlier than that.
-    %% The very first generation will have `since` equal 0.
-    since := emqx_ds:time()
-}.
-
--record(s, {
-    shard :: emqx_ds:shard(),
-    keyspace :: emqx_ds_conf:keyspace(),
-    db :: rocksdb:db_handle(),
-    cf_iterator :: rocksdb:cf_handle(),
-    cf_generations :: cf_refs()
-}).
-
--record(stream,
-        { generation :: gen_id()
-        , topic_filter :: emqx_ds:topic_filter()
-        , since :: emqx_ds:time()
-        , enc :: _EncapsultatedData
-        }).
-
--opaque stream() :: #stream{}.
-
--record(it, {
-    shard :: emqx_ds:shard(),
-    gen :: gen_id(),
-    replay :: emqx_ds:replay(),
-    module :: module(),
-    data :: term()
-}).
-
--type gen_id() :: 0..16#ffff.
-
--opaque state() :: #s{}.
--opaque iterator() :: #it{}.
-
-%% Contents of the default column family:
-%%
-%% [{<<"genNN">>, #generation{}}, ...,
-%%  {<<"current">>, GenID}]
-
--define(DEFAULT_CF, "default").
--define(DEFAULT_CF_OPTS, []).
-
--define(ITERATOR_CF, "$iterators").
-
-%% TODO
-%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
-%% 2. Supposedly might be compressed _very_ effectively.
-%% 3. `inplace_update_support`?
--define(ITERATOR_CF_OPTS, []).
-
--define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-
-%%================================================================================
-%% Callbacks
-%%================================================================================
-
--callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
-    {_Schema, cf_refs()}.
-
--callback open(
-    emqx_ds:shard(),
-    rocksdb:db_handle(),
-    gen_id(),
-    cf_refs(),
-    _Schema
-) ->
-    _DB.
-
--callback store(
-    _DB,
-    _MessageID :: binary(),
-    emqx_ds:time(),
-    emqx_ds:topic(),
-    _Payload :: binary()
-) ->
-    ok | {error, _}.
-
--callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
-    ok | {error, _}.
-
--callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) ->
-    [{_TopicRankX, _Stream}].
-
--callback make_iterator(_DB, emqx_ds:replay()) ->
-    {ok, _It} | {error, _}.
-
--callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}.
-
--callback preserve_iterator(_It) -> term().
-
--callback next(It) -> {value, binary(), It} | none | {error, closed}.
-
-%%================================================================================
-%% Replication layer API
-%%================================================================================
-
--spec open_shard(emqx_ds_replication_layer:shard(), emqx_ds_storage_layer:options()) -> ok.
-open_shard(Shard, Options) ->
-    emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
-
--spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), _Stream}].
-get_streams(Shard, TopicFilter, StartTime) ->
-    %% TODO: lookup ALL generations
-    {GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, StartTime),
-    lists:map(
-        fun({RankX, ModStream}) ->
-                Stream = #stream{ generation = GenId
-                                , topic_filter = TopicFilter
-                                , since = StartTime
-                                , enc = ModStream
-                                },
-                Rank = {RankX, GenId},
-                {Rank, Stream}
-        end,
-      Mod:get_streams(ModState, TopicFilter, StartTime)).
-
--spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-                           {ok, _MessageId} | {error, _}.
-message_store(Shard, Msgs, _Opts) ->
-    {ok, lists:map(
-           fun(Msg) ->
-                   GUID = emqx_message:id(Msg),
-                   Timestamp = Msg#message.timestamp,
-                   {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp),
-                   Topic = emqx_topic:words(emqx_message:topic(Msg)),
-                   Payload = serialize(Msg),
-                   Mod:store(ModState, GUID, Timestamp, Topic, Payload),
-                   GUID
-           end,
-           Msgs)}.
-
--spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream.
-next(It = #it{}) ->
-    next(It, _BatchSize = 1).
-
--spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream.
-next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) ->
-    end_of_stream;
-next(
-    It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize
-) ->
-    #{data := DBData} = meta_get_gen(Shard, Gen),
-    {ok, ItData} = Mod:restore_iterator(DBData, Serialized),
-    next(It#it{data = ItData}, BatchSize);
-next(It = #it{}, BatchSize) ->
-    do_next(It, BatchSize, _Acc = []).
-
-%%================================================================================
-%% API functions
-%%================================================================================
-
--spec create_generation(
-    emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
-) ->
-    {ok, gen_id()} | {error, nonmonotonic}.
-create_generation(ShardId, Since, Config = {_Module, _Options}) ->
-    gen_server:call(?REF(ShardId), {create_generation, Since, Config}).
-
--spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
-    ok | {error, _}.
-delete(Shard, GUID, Time, Topic) ->
-    {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
-    Mod:delete(Data, GUID, Time, Topic).
-
--spec make_iterator(emqx_ds:shard(), stream(), emqx_ds:time()) ->
-    {ok, iterator()} | {error, _TODO}.
-make_iterator(Shard, Stream, StartTime) ->
-    #stream{ topic_filter = TopicFilter
-           , since = Since
-           , enc = Enc
-           } = Stream,
-    {GenId, Gen} = meta_lookup_gen(Shard, StartTime),
-    Replay = {TopicFilter, Since},
-    case Mod:make_iterator(Data, Replay, Options) of
-    #it{ gen = GenId,
-         replay = {TopicFilter, Since}
-       }.
-
--spec do_next(iterator(), non_neg_integer(), [binary()]) ->
-    {ok, iterator(), [binary()]} | end_of_stream.
-do_next(It, N, Acc) when N =< 0 ->
-    {ok, It, lists:reverse(Acc)};
-do_next(It = #it{module = Mod, data = ItData}, N, Acc) ->
-    case Mod:next(ItData) of
-        {value, Bin, ItDataNext} ->
-            Val = deserialize(Bin),
-            do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]);
-        {error, _} = _Error ->
-            %% todo: log?
-            %% iterator might be invalid now; will need to re-open it.
-            Serialized = Mod:preserve_iterator(ItData),
-            {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
-        none ->
-            case open_next_iterator(It) of
-                {ok, ItNext} ->
-                    do_next(ItNext, N, Acc);
-                {error, _} = _Error ->
-                    %% todo: log?
-                    %% fixme: only bad options may lead to this?
-                    %% return an "empty" iterator to be re-opened when retrying?
-                    Serialized = Mod:preserve_iterator(ItData),
-                    {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
-                none ->
-                    case Acc of
-                        [] ->
-                            end_of_stream;
-                        _ ->
-                            {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)}
-                    end
-            end
-    end.
-
--spec preserve_iterator(iterator(), emqx_ds:iterator_id()) ->
-    ok | {error, _TODO}.
-preserve_iterator(It = #it{}, IteratorID) ->
-    iterator_put_state(IteratorID, It).
-
--spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
-    {ok, iterator()} | {error, _TODO}.
-restore_iterator(Shard, ReplayID) ->
-    case iterator_get_state(Shard, ReplayID) of
-        {ok, Serial} ->
-            restore_iterator_state(Shard, Serial);
-        not_found ->
-            {error, not_found};
-        {error, _Reason} = Error ->
-            Error
-    end.
-
--spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) ->
-    {ok, iterator()} | {error, _TODO}.
-ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) ->
-    case restore_iterator(Shard, IteratorID) of
-        {ok, It} ->
-            {ok, It};
-        {error, not_found} ->
-            {ok, It} = make_iterator(Shard, Replay),
-            ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
-            {ok, It};
-        Error ->
-            Error
-    end.
-
--spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
-    ok | {error, _TODO}.
-discard_iterator(Shard, ReplayID) ->
-    iterator_delete(Shard, ReplayID).
-
--spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
-    ok | {error, _TODO}.
-discard_iterator_prefix(Shard, KeyPrefix) ->
-    case do_discard_iterator_prefix(Shard, KeyPrefix) of
-        {ok, _} -> ok;
-        Error -> Error
-    end.
-
--spec list_iterator_prefix(
-    emqx_ds:shard(),
-    binary()
-) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}.
-list_iterator_prefix(Shard, KeyPrefix) ->
-    do_list_iterator_prefix(Shard, KeyPrefix).
-
--spec foldl_iterator_prefix(
-    emqx_ds:shard(),
-    binary(),
-    fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc),
-    Acc
-) -> {ok, Acc} | {error, _TODO} when
-    Acc :: term().
-foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
-    do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc).
-
-%%================================================================================
-%% gen_server
-%%================================================================================
-
--spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
-    {ok, pid()}.
-start_link(Shard, Options) ->
-    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
-
-init({Shard, Options}) ->
-    process_flag(trap_exit, true),
-    {ok, S0} = do_open_db(Shard, Options),
-    S = ensure_current_generation(S0),
-    ok = populate_metadata(S),
-    {ok, S}.
-
-handle_call({create_generation, Since, Config}, _From, S) ->
-    case create_new_gen(Since, Config, S) of
-        {ok, GenId, NS} ->
-            {reply, {ok, GenId}, NS};
-        {error, _} = Error ->
-            {reply, Error, S}
-    end;
-handle_call(_Call, _From, S) ->
-    {reply, {error, unknown_call}, S}.
-
-handle_cast(_Cast, S) ->
-    {noreply, S}.
-
-handle_info(_Info, S) ->
-    {noreply, S}.
-
-terminate(_Reason, #s{db = DB, shard = Shard}) ->
-    meta_erase(Shard),
-    ok = rocksdb:close(DB).
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
--record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
-
--spec populate_metadata(state()) -> ok.
-populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
-    ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
-    Current = schema_get_current(DBHandle),
-    lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
-
--spec populate_metadata(gen_id(), state()) -> ok.
-populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
-    Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
-    meta_register_gen(Shard, GenId, Gen).
-
--spec ensure_current_generation(state()) -> state().
-ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) ->
-    case schema_get_current(DBHandle) of
-        undefined ->
-            Config = emqx_ds_conf:keyspace_config(Keyspace),
-            {ok, _, NS} = create_new_gen(0, Config, S),
-            NS;
-        _GenId ->
-            S
-    end.
-
--spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
-    {ok, gen_id(), state()} | {error, nonmonotonic}.
-create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
-    GenId = get_next_id(meta_get_current(Shard)),
-    GenId = get_next_id(schema_get_current(DBHandle)),
-    case is_gen_valid(Shard, GenId, Since) of
-        ok ->
-            {ok, Gen, NS} = create_gen(GenId, Since, Config, S),
-            %% TODO: Transaction? Column family creation can't be transactional, anyway.
-            ok = schema_put_gen(DBHandle, GenId, Gen),
-            ok = schema_put_current(DBHandle, GenId),
-            ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
-            {ok, GenId, NS};
-        {error, _} = Error ->
-            Error
-    end.
-
--spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
-    {ok, generation(), state()}.
-create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
-    % TODO: Backend implementation should ensure idempotency.
-    {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
-    Gen = #{
-        module => Module,
-        data => Schema,
-        since => Since
-    },
-    {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-
--spec do_open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
-do_open_db(Shard, Options) ->
-    DefaultDir = binary_to_list(Shard),
-    DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
-    %% TODO: properly forward keyspace
-    Keyspace = maps:get(keyspace, Options, default_keyspace),
-    DBOptions = [
-        {create_if_missing, true},
-        {create_missing_column_families, true}
-        | emqx_ds_conf:db_options(Keyspace)
-    ],
-    _ = filelib:ensure_dir(DBDir),
-    ExistingCFs =
-        case rocksdb:list_column_families(DBDir, DBOptions) of
-            {ok, CFs} ->
-                [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
-            % DB is not present. First start
-            {error, {db_open, _}} ->
-                []
-        end,
-    ColumnFamilies = [
-        {?DEFAULT_CF, ?DEFAULT_CF_OPTS},
-        {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
-        | ExistingCFs
-    ],
-    case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
-        {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
-            {CFNames, _} = lists:unzip(ExistingCFs),
-            {ok, #s{
-                shard = Shard,
-                keyspace = Keyspace,
-                db = DBHandle,
-                cf_iterator = CFIterator,
-                cf_generations = lists:zip(CFNames, CFRefs)
-            }};
-        Error ->
-            Error
-    end.
-
--spec open_gen(gen_id(), generation(), state()) -> generation().
-open_gen(
-    GenId,
-    Gen = #{module := Mod, data := Data},
-    #s{shard = Shard, db = DBHandle, cf_generations = CFs}
-) ->
-    DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
-    Gen#{data := DB}.
-
--spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
-open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
-    open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
-
-open_next_iterator(undefined, _It) ->
-    none;
-open_next_iterator(Gen = #{}, It) ->
-    open_iterator(Gen, It).
-
--spec open_restore_iterator(generation(), iterator(), binary()) ->
-    {ok, iterator()} | {error, _Reason}.
-open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) ->
-    case Mod:restore_iterator(Data, Serial) of
-        {ok, ItData} ->
-            {ok, It#it{module = Mod, data = ItData}};
-        Err ->
-            Err
-    end.
-
-%%
-
--define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>).
--define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin
-    <<IteratorId:(size(KeyReplayState) - 2)/binary, "rs">> = (KeyReplayState),
-    IteratorId
-end).
-
--define(ITERATION_WRITE_OPTS, []).
--define(ITERATION_READ_OPTS, []).
-
-iterator_get_state(Shard, ReplayID) ->
-    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
-    rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
-
-iterator_put_state(ID, It = #it{shard = Shard}) ->
-    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
-    Serial = preserve_iterator_state(It),
-    rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
-
-iterator_delete(Shard, ID) ->
-    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
-    rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
-
-preserve_iterator_state(#it{
-    gen = Gen,
-    replay = {TopicFilter, StartTime},
-    module = Mod,
-    data = ItData
-}) ->
-    term_to_binary(#{
-        v => 1,
-        gen => Gen,
-        filter => TopicFilter,
-        start => StartTime,
-        st => Mod:preserve_iterator(ItData)
-    }).
-
-restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
-    restore_iterator_state(Shard, binary_to_term(Serial));
-restore_iterator_state(
-    Shard,
-    #{
-        v := 1,
-        gen := Gen,
-        filter := TopicFilter,
-        start := StartTime,
-        st := State
-    }
-) ->
-    It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
-    open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
-
-do_list_iterator_prefix(Shard, KeyPrefix) ->
-    Fn = fun(K0, _V, Acc) ->
-        K = ?KEY_REPLAY_STATE_PAT(K0),
-        [K | Acc]
-    end,
-    do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []).
-
-do_discard_iterator_prefix(Shard, KeyPrefix) ->
-    #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
-    Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
-    do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok).
-
-do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
-    #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
-    case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of
-        {ok, It} ->
-            NextAction = {seek, KeyPrefix},
-            do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc);
-        Error ->
-            Error
-    end.
-
-do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) ->
-    case rocksdb:iterator_move(It, NextAction) of
-        {ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, V} ->
-            NewAcc = Fn(K, V, Acc),
-            do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc);
-        {ok, _K, _V} ->
-            ok = rocksdb:iterator_close(It),
-            {ok, Acc};
-        {error, invalid_iterator} ->
-            ok = rocksdb:iterator_close(It),
-            {ok, Acc};
-        Error ->
-            ok = rocksdb:iterator_close(It),
-            Error
-    end.
-
-%% Functions for dealing with the metadata stored persistently in rocksdb
-
--define(CURRENT_GEN, <<"current">>).
--define(SCHEMA_WRITE_OPTS, []).
--define(SCHEMA_READ_OPTS, []).
-
--spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
-schema_get_gen(DBHandle, GenId) ->
-    {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
-    binary_to_term(Bin).
-
--spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
-schema_put_gen(DBHandle, GenId, Gen) ->
-    rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
-
--spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
-schema_get_current(DBHandle) ->
-    case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
-        {ok, Bin} ->
-            binary_to_integer(Bin);
-        not_found ->
-            undefined
-    end.
-
--spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
-schema_put_current(DBHandle, GenId) ->
-    rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
-
--spec schema_gen_key(integer()) -> binary().
-schema_gen_key(N) ->
-    <<"gen", N:32>>.
-
--undef(CURRENT_GEN).
--undef(SCHEMA_WRITE_OPTS).
--undef(SCHEMA_READ_OPTS).
-
-%% Functions for dealing with the runtime shard metadata:
-
--define(PERSISTENT_TERM(SHARD, GEN), {emqx_ds_storage_layer, SHARD, GEN}).
-
--spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
-meta_register_gen(Shard, GenId, Gen) ->
-    Gs =
-        case GenId > 0 of
-            true -> meta_lookup(Shard, GenId - 1);
-            false -> []
-        end,
-    ok = meta_put(Shard, GenId, [Gen | Gs]),
-    ok = meta_put(Shard, current, GenId).
-
--spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
-meta_lookup_gen(Shard, Time) ->
-    %% TODO
-    %% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
-    %% towards a "no".
-    Current = meta_lookup(Shard, current),
-    Gens = meta_lookup(Shard, Current),
-    find_gen(Time, Current, Gens).
-
-find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
-    {GenId, Gen};
-find_gen(Time, GenId, [_Gen | Rest]) ->
-    find_gen(Time, GenId - 1, Rest).
-
--spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
-meta_get_gen(Shard, GenId) ->
-    case meta_lookup(Shard, GenId, []) of
-        [Gen | _Older] -> Gen;
-        [] -> undefined
-    end.
-
--spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
-meta_get_current(Shard) ->
-    meta_lookup(Shard, current, undefined).
-
--spec meta_lookup(emqx_ds:shard(), _K) -> _V.
-meta_lookup(Shard, Key) ->
-    persistent_term:get(?PERSISTENT_TERM(Shard, Key)).
-
--spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
-meta_lookup(Shard, K, Default) ->
-    persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-
--spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
-meta_put(Shard, K, V) ->
-    persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-
--spec meta_erase(emqx_ds:shard()) -> ok.
-meta_erase(Shard) ->
-    [
-        persistent_term:erase(K)
-     || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
-    ],
-    ok.
-
--undef(PERSISTENT_TERM).
-
-get_next_id(undefined) -> 0;
-get_next_id(GenId) -> GenId + 1.
-
-is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
-    [GenPrev | _] = meta_lookup(Shard, GenId - 1),
-    case GenPrev of
-        #{since := SincePrev} when Since > SincePrev ->
-            ok;
-        #{} ->
-            {error, nonmonotonic}
-    end;
-is_gen_valid(_Shard, 0, 0) ->
-    ok.
-
-serialize(Msg) ->
-    %% TODO: remove topic, GUID, etc. from the stored
-    %% message. Reconstruct it from the metadata.
-    term_to_binary(emqx_message:to_map(Msg)).
-
-deserialize(Bin) ->
-    emqx_message:from_map(binary_to_term(Bin)).
-
-
-%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
-%% store_cfs(DBHandle, CFRefs) ->
-%%     lists:foreach(
-%%       fun({CFName, CFRef}) ->
-%%               persistent_term:put({self(), CFName}, {DBHandle, CFRef})
-%%       end,
-%%       CFRefs).

+ 0 - 748
apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_

@@ -1,748 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_ds_message_storage_bitmask).
-
-%%================================================================================
-%% @doc Description of the schema
-%%
-%% Let us assume that `T' is a topic and `t' is time. These are the two
-%% dimensions used to index messages. They can be viewed as
-%% "coordinates" of an MQTT message in a 2D space.
-%%
-%% Oftentimes, when wildcard subscription is used, keys must be
-%% scanned in both dimensions simultaneously.
-%%
-%% Rocksdb allows to iterate over sorted keys very fast. This means we
-%% need to map our two-dimentional keys to a single index that is
-%% sorted in a way that helps to iterate over both time and topic
-%% without having to do a lot of random seeks.
-%%
-%% == Mapping of 2D keys to rocksdb keys ==
-%%
-%% We use "zigzag" pattern to store messages, where rocksdb key is
-%% composed like like this:
-%%
-%%              |ttttt|TTTTTTTTT|tttt|
-%%                 ^       ^      ^
-%%                 |       |      |
-%%         +-------+       |      +---------+
-%%         |               |                |
-%% most significant    topic hash   least significant
-%% bits of timestamp                bits of timestamp
-%% (a.k.a epoch)                    (a.k.a time offset)
-%%
-%% Topic hash is level-aware: each topic level is hashed separately
-%% and the resulting hashes are bitwise-concatentated. This allows us
-%% to map topics to fixed-length bitstrings while keeping some degree
-%% of information about the hierarchy.
-%%
-%% Next important concept is what we call "epoch". Duration of the
-%% epoch is determined by maximum time offset. Epoch is calculated by
-%% shifting bits of the timestamp right.
-%%
-%% The resulting index is a space-filling curve that looks like
-%% this in the topic-time 2D space:
-%%
-%% T ^ ---->------   |---->------   |---->------
-%%   |       --/     /      --/     /      --/
-%%   |   -<-/       |   -<-/       |   -<-/
-%%   | -/           | -/           | -/
-%%   | ---->------  | ---->------  | ---->------
-%%   |       --/    /       --/    /       --/
-%%   |   ---/      |    ---/      |    ---/
-%%   | -/          ^  -/          ^  -/
-%%   | ---->------ |  ---->------ |  ---->------
-%%   |       --/   /        --/   /        --/
-%%   |   -<-/     |     -<-/     |     -<-/
-%%   | -/         |   -/         |   -/
-%%   | ---->------|   ---->------|   ---------->
-%%   |
-%%  -+------------+-----------------------------> t
-%%        epoch
-%%
-%% This structure allows to quickly seek to a the first message that
-%% was recorded in a certain epoch in a certain topic or a
-%% group of topics matching filter like `foo/bar/#`.
-%%
-%% Due to its structure, for each pair of rocksdb keys K1 and K2, such
-%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) >
-%% timestamp(K2).
-%% That is, replay doesn't reorder messages published in each
-%% individual topic.
-%%
-%% This property doesn't hold between different topics, but it's not deemed
-%% a problem right now.
-%%
-%%================================================================================
-
--behaviour(emqx_ds_storage_layer).
-
-%% API:
--export([create_new/3, open/5]).
--export([make_keymapper/1]).
-
--export([store/5, delete/4]).
-
--export([get_streams/3, make_iterator/3, next/1]).
-
--export([preserve_iterator/1, restore_iterator/2, refresh_iterator/1]).
-
-%% Debug/troubleshooting:
-%% Keymappers
--export([
-    keymapper_info/1,
-    compute_bitstring/3,
-    compute_topic_bitmask/2,
-    compute_time_bitmask/1,
-    hash/2
-]).
-
-%% Keyspace filters
--export([
-    make_keyspace_filter/2,
-    compute_initial_seek/1,
-    compute_next_seek/2,
-    compute_time_seek/3,
-    compute_topic_seek/4
-]).
-
--export_type([db/0, stream/0, iterator/0, schema/0]).
-
--export_type([options/0]).
--export_type([iteration_options/0]).
-
--compile(
-    {inline, [
-        bitwise_concat/3,
-        ones/1,
-        successor/1,
-        topic_hash_matches/3,
-        time_matches/3
-    ]}
-).
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--opaque stream() :: emqx_ds:topic_filter().
-
--type topic() :: emqx_ds:topic().
--type topic_filter() :: emqx_ds:topic_filter().
--type time() :: emqx_ds:time().
-
-%% Number of bits
--type bits() :: non_neg_integer().
-
-%% Key of a RocksDB record.
--type key() :: binary().
-
-%% Distribution of entropy among topic levels.
-%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits,
-%% and _rest of levels_ (if any) get 16 bits.
--type bits_per_level() :: [bits(), ...].
-
--type options() :: #{
-    %% Number of bits in a message timestamp.
-    timestamp_bits := bits(),
-    %% Number of bits in a key allocated to each level in a message topic.
-    topic_bits_per_level := bits_per_level(),
-    %% Maximum granularity of iteration over time.
-    epoch := time(),
-
-    iteration => iteration_options(),
-
-    cf_options => emqx_ds_storage_layer:db_cf_options()
-}.
-
--type iteration_options() :: #{
-    %% Request periodic iterator refresh.
-    %% This might be helpful during replays taking a lot of time (e.g. tens of seconds).
-    %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not
-    %% the same as 1000 replayed messages.
-    iterator_refresh => {every, _NumOperations :: pos_integer()}
-}.
-
-%% Persistent configuration of the generation, it is used to create db
-%% record when the database is reopened
--record(schema, {keymapper :: keymapper()}).
-
--opaque schema() :: #schema{}.
-
--record(db, {
-    shard :: emqx_ds:shard(),
-    handle :: rocksdb:db_handle(),
-    cf :: rocksdb:cf_handle(),
-    keymapper :: keymapper(),
-    write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
-    read_options = [] :: emqx_ds_storage_layer:db_read_options()
-}).
-
--record(it, {
-    handle :: rocksdb:itr_handle(),
-    filter :: keyspace_filter(),
-    cursor :: binary() | undefined,
-    next_action :: {seek, binary()} | next,
-    refresh_counter :: {non_neg_integer(), pos_integer()} | undefined
-}).
-
--record(filter, {
-    keymapper :: keymapper(),
-    topic_filter :: topic_filter(),
-    start_time :: integer(),
-    hash_bitfilter :: integer(),
-    hash_bitmask :: integer(),
-    time_bitfilter :: integer(),
-    time_bitmask :: integer()
-}).
-
-% NOTE
-% Keymapper decides how to map messages into RocksDB column family keyspace.
--record(keymapper, {
-    source :: [bitsource(), ...],
-    bitsize :: bits(),
-    epoch :: non_neg_integer()
-}).
-
--type bitsource() ::
-    %% Consume `_Size` bits from timestamp starting at `_Offset`th bit.
-    %% TODO consistency
-    {timestamp, _Offset :: bits(), _Size :: bits()}
-    %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash.
-    | {hash, level | levels, _Size :: bits()}.
-
--opaque db() :: #db{}.
--opaque iterator() :: #it{}.
--type serialized_iterator() :: binary().
--type keymapper() :: #keymapper{}.
--type keyspace_filter() :: #filter{}.
-
-%%================================================================================
-%% API funcions
-%%================================================================================
-
-%% Create a new column family for the generation and a serializable representation of the schema
--spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) ->
-    {schema(), emqx_ds_storage_layer:cf_refs()}.
-create_new(DBHandle, GenId, Options) ->
-    CFName = data_cf(GenId),
-    CFOptions = maps:get(cf_options, Options, []),
-    {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions),
-    Schema = #schema{keymapper = make_keymapper(Options)},
-    {Schema, [{CFName, CFHandle}]}.
-
-%% Reopen the database
--spec open(
-    emqx_ds:shard(),
-    rocksdb:db_handle(),
-    emqx_ds_storage_layer:gen_id(),
-    emqx_ds_storage_layer:cf_refs(),
-    schema()
-) ->
-    db().
-open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
-    {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
-    #db{
-        shard = Shard,
-        handle = DBHandle,
-        cf = CFHandle,
-        keymapper = Keymapper
-    }.
-
--spec make_keymapper(options()) -> keymapper().
-make_keymapper(#{
-    timestamp_bits := TimestampBits,
-    topic_bits_per_level := BitsPerLevel,
-    epoch := MaxEpoch
-}) ->
-    TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
-    TimestampMSBs = TimestampBits - TimestampLSBs,
-    NLevels = length(BitsPerLevel),
-    {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
-    Source = lists:flatten([
-        [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
-        [{hash, level, Bits} || Bits <- LevelBits],
-        {hash, levels, TailLevelsBits},
-        [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
-    ]),
-    #keymapper{
-        source = Source,
-        bitsize = lists:sum([S || {_, _, S} <- Source]),
-        epoch = 1 bsl TimestampLSBs
-    }.
-
--spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) ->
-    ok | {error, _TODO}.
-store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
-    Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
-    Value = make_message_value(Topic, MessagePayload),
-    rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-
--spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) ->
-    ok | {error, _TODO}.
-delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) ->
-    Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
-    rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
-
--spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
-          [stream()].
-get_streams(_, TopicFilter, _) ->
-    [{0, TopicFilter}].
-
--spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
-    % {error, invalid_start_time}? might just start from the beginning of time
-    % and call it a day: client violated the contract anyway.
-    {ok, iterator()} | {error, _TODO}.
-make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) ->
-    case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
-        {ok, ITHandle} ->
-            Filter = make_keyspace_filter(Replay, DB#db.keymapper),
-            InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
-            RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
-            {ok, #it{
-                handle = ITHandle,
-                filter = Filter,
-                next_action = {seek, InitialSeek},
-                refresh_counter = RefreshCounter
-            }};
-        Err ->
-            Err
-    end.
-
--spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
-next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
-    It = maybe_refresh_iterator(It0),
-    case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
-        % spec says `{ok, Key}` is also possible but the implementation says it's not
-        {ok, Key, Value} ->
-            % Preserve last seen key in the iterator so it could be restored / refreshed later.
-            ItNext = It#it{cursor = Key},
-            Bitstring = extract(Key, Keymapper),
-            case match_next(Bitstring, Value, It#it.filter) of
-                {_Topic, Payload} ->
-                    {value, Payload, ItNext#it{next_action = next}};
-                next ->
-                    next(ItNext#it{next_action = next});
-                NextBitstring when is_integer(NextBitstring) ->
-                    NextSeek = combine(NextBitstring, <<>>, Keymapper),
-                    next(ItNext#it{next_action = {seek, NextSeek}});
-                none ->
-                    stop_iteration(ItNext)
-            end;
-        {error, invalid_iterator} ->
-            stop_iteration(It);
-        {error, iterator_closed} ->
-            {error, closed}
-    end.
-
--spec preserve_iterator(iterator()) -> serialized_iterator().
-preserve_iterator(#it{
-    cursor = Cursor,
-    filter = #filter{
-        topic_filter = TopicFilter,
-        start_time = StartTime
-    }
-}) ->
-    State = #{
-        v => 1,
-        cursor => Cursor,
-        replay => {TopicFilter, StartTime}
-    },
-    term_to_binary(State).
-
--spec restore_iterator(db(), serialized_iterator()) ->
-    {ok, iterator()} | {error, _TODO}.
-restore_iterator(DB, Serial) when is_binary(Serial) ->
-    State = binary_to_term(Serial),
-    restore_iterator(DB, State);
-restore_iterator(DB, #{
-    v := 1,
-    cursor := Cursor,
-    replay := Replay = {_TopicFilter, _StartTime}
-}) ->
-    Options = #{}, % TODO: passthrough options
-    case make_iterator(DB, Replay, Options) of
-        {ok, It} when Cursor == undefined ->
-            % Iterator was preserved right after it has been made.
-            {ok, It};
-        {ok, It} ->
-            % Iterator was preserved mid-replay, seek right past the last seen key.
-            {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}};
-        Err ->
-            Err
-    end.
-
--spec refresh_iterator(iterator()) -> iterator().
-refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) ->
-    case rocksdb:iterator_refresh(Handle) of
-        ok when Action =:= next ->
-            % Now the underlying iterator is invalid, need to seek instead.
-            It#it{next_action = {seek, successor(Cursor)}};
-        ok ->
-            % Now the underlying iterator is invalid, but will seek soon anyway.
-            It;
-        {error, _} ->
-            % Implementation could in theory return an {error, ...} tuple.
-            % Supposedly our best bet is to ignore it.
-            % TODO logging?
-            It
-    end.
-
-%%================================================================================
-%% Internal exports
-%%================================================================================
-
--spec keymapper_info(keymapper()) ->
-    #{source := [bitsource()], bitsize := bits(), epoch := time()}.
-keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) ->
-    #{source => Source, bitsize => Bitsize, epoch => Epoch}.
-
-make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
-    combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
-
-make_message_value(Topic, MessagePayload) ->
-    term_to_binary({Topic, MessagePayload}).
-
-unwrap_message_value(Binary) ->
-    binary_to_term(Binary).
-
--spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) ->
-    key().
-combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
-    <<Bitstring:Size/integer, MessageID/binary>>.
-
--spec extract(key(), keymapper()) ->
-    _Bitstring :: integer().
-extract(Key, #keymapper{bitsize = Size}) ->
-    <<Bitstring:Size/integer, _MessageID/binary>> = Key,
-    Bitstring.
-
--spec compute_bitstring(topic_filter(), time(), keymapper()) -> integer().
-compute_bitstring(TopicFilter, Timestamp, #keymapper{source = Source}) ->
-    compute_bitstring(TopicFilter, Timestamp, Source, 0).
-
--spec compute_topic_bitmask(topic_filter(), keymapper()) -> integer().
-compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
-    compute_topic_bitmask(TopicFilter, Source, 0).
-
--spec compute_time_bitmask(keymapper()) -> integer().
-compute_time_bitmask(#keymapper{source = Source}) ->
-    compute_time_bitmask(Source, 0).
-
--spec hash(term(), bits()) -> integer().
-hash(Input, Bits) ->
-    % at most 32 bits
-    erlang:phash2(Input, 1 bsl Bits).
-
--spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter().
-make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
-    Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
-    HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
-    TimeBitmask = compute_time_bitmask(Keymapper),
-    HashBitfilter = Bitstring band HashBitmask,
-    TimeBitfilter = Bitstring band TimeBitmask,
-    #filter{
-        keymapper = Keymapper,
-        topic_filter = TopicFilter,
-        start_time = StartTime,
-        hash_bitfilter = HashBitfilter,
-        hash_bitmask = HashBitmask,
-        time_bitfilter = TimeBitfilter,
-        time_bitmask = TimeBitmask
-    }.
-
--spec compute_initial_seek(keyspace_filter()) -> integer().
-compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
-    % Should be the same as `compute_initial_seek(0, Filter)`.
-    HashBitfilter bor TimeBitfilter.
-
--spec compute_next_seek(integer(), keyspace_filter()) -> integer().
-compute_next_seek(
-    Bitstring,
-    Filter = #filter{
-        hash_bitfilter = HashBitfilter,
-        hash_bitmask = HashBitmask,
-        time_bitfilter = TimeBitfilter,
-        time_bitmask = TimeBitmask
-    }
-) ->
-    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
-    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
-    compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
-compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
-    I = (Timestamp bsr Offset) band ones(Size),
-    compute_bitstring(Topic, Timestamp, Rest, bitwise_concat(Acc, I, Size));
-compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
-    I = hash(<<"/">>, Size),
-    compute_bitstring([], Timestamp, Rest, bitwise_concat(Acc, I, Size));
-compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
-    I = hash(Level, Size),
-    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
-compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
-    I = hash(Tail, Size),
-    compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size));
-compute_bitstring(_, _, [], Acc) ->
-    Acc.
-
-compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
-    compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
-compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
-    compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
-compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
-    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
-compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
-    compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
-compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
-    compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
-compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
-    Mask =
-        case lists:member('+', Tail) orelse lists:member('#', Tail) of
-            true -> 0;
-            false -> ones(Size)
-        end,
-    compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
-compute_topic_bitmask(_, [], Acc) ->
-    Acc.
-
-compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
-    compute_time_bitmask(Rest, bitwise_concat(Acc, ones(Size), Size));
-compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
-    compute_time_bitmask(Rest, bitwise_concat(Acc, 0, Size));
-compute_time_bitmask([], Acc) ->
-    Acc.
-
-bitwise_concat(Acc, Item, ItemSize) ->
-    (Acc bsl ItemSize) bor Item.
-
-ones(Bits) ->
-    1 bsl Bits - 1.
-
--spec successor(key()) -> key().
-successor(Key) ->
-    <<Key/binary, 0:8>>.
-
-%% |123|345|678|
-%%  foo bar baz
-
-%% |123|000|678| - |123|fff|678|
-
-%%  foo +   baz
-
-%% |fff|000|fff|
-
-%% |123|000|678|
-
-%% |123|056|678| & |fff|000|fff| = |123|000|678|.
-
-match_next(
-    Bitstring,
-    Value,
-    Filter = #filter{
-        topic_filter = TopicFilter,
-        hash_bitfilter = HashBitfilter,
-        hash_bitmask = HashBitmask,
-        time_bitfilter = TimeBitfilter,
-        time_bitmask = TimeBitmask
-    }
-) ->
-    HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
-    TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
-    case HashMatches and TimeMatches of
-        true ->
-            Message = {Topic, _Payload} = unwrap_message_value(Value),
-            case emqx_topic:match(Topic, TopicFilter) of
-                true ->
-                    Message;
-                false ->
-                    next
-            end;
-        false ->
-            compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
-    end.
-
-%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
-compute_next_seek(
-    _HashMatches = false,
-    _TimeMatches,
-    Bitstring,
-    Filter = #filter{
-        keymapper = Keymapper,
-        hash_bitfilter = HashBitfilter,
-        hash_bitmask = HashBitmask,
-        time_bitfilter = TimeBitfilter,
-        time_bitmask = TimeBitmask
-    }
-) ->
-    NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
-    case NextBitstring of
-        none ->
-            none;
-        _ ->
-            TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
-            compute_next_seek(true, TimeMatches, NextBitstring, Filter)
-    end;
-%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
-compute_next_seek(
-    _HashMatches = true,
-    _TimeMatches = false,
-    Bitstring,
-    #filter{
-        time_bitfilter = TimeBitfilter,
-        time_bitmask = TimeBitmask
-    }
-) ->
-    compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
-compute_next_seek(true, true, Bitstring, _It) ->
-    Bitstring.
-
-topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) ->
-    (Bitstring band HashBitmask) == HashBitfilter.
-
-time_matches(Bitstring, TimeBitfilter, TimeBitmask) ->
-    (Bitstring band TimeBitmask) >= TimeBitfilter.
-
-compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
-    % Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`.
-    (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
-
-%% Find the closest bitstring which is:
-%% * greater than `Bitstring`,
-%% * and falls into the hash space defined by `HashBitfilter`.
-%% Note that the result can end up "back" in time and out of the time range.
-compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
-    Sources = Keymapper#keymapper.source,
-    Size = Keymapper#keymapper.bitsize,
-    compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
-
-compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
-    % NOTE
-    % We're iterating through `Substring` here, in lockstep with `HashBitfilter`
-    % and `HashBitmask`, starting from least signigicant bits. Each bitsource in
-    % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
-    % bits long which we interpret as a "digit". There are 2 flavors of those
-    % "digits":
-    %  * regular digit with 2^S possible values,
-    %  * degenerate digit with exactly 1 possible value U (represented with 0).
-    % Our goal here is to find a successor of `Bistring` and perform a kind of
-    % digit-by-digit addition operation with carry propagation.
-    NextSeek = zipfoldr3(
-        fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
-            case Source of
-                {hash, _, S} when LBitmask =:= 0 ->
-                    % Regular case
-                    bitwise_add_digit(Substring, Acc, S, Offset);
-                {hash, _, _} when LBitmask =/= 0, Substring < Filter ->
-                    % Degenerate case, I_digit < U, no overflow.
-                    % Successor is `U bsl Offset` which is equivalent to 0.
-                    0;
-                {hash, _, S} when LBitmask =/= 0, Substring > Filter ->
-                    % Degenerate case, I_digit > U, overflow.
-                    % Successor is `(1 bsl Size + U) bsl Offset`.
-                    overflow_digit(S, Offset);
-                {hash, _, S} when LBitmask =/= 0 ->
-                    % Degenerate case, I_digit = U
-                    % Perform digit addition with I_digit = 0, assuming "digit" has
-                    % 0 bits of information (but is `S` bits long at the same time).
-                    % This will overflow only if the result of previous iteration
-                    % was an overflow.
-                    bitwise_add_digit(0, Acc, 0, S, Offset);
-                {timestamp, _, S} ->
-                    % Regular case
-                    bitwise_add_digit(Substring, Acc, S, Offset)
-            end
-        end,
-        0,
-        Bitstring,
-        HashBitfilter,
-        HashBitmask,
-        Size,
-        Sources
-    ),
-    case NextSeek bsr Size of
-        _Carry = 0 ->
-            % Found the successor.
-            % We need to recover values of those degenerate digits which we
-            % represented with 0 during digit-by-digit iteration.
-            NextSeek bor (HashBitfilter band HashBitmask);
-        _Carry = 1 ->
-            % We got "carried away" past the range, time to stop iteration.
-            none
-    end.
-
-bitwise_add_digit(Digit, Number, Width, Offset) ->
-    bitwise_add_digit(Digit, Number, Width, Width, Offset).
-
-%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
-%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
-%% Perform an overflow if the result of addition would not fit into `Bits`
-%% bits.
-bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
-    Sum = (Digit bsl Offset) + Number,
-    case (Sum bsr Offset) < (1 bsl Bits) of
-        true -> Sum;
-        false -> overflow_digit(Width, Offset)
-    end.
-
-%% Constuct a number which denotes an overflow of digit that starts at
-%% `Offset` bits and is `Width` bits long.
-overflow_digit(Width, Offset) ->
-    (1 bsl Width) bsl Offset.
-
-%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
-%% significant bits first.
-%%
-%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
-%% specified in `Sources` list, in order from most significant bits to least
-%% significant. Each iteration calls `FoldFun` with:
-%% * bitsource that was used to extract sub-bitstrings,
-%% * 3 sub-bitstrings in integer representation,
-%% * bit offset into integers,
-%% * current accumulator.
--spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
-    Acc
-when
-    FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
-zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
-    Acc;
-zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
-    OffsetNext = Offset - S,
-    AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
-    FoldFun(
-        Source,
-        substring(I1, OffsetNext, S),
-        substring(I2, OffsetNext, S),
-        substring(I3, OffsetNext, S),
-        OffsetNext,
-        AccNext
-    ).
-
-substring(I, Offset, Size) ->
-    (I bsr Offset) band ones(Size).
-
-%% @doc Generate a column family ID for the MQTT messages
--spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
-data_cf(GenId) ->
-    ?MODULE_STRING ++ integer_to_list(GenId).
-
-make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
-    {0, N};
-make_refresh_counter(undefined) ->
-    undefined.
-
-maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
-    refresh_iterator(It#it{refresh_counter = {0, N}});
-maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
-    It#it{refresh_counter = {M + 1, N}};
-maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
-    It.
-
-stop_iteration(It) ->
-    ok = rocksdb:iterator_close(It#it.handle),
-    none.

+ 38 - 86
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -129,7 +129,7 @@ t_get_streams(_Config) ->
 t_replay(_Config) ->
     %% Create concrete topics:
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
-    Timestamps = lists:seq(1, 10),
+    Timestamps = lists:seq(1, 10_000, 100),
     Batch1 = [
         make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
      || Topic <- Topics, PublishedAt <- Timestamps
@@ -140,10 +140,10 @@ t_replay(_Config) ->
         begin
             B = integer_to_binary(I),
             make_message(
-              TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
+                TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
             )
         end
-     || I <- lists:seq(1, 200), TS <- lists:seq(1, 10), Suffix <- [<<"foo">>, <<"bar">>]
+     || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
     %% Check various topic filters:
@@ -158,6 +158,9 @@ t_replay(_Config) ->
     ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
     ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
     ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
+    %% Restart shard to make sure trie is persisted and restored:
+    ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
     %% Learned wildcard topics:
     ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
     ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@@ -179,23 +182,24 @@ check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
         ExpectedMessages
     ),
     ?check_trace(
-       #{timetrap => 10_000},
-       begin
-           Dump = dump_messages(Shard, TopicFilter, StartTime),
-           verify_dump(TopicFilter, StartTime, Dump),
-           Missing = ExpectedFiltered -- Dump,
-           Extras = Dump -- ExpectedFiltered,
-           ?assertMatch(
-               #{missing := [], unexpected := []},
-               #{
-                   missing => Missing,
-                   unexpected => Extras,
-                   topic_filter => TopicFilter,
-                   start_time => StartTime
-               }
-           )
-       end,
-       []),
+        #{timetrap => 10_000},
+        begin
+            Dump = dump_messages(Shard, TopicFilter, StartTime),
+            verify_dump(TopicFilter, StartTime, Dump),
+            Missing = ExpectedFiltered -- Dump,
+            Extras = Dump -- ExpectedFiltered,
+            ?assertMatch(
+                #{missing := [], unexpected := []},
+                #{
+                    missing => Missing,
+                    unexpected => Extras,
+                    topic_filter => TopicFilter,
+                    start_time => StartTime
+                }
+            )
+        end,
+        []
+    ),
     length(ExpectedFiltered) > 0.
 
 verify_dump(TopicFilter, StartTime, Dump) ->
@@ -227,78 +231,26 @@ dump_messages(Shard, TopicFilter, StartTime) ->
     ).
 
 dump_stream(Shard, Stream, TopicFilter, StartTime) ->
-    BatchSize = 3,
+    BatchSize = 100,
     {ok, Iterator} = emqx_ds_storage_layer:make_iterator(
         Shard, Stream, parse_topic(TopicFilter), StartTime
     ),
-    Loop = fun F(It, 0) ->
-                   error({too_many_iterations, It});
-               F(It, N) ->
-        case emqx_ds_storage_layer:next(Shard, It, BatchSize) of
-            end_of_stream ->
-                [];
-            {ok, _NextIt, []} ->
-                [];
-            {ok, NextIt, Batch} ->
-                Batch ++ F(NextIt, N - 1)
-        end
+    Loop = fun
+        F(It, 0) ->
+            error({too_many_iterations, It});
+        F(It, N) ->
+            case emqx_ds_storage_layer:next(Shard, It, BatchSize) of
+                end_of_stream ->
+                    [];
+                {ok, _NextIt, []} ->
+                    [];
+                {ok, NextIt, Batch} ->
+                    Batch ++ F(NextIt, N - 1)
+            end
     end,
-    MaxIterations = 1000,
+    MaxIterations = 1000000,
     Loop(Iterator, MaxIterations).
 
-%% Smoke test for iteration with wildcard topic filter
-%% t_iterate_wildcard(_Config) ->
-%%     %% Prepare data:
-%%     Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
-%%     Timestamps = lists:seq(1, 10),
-%%     _ = [
-%%         store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
-%%      || Topic <- Topics, PublishedAt <- Timestamps
-%%     ],
-%%     ?assertEqual(
-%%         lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         [],
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
-%%     ),
-%%     ?assertEqual(
-%%         lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
-%%     ),
-%%     ?assertEqual(
-%%         lists:sort([
-%%             {Topic, PublishedAt}
-%%          || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
-%%         ]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         [],
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         lists:sort([
-%%             {Topic, PublishedAt}
-%%          || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
-%%         ]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
-%%     ),
-%%     ?assertEqual(
-%%         [],
-%%         lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
-%%     ),
-%%     ok.
-
 %% t_create_gen(_Config) ->
 %%     {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
 %%     ?assertEqual(