Explorar o código

fix(fs-gc): do not hide `enoent`s

Also use `file:read_link_info/2`, it actually fetches any file info
while also not following symlinks automatically, which is better for
GC usecases.
Andrew Mayorov %!s(int64=3) %!d(string=hai) anos
pai
achega
50c6eef2bc

+ 15 - 15
apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl

@@ -177,21 +177,21 @@ try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
 
 collect_fragments(Storage, Transfer, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
-    collect_filepath(Dirname, true, Stats).
+    maybe_collect_directory(Dirname, true, Stats).
 
 collect_tempfiles(Storage, Transfer, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
-    collect_filepath(Dirname, true, Stats).
+    maybe_collect_directory(Dirname, true, Stats).
 
 collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
     Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
-    collect_filepath(Dirname, Filter, Stats).
+    maybe_collect_directory(Dirname, Filter, Stats).
 
 collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
     Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
-    collect_filepath(Dirname, Filter, Stats).
+    maybe_collect_directory(Dirname, Filter, Stats).
 
 collect_transfer_directory(Storage, Transfer, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer),
@@ -206,8 +206,6 @@ collect_parents(Dirname, Until, Stats) ->
         ok ->
             ?tp(garbage_collected_directory, #{path => Dirname}),
             collect_parents(Parent, Until, account_gcstat_directory(Stats));
-        {error, enoent} ->
-            collect_parents(Parent, Until, Stats);
         {error, eexist} ->
             Stats;
         {error, Reason} ->
@@ -222,16 +220,22 @@ collect_parents(Dirname, Until, Stats) ->
 %             Stats
 %     end.
 
+maybe_collect_directory(Dirpath, Filter, Stats) ->
+    case filelib:is_dir(Dirpath) of
+        true ->
+            collect_filepath(Dirpath, Filter, Stats);
+        false ->
+            {true, Stats}
+    end.
+
 -spec collect_filepath(file:name(), Filter, gcstats()) -> {boolean(), gcstats()} when
     Filter :: boolean() | fun((file:name(), file:file_info()) -> boolean()).
 collect_filepath(Filepath, Filter, Stats) ->
-    case file:read_file_info(Filepath, [{time, posix}]) of
+    case file:read_link_info(Filepath, [{time, posix}, raw]) of
         {ok, Fileinfo} ->
             collect_filepath(Filepath, Fileinfo, Filter, Stats);
-        {error, enoent} ->
-            {true, Stats};
         {error, Reason} ->
-            {false, register_gcstat_error({path, Filepath}, Reason, Stats)}
+            {Reason == enoent, register_gcstat_error({path, Filepath}, Reason, Stats)}
     end.
 
 collect_filepath(Filepath, #file_info{type = directory} = Fileinfo, Filter, Stats) ->
@@ -243,10 +247,8 @@ collect_filepath(Filepath, #file_info{type = regular} = Fileinfo, Filter, Stats)
         ok ->
             ?tp(garbage_collected_file, #{path => Filepath}),
             {true, account_gcstat(Fileinfo, Stats)};
-        {error, enoent} ->
-            {true, Stats};
         {error, Reason} ->
-            {false, register_gcstat_error({file, Filepath}, Reason, Stats)}
+            {Reason == enoent, register_gcstat_error({file, Filepath}, Reason, Stats)}
     end;
 collect_filepath(Filepath, Fileinfo, _Filter, Stats) ->
     {false, register_gcstat_error({file, Filepath}, {unexpected, Fileinfo}, Stats)}.
@@ -281,8 +283,6 @@ collect_empty_directory(Dirpath, Stats) ->
         ok ->
             ?tp(garbage_collected_directory, #{path => Dirpath}),
             account_gcstat_directory(Stats);
-        {error, enoent} ->
-            Stats;
         {error, Reason} ->
             register_gcstat_error({directory, Dirpath}, Reason, Stats)
     end.

+ 57 - 2
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -56,7 +56,7 @@ end_per_testcase(_TC, _Config) ->
     ok.
 
 mk_root(TC, Config) ->
-    filename:join([?config(priv_dir, Config), <<"file_transfer">>, TC, atom_to_binary(node())]).
+    filename:join([?config(priv_dir, Config), "file_transfer", TC, atom_to_list(node())]).
 
 %%
 
@@ -167,7 +167,6 @@ t_gc_complete_transfers(_Config) ->
 t_gc_incomplete_transfers(_Config) ->
     _ = application:set_env(emqx_ft, min_segments_ttl, 0),
     _ = application:set_env(emqx_ft, max_segments_ttl, 4),
-    ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
     Storage = emqx_ft_conf:storage(),
     Transfers = [
         {
@@ -241,6 +240,62 @@ t_gc_incomplete_transfers(_Config) ->
         []
     ).
 
+t_gc_handling_errors(_Config) ->
+    _ = application:set_env(emqx_ft, min_segments_ttl, 0),
+    _ = application:set_env(emqx_ft, max_segments_ttl, 0),
+    Storage = emqx_ft_conf:storage(),
+    Transfer1 = {<<"client1">>, mk_file_id()},
+    Transfer2 = {<<"client2">>, mk_file_id()},
+    Filemeta = #{name => "oops.pdf"},
+    Size = 420,
+    SegSize = 16,
+    _ = start_transfer(
+        Storage,
+        {Transfer1, Filemeta, emqx_ft_content_gen:new({?LINE, Size}, SegSize)}
+    ),
+    _ = start_transfer(
+        Storage,
+        {Transfer2, Filemeta, emqx_ft_content_gen:new({?LINE, Size}, SegSize)}
+    ),
+    % 1. Throw some chaos in the transfer directory.
+    DirFragment1 = emqx_ft_storage_fs:get_subdir(Storage, Transfer1, fragment),
+    DirTemporary1 = emqx_ft_storage_fs:get_subdir(Storage, Transfer1, temporary),
+    PathShadyLink = filename:join(DirTemporary1, "linked-here"),
+    ok = file:make_symlink(DirFragment1, PathShadyLink),
+    DirTransfer2 = emqx_ft_storage_fs:get_subdir(Storage, Transfer2),
+    PathTripUp = filename:join(DirTransfer2, "trip-up-here"),
+    ok = file:write_file(PathTripUp, <<"HAHA">>),
+    ok = timer:sleep(timer:seconds(1)),
+    % 2. Observe the errors are reported consistently.
+    ?check_trace(
+        ?assertMatch(
+            #gcstats{
+                files = Files,
+                directories = 3,
+                space = Space,
+                errors = #{
+                    % NOTE: dangling symlink looks like `enoent` for some reason
+                    {file, PathShadyLink} := {unexpected, _},
+                    {directory, DirTransfer2} := eexist
+                }
+            } when Files == ?NSEGS(Size, SegSize) * 2 andalso Space > Size * 2,
+            emqx_ft_storage_fs_gc:collect(Storage)
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [
+                    #{
+                        errors := #{
+                            {file, PathShadyLink} := {unexpected, _},
+                            {directory, DirTransfer2} := eexist
+                        }
+                    }
+                ],
+                ?of_kind("garbage_collection_errors", Trace)
+            )
+        end
+    ).
+
 %%
 
 start_transfer(Storage, {Transfer, Meta, Gen}) ->