Browse Source

Merge pull request #14298 from keynslug/fix/EMQX-13577/poll-respect-shard-ready

fix(dsraft): tolerate transient failures when polling shards
Andrew Mayorov 1 year ago
parent
commit
e148da25a7

+ 46 - 6
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl

@@ -14,6 +14,7 @@
     start_shard/1,
     start_egress/1,
     stop_shard/1,
+    shard_info/2,
     terminate_storage/1,
     restart_storage/1,
     ensure_shard/1,
@@ -31,23 +32,26 @@
 -export([init/1]).
 
 %% internal exports:
--export([start_link_sup/2]).
+-export([start_link_sup/2, start_link_sentinel/1, init_sentinel/2]).
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
--define(via(REC), {via, gproc, {n, l, REC}}).
+-define(name(REC), {n, l, REC}).
+-define(via(REC), {via, gproc, ?name(REC)}).
 
 -define(db_sup, ?MODULE).
--define(shards_sup, emqx_ds_builtin_db_shards_sup).
--define(egress_sup, emqx_ds_builtin_db_egress_sup).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_raft_db_shards_sup).
+-define(egress_sup, emqx_ds_builtin_raft_db_egress_sup).
+-define(shard_sup, emqx_ds_builtin_raft_db_shard_sup).
+-define(shard_sentinel, emqx_ds_builtin_raft_db_shard_sentinel).
 
 -record(?db_sup, {db}).
 -record(?shards_sup, {db}).
 -record(?egress_sup, {db}).
 -record(?shard_sup, {db, shard}).
+-record(?shard_sentinel, {shardid}).
 
 %%================================================================================
 %% API functions
@@ -77,6 +81,13 @@ stop_shard({DB, Shard}) ->
             {error, Reason}
     end.
 
+-spec shard_info(emqx_ds_storage_layer:shard_id(), ready) -> boolean() | down.
+shard_info(ShardId = {DB, Shard}, Info) ->
+    case sentinel_alive(ShardId) of
+        true -> emqx_ds_replication_layer_shard:shard_info(DB, Shard, Info);
+        false -> down
+    end.
+
 -spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
 terminate_storage({DB, Shard}) ->
     Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
@@ -181,7 +192,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Children = [
         shard_storage_spec(DB, Shard, Opts),
         shard_replication_spec(DB, Shard, Opts),
-        shard_beamformers_spec(DB, Shard)
+        shard_beamformers_spec(DB, Shard),
+        shard_sentinel_spec(DB, Shard)
     ],
     {ok, {SupFlags, Children}}.
 
@@ -220,6 +232,22 @@ start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
 start_link_sup(Id, Options) ->
     supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
 
+-spec start_link_sentinel(emqx_ds_storage_layer:shard_id()) -> {ok, pid()}.
+start_link_sentinel(Id) ->
+    proc_lib:start_link(?MODULE, init_sentinel, [self(), Id]).
+
+-spec init_sentinel(pid(), emqx_ds_storage_layer:shard_id()) -> no_return().
+init_sentinel(Parent, Id) ->
+    Name = ?name(#?shard_sentinel{shardid = Id}),
+    gproc:reg(Name),
+    proc_lib:init_ack(Parent, {ok, self()}),
+    receive
+        %% Not trapping exits, but just in case.
+        {'EXIT', _Pid, Reason} ->
+            gproc:unreg(Name),
+            exit(Reason)
+    end.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -292,6 +320,18 @@ shard_beamformers_spec(DB, Shard) ->
             ]}
     }.
 
+shard_sentinel_spec(DB, Shard) ->
+    #{
+        id => {Shard, sentinel},
+        type => worker,
+        restart => permanent,
+        shutdown => brutal_kill,
+        start => {?MODULE, start_link_sentinel, [{DB, Shard}]}
+    }.
+
+sentinel_alive(Id) ->
+    gproc:where(?name(#?shard_sentinel{shardid = Id})) =/= undefined.
+
 ensure_started(Res) ->
     case Res of
         {ok, _Pid} ->

+ 30 - 22
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -419,12 +419,23 @@ poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
     ),
     maps:foreach(
         fun(Shard, ShardIts) ->
-            ok = ra_poll(
+            Result = ra_poll(
                 DB,
                 Shard,
                 [{{ReplyTo, Token}, It} || {Token, It} <- ShardIts],
                 PollOpts
-            )
+            ),
+            case Result of
+                ok ->
+                    ok;
+                {error, Class, Reason} ->
+                    ?tp(debug, ds_repl_poll_shard_failed, #{
+                        db => DB,
+                        shard => Shard,
+                        class => Class,
+                        reason => Reason
+                    })
+            end
         end,
         Groups
     ),
@@ -527,10 +538,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_SHARD_READY(DB, SHARD, EXPR),
-    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+-define(IF_SHARD_READY(SHARDID, EXPR),
+    case emqx_ds_builtin_raft_db_sup:shard_info(SHARDID, ready) of
         true -> EXPR;
-        false -> {error, recoverable, shard_unavailable}
+        _Unready -> {error, recoverable, shard_unavailable}
     end
 ).
 
@@ -574,8 +585,7 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).
 
@@ -602,8 +612,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).
 
@@ -638,8 +647,7 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )
@@ -672,8 +680,7 @@ do_add_generation_v2(_DB) ->
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).
 
@@ -700,11 +707,14 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
 do_poll_v1(SourceNode, DB, Shard, Iterators, PollOpts) ->
     ShardId = {DB, Shard},
     ?tp(ds_raft_do_poll, #{shard => ShardId, iterators => Iterators}),
-    lists:foreach(
-        fun({RAddr, It}) ->
-            emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
-        end,
-        Iterators
+    ?IF_SHARD_READY(
+        ShardId,
+        lists:foreach(
+            fun({RAddr, It}) ->
+                emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
+            end,
+            Iterators
+        )
     ).
 
 %%================================================================================
@@ -1249,11 +1259,9 @@ snapshot_module() ->
 unpack_iterator(Shard, #{?tag := ?IT, ?enc := Iterator}) ->
     emqx_ds_storage_layer:unpack_iterator(Shard, Iterator).
 
-scan_stream(ShardId, Stream, TopicFilter, StartMsg, BatchSize) ->
-    {DB, Shard} = ShardId,
+scan_stream(ShardId = {DB, Shard}, Stream, TopicFilter, StartMsg, BatchSize) ->
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         begin
             Now = current_timestamp(DB, Shard),
             emqx_ds_storage_layer:scan_stream(

+ 17 - 5
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -772,17 +772,18 @@ t_error_mapping_replication_layer(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
         work_dir => ?config(work_dir, Config)
     }),
+    ok = snabbkaffe:start_trace(),
+    ok = emqx_ds_test_helpers:mock_rpc(),
     [{apps, Apps} | Config];
 t_error_mapping_replication_layer('end', Config) ->
+    emqx_ds_test_helpers:unmock_rpc(),
+    snabbkaffe:stop(),
     emqx_cth_suite:stop(?config(apps, Config)),
     Config.
 
 t_error_mapping_replication_layer(Config) ->
     %% This checks that the replication layer maps recoverable errors correctly.
 
-    ok = emqx_ds_test_helpers:mock_rpc(),
-    ok = snabbkaffe:start_trace(),
-
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
     [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
@@ -865,7 +866,19 @@ t_error_mapping_replication_layer(Config) ->
         length([error || {error, _, _} <- Results2]) > 0,
         Results2
     ),
-    meck:unload().
+
+    %% Calling `emqx_ds:poll/3` succeeds, but some poll requests should fail anyway.
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := ds_repl_poll_shard_failed}),
+        length(Streams0),
+        500
+    ),
+    UserData = ?FUNCTION_NAME,
+    {ok, _PollRef} = emqx_ds:poll(DB, [{UserData, I} || I <- Iterators0], #{timeout => 1_000}),
+    ?assertMatch(
+        {timeout, Events} when length(Events) > 0,
+        snabbkaffe:receive_events(SRef)
+    ).
 
 %% This testcase verifies the behavior of `store_batch' operation
 %% when the underlying code experiences recoverable or unrecoverable
@@ -1068,7 +1081,6 @@ t_poll('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_poll(Config) ->
-    DB = ?FUNCTION_NAME,
     Nodes = [N1 | _] = ?config(nodes, Config),
     ?check_trace(
         #{timetrap => 15_000},

+ 1 - 0
changes/ee/fix-14298.en.md

@@ -0,0 +1 @@
+Tolerate transient remote shard failures in DS Raft/RocksDB backend that could have caused durable sessions to crash when polling shards for updates.