Forráskód Böngészése

feat(ds-blts): make epoch readability configurable

Also special-case single-epoch storage as always readable, even though
it's still unsafe.
Andrew Mayorov 1 éve
szülő
commit
6322e55c0f

+ 25 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -87,6 +87,9 @@
         bits_per_wildcard_level => pos_integer(),
         topic_index_bytes => pos_integer(),
         epoch_bits => non_neg_integer(),
+        %% Which epochs are considered readable, i.e. visible in `next/6`?
+        %% Default: `complete`, unless it's a single neverending epoch, it's always readable.
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -97,6 +100,7 @@
         topic_index_bytes := pos_integer(),
         ts_bits := non_neg_integer(),
         ts_offset_bits := non_neg_integer(),
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -109,6 +113,7 @@
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
+    epoch_readable :: complete | any,
     threshold_fun :: emqx_ds_lts:threshold_fun(),
     gvars :: ets:table()
 }).
@@ -204,8 +209,15 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
+    TSBits = 64,
     %% 20 bits -> 1048576 us -> ~1 sec
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
+    case TSBits - TSOffsetBits of
+        %% NOTE: There's a single, always incomplete epoch, consider it readable.
+        0 -> EpochReadableDefault = any;
+        _ -> EpochReadableDefault = complete
+    end,
+    EpochReadable = maps:get(epoch_readable, Options, EpochReadableDefault),
     ThresholdSpec = maps:get(lts_threshold_spec, Options, ?DEFAULT_LTS_THRESHOLD),
     %% Create column families:
     DataCFName = data_cf(GenId),
@@ -224,8 +236,9 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
-        ts_bits => 64,
+        ts_bits => TSBits,
         ts_offset_bits => TSOffsetBits,
+        epoch_readable => EpochReadable,
         lts_threshold_spec => ThresholdSpec
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
@@ -267,6 +280,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
+        epoch_readable = maps:get(epoch_readable, Schema, complete),
         threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
@@ -447,7 +461,7 @@ update_iterator(
 
 next(
     Shard,
-    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
+    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits, epoch_readable = EpochReadable},
     It = #{?storage_key := Stream},
     BatchSize,
     Now,
@@ -475,6 +489,9 @@ next(
                 %% iterator can jump to the next topic and then it
                 %% won't backtrack.
                 false;
+            _ when EpochReadable == any ->
+                %% Incomplete epochs are explicitly marked as readable:
+                false;
             _ ->
                 %% New batches are only added to the current
                 %% generation. We can ignore cutoff time for old
@@ -600,6 +617,9 @@ lookup_message(
             {error, unrecoverable, {rocksdb, Error}}
     end.
 
+handle_event(_ShardId, #s{epoch_readable = any}, _Time, tick) ->
+    %% No need for idle tracking.
+    [];
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     %% If the last message was published more than one epoch ago, and
     %% the shard remains idle, we need to advance safety cutoff
@@ -959,10 +979,11 @@ deserialize(Blob) ->
 
 %% erlfmt-ignore
 make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
+    TsEpochBits = TSBits - TSOffsetBits,
     Bitsources =
     %% Dimension Offset   Bitsize
-        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
-         {?DIM_TS,        TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
+        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE}] ++   %% Topic index
+        [{?DIM_TS,        TSOffsetBits, TsEpochBits} || TsEpochBits > 0] ++ %% Timestamp epoch
         [{?DIM_TS + I,    0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
         [{?DIM_TS,        0,            TSOffsetBits                }],     %% Timestamp offset