Преглед изворни кода

fix(sessds): Improve comments

ieQu1 пре 2 година
родитељ
комит
eec56b0d6b

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1037,7 +1037,7 @@ next_seqno_gen() ->
     ?LET(
         {Epoch, Offset},
         {non_neg_integer(), range(0, ?EPOCH_SIZE)},
-        Epoch bsl 15 + Offset
+        Epoch bsl ?EPOCH_BITS + Offset
     ).
 
 %%%% Property-based tests:

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -85,7 +85,7 @@
     #{
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
-        ?expiry_interval => emqx_types:conninfo(),
+        ?expiry_interval => non_neg_integer(),
         ?last_id => integer()
     }.
 

+ 35 - 0
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -39,6 +39,8 @@
 %% API functions
 %%================================================================================
 
+%% @doc Find the streams that have uncommitted (in-flight) messages.
+%% Return them in the order they were previously replayed.
 -spec find_replay_streams(emqx_persistent_session_ds_state:t()) ->
     [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}].
 find_replay_streams(S) ->
@@ -59,6 +61,15 @@ find_replay_streams(S) ->
     ),
     lists:sort(fun compare_streams/2, Streams).
 
+%% @doc Find streams from which the new messages can be fetched.
+%%
+%% Currently it amounts to the streams that don't have any inflight
+%% messages, since for performance reasons we keep only one record of
+%% in-flight messages per stream, and we don't want to overwrite these
+%% records prematurely.
+%%
+%% This function is non-detereministic: it randomizes the order of
+%% streams to ensure fair replay of different topics.
 -spec find_new_streams(emqx_persistent_session_ds_state:t()) ->
     [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}].
 find_new_streams(S) ->
@@ -91,6 +102,23 @@ find_new_streams(S) ->
         )
     ).
 
+%% @doc This function makes the session aware of the new streams.
+%%
+%% It has the following properties:
+%%
+%% 1. For each RankX, it keeps only the streams with the same RankY.
+%%
+%% 2. For each RankX, it never advances RankY until _all_ streams with
+%% the same RankX are replayed.
+%%
+%% 3. Once all streams with the given rank are replayed, it advances
+%% the RankY to the smallest known RankY that is greater than replayed
+%% RankY.
+%%
+%% 4. If the RankX has never been replayed, it selects the streams
+%% with the smallest RankY.
+%%
+%% This way, messages from the same topic/shard are never reordered.
 -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
 renew_streams(S0) ->
     S1 = remove_fully_replayed_streams(S0),
@@ -192,6 +220,12 @@ select_streams(SubId, RankX, Streams0, S) ->
             lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
     end.
 
+%% @doc Advance RankY for each RankX that doesn't have any unreplayed
+%% streams.
+%%
+%% Drop streams with the fully replayed rank. This function relies on
+%% the fact that all streams with the same RankX have also the same
+%% RankY.
 -spec remove_fully_replayed_streams(emqx_persistent_session_ds_state:t()) ->
     emqx_persistent_session_ds_state:t().
 remove_fully_replayed_streams(S0) ->
@@ -246,6 +280,7 @@ remove_fully_replayed_streams(S0) ->
         S1
     ).
 
+%% @doc Compare the streams by the order in which they were replayed.
 compare_streams(
     {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},
     {_KeyB, #srs{first_seqno_qos1 = B1, first_seqno_qos2 = B2}}

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -928,7 +928,7 @@ t_publish_many_while_client_is_gone(Config) ->
     {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
     {ok, _} = emqtt:ConnFun(Client3),
 
-    %% Check that the messages are retransmitted with DUP=1:
+    %% Check that we receive the rest of the messages:
     Msgs3 = receive_messages(NPubs, _Timeout = 2000),
     ct:pal("Msgs3 = ~p", [Msgs3]),
     ?assertMatch(

+ 3 - 0
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -27,6 +27,9 @@
 %% Type declarations
 %%================================================================================
 
+%% Note: here `committed' != `dirty'. It means "has been committed at
+%% least once since the creation", and it's used by the iteration
+%% test.
 -record(s, {subs = #{}, metadata = #{}, streams = #{}, seqno = #{}, committed = false}).
 
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.

+ 5 - 1
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -213,6 +213,10 @@ trie_next(#trie{trie = Trie}, State, ?EOT) ->
         [] -> undefined
     end;
 trie_next(#trie{trie = Trie}, State, Token) ->
+    %% NOTE: it's crucial to return the original (non-wildcard) index
+    %% for the topic, if found. Otherwise messages from the same topic
+    %% will end up in different streams, once the wildcard is learned,
+    %% and their replay order will become undefined:
     case ets:lookup(Trie, {State, Token}) of
         [#trans{next = Next}] ->
             {false, Next};