Browse Source

Merge pull request #13054 from keynslug/fix/EMQX-12365/node-leave

fix(dsrepl): anticipate and handle nodes leaving the cluster
Andrew Mayorov 1 year atrás
parent
commit
86f99959b0

+ 148 - 30
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -35,6 +35,7 @@
     sites/0,
     node/1,
     this_site/0,
+    forget_site/1,
     print_status/0
 ]).
 
@@ -75,7 +76,8 @@
     update_replica_set_trans/3,
     update_db_config_trans/2,
     drop_db_trans/1,
-    claim_site/2,
+    claim_site_trans/2,
+    forget_site_trans/1,
     n_shards/1
 ]).
 
@@ -153,6 +155,11 @@
     erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
 ).
 
+-define(NODE_PAT(NODE),
+    %% Equivalent of `#?NODE_TAB{node = NODE, _ = '_'}`:
+    erlang:make_tuple(record_info(size, ?NODE_TAB), '_', [{#?NODE_TAB.node, NODE}])
+).
+
 -define(SHARD_PAT(SHARD),
     %% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}`
     erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
@@ -164,32 +171,89 @@
 
 -spec print_status() -> ok.
 print_status() ->
-    io:format("THIS SITE:~n~s~n", [this_site()]),
+    io:format("THIS SITE:~n"),
+    try this_site() of
+        Site -> io:format("~s~n", [Site])
+    catch
+        error:badarg ->
+            io:format(
+                "(!) UNCLAIMED~n"
+                "(!) Likely this node's name is already known as another site in the cluster.~n"
+                "(!) Please resolve conflicts manually.~n"
+            )
+    end,
     io:format("~nSITES:~n", []),
-    Nodes = [node() | nodes()],
     lists:foreach(
         fun(#?NODE_TAB{site = Site, node = Node}) ->
             Status =
-                case lists:member(Node, Nodes) of
-                    true -> up;
-                    false -> down
+                case mria:cluster_status(Node) of
+                    running -> "    up";
+                    stopped -> "(x) down";
+                    false -> "(!) UNIDENTIFIED"
                 end,
-            io:format("~s    ~p    ~p~n", [Site, Node, Status])
+            io:format("~s    ~p    ~s~n", [Site, Node, Status])
         end,
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
+    Shards = eval_qlc(mnesia:table(?SHARD_TAB)),
     io:format(
-        "~nSHARDS:~nId                             Replicas~n", []
+        "~nSHARDS:~n~s~s~n",
+        [string:pad("Shard", 30), "Replicas"]
+    ),
+    lists:foreach(
+        fun(#?SHARD_TAB{shard = DBShard, replica_set = RS}) ->
+            ShardStr = format_shard(DBShard),
+            ReplicasStr = string:join([format_replica(R) || R <- RS], "  "),
+            io:format(
+                "~s~s~n",
+                [string:pad(ShardStr, 30), ReplicasStr]
+            )
+        end,
+        Shards
     ),
+    PendingTransitions = lists:filtermap(
+        fun(Record = #?SHARD_TAB{shard = DBShard}) ->
+            case compute_transitions(Record) of
+                [] -> false;
+                Transitions -> {true, {DBShard, Transitions}}
+            end
+        end,
+        Shards
+    ),
+    PendingTransitions /= [] andalso
+        io:format(
+            "~nREPLICA TRANSITIONS:~n~s~s~n",
+            [string:pad("Shard", 30), "Transitions"]
+        ),
     lists:foreach(
-        fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
-            ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
-            ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
-            io:format("~s ~s~n", [ShardStr, ReplicasStr])
+        fun({DBShard, Transitions}) ->
+            ShardStr = format_shard(DBShard),
+            TransStr = string:join(lists:map(fun format_transition/1, Transitions), "  "),
+            io:format(
+                "~s~s~n",
+                [string:pad(ShardStr, 30), TransStr]
+            )
         end,
-        eval_qlc(mnesia:table(?SHARD_TAB))
+        PendingTransitions
     ).
 
+format_shard({DB, Shard}) ->
+    io_lib:format("~p/~s", [DB, Shard]).
+
+format_replica(Site) ->
+    Marker =
+        case mria:cluster_status(?MODULE:node(Site)) of
+            running -> "   ";
+            stopped -> "(x)";
+            false -> "(!)"
+        end,
+    io_lib:format("~s ~s", [Marker, Site]).
+
+format_transition({add, Site}) ->
+    io_lib:format("+~s", [Site]);
+format_transition({del, Site}) ->
+    io_lib:format("-~s", [Site]).
+
 -spec this_site() -> site().
 this_site() ->
     persistent_term:get(?emqx_ds_builtin_site).
@@ -256,6 +320,15 @@ node(Site) ->
             undefined
     end.
 
+-spec forget_site(site()) -> ok | {error, _}.
+forget_site(Site) ->
+    case mnesia:dirty_read(?NODE_TAB, Site) of
+        [] ->
+            {error, nonexistent_site};
+        [Record] ->
+            transaction(fun ?MODULE:forget_site_trans/1, [Record])
+    end.
+
 %%===============================================================================
 %% DB API
 %%===============================================================================
@@ -314,8 +387,8 @@ db_sites(DB) ->
     [transition()] | undefined.
 replica_set_transitions(DB, Shard) ->
     case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
-        [#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}] ->
-            compute_transitions(TargetSet, ReplicaSet);
+        [Record] ->
+            compute_transitions(Record);
         [] ->
             undefined
     end.
@@ -374,6 +447,7 @@ unsubscribe(Pid) ->
 init([]) ->
     process_flag(trap_exit, true),
     logger:set_process_metadata(#{domain => [ds, meta]}),
+    ok = ekka:monitor(membership),
     ensure_tables(),
     ensure_site(),
     S = #s{},
@@ -395,6 +469,9 @@ handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}},
     {noreply, S};
 handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) ->
     {noreply, handle_unsubscribe(Pid, S)};
+handle_info({membership, {node, leaving, Node}}, S) ->
+    forget_node(Node),
+    {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -420,13 +497,6 @@ open_db_trans(DB, CreateOpts) ->
 -spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 allocate_shards_trans(DB) ->
     Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
-    Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
-    case length(Nodes) of
-        N when N >= NSites ->
-            ok;
-        _ ->
-            mnesia:abort({insufficient_sites_online, NSites, Nodes})
-    end,
     case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
         [] ->
             ok;
@@ -434,6 +504,13 @@ allocate_shards_trans(DB) ->
             ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
             mnesia:abort({shards_already_allocated, ShardsAllocated})
     end,
+    Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
+    case length(Nodes) of
+        N when N >= NSites ->
+            ok;
+        _ ->
+            mnesia:abort({insufficient_sites_online, NSites, Nodes})
+    end,
     Shards = gen_shards(NShards),
     Sites = [S || #?NODE_TAB{site = S} <- Nodes],
     Allocation = compute_allocation(Shards, Sites, Opts),
@@ -464,7 +541,7 @@ assign_db_sites_trans(DB, Sites) ->
     %% Optimize reallocation. The goals are:
     %% 1. Minimize the number of membership transitions.
     %% 2. Ensure that sites are responsible for roughly the same number of shards.
-    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
+    Shards = db_shards_trans(DB),
     Reallocation = compute_allocation(Shards, Sites, Opts),
     ok = lists:foreach(
         fun({Record, ReplicaSet}) ->
@@ -476,7 +553,7 @@ assign_db_sites_trans(DB, Sites) ->
 
 -spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
 modify_db_sites_trans(DB, Modifications) ->
-    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
+    Shards = db_shards_trans(DB),
     Sites0 = list_db_target_sites(Shards),
     Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
     case Sites of
@@ -532,15 +609,40 @@ db_config_trans(DB, LockType) ->
             mnesia:abort({nonexistent_db, DB})
     end.
 
+db_shards_trans(DB) ->
+    mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write).
+
 -spec drop_db_trans(emqx_ds:db()) -> ok.
 drop_db_trans(DB) ->
     mnesia:delete({?META_TAB, DB}),
     [mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)],
     ok.
 
--spec claim_site(site(), node()) -> ok.
-claim_site(Site, Node) ->
-    mnesia:write(#?NODE_TAB{site = Site, node = Node}).
+-spec claim_site_trans(site(), node()) -> ok.
+claim_site_trans(Site, Node) ->
+    case node_sites(Node) of
+        [] ->
+            mnesia:write(#?NODE_TAB{site = Site, node = Node});
+        [#?NODE_TAB{site = Site}] ->
+            ok;
+        Records ->
+            ExistingSites = [S || #?NODE_TAB{site = S} <- Records],
+            mnesia:abort({conflicting_node_site, ExistingSites})
+    end.
+
+-spec forget_site_trans(_Record :: tuple()) -> ok.
+forget_site_trans(Record = #?NODE_TAB{site = Site}) ->
+    DBs = mnesia:all_keys(?META_TAB),
+    SiteDBs = [DB || DB <- DBs, S <- list_db_target_sites(db_shards_trans(DB)), S == Site],
+    case SiteDBs of
+        [] ->
+            mnesia:delete_object(?NODE_TAB, Record, write);
+        [_ | _] ->
+            mnesia:abort({member_of_replica_sets, SiteDBs})
+    end.
+
+node_sites(Node) ->
+    mnesia:dirty_match_object(?NODE_TAB, ?NODE_PAT(Node)).
 
 %%================================================================================
 %% Internal functions
@@ -583,9 +685,22 @@ ensure_site() ->
             io:format(FD, "~p.", [Site]),
             file:close(FD)
     end,
-    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
-    persistent_term:put(?emqx_ds_builtin_site, Site),
-    ok.
+    case transaction(fun ?MODULE:claim_site_trans/2, [Site, node()]) of
+        ok ->
+            persistent_term:put(?emqx_ds_builtin_site, Site);
+        {error, Reason} ->
+            logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason])
+    end.
+
+forget_node(Node) ->
+    Sites = node_sites(Node),
+    Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
+    case [Reason || {error, Reason} <- Results] of
+        [] ->
+            ok;
+        Errors ->
+            logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors])
+    end.
 
 %% @doc Returns sorted list of sites shards are replicated across.
 -spec list_db_sites([_Shard]) -> [site()].
@@ -625,6 +740,9 @@ compute_allocation(Shards, Sites, Opts) ->
     ),
     Allocation.
 
+compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
+    compute_transitions(TargetSet, ReplicaSet).
+
 compute_transitions(undefined, _ReplicaSet) ->
     [];
 compute_transitions(TargetSet, ReplicaSet) ->

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl

@@ -191,7 +191,7 @@ handle_shard_transitions(Shard, [Trans | _Rest], State) ->
     end.
 
 transition_handler(Shard, Trans, _State = #{db := DB}) ->
-    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
     case Trans of
         {add, ThisSite} ->
             {Shard, fun trans_add_local/3};

+ 19 - 3
apps/emqx_management/src/emqx_mgmt_api_ds.erl

@@ -36,7 +36,9 @@
 
     update_db_sites/3,
     join/3,
-    leave/3
+    leave/3,
+
+    forget/2
 ]).
 
 %% behavior callbacks:
@@ -377,6 +379,14 @@ leave(DB, Site, Via) ->
     }),
     meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
 
+-spec forget(emqx_ds_replication_layer_meta:site(), rest | cli) ->
+    ok | {error, _}.
+forget(Site, Via) ->
+    ?SLOG(warning, #{
+        msg => "durable_storage_forget_request", site => Site, via => Via
+    }),
+    meta_result_to_binary(emqx_ds_replication_layer_meta:forget_site(Site)).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -467,14 +477,20 @@ list_shards(DB) ->
      || Shard <- emqx_ds_replication_layer_meta:shards(DB)
     ].
 
-meta_result_to_binary({ok, Result}) ->
-    {ok, Result};
+meta_result_to_binary(Ok) when Ok == ok orelse element(1, Ok) == ok ->
+    Ok;
 meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
     Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
     {error, iolist_to_binary(Msg)};
 meta_result_to_binary({error, {nonexistent_db, DB}}) ->
     IOList = io_lib:format("Unknown storage: ~p", [DB]),
     {error, iolist_to_binary(IOList)};
+meta_result_to_binary({error, nonexistent_site}) ->
+    {error, <<"Unknown site">>};
+meta_result_to_binary({error, {member_of_replica_sets, DBNames}}) ->
+    DBs = lists:map(fun atom_to_binary/1, DBNames),
+    Msg = ["Site is still a member of replica sets of: " | lists:join(", ", DBs)],
+    {error, iolist_to_binary(Msg)};
 meta_result_to_binary({error, Err}) ->
     IOList = io_lib:format("Error: ~p", [Err]),
     {error, iolist_to_binary(IOList)}.

+ 9 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -891,13 +891,21 @@ do_ds(["leave", DBStr, Site]) ->
         {error, _} ->
             emqx_ctl:print("Unknown durable storage~n")
     end;
+do_ds(["forget", Site]) ->
+    case emqx_mgmt_api_ds:forget(list_to_binary(Site), cli) of
+        ok ->
+            emqx_ctl:print("ok~n");
+        {error, Description} ->
+            emqx_ctl:print("Unable to forget site: ~s~n", [Description])
+    end;
 do_ds(_) ->
     emqx_ctl:usage([
         {"ds info", "Show overview of the embedded durable storage state"},
         {"ds set_replicas <storage> <site1> <site2> ...",
             "Change the replica set of the durable storage"},
         {"ds join <storage> <site>", "Add site to the replica set of the storage"},
-        {"ds leave <storage> <site>", "Remove site from the replica set of the storage"}
+        {"ds leave <storage> <site>", "Remove site from the replica set of the storage"},
+        {"ds forget <site>", "Forcefully remove a site from the list of known sites"}
     ]).
 
 %%--------------------------------------------------------------------