|
|
@@ -189,22 +189,9 @@ add_local_server(DB, Shard) ->
|
|
|
-spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
|
|
|
ok | emqx_ds:error(_Reason).
|
|
|
drop_local_server(DB, Shard) ->
|
|
|
- ShardServers = shard_servers(DB, Shard),
|
|
|
+ %% NOTE: Timeouts are ignored, it's a best effort attempt.
|
|
|
+ _ = prep_stop_server(DB, Shard),
|
|
|
LocalServer = local_server(DB, Shard),
|
|
|
- case lookup_leader(DB, Shard) of
|
|
|
- LocalServer ->
|
|
|
- %% NOTE
|
|
|
- %% Trigger leadership transfer *and* force to wait until the new leader
|
|
|
- %% is elected and updated in the leaderboard. This should help to avoid
|
|
|
- %% edge cases where entries appended right before removal are duplicated
|
|
|
- %% due to client retries.
|
|
|
- %% Timeouts are ignored, it's a best effort attempt.
|
|
|
- [Candidate | _] = lists:delete(LocalServer, ShardServers),
|
|
|
- _ = ra:transfer_leadership(LocalServer, Candidate),
|
|
|
- _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end);
|
|
|
- _Another ->
|
|
|
- ok
|
|
|
- end,
|
|
|
case remove_server(DB, Shard, LocalServer) of
|
|
|
ok ->
|
|
|
ra:force_delete_server(DB, LocalServer);
|
|
|
@@ -300,7 +287,7 @@ ra_overview(Server) ->
|
|
|
|
|
|
init({DB, Shard, Opts}) ->
|
|
|
_ = process_flag(trap_exit, true),
|
|
|
- ok = start_shard(DB, Shard, Opts),
|
|
|
+ ok = start_server(DB, Shard, Opts),
|
|
|
{ok, {DB, Shard}}.
|
|
|
|
|
|
handle_call(_Call, _From, State) ->
|
|
|
@@ -310,18 +297,18 @@ handle_cast(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, {DB, Shard}) ->
|
|
|
+ %% NOTE: Timeouts are ignored, it's a best effort attempt.
|
|
|
+ catch prep_stop_server(DB, Shard),
|
|
|
LocalServer = get_local_server(DB, Shard),
|
|
|
ok = ra:stop_server(DB, LocalServer).
|
|
|
|
|
|
%%
|
|
|
|
|
|
-start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
|
|
+start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
|
|
ClusterName = cluster_name(DB, Shard),
|
|
|
LocalServer = local_server(DB, Shard),
|
|
|
Servers = shard_servers(DB, Shard),
|
|
|
case ra:restart_server(DB, LocalServer) of
|
|
|
- ok ->
|
|
|
- Bootstrap = false;
|
|
|
{error, name_not_registered} ->
|
|
|
Bootstrap = true,
|
|
|
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
|
|
@@ -339,7 +326,11 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
|
|
initial_members => Servers,
|
|
|
machine => Machine,
|
|
|
log_init_args => LogOpts
|
|
|
- })
|
|
|
+ });
|
|
|
+ ok ->
|
|
|
+ Bootstrap = false;
|
|
|
+ {error, {already_started, _}} ->
|
|
|
+ Bootstrap = false
|
|
|
end,
|
|
|
%% NOTE
|
|
|
%% Triggering election is necessary when a new consensus group is being brought up.
|
|
|
@@ -371,6 +362,29 @@ server_uid(_DB, Shard) ->
|
|
|
|
|
|
%%
|
|
|
|
|
|
+prep_stop_server(DB, Shard) ->
|
|
|
+ prep_stop_server(DB, Shard, 5_000).
|
|
|
+
|
|
|
+prep_stop_server(DB, Shard, Timeout) ->
|
|
|
+ LocalServer = get_local_server(DB, Shard),
|
|
|
+ Candidates = lists:delete(LocalServer, shard_servers(DB, Shard)),
|
|
|
+ case lookup_leader(DB, Shard) of
|
|
|
+ LocalServer when Candidates =/= [] ->
|
|
|
+ %% NOTE
|
|
|
+ %% Trigger leadership transfer *and* force to wait until the new leader
|
|
|
+ %% is elected and updated in the leaderboard. This should help to avoid
|
|
|
+ %% edge cases where entries appended right before removal are duplicated
|
|
|
+ %% due to client retries.
|
|
|
+ %% TODO: Candidate may be offline.
|
|
|
+ [Candidate | _] = Candidates,
|
|
|
+ _ = ra:transfer_leadership(LocalServer, Candidate),
|
|
|
+ wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, Timeout);
|
|
|
+ _Another ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
+%%
|
|
|
+
|
|
|
memoize(Fun, Args) ->
|
|
|
%% NOTE: Assuming that the function is pure and never returns `undefined`.
|
|
|
case persistent_term:get([Fun | Args], undefined) of
|
|
|
@@ -382,8 +396,8 @@ memoize(Fun, Args) ->
|
|
|
Result
|
|
|
end.
|
|
|
|
|
|
-wait_until(Fun) ->
|
|
|
- wait_until(Fun, 5_000, 250).
|
|
|
+wait_until(Fun, Timeout) ->
|
|
|
+ wait_until(Fun, Timeout, 100).
|
|
|
|
|
|
wait_until(Fun, Timeout, Sleep) ->
|
|
|
Deadline = erlang:monotonic_time(millisecond) + Timeout,
|