Преглед изворни кода

chore(ds): add some @TODOs

William Yang пре 1 година
родитељ
комит
a7f810bfb6

+ 1 - 0
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -225,6 +225,7 @@
     %% Shared subscription state:
     shared_sub_s := shared_sub_state(),
     %% Buffer:
+    %% @TODO check if it buffers QoS0 or not ?
     inflight := emqx_persistent_session_ds_buffer:t(),
     stream_scheduler_s := emqx_persistent_session_ds_stream_scheduler:t(),
     %% In-progress replay:

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_beamformer.erl

@@ -351,6 +351,7 @@ do_dispatch(Beam = #beam{}) ->
 %%================================================================================
 
 -spec start_fulfill_loop(s()) -> s().
+%% @TODO start_fulfill_loop is missleading
 start_fulfill_loop(S = #s{is_spinning = true}) ->
     S;
 start_fulfill_loop(S = #s{is_spinning = false}) ->

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -601,6 +601,7 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
 %% sweep. This API does not suppose precise batch size.
 
 %%    When doing multi-next, we group iterators by stream:
+%% @TODO we need add it to the callback
 unpack_iterator(Shard, #{?tag := ?IT, ?generation := GenId, ?enc := Inner}) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->

+ 6 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -236,6 +236,7 @@ commit_batch(
 ) ->
     {ok, Batch} = rocksdb:batch(),
     try
+        %% is this dummy?
         %% Commit LTS trie to the storage:
         lists:foreach(
             fun({Key, Val}) ->
@@ -346,9 +347,12 @@ unpack_iterator(_Shard, #s{trie = _Trie}, #it{
     {StaticIdx, words(CTF), StartKey, TS}.
 
 scan_stream(Shard, S, StaticIdx, Varying, LastSeenKey, BatchSize, TMax, IsCurrent) ->
+    %% @TODO: double check if it is Varying or TopicFilter, is it always [] ?
     LastSeenTS = match_ds_key(StaticIdx, LastSeenKey),
-    It = #it{static_index = StaticIdx, compressed_tf = emqx_topic:join(Varying), ts = LastSeenTS},
-    case next(Shard, S, It, BatchSize, TMax, IsCurrent) of
+    ItSeed = #it{
+        static_index = StaticIdx, compressed_tf = emqx_topic:join(Varying), ts = LastSeenTS
+    },
+    case next(Shard, S, ItSeed, BatchSize, TMax, IsCurrent) of
         {ok, #it{ts = TS, static_index = StaticIdx}, Batch} ->
             {ok, mk_key(StaticIdx, 0, <<>>, TS), Batch};
         Other ->