Forráskód Böngészése

fix(dsraft): tolerate transient failures when polling shards

Andrew Mayorov 1 éve
szülő
commit
b1a17f90dc

+ 13 - 2
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -419,12 +419,23 @@ poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
     ),
     maps:foreach(
         fun(Shard, ShardIts) ->
-            ok = ra_poll(
+            Result = ra_poll(
                 DB,
                 Shard,
                 [{{ReplyTo, Token}, It} || {Token, It} <- ShardIts],
                 PollOpts
-            )
+            ),
+            case Result of
+                ok ->
+                    ok;
+                {error, Class, Reason} ->
+                    ?tp(debug, ds_repl_poll_shard_failed, #{
+                        db => DB,
+                        shard => Shard,
+                        class => Class,
+                        reason => Reason
+                    })
+            end
         end,
         Groups
     ),

+ 17 - 5
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -772,17 +772,18 @@ t_error_mapping_replication_layer(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
         work_dir => ?config(work_dir, Config)
     }),
+    ok = snabbkaffe:start_trace(),
+    ok = emqx_ds_test_helpers:mock_rpc(),
     [{apps, Apps} | Config];
 t_error_mapping_replication_layer('end', Config) ->
+    emqx_ds_test_helpers:unmock_rpc(),
+    snabbkaffe:stop(),
     emqx_cth_suite:stop(?config(apps, Config)),
     Config.
 
 t_error_mapping_replication_layer(Config) ->
     %% This checks that the replication layer maps recoverable errors correctly.
 
-    ok = emqx_ds_test_helpers:mock_rpc(),
-    ok = snabbkaffe:start_trace(),
-
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
     [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
@@ -865,7 +866,19 @@ t_error_mapping_replication_layer(Config) ->
         length([error || {error, _, _} <- Results2]) > 0,
         Results2
     ),
-    meck:unload().
+
+    %% The `emqx_ds:poll/3` call itself succeeds, but some per-shard poll requests should fail anyway.
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := ds_repl_poll_shard_failed}),
+        length(Streams0),
+        500
+    ),
+    UserData = ?FUNCTION_NAME,
+    {ok, _PollRef} = emqx_ds:poll(DB, [{UserData, I} || I <- Iterators0], #{timeout => 1_000}),
+    ?assertMatch(
+        {timeout, Events} when length(Events) > 0,
+        snabbkaffe:receive_events(SRef)
+    ).
 
 %% This testcase verifies the behavior of `store_batch' operation
 %% when the underlying code experiences recoverable or unrecoverable
@@ -1068,7 +1081,6 @@ t_poll('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_poll(Config) ->
-    DB = ?FUNCTION_NAME,
     Nodes = [N1 | _] = ?config(nodes, Config),
     ?check_trace(
         #{timetrap => 15_000},