Просмотр исходного кода

fix(ds): Improve logic of skipstream LTS layout

Iterators:
Previously, iterators used the timestamp of the next message as a
reference. This won't work well for the upcoming
beamformer/beamsplitter feature. This commit changes the logic so that
iterators store the timestamp of the last message they have seen.

Cooked batches:
Cooked batches no longer store index entries. Creation of indexes has
been delegated to the commit callback.
ieQu1 1 год назад
Родитель
Commit
b010efb647
1 изменённый файл с 70 добавленными и 48 удалёнными строками
  1. 70 48
      apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

+ 70 - 48
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -55,9 +55,14 @@
 %% Type declarations
 %%================================================================================
 
-%% keys:
+%% TLOG entry
+%%   keys:
 -define(cooked_payloads, 6).
 -define(cooked_lts_ops, 7).
+%%   Payload:
+-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE),
+    {TIMESTAMP, STATIC, VARYING, VALUE}
+).
 
 -define(lts_persist_ops, emqx_ds_storage_skipstream_lts_ops).
 
@@ -101,10 +106,11 @@
 
 -record(it, {
     static_index :: emqx_ds_lts:static_key(),
-    %% Minimal timestamp of the next message:
+    %% Timestamp of the last visited message:
     ts :: ts(),
     %% Compressed topic filter:
-    compressed_tf :: binary()
+    compressed_tf :: binary(),
+    misc = []
 }).
 
 %% Level iterator:
@@ -170,28 +176,16 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
     ok = rocksdb:drop_column_family(DBHandle, TrieCF),
     ok.
 
-prepare_batch(
-    _ShardId,
-    S = #s{trie = Trie, hash_bytes = HashBytes},
-    Messages,
-    _Options
-) ->
+prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) ->
     _ = erase(?lts_persist_ops),
-    Payloads =
-        lists:flatmap(
-            fun({Timestamp, Msg = #message{topic = Topic}}) ->
-                Tokens = words(Topic),
-                {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
-                %% TODO: is it possible to create index during the
-                %% commit phase to avoid transferring indexes through
-                %% the translog?
-                [
-                    {mk_key(Static, 0, <<>>, Timestamp), serialize(S, Varying, Msg)}
-                    | mk_index(HashBytes, Static, Timestamp, Varying)
-                ]
-            end,
-            Messages
-        ),
+    Payloads = [
+        begin
+            Tokens = words(Topic),
+            {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
+            ?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg))
+        end
+     || {Timestamp, Msg = #message{topic = Topic}} <- Messages
+    ],
     {ok, #{
         ?cooked_payloads => Payloads,
         ?cooked_lts_ops => pop_lts_persist_ops()
@@ -199,7 +193,7 @@ prepare_batch(
 
 commit_batch(
     _ShardId,
-    #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie},
+    #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
     #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
     Options
 ) ->
@@ -216,8 +210,10 @@ commit_batch(
         _ = emqx_ds_lts:trie_update(Trie, LtsOps),
         %% Commit payloads:
         lists:foreach(
-            fun({Key, Val}) ->
-                ok = rocksdb:batch_put(Batch, DataCF, Key, Val)
+            fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) ->
+                MasterKey = mk_key(Static, 0, <<>>, Timestamp),
+                ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
+                mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
             end,
             Payloads
         ),
@@ -243,12 +239,14 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
 get_delete_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     get_streams(Trie, TopicFilter).
 
+make_iterator(_Shard, _State, _Stream, _TopicFilter, TS) when TS >= ?max_ts ->
+    {error, unrecoverable, "Timestamp is too large"};
 make_iterator(_Shard, #s{trie = Trie}, #stream{static_index = StaticIdx}, TopicFilter, StartTime) ->
     {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
     CompressedTF = emqx_ds_lts:compress_topic(StaticIdx, TopicStructure, TopicFilter),
     {ok, #it{
         static_index = StaticIdx,
-        ts = StartTime,
+        ts = dec_ts(StartTime),
         compressed_tf = emqx_topic:join(CompressedTF)
     }}.
 
@@ -442,7 +440,7 @@ do_init_iterators(S, Static, [], _WildcardLevel) ->
 next_loop(
     Shard,
     S = #s{trie = Trie},
-    It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF},
+    It = #it{static_index = StaticIdx, ts = LastTS, compressed_tf = CompressedTF},
     Iterators,
     BatchSize,
     TMax
@@ -465,10 +463,10 @@ next_loop(
         filter = words(CompressedTF),
         tmax = TMax
     },
-    next_loop(Ctx, It, BatchSize, {seek, TS}, []).
+    next_loop(Ctx, It, BatchSize, {seek, inc_ts(LastTS)}, []).
 
-next_loop(_Ctx, It, 0, Op, Acc) ->
-    finalize_loop(It, Op, Acc);
+next_loop(_Ctx, It, 0, _Op, Acc) ->
+    finalize_loop(It, Acc);
 next_loop(Ctx, It0, BatchSize, Op, Acc) ->
     %% ?tp(notice, skipstream_loop, #{
     %%     ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op
@@ -479,15 +477,15 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
         none ->
             %% ?tp(notice, skipstream_loop_result, #{r => none}),
             inc_counter(?DS_SKIPSTREAM_LTS_EOS),
-            finalize_loop(It0, Op, Acc);
+            finalize_loop(It0, Acc);
         {seek, TS} when TS > TMax ->
             %% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}),
             inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
-            finalize_loop(It0, {seek, TS}, Acc);
+            finalize_loop(It0, Acc);
         {ok, TS, _Key, _Msg0} when TS > TMax ->
             %% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}),
             inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
-            finalize_loop(It0, {seek, TS}, Acc);
+            finalize_loop(It0, Acc);
         {seek, TS} ->
             %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
             It = It0#it{ts = TS},
@@ -499,12 +497,7 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
             next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc])
     end.
 
-finalize_loop(It0, Op, Acc) ->
-    case Op of
-        next -> NextTS = It0#it.ts + 1;
-        {seek, NextTS} -> ok
-    end,
-    It = It0#it{ts = NextTS},
+finalize_loop(It, Acc) ->
     {ok, It, lists:reverse(Acc)}.
 
 next_step(
@@ -581,14 +574,15 @@ free_iterators(Its) ->
 
 %%%%%%%% Indexes %%%%%%%%%%
 
-mk_index(HashBytes, Static, Timestamp, Varying) ->
-    mk_index(HashBytes, Static, Timestamp, 1, Varying, []).
+mk_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
+    mk_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).
 
-mk_index(HashBytes, Static, Timestamp, N, [TopicLevel | Varying], Acc) ->
-    Op = {mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), <<>>},
-    mk_index(HashBytes, Static, Timestamp, N + 1, Varying, [Op | Acc]);
-mk_index(_HashBytes, _Static, _Timestamp, _N, [], Acc) ->
-    Acc.
+mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
+    Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
+    ok = rocksdb:batch_put(Batch, CF, Key, <<>>),
+    mk_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
+mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
+    ok.
 
 %%%%%%%% Keys %%%%%%%%%%
 
@@ -747,3 +741,31 @@ collect_counters(Shard) ->
         end,
         ?COUNTERS
     ).
+
+inc_ts(?max_ts) -> 0;
+inc_ts(TS) when TS >= 0, TS < ?max_ts -> TS + 1.
+
+dec_ts(0) -> ?max_ts;
+dec_ts(TS) when TS > 0, TS =< ?max_ts -> TS - 1.
+
+%%================================================================================
+%% Tests
+%%================================================================================
+
+-ifdef(TEST).
+
+inc_dec_test_() ->
+    Numbers = [0, 1, 100, ?max_ts - 1, ?max_ts],
+    [
+        ?_assertEqual(N, dec_ts(inc_ts(N)))
+     || N <- Numbers
+    ].
+
+dec_inc_test_() ->
+    Numbers = [0, 1, 100, ?max_ts - 1, ?max_ts],
+    [
+        ?_assertEqual(N, inc_ts(dec_ts(N)))
+     || N <- Numbers
+    ].
+
+-endif.