瀏覽代碼

feat(ds): support custom LTS threshold specs in storage layouts

Andrew Mayorov 1 年之前
父節點
當前提交
53a74d0b51

+ 15 - 11
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -81,7 +81,8 @@
     #{
     #{
         bits_per_wildcard_level => pos_integer(),
         bits_per_wildcard_level => pos_integer(),
         topic_index_bytes => pos_integer(),
         topic_index_bytes => pos_integer(),
-        epoch_bits => non_neg_integer()
+        epoch_bits => non_neg_integer(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
     }.
 
 
 %% Permanent state:
 %% Permanent state:
@@ -90,7 +91,8 @@
         bits_per_wildcard_level := pos_integer(),
         bits_per_wildcard_level := pos_integer(),
         topic_index_bytes := pos_integer(),
         topic_index_bytes := pos_integer(),
         ts_bits := non_neg_integer(),
         ts_bits := non_neg_integer(),
-        ts_offset_bits := non_neg_integer()
+        ts_offset_bits := non_neg_integer(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
     }.
 
 
 %% Runtime state:
 %% Runtime state:
@@ -102,6 +104,7 @@
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
+    threshold_fun :: emqx_ds_lts:threshold_fun(),
     gvars :: ets:table()
     gvars :: ets:table()
 }).
 }).
 
 
@@ -141,6 +144,9 @@
 %% Limit on the number of wildcard levels in the learned topic trie:
 %% Limit on the number of wildcard levels in the learned topic trie:
 -define(WILDCARD_LIMIT, 10).
 -define(WILDCARD_LIMIT, 10).
 
 
+%% Default LTS thresholds: 0th level = 100 entries max, other levels = 20 entries.
+-define(DEFAULT_LTS_THRESHOLD, {simple, {100, 20}}).
+
 %% Persistent (durable) term representing `#message{}' record. Must
 %% Persistent (durable) term representing `#message{}' record. Must
 %% not change.
 %% not change.
 -type value_v1() ::
 -type value_v1() ::
@@ -195,6 +201,7 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
     %% 20 bits -> 1048576 us -> ~1 sec
     %% 20 bits -> 1048576 us -> ~1 sec
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
+    ThresholdSpec = maps:get(lts_threshold_spec, Options, ?DEFAULT_LTS_THRESHOLD),
     %% Create column families:
     %% Create column families:
     DataCFName = data_cf(GenId),
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -213,7 +220,8 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
         bits_per_wildcard_level => BitsPerTopicLevel,
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
         topic_index_bytes => TopicIndexBytes,
         ts_bits => 64,
         ts_bits => 64,
-        ts_offset_bits => TSOffsetBits
+        ts_offset_bits => TSOffsetBits,
+        lts_threshold_spec => ThresholdSpec
     },
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
 
 
@@ -245,6 +253,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
          || N <- lists:seq(0, MaxWildcardLevels)
          || N <- lists:seq(0, MaxWildcardLevels)
         ]
         ]
     ),
     ),
+    ThresholdSpec = maps:get(lts_threshold_spec, Schema, ?DEFAULT_LTS_THRESHOLD),
     #s{
     #s{
         db = DBHandle,
         db = DBHandle,
         data = DataCF,
         data = DataCF,
@@ -253,6 +262,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         keymappers = KeymapperCache,
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
         ts_bits = TSBits,
+        threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
     }.
 
 
@@ -841,9 +851,9 @@ format_key(KeyMapper, Key) ->
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
 
 -spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
 -spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
+make_key(#s{keymappers = KeyMappers, trie = Trie, threshold_fun = TFun}, Timestamp, Topic) ->
     Tokens = emqx_topic:words(Topic),
     Tokens = emqx_topic:words(Topic),
-    {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
+    {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),
     KeyMapper = array:get(length(Varying), KeyMappers),
     KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes),
     KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes),
@@ -861,12 +871,6 @@ make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
         ])
         ])
     ).
     ).
 
 
-%% TODO: don't hardcode the thresholds
-threshold_fun(0) ->
-    100;
-threshold_fun(_) ->
-    20.
-
 hash_topic_level('') ->
 hash_topic_level('') ->
     hash_topic_level(<<>>);
     hash_topic_level(<<>>);
 hash_topic_level(TopicLevel) ->
 hash_topic_level(TopicLevel) ->

+ 24 - 16
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -87,9 +87,13 @@
         topic_index_bytes := pos_integer(),
         topic_index_bytes := pos_integer(),
         keep_message_id := boolean(),
         keep_message_id := boolean(),
         serialization_schema := emqx_ds_msg_serializer:schema(),
         serialization_schema := emqx_ds_msg_serializer:schema(),
-        with_guid := boolean()
+        with_guid := boolean(),
+        lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
     }.
 
 
+%% Default LTS thresholds: 0th level = 100 entries max, other levels = 10 entries.
+-define(DEFAULT_LTS_THRESHOLD, {simple, {100, 10}}).
+
 %% Runtime state:
 %% Runtime state:
 -record(s, {
 -record(s, {
     db :: rocksdb:db_handle(),
     db :: rocksdb:db_handle(),
@@ -98,6 +102,7 @@
     trie_cf :: rocksdb:cf_handle(),
     trie_cf :: rocksdb:cf_handle(),
     serialization_schema :: emqx_ds_msg_serializer:schema(),
     serialization_schema :: emqx_ds_msg_serializer:schema(),
     hash_bytes :: pos_integer(),
     hash_bytes :: pos_integer(),
+    threshold_fun :: emqx_ds_lts:threshold_fun(),
     with_guid :: boolean()
     with_guid :: boolean()
 }).
 }).
 
 
@@ -136,7 +141,8 @@ create(_ShardId, DBHandle, GenId, Schema0, SPrev) ->
         wildcard_hash_bytes => 8,
         wildcard_hash_bytes => 8,
         topic_index_bytes => 8,
         topic_index_bytes => 8,
         serialization_schema => asn1,
         serialization_schema => asn1,
-        with_guid => false
+        with_guid => false,
+        lts_threshold_spec => ?DEFAULT_LTS_THRESHOLD
     },
     },
     Schema = maps:merge(Defaults, Schema0),
     Schema = maps:merge(Defaults, Schema0),
     ok = emqx_ds_msg_serializer:check_schema(maps:get(serialization_schema, Schema)),
     ok = emqx_ds_msg_serializer:check_schema(maps:get(serialization_schema, Schema)),
@@ -154,15 +160,22 @@ create(_ShardId, DBHandle, GenId, Schema0, SPrev) ->
     end,
     end,
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
 
 
-open(_Shard, DBHandle, GenId, CFRefs, #{
-    topic_index_bytes := TIBytes,
-    wildcard_hash_bytes := WCBytes,
-    serialization_schema := SSchema,
-    with_guid := WithGuid
-}) ->
+open(
+    _Shard,
+    DBHandle,
+    GenId,
+    CFRefs,
+    Schema = #{
+        topic_index_bytes := TIBytes,
+        wildcard_hash_bytes := WCBytes,
+        serialization_schema := SSchema,
+        with_guid := WithGuid
+    }
+) ->
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     Trie = restore_trie(TIBytes, DBHandle, TrieCF),
     Trie = restore_trie(TIBytes, DBHandle, TrieCF),
+    ThresholdSpec = maps:get(lts_threshold_spec, Schema, ?DEFAULT_LTS_THRESHOLD),
     #s{
     #s{
         db = DBHandle,
         db = DBHandle,
         data_cf = DataCF,
         data_cf = DataCF,
@@ -170,6 +183,7 @@ open(_Shard, DBHandle, GenId, CFRefs, #{
         trie = Trie,
         trie = Trie,
         hash_bytes = WCBytes,
         hash_bytes = WCBytes,
         serialization_schema = SSchema,
         serialization_schema = SSchema,
+        threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         with_guid = WithGuid
         with_guid = WithGuid
     }.
     }.
 
 
@@ -181,7 +195,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
 
 
 prepare_batch(
 prepare_batch(
     _ShardId,
     _ShardId,
-    S = #s{trie = Trie},
+    S = #s{trie = Trie, threshold_fun = TFun},
     Operations,
     Operations,
     _Options
     _Options
 ) ->
 ) ->
@@ -190,7 +204,7 @@ prepare_batch(
         fun
         fun
             ({Timestamp, Msg = #message{topic = Topic}}) ->
             ({Timestamp, Msg = #message{topic = Topic}}) ->
                 Tokens = words(Topic),
                 Tokens = words(Topic),
-                {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
+                {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
                 ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
                 ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
             ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
             ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
                 case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
                 case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
@@ -692,12 +706,6 @@ hash(HashBytes, TopicLevel) ->
 
 
 %%%%%%%% LTS %%%%%%%%%%
 %%%%%%%% LTS %%%%%%%%%%
 
 
-%% TODO: don't hardcode the thresholds
-threshold_fun(0) ->
-    100;
-threshold_fun(_) ->
-    10.
-
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(StaticIdxBytes, DB, CF) ->
 restore_trie(StaticIdxBytes, DB, CF) ->
     PersistCallback = fun(Key, Val) ->
     PersistCallback = fun(Key, Val) ->