Przeglądaj źródła

chore(ds): Attempt to make `compute_next_seek`'s logic clearer

Andrew Mayorov 3 lat temu
rodzic
commit
a0bcdb5104

+ 119 - 55
apps/emqx_replay/src/emqx_replay_message_storage.erl

@@ -102,6 +102,9 @@
     make_message_key/4,
     compute_bitstring/3,
     compute_hash_bitmask/2,
+    compute_next_seek/4,
+    compute_time_seek/3,
+    compute_hash_seek/4,
     hash/2
 ]).
 
@@ -231,12 +234,12 @@ make_keymapper(#{
     topic_bits_per_level := BitsPerLevel,
     epoch := MaxEpoch
 }) ->
-    TimestampLSBs = floor(math:log2(MaxEpoch)),
+    TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
     TimestampMSBs = TimestampBits - TimestampLSBs,
     NLevels = length(BitsPerLevel),
     {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
     Source = lists:flatten([
-        {timestamp, TimestampLSBs, TimestampMSBs},
+        [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
         [{hash, level, Bits} || Bits <- LevelBits],
         {hash, levels, TailLevelsBits},
         [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
@@ -408,8 +411,8 @@ match_next(
 ) ->
     HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
     TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
-    case HashMatches of
-        true when TimeMatches ->
+    case HashMatches and TimeMatches of
+        true ->
             {Topic, MessagePayload} = unwrap_message_value(Value),
             case emqx_topic:match(Topic, TopicFilter) of
                 true ->
@@ -417,13 +420,8 @@ match_next(
                 false ->
                     next(It#it{next_action = next})
             end;
-        true when not TimeMatches ->
-            NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter,
-            NextSeek = combine(NextBitstring, <<>>, Keymapper),
-            next(It#it{next_action = {seek, NextSeek}});
         false ->
-            % _ ->
-            case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of
+            case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of
                 NextBitstring when is_integer(NextBitstring) ->
                     % ct:pal("Bitstring     = ~32.16.0B", [Bitstring]),
                     % ct:pal("Bitfilter     = ~32.16.0B", [Bitfilter]),
@@ -441,62 +439,128 @@ stop_iteration(It) ->
     ok = rocksdb:iterator_close(It#it.handle),
     none.
 
-compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
+%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
+compute_next_seek(_HashMatches = false, _, Bitstring, It) ->
+    NextBitstring = compute_hash_seek(
+        Bitstring,
+        It#it.hash_bitfilter,
+        It#it.hash_bitmask,
+        It#it.keymapper
+    ),
+    case NextBitstring of
+        none ->
+            none;
+        _ ->
+            TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter,
+            compute_next_seek(true, TimeMatches, NextBitstring, It)
+    end;
+%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
+compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) ->
+    compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask);
+compute_next_seek(true, true, Bitstring, _It) ->
+    Bitstring.
+
+compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
+    % Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`.
+    (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
+
+%% Find the closest bitstring which is:
+%% * greater than `Bitstring`,
+%% * and falls into the hash space defined by `HashBitfilter`.
+%% Note that the result can end up "back" in time and out of the time range.
+compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
     Sources = Keymapper#keymapper.source,
     Size = Keymapper#keymapper.bitsize,
-    compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
+    compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
 
-compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
+compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
     % NOTE
-    % Ok, this convoluted mess implements a sort of _increment operation_ for some
-    % strange number in variable bit-width base. There are `Levels` "digits", those
-    % with `0` level bitmask have `BitsPerLevel` bit-width and those with `111...`
-    % level bitmask have in some sense 0 bits (because they are fixed "digits"
-    % with exacly one possible value).
-    % TODO make at least remotely readable / optimize later
-    Result = zipfoldr3(
-        fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) ->
+    % We're iterating through `Substring` here, in lockstep with `HashBitfilter`
+    % and`HashBitmask`, starting from least signigicant bits. Each bitsource in
+    % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
+    % bits long which we interpret as a "digit". There are 2 flavors of those
+    % "digits":
+    %  * regular digit with 2^S possible values,
+    %  * degenerate digit with exactly 1 possible value U (represented with 0).
+    % Our goal here is to find a successor of `Bistring` and perform a kind of
+    % digit-by-digit addition operation with carry propagation.
+    NextSeek = zipfoldr3(
+        fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
             case Source of
-                {hash, _, _} when LBitmask =:= 0, Carry =:= 0 ->
-                    {0, Acc + (Substring bsl Offset)};
                 {hash, _, S} when LBitmask =:= 0 ->
-                    Substring1 = Substring + Carry,
-                    Carry1 = Substring1 bsr S,
-                    Acc1 = (Substring1 band ones(S)) bsl Offset,
-                    {Carry1, Acc1};
-                {hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter ->
-                    {0, Acc + (Filter bsl Offset)};
-                {hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter ->
-                    {1, Filter bsl Offset};
-                {hash, _, _} when LBitmask =/= 0 ->
-                    {0, Filter bsl Offset};
-                {timestamp, _, _} when Carry =:= 0 ->
-                    {0, Acc + (Substring bsl Offset)};
+                    % Regular case
+                    bitwise_add_digit(Substring, Acc, S, Offset);
+                {hash, _, _} when LBitmask =/= 0, Substring < Filter ->
+                    % Degenerate case, I_digit < U, no overflow.
+                    % Successor is `U bsl Offset` which is equivalent to 0.
+                    0;
+                {hash, _, S} when LBitmask =/= 0, Substring > Filter ->
+                    % Degenerate case, I_digit > U, overflow.
+                    % Successor is `(1 bsl Size + U) bsl Offset`.
+                    overflow_digit(S, Offset);
+                {hash, _, S} when LBitmask =/= 0 ->
+                    % Degenerate case, I_digit = U
+                    % Perform digit addition with I_digit = 0, assuming "digit" has
+                    % 0 bits of information (but is `S` bits long at the same time).
+                    % This will overflow only if the result of previous iteration
+                    % was an overflow.
+                    bitwise_add_digit(0, Acc, 0, S, Offset);
                 {timestamp, _, S} ->
-                    Substring1 = Substring + Carry,
-                    Carry1 = Substring1 bsr S,
-                    Acc1 = (Substring1 band ones(S)) bsl Offset,
-                    {Carry1, Acc1}
+                    % Regular case
+                    bitwise_add_digit(Substring, Acc, S, Offset)
             end
         end,
-        % TODO
-        % We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple.
-        % This could save us a heap alloc which might be imporatant in a hot path.
-        {1, 0},
+        0,
         Bitstring,
         HashBitfilter,
         HashBitmask,
         Size,
         Sources
     ),
-    case Result of
-        {_Carry = 0, Next} ->
-            Next bor (HashBitfilter band HashBitmask);
-        {_Carry = 1, _} ->
-            % we got "carried away" past the range, time to stop iteration
+    case NextSeek bsr Size of
+        _Carry = 0 ->
+            % Found the successor.
+            % We need to recover values of those degenerate digits which we
+            % represented with 0 during digit-by-digit iteration.
+            NextSeek bor (HashBitfilter band HashBitmask);
+        _Carry = 1 ->
+            % We got "carried away" past the range, time to stop iteration.
             none
     end.
 
+bitwise_add_digit(Digit, Number, Width, Offset) ->
+    bitwise_add_digit(Digit, Number, Width, Width, Offset).
+
+%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
+%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
+%% Perform an overflow if the result of addition would not fit into `Bits`
+%% bits.
+bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
+    Sum = (Digit bsl Offset) + Number,
+    case (Sum bsr Offset) < (1 bsl Bits) of
+        true -> Sum;
+        false -> overflow_digit(Width, Offset)
+    end.
+
+%% Constuct a number which denotes an overflow of digit that starts at
+%% `Offset` bits and is `Width` bits long.
+overflow_digit(Width, Offset) ->
+    (1 bsl Width) bsl Offset.
+
+%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
+%% significant bits first.
+%%
+%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
+%% specified in `Sources` list, in order from most significant bits to least
+%% significant. Each iteration calls `FoldFun` with:
+%% * bitsource that was used to extract sub-bitstrings,
+%% * 3 sub-bitstrings in integer representation,
+%% * bit offset into integers,
+%% * current accumulator.
+-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
+    Acc
+when
+    FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
 zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
     Acc;
 zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
@@ -619,8 +683,8 @@ wildcard_bitmask_test_() ->
 %% Key3   = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
 %% Key4   = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
 
-compute_test_next_seek(Bitstring, Bitfilter, HBitmask) ->
-    compute_next_seek(
+compute_test_hash_seek(Bitstring, Bitfilter, HBitmask) ->
+    compute_hash_seek(
         Bitstring,
         Bitfilter,
         HBitmask,
@@ -637,7 +701,7 @@ next_seek_test_() ->
     [
         ?_assertMatch(
             none,
-            compute_test_next_seek(
+            compute_test_hash_seek(
                 16#FD_42_4242_043,
                 16#FD_42_4242_042,
                 16#FF_FF_FFFF_FFF
@@ -645,7 +709,7 @@ next_seek_test_() ->
         ),
         ?_assertMatch(
             16#FD_11_0678_000,
-            compute_test_next_seek(
+            compute_test_hash_seek(
                 16#FD_11_0108_121,
                 16#FD_00_0678_000,
                 16#FF_00_FFFF_000
@@ -653,7 +717,7 @@ next_seek_test_() ->
         ),
         ?_assertMatch(
             16#FD_12_0678_000,
-            compute_test_next_seek(
+            compute_test_hash_seek(
                 16#FD_11_0679_919,
                 16#FD_00_0678_000,
                 16#FF_00_FFFF_000
@@ -661,7 +725,7 @@ next_seek_test_() ->
         ),
         ?_assertMatch(
             none,
-            compute_test_next_seek(
+            compute_test_hash_seek(
                 16#FD_FF_0679_001,
                 16#FD_00_0678_000,
                 16#FF_00_FFFF_000
@@ -669,7 +733,7 @@ next_seek_test_() ->
         ),
         ?_assertMatch(
             none,
-            compute_test_next_seek(
+            compute_test_hash_seek(
                 16#FE_11_0179_017,
                 16#FD_00_0678_000,
                 16#FF_00_FFFF_000

+ 2 - 2
apps/emqx_replay/test/emqx_replay_storage_SUITE.erl

@@ -169,7 +169,7 @@ t_prop_hash_bitmask_computes(_) ->
         )
     ).
 
-t_prop_iterate_stored_messages(Config) ->
+t_prop_iterate_stored_messages(_) ->
     ?assertEqual(
         true,
         proper:quickcheck(
@@ -256,7 +256,7 @@ init_per_testcase(TC, Config) ->
     {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
     Config.
 
-end_per_testcase(_TC, Config) ->
+end_per_testcase(_TC, _Config) ->
     ok = application:stop(emqx_replay).
 
 zone(TC) ->