Просмотр исходного кода

fix(dsstore): clean orphaned column families on storage init

Andrew Mayorov 1 год назад
Родитель
Commit
b49478dddf
1 изменённый файл: 56 добавлено и 2 удалено
  1. 56 2
      apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

+ 56 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -64,7 +64,15 @@
 ]).
 
 %% gen_server
--export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([
+    init/1,
+    format_status/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
 
 %% internal exports:
 -export([db_dir/1, base_dir/0]).
@@ -784,7 +792,53 @@ init({ShardId, Options}) ->
     },
     commit_metadata(S),
     ?tp(debug, ds_storage_init_state, #{shard => ShardId, s => S}),
-    {ok, S}.
+    {ok, S, {continue, clean_orphans}}.
+
+handle_continue(
+    clean_orphans,
+    S = #s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema}
+) ->
+    %% Adding / dropping a generation is not transactional, so the storage may
+    %% contain "orphaned" column families, i.e. column families that do not
+    %% belong to a live generation. We need to clean them up: an attempt to
+    %% create an already existing column family is an error, which makes
+    %% `add_generation/2` non-idempotent. Cleaning seems to be safe: either
+    %% it's an unfinished `handle_add_generation/2`, meaning the CFs are empty,
+    %% or it's an unfinished `handle_drop_generation/2`, meaning the CFs were
+    %% meant to be dropped anyway.
+    CFNames = maps:fold(
+        fun
+            (?GEN_KEY(_), #{cf_names := GenCFNames}, Acc) ->
+                GenCFNames ++ Acc;
+            (_Prop, _, Acc) ->
+                Acc
+        end,
+        [],
+        Schema
+    ),
+    OrphanedCFRefs = lists:foldl(fun proplists:delete/2, CFRefs, CFNames),
+    case OrphanedCFRefs of
+        [] ->
+            {noreply, S};
+        [_ | _] ->
+            lists:foreach(
+                fun({CFName, CFHandle}) ->
+                    Result = rocksdb:drop_column_family(DB, CFHandle),
+                    ?tp(
+                        warning,
+                        ds_storage_layer_dropped_orphaned_column_family,
+                        #{
+                            shard => ShardId,
+                            orphan => CFName,
+                            result => Result,
+                            s => format_state(S)
+                        }
+                    )
+                end,
+                OrphanedCFRefs
+            ),
+            {noreply, S#s{cf_refs = CFRefs -- OrphanedCFRefs}}
+    end.
 
 format_status(Status) ->
     maps:map(