Просмотр исходного кода

Merge pull request #13092 from keynslug/fix/dsrepl/site-autoleave

fix(dsrepl): properly handle transaction abort during forget site
Andrew Mayorov 1 год назад
Родитель
Сommit
398dc97ed6

+ 5 - 5
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -694,12 +694,12 @@ ensure_site() ->
 
 forget_node(Node) ->
     Sites = node_sites(Node),
-    Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
-    case [Reason || {error, Reason} <- Results] of
-        [] ->
+    Result = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
+    case Result of
+        Ok when is_list(Ok) ->
             ok;
-        Errors ->
-            logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors])
+        {error, Reason} ->
+            logger:error("Failed to forget leaving node ~p: ~p", [Node, Reason])
     end.
 
 %% @doc Returns sorted list of sites shards are replicated across.

+ 30 - 6
apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl

@@ -229,6 +229,7 @@ handle_transition(DB, Shard, Trans, Handler) ->
         domain => [emqx, ds, DB, shard_transition]
     }),
     ?tp(
+        debug,
         dsrepl_shard_transition_begin,
         #{shard => Shard, db => DB, transition => Trans, pid => self()}
     ),
@@ -240,7 +241,12 @@ apply_handler(Fun, DB, Shard, Trans) ->
     erlang:apply(Fun, [DB, Shard, Trans]).
 
 trans_add_local(DB, Shard, {add, Site}) ->
-    logger:info(#{msg => "Adding new local shard replica", site => Site}),
+    logger:info(#{
+        msg => "Adding new local shard replica",
+        site => Site,
+        db => DB,
+        shard => Shard
+    }),
     do_add_local(membership, DB, Shard).
 
 do_add_local(membership = Stage, DB, Shard) ->
@@ -251,6 +257,8 @@ do_add_local(membership = Stage, DB, Shard) ->
         {error, recoverable, Reason} ->
             logger:warning(#{
                 msg => "Shard membership change failed",
+                db => DB,
+                shard => Shard,
                 reason => Reason,
                 retry_in => ?TRANS_RETRY_TIMEOUT
             }),
@@ -261,10 +269,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
     LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
     case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
         ready ->
-            logger:info(#{msg => "Local shard replica ready"});
+            logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard});
         Status ->
             logger:warning(#{
                 msg => "Still waiting for local shard replica to be ready",
+                db => DB,
+                shard => Shard,
                 status => Status,
                 retry_in => ?TRANS_RETRY_TIMEOUT
             }),
@@ -273,7 +283,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
     end.
 
 trans_drop_local(DB, Shard, {del, Site}) ->
-    logger:info(#{msg => "Dropping local shard replica", site => Site}),
+    logger:info(#{
+        msg => "Dropping local shard replica",
+        site => Site,
+        db => DB,
+        shard => Shard
+    }),
     do_drop_local(DB, Shard).
 
 do_drop_local(DB, Shard) ->
@@ -293,17 +308,24 @@ do_drop_local(DB, Shard) ->
     end.
 
 trans_rm_unresponsive(DB, Shard, {del, Site}) ->
-    logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
+    logger:info(#{
+        msg => "Removing unresponsive shard replica",
+        site => Site,
+        db => DB,
+        shard => Shard
+    }),
     do_rm_unresponsive(DB, Shard, Site).
 
 do_rm_unresponsive(DB, Shard, Site) ->
     Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
     case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
         ok ->
-            logger:info(#{msg => "Unresponsive shard replica removed"});
+            logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard});
         {error, recoverable, Reason} ->
             logger:warning(#{
                 msg => "Shard membership change failed",
+                db => DB,
+                shard => Shard,
                 reason => Reason,
                 retry_in => ?TRANS_RETRY_TIMEOUT
             }),
@@ -341,6 +363,7 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
     case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of
         [{Track, #transhdl{shard = Shard, trans = Trans}}] ->
             ?tp(
+                debug,
                 dsrepl_shard_transition_end,
                 #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
             ),
@@ -361,9 +384,10 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
     State;
 handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
     State;
-handle_transition_exit(Shard, Trans, Reason, State) ->
+handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
     logger:warning(#{
         msg => "Shard membership transition failed",
+        db => DB,
         shard => Shard,
         transition => Trans,
         reason => Reason,