Просмотр исходного кода

fix(dsraft): anticipate accidentally duplicate site entries to migrate

Andrew Mayorov 1 год назад
Родитель
Сommit
5fbfd66d90
1 измененных файлов с 29 добавлено и 16 удалено
  1. 29 16
      apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl

+ 29 - 16
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl

@@ -76,8 +76,8 @@
 -export([
 -export([
     migrate_node_table/0,
     migrate_node_table/0,
     migrate_shard_table/0,
     migrate_shard_table/0,
-    migrate_node_table_trans/0,
-    migrate_shard_table_trans/0
+    migrate_node_table_trans/1,
+    migrate_shard_table_trans/1
 ]).
 ]).
 
 
 -export_type([
 -export_type([
@@ -905,11 +905,13 @@ migrate_node_table() ->
 migrate_node_table(Tab, #{attributes := [_Site, _Node, _Misc]}) ->
 migrate_node_table(Tab, #{attributes := [_Site, _Node, _Misc]}) ->
     %% Table is present and looks migratable.
     %% Table is present and looks migratable.
     ok = mria:wait_for_tables([Tab]),
     ok = mria:wait_for_tables([Tab]),
-    case transaction(fun ?MODULE:migrate_node_table_trans/0, []) of
-        {migrated, []} ->
+    case transaction(fun ?MODULE:migrate_node_table_trans/1, [Tab]) of
+        {migrated, [], []} ->
             ok;
             ok;
-        {migrated, Migrated} ->
+        {migrated, Migrated, Dups} ->
             logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
             logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
+            Dups =/= [] andalso
+                logger:warning("Table '~p' duplicated entries, skipped: ~p", [Tab, Dups]),
             {atomic, ok} = mria:clear_table(Tab);
             {atomic, ok} = mria:clear_table(Tab);
         {error, Reason} ->
         {error, Reason} ->
             logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
             logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
@@ -918,7 +920,7 @@ migrate_node_table(_Tab, undefined) ->
     %% No legacy table exists.
     %% No legacy table exists.
     ok.
     ok.
 
 
-migrate_node_table_trans() ->
+migrate_node_table_trans(Tab) ->
     %% NOTE
     %% NOTE
     %% This table could have been populated when running 5.4.0 release, but the
     %% This table could have been populated when running 5.4.0 release, but the
     %% representation of site IDs has changed in following versions. Legacy site IDs
     %% representation of site IDs has changed in following versions. Legacy site IDs
@@ -926,17 +928,29 @@ migrate_node_table_trans() ->
     %% existing code of those IDs to be "printable" will be broken.
     %% existing code of those IDs to be "printable" will be broken.
     %% This should be no-op when running > 5.4.0 releases.
     %% This should be no-op when running > 5.4.0 releases.
     Migstamp = mk_migstamp(),
     Migstamp = mk_migstamp(),
-    Records = mnesia:match_object(?NODE_TAB_LEGACY, {?NODE_TAB_LEGACY, '_', '_', '_'}, read),
-    Migrate = [attach_migstamp(Migstamp, migrate_node_rec(R)) || R <- Records],
+    Records = mnesia:match_object(Tab, {Tab, '_', '_', '_'}, read),
+    {Migrate, Dups} = unique_node_recs([migrate_node_rec(R) || R <- Records]),
     lists:foreach(
     lists:foreach(
-        fun(R) -> mnesia:write(?NODE_TAB, R, write) end,
+        fun(R) -> mnesia:write(?NODE_TAB, attach_migstamp(Migstamp, R), write) end,
         Migrate
         Migrate
     ),
     ),
-    {migrated, Migrate}.
+    {migrated, Migrate, Dups}.
 
 
 migrate_node_rec({?NODE_TAB_LEGACY, Site, Node, Misc}) ->
 migrate_node_rec({?NODE_TAB_LEGACY, Site, Node, Misc}) ->
     #?NODE_TAB{site = migrate_site_id(Site), node = Node, misc = Misc}.
     #?NODE_TAB{site = migrate_site_id(Site), node = Node, misc = Misc}.
 
 
+unique_node_recs(Records) ->
+    %% NOTE
+    %% Unlikely but possible that a 5.4.0 node could have assigned more than 1 Site ID
+    %% to itself, because of occasional inability to read back Site ID with
+    %% `file:consult/1`. In this case it's impossible to tell in 100% of cases which one
+    %% was the most recent, so let's just drop all of such node's records. It will
+    %% insert the correct record by itself anyway, once upgraded to the recent release
+    %% and restarted.
+    Dups = Records -- lists:ukeysort(#?NODE_TAB.node, Records),
+    DupNodes = [Node || #?NODE_TAB{node = Node} <- Dups],
+    lists:partition(fun(R) -> not lists:member(R#?NODE_TAB.node, DupNodes) end, Records).
+
 migrate_shard_table() ->
 migrate_shard_table() ->
     Tab = ?SHARD_TAB_LEGACY,
     Tab = ?SHARD_TAB_LEGACY,
     migrate_shard_table(Tab, table_info_safe(Tab)).
     migrate_shard_table(Tab, table_info_safe(Tab)).
@@ -944,7 +958,7 @@ migrate_shard_table() ->
 migrate_shard_table(Tab, #{attributes := [_Shard, _ReplicaSet, _TargetSet, _Misc]}) ->
 migrate_shard_table(Tab, #{attributes := [_Shard, _ReplicaSet, _TargetSet, _Misc]}) ->
     %% Table is present and looks migratable.
     %% Table is present and looks migratable.
     ok = mria:wait_for_tables([Tab]),
     ok = mria:wait_for_tables([Tab]),
-    case transaction(fun ?MODULE:migrate_shard_table_trans/0, []) of
+    case transaction(fun ?MODULE:migrate_shard_table_trans/1, [Tab]) of
         {migrated, []} ->
         {migrated, []} ->
             ok;
             ok;
         {migrated, Migrated} ->
         {migrated, Migrated} ->
@@ -967,18 +981,17 @@ migrate_shard_table(_Tab, undefined) ->
     %% No legacy table exists.
     %% No legacy table exists.
     ok.
     ok.
 
 
-migrate_shard_table_trans() ->
+migrate_shard_table_trans(Tab) ->
     %% NOTE
     %% NOTE
     %% This table could have been instantiated with a different schema when running
     %% This table could have been instantiated with a different schema when running
     %% 5.4.0 release but most likely never populated, so it should be fine to abandon it.
     %% 5.4.0 release but most likely never populated, so it should be fine to abandon it.
     %% This table could also have been instantiated and populated when running 5.7.0
     %% This table could also have been instantiated and populated when running 5.7.0
     %% release with the same schema, so we just have to migrate all the records verbatim.
     %% release with the same schema, so we just have to migrate all the records verbatim.
     Migstamp = mk_migstamp(),
     Migstamp = mk_migstamp(),
-    Pattern = {?SHARD_TAB_LEGACY, '_', '_', '_', '_'},
-    Records = mnesia:match_object(?SHARD_TAB_LEGACY, Pattern, read),
-    Migrate = [attach_migstamp(Migstamp, migrate_shard_rec(R)) || R <- Records],
+    Records = mnesia:match_object(Tab, {Tab, '_', '_', '_', '_'}, read),
+    Migrate = [migrate_shard_rec(R) || R <- Records],
     lists:foreach(
     lists:foreach(
-        fun(R) -> mnesia:write(?SHARD_TAB, R, write) end,
+        fun(R) -> mnesia:write(?SHARD_TAB, attach_migstamp(Migstamp, R), write) end,
         Migrate
         Migrate
     ),
     ),
     {migrated, Migrate}.
     {migrated, Migrate}.