|
@@ -53,15 +53,14 @@
|
|
|
-define(LOCK(RESOURCE), {?MODULE, RESOURCE}).
|
|
-define(LOCK(RESOURCE), {?MODULE, RESOURCE}).
|
|
|
|
|
|
|
|
%% How often to reconcile nodes state? (ms)
|
|
%% How often to reconcile nodes state? (ms)
|
|
|
--define(RECONCILE_INTERVAL, 10 * 60_000).
|
|
|
|
|
|
|
+%% Introduce some jitter to avoid concerted firing on different nodes.
|
|
|
|
|
+-define(RECONCILE_INTERVAL, {10 * 60_000, '±', 30_000}).
|
|
|
%% How soon should a dead node be purged? (ms)
|
|
%% How soon should a dead node be purged? (ms)
|
|
|
-define(PURGE_DEAD_TIMEOUT, 60 * 60_000).
|
|
-define(PURGE_DEAD_TIMEOUT, 60 * 60_000).
|
|
|
%% How soon should a left node be purged? (ms)
|
|
%% How soon should a left node be purged? (ms)
|
|
|
%% This is a fallback, left node is expected to be purged right away.
|
|
%% This is a fallback, left node is expected to be purged right away.
|
|
|
-define(PURGE_LEFT_TIMEOUT, 15_000).
|
|
-define(PURGE_LEFT_TIMEOUT, 15_000).
|
|
|
|
|
|
|
|
-% -define(PURGE_LEFT_FALLBACK_TIMEOUT, {15_000, '±', 2_500}).
|
|
|
|
|
-
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
%% Mnesia bootstrap
|
|
%% Mnesia bootstrap
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
@@ -123,7 +122,7 @@ init([]) ->
|
|
|
_ = purge_this_node(),
|
|
_ = purge_this_node(),
|
|
|
%% Setup periodic stats reporting.
|
|
%% Setup periodic stats reporting.
|
|
|
ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
|
ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
|
|
- {ok, TRef} = timer:send_interval(?RECONCILE_INTERVAL, reconcile),
|
|
|
|
|
|
|
+ TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
State = #{
|
|
State = #{
|
|
|
down => #{},
|
|
down => #{},
|
|
|
left => #{},
|
|
left => #{},
|
|
@@ -145,10 +144,6 @@ handle_cast(Msg, State) ->
|
|
|
handle_info({membership, Event}, State) ->
|
|
handle_info({membership, Event}, State) ->
|
|
|
NState = handle_membership_event(Event, State),
|
|
NState = handle_membership_event(Event, State),
|
|
|
{noreply, NState};
|
|
{noreply, NState};
|
|
|
-handle_info(reconcile, State) ->
|
|
|
|
|
- TS = erlang:monotonic_time(millisecond),
|
|
|
|
|
- NState = schedule_purges(TS, reconcile(State)),
|
|
|
|
|
- {noreply, NState};
|
|
|
|
|
handle_info({timeout, _TRef, {start, Task}}, State) ->
|
|
handle_info({timeout, _TRef, {start, Task}}, State) ->
|
|
|
NState = handle_task(Task, State),
|
|
NState = handle_task(Task, State),
|
|
|
{noreply, NState};
|
|
{noreply, NState};
|
|
@@ -156,8 +151,7 @@ handle_info(Info, State) ->
|
|
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
|
|
-terminate(_Reason, _State = #{timer_reconcile := TRef}) ->
|
|
|
|
|
- timer:cancel(TRef),
|
|
|
|
|
|
|
+terminate(_Reason, _State) ->
|
|
|
emqx_stats:cancel_update(route_stats),
|
|
emqx_stats:cancel_update(route_stats),
|
|
|
ekka:unmonitor(membership).
|
|
ekka:unmonitor(membership).
|
|
|
|
|
|
|
@@ -195,6 +189,12 @@ forget_node(Node, State = #{down := Down, left := Left}) ->
|
|
|
left := maps:remove(Node, Left)
|
|
left := maps:remove(Node, Left)
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
|
|
+handle_reconcile(State) ->
|
|
|
|
|
+ TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
|
|
+ NState = State#{timer_reconcile := TRef},
|
|
|
|
|
+ TS = erlang:monotonic_time(millisecond),
|
|
|
|
|
+ schedule_purges(TS, reconcile(NState)).
|
|
|
|
|
+
|
|
|
reconcile(State) ->
|
|
reconcile(State) ->
|
|
|
%% Fallback: avoid purging live nodes, if missed lifecycle events for whatever reason.
|
|
%% Fallback: avoid purging live nodes, if missed lifecycle events for whatever reason.
|
|
|
lists:foldl(fun record_node_alive/2, State, mria:running_nodes()).
|
|
lists:foldl(fun record_node_alive/2, State, mria:running_nodes()).
|
|
@@ -289,7 +289,9 @@ schedule_task(Task, Timeout) ->
|
|
|
emqx_utils:start_timer(choose_timeout(Timeout), {start, Task}).
|
|
emqx_utils:start_timer(choose_timeout(Timeout), {start, Task}).
|
|
|
|
|
|
|
|
handle_task({purge, Node}, State) ->
|
|
handle_task({purge, Node}, State) ->
|
|
|
- handle_purge(Node, State).
|
|
|
|
|
|
|
+ handle_purge(Node, State);
|
|
|
|
|
+handle_task(reconcile, State) ->
|
|
|
|
|
+ handle_reconcile(State).
|
|
|
|
|
|
|
|
choose_timeout({Baseline, '±', Jitter}) ->
|
|
choose_timeout({Baseline, '±', Jitter}) ->
|
|
|
Baseline + rand:uniform(Jitter * 2) - Jitter;
|
|
Baseline + rand:uniform(Jitter * 2) - Jitter;
|