Преглед изворни кода

Merge pull request #10501 from fix/EMQX-9521/config-updates

feat(ft): properly propagate config updates
Andrew Mayorov пре 2 година
родитељ
комит
9f7e807e06

+ 3 - 0
apps/emqx/src/emqx_maybe.erl

@@ -23,6 +23,9 @@
 -export([define/2]).
 -export([apply/2]).
 
+-type t(T) :: maybe(T).
+-export_type([t/1]).
+
 -spec to_list(maybe(A)) -> [A].
 to_list(undefined) ->
     [];

+ 12 - 18
apps/emqx_ft/src/emqx_ft.erl

@@ -198,8 +198,7 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
     end.
 
 on_init(PacketId, Msg, Transfer, Meta) ->
-    ?SLOG(info, #{
-        msg => "on_init",
+    ?tp(info, "file_transfer_init", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
         transfer => Transfer,
@@ -229,8 +228,7 @@ on_abort(_Msg, _FileId) ->
     ?RC_SUCCESS.
 
 on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
-    ?SLOG(info, #{
-        msg => "on_segment",
+    ?tp(info, "file_transfer_segment", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
         transfer => Transfer,
@@ -255,8 +253,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
     end).
 
 on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
-    ?SLOG(info, #{
-        msg => "on_fin",
+    ?tp(info, "file_transfer_fin", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
         transfer => Transfer,
@@ -301,8 +298,8 @@ store_filemeta(Transfer, Segment) ->
         emqx_ft_storage:store_filemeta(Transfer, Segment)
     catch
         C:E:S ->
-            ?SLOG(error, #{
-                msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S
+            ?tp(error, "start_store_filemeta_failed", #{
+                class => C, reason => E, stacktrace => S
             }),
             {error, {internal_error, E}}
     end.
@@ -312,8 +309,8 @@ store_segment(Transfer, Segment) ->
         emqx_ft_storage:store_segment(Transfer, Segment)
     catch
         C:E:S ->
-            ?SLOG(error, #{
-                msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S
+            ?tp(error, "start_store_segment_failed", #{
+                class => C, reason => E, stacktrace => S
             }),
             {error, {internal_error, E}}
     end.
@@ -323,8 +320,8 @@ assemble(Transfer, FinalSize) ->
         emqx_ft_storage:assemble(Transfer, FinalSize)
     catch
         C:E:S ->
-            ?SLOG(error, #{
-                msg => "start_assemble_failed", class => C, reason => E, stacktrace => S
+            ?tp(error, "start_assemble_failed", #{
+                class => C, reason => E, stacktrace => S
             }),
             {error, {internal_error, E}}
     end.
@@ -334,8 +331,7 @@ transfer(Msg, FileId) ->
     {clientid_to_binary(ClientId), FileId}.
 
 on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
-    ?SLOG(debug, #{
-        msg => "on_complete",
+    ?tp(debug, "on_complete", #{
         operation => Op,
         packet_id => PacketId,
         transfer => Transfer
@@ -344,15 +340,13 @@ on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
         {Mode, ok} when Mode == ack orelse Mode == down ->
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
         {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
-            ?SLOG(error, #{
-                msg => Op ++ "_failed",
+            ?tp(error, Op ++ "_failed", #{
                 transfer => Transfer,
                 reason => Reason
             }),
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
         timeout ->
-            ?SLOG(error, #{
-                msg => Op ++ "_timed_out",
+            ?tp(error, Op ++ "_timed_out", #{
                 transfer => Transfer
             }),
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})

+ 0 - 2
apps/emqx_ft/src/emqx_ft_app.erl

@@ -22,11 +22,9 @@
 
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_ft_sup:start_link(),
-    ok = emqx_ft:hook(),
     ok = emqx_ft_conf:load(),
     {ok, Sup}.
 
 stop(_State) ->
     ok = emqx_ft_conf:unload(),
-    ok = emqx_ft:unhook(),
     ok.

+ 1 - 1
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -159,7 +159,7 @@ handle_event(internal, _, {assemble, []}, St = #{}) ->
     {next_state, complete, St, ?internal([])};
 handle_event(internal, _, complete, St = #{export := Export}) ->
     Result = emqx_ft_storage_exporter:complete(Export),
-    ok = maybe_garbage_collect(Result, St),
+    _ = maybe_garbage_collect(Result, St),
     {stop, {shutdown, Result}, maps:remove(export, St)}.
 
 -spec terminate(_Reason, state(), stdata()) -> _.

+ 42 - 42
apps/emqx_ft/src/emqx_ft_conf.erl

@@ -23,6 +23,7 @@
 -include_lib("emqx/include/logger.hrl").
 
 %% Accessors
+-export([enabled/0]).
 -export([storage/0]).
 -export([gc_interval/1]).
 -export([segments_ttl/1]).
@@ -45,49 +46,32 @@
 -type milliseconds() :: non_neg_integer().
 -type seconds() :: non_neg_integer().
 
-%% 5 minutes (s)
--define(DEFAULT_MIN_SEGMENTS_TTL, 300).
-%% 1 day (s)
--define(DEFAULT_MAX_SEGMENTS_TTL, 86400).
-%% 1 minute (ms)
--define(DEFAULT_GC_INTERVAL, 60000).
-
 %%--------------------------------------------------------------------
 %% Accessors
 %%--------------------------------------------------------------------
 
+-spec enabled() -> boolean().
+enabled() ->
+    emqx_config:get([file_transfer, enable], false).
+
 -spec storage() -> _Storage.
 storage() ->
-    emqx_config:get([file_transfer, storage]).
+    emqx_config:get([file_transfer, storage], undefined).
 
--spec gc_interval(_Storage) -> milliseconds().
-gc_interval(_Storage) ->
-    Conf = assert_storage(local),
-    emqx_utils_maps:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL).
+-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
+gc_interval(Conf = #{type := local}) ->
+    emqx_utils_maps:deep_get([segments, gc, interval], Conf);
+gc_interval(_) ->
+    undefined.
 
--spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
-segments_ttl(_Storage) ->
-    Conf = assert_storage(local),
+-spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
+segments_ttl(Conf = #{type := local}) ->
     {
-        emqx_utils_maps:deep_get(
-            [segments, gc, minimum_segments_ttl],
-            Conf,
-            ?DEFAULT_MIN_SEGMENTS_TTL
-        ),
-        emqx_utils_maps:deep_get(
-            [segments, gc, maximum_segments_ttl],
-            Conf,
-            ?DEFAULT_MAX_SEGMENTS_TTL
-        )
-    }.
-
-assert_storage(Type) ->
-    case storage() of
-        Conf = #{type := Type} ->
-            Conf;
-        Conf ->
-            error({inapplicable, Conf})
-    end.
+        emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
+        emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
+    };
+segments_ttl(_) ->
+    undefined.
 
 init_timeout() ->
     emqx_config:get([file_transfer, init_timeout]).
@@ -104,10 +88,7 @@ store_segment_timeout() ->
 
 -spec load() -> ok.
 load() ->
-    ok = emqx_ft_storage_exporter:update_exporter(
-        undefined,
-        storage()
-    ),
+    ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
     emqx_conf:add_handler([file_transfer], ?MODULE).
 
 -spec unload() -> ok.
@@ -131,7 +112,26 @@ pre_config_update(_, Req, _Config) ->
     emqx_config:app_envs()
 ) ->
     ok | {ok, Result :: any()} | {error, Reason :: term()}.
-post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) ->
-    OldStorageConfig = maps:get(storage, OldConfig, undefined),
-    NewStorageConfig = maps:get(storage, NewConfig, undefined),
-    emqx_ft_storage_exporter:update_exporter(OldStorageConfig, NewStorageConfig).
+post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
+    on_config_update(OldConfig, NewConfig).
+
+on_config_update(OldConfig, NewConfig) ->
+    lists:foreach(
+        fun(ConfKey) ->
+            on_config_update(
+                ConfKey,
+                maps:get(ConfKey, OldConfig, undefined),
+                maps:get(ConfKey, NewConfig, undefined)
+            )
+        end,
+        [storage, enable]
+    ).
+
+on_config_update(_, Config, Config) ->
+    ok;
+on_config_update(storage, OldConfig, NewConfig) ->
+    ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
+on_config_update(enable, _, true) ->
+    ok = emqx_ft:hook();
+on_config_update(enable, _, false) ->
+    ok = emqx_ft:unhook().

+ 58 - 17
apps/emqx_ft/src/emqx_ft_schema.erl

@@ -25,6 +25,8 @@
 
 -export([schema/1]).
 
+-export([translate/1]).
+
 -type json_value() ::
     null
     | boolean()
@@ -53,6 +55,15 @@ roots() -> [file_transfer].
 
 fields(file_transfer) ->
     [
+        {enable,
+            mk(
+                boolean(),
+                #{
+                    desc => ?DESC("enable"),
+                    required => false,
+                    default => false
+                }
+            )},
         {init_timeout,
             mk(
                 emqx_schema:duration_ms(),
@@ -82,13 +93,22 @@ fields(file_transfer) ->
             )},
         {storage,
             mk(
-                hoconsc:union([
-                    ref(local_storage)
-                ]),
+                hoconsc:union(
+                    fun
+                        (all_union_members) ->
+                            [ref(local_storage)];
+                        ({value, #{<<"type">> := <<"local">>}}) ->
+                            [ref(local_storage)];
+                        ({value, #{<<"type">> := _}}) ->
+                            throw(#{field_name => type, expected => "local"});
+                        ({value, _}) ->
+                            [ref(local_storage)]
+                    end
+                ),
                 #{
                     required => false,
                     desc => ?DESC("storage"),
-                    default => default_storage()
+                    default => #{<<"type">> => <<"local">>}
                 }
             )}
     ];
@@ -108,18 +128,38 @@ fields(local_storage) ->
                 ref(local_storage_segments),
                 #{
                     desc => ?DESC("local_storage_segments"),
-                    required => false
+                    required => false,
+                    default => #{
+                        <<"gc">> => #{}
+                    }
                 }
             )},
         {exporter,
             mk(
-                hoconsc:union([
-                    ref(local_storage_exporter),
-                    ref(s3_exporter)
-                ]),
+                hoconsc:union(
+                    fun
+                        (all_union_members) ->
+                            [
+                                ref(local_storage_exporter),
+                                ref(s3_exporter)
+                            ];
+                        ({value, #{<<"type">> := <<"local">>}}) ->
+                            [ref(local_storage_exporter)];
+                        ({value, #{<<"type">> := <<"s3">>}}) ->
+                            [ref(s3_exporter)];
+                        ({value, #{<<"type">> := _}}) ->
+                            throw(#{field_name => type, expected => "local | s3"});
+                        ({value, _}) ->
+                            % NOTE: default
+                            [ref(local_storage_exporter)]
+                    end
+                ),
                 #{
                     desc => ?DESC("local_storage_exporter"),
-                    required => true
+                    required => true,
+                    default => #{
+                        <<"type">> => <<"local">>
+                    }
                 }
             )}
     ];
@@ -277,10 +317,11 @@ converter(unicode_string) ->
 ref(Ref) ->
     ref(?MODULE, Ref).
 
-default_storage() ->
-    #{
-        <<"type">> => <<"local">>,
-        <<"exporter">> => #{
-            <<"type">> => <<"local">>
-        }
-    }.
+translate(Conf) ->
+    [Root] = roots(),
+    maps:get(
+        Root,
+        hocon_tconf:check_plain(
+            ?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root]
+        )
+    ).

+ 76 - 22
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -27,7 +27,9 @@
         files/0,
 
         with_storage_type/2,
-        with_storage_type/3
+        with_storage_type/3,
+
+        on_config_update/2
     ]
 ).
 
@@ -90,26 +92,35 @@ child_spec() ->
 -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
     ok | {async, pid()} | {error, term()}.
 store_filemeta(Transfer, FileMeta) ->
-    Mod = mod(),
-    Mod:store_filemeta(storage(), Transfer, FileMeta).
+    with_storage(store_filemeta, [Transfer, FileMeta]).
 
 -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
     ok | {async, pid()} | {error, term()}.
 store_segment(Transfer, Segment) ->
-    Mod = mod(),
-    Mod:store_segment(storage(), Transfer, Segment).
+    with_storage(store_segment, [Transfer, Segment]).
 
 -spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
     ok | {async, pid()} | {error, term()}.
 assemble(Transfer, Size) ->
-    Mod = mod(),
-    Mod:assemble(storage(), Transfer, Size).
+    with_storage(assemble, [Transfer, Size]).
 
 -spec files() ->
     {ok, [file_info()]} | {error, term()}.
 files() ->
-    Mod = mod(),
-    Mod:files(storage()).
+    with_storage(files, []).
+
+-spec with_storage(atom() | function()) -> any().
+with_storage(Fun) ->
+    with_storage(Fun, []).
+
+-spec with_storage(atom() | function(), list(term())) -> any().
+with_storage(Fun, Args) ->
+    case storage() of
+        Storage = #{} ->
+            apply_storage(Storage, Fun, Args);
+        undefined ->
+            {error, disabled}
+    end.
 
 -spec with_storage_type(atom(), atom() | function()) -> any().
 with_storage_type(Type, Fun) ->
@@ -117,17 +128,61 @@ with_storage_type(Type, Fun) ->
 
 -spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
 with_storage_type(Type, Fun, Args) ->
-    Storage = storage(),
-    case Storage of
-        #{type := Type} when is_function(Fun) ->
-            apply(Fun, [Storage | Args]);
-        #{type := Type} when is_atom(Fun) ->
-            Mod = mod(Storage),
-            apply(Mod, Fun, [Storage | Args]);
-        disabled ->
-            {error, disabled};
-        _ ->
-            {error, {invalid_storage_type, Type}}
+    with_storage(fun(Storage) ->
+        case Storage of
+            #{type := Type} ->
+                apply_storage(Storage, Fun, Args);
+            _ ->
+                {error, {invalid_storage_type, Storage}}
+        end
+    end).
+
+apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
+    apply(mod(Storage), Fun, [Storage | Args]);
+apply_storage(Storage, Fun, Args) when is_function(Fun) ->
+    apply(Fun, [Storage | Args]).
+
+%%
+
+-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
+    ok.
+on_config_update(Storage, Storage) ->
+    ok;
+on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
+    ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
+on_config_update(StorageOld, StorageNew) ->
+    _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
+    _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
+    _ = emqx_maybe:apply(
+        fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
+        StorageNew
+    ),
+    ok.
+
+on_storage_start(Storage = #{type := _}) ->
+    lists:foreach(
+        fun(ChildSpec) ->
+            {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
+        end,
+        child_spec(Storage)
+    ).
+
+on_storage_stop(Storage = #{type := _}) ->
+    lists:foreach(
+        fun(#{id := ChildId}) ->
+            _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
+            ok = supervisor:delete_child(emqx_ft_sup, ChildId)
+        end,
+        child_spec(Storage)
+    ).
+
+child_spec(Storage) ->
+    try
+        Mod = mod(Storage),
+        Mod:child_spec(Storage)
+    catch
+        error:disabled -> [];
+        error:undef -> []
     end.
 
 %%--------------------------------------------------------------------
@@ -144,7 +199,6 @@ mod(Storage) ->
     case Storage of
         #{type := local} ->
             emqx_ft_storage_fs;
-        disabled ->
+        undefined ->
             error(disabled)
-        % emqx_ft_storage_dummy
     end.

+ 22 - 24
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -31,7 +31,7 @@
 -export([list/1]).
 
 %% Lifecycle API
--export([update_exporter/2]).
+-export([on_config_update/2]).
 
 %% Internal API
 -export([exporter/1]).
@@ -141,30 +141,28 @@ list(Storage) ->
 
 %% Lifecycle
 
--spec update_exporter(emqx_config:config(), emqx_config:config()) -> ok | {error, term()}.
-update_exporter(
-    #{exporter := #{type := OldType}} = OldConfig,
-    #{exporter := #{type := OldType}} = NewConfig
-) ->
-    {ExporterMod, OldExporterOpts} = exporter(OldConfig),
-    {_NewExporterMod, NewExporterOpts} = exporter(NewConfig),
-    ExporterMod:update(OldExporterOpts, NewExporterOpts);
-update_exporter(
-    #{exporter := _} = OldConfig,
-    #{exporter := _} = NewConfig
-) ->
-    {OldExporterMod, OldExporterOpts} = exporter(OldConfig),
-    {NewExporterMod, NewExporterOpts} = exporter(NewConfig),
-    ok = OldExporterMod:stop(OldExporterOpts),
-    NewExporterMod:start(NewExporterOpts);
-update_exporter(undefined, NewConfig) ->
-    {ExporterMod, ExporterOpts} = exporter(NewConfig),
-    ExporterMod:start(ExporterOpts);
-update_exporter(OldConfig, undefined) ->
-    {ExporterMod, ExporterOpts} = exporter(OldConfig),
-    ExporterMod:stop(ExporterOpts);
-update_exporter(_, _) ->
+-spec on_config_update(storage(), storage()) -> ok | {error, term()}.
+on_config_update(StorageOld, StorageNew) ->
+    on_exporter_update(
+        emqx_maybe:apply(fun exporter/1, StorageOld),
+        emqx_maybe:apply(fun exporter/1, StorageNew)
+    ).
+
+on_exporter_update(Config, Config) ->
+    ok;
+on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
+    ExporterMod:update(ConfigOld, ConfigNew);
+on_exporter_update(ExporterOld, ExporterNew) ->
+    _ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld),
+    _ = emqx_maybe:apply(fun start_exporter/1, ExporterNew),
     ok.
+
+start_exporter({ExporterMod, ExporterOpts}) ->
+    ok = ExporterMod:start(ExporterOpts).
+
+stop_exporter({ExporterMod, ExporterOpts}) ->
+    ok = ExporterMod:stop(ExporterOpts).
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------

+ 10 - 0
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -47,6 +47,8 @@
 
 -export([files/1]).
 
+-export([on_config_update/2]).
+
 -export_type([storage/0]).
 -export_type([filefrag/1]).
 -export_type([filefrag/0]).
@@ -96,6 +98,7 @@
 }.
 
 -type storage() :: #{
+    type := 'local',
     segments := segments(),
     exporter := emqx_ft_storage_exporter:exporter()
 }.
@@ -219,6 +222,13 @@ files(Storage) ->
 
 %%
 
+on_config_update(StorageOld, StorageNew) ->
+    % NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
+    ok = emqx_ft_storage_fs_gc:reset(StorageNew),
+    emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
+
+%%
+
 -spec transfers(storage()) ->
     {ok, #{transfer() => transferinfo()}}.
 transfers(Storage) ->

+ 70 - 55
apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl

@@ -29,8 +29,9 @@
 
 -export([start_link/1]).
 
--export([collect/1]).
+-export([collect/0]).
 -export([collect/3]).
+-export([reset/0]).
 -export([reset/1]).
 
 -behaviour(gen_server).
@@ -40,38 +41,43 @@
 -export([handle_info/2]).
 
 -record(st, {
-    storage :: emqx_ft_storage_fs:storage(),
     next_gc_timer :: maybe(reference()),
     last_gc :: maybe(gcstats())
 }).
 
 -type gcstats() :: #gcstats{}.
 
+-define(IS_ENABLED(INTERVAL), (is_integer(INTERVAL) andalso INTERVAL > 0)).
+
 %%
 
 start_link(Storage) ->
-    gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []).
+    gen_server:start_link(mk_server_ref(global), ?MODULE, Storage, []).
+
+-spec collect() -> gcstats().
+collect() ->
+    gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
 
--spec collect(emqx_ft_storage_fs:storage()) -> gcstats().
-collect(Storage) ->
-    gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity).
+-spec reset() -> ok.
+reset() ->
+    reset(emqx_ft_conf:storage()).
 
 -spec reset(emqx_ft_storage_fs:storage()) -> ok.
 reset(Storage) ->
-    gen_server:cast(mk_server_ref(Storage), reset).
+    gen_server:cast(mk_server_ref(global), {reset, gc_interval(Storage)}).
 
 collect(Storage, Transfer, Nodes) ->
-    gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}).
+    gc_enabled(Storage) andalso cast_collect(mk_server_ref(global), Storage, Transfer, Nodes).
 
-mk_server_ref(Storage) ->
+mk_server_ref(Name) ->
     % TODO
-    {via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}.
+    {via, gproc, {n, l, {?MODULE, Name}}}.
 
 %%
 
 init(Storage) ->
-    St = #st{storage = Storage},
-    {ok, start_timer(St)}.
+    St = #st{},
+    {ok, start_timer(gc_interval(Storage), St)}.
 
 handle_call({collect, CalledAt}, _From, St) ->
     StNext = maybe_collect_garbage(CalledAt, St),
@@ -80,22 +86,17 @@ handle_call(Call, From, St) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
     {noreply, St}.
 
-handle_cast({collect, Transfer, [Node | Rest]}, St) ->
-    case gc_enabled(St) of
-        true ->
-            ok = do_collect_transfer(Transfer, Node, St),
-            case Rest of
-                [_ | _] ->
-                    gen_server:cast(self(), {collect, Transfer, Rest});
-                [] ->
-                    ok
-            end;
-        false ->
-            skip
+handle_cast({collect, Storage, Transfer, [Node | Rest]}, St) ->
+    ok = do_collect_transfer(Storage, Transfer, Node, St),
+    case Rest of
+        [_ | _] ->
+            cast_collect(self(), Storage, Transfer, Rest);
+        [] ->
+            ok
     end,
     {noreply, St};
-handle_cast(reset, St) ->
-    {noreply, reset_timer(St)};
+handle_cast({reset, Interval}, St) ->
+    {noreply, start_timer(Interval, cancel_timer(St))};
 handle_cast(Cast, St) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Cast}),
     {noreply, St}.
@@ -104,14 +105,17 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
     StNext = do_collect_garbage(St),
     {noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
 
-do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
+do_collect_transfer(Storage, Transfer, Node, St = #st{}) when Node == node() ->
     Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
     ok = maybe_report(Stats, St),
     ok;
-do_collect_transfer(_Transfer, _Node, _St = #st{}) ->
+do_collect_transfer(_Storage, _Transfer, _Node, _St = #st{}) ->
     % TODO
     ok.
 
+cast_collect(Ref, Storage, Transfer, Nodes) ->
+    gen_server:cast(Ref, {collect, Storage, Transfer, Nodes}).
+
 maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
     do_collect_garbage(St);
 maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) ->
@@ -119,36 +123,41 @@ maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = Finish
         true ->
             St;
         false ->
-            reset_timer(do_collect_garbage(St))
+            start_timer(do_collect_garbage(cancel_timer(St)))
     end.
 
-do_collect_garbage(St = #st{storage = Storage}) ->
-    Stats = collect_garbage(Storage),
-    ok = maybe_report(Stats, St),
-    St#st{last_gc = Stats}.
+do_collect_garbage(St = #st{}) ->
+    emqx_ft_storage:with_storage_type(local, fun(Storage) ->
+        Stats = collect_garbage(Storage),
+        ok = maybe_report(Stats, Storage),
+        St#st{last_gc = Stats}
+    end).
 
-maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 ->
+maybe_report(#gcstats{errors = Errors}, Storage) when map_size(Errors) > 0 ->
     ?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage});
-maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
+maybe_report(#gcstats{} = _Stats, _Storage) ->
     ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
 
-start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
-    case emqx_ft_conf:gc_interval(Storage) of
-        Delay when Delay > 0 ->
-            St#st{next_gc_timer = emqx_utils:start_timer(Delay, collect)};
-        0 ->
-            ?SLOG(warning, #{msg => "periodic_gc_disabled"}),
-            St
-    end.
+start_timer(St) ->
+    start_timer(gc_interval(emqx_ft_conf:storage()), St).
+
+start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
+    St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
+start_timer(Interval, St) ->
+    ?SLOG(warning, #{msg => "periodic_gc_disabled", interval => Interval}),
+    St.
 
-reset_timer(St = #st{next_gc_timer = undefined}) ->
-    start_timer(St);
-reset_timer(St = #st{next_gc_timer = TRef}) ->
+cancel_timer(St = #st{next_gc_timer = undefined}) ->
+    St;
+cancel_timer(St = #st{next_gc_timer = TRef}) ->
     ok = emqx_utils:cancel_timer(TRef),
-    start_timer(St#st{next_gc_timer = undefined}).
+    St#st{next_gc_timer = undefined}.
 
-gc_enabled(St) ->
-    emqx_ft_conf:gc_interval(St#st.storage) > 0.
+gc_enabled(Storage) ->
+    ?IS_ENABLED(gc_interval(Storage)).
+
+gc_interval(Storage) ->
+    emqx_ft_conf:gc_interval(Storage).
 
 %%
 
@@ -175,8 +184,13 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
     % heuristic we only delete transfer directory itself only if it is also outdated
     % _and was empty at the start of GC_, as a precaution against races between
     % writers and GCs.
-    TTL = get_segments_ttl(Storage, TransferInfo),
-    Cutoff = erlang:system_time(second) - TTL,
+    Cutoff =
+        case get_segments_ttl(Storage, TransferInfo) of
+            TTL when is_integer(TTL) ->
+                erlang:system_time(second) - TTL;
+            undefined ->
+                0
+        end,
     {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
     {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
     % TODO: collect empty directories separately
@@ -338,16 +352,17 @@ filepath_to_binary(S) ->
     unicode:characters_to_binary(S, unicode, file:native_name_encoding()).
 
 get_segments_ttl(Storage, TransferInfo) ->
-    {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
-    clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)).
+    clamp(emqx_ft_conf:segments_ttl(Storage), try_get_filemeta_ttl(TransferInfo)).
 
 try_get_filemeta_ttl(#{filemeta := Filemeta}) ->
     maps:get(segments_ttl, Filemeta, undefined);
 try_get_filemeta_ttl(#{}) ->
     undefined.
 
-clamp(Min, Max, V) ->
-    min(Max, max(Min, V)).
+clamp({Min, Max}, V) ->
+    min(Max, max(Min, V));
+clamp(undefined, V) ->
+    V.
 
 %%
 

+ 1 - 1
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -61,5 +61,5 @@ init([]) ->
         modules => [emqx_ft_responder_sup]
     },
 
-    ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()],
+    ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
     {ok, {SupFlags, ChildSpecs}}.

+ 1 - 0
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -63,6 +63,7 @@ set_special_configs(Config) ->
                 % NOTE
                 % Inhibit local fs GC to simulate it isn't fast enough to collect
                 % complete transfers.
+                enable => true,
                 storage => emqx_utils_maps:deep_merge(
                     Storage,
                     #{segments => #{gc => #{interval => 0}}}

+ 1 - 11
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -30,7 +30,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     ok = emqx_mgmt_api_test_util:init_suite(
-        [emqx_conf, emqx_ft], set_special_configs(Config)
+        [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
     ),
     {ok, _} = emqx:update_config([rpc, port_discovery], manual),
     Config.
@@ -38,16 +38,6 @@ end_per_suite(_Config) ->
     ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
     ok.
 
-set_special_configs(Config) ->
-    fun
-        (emqx_ft) ->
-            emqx_ft_test_helpers:load_config(#{
-                storage => emqx_ft_test_helpers:local_storage(Config)
-            });
-        (_) ->
-            ok
-    end.
-
 init_per_testcase(Case, Config) ->
     [{tc, Case} | Config].
 end_per_testcase(_Case, _Config) ->

+ 16 - 12
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -46,8 +46,8 @@ init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
     {ok, Pid} = emqx_ft_assembler_sup:start_link(),
     [
-        {storage_root, "file_transfer_root"},
-        {exports_root, "file_transfer_exports"},
+        {storage_root, <<"file_transfer_root">>},
+        {exports_root, <<"file_transfer_exports">>},
         {file_id, atom_to_binary(TC)},
         {assembler_sup, Pid}
         | Config
@@ -246,13 +246,17 @@ exporter(Config) ->
     emqx_ft_storage_exporter:exporter(storage(Config)).
 
 storage(Config) ->
-    #{
-        type => local,
-        segments => #{
-            root => ?config(storage_root, Config)
-        },
-        exporter => #{
-            type => local,
-            root => ?config(exports_root, Config)
-        }
-    }.
+    maps:get(
+        storage,
+        emqx_ft_schema:translate(#{
+            <<"storage">> => #{
+                <<"type">> => <<"local">>,
+                <<"segments">> => #{
+                    <<"root">> => ?config(storage_root, Config)
+                },
+                <<"exporter">> => #{
+                    <<"root">> => ?config(exports_root, Config)
+                }
+            }
+        })
+    ).

+ 158 - 7
apps/emqx_ft/test/emqx_ft_conf_SUITE.erl

@@ -21,26 +21,26 @@
 
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
-    ok = emqx_common_test_helpers:start_apps(
-        [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
-    ),
-    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
     Config.
 
 end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
     ok.
 
 init_per_testcase(_Case, Config) ->
+    % NOTE: running each testcase with clean config
+    _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
+    ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
+    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
     Config.
 
 end_per_testcase(_Case, _Config) ->
-    ok.
+    ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+    ok = emqx_config:erase(file_transfer).
 
 %%--------------------------------------------------------------------
 %% Tests
@@ -60,6 +60,7 @@ t_update_config(_Config) ->
         emqx_conf:update(
             [file_transfer],
             #{
+                <<"enable">> => true,
                 <<"storage">> => #{
                     <<"type">> => <<"local">>,
                     <<"segments">> => #{
@@ -85,3 +86,153 @@ t_update_config(_Config) ->
         5 * 60 * 1000,
         emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
     ).
+
+t_disable_restore_config(Config) ->
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer],
+            #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
+            #{}
+        )
+    ),
+    ?assertEqual(
+        60 * 60 * 1000,
+        emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+    ),
+    % Verify that transfers work
+    ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
+    % Verify that clearing storage settings reverts config to defaults
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer],
+            #{<<"enable">> => false, <<"storage">> => undefined},
+            #{}
+        )
+    ),
+    ?assertEqual(
+        false,
+        emqx_ft_conf:enabled()
+    ),
+    ?assertMatch(
+        #{type := local, exporter := #{type := local}},
+        emqx_ft_conf:storage()
+    ),
+    ClientId = gen_clientid(),
+    Client = emqx_ft_test_helpers:start_client(ClientId),
+    % Verify that transfers fail cleanly when storage is disabled
+    ?check_trace(
+        ?assertMatch(
+            {ok, #{reason_code_name := no_matching_subscribers}},
+            emqtt:publish(
+                Client,
+                <<"$file/f2/init">>,
+                emqx_utils_json:encode(emqx_ft:encode_filemeta(#{name => "f2", size => 42})),
+                1
+            )
+        ),
+        fun(Trace) ->
+            ?assertMatch([], ?of_kind("file_transfer_init", Trace))
+        end
+    ),
+    ok = emqtt:stop(Client),
+    % Restore local storage backend
+    Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer],
+            #{
+                <<"enable">> => true,
+                <<"storage">> => #{
+                    <<"type">> => <<"local">>,
+                    <<"segments">> => #{
+                        <<"root">> => Root,
+                        <<"gc">> => #{<<"interval">> => <<"1s">>}
+                    }
+                }
+            },
+            #{}
+        )
+    ),
+    % Verify that GC is getting triggered eventually
+    ?check_trace(
+        ?block_until(#{?snk_kind := garbage_collection}, 5000, 0),
+        fun(Trace) ->
+            ?assertMatch(
+                [
+                    #{
+                        ?snk_kind := garbage_collection,
+                        storage := #{
+                            type := local,
+                            segments := #{root := Root}
+                        }
+                    }
+                ],
+                ?of_kind(garbage_collection, Trace)
+            )
+        end
+    ),
+    % Verify that transfers work again
+    ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
+
+t_switch_exporter(_Config) ->
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer],
+            #{<<"enable">> => true},
+            #{}
+        )
+    ),
+    ?assertMatch(
+        #{type := local, exporter := #{type := local}},
+        emqx_ft_conf:storage()
+    ),
+    % Verify that switching to a different exporter works
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer, storage, exporter],
+            #{
+                <<"type">> => <<"s3">>,
+                <<"bucket">> => <<"emqx">>,
+                <<"host">> => <<"https://localhost">>,
+                <<"port">> => 9000,
+                <<"transport_options">> => #{
+                    <<"ipv6_probe">> => false
+                }
+            },
+            #{}
+        )
+    ),
+    ?assertMatch(
+        #{type := local, exporter := #{type := s3}},
+        emqx_ft_conf:storage()
+    ),
+    % Verify that switching back to local exporter works
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:remove(
+            [file_transfer, storage, exporter],
+            #{}
+        )
+    ),
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [file_transfer, storage, exporter],
+            #{<<"type">> => <<"local">>},
+            #{}
+        )
+    ),
+    ?assertMatch(
+        #{type := local, exporter := #{type := local}},
+        emqx_ft_conf:storage()
+    ),
+    % Verify that transfers work
+    ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
+
+gen_clientid() ->
+    emqx_base62:encode(emqx_guid:gen()).

+ 1 - 11
apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl

@@ -35,22 +35,12 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)),
+    ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
     Config.
 end_per_suite(_Config) ->
     ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
     ok.
 
-set_special_configs(Config) ->
-    fun
-        (emqx_ft) ->
-            emqx_ft_test_helpers:load_config(#{
-                storage => emqx_ft_test_helpers:local_storage(Config)
-            });
-        (_) ->
-            ok
-    end.
-
 init_per_testcase(Case, Config) ->
     [{tc, Case} | Config].
 end_per_testcase(_Case, _Config) ->

+ 6 - 6
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -68,7 +68,7 @@ end_per_testcase(_TC, _Config) ->
 t_gc_triggers_periodically(_Config) ->
     Interval = 500,
     ok = set_gc_config(interval, Interval),
-    ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
+    ok = emqx_ft_storage_fs_gc:reset(),
     ?check_trace(
         timer:sleep(Interval * 3),
         fun(Trace) ->
@@ -92,7 +92,7 @@ t_gc_triggers_manually(_Config) ->
         ?assertMatch(
             #gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when
                 map_size(Errors) == 0,
-            emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage())
+            emqx_ft_storage_fs_gc:collect()
         ),
         fun(Trace) ->
             [Event] = ?of_kind(garbage_collection, Trace),
@@ -108,7 +108,7 @@ t_gc_complete_transfers(_Config) ->
     ok = set_gc_config(minimum_segments_ttl, 0),
     ok = set_gc_config(maximum_segments_ttl, 3),
     ok = set_gc_config(interval, 500),
-    ok = emqx_ft_storage_fs_gc:reset(Storage),
+    ok = emqx_ft_storage_fs_gc:reset(),
     Transfers = [
         {
             T1 = {<<"client1">>, mk_file_id()},
@@ -134,7 +134,7 @@ t_gc_complete_transfers(_Config) ->
     ?assertEqual([S1, S2, S3], TransferSizes),
     ?assertMatch(
         #gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0,
-        emqx_ft_storage_fs_gc:collect(Storage)
+        emqx_ft_storage_fs_gc:collect()
     ),
     % 2. Complete just the first transfer
     {ok, {ok, Event}} = ?wait_async_action(
@@ -224,7 +224,7 @@ t_gc_incomplete_transfers(_Config) ->
     _ = emqx_utils:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
     % 2. Enable periodic GC every 0.5 seconds.
     ok = set_gc_config(interval, 500),
-    ok = emqx_ft_storage_fs_gc:reset(Storage),
+    ok = emqx_ft_storage_fs_gc:reset(),
     % 3. First we need the first transfer to be collected.
     {ok, _} = ?block_until(
         #{
@@ -304,7 +304,7 @@ t_gc_handling_errors(_Config) ->
                     {directory, DirTransfer2} := eexist
                 }
             } when Files == ?NSEGS(Size, SegSize) * 2 andalso Space > Size * 2,
-            emqx_ft_storage_fs_gc:collect(Storage)
+            emqx_ft_storage_fs_gc:collect()
         ),
         fun(Trace) ->
             ?assertMatch(

+ 12 - 5
apps/emqx_ft/test/emqx_ft_test_helpers.erl

@@ -41,7 +41,7 @@ stop_additional_node(Node) ->
 env_handler(Config) ->
     fun
         (emqx_ft) ->
-            load_config(#{storage => local_storage(Config)});
+            load_config(#{enable => true, storage => local_storage(Config)});
         (_) ->
             ok
     end.
@@ -68,15 +68,22 @@ tcp_port(Node) ->
 root(Config, Node, Tail) ->
     filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
 
+start_client(ClientId) ->
+    start_client(ClientId, node()).
+
+start_client(ClientId, Node) ->
+    Port = tcp_port(Node),
+    {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
 upload_file(ClientId, FileId, Name, Data) ->
     upload_file(ClientId, FileId, Name, Data, node()).
 
 upload_file(ClientId, FileId, Name, Data, Node) ->
-    Port = tcp_port(Node),
-    Size = byte_size(Data),
+    C1 = start_client(ClientId, Node),
 
-    {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
-    {ok, _} = emqtt:connect(C1),
+    Size = byte_size(Data),
     Meta = #{
         name => Name,
         expire_at => erlang:system_time(_Unit = second) + 3600,

+ 1 - 1
apps/emqx_s3/src/emqx_s3_client.erl

@@ -57,7 +57,7 @@
     port := part_number(),
     bucket := string(),
     headers := headers(),
-    acl := emqx_s3:acl(),
+    acl := emqx_s3:acl() | undefined,
     url_expire_time := pos_integer(),
     access_key_id := string() | undefined,
     secret_access_key := string() | undefined,

+ 0 - 1
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -105,7 +105,6 @@ fields(s3) ->
                     bucket_owner_full_control
                 ]),
                 #{
-                    default => private,
                     desc => ?DESC("acl"),
                     required => false
                 }

+ 0 - 1
apps/emqx_s3/test/emqx_s3_schema_SUITE.erl

@@ -23,7 +23,6 @@ t_minimal_config(_Config) ->
             bucket := "bucket",
             host := "s3.us-east-1.endpoint.com",
             port := 443,
-            acl := private,
             min_part_size := 5242880,
             transport_options :=
                 #{

+ 6 - 0
rel/i18n/emqx_ft_schema.hocon

@@ -1,5 +1,11 @@
 emqx_ft_schema {
 
+enable.desc:
+"""Enable the File Transfer feature.<br/>
+Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.<br/>
+This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
+on storage-dependent background activities (e.g. garbage collection)."""
+
 init_timeout.desc:
 """Timeout for initializing the file transfer.<br/>
 After reaching the timeout, `init` message will be acked with an error"""