|
|
@@ -192,7 +192,7 @@ init([]) ->
|
|
|
ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
|
|
TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
State = #{
|
|
|
- last_membership => mria_membership:nodelist(),
|
|
|
+ last_membership => mria:cluster_nodes(cores),
|
|
|
tab_node_status => Tab,
|
|
|
timer_reconcile => TRef
|
|
|
},
|
|
|
@@ -289,19 +289,17 @@ handle_reconcile(State) ->
|
|
|
reconcile(State = #{last_membership := MembersLast}) ->
|
|
|
%% Find if there are discrepancies in membership.
|
|
|
%% Missing core nodes must have been "force-left" from the cluster.
|
|
|
- Members = mria_membership:nodelist(),
|
|
|
- ok = lists:foreach(
|
|
|
- fun(Node) ->
|
|
|
- schedule_purge_left(Node),
|
|
|
- record_node_left(Node)
|
|
|
- end,
|
|
|
- MembersLast -- Members
|
|
|
- ),
|
|
|
+ Members = mria:cluster_nodes(cores),
|
|
|
+ ok = lists:foreach(fun(Node) -> record_node_left(Node) end, MembersLast -- Members),
|
|
|
%% This is a fallback mechanism.
|
|
|
%% Avoid purging live nodes, if missed lifecycle events for whatever reason.
|
|
|
- ok = lists:foreach(fun record_node_alive/1, mria:running_nodes() ++ nodes()),
|
|
|
+ ok = lists:foreach(fun record_node_alive/1, mria:running_nodes()),
|
|
|
State#{last_membership := Members}.
|
|
|
|
|
|
+select(Status) ->
|
|
|
+ MS = ets:fun2ms(fun(#ns{node = Node, status = S}) when S == Status -> Node end),
|
|
|
+ ets:select(?TAB_STATUS, MS).
|
|
|
+
|
|
|
select_outdated(Status, Since0) ->
|
|
|
MS = ets:fun2ms(
|
|
|
fun(#ns{node = Node, status = S, since = Since}) when S == Status andalso Since < Since0 ->
|
|
|
@@ -325,6 +323,12 @@ schedule_purges(TS, State) ->
|
|
|
fun schedule_purge/1,
|
|
|
filter_replicants(select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT))
|
|
|
),
|
|
|
+ %% Trigger purges for "force-left" nodes found during reconcile, if resposible.
|
|
|
+ ok = lists:foreach(
|
|
|
+ fun schedule_purge_left/1,
|
|
|
+ select(left)
|
|
|
+ ),
|
|
|
+ %% Otherwise, purge nodes marked left for a while.
|
|
|
ok = lists:foreach(
|
|
|
fun schedule_purge/1,
|
|
|
select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)
|