Просмотр исходного кода

feat(ds): Implement keyspace partitioning across time

Andrew Mayorov 3 лет назад
Родитель
Сommit
5e30a5d3dd

+ 232 - 124
apps/emqx_replay/src/emqx_replay_message_storage.erl

@@ -27,9 +27,8 @@
 %% Debug/troubleshooting:
 -export([
     make_message_key/4,
-    compute_topic_hash/2,
+    compute_bitstring/3,
     compute_hash_bitmask/2,
-    combine/4,
     hash/2
 ]).
 
@@ -104,19 +103,27 @@
     keymapper :: keymapper(),
     next_action :: {seek, binary()} | next,
     topic_filter :: emqx_topic:words(),
-    hash_filter :: integer(),
+    hash_bitfilter :: integer(),
     hash_bitmask :: integer(),
-    start_time :: time()
+    time_bitfilter :: integer(),
+    time_bitmask :: integer()
 }).
 
 % NOTE
 % Keymapper decides how to map messages into RocksDB column family keyspace.
 -record(keymapper, {
-    topic_bits :: bits(),
-    topic_bits_per_level :: bits_per_level(),
-    timestamp_bits :: bits()
+    source :: [bitsource(), ...],
+    bitsize :: bits(),
+    tau :: non_neg_integer()
 }).
 
+-type bitsource() ::
+    %% Consume `_Size` bits from timestamp starting at `_Offset`th bit.
+    %% TODO consistency
+    {timestamp, _Offset :: bits(), _Size :: bits()}
+    %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash.
+    | {hash, level | levels, _Size :: bits()}.
+
 -opaque db() :: #db{}.
 -opaque iterator() :: #it{}.
 -type keymapper() :: #keymapper{}.
@@ -162,18 +169,32 @@ close(#db{handle = DB}) ->
 
 -spec make_keymapper(Options) -> keymapper() when
     Options :: #{
-        %% Number of bits in a key allocated to a message timestamp.
+        %% Number of bits in a message timestamp.
         timestamp_bits := bits(),
         %% Number of bits in a key allocated to each level in a message topic.
-        topic_bits_per_level := bits_per_level()
+        topic_bits_per_level := bits_per_level(),
+        %% Maximum granularity of iteration over time.
+        max_tau := time()
     }.
-make_keymapper(Options) ->
-    TimestampBits = maps:get(timestamp_bits, Options),
-    TopicBitsPerLevel = maps:get(topic_bits_per_level, Options),
+make_keymapper(#{
+    timestamp_bits := TimestampBits,
+    topic_bits_per_level := BitsPerLevel,
+    max_tau := MaxTau
+}) ->
+    TimestampLSBs = floor(math:log2(MaxTau)),
+    TimestampMSBs = TimestampBits - TimestampLSBs,
+    NLevels = length(BitsPerLevel),
+    {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
+    Source = lists:flatten([
+        {timestamp, TimestampLSBs, TimestampMSBs},
+        [{hash, level, Bits} || Bits <- LevelBits],
+        {hash, levels, TailLevelsBits},
+        [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
+    ]),
     #keymapper{
-        timestamp_bits = TimestampBits,
-        topic_bits = lists:sum(TopicBitsPerLevel),
-        topic_bits_per_level = TopicBitsPerLevel
+        source = Source,
+        bitsize = lists:sum([S || {_, _, S} <- Source]),
+        tau = 1 bsl TimestampLSBs
     }.
 
 -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
@@ -190,18 +211,21 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
 make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
     case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
         {ok, ITHandle} ->
-            Hash = compute_topic_hash(TopicFilter, DB#db.keymapper),
+            Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper),
             HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper),
-            HashFilter = Hash band HashBitmask,
-            InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper),
+            TimeBitmask = compute_time_bitmask(DB#db.keymapper),
+            HashBitfilter = Bitstring band HashBitmask,
+            TimeBitfilter = Bitstring band TimeBitmask,
+            InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper),
             {ok, #it{
                 handle = ITHandle,
                 keymapper = DB#db.keymapper,
                 next_action = {seek, InitialSeek},
                 topic_filter = TopicFilter,
-                start_time = StartTime,
-                hash_filter = HashFilter,
-                hash_bitmask = HashBitmask
+                hash_bitfilter = HashBitfilter,
+                hash_bitmask = HashBitmask,
+                time_bitfilter = TimeBitfilter,
+                time_bitmask = TimeBitmask
             }};
         Err ->
             Err
@@ -212,8 +236,8 @@ next(It = #it{next_action = Action}) ->
     case rocksdb:iterator_move(It#it.handle, Action) of
         % spec says `{ok, Key}` is also possible but the implementation says it's not
         {ok, Key, Value} ->
-            {TopicHash, PublishedAt} = extract(Key, It#it.keymapper),
-            match_next(It, TopicHash, PublishedAt, Value);
+            Bitstring = extract(Key, It#it.keymapper),
+            match_next(It, Bitstring, Value);
         {error, invalid_iterator} ->
             stop_iteration(It);
         {error, iterator_closed} ->
@@ -225,7 +249,7 @@ next(It = #it{next_action = Action}) ->
 %%================================================================================
 
 make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
-    combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper).
+    combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
 
 make_message_value(Topic, MessagePayload) ->
     term_to_binary({Topic, MessagePayload}).
@@ -233,61 +257,74 @@ make_message_value(Topic, MessagePayload) ->
 unwrap_message_value(Binary) ->
     binary_to_term(Binary).
 
--spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) ->
+-spec combine(_Bitstring :: integer(), emqx_guid:guid(), keymapper()) ->
     key().
-combine(TopicHash, PublishedAt, MessageID, #keymapper{
-    timestamp_bits = TimestampBits,
-    topic_bits = TopicBits
-}) ->
-    <<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, MessageID/binary>>.
+combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
+    <<Bitstring:Size/integer, MessageID/binary>>.
 
 -spec extract(key(), keymapper()) ->
-    {_TopicHash :: integer(), time()}.
-extract(Key, #keymapper{
-    timestamp_bits = TimestampBits,
-    topic_bits = TopicBits
-}) ->
-    <<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, _MessageID/binary>> = Key,
-    {TopicHash, PublishedAt}.
+    _Bitstring :: integer().
+extract(Key, #keymapper{bitsize = Size}) ->
+    <<Bitstring:Size/integer, _MessageID/binary>> = Key,
+    Bitstring.
+
+-spec compute_bitstring(topic(), time(), keymapper()) -> integer().
+compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) ->
+    compute_bitstring(Topic, Timestamp, Source, 0).
+
+-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
+compute_hash_bitmask(TopicFilter, #keymapper{source = Source}) ->
+    compute_hash_bitmask(TopicFilter, Source, 0).
 
-compute_topic_hash(Topic, Keymapper) ->
-    compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0).
+-spec compute_time_bitmask(keymapper()) -> integer().
+compute_time_bitmask(#keymapper{source = Source}) ->
+    compute_time_bitmask(Source, 0).
 
 hash(Input, Bits) ->
     % at most 32 bits
     erlang:phash2(Input, 1 bsl Bits).
 
--spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
-compute_hash_bitmask(TopicFilter, Keymapper) ->
-    compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0).
-
 %%================================================================================
 %% Internal functions
 %%================================================================================
 
-compute_topic_hash(LevelsRest, [Bits], Acc) ->
-    Hash = hash(LevelsRest, Bits),
-    Acc bsl Bits + Hash;
-compute_topic_hash([], [Bits | BitsRest], Acc) ->
-    Hash = hash(<<"/">>, Bits),
-    compute_topic_hash([], BitsRest, Acc bsl Bits + Hash);
-compute_topic_hash([Level | LevelsRest], [Bits | BitsRest], Acc) ->
-    Hash = hash(Level, Bits),
-    compute_topic_hash(LevelsRest, BitsRest, Acc bsl Bits + Hash).
-
-compute_hash_bitmask(['#'], BitsPerLevel, Acc) ->
-    Acc bsl lists:sum(BitsPerLevel) + 0;
-compute_hash_bitmask(['+' | LevelsRest], [Bits | BitsRest], Acc) ->
-    compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + 0);
-compute_hash_bitmask(_, [Bits], Acc) ->
-    Acc bsl Bits + ones(Bits);
-compute_hash_bitmask([], [Bits | BitsRest], Acc) ->
-    compute_hash_bitmask([], BitsRest, Acc bsl Bits + ones(Bits));
-compute_hash_bitmask([_ | LevelsRest], [Bits | BitsRest], Acc) ->
-    compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + ones(Bits));
+compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
+    I = (Timestamp bsr Offset) band ones(Size),
+    compute_bitstring(Topic, Timestamp, Rest, (Acc bsl Size) + I);
+compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
+    I = hash(<<"/">>, Size),
+    compute_bitstring([], Timestamp, Rest, (Acc bsl Size) + I);
+compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
+    I = hash(Level, Size),
+    compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I);
+compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
+    I = hash(Tail, Size),
+    compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I);
+compute_bitstring(_, _, [], Acc) ->
+    Acc.
+
+compute_hash_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
+    compute_hash_bitmask(Filter, Rest, (Acc bsl Size) + 0);
+compute_hash_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
+    compute_hash_bitmask(['#'], Rest, (Acc bsl Size) + 0);
+compute_hash_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
+    compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + 0);
+compute_hash_bitmask([], [{hash, level, Size} | Rest], Acc) ->
+    compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size));
+compute_hash_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
+    compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + ones(Size));
+compute_hash_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
+    compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size));
 compute_hash_bitmask(_, [], Acc) ->
     Acc.
 
+compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
+    compute_time_bitmask(Rest, (Acc bsl Size) + ones(Size));
+compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
+    compute_time_bitmask(Rest, (Acc bsl Size) + 0);
+compute_time_bitmask([], Acc) ->
+    Acc.
+
 ones(Bits) ->
     1 bsl Bits - 1.
 
@@ -308,16 +345,16 @@ match_next(
     It = #it{
         keymapper = Keymapper,
         topic_filter = TopicFilter,
-        hash_filter = HashFilter,
+        hash_bitfilter = HashBitfilter,
         hash_bitmask = HashBitmask,
-        start_time = StartTime
+        time_bitfilter = TimeBitfilter,
+        time_bitmask = TimeBitmask
     },
-    TopicHash,
-    PublishedAt,
+    Bitstring,
     Value
 ) ->
-    HashMatches = (TopicHash band It#it.hash_bitmask) == It#it.hash_filter,
-    TimeMatches = PublishedAt >= It#it.start_time,
+    HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
+    TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
     case HashMatches of
         true when TimeMatches ->
             {Topic, MessagePayload} = unwrap_message_value(Value),
@@ -327,13 +364,20 @@ match_next(
                 false ->
                     next(It#it{next_action = next})
             end;
-        true ->
-            NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper),
+        true when not TimeMatches ->
+            NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter,
+            NextSeek = combine(NextBitstring, <<>>, Keymapper),
             next(It#it{next_action = {seek, NextSeek}});
         false ->
-            case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of
-                NextHash when is_integer(NextHash) ->
-                    NextSeek = combine(NextHash, StartTime, <<>>, Keymapper),
+            % _ ->
+            case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of
+                NextBitstring when is_integer(NextBitstring) ->
+                    % ct:pal("Bitstring     = ~32.16.0B", [Bitstring]),
+                    % ct:pal("Bitfilter     = ~32.16.0B", [Bitfilter]),
+                    % ct:pal("HBitmask      = ~32.16.0B", [HashBitmask]),
+                    % ct:pal("TBitmask      = ~32.16.0B", [TimeBitmask]),
+                    % ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]),
+                    NextSeek = combine(NextBitstring, <<>>, Keymapper),
                     next(It#it{next_action = {seek, NextSeek}});
                 none ->
                     stop_iteration(It)
@@ -344,10 +388,12 @@ stop_iteration(It) ->
     ok = rocksdb:iterator_close(It#it.handle),
     none.
 
-compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) ->
-    BitsPerLevel = Keymapper#keymapper.topic_bits_per_level,
-    compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel);
-compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
+compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
+    Sources = Keymapper#keymapper.source,
+    Size = Keymapper#keymapper.bitsize,
+    compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
+
+compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
     % NOTE
     % Ok, this convoluted mess implements a sort of _increment operation_ for some
     % strange number in variable bit-width base. There are `Levels` "digits", those
@@ -356,66 +402,117 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
     % with exacly one possible value).
     % TODO make at least remotely readable / optimize later
     Result = zipfoldr3(
-        fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) ->
-            case LevelMask of
-                0 when Carry == 0 ->
-                    {0, Acc + (LevelHash bsl Shift)};
-                0 ->
-                    LevelHash1 = LevelHash + Carry,
-                    NextCarry = LevelHash1 bsr Bits,
-                    NextAcc = (LevelHash1 band ones(Bits)) bsl Shift,
-                    {NextCarry, NextAcc};
-                _ when (LevelHash + Carry) == Filter ->
-                    {0, Acc + (Filter bsl Shift)};
-                _ when (LevelHash + Carry) > Filter ->
-                    {1, Filter bsl Shift};
-                _ ->
-                    {0, Filter bsl Shift}
+        fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) ->
+            case Source of
+                {hash, _, _} when LBitmask =:= 0, Carry =:= 0 ->
+                    {0, Acc + (Substring bsl Offset)};
+                {hash, _, S} when LBitmask =:= 0 ->
+                    Substring1 = Substring + Carry,
+                    Carry1 = Substring1 bsr S,
+                    Acc1 = (Substring1 band ones(S)) bsl Offset,
+                    {Carry1, Acc1};
+                {hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter ->
+                    {0, Acc + (Filter bsl Offset)};
+                {hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter ->
+                    {1, Filter bsl Offset};
+                {hash, _, _} when LBitmask =/= 0 ->
+                    {0, Filter bsl Offset};
+                {timestamp, _, _} when Carry =:= 0 ->
+                    {0, Acc + (Substring bsl Offset)};
+                {timestamp, _, S} ->
+                    Substring1 = Substring + Carry,
+                    Carry1 = Substring1 bsr S,
+                    Acc1 = (Substring1 band ones(S)) bsl Offset,
+                    {Carry1, Acc1}
             end
         end,
+        % TODO
+        % We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple.
+        % This could save us a heap alloc which might be imporatant in a hot path.
         {1, 0},
-        TopicHash,
-        HashFilter,
+        Bitstring,
+        HashBitfilter,
         HashBitmask,
-        BitsPerLevel
+        Size,
+        Sources
     ),
     case Result of
-        {_, {_Carry = 0, Next}} ->
-            Next bor HashFilter;
-        {_, {_Carry = 1, _}} ->
+        {_Carry = 0, Next} ->
+            Next bor (HashBitfilter band HashBitmask);
+        {_Carry = 1, _} ->
             % we got "carried away" past the range, time to stop iteration
             none
     end.
 
-zipfoldr3(_FoldFun, Acc, _, _, _, []) ->
-    {0, Acc};
-zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
-    {Shift, AccNext} = zipfoldr3(
-        FoldFun,
-        Acc,
-        I1,
-        I2,
-        I3,
-        Rest
-    ),
-    {
-        Shift + Bits,
-        FoldFun(
-            (I1 bsr Shift) band ones(Bits),
-            (I2 bsr Shift) band ones(Bits),
-            (I3 bsr Shift) band ones(Bits),
-            Bits,
-            Shift,
-            AccNext
-        )
-    }.
+zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
+    Acc;
+zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
+    OffsetNext = Offset - S,
+    AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
+    FoldFun(
+        Source,
+        substring(I1, OffsetNext, S),
+        substring(I2, OffsetNext, S),
+        substring(I3, OffsetNext, S),
+        OffsetNext,
+        AccNext
+    ).
+
+substring(I, Offset, Size) ->
+    (I bsr Offset) band ones(Size).
 
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
+make_keymapper_test_() ->
+    [
+        ?_assertEqual(
+            #keymapper{
+                source = [
+                    {timestamp, 9, 23},
+                    {hash, level, 2},
+                    {hash, level, 4},
+                    {hash, levels, 8},
+                    {timestamp, 0, 9}
+                ],
+                bitsize = 46,
+                tau = 512
+            },
+            make_keymapper(#{
+                timestamp_bits => 32,
+                topic_bits_per_level => [2, 4, 8],
+                max_tau => 1000
+            })
+        ),
+        ?_assertEqual(
+            #keymapper{
+                source = [
+                    {timestamp, 0, 32},
+                    {hash, levels, 16}
+                ],
+                bitsize = 48,
+                tau = 1
+            },
+            make_keymapper(#{
+                timestamp_bits => 32,
+                topic_bits_per_level => [16],
+                max_tau => 1
+            })
+        )
+    ].
+
 compute_test_bitmask(TopicFilter) ->
-    compute_hash_bitmask(TopicFilter, [3, 4, 5, 2], 0).
+    compute_hash_bitmask(
+        TopicFilter,
+        [
+            {hash, level, 3},
+            {hash, level, 4},
+            {hash, level, 5},
+            {hash, levels, 2}
+        ],
+        0
+    ).
 
 bitmask_test_() ->
     [
@@ -464,8 +561,19 @@ wildcard_bitmask_test_() ->
 %% Key3   = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
 %% Key4   = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
 
-compute_test_next_seek(TopicHash, HashFilter, HashBitmask) ->
-    compute_next_seek(TopicHash, HashFilter, HashBitmask, [8, 8, 16, 12]).
+compute_test_next_seek(Bitstring, Bitfilter, HBitmask) ->
+    compute_next_seek(
+        Bitstring,
+        Bitfilter,
+        HBitmask,
+        [
+            {hash, level, 8},
+            {hash, level, 8},
+            {hash, level, 16},
+            {hash, levels, 12}
+        ],
+        8 + 8 + 16 + 12
+    ).
 
 next_seek_test_() ->
     [

+ 13 - 10
apps/emqx_replay/test/emqx_replay_storage_SUITE.erl

@@ -52,7 +52,7 @@ t_iterate(Config) ->
         begin
             {ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0),
             Values = iterate(It),
-            ?assertEqual(Values, lists:map(fun integer_to_binary/1, Timestamps))
+            ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
         end
      || Topic <- Topics
     ],
@@ -137,28 +137,30 @@ parse_topic(Topic) ->
 
 t_prop_topic_hash_computes(_) ->
     Keymapper = emqx_replay_message_storage:make_keymapper(#{
+        timestamp_bits => 32,
         topic_bits_per_level => [8, 12, 16, 24],
-        timestamp_bits => 0
+        max_tau => 10000
     }),
     ?assert(
         proper:quickcheck(
-            ?FORALL(Topic, topic(), begin
-                Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper),
-                is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
+            ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
+                BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
+                is_integer(BS) andalso (BS < (1 bsl 92))
             end)
         )
     ).
 
 t_prop_hash_bitmask_computes(_) ->
     Keymapper = emqx_replay_message_storage:make_keymapper(#{
-        topic_bits_per_level => [8, 12, 16, 24],
-        timestamp_bits => 0
+        timestamp_bits => 16,
+        topic_bits_per_level => [8, 12, 16],
+        max_tau => 100
     }),
     ?assert(
         proper:quickcheck(
             ?FORALL(TopicFilter, topic_filter(), begin
-                Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
-                is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
+                Mask = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
+                is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
             end)
         )
     ).
@@ -252,8 +254,9 @@ init_per_testcase(TC, Config) ->
     {ok, DB} = emqx_replay_message_storage:open(Filename, #{
         column_family => {atom_to_list(TC), []},
         keymapper => emqx_replay_message_storage:make_keymapper(#{
+            timestamp_bits => 64,
             topic_bits_per_level => [8, 8, 32, 16],
-            timestamp_bits => 64
+            max_tau => 5
         })
     }),
     [{handle, DB} | Config].