
feat(router): track node liveness / reachability in helper

Now that the helper does not clean up after a dead node right away, routes
pointing to down or otherwise unavailable nodes can stay in the global
routing table for a while. Doing `gen_rpc` casts (as part of message
forwarding) to unavailable nodes might prove expensive, so the idea here
is to avoid that once we know the node is down.
Andrew Mayorov, 1 year ago
Parent commit 001776dc7e
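
The gist of the change: before paying for a remote `gen_rpc` cast, consult a node-liveness check and drop destinations already known to be unreachable. A minimal sketch of the idea (the helper function below is hypothetical; the actual change filters destinations during route aggregation, as the first diff shows):

    %% Keep only destinations the router helper believes are reachable;
    %% unknown nodes get the benefit of the doubt and are kept.
    routable_dests(Dests) ->
        [Dest || Dest <- Dests, emqx_router_helper:is_routable(Dest)].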

apps/emqx/src/emqx_broker.erl (+10, -3)

@@ -408,14 +408,18 @@ do_route2({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
 aggre([]) ->
     [];
 aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
-    [{To, Node}];
+    [{To, Node} || emqx_router_helper:is_routable(Node)];
 aggre([#route{topic = To, dest = {Group, _Node}}]) ->
     [{To, Group}];
 aggre(Routes) ->
     aggre(Routes, false, []).
 
 aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
-    aggre(Rest, Dedup, [{To, Node} | Acc]);
+    case emqx_router_helper:is_routable(Node) of
+        true -> NAcc = [{To, Node} | Acc];
+        false -> NAcc = Acc
+    end,
+    aggre(Rest, Dedup, NAcc);
 aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
     aggre(Rest, true, [{To, Group} | Acc]);
 aggre([], false, Acc) ->
@@ -442,7 +446,10 @@ forward(Node, To, Delivery = #delivery{message = _Msg}, RpcMode) ->
 
 %% @doc Forward message to another node.
 -spec do_forward(
-    node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async
+    node(),
+    emqx_types:topic() | emqx_types:share(),
+    emqx_types:delivery(),
+    RpcMode :: sync | async
 ) ->
     emqx_types:deliver_result().
 do_forward(Node, To, Delivery, async) ->
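
Worth noting in the single-route clause above: a list comprehension with no generator and only a boolean filter evaluates to either a one-element list or the empty list, which is what makes `[{To, Node} || emqx_router_helper:is_routable(Node)]` a compact replacement for an explicit case. A tiny illustration (hypothetical function, not from the patch):

    %% [Expr || Bool] yields [Expr] when Bool is true, [] when false.
    maybe_pair(To, Node, Routable) ->
        [{To, Node} || Routable].

    %% maybe_pair(<<"t">>, n1, true)  =:= [{<<"t">>, n1}]
    %% maybe_pair(<<"t">>, n1, false) =:= []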

apps/emqx/src/emqx_router_helper.erl (+29, -2)

@@ -31,6 +31,7 @@
 -export([
     start_link/0,
     monitor/1,
+    is_routable/1,
     purge/0,
     purge_force/0
 ]).
@@ -50,6 +51,8 @@
 
 -record(routing_node, {name, const = unused}).
 
+-define(TAB_STATUS, ?MODULE).
+
 -define(LOCK(RESOURCE), {?MODULE, RESOURCE}).
 
 %% How often to reconcile nodes state? (ms)
@@ -95,6 +98,19 @@ monitor(Node) when is_atom(Node) ->
         false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
     end.
 
+%% @doc Is given node considered routable?
+%% I.e. should the broker attempt to forward messages there, even if there are
+%% routes to this node in the routing table?
+-spec is_routable(node()) -> boolean().
+is_routable(Node) when Node == node() ->
+    true;
+is_routable(Node) ->
+    try
+        lookup_node_reachable(Node)
+    catch
+        error:badarg -> true
+    end.
+
 %% @doc Schedule dead node purges.
 -spec purge() -> scheduled.
 purge() ->
@@ -114,6 +130,8 @@ purge_force() ->
 
 init([]) ->
     process_flag(trap_exit, true),
+    %% Initialize a table to cache node status.
+    Tab = ets:new(?TAB_STATUS, [protected, named_table, set, {read_concurrency, true}]),
     %% Monitor nodes lifecycle events.
     ok = ekka:monitor(membership),
     %% Cleanup any routes left by old incarnations of this node (if any).
@@ -126,6 +144,7 @@ init([]) ->
     State = #{
         down => #{},
         left => #{},
+        tab_node_status => Tab,
         timer_reconcile => TRef
     },
     {ok, State, hibernate}.
@@ -173,10 +192,12 @@ handle_membership_event(_Event, State) ->
     State.
 
 record_node_down(Node, State = #{down := Down}) ->
+    _ = ets:insert(?TAB_STATUS, {Node, down, _Reachable = false}),
     Record = #{since => erlang:monotonic_time(millisecond)},
     State#{down := Down#{Node => Record}}.
 
 record_node_left(Node, State = #{left := Left}) ->
+    _ = ets:insert(?TAB_STATUS, {Node, left, _Reachable = false}),
     Record = #{since => erlang:monotonic_time(millisecond)},
     State#{left := Left#{Node => Record}}.
 
@@ -184,11 +205,15 @@ record_node_alive(Node, State) ->
     forget_node(Node, State).
 
 forget_node(Node, State = #{down := Down, left := Left}) ->
+    _ = ets:delete(?TAB_STATUS, Node),
     State#{
         down := maps:remove(Node, Down),
         left := maps:remove(Node, Left)
     }.
 
+lookup_node_reachable(Node) ->
+    ets:lookup_element(?TAB_STATUS, Node, 3, _Default = true).
+
 handle_reconcile(State) ->
     TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
     NState = State#{timer_reconcile := TRef},
@@ -196,8 +221,10 @@ handle_reconcile(State) ->
     schedule_purges(TS, reconcile(NState)).
 
 reconcile(State) ->
-    %% Fallback: avoid purging live nodes, if missed lifecycle events for whatever reason.
-    lists:foldl(fun record_node_alive/2, State, mria:running_nodes()).
+    %% NOTE
+    %% This is a fallback mechanism. Avoid purging / ignoring live nodes if we
+    %% missed lifecycle events for whatever reason.
+    lists:foldl(fun record_node_alive/2, State, mria:running_nodes() ++ nodes()).
 
 select_outdated(NodesInfo, Since0) ->
     maps:keys(maps:filter(fun(_, #{since := Since}) -> Since < Since0 end, NodesInfo)).
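
The helper's bookkeeping follows a common Erlang pattern: a gen_server owns a protected named ETS table, records status changes into it, and lets readers query the table directly without calling the server. A self-contained sketch of that pattern, assuming OTP 26+ for `ets:lookup_element/4` with a default (module and API names are hypothetical, not the emqx_router_helper API):

    -module(liveness_cache).
    -behaviour(gen_server).

    -export([start_link/0, is_routable/1, node_down/1, node_alive/1]).
    -export([init/1, handle_call/3, handle_cast/2]).

    -define(TAB, ?MODULE).

    start_link() ->
        gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

    %% Readers hit ETS directly, never the gen_server.
    is_routable(Node) when Node == node() ->
        true;
    is_routable(Node) ->
        try
            %% Nodes with no record default to reachable (OTP 26+).
            ets:lookup_element(?TAB, Node, 3, true)
        catch
            %% Table absent (owner not started yet): assume routable.
            error:badarg -> true
        end.

    node_down(Node) -> gen_server:cast(?MODULE, {down, Node}).
    node_alive(Node) -> gen_server:cast(?MODULE, {alive, Node}).

    init([]) ->
        %% `protected` + `read_concurrency`: single writer (this process),
        %% many cheap concurrent readers.
        ?TAB = ets:new(?TAB, [protected, named_table, set, {read_concurrency, true}]),
        {ok, #{}}.

    handle_call(_Req, _From, State) ->
        {reply, ignored, State}.

    handle_cast({down, Node}, State) ->
        true = ets:insert(?TAB, {Node, down, false}),
        {noreply, State};
    handle_cast({alive, Node}, State) ->
        true = ets:delete(?TAB, Node),
        {noreply, State}.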

apps/emqx/src/emqx_shared_sub.erl (+4, -3)

@@ -96,7 +96,6 @@
 -define(SERVER, ?MODULE).
 
 -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
--define(IS_REMOTE_PID(Pid), (is_pid(Pid) andalso node(Pid) =/= node())).
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
@@ -556,10 +555,12 @@ is_active_sub(Pid, FailedSubs, All) ->
         is_alive_sub(Pid).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
+is_alive_sub(Pid) when node(Pid) == node() ->
+    %% The race is when the pid is actually down but cleanup_down has not been evaluated yet.
+    erlang:is_process_alive(Pid);
 is_alive_sub(Pid) ->
     %% When process is not local, the best guess is it's alive.
-    %% The race is when the pid is actually down cleanup_down is not evaluated yet
-    ?IS_REMOTE_PID(Pid) orelse erlang:is_process_alive(Pid).
+    emqx_router_helper:is_routable(node(Pid)).
 
 delete_route_if_needed({Group, Topic} = GroupTopic) ->
     if_no_more_subscribers(GroupTopic, fun() ->
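
One detail behind the shared-sub change: `erlang:is_process_alive/1` only accepts local pids and raises `badarg` for remote ones, hence the new `node(Pid) == node()` guard. For a remote pid the best available signal is node-level reachability, which is what `emqx_router_helper:is_routable/1` now provides. A hedged sketch of the decision (function name hypothetical):

    subscriber_alive(Pid) when is_pid(Pid) ->
        case node(Pid) =:= node() of
            true ->
                %% Local pid: the runtime can answer definitively.
                erlang:is_process_alive(Pid);
            false ->
                %% Remote pid: fall back to node-level reachability.
                emqx_router_helper:is_routable(node(Pid))
        end.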

apps/emqx/test/emqx_router_helper_SUITE.erl (+11, -1)

@@ -172,10 +172,20 @@ t_cluster_node_restart('end', Config) ->
 t_cluster_node_restart(Config) ->
     ClusterNode = ?config(cluster_node, Config),
     ClusterSpec = ?config(cluster_node_spec, Config),
-    emqx_router:add_route(<<"restart/b/c">>, ClusterNode),
+    emqx_router:add_route(<<"restart/b/+">>, ClusterNode),
     emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
+    ok = emqx_cth_cluster:stop([ClusterNode]),
+    %% The route should still be there, as the node is still expected to come back up.
+    ?assertMatch([_, _], emqx_router:topics()),
+    %% Verify the broker is aware there's no reason to route to a node that is down.
+    ok = timer:sleep(500),
+    ?assertEqual(
+        [],
+        emqx_broker:publish(emqx_message:make(<<?MODULE_STRING>>, <<"restart/b/c">>, <<>>))
+    ),
     _ = emqx_cth_cluster:restart(ClusterSpec),
+    %% Node should have cleaned up upon restart.
     ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
 
 t_message(_) ->