소스 검색

feat(sessds): handle recoverable errors in stream scheduler

Andrew Mayorov 1 년 전
부모
커밋
35c43eb8a0
2개의 변경된 파일21개의 추가작업 그리고 13개의 파일을 삭제
  1. 3 3
      apps/emqx/src/emqx_persistent_session_ds.erl
  2. 18 10
      apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

+ 3 - 3
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -571,7 +571,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
     Session = replay_streams(Session0#{replay => Streams}, ClientInfo),
     {ok, [], Session}.
 
-replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) ->
+replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
     case replay_batch(Srs0, Session0, ClientInfo) of
         Session = #{} ->
             replay_streams(Session#{replay := Rest}, ClientInfo);
@@ -579,7 +579,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
             RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
             ?SLOG(warning, #{
                 msg => "failed_to_fetch_replay_batch",
-                stream => Srs0,
+                stream => StreamKey,
                 reason => Reason,
                 class => recoverable,
                 retry_in_ms => RetryTimeout
@@ -867,7 +867,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             %% TODO: Handle unrecoverable error.
             ?SLOG(info, #{
                 msg => "failed_to_fetch_batch",
-                stream => Srs1,
+                stream => StreamKey,
                 reason => Reason,
                 class => Class
             }),

+ 18 - 10
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
             ?SLOG(debug, #{
                 msg => new_stream, key => Key, stream => Stream
             }),
-            {ok, Iterator} = emqx_ds:make_iterator(
-                ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
-            ),
-            NewStreamState = #srs{
-                rank_x = RankX,
-                rank_y = RankY,
-                it_begin = Iterator,
-                it_end = Iterator
-            },
-            emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
+            case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
+                {ok, Iterator} ->
+                    NewStreamState = #srs{
+                        rank_x = RankX,
+                        rank_y = RankY,
+                        it_begin = Iterator,
+                        it_end = Iterator
+                    },
+                    emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
+                {error, recoverable, Reason} ->
+                    ?SLOG(warning, #{
+                        msg => "failed_to_initialize_stream_iterator",
+                        stream => Stream,
+                        class => recoverable,
+                        reason => Reason
+                    }),
+                    S
+            end;
         #srs{} ->
             S
     end.