
chore(ds): clean some todos and update docs

William Yang 1 year ago
parent
commit
20d7c0553d

+ 0 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -225,7 +225,6 @@
     %% Shared subscription state:
     shared_sub_s := shared_sub_state(),
     %% Buffer:
-    %% @TODO check if it buffers QoS0 or not ?
     inflight := emqx_persistent_session_ds_buffer:t(),
     stream_scheduler_s := emqx_persistent_session_ds_stream_scheduler:t(),
     %% In-progress replay:

+ 3 - 3
apps/emqx/src/emqx_trie_search.erl

@@ -179,12 +179,12 @@ match(Topic, NextF) ->
 
 %% @doc Match given topic against the index and return _all_ matches.
 %% If `unique` option is given, return only unique matches by record ID.
--spec matches(emqx_types:topic(), nextf(), opts()) -> [key(_)].
+-spec matches(emqx_types:topic() | [word()], nextf(), opts()) -> [key(_)].
 matches(Topic, NextF, Opts) ->
     search(Topic, NextF, Opts).
 
 %% @doc Match given topic filter against the index and return _all_ matches.
--spec matches_filter(emqx_types:topic(), nextf(), opts()) -> [key(_)].
+-spec matches_filter(emqx_types:topic() | [word()], nextf(), opts()) -> [key(_)].
 matches_filter(TopicFilter, NextF, Opts) ->
     search(TopicFilter, NextF, [topic_filter | Opts]).
 
@@ -355,7 +355,7 @@ match_add(K, Acc) when is_list(Acc) ->
 match_add(K, first) ->
     throw({first, K}).
 
--spec filter_words(emqx_types:topic()) -> [word()].
+-spec filter_words(emqx_types:topic() | [word()]) -> [word()].
 filter_words(Words) when is_list(Words) ->
     Words;
 filter_words(Topic) when is_binary(Topic) ->
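
With the widened specs, callers of `matches/3', `matches_filter/3' and `filter_words/1' can pass either a binary topic or a pre-split word list. A minimal sketch of the two call shapes (hypothetical illustration; the `NextF' callback and the option list are placeholders, not taken from this commit):

    Matches1 = emqx_trie_search:matches(<<"t/1/foo">>, NextF, [unique]),
    Matches2 = emqx_trie_search:matches([<<"t">>, <<"1">>, <<"foo">>], NextF, [unique]),
    %% Both calls are expected to be equivalent: filter_words/1 above
    %% passes a pre-split word list through unchanged.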

+ 51 - 17
apps/emqx_durable_storage/src/emqx_ds_beamformer.erl

@@ -15,7 +15,8 @@
 %%--------------------------------------------------------------------
 
 %% @doc This process is responsible for processing async poll requests
-%% from the consumers.
+%% from the consumers. It is designed as a generic, drop-in "frontend"
+%% that can be integrated into any DS backend (builtin or external).
 %%
 %% It serves as a pool for such requests, limiting the number of
 %% queries running in parallel. In addition, it tries to group
@@ -36,16 +37,19 @@
 %% from the pending queue one at a time, and tries to fulfill them
 %% normally by querying the storage.
 %%
-%% - If storage returns a non-empty batch, beamformer then searches
-%% for pending poll requests that may be coherent with the current
-%% one. All matching requests are then packed into "beams" (one per
-%% destination node) and sent out accodingly.
+%% - When storage returns a non-empty batch, an unrecoverable error or
+%% `end_of_stream', beamformer searches for pending poll requests that
+%% may be coherent with the current one. All matching requests are
+%% then packed into "beams" (one per destination node) and sent out
+%% accordingly.
 %%
 %% - If the query returns an empty batch, beamformer moves the request
 %% to the "wait" queue. Poll requests just linger there until they
 %% time out, or until beamformer receives a matching stream event from
-%% the storage. The storage backend can send requests to the
-%% beamformer by calling `shard_event' function.
+%% the storage.
+%%
+%% The storage backend can send events to the beamformer by calling
+%% the `shard_event' function.
 %%
 %% Storage event processing logic is as follows: if beamformer finds
 %% waiting poll requests matching the event, it queries the storage
@@ -96,6 +100,16 @@
 %% Type declarations
 %%================================================================================
 
+%% `event_topic' and `event_topic_filter' types are structurally (but
+%% not semantically) equivalent to their `emqx_ds' counterparts.
+%%
+%% These types are only used for matching events against waiting
+%% poll requests. Values of these types are never exposed outside.
+%% Hence the backend can, for example, compress topics used in the
+%% events and iterators.
+-type event_topic() :: emqx_ds:topic().
+-type event_topic_filter() :: emqx_ds:topic_filter().
+
 -type opts() :: #{
     n_workers := non_neg_integer()
 }.
@@ -120,7 +134,7 @@
 
 -type poll_req(ItKey, Iterator) ::
     #poll_req{
-        key :: {_Stream, _TopicFilter, emqx_ds:message_key()},
+        key :: {_Stream, event_topic_filter(), emqx_ds:message_key()},
         node :: node(),
         return_addr :: return_addr(ItKey),
         it :: Iterator,
@@ -167,7 +181,7 @@
 -type s() :: #s{}.
 
 -record(shard_event, {
-    events :: [{_Stream, _Topic}]
+    events :: [{_Stream, event_topic()}]
 }).
 
 -define(fulfill_loop, fulfill_loop).
@@ -181,7 +195,7 @@
 
 -type unpack_iterator_result(Stream) :: #{
     stream := Stream,
-    topic_filter := _,
+    topic_filter := event_topic_filter(),
     last_seen_key := emqx_ds:message_key(),
     timestamp := emqx_ds:time(),
     message_matcher := match_messagef()
@@ -200,6 +214,7 @@
 %% API functions
 %%================================================================================
 
+%% @doc Submit a poll request
 -spec poll(node(), return_addr(_ItKey), _Shard, _Iterator, emqx_ds:poll_opts()) ->
     ok.
 poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
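
The `poll/5' entry point above is asynchronous: the call itself returns `ok', and results are later packed into beams and dispatched to the return address. A minimal sketch of submitting a request (hypothetical illustration; `Shard', `Iterator' and `ReturnAddr' are assumed to come from the backend and are treated as opaque here):

    ok = emqx_ds_beamformer:poll(node(), ReturnAddr, Shard, Iterator, #{timeout => 5000}),
    %% The reply is not returned here; it arrives later as part of a
    %% beam sent towards `ReturnAddr'.  The timeout option bounds how
    %% long the request may linger in the wait queue.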
@@ -248,6 +263,27 @@ poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
             ok
     end.
 
+%% @doc This internal API notifies the beamformer that new data is
+%% available for reading in the storage.
+%%
+%% Backends should call this function after committing data to the
+%% storage, so waiting long poll requests can be fulfilled.
+%%
+%% The types of the arguments to this function depend on the backend.
+%%
+%% - `_Shard': most DS backends have some notion of shard. Beamformer
+%% uses this fact to partition poll requests, but otherwise it treats
+%% shard as an opaque value.
+%%
+%% - `_Stream': backend- and layout-specific type of stream.
+%% Beamformer uses exact matching on streams when it searches for
+%% similar requests and when it matches events. Otherwise it's an
+%% opaque type.
+%%
+%% - `event_topic()': when beamformer receives stream events, it
+%% selects waiting poll requests with a matching stream AND a matching
+%% `event_topic_filter'.
+-spec shard_event(_Shard, [{_Stream, event_topic()}]) -> ok.
 shard_event(Shard, Events) ->
     Workers = gproc_pool:active_workers(emqx_ds_beamformer_sup:pool(Shard)),
     lists:foreach(
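
On the backend side, wiring this up would look roughly as follows (a hypothetical sketch, not taken from any existing backend; `Shard' and the per-stream topics are placeholders whose real shapes are backend-specific):

    %% After successfully committing a batch to storage.  The helper
    %% `streams_and_topics/1' is hypothetical and stands for whatever
    %% the backend uses to derive {Stream, EventTopic} pairs from the
    %% just-committed batch:
    Events = streams_and_topics(Batch),
    ok = emqx_ds_beamformer:shard_event(Shard, Events).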
@@ -294,7 +330,7 @@ handle_call(
             {reply, Reply, S};
         false ->
             ets:insert(S#s.pending_queue, Req),
-            {reply, ok, start_fulfill_loop(S)}
+            {reply, ok, ensure_fulfill_loop(S)}
     end;
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
@@ -350,11 +386,10 @@ do_dispatch(Beam = #beam{}) ->
 %% Internal functions
 %%================================================================================
 
--spec start_fulfill_loop(s()) -> s().
-%% @TODO start_fulfill_loop is missleading
-start_fulfill_loop(S = #s{is_spinning = true}) ->
+-spec ensure_fulfill_loop(s()) -> s().
+ensure_fulfill_loop(S = #s{is_spinning = true}) ->
     S;
-start_fulfill_loop(S = #s{is_spinning = false}) ->
+ensure_fulfill_loop(S = #s{is_spinning = false}) ->
     self() ! ?fulfill_loop,
     S#s{is_spinning = true}.
 
@@ -362,7 +397,6 @@ start_fulfill_loop(S = #s{is_spinning = false}) ->
 cleanup(S = #s{pending_queue = PendingTab, wait_queue = WaitingTab, metrics_id = Metrics}) ->
     do_cleanup(Metrics, PendingTab),
     do_cleanup(Metrics, WaitingTab),
-    %% erlang:garbage_collect(),
     S.
 
 do_cleanup(Metrics, Tab) ->
@@ -382,7 +416,7 @@ fulfill_pending(S = #s{pending_queue = PendingTab}) ->
             %% The function MUST destructively consume all requests
             %% matching stream and MsgKey to avoid infinite loop:
             do_fulfill_pending(S, Req),
-            start_fulfill_loop(S)
+            ensure_fulfill_loop(S)
     end.
 
 do_fulfill_pending(

+ 6 - 5
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -236,7 +236,6 @@ commit_batch(
 ) ->
     {ok, Batch} = rocksdb:batch(),
     try
-        %% is this dummy?
         %% Commit LTS trie to the storage:
         lists:foreach(
             fun({Key, Val}) ->
@@ -347,7 +346,6 @@ unpack_iterator(_Shard, #s{trie = _Trie}, #it{
     {StaticIdx, words(CTF), StartKey, TS}.
 
 scan_stream(Shard, S, StaticIdx, Varying, LastSeenKey, BatchSize, TMax, IsCurrent) ->
-    %% @TODO: double check if it is Varying or TopicFilter, is it always [] ?
     LastSeenTS = match_ds_key(StaticIdx, LastSeenKey),
     ItSeed = #it{
         static_index = StaticIdx, compressed_tf = emqx_topic:join(Varying), ts = LastSeenTS
@@ -614,6 +612,10 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
     %% }),
     #ctx{s = S, tmax = TMax, iters = Iterators} = Ctx,
     #it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0,
+    %% Note: the `next_step' function destructively updates the RocksDB
+    %% iterators in `ctx.iters' (they are handles, not values!), so a
+    %% recursive call with the same arguments is not a bug.
     case next_step(S, StaticIdx, CompressedTF, Iterators, any, Op) of
         none ->
             %% ?tp(notice, skipstream_loop_result, #{r => none}),
@@ -629,7 +631,6 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
             finalize_loop(It0, Acc);
         {seek, TS} ->
             %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
-            %% @TODO Deadloop if Op == {seek, TS}?
             It = It0#it{ts = TS},
             next_loop(Ctx, It, BatchSize, {seek, TS}, Acc);
         {ok, TS, DSKey, Msg0} ->
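
The note about destructive iterator updates above can be illustrated with plain erlang-rocksdb calls, independently of this commit (a standalone sketch assuming an already opened database handle `DB' containing at least two keys):

    {ok, It} = rocksdb:iterator(DB, []),
    {ok, K1, _V1} = rocksdb:iterator_move(It, first),
    {ok, K2, _V2} = rocksdb:iterator_move(It, next),
    %% `It' is a stateful handle: the second call advanced the same
    %% iterator in place, so K2 is the key following K1.
    ok = rocksdb:iterator_close(It).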
@@ -695,8 +696,8 @@ next_step(
                     end;
                 NextTS when NextTS > ExpectedTS, N > 0 ->
                     %% Next index level is not what we expect. Reset
-                    %% search to the first wilcard index, but continue
-                    %% from `NextTS'.
+                    %% search to the first wildcard index, but
+                    %% continue from `NextTS'.
                     %%
                     %% Note: if `NextTS > ExpectedTS' and `N =:= 0',
                     %% it means the upper (replication) level is