Merge pull request #12797 from keynslug/fix/dsrepl-error-handling

fix(dsrepl): handle RPC errors gracefully when storage is down
Andrew Mayorov 1 year ago
Parent
Commit
2d074df209
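The diffs below converge on one error convention for fallible durable-storage calls: either a success tuple, or {error, Class, Reason} where Class is recoverable (safe to retry later) or unrecoverable. The sketch below only reconstructs the rough shape of that convention for orientation; the authoritative definition is the emqx_ds:error/1 type referenced in the specs and is not part of this commit.

    %% Illustrative reconstruction, not code from the repository.
    -type error_class() :: recoverable | unrecoverable.
    -type ds_error(Reason) :: {error, error_class(), Reason}.
    %% Concrete instances produced by the changes below:
    %%   {error, recoverable, storage_down}         - RPC target, storage not up
    %%   {error, recoverable, {erpc, noconnection}} - RPC caller, peer unreachable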

+ 3 - 3
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -571,7 +571,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
     Session = replay_streams(Session0#{replay => Streams}, ClientInfo),
     {ok, [], Session}.
 
-replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) ->
+replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
     case replay_batch(Srs0, Session0, ClientInfo) of
         Session = #{} ->
             replay_streams(Session#{replay := Rest}, ClientInfo);
@@ -579,7 +579,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
             RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
             ?SLOG(warning, #{
                 msg => "failed_to_fetch_replay_batch",
-                stream => Srs0,
+                stream => StreamKey,
                 reason => Reason,
                 class => recoverable,
                 retry_in_ms => RetryTimeout
@@ -867,7 +867,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             %% TODO: Handle unrecoverable error.
             ?SLOG(info, #{
                 msg => "failed_to_fetch_batch",
-                stream => Srs1,
+                stream => StreamKey,
                 reason => Reason,
                 class => Class
             }),

+ 18 - 10
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
             ?SLOG(debug, #{
                 msg => new_stream, key => Key, stream => Stream
             }),
-            {ok, Iterator} = emqx_ds:make_iterator(
-                ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
-            ),
-            NewStreamState = #srs{
-                rank_x = RankX,
-                rank_y = RankY,
-                it_begin = Iterator,
-                it_end = Iterator
-            },
-            emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
+            case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
+                {ok, Iterator} ->
+                    NewStreamState = #srs{
+                        rank_x = RankX,
+                        rank_y = RankY,
+                        it_begin = Iterator,
+                        it_end = Iterator
+                    },
+                    emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
+                {error, recoverable, Reason} ->
+                    ?SLOG(warning, #{
+                        msg => "failed_to_initialize_stream_iterator",
+                        stream => Stream,
+                        class => recoverable,
+                        reason => Reason
+                    }),
+                    S
+            end;
         #srs{} ->
             S
     end.
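The scheduler change above relies on emqx_ds:make_iterator/4 reporting transport and storage failures as {error, recoverable, _} instead of raising; the session state is left untouched, so the stream is presumably picked up again on a later scheduling pass. An explicit retry loop is another way to consume the same contract. The wrapper below is only an illustrative sketch; the function name, retry count and delay are assumptions, not code from the repository.

    %% Illustrative only: retry iterator creation a few times on recoverable
    %% errors, give up otherwise.
    make_iterator_with_retry(DB, Stream, TopicFilter, StartTime) ->
        make_iterator_with_retry(DB, Stream, TopicFilter, StartTime, 3).

    make_iterator_with_retry(_DB, _Stream, _TopicFilter, _StartTime, 0) ->
        {error, recoverable, retries_exhausted};
    make_iterator_with_retry(DB, Stream, TopicFilter, StartTime, Retries) ->
        case emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime) of
            {ok, It} ->
                {ok, It};
            {error, recoverable, _Reason} ->
                timer:sleep(100),
                make_iterator_with_retry(DB, Stream, TopicFilter, StartTime, Retries - 1);
            Error = {error, unrecoverable, _Reason} ->
                Error
        end.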

+ 99 - 53
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -181,12 +181,19 @@ list_generations_with_lifetimes(DB) ->
     Shards = list_shards(DB),
     lists:foldl(
         fun(Shard, GensAcc) ->
+            case ra_list_generations_with_lifetimes(DB, Shard) of
+                Gens = #{} ->
+                    ok;
+                {error, _Class, _Reason} ->
+                    %% TODO: log error
+                    Gens = #{}
+            end,
             maps:fold(
                 fun(GenId, Data, AccInner) ->
                     AccInner#{{Shard, GenId} => Data}
                 end,
                 GensAcc,
-                ra_list_generations_with_lifetimes(DB, Shard)
+                Gens
             )
         end,
         #{},
@@ -221,14 +228,13 @@ get_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Streams =
-                try
-                    ra_get_streams(DB, Shard, TopicFilter, StartTime)
-                catch
-                    error:{erpc, _} ->
-                        %% TODO: log?
-                        []
-                end,
+            case ra_get_streams(DB, Shard, TopicFilter, StartTime) of
+                Streams when is_list(Streams) ->
+                    ok;
+                {error, _Class, _Reason} ->
+                    %% TODO: log error
+                    Streams = []
+            end,
             lists:map(
                 fun({RankY, StorageLayerStream}) ->
                     RankX = Shard,
@@ -262,14 +268,11 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?stream_v2(Shard, StorageStream) = Stream,
-    try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
+    case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
             Error
-    catch
-        error:RPCError = {erpc, _} ->
-            {error, recoverable, RPCError}
     end.
 
 -spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@@ -279,22 +282,19 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
-        Err = {error, _} ->
-            Err
+        Error = {error, _, _} ->
+            Error
     end.
 
 -spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
     emqx_ds:make_iterator_result(iterator()).
 update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
-    try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
+    case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
             Error
-    catch
-        error:RPCError = {erpc, _} ->
-            {error, recoverable, RPCError}
     end.
 
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
@@ -312,12 +312,8 @@ next(DB, Iter0, BatchSize) ->
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
-        Ok = {ok, _} ->
-            Ok;
-        Error = {error, _, _} ->
-            Error;
-        RPCError = {badrpc, _} ->
-            {error, recoverable, RPCError}
+        Other ->
+            Other
     end.
 
 -spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
@@ -354,6 +350,19 @@ foreach_shard(DB, Fun) ->
 %% Internal exports (RPC targets)
 %%================================================================================
 
+%% NOTE
+%% Target node may still be in the process of starting up when RPCs arrive, it's
+%% good to have them handled gracefully.
+%% TODO
+%% There's a possibility of race condition: storage may shut down right after we
+%% ask for its status.
+-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
+    case emqx_ds_storage_layer:shard_info(SHARDID, status) of
+        running -> EXPR;
+        down -> {error, recoverable, storage_down}
+    end
+).
+
 -spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
 do_drop_db_v1(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
@@ -386,11 +395,18 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
     error(obsolete_api).
 
 -spec do_get_streams_v2(
-    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_storage_layer:stream()}].
+    [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
-    emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
+    ShardId = {DB, Shard},
+    ?IF_STORAGE_RUNNING(
+        ShardId,
+        emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
+    ).
 
 -dialyzer({nowarn_function, do_make_iterator_v1/5}).
 -spec do_make_iterator_v1(
@@ -413,7 +429,11 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
 ) ->
     emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
-    emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
+    ShardId = {DB, Shard},
+    ?IF_STORAGE_RUNNING(
+        ShardId,
+        emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
+    ).
 
 -spec do_make_delete_iterator_v4(
     emqx_ds:db(),
@@ -434,9 +454,7 @@ do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) ->
 ) ->
     emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
 do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
-    emqx_ds_storage_layer:update_iterator(
-        {DB, Shard}, OldIter, DSKey
-    ).
+    emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey).
 
 -spec do_next_v1(
     emqx_ds:db(),
@@ -446,7 +464,11 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 ) ->
     emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
 do_next_v1(DB, Shard, Iter, BatchSize) ->
-    emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
+    ShardId = {DB, Shard},
+    ?IF_STORAGE_RUNNING(
+        ShardId,
+        emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
+    ).
 
 -spec do_delete_next_v4(
     emqx_ds:db(),
@@ -464,9 +486,14 @@ do_add_generation_v2(_DB) ->
     error(obsolete_api).
 
 -spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
-    #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
-do_list_generations_with_lifetimes_v3(DB, ShardId) ->
-    emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
+    #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}
+    | emqx_ds:error(storage_down).
+do_list_generations_with_lifetimes_v3(DB, Shard) ->
+    ShardId = {DB, Shard},
+    ?IF_STORAGE_RUNNING(
+        ShardId,
+        emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
+    ).
 
 -spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
     ok | {error, _}.
@@ -491,6 +518,15 @@ list_nodes() ->
 %% Too large for normal operation, need better backpressure mechanism.
 -define(RA_TIMEOUT, 60 * 1000).
 
+-define(SAFERPC(EXPR),
+    try
+        EXPR
+    catch
+        error:RPCError = {erpc, _} ->
+            {error, recoverable, RPCError}
+    end
+).
+
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,
@@ -544,28 +580,34 @@ ra_drop_generation(DB, Shard, GenId) ->
 ra_get_streams(DB, Shard, TopicFilter, Time) ->
     {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
     TimestampUs = timestamp_to_timeus(Time),
-    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
+    ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
 
 ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
+    ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
 
 ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
     {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    TimestampUs = timestamp_to_timeus(StartTime),
-    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
+    TimeUs = timestamp_to_timeus(StartTime),
+    ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
 
 ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+    TimeUs = timestamp_to_timeus(StartTime),
+    ?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
 
 ra_update_iterator(DB, Shard, Iter, DSKey) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+    ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
 
 ra_next(DB, Shard, Iter, BatchSize) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+    case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
+        RPCError = {badrpc, _} ->
+            {error, recoverable, RPCError};
+        Other ->
+            Other
+    end.
 
 ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@@ -573,16 +615,20 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
 
 ra_list_generations_with_lifetimes(DB, Shard) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
-    maps:map(
-        fun(_GenId, Data = #{since := Since, until := Until}) ->
-            Data#{
-                since := timeus_to_timestamp(Since),
-                until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
-            }
-        end,
-        Gens
-    ).
+    case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
+        Gens = #{} ->
+            maps:map(
+                fun(_GenId, Data = #{since := Since, until := Until}) ->
+                    Data#{
+                        since := timeus_to_timestamp(Since),
+                        until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until)
+                    }
+                end,
+                Gens
+            );
+        Error ->
+            Error
+    end.
 
 ra_drop_shard(DB, Shard) ->
     ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).
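The two macros added above split the work between both ends of the RPC. SAFERPC runs on the calling node and turns an erpc exception, for example error:{erpc, noconnection} when the peer is unreachable, into {error, recoverable, RPCError}. IF_STORAGE_RUNNING runs on the target node and answers {error, recoverable, storage_down} instead of crashing while the shard storage is still starting up; the race noted in the TODO (storage stopping right after the status check) is deliberately left for later. A hypothetical new RPC target and its caller would combine the two pieces roughly as follows; do_example_v1/2 and ra_example/2 are made-up names for illustration, not part of the commit.

    %% RPC target, executed on the node that owns the shard:
    do_example_v1(DB, Shard) ->
        ShardId = {DB, Shard},
        ?IF_STORAGE_RUNNING(
            ShardId,
            emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
        ).

    %% Caller, executed on any node in the cluster:
    ra_example(DB, Shard) ->
        {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
        ?SAFERPC(erpc:call(Node, ?MODULE, do_example_v1, [DB, Shard])).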

+ 9 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -21,6 +21,7 @@
 -export([
     open_shard/2,
     drop_shard/1,
+    shard_info/2,
     store_batch/3,
     get_streams/3,
     get_delete_streams/3,
@@ -436,6 +437,14 @@ list_generations_with_lifetimes(ShardId) ->
 drop_generation(ShardId, GenId) ->
     gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
 
+-spec shard_info(shard_id(), status) -> running | down.
+shard_info(ShardId, status) ->
+    try get_schema_runtime(ShardId) of
+        #{} -> running
+    catch
+        error:badarg -> down
+    end.
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
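shard_info(ShardId, status) avoids a gen_server call: judging by the error:badarg clause, get_schema_runtime/1 (not shown in this diff) is a cheap lookup such as persistent_term or ETS, which keeps the probe inexpensive enough to run in front of every guarded RPC target. Expected behaviour from a remote console, with illustrative DB and shard values:

    1> emqx_ds_storage_layer:shard_info({messages, <<"0">>}, status).
    running
    2> emqx_ds_storage_layer:shard_info({messages, <<"nonexistent">>}, status).
    down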