|
|
@@ -23,6 +23,7 @@
|
|
|
-include("logger.hrl").
|
|
|
-include("types.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
+-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
|
|
%% Mnesia bootstrap
|
|
|
-export([create_tables/0]).
|
|
|
@@ -137,10 +138,23 @@ purge_force() ->
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-record(ns, {
|
|
|
+ node :: node(),
|
|
|
+ status :: down | left,
|
|
|
+ since :: _MonotonicTimestamp :: integer(),
|
|
|
+ reachable = false :: boolean()
|
|
|
+}).
|
|
|
+
|
|
|
init([]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
%% Initialize a table to cache node status.
|
|
|
- Tab = ets:new(?TAB_STATUS, [protected, named_table, set, {read_concurrency, true}]),
|
|
|
+ Tab = ets:new(?TAB_STATUS, [
|
|
|
+ protected,
|
|
|
+ {keypos, #ns.node},
|
|
|
+ named_table,
|
|
|
+ set,
|
|
|
+ {read_concurrency, true}
|
|
|
+ ]),
|
|
|
%% Monitor nodes lifecycle events.
|
|
|
ok = ekka:monitor(membership),
|
|
|
%% Cleanup any routes left by old incarnations of this node (if any).
|
|
|
@@ -151,8 +165,6 @@ init([]) ->
|
|
|
ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
|
|
TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
State = #{
|
|
|
- down => #{},
|
|
|
- left => #{},
|
|
|
tab_node_status => Tab,
|
|
|
timer_reconcile => TRef
|
|
|
},
|
|
|
@@ -191,37 +203,51 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
handle_membership_event({node, down, Node}, State) ->
|
|
|
- record_node_down(Node, State);
|
|
|
+ _ = record_node_down(Node),
|
|
|
+ State;
|
|
|
handle_membership_event({node, leaving, Node}, State) ->
|
|
|
_ = schedule_purge_left(Node),
|
|
|
- record_node_left(Node, State);
|
|
|
+ _ = record_node_left(Node),
|
|
|
+ State;
|
|
|
handle_membership_event({node, up, Node}, State) ->
|
|
|
- record_node_alive(Node, State);
|
|
|
+ _ = record_node_alive(Node),
|
|
|
+ State;
|
|
|
handle_membership_event(_Event, State) ->
|
|
|
State.
|
|
|
|
|
|
-record_node_down(Node, State = #{down := Down}) ->
|
|
|
- _ = ets:insert(?TAB_STATUS, {Node, down, _Reachable = false}),
|
|
|
- Record = #{since => erlang:monotonic_time(millisecond)},
|
|
|
- State#{down := Down#{Node => Record}}.
|
|
|
+record_node_down(Node) ->
|
|
|
+ NS = #ns{
|
|
|
+ node = Node,
|
|
|
+ status = down,
|
|
|
+ since = erlang:monotonic_time(millisecond)
|
|
|
+ },
|
|
|
+ case ets:lookup(?TAB_STATUS, Node) of
|
|
|
+ [#ns{status = left}] ->
|
|
|
+ %% Node is still marked as left, keep it that way.
|
|
|
+ false;
|
|
|
+ [#ns{status = down}] ->
|
|
|
+ %% Duplicate.
|
|
|
+ ets:insert(?TAB_STATUS, NS);
|
|
|
+ [] ->
|
|
|
+ ets:insert(?TAB_STATUS, NS)
|
|
|
+ end.
|
|
|
|
|
|
-record_node_left(Node, State = #{left := Left}) ->
|
|
|
- _ = ets:insert(?TAB_STATUS, {Node, left, _Reachable = false}),
|
|
|
- Record = #{since => erlang:monotonic_time(millisecond)},
|
|
|
- State#{left := Left#{Node => Record}}.
|
|
|
+record_node_left(Node) ->
|
|
|
+ NS = #ns{
|
|
|
+ node = Node,
|
|
|
+ status = left,
|
|
|
+ since = erlang:monotonic_time(millisecond)
|
|
|
+ },
|
|
|
+ ets:insert(?TAB_STATUS, NS).
|
|
|
|
|
|
-record_node_alive(Node, State) ->
|
|
|
- forget_node(Node, State).
|
|
|
+record_node_alive(Node) ->
|
|
|
+ forget_node(Node).
|
|
|
|
|
|
-forget_node(Node, State = #{down := Down, left := Left}) ->
|
|
|
- _ = ets:delete(?TAB_STATUS, Node),
|
|
|
- State#{
|
|
|
- down := maps:remove(Node, Down),
|
|
|
- left := maps:remove(Node, Left)
|
|
|
- }.
|
|
|
+forget_node(Node) ->
|
|
|
+ ets:delete(?TAB_STATUS, Node).
|
|
|
|
|
|
lookup_node_reachable(Node) ->
|
|
|
- ets:lookup_element(?TAB_STATUS, Node, 3, _Default = true).
|
|
|
+ ets:lookup_element(?TAB_STATUS, Node, #ns.reachable, _Default = true).
|
|
|
|
|
|
handle_reconcile(State) ->
|
|
|
TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
|
|
|
@@ -233,14 +259,20 @@ reconcile(State) ->
|
|
|
%% NOTE
|
|
|
%% This is a fallback mechanism. Avoid purging / ignoring live nodes, if missed
|
|
|
%% lifecycle events for whatever reason.
|
|
|
- lists:foldl(fun record_node_alive/2, State, mria:running_nodes() ++ nodes()).
|
|
|
-
|
|
|
-select_outdated(NodesInfo, Since0) ->
|
|
|
- maps:keys(maps:filter(fun(_, #{since := Since}) -> Since < Since0 end, NodesInfo)).
|
|
|
+ ok = lists:foreach(fun record_node_alive/1, mria:running_nodes() ++ nodes()),
|
|
|
+ State.
|
|
|
|
|
|
-schedule_purges(TS, State = #{down := Down, left := Left}) ->
|
|
|
- ok = lists:foreach(fun schedule_purge/1, select_outdated(Down, TS - ?PURGE_DEAD_TIMEOUT)),
|
|
|
- ok = lists:foreach(fun schedule_purge/1, select_outdated(Left, TS - ?PURGE_LEFT_TIMEOUT)),
|
|
|
+select_outdated(Status, Since0) ->
|
|
|
+ MS = ets:fun2ms(
|
|
|
+ fun(#ns{node = Node, status = S, since = Since}) when S == Status andalso Since < Since0 ->
|
|
|
+ Node
|
|
|
+ end
|
|
|
+ ),
|
|
|
+ ets:select(?TAB_STATUS, MS).
|
|
|
+
|
|
|
+schedule_purges(TS, State) ->
|
|
|
+ ok = lists:foreach(fun schedule_purge/1, select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT)),
|
|
|
+ ok = lists:foreach(fun schedule_purge/1, select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)),
|
|
|
State.
|
|
|
|
|
|
schedule_purge(Node) ->
|
|
|
@@ -266,22 +298,21 @@ handle_purge(Node, State) ->
|
|
|
try purge_dead_node_trans(Node) of
|
|
|
true ->
|
|
|
?tp(debug, emqx_router_node_purged, #{node => Node}),
|
|
|
- forget_node(Node, State);
|
|
|
+ forget_node(Node);
|
|
|
false ->
|
|
|
?tp(debug, emqx_router_node_purge_skipped, #{node => Node}),
|
|
|
- forget_node(Node, State);
|
|
|
+ forget_node(Node);
|
|
|
aborted ->
|
|
|
- ?tp(notice, emqx_router_node_purge_aborted, #{node => Node}),
|
|
|
- State
|
|
|
+ ?tp(notice, emqx_router_node_purge_aborted, #{node => Node})
|
|
|
catch
|
|
|
Kind:Error ->
|
|
|
?tp(warning, emqx_router_node_purge_error, #{
|
|
|
node => Node,
|
|
|
kind => Kind,
|
|
|
error => Error
|
|
|
- }),
|
|
|
- State
|
|
|
- end.
|
|
|
+ })
|
|
|
+ end,
|
|
|
+ State.
|
|
|
|
|
|
purge_dead_node(Node) ->
|
|
|
case node_has_routes(Node) of
|