Explorar el Código

Merge pull request #13486 from keynslug/fix/ci/ds-raft-flaky-next

test(dsraft): attempt to stabilize flaky testcases
Andrew Mayorov hace 1 año
padre
commit
b7200656a5

+ 15 - 40
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl

@@ -32,13 +32,12 @@
 -define(TRIGGER_PENDING_TIMEOUT, 60_000).
 -define(TRIGGER_PENDING_TIMEOUT, 60_000).
 
 
 -define(TRANS_RETRY_TIMEOUT, 5_000).
 -define(TRANS_RETRY_TIMEOUT, 5_000).
--define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
 -undef(TRANS_RETRY_TIMEOUT).
 -undef(TRANS_RETRY_TIMEOUT).
--undef(REMOVE_REPLICA_DELAY).
+-undef(TRIGGER_PENDING_TIMEOUT).
 -define(TRANS_RETRY_TIMEOUT, 1_000).
 -define(TRANS_RETRY_TIMEOUT, 1_000).
--define(REMOVE_REPLICA_DELAY, {3_000, 2_000}).
+-define(TRIGGER_PENDING_TIMEOUT, 5_000).
 -endif.
 -endif.
 
 
 %%
 %%
@@ -155,12 +154,12 @@ unsubscribe_db_changes(_State) ->
 
 
 handle_shard_changed(Shard, State = #{db := DB}) ->
 handle_shard_changed(Shard, State = #{db := DB}) ->
     ok = save_shard_meta(DB, Shard),
     ok = save_shard_meta(DB, Shard),
-    handle_shard_transitions(Shard, next_transitions(DB, Shard), State).
+    handle_shard_transitions(Shard, local, next_transitions(DB, Shard), State).
 
 
 handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
 handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
     lists:foldl(
     lists:foldl(
         fun(Shard, StateAcc) ->
         fun(Shard, StateAcc) ->
-            handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc)
+            handle_shard_transitions(Shard, any, next_transitions(DB, Shard), StateAcc)
         end,
         end,
         State,
         State,
         Shards
         Shards
@@ -169,41 +168,34 @@ handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
 next_transitions(DB, Shard) ->
 next_transitions(DB, Shard) ->
     emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
     emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
 
 
-handle_shard_transitions(_Shard, [], State) ->
+handle_shard_transitions(_Shard, _, [], State) ->
     %% We reached the target allocation.
     %% We reached the target allocation.
     State;
     State;
-handle_shard_transitions(Shard, [Trans | _Rest], State) ->
-    case transition_handler(Shard, Trans, State) of
+handle_shard_transitions(Shard, Scope, [Trans | _Rest], State) ->
+    case transition_handler(Shard, Scope, Trans, State) of
         {Track, Handler} ->
         {Track, Handler} ->
             ensure_transition(Track, Shard, Trans, Handler, State);
             ensure_transition(Track, Shard, Trans, Handler, State);
         undefined ->
         undefined ->
             State
             State
     end.
     end.
 
 
-transition_handler(Shard, Trans, _State = #{db := DB}) ->
+transition_handler(Shard, Scope, Trans, _State = #{db := DB}) ->
     ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
     ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
     case Trans of
     case Trans of
         {add, ThisSite} ->
         {add, ThisSite} ->
             {Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
             {Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
         {del, ThisSite} ->
         {del, ThisSite} ->
             {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
             {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
-        {del, Site} ->
+        {del, Site} when Scope =:= any ->
+            %% NOTE
+            %% Letting the replica handle its own removal first, acting on the
+            %% transition only when triggered explicitly or by `?TRIGGER_PENDING_TIMEOUT`
+            %% timer. In other cases `Scope` is `local`.
             ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
             ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
             case lists:member(Site, ReplicaSet) of
             case lists:member(Site, ReplicaSet) of
                 true ->
                 true ->
-                    %% NOTE
-                    %% Let the replica handle its own removal first, but still set
-                    %% up a removal handler after a delay, in case the replica is
-                    %% unresponsive.
-                    Handler = {fun trans_delay/5, [
-                        ?REMOVE_REPLICA_DELAY,
-                        {fun trans_claim/4, [fun trans_rm_unresponsive/3]}
-                    ]},
-                    %% NOTE
-                    %% Putting this transition handler on separate "track" so that it
-                    %% won't block any changes with higher priority (e.g. managing
-                    %% local replicas).
-                    {{unresp, Shard}, Handler};
+                    Handler = {fun trans_claim/4, [fun trans_rm_unresponsive/3]},
+                    {Shard, Handler};
                 false ->
                 false ->
                     undefined
                     undefined
             end;
             end;
@@ -332,16 +324,6 @@ do_rm_unresponsive(DB, Shard, Site) ->
             do_rm_unresponsive(DB, Shard, Site)
             do_rm_unresponsive(DB, Shard, Site)
     end.
     end.
 
 
-trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
-    ok = delay(Delay),
-    %% NOTE: Proceed only if the transition we are going to handle is still desired.
-    case next_transitions(DB, Shard) of
-        [Trans | _] ->
-            apply_handler(NextHandler, DB, Shard, Trans);
-        _Outdated ->
-            exit({shutdown, skipped})
-    end.
-
 %%
 %%
 
 
 ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
 ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
@@ -459,10 +441,3 @@ erase_shards_meta(DB, Shards) ->
 
 
 erase_shard_meta(DB, Shard) ->
 erase_shard_meta(DB, Shard) ->
     persistent_term:erase(?shard_meta(DB, Shard)).
     persistent_term:erase(?shard_meta(DB, Shard)).
-
-%%
-
-delay({MinDelay, Variance}) ->
-    timer:sleep(MinDelay + rand:uniform(Variance));
-delay(Delay) ->
-    timer:sleep(Delay).

+ 3 - 9
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -130,15 +130,8 @@ t_replication_transfers_snapshots(Config) ->
         begin
         begin
             %% Initialize DB on all nodes and wait for it to be online.
             %% Initialize DB on all nodes and wait for it to be online.
             Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
             Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
-            ?assertEqual(
-                [{ok, ok} || _ <- Nodes],
-                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
-            ),
-            ?retry(
-                500,
-                10,
-                ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
-            ),
+            assert_db_open(Nodes, ?DB, Opts),
+            assert_db_stable(Nodes, ?DB),
 
 
             %% Stop the DB on the "offline" node.
             %% Stop the DB on the "offline" node.
             ?wait_async_action(
             ?wait_async_action(
@@ -476,6 +469,7 @@ t_rebalance_chaotic_converges(Config) ->
 
 
             %% Wait until the LTS timestamp is updated:
             %% Wait until the LTS timestamp is updated:
             timer:sleep(5000),
             timer:sleep(5000),
+            assert_db_stable(Nodes, ?DB),
 
 
             %% Check that all messages are still there.
             %% Check that all messages are still there.
             emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
             emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)