Просмотр исходного кода

fix(dsraft): wait table readiness before upgrading

Relying on the fact that schema should have been loaded by that time.
Andrew Mayorov 1 год назад
Родитель
Сommit
c1e77f6c86
1 измененных файлов с 54 добавлено и 33 удалено
  1. 54 33
      apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl

+ 54 - 33
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl

@@ -899,18 +899,24 @@ notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
 %%====================================================================
 
 migrate_node_table() ->
+    Tab = ?NODE_TAB_LEGACY,
+    migrate_node_table(Tab, table_info_safe(Tab)).
+
+migrate_node_table(Tab, #{attributes := [_Site, _Node, _Misc]}) ->
+    %% Table is present and looks migratable.
+    ok = mria:wait_for_tables([Tab]),
     case transaction(fun ?MODULE:migrate_node_table_trans/0, []) of
         {migrated, []} ->
             ok;
         {migrated, Migrated} ->
-            logger:notice("Table '~p' migrated ~p entries", [?NODE_TAB, length(Migrated)]),
-            {atomic, ok} = mria:clear_table(?NODE_TAB_LEGACY);
+            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
+            {atomic, ok} = mria:clear_table(Tab);
         {error, Reason} ->
-            logger:warning(
-                "Table '~p' unusable, migration skipped: ~p",
-                [?NODE_TAB_LEGACY, Reason]
-            )
-    end.
+            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
+    end;
+migrate_node_table(_Tab, undefined) ->
+    %% No legacy table exists.
+    ok.
 
 migrate_node_table_trans() ->
     %% NOTE
@@ -932,24 +938,34 @@ migrate_node_rec({?NODE_TAB_LEGACY, Site, Node, Misc}) ->
     #?NODE_TAB{site = migrate_site_id(Site), node = Node, misc = Misc}.
 
 migrate_shard_table() ->
+    Tab = ?SHARD_TAB_LEGACY,
+    migrate_shard_table(Tab, table_info_safe(Tab)).
+
+migrate_shard_table(Tab, #{attributes := [_Shard, _ReplicaSet, _TargetSet, _Misc]}) ->
+    %% Table is present and looks migratable.
+    ok = mria:wait_for_tables([Tab]),
     case transaction(fun ?MODULE:migrate_shard_table_trans/0, []) of
-        {skipped, 0} ->
+        {migrated, []} ->
             ok;
         {migrated, Migrated} ->
-            logger:notice("Table '~p' migrated ~p entries", [?SHARD_TAB_LEGACY, length(Migrated)]),
-            {atomic, ok} = mria:clear_table(?SHARD_TAB_LEGACY);
-        {skipped, Size} ->
-            logger:warning(
-                "Table '~p' has ~p legacy entries to be abandoned",
-                [?SHARD_TAB_LEGACY, Size]
-            ),
-            {atomic, ok} = mria:clear_table(?SHARD_TAB_LEGACY);
+            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
+            {atomic, ok} = mria:clear_table(Tab);
         {error, Reason} ->
-            logger:warning(
-                "Table '~p' unusable, migration skipped: ~p",
-                [?SHARD_TAB_LEGACY, Reason]
-            )
-    end.
+            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
+    end;
+migrate_shard_table(Tab, #{attributes := _Incompatible}) ->
+    %% Table is present and is incompatible.
+    ok = mria:wait_for_tables([Tab]),
+    case mnesia:table_info(Tab, size) of
+        0 ->
+            ok;
+        Size ->
+            logger:warning("Table '~p' has ~p legacy entries to be abandoned", [Size, Tab]),
+            {atomic, ok} = mria:clear_table(Tab)
+    end;
+migrate_shard_table(_Tab, undefined) ->
+    %% No legacy table exists.
+    ok.
 
 migrate_shard_table_trans() ->
     %% NOTE
@@ -958,18 +974,14 @@ migrate_shard_table_trans() ->
     %% This table could also have been instantiated and populated when running 5.7.0
     %% release with the same schema, so we just have to migrate all the recoards verbatim.
     Migstamp = mk_migstamp(),
-    case mnesia:match_object(?SHARD_TAB_LEGACY, {?SHARD_TAB_LEGACY, '_', '_', '_', '_'}, read) of
-        [] ->
-            %% NOTE: Find out how much legacy entries are there (most likely zero).
-            {skipped, mnesia:table_info(?SHARD_TAB_LEGACY, size)};
-        [_ | _] = Records ->
-            Migrate = [attach_migstamp(Migstamp, migrate_shard_rec(R)) || R <- Records],
-            lists:foreach(
-                fun(R) -> mnesia:write(?SHARD_TAB, R, write) end,
-                Migrate
-            ),
-            {migrated, Migrate}
-    end.
+    Pattern = {?SHARD_TAB_LEGACY, '_', '_', '_', '_'},
+    Records = mnesia:match_object(?SHARD_TAB_LEGACY, Pattern, read),
+    Migrate = [attach_migstamp(Migstamp, migrate_shard_rec(R)) || R <- Records],
+    lists:foreach(
+        fun(R) -> mnesia:write(?SHARD_TAB, R, write) end,
+        Migrate
+    ),
+    {migrated, Migrate}.
 
 migrate_shard_rec({?SHARD_TAB_LEGACY, Shard, ReplicaSet, TargetSet, Misc}) ->
     #?SHARD_TAB{shard = Shard, replica_set = ReplicaSet, target_set = TargetSet, misc = Misc}.
@@ -986,6 +998,15 @@ attach_migstamp(Migstamp, Node = #?NODE_TAB{misc = Misc}) ->
 attach_migstamp(Migstamp, Shard = #?SHARD_TAB{misc = Misc}) ->
     Shard#?SHARD_TAB{misc = Misc#{migrated => Migstamp}}.
 
+table_info_safe(Tab) ->
+    try mnesia:table_info(Tab, all) of
+        Props ->
+            maps:from_list(Props)
+    catch
+        exit:{aborted, {no_exists, Tab, _}} ->
+            undefined
+    end.
+
 %%====================================================================
 
 %% @doc Intersperse elements of two lists.