
Merge pull request #14248 from keynslug/fix/EMQX-11852/no-eager-cleanup-nodedown

feat(router): require node to cleanup routes on restart
Andrew Mayorov 1 year ago
Parent
Commit
df2726699e

+ 10 - 3
apps/emqx/src/emqx_broker.erl

@@ -408,14 +408,18 @@ do_route2({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
 aggre([]) ->
     [];
 aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
-    [{To, Node}];
+    [{To, Node} || emqx_router_helper:is_routable(Node)];
 aggre([#route{topic = To, dest = {Group, _Node}}]) ->
     [{To, Group}];
 aggre(Routes) ->
     aggre(Routes, false, []).
 
 aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
-    aggre(Rest, Dedup, [{To, Node} | Acc]);
+    case emqx_router_helper:is_routable(Node) of
+        true -> NAcc = [{To, Node} | Acc];
+        false -> NAcc = Acc
+    end,
+    aggre(Rest, Dedup, NAcc);
 aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
     aggre(Rest, true, [{To, Group} | Acc]);
 aggre([], false, Acc) ->
@@ -442,7 +446,10 @@ forward(Node, To, Delivery = #delivery{message = _Msg}, RpcMode) ->
 
 %% @doc Forward message to another node.
 -spec do_forward(
-    node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async
+    node(),
+    emqx_types:topic() | emqx_types:share(),
+    emqx_types:delivery(),
+    RpcMode :: sync | async
 ) ->
     emqx_types:deliver_result().
 do_forward(Node, To, Delivery, async) ->
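
The effect of the aggre/1 change above is that routes pointing at nodes currently marked unreachable are dropped from the delivery fan-out. Below is a minimal, self-contained sketch of that filtering, with a stubbed is_routable/1 standing in for the real emqx_router_helper call (module, stub, and node names are illustrative, not part of the patch):

-module(aggre_sketch).
-export([aggre/1, demo/0]).

-record(route, {topic, dest}).

%% Stub: the real check is emqx_router_helper:is_routable/1.
is_routable(Node) when is_atom(Node) ->
    Node =/= 'dead@host'.

%% Single-route fast path: the filter-only comprehension yields [] when
%% the node is not routable, exactly as in the patched clause above.
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
    [{To, Node} || is_routable(Node)];
aggre(Routes) ->
    lists:foldr(
        fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
            case is_routable(Node) of
                true -> [{To, Node} | Acc];
                false -> Acc
            end
        end,
        [],
        Routes
    ).

demo() ->
    Routes = [
        #route{topic = <<"t/1">>, dest = node()},
        #route{topic = <<"t/1">>, dest = 'dead@host'}
    ],
    %% Only the reachable destination survives, e.g. [{<<"t/1">>, nonode@nohost}]
    %% in an undistributed shell.
    aggre(Routes).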

+ 343 - 78
apps/emqx/src/emqx_router_helper.erl

@@ -14,6 +14,33 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
+%% Router helper process.
+%%
+%% Responsibility is twofold:
+%% 1. Cleaning own portion of the global routing table when restarted.
+%%    The assumption is that the node has crashed (worst-case), so the
+%%    previous incarnation's routes are still present upon restart.
+%% 2. Managing the portions of the global routing table belonging to dead /
+%%    "left" cluster members, i.e. members that are not supposed to come
+%%    back online again.
+%%
+%% Only core nodes are responsible for the latter task. Moreover, the
+%% helper adopts the following operational model:
+%% 1. Core nodes are supposed to be explicitly evicted (or "left") from
+%%    the cluster. Even if a core node is marked down for several hours,
+%%    helper won't attempt to purge its portion of the global routing
+%%    table.
+%% 2. Replicant nodes are considered dead (or "left") once they are down
+%%    for a specific timespan. Currently hardcoded as `?PURGE_DEAD_TIMEOUT`.
+%%    Ideally it should reflect the amount of time it takes for a worst-case
+%%    connectivity failure between cores and replicants to heal.
+%%
+%% TODO
+%% While cores purge unreachable replicants' routes after a timeout,
+%% replicants _do nothing_ on connectivity loss, regardless of how long
+%% it is. Coupled with the fact that replicants are not affected by the
+%% "autoheal" mechanism, this may still lead to routing inconsistencies.
+
 -module(emqx_router_helper).
 
 -behaviour(gen_server).
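
To make the operational model above concrete, here is a tiny sketch of the purge decision it describes. The function name and simplified signature are hypothetical; the timeout mirrors ?PURGE_DEAD_TIMEOUT defined below:

-module(purge_policy_sketch).
-export([should_purge/3]).

-define(PURGE_DEAD_TIMEOUT, 15 * 60_000).

%% "Left" members are purged regardless of role; a downed replicant is
%% purged only after the dead timeout; a downed core is never purged
%% implicitly, since cores must be evicted explicitly.
should_purge(_Role, left, _DownForMs) ->
    true;
should_purge(replicant, down, DownForMs) ->
    DownForMs > ?PURGE_DEAD_TIMEOUT;
should_purge(core, down, _DownForMs) ->
    false.
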
@@ -23,6 +50,7 @@
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
 %% Mnesia bootstrap
 -export([create_tables/0]).
@@ -30,7 +58,10 @@
 %% API
 -export([
     start_link/0,
-    monitor/1
+    monitor/1,
+    is_routable/1,
+    schedule_purge/0,
+    schedule_force_purge/0
 ]).
 
 %% Internal export
@@ -46,16 +77,29 @@
     code_change/3
 ]).
 
-%% Internal exports (RPC)
--export([
-    cleanup_routes/1
-]).
-
 -record(routing_node, {name, const = unused}).
 
--define(LOCK, {?MODULE, cleanup_routes}).
+-define(TAB_STATUS, ?MODULE).
+
+-define(LOCK(RESOURCE), {?MODULE, RESOURCE}).
+
+%% How often to reconcile node state? (ms)
+%% Introduce some jitter to avoid concerted firing on different nodes.
+-define(RECONCILE_INTERVAL, {2 * 60_000, '±', 15_000}).
+%% How soon should a dead node be purged? (ms)
+-define(PURGE_DEAD_TIMEOUT, 15 * 60_000).
+%% How soon should a left node be purged? (ms)
+%% This is a fallback; a left node is expected to be purged right away.
+-define(PURGE_LEFT_TIMEOUT, 15_000).
 
--dialyzer({nowarn_function, [cleanup_routes/1]}).
+-ifdef(TEST).
+-undef(RECONCILE_INTERVAL).
+-undef(PURGE_DEAD_TIMEOUT).
+-undef(PURGE_LEFT_TIMEOUT).
+-define(RECONCILE_INTERVAL, {2_000, '±', 500}).
+-define(PURGE_DEAD_TIMEOUT, 3_000).
+-define(PURGE_LEFT_TIMEOUT, 1_000).
+-endif.
 
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
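
?RECONCILE_INTERVAL above encodes its period as a {Baseline, '±', Jitter} tuple. A minimal sketch (assuming only the stdlib rand module) of resolving such a tuple into a concrete timeout, mirroring the choose_timeout/1 helper added later in this diff:

-module(jitter_sketch).
-export([choose_timeout/1]).

%% {Baseline, '±', Jitter} resolves to a uniform value in
%% [Baseline - Jitter + 1, Baseline + Jitter], so periodic tasks on
%% different nodes fire out of phase.
choose_timeout({Baseline, '±', Jitter}) ->
    Baseline + rand:uniform(Jitter * 2) - Jitter;
choose_timeout(Baseline) when is_integer(Baseline) ->
    Baseline.

For example, jitter_sketch:choose_timeout({2 * 60_000, '±', 15_000}) yields a value between 105_001 and 135_000 milliseconds.
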
@@ -86,39 +130,77 @@ start_link() ->
 monitor({_Group, Node}) ->
     monitor(Node);
 monitor(Node) when is_atom(Node) ->
-    case
-        ekka:is_member(Node) orelse
-            ets:member(?ROUTING_NODE, Node)
-    of
+    case ets:member(?ROUTING_NODE, Node) of
         true -> ok;
         false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
     end.
 
+%% @doc Is the given node considered routable?
+%% I.e. should the broker attempt to forward messages there, even if there are
+%% routes to this node in the routing table?
+-spec is_routable(node()) -> boolean().
+is_routable(Node) when Node == node() ->
+    true;
+is_routable(Node) ->
+    try
+        lookup_node_reachable(Node)
+    catch
+        error:badarg -> true
+    end.
+
+%% @doc Schedule dead node purges.
+-spec schedule_purge() -> scheduled.
+schedule_purge() ->
+    TS = erlang:monotonic_time(millisecond),
+    gen_server:call(?MODULE, {purge, TS}).
+
+%% @doc Force dead node purges, regardless of how long nodes have been down.
+%% Mostly for testing purposes.
+-spec schedule_force_purge() -> scheduled.
+schedule_force_purge() ->
+    TS = erlang:monotonic_time(millisecond),
+    gen_server:call(?MODULE, {purge, TS + ?PURGE_DEAD_TIMEOUT * 2}).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
+-record(ns, {
+    node :: node(),
+    status :: down | left,
+    since :: _MonotonicTimestamp :: integer(),
+    reachable = false :: boolean()
+}).
+
 init([]) ->
     process_flag(trap_exit, true),
+    %% Initialize a table to cache node status.
+    Tab = ets:new(?TAB_STATUS, [
+        protected,
+        {keypos, #ns.node},
+        named_table,
+        set,
+        {read_concurrency, true}
+    ]),
+    %% Monitor nodes lifecycle events.
     ok = ekka:monitor(membership),
+    %% Cleanup any routes left by old incarnations of this node (if any).
+    %% Depending on the size of routing tables this can take a significant amount of time.
     _ = mria:wait_for_tables([?ROUTING_NODE]),
-    {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
-    Nodes = lists:foldl(
-        fun(Node, Acc) ->
-            case ekka:is_member(Node) of
-                true ->
-                    Acc;
-                false ->
-                    true = erlang:monitor_node(Node, true),
-                    [Node | Acc]
-            end
-        end,
-        [],
-        mnesia:dirty_all_keys(?ROUTING_NODE)
-    ),
+    _ = purge_this_node(),
+    %% Setup periodic stats reporting.
     ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
-    {ok, #{nodes => Nodes}, hibernate}.
+    TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
+    State = #{
+        last_membership => mria:cluster_nodes(cores),
+        tab_node_status => Tab,
+        timer_reconcile => TRef
+    },
+    {ok, State, hibernate}.
 
+handle_call({purge, TS}, _From, State) ->
+    NState = schedule_purges(TS, State),
+    {reply, scheduled, NState};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
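
is_routable/1 above stays cheap and crash-tolerant because it only reads a protected named ETS table owned by the helper, defaulting to "routable" when the entry, or the whole table, is missing. A self-contained sketch of that pattern (table, record, and function names are illustrative; ets:lookup_element/4 with a default value is available since OTP 26):

-module(reachability_sketch).
-export([init/0, mark_down/1, is_routable/1]).

-record(ns, {node, reachable = false}).

init() ->
    ets:new(ns_cache, [named_table, public, set, {keypos, #ns.node}]).

mark_down(Node) ->
    ets:insert(ns_cache, #ns{node = Node, reachable = false}).

is_routable(Node) when Node == node() ->
    true;
is_routable(Node) ->
    try
        %% Unknown nodes default to routable; only a positive
        %% "unreachable" verdict stops forwarding.
        ets:lookup_element(ns_cache, Node, #ns.reachable, true)
    catch
        %% Table not created yet (e.g. the owner is restarting):
        %% assume routable rather than drop deliveries.
        error:badarg -> true
    end.
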
@@ -127,59 +209,19 @@ handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
 
-handle_info(
-    {mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
-    State = #{nodes := Nodes}
-) ->
-    case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
-        true ->
-            {noreply, State};
-        false ->
-            true = erlang:monitor_node(Node, true),
-            {noreply, State#{nodes := [Node | Nodes]}}
-    end;
-handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
-    %% ignore
-    {noreply, State};
-handle_info({mnesia_table_event, Event}, State) ->
-    ?SLOG(debug, #{msg => "unexpected_mnesia_table_event", event => Event}),
-    {noreply, State};
-handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
-    case mria_rlog:role() of
-        core ->
-            % TODO
-            % Node may flap, do we need to wait for any pending cleanups in `init/1`
-            % on the flapping node?
-            global:trans(
-                {?LOCK, self()},
-                fun() -> cleanup_routes(Node) end
-            ),
-            ok = mria:dirty_delete(?ROUTING_NODE, Node);
-        replicant ->
-            ok
-    end,
-    ?tp(emqx_router_helper_cleanup_done, #{node => Node}),
-    {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
-handle_info({membership, {mnesia, down, Node}}, State) ->
-    handle_info({nodedown, Node}, State);
-handle_info({membership, {node, down, Node}}, State) ->
-    handle_info({nodedown, Node}, State);
-handle_info({membership, _Event}, State) ->
-    {noreply, State};
+handle_info({membership, Event}, State) ->
+    NState = handle_membership_event(Event, State),
+    {noreply, NState};
+handle_info({timeout, _TRef, {start, Task}}, State) ->
+    NState = handle_task(Task, State),
+    {noreply, NState};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
-    try
-        ok = ekka:unmonitor(membership),
-        emqx_stats:cancel_update(route_stats),
-        mnesia:unsubscribe({table, ?ROUTING_NODE, simple})
-    catch
-        exit:{noproc, {gen_server, call, [mria_membership, _]}} ->
-            ?SLOG(warning, #{msg => "mria_membership_down"}),
-            ok
-    end.
+    emqx_stats:cancel_update(route_stats),
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -188,6 +230,232 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+handle_membership_event({node, down, Node}, State) ->
+    _ = record_node_down(Node),
+    State;
+handle_membership_event({node, leaving, Node}, State) ->
+    _ = schedule_purge_left(Node),
+    _ = record_node_left(Node),
+    State;
+handle_membership_event({node, up, Node}, State) ->
+    _ = record_node_alive(Node),
+    State;
+handle_membership_event({mnesia, up, Node}, State = #{last_membership := Membership}) ->
+    State#{last_membership := lists:usort([Node | Membership])};
+handle_membership_event(_Event, State) ->
+    State.
+
+record_node_down(Node) ->
+    NS = #ns{
+        node = Node,
+        status = down,
+        since = erlang:monotonic_time(millisecond)
+    },
+    case ets:lookup(?TAB_STATUS, Node) of
+        [#ns{status = left}] ->
+            %% Node is still marked as left, keep it that way.
+            false;
+        [#ns{status = down}] ->
+            %% Duplicate.
+            ets:insert(?TAB_STATUS, NS);
+        [] ->
+            ets:insert(?TAB_STATUS, NS)
+    end.
+
+record_node_left(Node) ->
+    NS = #ns{
+        node = Node,
+        status = left,
+        since = erlang:monotonic_time(millisecond)
+    },
+    ets:insert(?TAB_STATUS, NS).
+
+record_node_alive(Node) ->
+    forget_node(Node).
+
+forget_node(Node) ->
+    ets:delete(?TAB_STATUS, Node).
+
+lookup_node_status(Node) ->
+    ets:lookup_element(?TAB_STATUS, Node, #ns.status, _Default = notfound).
+
+lookup_node_reachable(Node) ->
+    ets:lookup_element(?TAB_STATUS, Node, #ns.reachable, _Default = true).
+
+handle_reconcile(State) ->
+    TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
+    NState = State#{timer_reconcile := TRef},
+    TS = erlang:monotonic_time(millisecond),
+    schedule_purges(TS, reconcile(NState)).
+
+reconcile(State = #{last_membership := MembersLast}) ->
+    %% Find if there are discrepancies in membership.
+    %% Missing core nodes must have been "force-left" from the cluster.
+    Members = mria:cluster_nodes(cores),
+    ok = lists:foreach(fun(Node) -> record_node_left(Node) end, MembersLast -- Members),
+    %% This is a fallback mechanism.
+    %% Avoid purging live nodes if we missed lifecycle events for whatever reason.
+    ok = lists:foreach(fun record_node_alive/1, mria:running_nodes()),
+    State#{last_membership := Members}.
+
+select(Status) ->
+    MS = ets:fun2ms(fun(#ns{node = Node, status = S}) when S == Status -> Node end),
+    ets:select(?TAB_STATUS, MS).
+
+select_outdated(Status, Since0) ->
+    MS = ets:fun2ms(
+        fun(#ns{node = Node, status = S, since = Since}) when S == Status andalso Since < Since0 ->
+            Node
+        end
+    ),
+    ets:select(?TAB_STATUS, MS).
+
+filter_replicants(Nodes) ->
+    Replicants = mria_membership:replicant_nodelist(),
+    [RN || RN <- Nodes, lists:member(RN, Replicants)].
+
+schedule_purges(TS, State) ->
+    %% Safety measure: purge only dead replicants.
+    %% Assuming a dead / offline core node should:
+    %% 1. Either come back online, potentially rebooting itself when Mria
+    %%    autoheal kicks in.
+    %% 2. ...Or leave the cluster (be "force-left" to be precise), in which case
+    %%    `reconcile/1` should notice a discrepancy and schedule a purge.
+    ok = lists:foreach(
+        fun(Node) -> schedule_purge(Node, {replicant_down_for, ?PURGE_DEAD_TIMEOUT}) end,
+        filter_replicants(select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT))
+    ),
+    %% Trigger purges for "force-left" nodes found during reconcile, if responsible.
+    ok = lists:foreach(
+        fun schedule_purge_left/1,
+        select(left)
+    ),
+    %% Otherwise, purge nodes that have been marked as left for a while.
+    ok = lists:foreach(
+        fun(Node) -> schedule_purge(Node, left) end,
+        select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)
+    ),
+    State.
+
+schedule_purge(Node, Why) ->
+    case am_core() of
+        true ->
+            schedule_task({purge, Node, Why}, 0);
+        false ->
+            false
+    end.
+
+schedule_purge_left(Node) ->
+    case am_core() andalso pick_responsible({purge, Node}) of
+        Responsible when Responsible == node() ->
+            %% Schedule purge on responsible node first, to avoid racing for a global lock.
+            schedule_task({purge, Node, left}, 0);
+        _ ->
+            %% Replicant / not responsible.
+            %% In the latter case try to purge on the next reconcile, as a fallback.
+            false
+    end.
+
+handle_purge(Node, Why, State) ->
+    try purge_dead_node_trans(Node) of
+        true ->
+            ?tp(warning, router_node_routing_table_purged, #{
+                node => Node,
+                reason => Why,
+                hint => "Ignore if the node in question went offline due to cluster maintenance"
+            }),
+            forget_node(Node);
+        false ->
+            ?tp(debug, router_node_purge_skipped, #{node => Node}),
+            forget_node(Node);
+        aborted ->
+            ?tp(notice, router_node_purge_aborted, #{node => Node})
+    catch
+        Kind:Error ->
+            ?tp(warning, router_node_purge_error, #{
+                node => Node,
+                kind => Kind,
+                error => Error
+            })
+    end,
+    State.
+
+purge_dead_node(Node) ->
+    case node_has_routes(Node) of
+        true ->
+            ok = cleanup_routes(Node),
+            true;
+        false ->
+            false
+    end.
+
+purge_dead_node_trans(Node) ->
+    StillKnown = lookup_node_status(Node) =/= notfound,
+    case StillKnown andalso node_has_routes(Node) of
+        true ->
+            global:trans(
+                {?LOCK(Node), self()},
+                fun() ->
+                    ok = cleanup_routes(Node),
+                    true
+                end,
+                cores(),
+                _Retries = 3
+            );
+        false ->
+            false
+    end.
+
+purge_this_node() ->
+    %% TODO: Guard against possible `emqx_router_helper` restarts?
+    purge_dead_node(node()).
+
+node_has_routes(Node) ->
+    ets:member(?ROUTING_NODE, Node).
+
+cleanup_routes(Node) ->
+    emqx_router:cleanup_routes(Node),
+    mria:dirty_delete(?ROUTING_NODE, Node).
+
+%%
+
+schedule_task(Task, Timeout) ->
+    emqx_utils:start_timer(choose_timeout(Timeout), {start, Task}).
+
+handle_task({purge, Node, Why}, State) ->
+    handle_purge(Node, Why, State);
+handle_task(reconcile, State) ->
+    handle_reconcile(State).
+
+choose_timeout({Baseline, '±', Jitter}) ->
+    Baseline + rand:uniform(Jitter * 2) - Jitter;
+choose_timeout(Baseline) ->
+    Baseline.
+
+%%
+
+-spec am_core() -> boolean().
+am_core() ->
+    mria_config:whoami() =/= replicant.
+
+-spec cores() -> [node()].
+cores() ->
+    %% Include stopped nodes as well.
+    mria_membership:nodelist().
+
+-spec pick_responsible(_Task) -> node().
+pick_responsible(Task) ->
+    %% Pick a responsible core node.
+    %% We expect the same node to be picked as responsible across the cluster (unless
+    %% the cluster is highly turbulent).
+    Nodes = lists:sort(mria_membership:running_core_nodelist()),
+    case length(Nodes) of
+        0 -> node();
+        N -> lists:nth(1 + erlang:phash2(Task, N), Nodes)
+    end.
+
+%%
+
 stats_fun() ->
     PSRouteCount = persistent_route_count(),
     NonPSRouteCount = emqx_router:stats(n_routes),
@@ -200,6 +468,3 @@ persistent_route_count() ->
         false ->
             0
     end.
-
-cleanup_routes(Node) ->
-    emqx_router:cleanup_routes(Node).
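
pick_responsible/1 above is a tiny piece of deterministic coordination: every core hashes the task over the same sorted node list, so in a stable cluster exactly one node considers itself responsible while the rest fall back to the reconcile path. A self-contained sketch with made-up node names:

-module(responsible_sketch).
-export([pick/2, demo/0]).

pick(Task, Nodes0) ->
    Nodes = lists:sort(Nodes0),
    case length(Nodes) of
        0 -> node();
        N -> lists:nth(1 + erlang:phash2(Task, N), Nodes)
    end.

demo() ->
    Cores = ['core1@host', 'core2@host', 'core3@host'],
    %% Every core evaluating this picks the same node for the same task;
    %% only that node schedules the purge immediately, avoiding a race
    %% for the global lock.
    pick({purge, 'replicant1@host'}, Cores).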

+ 4 - 3
apps/emqx/src/emqx_shared_sub.erl

@@ -96,7 +96,6 @@
 -define(SERVER, ?MODULE).
 
 -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
--define(IS_REMOTE_PID(Pid), (is_pid(Pid) andalso node(Pid) =/= node())).
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
@@ -556,10 +555,12 @@ is_active_sub(Pid, FailedSubs, All) ->
         is_alive_sub(Pid).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
+is_alive_sub(Pid) when node(Pid) == node() ->
+    %% The race: the pid may actually be down while cleanup_down has not run yet.
+    erlang:is_process_alive(Pid);
 is_alive_sub(Pid) ->
     %% When process is not local, the best guess is it's alive.
-    %% The race is when the pid is actually down cleanup_down is not evaluated yet
-    ?IS_REMOTE_PID(Pid) orelse erlang:is_process_alive(Pid).
+    emqx_router_helper:is_routable(node(Pid)).
 
 delete_route_if_needed({Group, Topic} = GroupTopic) ->
     if_no_more_subscribers(GroupTopic, fun() ->
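
Context for the is_alive_sub/1 change: erlang:is_process_alive/1 accepts only local pids (it fails with badarg for remote ones), so remote subscriber liveness can only be approximated. Previously any remote pid was assumed alive; now the guess is refined by node-level routability. A sketch of the resulting decision, as a hypothetical standalone wrapper:

-module(shared_sub_sketch).
-export([subscriber_usable/1]).

%% Local pids get an exact answer (modulo the cleanup_down race noted
%% above); remote pids use node reachability as the best available proxy.
subscriber_usable(Pid) when node(Pid) =:= node() ->
    erlang:is_process_alive(Pid);
subscriber_usable(Pid) ->
    emqx_router_helper:is_routable(node(Pid)).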

+ 183 - 66
apps/emqx/test/emqx_router_helper_SUITE.erl

@@ -20,7 +20,6 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
--include_lib("emqx/include/emqx_router.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -28,52 +27,93 @@
 
 all() ->
     [
-        {group, routing_schema_v1},
-        {group, routing_schema_v2}
+        {group, smoke},
+        {group, cleanup},
+        {group, cluster},
+        {group, cluster_replicant}
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    SmokeTCs = [t_monitor, t_message],
+    CleanupTCs = [t_membership_node_leaving],
+    ClusterTCs = [
+        t_cluster_node_leaving,
+        t_cluster_node_force_leave,
+        t_cluster_node_restart
+    ],
+    ClusterReplicantTCs = [
+        t_cluster_node_leaving,
+        t_cluster_node_down,
+        t_cluster_node_restart
+    ],
+    SchemaTCs = [
+        {mria_match_delete, [], CleanupTCs},
+        {fallback, [], CleanupTCs}
+    ],
     [
-        {routing_schema_v1, [], [
-            {mria_match_delete, [], TCs},
-            {fallback, [], TCs}
+        {smoke, [], SmokeTCs},
+        {cleanup, [], [
+            {group, routing_schema_v1},
+            {group, routing_schema_v2}
         ]},
-        {routing_schema_v2, [], [
-            {mria_match_delete, [], TCs},
-            {fallback, [], TCs}
-        ]}
+        {cluster, [], ClusterTCs},
+        {cluster_replicant, [], ClusterReplicantTCs},
+        {routing_schema_v1, [], SchemaTCs},
+        {routing_schema_v2, [], SchemaTCs}
     ].
 
+init_per_group(GroupName, Config) when
+    GroupName == smoke;
+    GroupName == cluster;
+    GroupName == cluster_replicant;
+    GroupName == routing_schema_v1;
+    GroupName == routing_schema_v2
+->
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    AppSpecs = [
+        {mria, mk_config(mria, GroupName)},
+        {emqx, mk_config(emqx, GroupName)}
+    ],
+    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
+    [{group_name, GroupName}, {group_apps, Apps} | Config];
 init_per_group(fallback, Config) ->
     ok = mock_mria_match_delete(),
     Config;
-init_per_group(mria_match_delete, Config) ->
-    Config;
-init_per_group(GroupName, Config) ->
-    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
-    AppSpecs = [{emqx, mk_config(GroupName)}],
-    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
-    [{group_name, GroupName}, {group_apps, Apps} | Config].
+init_per_group(_GroupName, Config) ->
+    Config.
 
+end_per_group(GroupName, Config) when
+    GroupName == smoke;
+    GroupName == cluster;
+    GroupName == cluster_replicant;
+    GroupName == routing_schema_v1;
+    GroupName == routing_schema_v2
+->
+    ok = emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(fallback, _Config) ->
     unmock_mria_match_delete(),
     ok;
-end_per_group(mria_match_delete, _Config) ->
-    ok;
-end_per_group(_GroupName, Config) ->
-    ok = emqx_cth_suite:stop(?config(group_apps, Config)).
+end_per_group(_GroupName, _Config) ->
+    ok.
 
-mk_config(routing_schema_v1) ->
+mk_config(emqx, routing_schema_v1) ->
     #{
         config => "broker.routing.storage_schema = v1",
         override_env => [{boot_modules, [broker]}]
     };
-mk_config(routing_schema_v2) ->
+mk_config(emqx, routing_schema_v2) ->
     #{
         config => "broker.routing.storage_schema = v2",
         override_env => [{boot_modules, [broker]}]
-    }.
+    };
+mk_config(mria, cluster_replicant) ->
+    #{
+        override_env => [{node_role, core}, {db_backend, rlog}]
+    };
+mk_config(emqx, _) ->
+    #{override_env => [{boot_modules, [broker]}]};
+mk_config(_App, _) ->
+    #{}.
 
 mock_mria_match_delete() ->
     ok = meck:new(mria, [no_link, passthrough]),
@@ -82,63 +122,140 @@ mock_mria_match_delete() ->
 unmock_mria_match_delete() ->
     ok = meck:unload(mria).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
-    Config.
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
 
-end_per_testcase(_TestCase, _Config) ->
+end_per_testcase(TC, Config) ->
     ok = snabbkaffe:stop(),
-    ok.
+    emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
 
 t_monitor(_) ->
     ok = emqx_router_helper:monitor({undefined, node()}),
-    emqx_router_helper:monitor(undefined).
-
-t_mnesia(_) ->
-    ?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}},
-    ?ROUTER_HELPER ! {mnesia_table_event, testing},
-    ?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}},
-    ?ROUTER_HELPER ! {membership, testing},
-    ?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
-    ct:sleep(200).
-
-t_cleanup_membership_mnesia_down(_Config) ->
-    Slave = emqx_cth_cluster:node_name(node2),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+    ok = emqx_router_helper:monitor(undefined).
+
+t_membership_node_leaving(_Config) ->
+    AnotherNode = emqx_cth_cluster:node_name(leaving),
+    ok = emqx_router:add_route(<<"leaving1/b/c">>, AnotherNode),
+    ok = emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        ?ROUTER_HELPER ! {membership, {mnesia, down, Slave}},
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
+    {_, {ok, _}} = ?wait_async_action(
+        ?ROUTER_HELPER ! {membership, {node, leaving, AnotherNode}},
+        #{?snk_kind := router_node_routing_table_purged, node := AnotherNode},
         1_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
 
-t_cleanup_membership_node_down(_Config) ->
-    Slave = emqx_cth_cluster:node_name(node3),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+t_cluster_node_leaving('init', Config) ->
+    start_join_node(cluster_node_leaving, Config);
+t_cluster_node_leaving('end', Config) ->
+    stop_leave_node(Config).
+
+t_cluster_node_leaving(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ok = emqx_router:add_route(<<"leaving/b/c">>, ClusterNode),
+    ok = emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        ?ROUTER_HELPER ! {membership, {node, down, Slave}},
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
-        1_000
+    {ok, {ok, _}} = ?wait_async_action(
+        erpc:call(ClusterNode, ekka, leave, []),
+        #{?snk_kind := router_node_routing_table_purged, node := ClusterNode},
+        3_000
+    ),
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
+
+t_cluster_node_down('init', Config) ->
+    start_join_node(cluster_node_down, Config);
+t_cluster_node_down('end', Config) ->
+    stop_leave_node(Config).
+
+t_cluster_node_down(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    emqx_router:add_route(<<"down/b/#">>, ClusterNode),
+    emqx_router:add_route(<<"test/e/f">>, node()),
+    ?assertMatch([_, _], emqx_router:topics()),
+    {ok, SRef} = snabbkaffe:subscribe(
+        %% Should be purged after ~2 reconciliations.
+        ?match_event(#{?snk_kind := router_node_routing_table_purged, node := ClusterNode}),
+        1,
+        10_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    ok = emqx_cth_cluster:stop([ClusterNode]),
+    {ok, _Event} = snabbkaffe:receive_events(SRef),
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
+
+t_cluster_node_force_leave('init', Config) ->
+    start_join_node(cluster_node_force_leave, Config);
+t_cluster_node_force_leave('end', Config) ->
+    stop_leave_node(Config).
 
-t_cleanup_monitor_node_down(_Config) ->
-    [Slave] = emqx_cth_cluster:start_bare_nodes([node4]),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+t_cluster_node_force_leave(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    emqx_router:add_route(<<"forceleave/b/#">>, ClusterNode),
+    emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        emqx_cth_cluster:stop([Slave]),
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
-        1_000
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := router_node_routing_table_purged, node := ClusterNode}),
+        1,
+        10_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    %% Simulate node crash.
+    ok = emqx_cth_peer:kill(ClusterNode),
+    %% Give Mria some time to recognize the node is down.
+    ok = timer:sleep(500),
+    %% Force-leave it.
+    ok = ekka:force_leave(ClusterNode),
+    {ok, _Event} = snabbkaffe:receive_events(SRef),
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
+
+t_cluster_node_restart('init', Config) ->
+    start_join_node(cluster_node_restart, Config);
+t_cluster_node_restart('end', Config) ->
+    stop_leave_node(Config).
+
+t_cluster_node_restart(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ClusterSpec = ?config(cluster_node_spec, Config),
+    emqx_router:add_route(<<"restart/b/+">>, ClusterNode),
+    emqx_router:add_route(<<"test/e/f">>, node()),
+    ?assertMatch([_, _], emqx_router:topics()),
+    ok = emqx_cth_cluster:stop([ClusterNode]),
+    %% The route should still be there, as the node is still expected to come back up.
+    ?assertMatch([_, _], emqx_router:topics()),
+    %% Verify the broker is aware there's no reason to route to a node that is down.
+    ok = timer:sleep(500),
+    ?assertEqual(
+        [],
+        emqx_broker:publish(emqx_message:make(<<?MODULE_STRING>>, <<"restart/b/c">>, <<>>))
+    ),
+    _ = emqx_cth_cluster:restart(ClusterSpec),
+    %% Node should have cleaned up upon restart.
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
 
 t_message(_) ->
+    Pid = erlang:whereis(?ROUTER_HELPER),
     ?ROUTER_HELPER ! testing,
     gen_server:cast(?ROUTER_HELPER, testing),
-    gen_server:call(?ROUTER_HELPER, testing).
+    gen_server:call(?ROUTER_HELPER, testing),
+    ?assert(erlang:is_process_alive(Pid)),
+    ?assertEqual(Pid, erlang:whereis(?ROUTER_HELPER)).
+
+%%
+
+start_join_node(Name, Config) ->
+    case ?config(group_name, Config) of
+        cluster_replicant ->
+            Role = replicant;
+        _Cluster ->
+            Role = core
+    end,
+    [ClusterSpec] = emqx_cth_cluster:mk_nodespecs(
+        [{Name, #{apps => [emqx], role => Role, join_to => node()}}],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [ClusterNode] = emqx_cth_cluster:start([ClusterSpec]),
+    [{cluster_node, ClusterNode}, {cluster_node_spec, ClusterSpec} | Config].
+
+stop_leave_node(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ekka:force_leave(ClusterNode),
+    emqx_cth_cluster:stop([ClusterNode]).

+ 1 - 0
changes/ce/fix-14248.en.md

@@ -0,0 +1 @@
+Fixed a class of issues where intermittent connectivity issues between cluster nodes could potentially cause partial loss of cluster-wide routing table state.