Просмотр исходного кода

feat(ds): Allow to specify message store options

* Keymapper
* Column family name + DB options
* DB write / read options
Andrew Mayorov 3 года назад
Родитель
Commit
8c9c9cc669

+ 149 - 43
apps/emqx_replay/src/emqx_replay_message_storage.erl

@@ -18,6 +18,7 @@
 
 %% API:
 -export([open/2, close/1]).
+-export([make_keymapper/1]).
 
 -export([store/5]).
 -export([make_iterator/3]).
@@ -25,11 +26,11 @@
 
 %% Debug/troubleshooting:
 -export([
-    make_message_key/3,
-    compute_topic_hash/1,
-    compute_hash_bitmask/1,
-    hash/2,
-    combine/3
+    make_message_key/4,
+    compute_topic_hash/2,
+    compute_hash_bitmask/2,
+    combine/4,
+    hash/2
 ]).
 
 -export_type([db/0, iterator/0]).
@@ -38,21 +39,69 @@
 %% Type declarations
 %%================================================================================
 
-%% see rocksdb:db_options()
--type options() :: proplists:proplist().
-
 %% parsed
 -type topic() :: list(binary()).
 
 %% TODO granularity?
 -type time() :: integer().
 
+%% Number of bits
+-type bits() :: non_neg_integer().
+
+%% Key of a RocksDB record.
+-type key() :: binary().
+
+%% Distribution of entropy among topic levels.
+%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits,
+%% and _rest of levels_ (if any) get 16 bits.
+-type bits_per_level() :: [bits(), ...].
+
+%% see rocksdb:db_options()
+-type db_options() :: proplists:proplist().
+
+%% see rocksdb:cf_options()
+-type db_cf_options() :: proplists:proplist().
+
+%% see rocksdb:write_options()
+-type db_write_options() :: proplists:proplist().
+
+%% see rocksdb:read_options()
+-type db_read_options() :: proplists:proplist().
+
+-type options() :: #{
+    %% Keymapper.
+    keymapper := keymapper(),
+    %% Name and options to use to open specific column family.
+    column_family => {_Name :: string(), db_cf_options()},
+    %% Options to use when opening the DB.
+    open_options => db_options(),
+    %% Options to use when writing a message to the DB.
+    write_options => db_write_options(),
+    %% Options to use when iterating over messages in the DB.
+    read_options => db_read_options()
+}.
+
+-define(DEFAULT_COLUMN_FAMILY, {"default", []}).
+
+-define(DEFAULT_OPEN_OPTIONS, [
+    {create_if_missing, true},
+    {create_missing_column_families, true}
+]).
+
+-define(DEFAULT_WRITE_OPTIONS, [{sync, true}]).
+-define(DEFAULT_READ_OPTIONS, []).
+
 -record(db, {
-    handle :: rocksdb:db_handle()
+    handle :: rocksdb:db_handle(),
+    cf :: rocksdb:cf_handle(),
+    keymapper :: keymapper(),
+    write_options = [{sync, true}] :: db_write_options(),
+    read_options = [] :: db_write_options()
 }).
 
 -record(it, {
     handle :: rocksdb:itr_handle(),
+    keymapper :: keymapper(),
     next_action :: {seek, binary()} | next,
     topic_filter :: emqx_topic:words(),
     hash_filter :: integer(),
@@ -60,9 +109,17 @@
     start_time :: time()
 }).
 
--opaque db() :: #db{}.
+% NOTE
+% Keymapper decides how to map messages into RocksDB column family keyspace.
+-record(keymapper, {
+    topic_bits :: bits(),
+    topic_bits_per_level :: bits_per_level(),
+    timestamp_bits :: bits()
+}).
 
+-opaque db() :: #db{}.
 -opaque iterator() :: #it{}.
+-type keymapper() :: #keymapper{}.
 
 %%================================================================================
 %% API functions
@@ -71,9 +128,30 @@
 -spec open(file:filename_all(), options()) ->
     {ok, db()} | {error, _TODO}.
 open(Filename, Options) ->
-    case rocksdb:open(Filename, [{create_if_missing, true}, Options]) of
-        {ok, Handle} ->
-            {ok, #db{handle = Handle}};
+    CFDescriptors =
+        case maps:get(column_family, Options, undefined) of
+            CF = {_Name, _} ->
+                % TODO
+                % > When opening a DB in a read-write mode, you need to specify all
+                % > Column Families that currently exist in a DB. If that's not the case,
+                % > DB::Open call will return Status::InvalidArgument().
+                % This probably means that we need the _manager_ (the thing which knows
+                % about all the column families there are) to hold the responsibility to
+                % open the database and hold all the handles.
+                [CF, ?DEFAULT_COLUMN_FAMILY];
+            undefined ->
+                [?DEFAULT_COLUMN_FAMILY]
+        end,
+    DBOptions = maps:get(open_options, Options, ?DEFAULT_OPEN_OPTIONS),
+    case rocksdb:open(Filename, DBOptions, CFDescriptors) of
+        {ok, Handle, [CFHandle | _]} ->
+            {ok, #db{
+                handle = Handle,
+                cf = CFHandle,
+                keymapper = maps:get(keymapper, Options),
+                write_options = maps:get(write_options, Options, ?DEFAULT_WRITE_OPTIONS),
+                read_options = maps:get(read_options, Options, ?DEFAULT_READ_OPTIONS)
+            }};
         Error ->
             Error
     end.
@@ -82,26 +160,44 @@ open(Filename, Options) ->
 close(#db{handle = DB}) ->
     rocksdb:close(DB).
 
+-spec make_keymapper(Options) -> keymapper() when
+    Options :: #{
+        %% Number of bits in a key allocated to a message timestamp.
+        timestamp_bits := bits(),
+        %% Number of bits in a key allocated to each level in a message topic.
+        topic_bits_per_level := bits_per_level()
+    }.
+make_keymapper(Options) ->
+    TimestampBits = maps:get(timestamp_bits, Options),
+    TopicBitsPerLevel = maps:get(topic_bits_per_level, Options),
+    #keymapper{
+        timestamp_bits = TimestampBits,
+        topic_bits = lists:sum(TopicBitsPerLevel),
+        topic_bits_per_level = TopicBitsPerLevel
+    }.
+
 -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
     ok | {error, _TODO}.
-store(#db{handle = DB}, MessageID, PublishedAt, Topic, MessagePayload) ->
-    Key = make_message_key(MessageID, Topic, PublishedAt),
+store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
+    Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
     Value = make_message_value(Topic, MessagePayload),
-    rocksdb:put(DB, Key, Value, [{sync, true}]).
+    rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
 
 -spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
     % {error, invalid_start_time}? might just start from the beginning of time
     % and call it a day: client violated the contract anyway.
     {ok, iterator()} | {error, _TODO}.
-make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) ->
-    case rocksdb:iterator(DBHandle, []) of
+make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
+    case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
         {ok, ITHandle} ->
-            Hash = compute_topic_hash(TopicFilter),
-            HashBitmask = compute_hash_bitmask(TopicFilter),
+            Hash = compute_topic_hash(TopicFilter, DB#db.keymapper),
+            HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper),
             HashFilter = Hash band HashBitmask,
+            InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper),
             {ok, #it{
                 handle = ITHandle,
-                next_action = {seek, combine(HashFilter, StartTime, <<>>)},
+                keymapper = DB#db.keymapper,
+                next_action = {seek, InitialSeek},
                 topic_filter = TopicFilter,
                 start_time = StartTime,
                 hash_filter = HashFilter,
@@ -116,7 +212,7 @@ next(It = #it{next_action = Action}) ->
     case rocksdb:iterator_move(It#it.handle, Action) of
         % spec says `{ok, Key}` is also possible but the implementation says it's not
         {ok, Key, Value} ->
-            {TopicHash, PublishedAt} = extract(Key),
+            {TopicHash, PublishedAt} = extract(Key, It#it.keymapper),
             match_next(It, TopicHash, PublishedAt, Value);
         {error, invalid_iterator} ->
             stop_iteration(It);
@@ -128,10 +224,8 @@ next(It = #it{next_action = Action}) ->
 %% Internal exports
 %%================================================================================
 
--define(TOPIC_LEVELS_ENTROPY_BITS, [8, 8, 32, 16]).
-
-make_message_key(MessageID, Topic, PublishedAt) ->
-    combine(compute_topic_hash(Topic), PublishedAt, MessageID).
+make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
+    combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper).
 
 make_message_value(Topic, MessagePayload) ->
     term_to_binary({Topic, MessagePayload}).
@@ -139,22 +233,33 @@ make_message_value(Topic, MessagePayload) ->
 unwrap_message_value(Binary) ->
     binary_to_term(Binary).
 
-combine(TopicHash, PublishedAt, MessageID) ->
-    <<TopicHash:64/integer, PublishedAt:64/integer, MessageID/binary>>.
-
-extract(<<TopicHash:64/integer, PublishedAt:64/integer, _MessageID/binary>>) ->
+-spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) ->
+    key().
+combine(TopicHash, PublishedAt, MessageID, #keymapper{
+    timestamp_bits = TimestampBits,
+    topic_bits = TopicBits
+}) ->
+    <<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, MessageID/binary>>.
+
+-spec extract(key(), keymapper()) ->
+    {_TopicHash :: integer(), time()}.
+extract(Key, #keymapper{
+    timestamp_bits = TimestampBits,
+    topic_bits = TopicBits
+}) ->
+    <<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, _MessageID/binary>> = Key,
     {TopicHash, PublishedAt}.
 
-compute_topic_hash(Topic) ->
-    compute_topic_hash(Topic, ?TOPIC_LEVELS_ENTROPY_BITS, 0).
+compute_topic_hash(Topic, Keymapper) ->
+    compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0).
 
 hash(Input, Bits) ->
     % at most 32 bits
     erlang:phash2(Input, 1 bsl Bits).
 
--spec compute_hash_bitmask(emqx_topic:words()) -> integer().
-compute_hash_bitmask(TopicFilter) ->
-    compute_hash_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS, 0).
+-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
+compute_hash_bitmask(TopicFilter, Keymapper) ->
+    compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0).
 
 %%================================================================================
 %% Internal functions
@@ -201,6 +306,7 @@ ones(Bits) ->
 
 match_next(
     It = #it{
+        keymapper = Keymapper,
         topic_filter = TopicFilter,
         hash_filter = HashFilter,
         hash_bitmask = HashBitmask,
@@ -222,13 +328,13 @@ match_next(
                     next(It#it{next_action = next})
             end;
         true ->
-            NextAction = {seek, combine(TopicHash, StartTime, <<>>)},
-            next(It#it{next_action = NextAction});
+            NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper),
+            next(It#it{next_action = {seek, NextSeek}});
         false ->
-            case compute_next_seek(TopicHash, HashFilter, HashBitmask) of
+            case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of
                 NextHash when is_integer(NextHash) ->
-                    NextAction = {seek, combine(NextHash, StartTime, <<>>)},
-                    next(It#it{next_action = NextAction});
+                    NextSeek = combine(NextHash, StartTime, <<>>, Keymapper),
+                    next(It#it{next_action = {seek, NextSeek}});
                 none ->
                     stop_iteration(It)
             end
@@ -238,9 +344,9 @@ stop_iteration(It) ->
     ok = rocksdb:iterator_close(It#it.handle),
     none.
 
-compute_next_seek(TopicHash, HashFilter, HashBitmask) ->
-    compute_next_seek(TopicHash, HashFilter, HashBitmask, ?TOPIC_LEVELS_ENTROPY_BITS).
-
+compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) ->
+    BitsPerLevel = Keymapper#keymapper.topic_bits_per_level,
+    compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel);
 compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
     % NOTE
     % Ok, this convoluted mess implements a sort of _increment operation_ for some

+ 19 - 6
apps/emqx_replay/test/emqx_replay_storage_SUITE.erl

@@ -136,20 +136,28 @@ parse_topic(Topic) ->
 %%
 
 t_prop_topic_hash_computes(_) ->
+    Keymapper = emqx_replay_message_storage:make_keymapper(#{
+        topic_bits_per_level => [8, 12, 16, 24],
+        timestamp_bits => 0
+    }),
     ?assert(
         proper:quickcheck(
             ?FORALL(Topic, topic(), begin
-                Hash = emqx_replay_message_storage:compute_topic_hash(Topic),
+                Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper),
                 is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
             end)
         )
     ).
 
 t_prop_hash_bitmask_computes(_) ->
+    Keymapper = emqx_replay_message_storage:make_keymapper(#{
+        topic_bits_per_level => [8, 12, 16, 24],
+        timestamp_bits => 0
+    }),
     ?assert(
         proper:quickcheck(
             ?FORALL(TopicFilter, topic_filter(), begin
-                Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter),
+                Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
                 is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
             end)
         )
@@ -165,8 +173,9 @@ t_prop_iterate_stored_messages(Config) ->
                 messages(),
                 begin
                     Stream = payload_gen:interleave_streams(Streams),
-                    ok = store_message_stream(DB, Stream)
+                    ok = store_message_stream(DB, Stream),
                     % TODO actually verify some property
+                    true
                 end
             )
         )
@@ -194,8 +203,6 @@ topic(EntropyWeights) ->
     ?LET(
         L,
         list(1),
-        % ?SIZED(S, [topic(S * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)])
-        % [topic(10 * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)]
         ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))])
     ).
 
@@ -242,7 +249,13 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 init_per_testcase(TC, Config) ->
     Filename = filename:join(?MODULE_STRING, atom_to_list(TC)),
     ok = filelib:ensure_dir(Filename),
-    {ok, DB} = emqx_replay_message_storage:open(Filename, []),
+    {ok, DB} = emqx_replay_message_storage:open(Filename, #{
+        column_family => {atom_to_list(TC), []},
+        keymapper => emqx_replay_message_storage:make_keymapper(#{
+            topic_bits_per_level => [8, 8, 32, 16],
+            timestamp_bits => 64
+        })
+    }),
     [{handle, DB} | Config].
 
 end_per_testcase(_TC, Config) ->