Bläddra i källkod

fix(fs-gc): add testcase on incomplete transfers GC

And fix logic that made GC delete incomplete segments prematurely.
Andrew Mayorov 3 år sedan
förälder
incheckning
344799f100

+ 4 - 2
apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl

@@ -164,7 +164,7 @@ try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
     % _and was empty at the start of GC_, as a precaution against races between
     % writers and GCs.
     TTL = get_segments_ttl(Storage, Transfer),
-    Cutoff = erlang:system_time(second) + TTL,
+    Cutoff = erlang:system_time(second) - TTL,
     {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
     {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
     % TODO: collect empty directories separately
@@ -222,7 +222,7 @@ collect_parents(Dirname, Stats) ->
 -spec collect_filepath(file:name(), Filter, gcstats()) -> {boolean(), gcstats()} when
     Filter :: boolean() | fun((file:name(), file:file_info()) -> boolean()).
 collect_filepath(Filepath, Filter, Stats) ->
-    case file:read_file_info(Filepath) of
+    case file:read_file_info(Filepath, [{time, posix}]) of
         {ok, Fileinfo} ->
             collect_filepath(Filepath, Fileinfo, Filter, Stats);
         {error, enoent} ->
@@ -238,6 +238,7 @@ collect_filepath(Filepath, #file_info{type = regular} = Fileinfo, Filter, Stats)
         false ->
             {false, Stats};
         ok ->
+            ?tp(garbage_collected_file, #{path => Filepath}),
             {true, account_gcstat(Fileinfo, Stats)};
         {error, enoent} ->
             {true, Stats};
@@ -275,6 +276,7 @@ collect_files(Dirname, Filenames, Filter, Stats) ->
 collect_empty_directory(Dirpath, Stats) ->
     case file:del_dir(Dirpath) of
         ok ->
+            ?tp(garbage_collected_directory, #{path => Dirpath}),
             account_gcstat_directory(Stats);
         {error, enoent} ->
             Stats;

+ 89 - 15
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -60,8 +60,10 @@ mk_root(TC, Config) ->
 
 %%
 
+-define(NSEGS(Filesize, SegmentSize), (ceil(Filesize / SegmentSize) + 1)).
+
 t_gc_triggers_periodically(_Config) ->
-    Interval = 1000,
+    Interval = 500,
     ok = application:set_env(emqx_ft, gc_interval, Interval),
     ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
     ?check_trace(
@@ -103,17 +105,17 @@ t_gc_complete_transfers(_Config) ->
     Transfers = [
         {
             T1 = {<<"client1">>, mk_file_id()},
-            "cat.cur",
+            #{name => "cat.cur", segments_ttl => 10},
             emqx_ft_content_gen:new({?LINE, S1 = 42}, SS1 = 16)
         },
         {
             T2 = {<<"client2">>, mk_file_id()},
-            "cat.ico",
+            #{name => "cat.ico", segments_ttl => 10},
             emqx_ft_content_gen:new({?LINE, S2 = 420}, SS2 = 64)
         },
         {
             T3 = {<<"client42">>, mk_file_id()},
-            "cat.jpg",
+            #{name => "cat.jpg", segments_ttl => 10},
             emqx_ft_content_gen:new({?LINE, S3 = 42000}, SS3 = 1024)
         }
     ],
@@ -132,14 +134,13 @@ t_gc_complete_transfers(_Config) ->
         ok,
         complete_transfer(Storage, T1, S1)
     ),
-    GCFiles1 = ceil(S1 / SS1) + 1,
     ?assertMatch(
         #gcstats{
-            files = GCFiles1,
+            files = Files,
             directories = 2,
             space = Space,
             errors = #{} = Es
-        } when Space > S1 andalso map_size(Es) == 0,
+        } when Files == ?NSEGS(S1, SS1) andalso Space > S1 andalso map_size(Es) == 0,
         emqx_ft_storage_fs_gc:collect(Storage)
     ),
     % 3. Complete rest of transfers
@@ -150,8 +151,6 @@ t_gc_complete_transfers(_Config) ->
             [{T2, S2}, {T3, S3}]
         )
     ),
-    GCFiles2 = ceil(S2 / SS2) + 1,
-    GCFiles3 = ceil(S3 / SS3) + 1,
     ?assertMatch(
         #gcstats{
             files = Files,
@@ -159,17 +158,92 @@ t_gc_complete_transfers(_Config) ->
             space = Space,
             errors = #{} = Es
         } when
-            Files == (GCFiles2 + GCFiles3) andalso
+            Files == (?NSEGS(S2, SS2) + ?NSEGS(S3, SS3)) andalso
                 Space > (S2 + S3) andalso
                 map_size(Es) == 0,
         emqx_ft_storage_fs_gc:collect(Storage)
     ).
 
-start_transfer(Storage, {Transfer, Name, Gen}) ->
-    Meta = #{
-        name => Name,
-        segments_ttl => 10
-    },
+t_gc_incomplete_transfers(_Config) ->
+    _ = application:set_env(emqx_ft, min_segments_ttl, 0),
+    _ = application:set_env(emqx_ft, max_segments_ttl, 4),
+    ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
+    Storage = emqx_ft_conf:storage(),
+    Transfers = [
+        {
+            {<<"client43"/utf8>>, <<"file-🦕"/utf8>>},
+            #{name => "dog.cur", segments_ttl => 1},
+            emqx_ft_content_gen:new({?LINE, S1 = 123}, SS1 = 32)
+        },
+        {
+            {<<"client44">>, <<"file-🦖"/utf8>>},
+            #{name => "dog.ico", segments_ttl => 2},
+            emqx_ft_content_gen:new({?LINE, S2 = 456}, SS2 = 64)
+        },
+        {
+            {<<"client1337">>, <<"file-🦀"/utf8>>},
+            #{name => "dog.jpg", segments_ttl => 3000},
+            emqx_ft_content_gen:new({?LINE, S3 = 7890}, SS3 = 128)
+        },
+        {
+            {<<"client31337">>, <<"file-⏳"/utf8>>},
+            #{name => "dog.jpg"},
+            emqx_ft_content_gen:new({?LINE, S4 = 1230}, SS4 = 256)
+        }
+    ],
+    % 1. Start transfers, send all the segments but don't trigger completion.
+    _ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
+    ?check_trace(
+        begin
+            % 2. Enable periodic GC every 0.5 seconds.
+            ok = application:set_env(emqx_ft, gc_interval, 500),
+            ok = emqx_ft_storage_fs_gc:reset(Storage),
+            % 3. First we need the first transfer to be collected.
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := garbage_collection,
+                    stats := #gcstats{
+                        files = Files,
+                        directories = 4,
+                        space = Space
+                    }
+                } when Files == (?NSEGS(S1, SS1)) andalso Space > S1,
+                5000,
+                0
+            ),
+            % 4. Then the second one.
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := garbage_collection,
+                    stats := #gcstats{
+                        files = Files,
+                        directories = 4,
+                        space = Space
+                    }
+                } when Files == (?NSEGS(S2, SS2)) andalso Space > S2,
+                5000,
+                0
+            ),
+            % 5. Then transfers 3 and 4 because 3rd has too big TTL and 4th has no specific TTL.
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := garbage_collection,
+                    stats := #gcstats{
+                        files = Files,
+                        directories = 4 * 2,
+                        space = Space
+                    }
+                } when Files == (?NSEGS(S3, SS3) + ?NSEGS(S4, SS4)) andalso Space > S3 + S4,
+                5000,
+                0
+            )
+        end,
+        []
+    ).
+
+%%
+
+start_transfer(Storage, {Transfer, Meta, Gen}) ->
     ?assertEqual(
         ok,
         emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta)