Просмотр исходного кода

fix(ds): Optimize emqx_ds_bitmask_keymapper:make_filter

This optimization makes idle polling faster
ieQu1 2 лет назад
Родитель
Сommit
698ba3f271

+ 10 - 2
apps/emqx/src/emqx_persistent_session_ds_inflight.erl

@@ -16,7 +16,15 @@
 -module(emqx_persistent_session_ds_inflight).
 
 %% API:
--export([new/1, push/2, pop/1, n_buffered/2, n_inflight/1, inc_send_quota/1, receive_maximum/1]).
+-export([
+    new/1,
+    push/2,
+    pop/1,
+    n_buffered/2,
+    n_inflight/1,
+    inc_send_quota/1,
+    receive_maximum/1
+]).
 
 %% internal exports:
 -export([]).
@@ -107,7 +115,7 @@ pop(Rec0) ->
             undefined
     end.
 
--spec n_buffered(0..2 | all, t()) -> non_neg_integer().
+-spec n_buffered(?QOS_0..?QOS_2 | all, t()) -> non_neg_integer().
 n_buffered(?QOS_0, #inflight{n_qos0 = NQos0}) ->
     NQos0;
 n_buffered(?QOS_1, #inflight{n_qos1 = NQos1}) ->

+ 180 - 112
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -95,14 +95,14 @@
 %% Notes on the terminology:
 %%
 %% - "Coordinates" of the original message (usually topic and the
-%% timestamp, like in the example above) will be referred as the
+%% timestamp, like in the example above) will be referred to as the
 %% "vector".
 %%
 %% - The 1D scalar that these coordinates are transformed to will be
-%% referred as the "scalar".
+%% referred to as the "scalar".
 %%
-%% - Binary representation of the scalar if fixed size will be
-%% referred as the "key".
+%% - Fixed-size binary representation of the scalar is called the
+%% "key".
 %%
 %%================================================================================
 
@@ -122,13 +122,15 @@
     bitsize/1
 ]).
 
--export_type([vector/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]).
+-export_type([vector/0, scalar/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]).
 
 -compile(
     {inline, [
         ones/1,
         extract/2,
-        extract_inv/2
+        extract_inv/2,
+        constr_adjust_min/2,
+        constr_adjust_max/2
     ]}
 ).
 
@@ -150,12 +152,13 @@
 %% N-th coordinate of a vector:
 -type dimension() :: pos_integer().
 
+-type key() :: binary().
+
 -type offset() :: non_neg_integer().
 
 -type bitsize() :: pos_integer().
 
-%% The resulting 1D key:
--type key() :: non_neg_integer().
+-type scalar() :: non_neg_integer().
 
 -type bitsource() ::
     %% Consume `_Size` bits from timestamp starting at `_Offset`th
@@ -177,17 +180,21 @@
 
 -type scan_action() :: #scan_action{}.
 
--type scanner() :: [[scan_action()]].
+-type scanner() :: [_CoorScanActions :: [scan_action()]].
 
 -record(keymapper, {
     %% The original schema of the transformation:
     schema :: [bitsource()],
+    %% Number of dimensions:
+    vec_n_dim :: non_neg_integer(),
     %% List of operations used to map a vector to the scalar
     vec_scanner :: scanner(),
     %% Total size of the resulting key, in bits:
     key_size :: non_neg_integer(),
-    %% Bit size of each dimenstion of the vector:
-    dim_sizeof :: [non_neg_integer()]
+    %% Bit size of each dimension of the vector:
+    vec_coord_size :: [non_neg_integer()],
+    %% Maximum bitsource offset for each element of the vector:
+    vec_max_offset :: [offset()]
 }).
 
 -opaque keymapper() :: #keymapper{}.
@@ -211,6 +218,9 @@
 %% Note: order of bitsources is important. First element of the list
 %% is mapped to the _least_ significant bits of the key, and the last
 %% element becomes most significant bits.
+%%
+%% Warning: currently the algorithm doesn't handle situations when
+%% parts of a vector element are _reordered_ in the resulting scalar.
 -spec make_keymapper([bitsource()]) -> keymapper().
 make_keymapper(Bitsources) ->
     Arr0 = array:new([{fixed, false}, {default, {0, []}}]),
@@ -218,7 +228,9 @@ make_keymapper(Bitsources) ->
         fun(DestOffset, {Dim0, Offset, Size}, Acc) ->
             Dim = Dim0 - 1,
             Action = #scan_action{
-                vec_coord_bitmask = ones(Size), vec_coord_offset = Offset, scalar_offset = DestOffset
+                vec_coord_bitmask = ones(Size),
+                vec_coord_offset = Offset,
+                scalar_offset = DestOffset
             },
             {DimSizeof, Actions} = array:get(Dim, Acc),
             array:set(Dim, {DimSizeof + Size, [Action | Actions]}, Acc)
@@ -227,11 +239,15 @@ make_keymapper(Bitsources) ->
         Bitsources
     ),
     {DimSizeof, Scanner} = lists:unzip(array:to_list(Arr)),
+    NDim = length(Scanner),
+    MaxOffsets = vec_max_offset(NDim, Bitsources),
     #keymapper{
         schema = Bitsources,
+        vec_n_dim = length(Scanner),
         vec_scanner = Scanner,
         key_size = Size,
-        dim_sizeof = DimSizeof
+        vec_coord_size = DimSizeof,
+        vec_max_offset = MaxOffsets
     }.
 
 -spec bitsize(keymapper()) -> pos_integer().
@@ -241,7 +257,7 @@ bitsize(#keymapper{key_size = Size}) ->
 %% @doc Map N-dimensional vector to a scalar key.
 %%
 %% Note: this function is not injective.
--spec vector_to_key(keymapper(), vector()) -> key().
+-spec vector_to_key(keymapper(), vector()) -> scalar().
 vector_to_key(#keymapper{vec_scanner = []}, []) ->
     0;
 vector_to_key(#keymapper{vec_scanner = [Actions | Scanner]}, [Coord | Vector]) ->
@@ -249,7 +265,7 @@ vector_to_key(#keymapper{vec_scanner = [Actions | Scanner]}, [Coord | Vector]) -
 
 %% @doc Same as `vector_to_key', but it works with binaries, and outputs a binary.
 -spec bin_vector_to_key(keymapper(), [binary()]) -> binary().
-bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, key_size = Size}, Binaries) ->
+bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = Size}, Binaries) ->
     Vec = lists:zipwith(
         fun(Bin, SizeOf) ->
             <<Int:SizeOf>> = Bin,
@@ -265,7 +281,7 @@ bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, key_size = Size
 %%
 %% Note: `vector_to_key(key_to_vector(K)) = K' but
 %% `key_to_vector(vector_to_key(V)) = V' is not guaranteed.
--spec key_to_vector(keymapper(), key()) -> vector().
+-spec key_to_vector(keymapper(), scalar()) -> vector().
 key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
     lists:map(
         fun(Actions) ->
@@ -281,8 +297,8 @@ key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
     ).
 
 %% @doc Same as `key_to_vector', but it works with binaries.
--spec bin_key_to_vector(keymapper(), binary()) -> [binary()].
-bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, key_size = Size}, BinKey) ->
+-spec bin_key_to_vector(keymapper(), key()) -> [binary()].
+bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = Size}, BinKey) ->
     <<Key:Size>> = BinKey,
     Vector = key_to_vector(Keymapper, Key),
     lists:zipwith(
@@ -294,7 +310,7 @@ bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, key_size = Size
     ).
 
 %% @doc Transform a bitstring to a key
--spec bitstring_to_key(keymapper(), bitstring()) -> key().
+-spec bitstring_to_key(keymapper(), bitstring()) -> scalar().
 bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
     case Bin of
         <<Key:Size>> ->
@@ -304,58 +320,21 @@ bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
     end.
 
 %% @doc Transform key to a fixed-size bitstring
--spec key_to_bitstring(keymapper(), key()) -> bitstring().
+-spec key_to_bitstring(keymapper(), scalar()) -> bitstring().
 key_to_bitstring(#keymapper{key_size = Size}, Key) ->
     <<Key:Size>>.
 
 %% @doc Create a filter object that facilitates range scans.
 -spec make_filter(keymapper(), [coord_range()]) -> filter().
 make_filter(
-    KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, key_size = TotalSize}, Filter0
+    KeyMapper = #keymapper{schema = Schema, key_size = TotalSize},
+    Filter0
 ) ->
-    NDim = length(DimSizeof),
-    %% Transform "symbolic" constraints to ranges:
-    Filter1 = constraints_to_ranges(KeyMapper, Filter0),
-    {Bitmask, Bitfilter} = make_bitfilter(KeyMapper, Filter1),
-    %% Calculate maximum source offset as per bitsource specification:
-    MaxOffset = lists:foldl(
-        fun({Dim, Offset, _Size}, Acc) ->
-            maps:update_with(
-                Dim, fun(OldVal) -> max(OldVal, Offset) end, maps:merge(#{Dim => 0}, Acc)
-            )
-        end,
-        #{},
-        Schema
-    ),
-    %% Adjust minimum and maximum values for each interval like this:
-    %%
-    %% Min: 110100|101011 -> 110100|00000
-    %% Max: 110101|001011 -> 110101|11111
-    %%            ^
-    %%            |
-    %%       max offset
-    %%
-    %% This is needed so when we increment the vector, we always scan
-    %% the full range of least significant bits.
-    Filter2 = lists:zipwith(
-        fun
-            ({Val, Val}, _Dim) ->
-                {Val, Val};
-            ({Min0, Max0}, Dim) ->
-                Offset = maps:get(Dim, MaxOffset, 0),
-                %% Set least significant bits of Min to 0:
-                Min = (Min0 bsr Offset) bsl Offset,
-                %% Set least significant bits of Max to 1:
-                Max = Max0 bor ones(Offset),
-                {Min, Max}
-        end,
-        Filter1,
-        lists:seq(1, NDim)
-    ),
-    %% Project the vector into "bitsource coordinate system":
+    {Intervals, Bitmask, Bitfilter} = transform_constraints(KeyMapper, Filter0),
+    %% Project the intervals into the "bitsource coordinate system":
     {_, Filter} = fold_bitsources(
         fun(DstOffset, {Dim, SrcOffset, Size}, Acc) ->
-            {Min0, Max0} = lists:nth(Dim, Filter2),
+            {Min0, Max0} = element(Dim, Intervals),
             Min = (Min0 bsr SrcOffset) band ones(Size),
             Max = (Max0 bsr SrcOffset) band ones(Size),
             Action = #filter_scan_action{
@@ -369,7 +348,7 @@ make_filter(
         [],
         Schema
     ),
-    Ranges = array:from_list(lists:reverse(Filter)),
+    Ranges = list_to_tuple(lists:reverse(Filter)),
     %% Compute estimated upper and lower bounds of a _continuous_
     %% interval where all keys lie:
     case Filter of
@@ -377,6 +356,7 @@ make_filter(
             RangeMin = 0,
             RangeMax = 0;
         [#filter_scan_action{offset = MSBOffset, min = MSBMin, max = MSBMax} | _] ->
+            %% Hack: currently this function only considers the most significant bitsource:
             RangeMin = MSBMin bsl MSBOffset,
             RangeMax = MSBMax bsl MSBOffset bor ones(MSBOffset)
     end,
@@ -400,7 +380,7 @@ make_filter(
 %% If these conditions cannot be satisfied, return `overflow'.
 %%
 %% Corollary: `K' may be equal to `K0'.
--spec ratchet(filter(), key()) -> key() | overflow.
+-spec ratchet(filter(), scalar()) -> scalar() | overflow.
 ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Max ->
     %% This function works in two steps: first, it finds the position
     %% of bitsource ("pivot point") corresponding to the part of the
@@ -419,7 +399,7 @@ ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Ma
     %% point.
     %%
     %% 3. The rest of key stays the same
-    NDim = array:size(Ranges),
+    NDim = tuple_size(Ranges),
     case ratchet_scan(Ranges, NDim, Key, 0, {_Pivot0 = -1, _Increment0 = 0}, _Carry = 0) of
         overflow ->
             overflow;
@@ -482,7 +462,9 @@ ratchet_scan(_Ranges, NDim, _Key, NDim, _Pivot, 1) ->
     %% We've reached the end, but key is still not large enough:
     overflow;
 ratchet_scan(Ranges, NDim, Key, I, Pivot0, Carry) ->
-    #filter_scan_action{offset = Offset, size = Size, min = Min, max = Max} = array:get(I, Ranges),
+    #filter_scan_action{offset = Offset, size = Size, min = Min, max = Max} = element(
+        I + 1, Ranges
+    ),
     %% Extract I-th element of the vector from the original key:
     Elem = ((Key bsr Offset) band ones(Size)) + Carry,
     if
@@ -516,7 +498,7 @@ ratchet_scan(Ranges, NDim, Key, I, Pivot0, Carry) ->
 ratchet_do(_Ranges, _Key, I, _Pivot, _Increment) when I < 0 ->
     0;
 ratchet_do(Ranges, Key, I, Pivot, Increment) ->
-    #filter_scan_action{offset = Offset, size = Size, min = Min} = array:get(I, Ranges),
+    #filter_scan_action{offset = Offset, size = Size, min = Min} = element(I + 1, Ranges),
     Mask = ones(Offset + Size) bxor ones(Offset),
     Elem =
         if
@@ -533,46 +515,122 @@ ratchet_do(Ranges, Key, I, Pivot, Increment) ->
     %% ),
     Elem bor ratchet_do(Ranges, Key, I - 1, Pivot, Increment).
 
--spec make_bitfilter(keymapper(), [{non_neg_integer(), non_neg_integer()}]) ->
-    {non_neg_integer(), non_neg_integer()}.
-make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) ->
-    L = lists:zipwith(
-        fun
-            ({N, N}, Bits) ->
-                %% For strict equality we can employ bitmask:
-                {ones(Bits), N};
-            (_, _) ->
-                {0, 0}
+%% Calculate maximum offset for each dimension of the vector.
+%%
+%% These offsets are cached because during the creation of the filter
+%% we need to adjust the search interval for the presence of holes.
+-spec vec_max_offset(non_neg_integer(), [bitsource()]) -> [offset()].
+vec_max_offset(NDim, Bitsources) ->
+    Arr0 = array:new([{size, NDim}, {default, 0}, {fixed, true}]),
+    Arr = lists:foldl(
+        fun({Dimension, Offset, _Size}, Acc) ->
+            OldVal = array:get(Dimension - 1, Acc),
+            array:set(Dimension - 1, max(Offset, OldVal), Acc)
         end,
-        Ranges,
-        DimSizeof
+        Arr0,
+        Bitsources
     ),
-    {Bitmask, Bitfilter} = lists:unzip(L),
-    {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}.
+    array:to_list(Arr).
 
 %% Transform constraints into a list of closed intervals that the
 %% vector elements should lie in.
-constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
-    lists:zipwith(
-        fun(Constraint, Bitsize) ->
-            Max = ones(Bitsize),
-            case Constraint of
-                any ->
-                    {0, Max};
-                {'=', infinity} ->
-                    {Max, Max};
-                {'=', Val} when Val =< Max ->
-                    {Val, Val};
-                {'>=', Val} when Val =< Max ->
-                    {Val, Max};
-                {A, '..', B} when A =< Max, B =< Max ->
-                    {A, B}
-            end
-        end,
-        Filter,
-        DimSizeof
+transform_constraints(
+    #keymapper{
+        vec_scanner = Scanner, vec_coord_size = DimSizeL, vec_max_offset = MaxOffsetL
+    },
+    FilterL
+) ->
+    do_transform_constraints(
+        Scanner, DimSizeL, MaxOffsetL, FilterL, [], 0, 0
     ).
 
+do_transform_constraints([], [], [], [], RangeAcc, BitmaskAcc, BitfilterAcc) ->
+    {
+        list_to_tuple(lists:reverse(RangeAcc)),
+        BitmaskAcc,
+        BitfilterAcc
+    };
+do_transform_constraints(
+    [Actions | Scanner],
+    [DimSize | DimSizeL],
+    [MaxOffset | MaxOffsetL],
+    [Filter | FilterL],
+    RangeAcc,
+    BitmaskAcc,
+    BitfilterAcc
+) ->
+    %% This function does four things:
+    %%
+    %% 1. It transforms the list of "symbolic inequations" to a list
+    %% of closed intervals for each vector element.
+    %%
+    %% 2. In addition, this function adjusts minimum and maximum
+    %% values for each interval like this:
+    %%
+    %% Min: 110100|101011 -> 110100|00000
+    %% Max: 110101|001011 -> 110101|11111
+    %%            ^
+    %%            |
+    %%       max offset
+    %%
+    %% This is needed so when we increment the vector, we always scan
+    %% the full range of the least significant bits.
+    %%
+    %% This leads to some out-of-range elements being exposed at the
+    %% beginning and the end of the range, so they should be filtered
+    %% out during post-processing.
+    %%
+    %% 3. It calculates the bitmask that can be used together with the
+    %% bitfilter (see 4) to quickly filter out keys that don't satisfy
+    %% the strict equations, using `(Key band Bitmask) =/= Bitfilter' check.
+    %%
+    %% 4. It calculates the bitfilter
+    Max = ones(DimSize),
+    case Filter of
+        any ->
+            Range = {0, Max},
+            Bitmask = 0,
+            Bitfilter = 0;
+        {'=', infinity} ->
+            Range = {Max, Max},
+            Bitmask = Max,
+            Bitfilter = Max;
+        {'=', Val} when Val =< Max ->
+            Range = {Val, Val},
+            Bitmask = Max,
+            Bitfilter = Val;
+        {'>=', Val} when Val =< Max ->
+            Range = {constr_adjust_min(MaxOffset, Val), constr_adjust_max(MaxOffset, Max)},
+            Bitmask = 0,
+            Bitfilter = 0;
+        {A, '..', B} when A =< Max, B =< Max ->
+            Range = {constr_adjust_min(MaxOffset, A), constr_adjust_max(MaxOffset, B)},
+            Bitmask = 0,
+            Bitfilter = 0
+    end,
+    do_transform_constraints(
+        Scanner,
+        DimSizeL,
+        MaxOffsetL,
+        FilterL,
+        [Range | RangeAcc],
+        vec_elem_to_key(Bitmask, Actions, BitmaskAcc),
+        vec_elem_to_key(Bitfilter, Actions, BitfilterAcc)
+    ).
+
+constr_adjust_min(MaxOffset, Num) ->
+    (Num bsr MaxOffset) bsl MaxOffset.
+
+constr_adjust_max(MaxOffset, Num) ->
+    Num bor ones(MaxOffset).
+
+-spec vec_elem_to_key(non_neg_integer(), [scan_action()], Acc) -> Acc when
+    Acc :: non_neg_integer().
+vec_elem_to_key(_Elem, [], Acc) ->
+    Acc;
+vec_elem_to_key(Elem, [Action | Actions], Acc) ->
+    vec_elem_to_key(Elem, Actions, Acc bor extract(Elem, Action)).
+
 -spec fold_bitsources(fun((_DstOffset :: non_neg_integer(), bitsource(), Acc) -> Acc), Acc, [
     bitsource()
 ]) -> {bitsize(), Acc}.
@@ -595,7 +653,9 @@ do_vector_to_key([Action | Actions], Scanner, Coord, Vector, Acc0) ->
     do_vector_to_key(Actions, Scanner, Coord, Vector, Acc).
 
 -spec extract(_Source :: coord(), scan_action()) -> integer().
-extract(Src, #scan_action{vec_coord_bitmask = SrcBitmask, vec_coord_offset = SrcOffset, scalar_offset = DstOffset}) ->
+extract(Src, #scan_action{
+    vec_coord_bitmask = SrcBitmask, vec_coord_offset = SrcOffset, scalar_offset = DstOffset
+}) ->
     ((Src bsr SrcOffset) band SrcBitmask) bsl DstOffset.
 
 %% extract^-1
@@ -619,9 +679,11 @@ make_keymapper0_test() ->
     ?assertEqual(
         #keymapper{
             schema = Schema,
+            vec_n_dim = 0,
             vec_scanner = [],
             key_size = 0,
-            dim_sizeof = []
+            vec_coord_size = [],
+            vec_max_offset = []
         },
         make_keymapper(Schema)
     ).
@@ -631,12 +693,14 @@ make_keymapper1_test() ->
     ?assertEqual(
         #keymapper{
             schema = Schema,
+            vec_n_dim = 2,
             vec_scanner = [
                 [#scan_action{vec_coord_bitmask = 2#111, vec_coord_offset = 0, scalar_offset = 0}],
                 [#scan_action{vec_coord_bitmask = 2#11111, vec_coord_offset = 0, scalar_offset = 3}]
             ],
             key_size = 8,
-            dim_sizeof = [3, 5]
+            vec_coord_size = [3, 5],
+            vec_max_offset = [0, 0]
         },
         make_keymapper(Schema)
     ).
@@ -646,15 +710,19 @@ make_keymapper2_test() ->
     ?assertEqual(
         #keymapper{
             schema = Schema,
+            vec_n_dim = 2,
             vec_scanner = [
                 [
-                    #scan_action{vec_coord_bitmask = 2#11111, vec_coord_offset = 3, scalar_offset = 8},
+                    #scan_action{
+                        vec_coord_bitmask = 2#11111, vec_coord_offset = 3, scalar_offset = 8
+                    },
                     #scan_action{vec_coord_bitmask = 2#111, vec_coord_offset = 0, scalar_offset = 0}
                 ],
                 [#scan_action{vec_coord_bitmask = 2#11111, vec_coord_offset = 0, scalar_offset = 3}]
             ],
             key_size = 13,
-            dim_sizeof = [8, 5]
+            vec_coord_size = [8, 5],
+            vec_max_offset = [3, 0]
         },
         make_keymapper(Schema)
     ).
@@ -757,17 +825,17 @@ ratchet1_test() ->
     Bitsources = [{1, 0, 8}],
     M = make_keymapper(Bitsources),
     F = make_filter(M, [any]),
-    #filter{bitsource_ranges = Rarr} = F,
+    #filter{bitsource_ranges = Ranges} = F,
     ?assertMatch(
-        [
+        {
             #filter_scan_action{
                 offset = 0,
                 size = 8,
                 min = 0,
                 max = 16#ff
             }
-        ],
-        array:to_list(Rarr)
+        },
+        Ranges
     ),
     ?assertEqual(0, ratchet(F, 0)),
     ?assertEqual(16#fa, ratchet(F, 16#fa)),
@@ -847,9 +915,9 @@ ratchet_prop(#filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key
     end,
     CheckGaps(Key0 + 1).
 
-mkbmask(Keymapper, Filter0) ->
-    Filter = constraints_to_ranges(Keymapper, Filter0),
-    make_bitfilter(Keymapper, Filter).
+mkbmask(Keymapper, Filter) ->
+    {_Ranges, Bitmask, Bitfilter} = transform_constraints(Keymapper, Filter),
+    {Bitmask, Bitfilter}.
 
 key2vec(Schema, Vector) ->
     Keymapper = make_keymapper(Schema),