Jelajahi Sumber

refactor(ds): Use async next in sessds fetch function.

Further untangle the logic of a function formerly known as
enqueue_batch.

Its intermediate context is now stored in a record, and enqueue takes
care of intepreting that context.
ieQu1 1 tahun lalu
induk
melakukan
0c5b8448ec
1 mengubah file dengan 110 tambahan dan 54 penghapusan
  1. 110 54
      apps/emqx/src/emqx_persistent_session_ds.erl

+ 110 - 54
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -27,6 +27,7 @@
 
 -include("emqx_session.hrl").
 -include("emqx_persistent_session_ds/session_internals.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 
 -ifdef(TEST).
 -include_lib("proper/include/proper.hrl").
@@ -175,6 +176,13 @@
 %% TODO: Needs configuration?
 -define(TIMEOUT_RETRY_REPLAY, 1000).
 
+-record(pending_next, {
+    ref :: reference(),
+    stream_key :: emqx_persistent_session_ds_state:stream_key(),
+    it_begin :: emqx_ds:iterator(),
+    is_replay :: boolean()
+}).
+
 -type session() :: #{
     %% Client ID
     id := id(),
@@ -694,7 +702,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
     {ok, [], Session}.
 
 replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
-    case replay_batch(Srs0, Session0, ClientInfo) of
+    case replay_batch(StreamKey, Srs0, Session0, ClientInfo) of
         Session = #{} ->
             replay_streams(Session#{replay := Rest}, ClientInfo);
         {error, recoverable, Reason} ->
@@ -718,10 +726,13 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
     %% mechanisms to replay them:
     pull_now(Session).
 
--spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_).
-replay_batch(Srs0, Session0, ClientInfo) ->
-    #srs{batch_size = BatchSize} = Srs0,
-    case enqueue_batch(true, Session0, ClientInfo, fetch(true, BatchSize, Srs0)) of
+-spec replay_batch(
+    emqx_persistent_session_ds_state:stream_key(), stream_state(), session(), clientinfo()
+) ->
+    session() | emqx_ds:error(_).
+replay_batch(StreamKey, Srs0, Session0, ClientInfo) ->
+    Pending = fetch(true, StreamKey, Srs0, undefined),
+    case enqueue_batch(Session0, ClientInfo, Pending, receive_pending(Pending)) of
         {ok, Srs, Session} ->
             %% Assert:
             Srs =:= Srs0 orelse
@@ -1032,16 +1043,8 @@ fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
     end.
 
 new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
-    SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
-    SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
-    Srs1 = Srs0#srs{
-        first_seqno_qos1 = SN1,
-        first_seqno_qos2 = SN2,
-        batch_size = 0,
-        last_seqno_qos1 = SN1,
-        last_seqno_qos2 = SN2
-    },
-    case enqueue_batch(false, Session0, ClientInfo, fetch(false, BatchSize, Srs1)) of
+    Pending = fetch(false, StreamKey, Srs0, BatchSize),
+    case enqueue_batch(Session0, ClientInfo, Pending, receive_pending(Pending)) of
         {ok, Srs, Session} ->
             S1 = emqx_persistent_session_ds_state:put_seqno(
                 ?next(?QOS_1),
@@ -1064,20 +1067,93 @@ new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             }),
             Session0;
         {error, unrecoverable, Reason} ->
-            skip_batch(StreamKey, Srs1, Session0, ClientInfo, Reason)
+            skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason)
+    end.
+
+%%--------------------------------------------------------------------
+%% Generic functions for fetching messages (during replay or normal
+%% operation):
+%% --------------------------------------------------------------------
+
+fetch(IsReplay, StreamKey, Srs0, DefaultBatchSize) ->
+    case IsReplay of
+        true ->
+            %% When we do replay we must use the same starting point
+            %% and batch size as initially:
+            BatchSize = Srs0#srs.batch_size,
+            ItBegin = Srs0#srs.it_begin;
+        false ->
+            BatchSize = DefaultBatchSize,
+            ItBegin = Srs0#srs.it_end
+    end,
+    {ok, Ref} = emqx_ds:anext(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize),
+    #pending_next{
+        ref = Ref,
+        is_replay = IsReplay,
+        it_begin = ItBegin,
+        stream_key = StreamKey
+    }.
+
+receive_pending(#pending_next{ref = Ref}) ->
+    receive
+        #ds_async_result{ref = Ref, data = Data} -> Data
     end.
 
-enqueue_batch(IsReplay, Session = #{inflight := Inflight0, s := S}, ClientInfo, FetchResult) ->
+enqueue_batch(Session = #{s := S}, ClientInfo, Pending, FetchResult) ->
+    #pending_next{stream_key = StreamKey, is_replay = IsReplay} = Pending,
+    case emqx_persistent_session_ds_state:get_stream(StreamKey, S) of
+        undefined ->
+            %% Assert: streams should not change or go missing during
+            %% replay:
+            false = IsReplay,
+            %% But normally, if stream goes missing, it means the
+            %% client unsubscribed while we were awaiting the batch or
+            %% shared sub group leader revoked the lease. We can just
+            %% ignore it: we haven't done any side effects related to
+            %% so far.
+            ?SLOG(info, #{
+                msg => "sessds_ignoring_orphaned_batch",
+                pending => Pending
+            }),
+            Session;
+        Srs ->
+            do_enqueue_batch(Session, ClientInfo, Pending, Srs, FetchResult)
+    end.
+
+do_enqueue_batch(
+    Session = #{s := S0, inflight := Inflight0}, ClientInfo, Pending, Srs0, FetchResult
+) ->
+    #pending_next{is_replay = IsReplay, it_begin = ItBegin} = Pending,
     case FetchResult of
-        {ok, Srs, end_of_stream} ->
+        {error, _, _} = Error ->
+            Error;
+        {ok, end_of_stream} ->
+            %% No new messages; just update the end iterator:
+            Srs = Srs0#srs{
+                it_begin = ItBegin,
+                it_end = end_of_stream,
+                batch_size = 0
+            },
             {ok, Srs, Session};
-        {ok, Srs1, Messages} ->
-            #srs{
-                first_seqno_qos1 = FirstSeqnoQos1,
-                first_seqno_qos2 = FirstSeqnoQos2,
-                sub_state_id = SubStateId
-            } = Srs1,
-            SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
+        {ok, ItEnd, Messages} ->
+            case IsReplay of
+                false ->
+                    %% Normally we assign a new set of sequence
+                    %% numbers to messages in the batch:
+                    FirstSeqnoQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
+                    FirstSeqnoQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0);
+                true ->
+                    %% During replay we reuse the original sequence
+                    %% numbers:
+                    #srs{
+                        first_seqno_qos1 = FirstSeqnoQos1,
+                        first_seqno_qos2 = FirstSeqnoQos2
+                    } = Srs0
+            end,
+            SubState =
+                #{} = emqx_persistent_session_ds_state:get_subscription_state(
+                    Srs0#srs.sub_state_id, S0
+                ),
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                 IsReplay,
                 Session,
@@ -1088,44 +1164,24 @@ enqueue_batch(IsReplay, Session = #{inflight := Inflight0, s := S}, ClientInfo,
                 Messages,
                 Inflight0
             ),
-            Srs = Srs1#srs{
-                last_seqno_qos1 = LastSeqnoQos1,
-                last_seqno_qos2 = LastSeqnoQos2
-            },
-            {ok, Srs, Session#{inflight := Inflight}};
-        {error, _, _} = Error ->
-            Error
-    end.
-
-fetch(IsReplay, BatchSize, Srs0) ->
-    #srs{
-        it_begin = ItBegin0,
-        it_end = ItEnd0
-    } = Srs0,
-    ItBegin =
-        case IsReplay of
-            true -> ItBegin0;
-            false -> ItEnd0
-        end,
-    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
-        {ok, ItEnd, Messages} ->
             Srs = Srs0#srs{
                 it_begin = ItBegin,
                 it_end = ItEnd,
-                batch_size = length(Messages)
+                batch_size = length(Messages),
+                first_seqno_qos1 = FirstSeqnoQos1,
+                first_seqno_qos2 = FirstSeqnoQos2,
+                last_seqno_qos1 = LastSeqnoQos1,
+                last_seqno_qos2 = LastSeqnoQos2
             },
-            {ok, Srs, Messages};
-        {ok, end_of_stream} ->
-            %% No new messages; just update the end iterator:
-            Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0},
-            {ok, Srs, end_of_stream};
-        {error, _, _} = Error ->
-            Error
+            {ok, Srs, Session#{inflight := Inflight}}
     end.
 
 %% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
 %%     K.
 
+%% Enrich messages according to the subscription options and assign
+%% sequence number to each message, later to be used for packet ID
+%% generation:
 process_batch(
     _IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight
 ) ->