Przeglądaj źródła

refactor(ds): Address review remarks

ieQu1 2 lat temu
rodzic
commit
99329e1243

+ 7 - 1
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -72,7 +72,13 @@ replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) ->
 
 -spec commit_offset(emqx_persistent_session_ds:id(), emqx_types:packet_id(), inflight()) ->
     {_IsValidOffset :: boolean(), inflight()}.
-commit_offset(SessionId, PacketId, Inflight0 = #inflight{acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0}) ->
+commit_offset(
+    SessionId,
+    PacketId,
+    Inflight0 = #inflight{
+        acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0
+    }
+) ->
     AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId),
     true = AckedSeqno0 < AckedSeqno,
     Ranges = lists:filter(

+ 2 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -281,7 +281,8 @@ publish(_PacketId, Msg, Session) ->
 puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
     case emqx_persistent_message_ds_replayer:commit_offset(Id, PacketId, Inflight0) of
         {true, Inflight} ->
-            Msg = #message{}, %% TODO
+            %% TODO
+            Msg = #message{},
             {ok, Msg, [], Session#{inflight => Inflight}};
         {false, _} ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}

+ 87 - 33
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -168,6 +168,10 @@
 %% transformation from a list of bitsources.
 %%
 %% Note: Dimension is 1-based.
+%%
+%% Note: order of bitsources is important. First element of the list
+%% is mapped to the _least_ significant bits of the key, and the last
+%% element becomes most significant bits.
 -spec make_keymapper([bitsource()]) -> keymapper().
 make_keymapper(Bitsources) ->
     Arr0 = array:new([{fixed, false}, {default, {0, []}}]),
@@ -207,12 +211,13 @@ vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) ->
 %% @doc Same as `vector_to_key', but it works with binaries, and outputs a binary.
 -spec bin_vector_to_key(keymapper(), [binary()]) -> binary().
 bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) ->
-    Vec = lists:map(
-        fun({Bin, SizeOf}) ->
+    Vec = lists:zipwith(
+        fun(Bin, SizeOf) ->
             <<Int:SizeOf, _/binary>> = Bin,
             Int
         end,
-        lists:zip(Binaries, DimSizeof)
+        Binaries,
+        DimSizeof
     ),
     Key = vector_to_key(Keymapper, Vec),
     <<Key:Size>>.
@@ -241,13 +246,15 @@ key_to_vector(#keymapper{scanner = Scanner}, Key) ->
 bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, BinKey) ->
     <<Key:Size>> = BinKey,
     Vector = key_to_vector(Keymapper, Key),
-    lists:map(
-        fun({Elem, SizeOf}) ->
+    lists:zipwith(
+        fun(Elem, SizeOf) ->
             <<Elem:SizeOf>>
         end,
-        lists:zip(Vector, DimSizeof)
+        Vector,
+        DimSizeof
     ).
 
+%% @doc Transform a bitstring to a key
 -spec bitstring_to_key(keymapper(), bitstring()) -> key().
 bitstring_to_key(#keymapper{size = Size}, Bin) ->
     case Bin of
@@ -257,6 +264,7 @@ bitstring_to_key(#keymapper{size = Size}, Bin) ->
             error({invalid_key, Bin, Size})
     end.
 
+%% @doc Transform key to a fixed-size bistring
 -spec key_to_bitstring(keymapper(), key()) -> bitstring().
 key_to_bitstring(#keymapper{size = Size}, Key) ->
     <<Key:Size>>.
@@ -267,13 +275,15 @@ make_filter(
     KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = TotalSize}, Filter0
 ) ->
     NDim = length(DimSizeof),
-    %% Transform "symbolic" inequations to ranges:
-    Filter1 = inequations_to_ranges(KeyMapper, Filter0),
+    %% Transform "symbolic" constraints to ranges:
+    Filter1 = constraints_to_ranges(KeyMapper, Filter0),
     {Bitmask, Bitfilter} = make_bitfilter(KeyMapper, Filter1),
     %% Calculate maximum source offset as per bitsource specification:
     MaxOffset = lists:foldl(
         fun({Dim, Offset, _Size}, Acc) ->
-            maps:update_with(Dim, fun(OldVal) -> max(OldVal, Offset) end, 0, Acc)
+            maps:update_with(
+                Dim, fun(OldVal) -> max(OldVal, Offset) end, maps:merge(#{Dim => 0}, Acc)
+            )
         end,
         #{},
         Schema
@@ -288,11 +298,11 @@ make_filter(
     %%
     %% This is needed so when we increment the vector, we always scan
     %% the full range of least significant bits.
-    Filter2 = lists:map(
+    Filter2 = lists:zipwith(
         fun
-            ({{Val, Val}, _Dim}) ->
+            ({Val, Val}, _Dim) ->
                 {Val, Val};
-            ({{Min0, Max0}, Dim}) ->
+            ({Min0, Max0}, Dim) ->
                 Offset = maps:get(Dim, MaxOffset, 0),
                 %% Set least significant bits of Min to 0:
                 Min = (Min0 bsr Offset) bsl Offset,
@@ -300,7 +310,8 @@ make_filter(
                 Max = Max0 bor ones(Offset),
                 {Min, Max}
         end,
-        lists:zip(Filter1, lists:seq(1, NDim))
+        Filter1,
+        lists:seq(1, NDim)
     ),
     %% Project the vector into "bitsource coordinate system":
     {_, Filter} = fold_bitsources(
@@ -340,10 +351,37 @@ make_filter(
         range_max = RangeMax
     }.
 
+%% @doc Given a filter `F' and key `K0', return the smallest key `K'
+%% that satisfies the following conditions:
+%%
+%% 1. `K >= K0'
+%%
+%% 2. `K' satisfies filter `F'.
+%%
+%% If these conditions cannot be satisfied, return `overflow'.
+%%
+%% Corollary: `K' may be equal to `K0'.
 -spec ratchet(filter(), key()) -> key() | overflow.
 ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Max ->
+    %% This function works in two steps: first, it finds the position
+    %% of bitsource ("pivot point") corresponding to the part of the
+    %% key that should be incremented (or set to the _minimum_ value
+    %% of the range, in case the respective part of the original key
+    %% is less than the minimum). It also returns "increment": value
+    %% that should be added to the part of the key at the pivot point.
+    %% Increment can be 0 or 1.
+    %%
+    %% Then it transforms the key using the following operation:
+    %%
+    %% 1. Parts of the key that are less than the pivot point are
+    %% reset to their minimum values.
+    %%
+    %% 2. `Increment' is added to the part of the key at the pivot
+    %% point.
+    %%
+    %% 3. The rest of key stays the same
     NDim = array:size(Ranges),
-    case ratchet_scan(Ranges, NDim, Key, 0, _Pivot = {-1, 0}, _Carry = 0) of
+    case ratchet_scan(Ranges, NDim, Key, 0, {_Pivot0 = -1, _Increment0 = 0}, _Carry = 0) of
         overflow ->
             overflow;
         {Pivot, Increment} ->
@@ -352,16 +390,21 @@ ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Ma
 ratchet(_, _) ->
     overflow.
 
+%% @doc Given a binary representing a key and a filter, return the
+%% next key matching the filter, or `overflow' if such key doesn't
+%% exist.
 -spec bin_increment(filter(), binary()) -> binary() | overflow.
 bin_increment(Filter = #filter{size = Size}, <<>>) ->
     Key = ratchet(Filter, 0),
     <<Key:Size>>;
-bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, KeyBin) ->
+bin_increment(
+    Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter, range_max = RangeMax},
+    KeyBin
+) ->
     <<Key0:Size>> = KeyBin,
     Key1 = Key0 + 1,
     if
-        Key1 band Bitmask =:= Bitfilter ->
-            %% TODO: check overflow
+        Key1 band Bitmask =:= Bitfilter, Key1 =< RangeMax ->
             <<Key1:Size>>;
         true ->
             case ratchet(Filter, Key1) of
@@ -372,6 +415,10 @@ bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfi
             end
     end.
 
+%% @doc Given a filter and a binary representation of a key, return
+%% `false' if the key _doesn't_ match the fitler. This function
+%% returning `true' is necessary, but not sufficient condition that
+%% the key satisfies the filter.
 -spec bin_checkmask(filter(), binary()) -> boolean().
 bin_checkmask(#filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, Key) ->
     case Key of
@@ -449,35 +496,37 @@ ratchet_do(Ranges, Key, I, Pivot, Increment) ->
 -spec make_bitfilter(keymapper(), [{non_neg_integer(), non_neg_integer()}]) ->
     {non_neg_integer(), non_neg_integer()}.
 make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) ->
-    L = lists:map(
+    L = lists:zipwith(
         fun
-            ({{N, N}, Bits}) ->
+            ({N, N}, Bits) ->
                 %% For strict equality we can employ bitmask:
                 {ones(Bits), N};
-            (_) ->
+            (_, _) ->
                 {0, 0}
         end,
-        lists:zip(Ranges, DimSizeof)
+        Ranges,
+        DimSizeof
     ),
     {Bitmask, Bitfilter} = lists:unzip(L),
     {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}.
 
 %% Transform inequalities into a list of closed intervals that the
 %% vector elements should lie in.
-inequations_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
-    lists:map(
+constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
+    lists:zipwith(
         fun
-            ({any, Bitsize}) ->
+            (any, Bitsize) ->
                 {0, ones(Bitsize)};
-            ({{'=', infinity}, Bitsize}) ->
+            ({'=', infinity}, Bitsize) ->
                 Val = ones(Bitsize),
                 {Val, Val};
-            ({{'=', Val}, _Bitsize}) ->
+            ({'=', Val}, _Bitsize) ->
                 {Val, Val};
-            ({{'>=', Val}, Bitsize}) ->
+            ({'>=', Val}, Bitsize) ->
                 {Val, ones(Bitsize)}
         end,
-        lists:zip(Filter, DimSizeof)
+        Filter,
+        DimSizeof
     ).
 
 -spec fold_bitsources(fun((_DstOffset :: non_neg_integer(), bitsource(), Acc) -> Acc), Acc, [
@@ -679,7 +728,7 @@ ratchet1_test() ->
     ?assertEqual(0, ratchet(F, 0)),
     ?assertEqual(16#fa, ratchet(F, 16#fa)),
     ?assertEqual(16#ff, ratchet(F, 16#ff)),
-    ?assertEqual(overflow, ratchet(F, 16#100), "TBD: filter must store the upper bound").
+    ?assertEqual(overflow, ratchet(F, 16#100)).
 
 %% erlfmt-ignore
 ratchet2_test() ->
@@ -696,6 +745,11 @@ ratchet2_test() ->
     ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10dc11)),
     ?assertEqual(overflow,    ratchet(F1, 16#ab000000)),
     F2 = make_filter(M, [{'=', 16#aa}, {'>=', 16#dddd}, {'=', 16#cc}]),
+    %% TODO: note that it's `16#aaddcc00` instead of
+    %% `16#aaddccdd'. That is because currently ratchet function
+    %% doesn't take LSBs of an '>=' interval if it has a hole in the
+    %% middle (see `make_filter/2'). This only adds extra keys to the
+    %% very first interval, so it's not deemed a huge problem.
     ?assertEqual(16#aaddcc00, ratchet(F2, 0)),
     ?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)).
 
@@ -721,18 +775,18 @@ ratchet3_test_() ->
 %% Note: this function iterates through the full range of keys, so its
 %% complexity grows _exponentially_ with the total size of the
 %% keymapper.
-test_iterate(Filter, overflow) ->
+test_iterate(_Filter, overflow) ->
     true;
 test_iterate(Filter, Key0) ->
     Key = ratchet(Filter, Key0 + 1),
     ?assert(ratchet_prop(Filter, Key0, Key)),
     test_iterate(Filter, Key).
 
-ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) ->
+ratchet_prop(#filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) ->
     %% Validate basic properties of the generated key. It must be
     %% greater than the old key, and match the bitmask:
     ?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)),
-    ?assert(Key > Key0, {Key, '>=', Key}),
+    ?assert(Key > Key0, {Key, '>=', Key0}),
     IMax = ones(Size),
     %% Iterate through all keys between `Key0 + 1' and `Key' and
     %% validate that none of them match the bitmask. Ultimately, it
@@ -750,7 +804,7 @@ ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = S
     CheckGaps(Key0 + 1).
 
 mkbmask(Keymapper, Filter0) ->
-    Filter = inequations_to_ranges(Keymapper, Filter0),
+    Filter = constraints_to_ranges(Keymapper, Filter0),
     make_bitfilter(Keymapper, Filter).
 
 key2vec(Schema, Vector) ->

+ 0 - 73
apps/emqx_durable_storage/src/emqx_ds_helper.erl

@@ -1,73 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_ds_helper).
-
-%% API:
--export([create_rr/1]).
-
-%% internal exports:
--export([]).
-
--export_type([rr/0]).
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--type item() :: {emqx_ds:stream_rank(), emqx_ds:stream()}.
-
--type rr() :: #{
-    queue := #{term() => [{integer(), emqx_ds:stream()}]},
-    active_ring := {[item()], [item()]}
-}.
-
-%%================================================================================
-%% API funcions
-%%================================================================================
-
--spec create_rr([item()]) -> rr().
-create_rr(Streams) ->
-    RR0 = #{latest_rank => #{}, active_ring => {[], []}},
-    add_streams(RR0, Streams).
-
--spec add_streams(rr(), [item()]) -> rr().
-add_streams(#{queue := Q0, active_ring := R0}, Streams) ->
-    Q1 = lists:foldl(
-        fun({{RankX, RankY}, Stream}, Acc) ->
-            maps:update_with(RankX, fun(L) -> [{RankY, Stream} | L] end, Acc)
-        end,
-        Q0,
-        Streams
-    ),
-    Q2 = maps:map(
-        fun(_RankX, Streams1) ->
-            lists:usort(Streams1)
-        end,
-        Q1
-    ),
-    #{queue => Q2, active_ring => R0}.
-
-%%================================================================================
-%% behavior callbacks
-%%================================================================================
-
-%%================================================================================
-%% Internal exports
-%%================================================================================
-
-%%================================================================================
-%% Internal functions
-%%================================================================================

+ 3 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -13,6 +13,9 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
+
+%% @doc Replication layer for DS backends that don't support
+%% replication on their own.
 -module(emqx_ds_replication_layer).
 
 -export([

+ 10 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -14,11 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc Reference implementation of the storage.
-%%
-%% Trivial, extremely slow and inefficient. It also doesn't handle
-%% restart of the Erlang node properly, so obviously it's only to be
-%% used for testing.
+%% @doc A storage layout based on learned topic structure and using
+%% bitfield mapping for the varying topic layers.
 -module(emqx_ds_storage_bitfield_lts).
 
 -behaviour(emqx_ds_storage_layer).
@@ -82,6 +79,9 @@
 
 -define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
 
+%% Limit on the number of wildcard levels in the learned topic trie:
+-define(WILDCARD_LIMIT, 10).
+
 -include("emqx_ds_bitmask.hrl").
 
 %%================================================================================
@@ -140,7 +140,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     %% If user's topics have more than learned 10 wildcard levels
     %% (more than 2, really), then it's total carnage; learned topic
     %% structure won't help.
-    MaxWildcardLevels = 10,
+    MaxWildcardLevels = ?WILDCARD_LIMIT,
     KeymapperCache = array:from_list(
         [
             make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N)
@@ -201,6 +201,9 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
     %% levels. Magic constant 2: we have two extra dimensions of topic
     %% index and time; the rest of dimensions are varying levels.
     NVarying = length(Inequations) - 2,
+    %% Assert:
+    NVarying =< ?WILDCARD_LIMIT orelse
+        error({too_many_varying_topic_levels, NVarying}),
     Keymapper = array:get(NVarying, Keymappers),
     Filter =
         #filter{range_min = LowerBound, range_max = UpperBound} = emqx_ds_bitmask_keymapper:make_filter(
@@ -208,7 +211,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
         ),
     {ok, ITHandle} = rocksdb:iterator(DB, CF, [
         {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)},
-        {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound)}
+        {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound + 1)}
     ]),
     try
         put(?COUNTER, 0),

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -45,7 +45,7 @@
 %% Note: this record might be stored permanently on a remote node.
 -record(stream, {
     generation :: gen_id(),
-    enc :: _EncapsultatedData,
+    enc :: _EncapsulatedData,
     misc = #{} :: map()
 }).
 
@@ -54,7 +54,7 @@
 %% Note: this record might be stored permanently on a remote node.
 -record(it, {
     generation :: gen_id(),
-    enc :: _EncapsultatedData,
+    enc :: _EncapsulatedData,
     misc = #{} :: map()
 }).
 
@@ -83,10 +83,10 @@
 %%%% Shard:
 
 -type shard(GenData) :: #{
-    %% ID of the current generation (where the new data is written:)
+    %% ID of the current generation (where the new data is written):
     current_generation := gen_id(),
     %% This data is used to create new generation:
-    prototype := {module(), term()},
+    prototype := prototype(),
     %% Generations:
     {generation, gen_id()} => GenData
 }.

+ 1 - 2
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -68,5 +68,4 @@ next(Node, Shard, Iter, BatchSize) ->
 %%================================================================================
 
 introduced_in() ->
-    %% FIXME
-    "5.3.0".
+    "5.4.0".

+ 0 - 13
tdd

@@ -1,13 +0,0 @@
-#!/bin/bash
-
-make fmt > /dev/null &>1 &
-
-./rebar3 ct --name ct@127.0.0.1 --readable=true  --suite ./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam --case t_publish_while_client_is_gone_qos1 --group tcp
-
-suites=$(cat <<EOF | paste -sd "," -
-./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam
-./_build/test/lib/emqx/test/emqx_persistent_messages_SUITE.beam
-EOF
-)
-
-#./rebar3 ct --name ct@127.0.0.1 --readable=true --suite "${suites}" --case t_publish_while_client_is_gone