
feat(router): require node to clean up routes on restart

Also tolerate transient connectivity issues. Now, the router helper (see the usage sketch below):
* Cleans up any leftover routes from previous incarnations upon restart.
* Expects cluster nodes to clean up after themselves upon restart.
* Cleans up the routes of a cluster node that decided to leave, upon receiving such an event.
* Cleans up the routes of cluster nodes that have been down for more than 1 hour.
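A minimal usage sketch of the new purge API introduced below (return values follow the specs added in this commit; purge_force/0 is intended mostly for tests):

    %% Schedule purges of nodes that have been down longer than the
    %% 1-hour dead-node timeout:
    scheduled = emqx_router_helper:purge().
    %% Force purges regardless of how long nodes have been down:
    scheduled = emqx_router_helper:purge_force().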
Andrew Mayorov, 1 year ago
Parent commit 6478ecb9a9
2 changed files with 325 additions and 143 deletions:
  1. apps/emqx/src/emqx_router_helper.erl (+203 −79)
  2. apps/emqx/test/emqx_router_helper_SUITE.erl (+122 −64)

apps/emqx/src/emqx_router_helper.erl (+203 −79)

@@ -30,7 +30,9 @@
 %% API
 -export([
     start_link/0,
-    monitor/1
+    monitor/1,
+    purge/0,
+    purge_force/0
 ]).
 
 %% Internal export
@@ -46,16 +48,19 @@
     code_change/3
 ]).
 
-%% Internal exports (RPC)
--export([
-    cleanup_routes/1
-]).
-
 -record(routing_node, {name, const = unused}).
 
--define(LOCK, {?MODULE, cleanup_routes}).
+-define(LOCK(RESOURCE), {?MODULE, RESOURCE}).
+
+%% How often to reconcile node state, in ms.
+-define(RECONCILE_INTERVAL, 10 * 60_000).
+%% How long a node must be down before it is purged, in ms.
+-define(PURGE_DEAD_TIMEOUT, 60 * 60_000).
+%% How long a node that has left may go unpurged, in ms.
+%% This is a fallback; a node that left is expected to be purged right away.
+-define(PURGE_LEFT_TIMEOUT, 15_000).
 
--dialyzer({nowarn_function, [cleanup_routes/1]}).
+% -define(PURGE_LEFT_FALLBACK_TIMEOUT, {15_000, '±', 2_500}).
 
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
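All of these intervals are compared against erlang:monotonic_time(millisecond) snapshots. A standalone sketch of the "outdated" test that select_outdated/2 applies further below (the function name here is illustrative):

    %% A record is outdated once its `since` timestamp predates the
    %% cutoff (Now - Timeout), all in monotonic milliseconds.
    is_outdated(#{since := Since}, Now, Timeout) ->
        Since < Now - Timeout.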
@@ -86,39 +91,49 @@ start_link() ->
 monitor({_Group, Node}) ->
     monitor(Node);
 monitor(Node) when is_atom(Node) ->
-    case
-        ekka:is_member(Node) orelse
-            ets:member(?ROUTING_NODE, Node)
-    of
+    case ets:member(?ROUTING_NODE, Node) of
         true -> ok;
         false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node})
     end.
 
+%% @doc Schedule dead node purges.
+-spec purge() -> scheduled.
+purge() ->
+    TS = erlang:monotonic_time(millisecond),
+    gen_server:call(?MODULE, {purge, TS}).
+
+%% @doc Force dead node purges, regardless of how long nodes have been down.
+%% Mostly for testing purposes.
+-spec purge_force() -> scheduled.
+purge_force() ->
+    TS = erlang:monotonic_time(millisecond),
+    gen_server:call(?MODULE, {purge, TS + ?PURGE_DEAD_TIMEOUT * 2}).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
 init([]) ->
     process_flag(trap_exit, true),
+    %% Monitor node lifecycle events.
     ok = ekka:monitor(membership),
+    %% Clean up any routes left over from old incarnations of this node.
+    %% Depending on the size of the routing tables, this can take a significant amount of time.
     _ = mria:wait_for_tables([?ROUTING_NODE]),
-    {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
-    Nodes = lists:foldl(
-        fun(Node, Acc) ->
-            case ekka:is_member(Node) of
-                true ->
-                    Acc;
-                false ->
-                    true = erlang:monitor_node(Node, true),
-                    [Node | Acc]
-            end
-        end,
-        [],
-        mnesia:dirty_all_keys(?ROUTING_NODE)
-    ),
+    _ = purge_this_node(),
+    %% Set up periodic stats reporting.
     ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
-    {ok, #{nodes => Nodes}, hibernate}.
+    {ok, TRef} = timer:send_interval(?RECONCILE_INTERVAL, reconcile),
+    State = #{
+        down => #{},
+        left => #{},
+        timer_reconcile => TRef
+    },
+    {ok, State, hibernate}.
 
+handle_call({purge, TS}, _From, State) ->
+    NState = schedule_purges(TS, State),
+    {reply, scheduled, NState};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
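Note how purge_force/0 reuses the regular scheduling path: shifting the timestamp 2 * ?PURGE_DEAD_TIMEOUT into the future pushes the cutoff past any recorded `since`, so every dead node qualifies. An illustrative eunit-style check (constants inlined, node name hypothetical):

    %% Any node recorded as down at or before TS is selected once the
    %% cutoff moves past TS: (TS + 2*T) - T = TS + T > Since.
    force_selects_all_test() ->
        T = 60 * 60_000,
        TS = erlang:monotonic_time(millisecond),
        Down = #{'n1@host' => #{since => TS}},
        Cutoff = (TS + 2 * T) - T,
        ['n1@host'] =
            maps:keys(maps:filter(fun(_, #{since := S}) -> S < Cutoff end, Down)).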
@@ -127,59 +142,24 @@ handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
 
-handle_info(
-    {mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
-    State = #{nodes := Nodes}
-) ->
-    case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
-        true ->
-            {noreply, State};
-        false ->
-            true = erlang:monitor_node(Node, true),
-            {noreply, State#{nodes := [Node | Nodes]}}
-    end;
-handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
-    %% ignore
-    {noreply, State};
-handle_info({mnesia_table_event, Event}, State) ->
-    ?SLOG(debug, #{msg => "unexpected_mnesia_table_event", event => Event}),
-    {noreply, State};
-handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
-    case mria_rlog:role() of
-        core ->
-            % TODO
-            % Node may flap, do we need to wait for any pending cleanups in `init/1`
-            % on the flapping node?
-            global:trans(
-                {?LOCK, self()},
-                fun() -> cleanup_routes(Node) end
-            ),
-            ok = mria:dirty_delete(?ROUTING_NODE, Node);
-        replicant ->
-            ok
-    end,
-    ?tp(emqx_router_helper_cleanup_done, #{node => Node}),
-    {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
-handle_info({membership, {mnesia, down, Node}}, State) ->
-    handle_info({nodedown, Node}, State);
-handle_info({membership, {node, down, Node}}, State) ->
-    handle_info({nodedown, Node}, State);
-handle_info({membership, _Event}, State) ->
-    {noreply, State};
+handle_info({membership, Event}, State) ->
+    NState = handle_membership_event(Event, State),
+    {noreply, NState};
+handle_info(reconcile, State) ->
+    TS = erlang:monotonic_time(millisecond),
+    NState = schedule_purges(TS, reconcile(State)),
+    {noreply, NState};
+handle_info({timeout, _TRef, {start, Task}}, State) ->
+    NState = handle_task(Task, State),
+    {noreply, NState};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
-terminate(_Reason, _State) ->
-    try
-        ok = ekka:unmonitor(membership),
-        emqx_stats:cancel_update(route_stats),
-        mnesia:unsubscribe({table, ?ROUTING_NODE, simple})
-    catch
-        exit:{noproc, {gen_server, call, [mria_membership, _]}} ->
-            ?SLOG(warning, #{msg => "mria_membership_down"}),
-            ok
-    end.
+terminate(_Reason, _State = #{timer_reconcile := TRef}) ->
+    timer:cancel(TRef),
+    emqx_stats:cancel_update(route_stats),
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
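The {timeout, _TRef, {start, Task}} clause matches the message shape produced by erlang:start_timer/3, which emqx_utils:start_timer/2 (used by schedule_task/2 below) presumably wraps. A plain-OTP equivalent of scheduling an immediate purge task:

    %% Assumption: emqx_utils:start_timer/2 wraps erlang:start_timer/3.
    %% The message arrives in handle_info/2 as
    %% {timeout, TRef, {start, {purge, Node}}}.
    TRef = erlang:start_timer(0, self(), {start, {purge, Node}}).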
@@ -188,6 +168,153 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+handle_membership_event({node, down, Node}, State) ->
+    record_node_down(Node, State);
+handle_membership_event({node, leaving, Node}, State) ->
+    _ = schedule_purge_left(Node),
+    record_node_left(Node, State);
+handle_membership_event({node, up, Node}, State) ->
+    record_node_alive(Node, State);
+handle_membership_event(_Event, State) ->
+    State.
+
+record_node_down(Node, State = #{down := Down}) ->
+    Record = #{since => erlang:monotonic_time(millisecond)},
+    State#{down := Down#{Node => Record}}.
+
+record_node_left(Node, State = #{left := Left}) ->
+    Record = #{since => erlang:monotonic_time(millisecond)},
+    State#{left := Left#{Node => Record}}.
+
+record_node_alive(Node, State) ->
+    forget_node(Node, State).
+
+forget_node(Node, State = #{down := Down, left := Left}) ->
+    State#{
+        down := maps:remove(Node, Down),
+        left := maps:remove(Node, Left)
+    }.
+
+reconcile(State) ->
+    %% Fallback: avoid purging live nodes if we missed lifecycle events for whatever reason.
+    lists:foldl(fun record_node_alive/2, State, mria:running_nodes()).
+
+select_outdated(NodesInfo, Since0) ->
+    maps:keys(maps:filter(fun(_, #{since := Since}) -> Since < Since0 end, NodesInfo)).
+
+schedule_purges(TS, State = #{down := Down, left := Left}) ->
+    ok = lists:foreach(fun schedule_purge/1, select_outdated(Down, TS - ?PURGE_DEAD_TIMEOUT)),
+    ok = lists:foreach(fun schedule_purge/1, select_outdated(Left, TS - ?PURGE_LEFT_TIMEOUT)),
+    State.
+
+schedule_purge(Node) ->
+    case am_core() of
+        true ->
+            schedule_task({purge, Node}, 0);
+        false ->
+            false
+    end.
+
+schedule_purge_left(Node) ->
+    case am_core() andalso pick_responsible({purge, Node}) of
+        Responsible when Responsible == node() ->
+            %% Schedule the purge on the responsible node first, to avoid racing for a global lock.
+            schedule_task({purge, Node}, 0);
+        _ ->
+            %% Replicant, or a core that is not the responsible node.
+            %% In the latter case, try to purge on the next reconcile as a fallback.
+            false
+    end.
+
+handle_purge(Node, State) ->
+    try purge_dead_node_trans(Node) of
+        true ->
+            ?tp(debug, emqx_router_node_purged, #{node => Node}),
+            forget_node(Node, State);
+        false ->
+            ?tp(debug, emqx_router_node_purge_skipped, #{node => Node}),
+            forget_node(Node, State);
+        aborted ->
+            ?tp(notice, emqx_router_node_purge_aborted, #{node => Node}),
+            State
+    catch
+        Kind:Error ->
+            ?tp(warning, emqx_router_node_purge_error, #{
+                node => Node,
+                kind => Kind,
+                error => Error
+            }),
+            State
+    end.
+
+purge_dead_node(Node) ->
+    case node_has_routes(Node) of
+        true ->
+            ok = cleanup_routes(Node),
+            true;
+        false ->
+            false
+    end.
+
+purge_dead_node_trans(Node) ->
+    case node_has_routes(Node) of
+        true ->
+            global:trans(
+                {?LOCK(Node), self()},
+                fun() ->
+                    ok = cleanup_routes(Node),
+                    true
+                end,
+                cores(),
+                _Retries = 3
+            );
+        false ->
+            false
+    end.
+
+purge_this_node() ->
+    %% TODO: Guard against possible `emqx_router_helper` restarts?
+    purge_dead_node(node()).
+
+node_has_routes(Node) ->
+    ets:member(?ROUTING_NODE, Node).
+
+cleanup_routes(Node) ->
+    emqx_router:cleanup_routes(Node),
+    mria:dirty_delete(?ROUTING_NODE, Node).
+
+%%
+
+schedule_task(Task, Timeout) ->
+    emqx_utils:start_timer(choose_timeout(Timeout), {start, Task}).
+
+handle_task({purge, Node}, State) ->
+    handle_purge(Node, State).
+
+choose_timeout({Baseline, '±', Jitter}) ->
+    Baseline + rand:uniform(Jitter * 2) - Jitter;
+choose_timeout(Baseline) ->
+    Baseline.
+
+%%
+
+am_core() ->
+    mria_config:whoami() =/= replicant.
+
+cores() ->
+    %% Include stopped nodes as well.
+    mria_membership:nodelist().
+
+pick_responsible(Task) ->
+    %% Pick a responsible core node.
+    %% We expect the same node to be picked as responsible across the cluster (unless
+    %% the cluster is highly turbulent).
+    Nodes = lists:sort(mria_membership:running_core_nodelist()),
+    N = length(Nodes),
+    N > 0 andalso lists:nth(1 + erlang:phash2(Task, N), Nodes).
+
+%%
+
 stats_fun() ->
     PSRouteCount = persistent_route_count(),
     NonPSRouteCount = emqx_router:stats(n_routes),
@@ -200,6 +327,3 @@ persistent_route_count() ->
         false ->
             0
     end.
-
-cleanup_routes(Node) ->
-    emqx_router:cleanup_routes(Node).
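Two coordination mechanisms are at play above: a deterministic choice of which core node purges eagerly, and a global lock guarding the actual cleanup. pick_responsible/1 relies on erlang:phash2/2 over a sorted node list, so every core node observing the same membership elects the same purger; a standalone illustration (node names hypothetical):

    %% Same sorted list in, same node out, wherever this runs: at most
    %% one core schedules the purge eagerly, the rest fall back to the
    %% next reconcile.
    pick(Task, Nodes0) ->
        Nodes = lists:sort(Nodes0),
        N = length(Nodes),
        N > 0 andalso lists:nth(1 + erlang:phash2(Task, N), Nodes).

Should two cores race anyway, purge_dead_node_trans/1 serializes them through global:trans/4, keyed on the node being purged, so purges of different nodes do not contend with each other.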

apps/emqx/test/emqx_router_helper_SUITE.erl (+122 −64)

@@ -20,7 +20,6 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
--include_lib("emqx/include/emqx_router.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -28,41 +27,62 @@
 
 all() ->
     [
-        {group, routing_schema_v1},
-        {group, routing_schema_v2}
+        {group, smoke},
+        {group, cleanup},
+        {group, cluster}
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    SmokeTCs = [t_monitor, t_message],
+    CleanupTCs = [t_membership_node_leaving],
+    ClusterTCs = [
+        t_cluster_node_leaving,
+        t_cluster_node_down,
+        t_cluster_node_restart
+    ],
+    SchemaTCs = [
+        {mria_match_delete, [], CleanupTCs},
+        {fallback, [], CleanupTCs}
+    ],
     [
-        {routing_schema_v1, [], [
-            {mria_match_delete, [], TCs},
-            {fallback, [], TCs}
+        {smoke, [], SmokeTCs},
+        {cleanup, [], [
+            {group, routing_schema_v1},
+            {group, routing_schema_v2}
         ]},
-        {routing_schema_v2, [], [
-            {mria_match_delete, [], TCs},
-            {fallback, [], TCs}
-        ]}
+        {cluster, [], ClusterTCs},
+        {routing_schema_v1, [], SchemaTCs},
+        {routing_schema_v2, [], SchemaTCs}
     ].
 
+init_per_group(GroupName, Config) when
+    GroupName == smoke;
+    GroupName == cluster;
+    GroupName == routing_schema_v1;
+    GroupName == routing_schema_v2
+->
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    AppSpecs = [{emqx, mk_config(GroupName)}],
+    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
+    [{group_name, GroupName}, {group_apps, Apps} | Config];
 init_per_group(fallback, Config) ->
     ok = mock_mria_match_delete(),
     Config;
-init_per_group(mria_match_delete, Config) ->
-    Config;
-init_per_group(GroupName, Config) ->
-    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
-    AppSpecs = [{emqx, mk_config(GroupName)}],
-    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
-    [{group_name, GroupName}, {group_apps, Apps} | Config].
+init_per_group(_GroupName, Config) ->
+    Config.
 
+end_per_group(GroupName, Config) when
+    GroupName == smoke;
+    GroupName == cluster;
+    GroupName == routing_schema_v1;
+    GroupName == routing_schema_v2
+->
+    ok = emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(fallback, _Config) ->
     unmock_mria_match_delete(),
     ok;
-end_per_group(mria_match_delete, _Config) ->
-    ok;
-end_per_group(_GroupName, Config) ->
-    ok = emqx_cth_suite:stop(?config(group_apps, Config)).
+end_per_group(_GroupName, _Config) ->
+    ok.
 
 mk_config(routing_schema_v1) ->
     #{
@@ -73,7 +93,9 @@ mk_config(routing_schema_v2) ->
     #{
         config => "broker.routing.storage_schema = v2",
         override_env => [{boot_modules, [broker]}]
-    }.
+    };
+mk_config(_) ->
+    #{override_env => [{boot_modules, [broker]}]}.
 
 mock_mria_match_delete() ->
     ok = meck:new(mria, [no_link, passthrough]),
@@ -82,63 +104,99 @@ mock_mria_match_delete() ->
 unmock_mria_match_delete() ->
     ok = meck:unload(mria).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
-    Config.
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
 
-end_per_testcase(_TestCase, _Config) ->
+end_per_testcase(TC, Config) ->
     ok = snabbkaffe:stop(),
-    ok.
+    emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
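Per-testcase setup is now routed through emqx_common_test_helpers, which (an assumption about that helper's convention) dispatches to a TC/2 clause such as t_cluster_node_leaving('init', Config) when the suite exports one. A minimal sketch of the assumed dispatcher:

    %% Hypothetical sketch: call TC('init', Config) when the suite
    %% module exports TC/2, otherwise leave Config untouched.
    init_per_testcase(Mod, TC, Config) ->
        case erlang:function_exported(Mod, TC, 2) of
            true -> Mod:TC('init', Config);
            false -> Config
        end.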
 
 t_monitor(_) ->
     ok = emqx_router_helper:monitor({undefined, node()}),
-    emqx_router_helper:monitor(undefined).
-
-t_mnesia(_) ->
-    ?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}},
-    ?ROUTER_HELPER ! {mnesia_table_event, testing},
-    ?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}},
-    ?ROUTER_HELPER ! {membership, testing},
-    ?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
-    ct:sleep(200).
-
-t_cleanup_membership_mnesia_down(_Config) ->
-    Slave = emqx_cth_cluster:node_name(node2),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+    ok = emqx_router_helper:monitor(undefined).
+
+t_membership_node_leaving(_Config) ->
+    AnotherNode = emqx_cth_cluster:node_name(leaving),
+    ok = emqx_router:add_route(<<"leaving1/b/c">>, AnotherNode),
+    ok = emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        ?ROUTER_HELPER ! {membership, {mnesia, down, Slave}},
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
+    {_, {ok, _}} = ?wait_async_action(
+        ?ROUTER_HELPER ! {membership, {node, leaving, AnotherNode}},
+        #{?snk_kind := emqx_router_node_purged, node := AnotherNode},
         1_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
+
+t_cluster_node_leaving('init', Config) ->
+    start_join_node(cluster_node_leaving, Config);
+t_cluster_node_leaving('end', Config) ->
+    stop_leave_node(Config).
 
-t_cleanup_membership_node_down(_Config) ->
-    Slave = emqx_cth_cluster:node_name(node3),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+t_cluster_node_leaving(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ok = emqx_router:add_route(<<"leaving/b/c">>, ClusterNode),
+    ok = emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        ?ROUTER_HELPER ! {membership, {node, down, Slave}},
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
-        1_000
+    {ok, {ok, _}} = ?wait_async_action(
+        erpc:call(ClusterNode, ekka, leave, []),
+        #{?snk_kind := emqx_router_node_purged, node := ClusterNode},
+        3_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
 
-t_cleanup_monitor_node_down(_Config) ->
-    [Slave] = emqx_cth_cluster:start_bare_nodes([node4]),
-    emqx_router:add_route(<<"a/b/c">>, Slave),
-    emqx_router:add_route(<<"d/e/f">>, node()),
+t_cluster_node_down('init', Config) ->
+    start_join_node(cluster_node_down, Config);
+t_cluster_node_down('end', Config) ->
+    stop_leave_node(Config).
+
+t_cluster_node_down(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    emqx_router:add_route(<<"down/b/#">>, ClusterNode),
+    emqx_router:add_route(<<"test/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
-    ?wait_async_action(
-        emqx_cth_cluster:stop([Slave]),
-        #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
-        1_000
+    ok = emqx_cth_cluster:stop([ClusterNode]),
+    {scheduled, {ok, _}} = ?wait_async_action(
+        %% The same as what would have happened after the dead-node timeout had passed.
+        emqx_router_helper:purge_force(),
+        #{?snk_kind := emqx_router_node_purged, node := ClusterNode},
+        3_000
     ),
-    ?assertEqual([<<"d/e/f">>], emqx_router:topics()).
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
+
+t_cluster_node_restart('init', Config) ->
+    start_join_node(cluster_node_restart, Config);
+t_cluster_node_restart('end', Config) ->
+    stop_leave_node(Config).
+
+t_cluster_node_restart(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ClusterSpec = ?config(cluster_node_spec, Config),
+    emqx_router:add_route(<<"restart/b/c">>, ClusterNode),
+    emqx_router:add_route(<<"test/e/f">>, node()),
+    ?assertMatch([_, _], emqx_router:topics()),
+    _ = emqx_cth_cluster:restart(ClusterSpec),
+    ?assertEqual([<<"test/e/f">>], emqx_router:topics()).
 
 t_message(_) ->
+    Pid = erlang:whereis(?ROUTER_HELPER),
     ?ROUTER_HELPER ! testing,
     gen_server:cast(?ROUTER_HELPER, testing),
-    gen_server:call(?ROUTER_HELPER, testing).
+    gen_server:call(?ROUTER_HELPER, testing),
+    ?assert(erlang:is_process_alive(Pid)),
+    ?assertEqual(Pid, erlang:whereis(?ROUTER_HELPER)).
+
+%%
+
+start_join_node(Name, Config) ->
+    [ClusterSpec] = emqx_cth_cluster:mk_nodespecs(
+        [{Name, #{apps => [emqx], join_to => node()}}],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [ClusterNode] = emqx_cth_cluster:start([ClusterSpec]),
+    [{cluster_node, ClusterNode}, {cluster_node_spec, ClusterSpec} | Config].
+
+stop_leave_node(Config) ->
+    ClusterNode = ?config(cluster_node, Config),
+    ekka:force_leave(ClusterNode),
+    emqx_cth_cluster:stop([ClusterNode]).
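
Throughout the suite, snabbkaffe's ?wait_async_action macro pairs an action with a wait for a trace point and returns {ActionResult, {ok, Event}} on success. A minimal usage sketch built around the trace point added in this commit:

    %% Run the action, then wait up to 3s for the helper to emit the
    %% emqx_router_node_purged trace event.
    {scheduled, {ok, _Event}} = ?wait_async_action(
        emqx_router_helper:purge_force(),
        #{?snk_kind := emqx_router_node_purged},
        3_000
    ).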