Browse Source

fix(dsrepl): attempt leadership transfer before server removal

This should make it much less likely to hit weird edge cases that lead
to duplicate Raft log entries because of client retries upon receiving
`shutdown` from the leader being removed.
Andrew Mayorov 2 years ago
parent
commit
3223797ae5
1 changed files with 43 additions and 0 deletions
  1. 43 0
      apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

+ 43 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -114,6 +114,13 @@ get_server_local_preferred(DB, Shard) ->
             pick_random(get_shard_servers(DB, Shard))
             pick_random(get_shard_servers(DB, Shard))
     end.
     end.
 
 
+lookup_leader(DB, Shard) ->
+    %% NOTE
+    %% Does not block, but the result may be outdated or even unknown when there's
+    %% no servers on the local node.
+    ClusterName = get_cluster_name(DB, Shard),
+    ra_leaderboard:lookup_leader(ClusterName).
+
 pick_local(Servers) ->
 pick_local(Servers) ->
     case lists:keyfind(node(), 2, Servers) of
     case lists:keyfind(node(), 2, Servers) of
         Local when is_tuple(Local) ->
         Local when is_tuple(Local) ->
@@ -181,7 +188,22 @@ add_local_server(DB, Shard) ->
 -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
 -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     ok | emqx_ds:error(_Reason).
     ok | emqx_ds:error(_Reason).
 drop_local_server(DB, Shard) ->
 drop_local_server(DB, Shard) ->
+    ShardServers = shard_servers(DB, Shard),
     LocalServer = local_server(DB, Shard),
     LocalServer = local_server(DB, Shard),
+    case lookup_leader(DB, Shard) of
+        LocalServer ->
+            %% NOTE
+            %% Trigger leadership transfer *and* force to wait until the new leader
+            %% is elected and updated in the leaderboard. This should help to avoid
+            %% edge cases where entries appended right before removal are duplicated
+            %% due to client retries.
+            %% Timeouts are ignored, it's a best effort attempt.
+            [Candidate | _] = lists:delete(LocalServer, ShardServers),
+            _ = ra:transfer_leadership(LocalServer, Candidate),
+            _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end);
+        _Another ->
+            ok
+    end,
     case remove_server(DB, Shard, LocalServer) of
     case remove_server(DB, Shard, LocalServer) of
         ok ->
         ok ->
             ra:force_delete_server(DB, LocalServer);
             ra:force_delete_server(DB, LocalServer);
@@ -351,3 +373,24 @@ memoize(Fun, Args) ->
         Result ->
         Result ->
             Result
             Result
     end.
     end.
+
+wait_until(Fun) ->
+    wait_until(Fun, 5_000, 250).
+
+wait_until(Fun, Timeout, Sleep) ->
+    Deadline = erlang:monotonic_time(millisecond) + Timeout,
+    loop_until(Fun, Deadline, Sleep).
+
+loop_until(Fun, Deadline, Sleep) ->
+    case Fun() of
+        true ->
+            ok;
+        false ->
+            case erlang:monotonic_time(millisecond) of
+                Now when Now < Deadline ->
+                    timer:sleep(Sleep),
+                    loop_until(Fun, Deadline, Sleep);
+                _ ->
+                    timeout
+            end
+    end.