|
|
@@ -165,6 +165,7 @@ init([]) ->
|
|
|
ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
|
|
TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
State = #{
|
|
|
+ last_membership => mria_membership:nodelist(),
|
|
|
tab_node_status => Tab,
|
|
|
timer_reconcile => TRef
|
|
|
},
|
|
|
@@ -246,6 +247,9 @@ record_node_alive(Node) ->
|
|
|
forget_node(Node) -> % Delete Node's entry from the ?TAB_STATUS ETS table.
|
|
|
ets:delete(?TAB_STATUS, Node).
|
|
|
|
|
|
+lookup_node_status(Node) -> % Status field of Node's entry; `notfound` when Node has no entry.
|
|
|
+ ets:lookup_element(?TAB_STATUS, Node, #ns.status, _Default = notfound).
|
|
|
+
|
|
|
lookup_node_reachable(Node) -> % Reachable field of Node's entry; defaults to `true` when Node has no entry.
|
|
|
ets:lookup_element(?TAB_STATUS, Node, #ns.reachable, _Default = true).
|
|
|
|
|
|
@@ -255,12 +259,21 @@ handle_reconcile(State) ->
|
|
|
TS = erlang:monotonic_time(millisecond),
|
|
|
schedule_purges(TS, reconcile(NState)).
|
|
|
|
|
|
-reconcile(State) ->
|
|
|
- %% NOTE
|
|
|
- %% This is a fallback mechanism. Avoid purging / ignoring live nodes, if missed
|
|
|
- %% lifecycle events for whatever reason.
|
|
|
+reconcile(State = #{last_membership := MembersLast}) ->
|
|
|
+ %% Detect discrepancies between the current membership and the previous snapshot.
|
|
|
+ %% Missing core nodes must have been "force-left" from the cluster.
|
|
|
+ Members = mria_membership:nodelist(),
|
|
|
+ ok = lists:foreach(
|
|
|
+ fun(Node) ->
|
|
|
+ schedule_purge_left(Node),
|
|
|
+ record_node_left(Node)
|
|
|
+ end,
|
|
|
+ MembersLast -- Members
|
|
|
+ ),
|
|
|
+ %% This is a fallback mechanism.
|
|
|
+ %% Avoid purging live nodes if lifecycle events were missed for whatever reason.
|
|
|
ok = lists:foreach(fun record_node_alive/1, mria:running_nodes() ++ nodes()),
|
|
|
- State.
|
|
|
+ State#{last_membership := Members}. % Keep the fresh snapshot for the next comparison.
|
|
|
|
|
|
select_outdated(Status, Since0) ->
|
|
|
MS = ets:fun2ms(
|
|
|
@@ -270,9 +283,25 @@ select_outdated(Status, Since0) ->
|
|
|
),
|
|
|
ets:select(?TAB_STATUS, MS).
|
|
|
|
|
|
+filter_replicants(Nodes) -> % Keep only those Nodes listed by mria_membership:replicant_nodelist/0.
|
|
|
+ Replicants = mria_membership:replicant_nodelist(),
|
|
|
+ [RN || RN <- Nodes, lists:member(RN, Replicants)].
|
|
|
+
|
|
|
schedule_purges(TS, State) ->
|
|
|
- ok = lists:foreach(fun schedule_purge/1, select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT)),
|
|
|
- ok = lists:foreach(fun schedule_purge/1, select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)),
|
|
|
+ %% Safety measure: purge only dead replicants.
|
|
|
+ %% The assumption is that a dead / offline core node will:
|
|
|
+ %% 1. Either come back online, and potentially reboot itself once Mria autoheal
|
|
|
+ %% kicks in.
|
|
|
+ %% 2. ...Or leave the cluster (be "force-left" to be precise), in which case
|
|
|
+ %% `reconcile/1` should notice a discrepancy and schedule a purge.
|
|
|
+ ok = lists:foreach(
|
|
|
+ fun schedule_purge/1,
|
|
|
+ filter_replicants(select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT))
|
|
|
+ ),
|
|
|
+ ok = lists:foreach(
|
|
|
+ fun schedule_purge/1,
|
|
|
+ select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)
|
|
|
+ ),
|
|
|
State.
|
|
|
|
|
|
schedule_purge(Node) ->
|
|
|
@@ -324,7 +353,8 @@ purge_dead_node(Node) ->
|
|
|
end.
|
|
|
|
|
|
purge_dead_node_trans(Node) ->
|
|
|
- case node_has_routes(Node) of
|
|
|
+ StillKnown = lookup_node_status(Node) =/= notfound,
|
|
|
+ case StillKnown andalso node_has_routes(Node) of
|
|
|
true ->
|
|
|
global:trans(
|
|
|
{?LOCK(Node), self()},
|