Browse Source

Merge pull request #13097 from ieQu1/dev/ds-replay-unrecoverable-no-test

feat(sessds): Handle unrecoverable errors
ieQu1 1 year ago
parent
commit
950fa82be3
1 changed files with 42 additions and 6 deletions
  1. 42 6
      apps/emqx/src/emqx_persistent_session_ds.erl

+ 42 - 6
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -629,8 +629,10 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -
                 class => recoverable,
                 class => recoverable,
                 retry_in_ms => RetryTimeout
                 retry_in_ms => RetryTimeout
             }),
             }),
-            emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0)
-        %% TODO: Handle unrecoverable errors.
+            emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0);
+        {error, unrecoverable, Reason} ->
+            Session1 = skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason),
+            replay_streams(Session1#{replay := Rest}, ClientInfo)
     end;
     end;
 replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
 replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
     Session = maps:remove(replay, Session0),
     Session = maps:remove(replay, Session0),
@@ -655,6 +657,39 @@ replay_batch(Srs0, Session0, ClientInfo) ->
             Error
             Error
     end.
     end.
 
 
+%% Handle `{error, unrecoverable, _}' returned by `enqueue_batch'.
+%% Most likely they mean that the generation containing the messages
+%% has been removed.
+-spec skip_batch(_StreamKey, stream_state(), session(), clientinfo(), _Reason) -> session().
+skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
+    ?SLOG(info, #{
+        msg => "session_ds_replay_unrecoverable_error",
+        reason => Reason,
+        srs => SRS0
+    }),
+    GenEvents = fun
+        F(QoS, SeqNo, LastSeqNo) when SeqNo < LastSeqNo ->
+            FakeMsg = #message{
+                id = <<>>,
+                qos = QoS,
+                payload = <<>>,
+                topic = <<>>,
+                timestamp = 0
+            },
+            _ = emqx_session_events:handle_event(ClientInfo, {expired, FakeMsg}),
+            F(QoS, inc_seqno(QoS, SeqNo), LastSeqNo);
+        F(_, _, _) ->
+            ok
+    end,
+    %% Treat messages as expired:
+    GenEvents(?QOS_1, SRS0#srs.first_seqno_qos1, SRS0#srs.last_seqno_qos1),
+    GenEvents(?QOS_2, SRS0#srs.first_seqno_qos2, SRS0#srs.last_seqno_qos2),
+    SRS = SRS0#srs{it_end = end_of_stream, batch_size = 0},
+    %% That's it for the iterator. Mark SRS as reached the
+    %% `end_of_stream', and let stream scheduler do the rest:
+    S = emqx_persistent_session_ds_state:put_stream(StreamKey, SRS, S0),
+    Session#{s := S}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
 -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
@@ -923,15 +958,16 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             ),
             ),
             S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
             S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
             Session#{s => S};
             Session#{s => S};
-        {error, Class, Reason} ->
-            %% TODO: Handle unrecoverable error.
+        {error, recoverable, Reason} ->
             ?SLOG(debug, #{
             ?SLOG(debug, #{
                 msg => "failed_to_fetch_batch",
                 msg => "failed_to_fetch_batch",
                 stream => StreamKey,
                 stream => StreamKey,
                 reason => Reason,
                 reason => Reason,
-                class => Class
+                class => recoverable
             }),
             }),
-            Session0
+            Session0;
+        {error, unrecoverable, Reason} ->
+            skip_batch(StreamKey, Srs1, Session0, ClientInfo, Reason)
     end.
     end.
 
 
 enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->
 enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->