瀏覽代碼

feat(ft): add GC logic and process for the FS storage backend

Andrew Mayorov 3 年之前
父節點
當前提交
715816e67b

+ 1 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -30,6 +30,7 @@
     start_apps/1,
     start_apps/1,
     start_apps/2,
     start_apps/2,
     start_apps/3,
     start_apps/3,
+    start_app/2,
     stop_apps/1,
     stop_apps/1,
     reload/2,
     reload/2,
     app_path/2,
     app_path/2,

+ 29 - 0
apps/emqx_ft/include/emqx_ft_storage_fs.hrl

@@ -0,0 +1,29 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_FT_STORAGE_FS_HRL).
+-define(EMQX_FT_STORAGE_FS_HRL, true).
+
+-record(gcstats, {
+    started_at :: integer(),
+    finished_at :: integer() | undefined,
+    files = 0 :: non_neg_integer(),
+    directories = 0 :: non_neg_integer(),
+    space = 0 :: non_neg_integer(),
+    errors = #{} :: #{_GCSubject => {error, _}}
+}).
+
+-endif.

+ 17 - 0
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -126,6 +126,7 @@ handle_event(internal, _, {assemble, []}, St = #st{}) ->
 handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
 handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
     Filemeta = emqx_ft_assembly:filemeta(Asm),
     Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
     Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+    ok = maybe_garbage_collect(Result, St),
     {stop, {shutdown, Result}}.
     {stop, {shutdown, Result}}.
 
 
 pread(Node, Segment, St) when Node =:= node() ->
 pread(Node, Segment, St) when Node =:= node() ->
@@ -135,5 +136,21 @@ pread(Node, Segment, St) ->
 
 
 %%
 %%
 
 
+maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) ->
+    Nodes = get_coverage_nodes(St),
+    emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
+maybe_garbage_collect({error, _}, _St) ->
+    ok.
+
+get_coverage_nodes(St) ->
+    Coverage = emqx_ft_assembly:coverage(St#st.assembly),
+    ordsets:to_list(
+        lists:foldl(
+            fun({Node, _Segment}, Acc) -> ordsets:add_element(Node, Acc) end,
+            ordsets:new(),
+            Coverage
+        )
+    ).
+
 segsize(#{fragment := {segment, Info}}) ->
 segsize(#{fragment := {segment, Info}}) ->
     maps:get(size, Info).
     maps:get(size, Info).

+ 29 - 0
apps/emqx_ft/src/emqx_ft_conf.erl

@@ -20,6 +20,11 @@
 
 
 -behaviour(emqx_config_handler).
 -behaviour(emqx_config_handler).
 
 
+%% Accessors
+-export([storage/0]).
+-export([gc_interval/1]).
+-export([segments_ttl/1]).
+
 %% Load/Unload
 %% Load/Unload
 -export([
 -export([
     load/0,
     load/0,
@@ -32,6 +37,30 @@
     post_config_update/5
     post_config_update/5
 ]).
 ]).
 
 
+-type milliseconds() :: non_neg_integer().
+-type seconds() :: non_neg_integer().
+
+%%--------------------------------------------------------------------
+%% Accessors
+%%--------------------------------------------------------------------
+
+-spec storage() -> _Storage | disabled.
+storage() ->
+    emqx_config:get([file_transfer, storage], disabled).
+
+-spec gc_interval(_Storage) -> milliseconds().
+gc_interval(_Storage) ->
+    % TODO: config wiring
+    application:get_env(emqx_ft, gc_interval, timer:minutes(10)).
+
+-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
+segments_ttl(_Storage) ->
+    % TODO: config wiring
+    {
+        application:get_env(emqx_ft, min_segments_ttl, 60),
+        application:get_env(emqx_ft, max_segments_ttl, 72 * 3600)
+    }.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 19 - 2
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -18,6 +18,8 @@
 
 
 -export(
 -export(
     [
     [
+        child_spec/0,
+
         store_filemeta/2,
         store_filemeta/2,
         store_segment/2,
         store_segment/2,
         assemble/2,
         assemble/2,
@@ -64,6 +66,17 @@
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-spec child_spec() ->
+    [supervisor:child_spec()].
+child_spec() ->
+    try
+        Mod = mod(),
+        Mod:child_spec(storage())
+    catch
+        error:disabled -> [];
+        error:undef -> []
+    end.
+
 -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
 -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
     ok | {async, pid()} | {error, term()}.
     ok | {async, pid()} | {error, term()}.
 store_filemeta(Transfer, FileMeta) ->
 store_filemeta(Transfer, FileMeta) ->
@@ -99,6 +112,8 @@ with_storage_type(Type, Fun, Args) ->
         #{type := Type} ->
         #{type := Type} ->
             Mod = mod(Storage),
             Mod = mod(Storage),
             apply(Mod, Fun, [Storage | Args]);
             apply(Mod, Fun, [Storage | Args]);
+        disabled ->
+            {error, disabled};
         _ ->
         _ ->
             {error, {invalid_storage_type, Type}}
             {error, {invalid_storage_type, Type}}
     end.
     end.
@@ -108,7 +123,7 @@ with_storage_type(Type, Fun, Args) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 storage() ->
 storage() ->
-    emqx_config:get([file_transfer, storage]).
+    emqx_ft_conf:storage().
 
 
 mod() ->
 mod() ->
     mod(storage()).
     mod(storage()).
@@ -116,6 +131,8 @@ mod() ->
 mod(Storage) ->
 mod(Storage) ->
     case Storage of
     case Storage of
         #{type := local} ->
         #{type := local} ->
-            emqx_ft_storage_fs
+            emqx_ft_storage_fs;
+        disabled ->
+            error(disabled)
         % emqx_ft_storage_dummy
         % emqx_ft_storage_dummy
     end.
     end.

+ 64 - 16
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -14,6 +14,12 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+%% Filesystem storage backend
+%%
+%% NOTE
+%% If you plan to change storage layout please consult `emqx_ft_storage_fs_gc`
+%% to see how much it would break or impair GC.
+
 -module(emqx_ft_storage_fs).
 -module(emqx_ft_storage_fs).
 
 
 -behaviour(emqx_ft_storage).
 -behaviour(emqx_ft_storage).
@@ -21,14 +27,22 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 
+-export([child_spec/1]).
+
 -export([store_filemeta/3]).
 -export([store_filemeta/3]).
 -export([store_segment/3]).
 -export([store_segment/3]).
+-export([read_filemeta/2]).
 -export([list/3]).
 -export([list/3]).
 -export([pread/5]).
 -export([pread/5]).
 -export([assemble/3]).
 -export([assemble/3]).
 
 
 -export([transfers/1]).
 -export([transfers/1]).
 
 
+% GC API
+% TODO: This is quickly becomes hairy.
+-export([get_subdir/2]).
+-export([get_subdir/3]).
+
 -export([ready_transfers_local/1]).
 -export([ready_transfers_local/1]).
 -export([get_ready_transfer_local/3]).
 -export([get_ready_transfer_local/3]).
 
 
@@ -40,6 +54,7 @@
 -export([write/2]).
 -export([write/2]).
 -export([discard/1]).
 -export([discard/1]).
 
 
+-export_type([storage/0]).
 -export_type([filefrag/1]).
 -export_type([filefrag/1]).
 -export_type([filefrag/0]).
 -export_type([filefrag/0]).
 -export_type([transferinfo/0]).
 -export_type([transferinfo/0]).
@@ -79,7 +94,9 @@
 -define(MANIFEST, "MANIFEST.json").
 -define(MANIFEST, "MANIFEST.json").
 -define(SEGMENT, "SEG").
 -define(SEGMENT, "SEG").
 
 
--type storage() :: emqx_ft_storage:storage().
+-type storage() :: #{
+    root => file:name()
+}.
 
 
 -type file_error() ::
 -type file_error() ::
     file:posix()
     file:posix()
@@ -88,13 +105,25 @@
     %% System limit (e.g. number of ports) reached.
     %% System limit (e.g. number of ports) reached.
     | system_limit.
     | system_limit.
 
 
+%% Related resources childspecs
+-spec child_spec(storage()) ->
+    [supervisor:child_spec()].
+child_spec(Storage) ->
+    [
+        #{
+            id => emqx_ft_storage_fs_gc,
+            start => {emqx_ft_storage_fs_gc, start_link, [Storage]},
+            restart => permanent
+        }
+    ].
+
 %% Store manifest in the backing filesystem.
 %% Store manifest in the backing filesystem.
 %% Atomic operation.
 %% Atomic operation.
 -spec store_filemeta(storage(), transfer(), filemeta()) ->
 -spec store_filemeta(storage(), transfer(), filemeta()) ->
     % Quota? Some lower level errors?
     % Quota? Some lower level errors?
     ok | {error, conflict} | {error, file_error()}.
     ok | {error, conflict} | {error, file_error()}.
 store_filemeta(Storage, Transfer, Meta) ->
 store_filemeta(Storage, Transfer, Meta) ->
-    Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
+    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
     case read_file(Filepath, fun decode_filemeta/1) of
     case read_file(Filepath, fun decode_filemeta/1) of
         {ok, Meta} ->
         {ok, Meta} ->
             _ = touch_file(Filepath),
             _ = touch_file(Filepath),
@@ -119,9 +148,16 @@ store_filemeta(Storage, Transfer, Meta) ->
     % Quota? Some lower level errors?
     % Quota? Some lower level errors?
     ok | {error, file_error()}.
     ok | {error, file_error()}.
 store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
 store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
-    Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
+    Filename = mk_segment_filename(Segment),
+    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), Filename),
     write_file_atomic(Storage, Transfer, Filepath, Content).
     write_file_atomic(Storage, Transfer, Filepath, Content).
 
 
+-spec read_filemeta(storage(), transfer()) ->
+    {ok, filefrag({filemeta, filemeta()})} | {error, corrupted} | {error, file_error()}.
+read_filemeta(Storage, Transfer) ->
+    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
+    read_file(Filepath, fun decode_filemeta/1).
+
 -spec list(storage(), transfer(), _What :: fragment | result) ->
 -spec list(storage(), transfer(), _What :: fragment | result) ->
     % Some lower level errors? {error, notfound}?
     % Some lower level errors? {error, notfound}?
     % Result will contain zero or only one filemeta.
     % Result will contain zero or only one filemeta.
@@ -143,11 +179,6 @@ list(Storage, Transfer, What) ->
             Error
             Error
     end.
     end.
 
 
-get_subdirs_for(fragment) ->
-    [?FRAGDIR];
-get_subdirs_for(result) ->
-    [?RESULTDIR].
-
 get_filefrag_fun_for(fragment) ->
 get_filefrag_fun_for(fragment) ->
     fun mk_filefrag/2;
     fun mk_filefrag/2;
 get_filefrag_fun_for(result) ->
 get_filefrag_fun_for(result) ->
@@ -329,6 +360,23 @@ read_transferinfo(Storage, Transfer, Acc) ->
             Acc
             Acc
     end.
     end.
 
 
+-spec get_subdir(storage(), transfer()) ->
+    file:name().
+get_subdir(Storage, Transfer) ->
+    mk_filedir(Storage, Transfer, []).
+
+-spec get_subdir(storage(), transfer(), fragment | temporary | result) ->
+    file:name().
+get_subdir(Storage, Transfer, What) ->
+    mk_filedir(Storage, Transfer, get_subdirs_for(What)).
+
+get_subdirs_for(fragment) ->
+    [?FRAGDIR];
+get_subdirs_for(temporary) ->
+    [?TEMPDIR];
+get_subdirs_for(result) ->
+    [?RESULTDIR].
+
 %%
 %%
 
 
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
@@ -341,7 +389,7 @@ open_file(Storage, Transfer, Filemeta) ->
     _ = filelib:ensure_dir(TempFilepath),
     _ = filelib:ensure_dir(TempFilepath),
     case file:open(TempFilepath, [write, raw, binary]) of
     case file:open(TempFilepath, [write, raw, binary]) of
         {ok, Handle} ->
         {ok, Handle} ->
-            _ = file:truncate(Handle),
+            % TODO: preserve filemeta
             {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
             {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
         {error, _} = Error ->
         {error, _} = Error ->
             Error
             Error
@@ -359,8 +407,8 @@ write({Filepath, IoDevice, Ctx}, IoData) ->
 
 
 -spec complete(storage(), transfer(), filemeta(), handle()) ->
 -spec complete(storage(), transfer(), filemeta(), handle()) ->
     ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
     ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
-complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
-    TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)),
+complete(Storage, Transfer, Filemeta = #{name := Filename}, Handle = {Filepath, IoDevice, Ctx}) ->
+    TargetFilepath = mk_filepath(Storage, Transfer, get_subdirs_for(result), Filename),
     case verify_checksum(Ctx, Filemeta) of
     case verify_checksum(Ctx, Filemeta) of
         ok ->
         ok ->
             ok = file:close(IoDevice),
             ok = file:close(IoDevice),
@@ -491,7 +539,7 @@ write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content)
 
 
 mk_temp_filepath(Storage, Transfer, Filename) ->
 mk_temp_filepath(Storage, Transfer, Filename) ->
     Unique = erlang:unique_integer([positive]),
     Unique = erlang:unique_integer([positive]),
-    filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])).
+    filename:join(get_subdir(Storage, Transfer, temporary), mk_filename([Unique, ".", Filename])).
 
 
 mk_filename(Comps) ->
 mk_filename(Comps) ->
     lists:append(lists:map(fun mk_filename_component/1, Comps)).
     lists:append(lists:map(fun mk_filename_component/1, Comps)).
@@ -516,9 +564,9 @@ filtermap_files(Fun, Dirname, Filenames) ->
     lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
     lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
 
 
 mk_filefrag(Dirname, Filename = ?MANIFEST) ->
 mk_filefrag(Dirname, Filename = ?MANIFEST) ->
-    mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
+    mk_filefrag(Dirname, Filename, filemeta, fun read_frag_filemeta/2);
 mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
 mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
-    mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
+    mk_filefrag(Dirname, Filename, segment, fun read_frag_segmentinfo/2);
 mk_filefrag(_Dirname, _Filename) ->
 mk_filefrag(_Dirname, _Filename) ->
     ?tp(warning, "rogue_file_found", #{
     ?tp(warning, "rogue_file_found", #{
         directory => _Dirname,
         directory => _Dirname,
@@ -554,10 +602,10 @@ mk_filefrag(Dirname, Filename, Tag, Fun) ->
             false
             false
     end.
     end.
 
 
-read_filemeta(_Filename, Filepath) ->
+read_frag_filemeta(_Filename, Filepath) ->
     read_file(Filepath, fun decode_filemeta/1).
     read_file(Filepath, fun decode_filemeta/1).
 
 
-read_segmentinfo(Filename, _Filepath) ->
+read_frag_segmentinfo(Filename, _Filepath) ->
     break_segment_filename(Filename).
     break_segment_filename(Filename).
 
 
 filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S);
 filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S);

+ 337 - 0
apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl

@@ -0,0 +1,337 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% Filesystem storage GC
+%%
+%% This is conceptually a part of the Filesystem storage backend, even
+%% though it's tied to the backend module with somewhat narrow interface.
+
+-module(emqx_ft_storage_fs_gc).
+
+-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/types.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-export([start_link/1]).
+
+-export([collect/1]).
+-export([collect/3]).
+-export([reset/1]).
+
+-behaviour(gen_server).
+-export([init/1]).
+-export([handle_call/3]).
+-export([handle_cast/2]).
+-export([handle_info/2]).
+
+-record(st, {
+    storage :: emqx_ft_storage_fs:storage(),
+    next_gc_timer :: maybe(reference()),
+    last_gc :: maybe(gcstats())
+}).
+
+-type gcstats() :: #gcstats{}.
+
+%%
+
+start_link(Storage) ->
+    gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []).
+
+-spec collect(emqx_ft_storage_fs:storage()) -> gcstats().
+collect(Storage) ->
+    gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity).
+
+-spec reset(emqx_ft_storage_fs:storage()) -> ok.
+reset(Storage) ->
+    gen_server:cast(mk_server_ref(Storage), reset).
+
+collect(Storage, Transfer, Nodes) ->
+    gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}).
+
+mk_server_ref(Storage) ->
+    % TODO
+    {via, gproc, {n, l, {?MODULE, get_storage_root(Storage)}}}.
+
+%%
+
+init(Storage) ->
+    St = #st{storage = Storage},
+    {ok, start_timer(St)}.
+
+handle_call({collect, CalledAt}, _From, St) ->
+    StNext = maybe_collect_garbage(CalledAt, St),
+    {reply, StNext#st.last_gc, StNext};
+handle_call(Call, From, St) ->
+    ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
+    {noreply, St}.
+
+% TODO
+% handle_cast({collect, Transfer, [Node | Rest]}, St) ->
+%     ok = do_collect_transfer(Transfer, Node, St),
+%     ok = collect(self(), Transfer, Rest),
+%     {noreply, St};
+handle_cast(reset, St) ->
+    {noreply, reset_timer(St)};
+handle_cast(Cast, St) ->
+    ?SLOG(error, #{msg => "unexpected_cast", cast => Cast}),
+    {noreply, St}.
+
+handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
+    StNext = do_collect_garbage(St),
+    {noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
+
+% do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
+%     Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
+%     ok = maybe_report(Stats, St),
+%     ok.
+
+maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
+    do_collect_garbage(St);
+maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) ->
+    case FinishedAt > CalledAt of
+        true ->
+            St;
+        false ->
+            reset_timer(do_collect_garbage(St))
+    end.
+
+do_collect_garbage(St = #st{storage = Storage}) ->
+    Stats = collect_garbage(Storage),
+    ok = maybe_report(Stats, St),
+    St#st{last_gc = Stats}.
+
+maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 ->
+    ?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage});
+maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
+    ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
+
+start_timer(St = #st{next_gc_timer = undefined}) ->
+    Delay = emqx_ft_conf:gc_interval(St#st.storage),
+    St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)}.
+
+reset_timer(St = #st{next_gc_timer = undefined}) ->
+    start_timer(St);
+reset_timer(St = #st{next_gc_timer = TRef}) ->
+    ok = emqx_misc:cancel_timer(TRef),
+    start_timer(St#st{next_gc_timer = undefined}).
+
+%%
+
+collect_garbage(Storage) ->
+    Stats = init_gcstats(),
+    {ok, Transfers} = emqx_ft_storage_fs:transfers(Storage),
+    collect_garbage(Storage, Transfers, Stats).
+
+collect_garbage(Storage, Transfers, Stats) ->
+    finish_gcstats(
+        maps:fold(
+            fun(Transfer, TransferInfo, StatsAcc) ->
+                % TODO: throttling?
+                try_collect_transfer(Storage, Transfer, TransferInfo, StatsAcc)
+            end,
+            Stats,
+            Transfers
+        )
+    ).
+
+try_collect_transfer(Storage, Transfer, #{status := complete}, Stats) ->
+    % File transfer is complete.
+    % We should be good to delete fragments and temporary files with their respective
+    % directories altogether.
+    % TODO: file expiration
+    {_, Stats1} = collect_fragments(Storage, Transfer, Stats),
+    {_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1),
+    Stats2;
+try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
+    % File transfer is still incomplete.
+    % Any outdated fragments and temporary files should be collectable. As a kind of
+    % heuristic we only delete transfer directory itself only if it is also outdated
+    % _and was empty at the start of GC_, as a precaution against races between
+    % writers and GCs.
+    TTL = get_segments_ttl(Storage, Transfer),
+    Cutoff = erlang:system_time(second) + TTL,
+    {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
+    {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
+    % TODO: collect empty directories separately
+    case FragCleaned and TempCleaned of
+        true ->
+            collect_transfer_directory(Storage, Transfer, Stats2);
+        false ->
+            Stats2
+    end.
+
+collect_fragments(Storage, Transfer, Stats) ->
+    Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
+    collect_filepath(Dirname, true, Stats).
+
+collect_tempfiles(Storage, Transfer, Stats) ->
+    Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
+    collect_filepath(Dirname, true, Stats).
+
+collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) ->
+    Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
+    Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
+    collect_filepath(Dirname, Filter, Stats).
+
+collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) ->
+    Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
+    Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
+    collect_filepath(Dirname, Filter, Stats).
+
+collect_transfer_directory(Storage, Transfer, Stats) ->
+    Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer),
+    StatsNext = collect_empty_directory(Dirname, Stats),
+    collect_parents(Dirname, StatsNext).
+
+collect_parents(Dirname, Stats) ->
+    Parent = filename:dirname(Dirname),
+    case file:del_dir(Parent) of
+        ok ->
+            collect_parents(Parent, account_gcstat_directory(Stats));
+        {error, enoent} ->
+            collect_parents(Parent, Stats);
+        {error, eexist} ->
+            Stats;
+        {error, Reason} ->
+            register_gcstat_error({directory, Parent}, Reason, Stats)
+    end.
+
+% collect_outdated_fragment(#{path := Filepath, fileinfo := Fileinfo}, Cutoff, Stats) ->
+%     case Fileinfo#file_info.mtime of
+%         ModifiedAt when ModifiedAt < Cutoff ->
+%             collect_filepath(Filepath, Fileinfo, Stats);
+%         _ ->
+%             Stats
+%     end.
+
+-spec collect_filepath(file:name(), Filter, gcstats()) -> {boolean(), gcstats()} when
+    Filter :: boolean() | fun((file:name(), file:file_info()) -> boolean()).
+collect_filepath(Filepath, Filter, Stats) ->
+    case file:read_file_info(Filepath) of
+        {ok, Fileinfo} ->
+            collect_filepath(Filepath, Fileinfo, Filter, Stats);
+        {error, enoent} ->
+            {true, Stats};
+        {error, Reason} ->
+            {false, register_gcstat_error({path, Filepath}, Reason, Stats)}
+    end.
+
+collect_filepath(Filepath, #file_info{type = directory} = Fileinfo, Filter, Stats) ->
+    collect_directory(Filepath, Fileinfo, Filter, Stats);
+collect_filepath(Filepath, #file_info{type = regular} = Fileinfo, Filter, Stats) ->
+    case filter_filepath(Filter, Filepath, Fileinfo) andalso file:delete(Filepath, [raw]) of
+        false ->
+            {false, Stats};
+        ok ->
+            {true, account_gcstat(Fileinfo, Stats)};
+        {error, enoent} ->
+            {true, Stats};
+        {error, Reason} ->
+            {false, register_gcstat_error({file, Filepath}, Reason, Stats)}
+    end;
+collect_filepath(Filepath, Fileinfo, _Filter, Stats) ->
+    {false, register_gcstat_error({file, Filepath}, {unexpected, Fileinfo}, Stats)}.
+
+collect_directory(Dirpath, Fileinfo, Filter, Stats) ->
+    case file:list_dir(Dirpath) of
+        {ok, Filenames} ->
+            {Clean, StatsNext} = collect_files(Dirpath, Filenames, Filter, Stats),
+            case Clean andalso filter_filepath(Filter, Dirpath, Fileinfo) of
+                true ->
+                    {true, collect_empty_directory(Dirpath, StatsNext)};
+                _ ->
+                    {false, StatsNext}
+            end;
+        {error, Reason} ->
+            {false, register_gcstat_error({directory, Dirpath}, Reason, Stats)}
+    end.
+
+collect_files(Dirname, Filenames, Filter, Stats) ->
+    lists:foldl(
+        fun(Filename, {Complete, StatsAcc}) ->
+            Filepath = filename:join(Dirname, Filename),
+            {Collected, StatsNext} = collect_filepath(Filepath, Filter, StatsAcc),
+            {Collected andalso Complete, StatsNext}
+        end,
+        {true, Stats},
+        Filenames
+    ).
+
+collect_empty_directory(Dirpath, Stats) ->
+    case file:del_dir(Dirpath) of
+        ok ->
+            account_gcstat_directory(Stats);
+        {error, enoent} ->
+            Stats;
+        {error, Reason} ->
+            register_gcstat_error({directory, Dirpath}, Reason, Stats)
+    end.
+
+filter_filepath(Filter, _, _) when is_boolean(Filter) ->
+    Filter;
+filter_filepath(Filter, Filepath, Fileinfo) when is_function(Filter) ->
+    Filter(Filepath, Fileinfo).
+
+get_segments_ttl(Storage, Transfer) ->
+    {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
+    clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(Storage, Transfer)).
+
+try_get_filemeta_ttl(Storage, Transfer) ->
+    case emqx_ft_storage_fs:read_filemeta(Storage, Transfer) of
+        {ok, Filemeta} ->
+            maps:get(segments_ttl, Filemeta, undefined);
+        {error, _} ->
+            undefined
+    end.
+
+clamp(Min, Max, V) ->
+    min(Max, max(Min, V)).
+
+% try_collect(_Subject, ok = Result, Then, _Stats) ->
+%     Then(Result);
+% try_collect(_Subject, {ok, Result}, Then, _Stats) ->
+%     Then(Result);
+% try_collect(Subject, {error, _} = Error, _Then, Stats) ->
+%     register_gcstat_error(Subject, Error, Stats).
+
+%%
+
+init_gcstats() ->
+    #gcstats{started_at = erlang:system_time()}.
+
+finish_gcstats(Stats) ->
+    Stats#gcstats{finished_at = erlang:system_time()}.
+
+account_gcstat(Fileinfo, Stats = #gcstats{files = Files, space = Space}) ->
+    Stats#gcstats{
+        files = Files + 1,
+        space = Space + Fileinfo#file_info.size
+    }.
+
+account_gcstat_directory(Stats = #gcstats{directories = Directories}) ->
+    Stats#gcstats{
+        directories = Directories + 1
+    }.
+
+register_gcstat_error(Subject, Error, Stats = #gcstats{errors = Errors}) ->
+    Stats#gcstats{errors = Errors#{Subject => Error}}.
+
+%%
+
+get_storage_root(Storage) ->
+    maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).

+ 1 - 1
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -61,5 +61,5 @@ init([]) ->
         modules => [emqx_ft_responder_sup]
         modules => [emqx_ft_responder_sup]
     },
     },
 
 
-    ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
+    ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()],
     {ok, {SupFlags, ChildSpecs}}.
     {ok, {SupFlags, ChildSpecs}}.

+ 205 - 0
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -0,0 +1,205 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_gc_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    _ = application:load(emqx_ft),
+    ok = emqx_common_test_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([]),
+    ok.
+
+init_per_testcase(TC, Config) ->
+    _ = application:unset_env(emqx_ft, gc_interval),
+    _ = application:unset_env(emqx_ft, min_segments_ttl),
+    _ = application:unset_env(emqx_ft, max_segments_ttl),
+    ok = emqx_common_test_helpers:start_app(
+        emqx_ft,
+        fun(emqx_ft) ->
+            ok = emqx_config:put([file_transfer, storage], #{
+                type => local,
+                root => mk_root(TC, Config)
+            })
+        end
+    ),
+    Config.
+
+end_per_testcase(_TC, _Config) ->
+    ok = application:stop(emqx_ft),
+    ok.
+
+mk_root(TC, Config) ->
+    filename:join([?config(priv_dir, Config), <<"file_transfer">>, TC, atom_to_binary(node())]).
+
+%%
+
+t_gc_triggers_periodically(_Config) ->
+    Interval = 1000,
+    ok = application:set_env(emqx_ft, gc_interval, Interval),
+    ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
+    ?check_trace(
+        timer:sleep(Interval * 3),
+        fun(Trace) ->
+            [Event, _ | _] = ?of_kind(garbage_collection, Trace),
+            ?assertMatch(
+                #{
+                    stats := #gcstats{
+                        files = 0,
+                        directories = 0,
+                        space = 0,
+                        errors = #{} = Errors
+                    }
+                } when map_size(Errors) == 0,
+                Event
+            )
+        end
+    ).
+
+t_gc_triggers_manually(_Config) ->
+    ?check_trace(
+        ?assertMatch(
+            #gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when
+                map_size(Errors) == 0,
+            emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage())
+        ),
+        fun(Trace) ->
+            [Event] = ?of_kind(garbage_collection, Trace),
+            ?assertMatch(
+                #{stats := #gcstats{}},
+                Event
+            )
+        end
+    ).
+
+t_gc_complete_transfers(_Config) ->
+    Storage = emqx_ft_conf:storage(),
+    Transfers = [
+        {
+            T1 = {<<"client1">>, mk_file_id()},
+            "cat.cur",
+            emqx_ft_content_gen:new({?LINE, S1 = 42}, SS1 = 16)
+        },
+        {
+            T2 = {<<"client2">>, mk_file_id()},
+            "cat.ico",
+            emqx_ft_content_gen:new({?LINE, S2 = 420}, SS2 = 64)
+        },
+        {
+            T3 = {<<"client42">>, mk_file_id()},
+            "cat.jpg",
+            emqx_ft_content_gen:new({?LINE, S3 = 42000}, SS3 = 1024)
+        }
+    ],
+    % 1. Start all transfers
+    TransferSizes = emqx_misc:pmap(
+        fun(Transfer) -> start_transfer(Storage, Transfer) end,
+        Transfers
+    ),
+    ?assertEqual([S1, S2, S3], TransferSizes),
+    ?assertMatch(
+        #gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0,
+        emqx_ft_storage_fs_gc:collect(Storage)
+    ),
+    % 2. Complete just the first transfer
+    ?assertEqual(
+        ok,
+        complete_transfer(Storage, T1, S1)
+    ),
+    GCFiles1 = ceil(S1 / SS1) + 1,
+    ?assertMatch(
+        #gcstats{
+            files = GCFiles1,
+            directories = 2,
+            space = Space,
+            errors = #{} = Es
+        } when Space > S1 andalso map_size(Es) == 0,
+        emqx_ft_storage_fs_gc:collect(Storage)
+    ),
+    % 3. Complete rest of transfers
+    ?assertEqual(
+        [ok, ok],
+        emqx_misc:pmap(
+            fun({Transfer, Size}) -> complete_transfer(Storage, Transfer, Size) end,
+            [{T2, S2}, {T3, S3}]
+        )
+    ),
+    GCFiles2 = ceil(S2 / SS2) + 1,
+    GCFiles3 = ceil(S3 / SS3) + 1,
+    ?assertMatch(
+        #gcstats{
+            files = Files,
+            directories = 4,
+            space = Space,
+            errors = #{} = Es
+        } when
+            Files == (GCFiles2 + GCFiles3) andalso
+                Space > (S2 + S3) andalso
+                map_size(Es) == 0,
+        emqx_ft_storage_fs_gc:collect(Storage)
+    ).
+
+start_transfer(Storage, {Transfer, Name, Gen}) ->
+    Meta = #{
+        name => Name,
+        segments_ttl => 10
+    },
+    ?assertEqual(
+        ok,
+        emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta)
+    ),
+    emqx_ft_content_gen:fold(
+        fun({Content, SegmentNum, #{chunk_size := SegmentSize}}, _Transferred) ->
+            Offset = (SegmentNum - 1) * SegmentSize,
+            ?assertEqual(
+                ok,
+                emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
+            ),
+            Offset + byte_size(Content)
+        end,
+        0,
+        Gen
+    ).
+
+complete_transfer(Storage, Transfer, Size) ->
+    complete_transfer(Storage, Transfer, Size, 100).
+
+complete_transfer(Storage, Transfer, Size, Timeout) ->
+    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
+    MRef = erlang:monitor(process, Pid),
+    Pid ! kickoff,
+    receive
+        {'DOWN', MRef, process, Pid, {shutdown, Result}} ->
+            Result
+    after Timeout ->
+        ct:fail("Assembler did not finish in time")
+    end.
+
+mk_file_id() ->
+    emqx_guid:to_hexstr(emqx_guid:gen()).