Просмотр исходного кода

feat(ft): add config & backend behaviour

Ilya Averyanov 3 лет назад
Родитель
Сommit
72e3eee6c9

+ 2 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -63,7 +63,8 @@
     emqx_psk_schema,
     emqx_psk_schema,
     emqx_limiter_schema,
     emqx_limiter_schema,
     emqx_slow_subs_schema,
     emqx_slow_subs_schema,
-    emqx_mgmt_api_key_schema
+    emqx_mgmt_api_key_schema,
+    emqx_ft_schema
 ]).
 ]).
 
 
 %% root config should not have a namespace
 %% root config should not have a namespace

+ 14 - 0
apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf

@@ -0,0 +1,14 @@
+emqx_ft_schema {
+
+    local {
+        desc {
+          en: "Use local file system to store uploaded files and temporary data."
+          zh: "使用本地文件系统来存储上传的文件和临时数据。"
+        }
+        label: {
+              en: "Local Storage"
+              zh: "本地存储"
+            }
+    }
+
+}

+ 81 - 45
apps/emqx_ft/src/emqx_ft.erl

@@ -41,9 +41,13 @@
 
 
 -export([on_assemble_timeout/1]).
 -export([on_assemble_timeout/1]).
 
 
--export_type([clientid/0]).
--export_type([transfer/0]).
--export_type([offset/0]).
+-export_type([
+    clientid/0,
+    transfer/0,
+    offset/0,
+    filemeta/0,
+    segment/0
+]).
 
 
 %% Number of bytes
 %% Number of bytes
 -type bytes() :: non_neg_integer().
 -type bytes() :: non_neg_integer().
@@ -55,6 +59,26 @@
 -type transfer() :: {clientid(), fileid()}.
 -type transfer() :: {clientid(), fileid()}.
 -type offset() :: bytes().
 -type offset() :: bytes().
 
 
+-type filemeta() :: #{
+    %% Display name
+    name := string(),
+    %% Size in bytes, as advertised by the client.
+    %% Client is free to specify here whatever it wants, which means we can end
+    %% up with a file of different size after assembly. It's not clear from
+    %% specification what that means (e.g. what are clients' expectations), we
+    %% currently do not condider that an error (or, specifically, a signal that
+    %% the resulting file is corrupted during transmission).
+    size => _Bytes :: non_neg_integer(),
+    checksum => {sha256, <<_:256>>},
+    expire_at := emqx_datetime:epoch_second(),
+    %% TTL of individual segments
+    %% Somewhat confusing that we won't know it on the nodes where the filemeta
+    %% is missing.
+    segments_ttl => _Seconds :: pos_integer()
+}.
+
+-type segment() :: {offset(), _Content :: binary()}.
+
 -type ft_data() :: #{
 -type ft_data() :: #{
     nodes := list(node())
     nodes := list(node())
 }.
 }.
@@ -135,23 +159,9 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
     end.
     end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Private funs
+%% Handlers for transfer messages
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-get_ft_data(ChanPid) ->
-    case ets:lookup(?FT_TAB, ChanPid) of
-        [#emqx_ft{ft_data = FTData}] -> {ok, FTData};
-        [] -> none
-    end.
-
-delete_ft_data(ChanPid) ->
-    true = ets:delete(?FT_TAB, ChanPid),
-    ok.
-
-put_ft_data(ChanPid, FTData) ->
-    true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
-    ok.
-
 on_file_command(PacketId, Msg, FileCommand) ->
 on_file_command(PacketId, Msg, FileCommand) ->
     case string:split(FileCommand, <<"/">>, all) of
     case string:split(FileCommand, <<"/">>, all) of
         [FileId, <<"init">>] ->
         [FileId, <<"init">>] ->
@@ -176,12 +186,16 @@ on_init(Msg, FileId) ->
         mqtt_msg => Msg,
         mqtt_msg => Msg,
         file_id => FileId
         file_id => FileId
     }),
     }),
-    % Payload = Msg#message.payload,
+    Payload = Msg#message.payload,
     % %% Add validations here
     % %% Add validations here
-    % Meta = emqx_json:decode(Payload, [return_maps]),
-    % ok = emqx_ft_storage_fs:store_filemeta(storage(), transfer(Msg, FileId), Meta),
-    % ?RC_SUCCESS.
-    ?RC_UNSPECIFIED_ERROR.
+    Meta = emqx_json:decode(Payload, [return_maps]),
+    case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
+        {ok, Ctx} ->
+            ok = put_context(Ctx),
+            ?RC_SUCCESS;
+        {error, _Reason} ->
+            ?RC_UNSPECIFIED_ERROR
+    end.
 
 
 on_abort(_Msg, _FileId) ->
 on_abort(_Msg, _FileId) ->
     %% TODO
     %% TODO
@@ -195,16 +209,17 @@ on_segment(Msg, FileId, Offset, Checksum) ->
         offset => Offset,
         offset => Offset,
         checksum => Checksum
         checksum => Checksum
     }),
     }),
-    % %% TODO: handle checksum
-    % Payload = Msg#message.payload,
-    % %% Add offset/checksum validations
-    % ok = emqx_ft_storage_fs:store_segment(
-    %     storage(),
-    %     transfer(Msg, FileId),
-    %     {binary_to_integer(Offset), Payload}
-    % ),
-    % ?RC_SUCCESS.
-    ?RC_UNSPECIFIED_ERROR.
+    %% TODO: handle checksum
+    Payload = Msg#message.payload,
+    Segment = {binary_to_integer(Offset), Payload},
+    %% Add offset/checksum validations
+    case emqx_ft_storage:store_segment(get_context(), transfer(Msg, FileId), Segment) of
+        {ok, Ctx} ->
+            ok = put_context(Ctx),
+            ?RC_SUCCESS;
+        {error, _Reason} ->
+            ?RC_UNSPECIFIED_ERROR
+    end.
 
 
 on_fin(PacketId, Msg, FileId, Checksum) ->
 on_fin(PacketId, Msg, FileId, Checksum) ->
     ?SLOG(info, #{
     ?SLOG(info, #{
@@ -227,7 +242,7 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
                 Callback = callback(FinPacketKey, FileId),
                 Callback = callback(FinPacketKey, FileId),
                 case assemble(transfer(Msg, FileId), Callback) of
                 case assemble(transfer(Msg, FileId), Callback) of
                     %% Assembling started, packet will be acked by the callback or the responder
                     %% Assembling started, packet will be acked by the callback or the responder
-                    ok ->
+                    {ok, _} ->
                         undefined;
                         undefined;
                     %% Assembling failed, unregister the packet key
                     %% Assembling failed, unregister the packet key
                     {error, _} ->
                     {error, _} ->
@@ -250,16 +265,12 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
                 undefined
                 undefined
         end.
         end.
 
 
-assemble(_Transfer, _Callback) ->
-    % spawn(fun() -> Callback({error, not_implemented}) end),
-    ok.
-
-% assemble(Transfer, Callback) ->
-%     emqx_ft_storage_fs:assemble(
-%         storage(),
-%         Transfer,
-%         Callback
-%     ).
+assemble(Transfer, Callback) ->
+    emqx_ft_storage:assemble(
+        get_context(),
+        Transfer,
+        Callback
+    ).
 
 
 callback({ChanPid, PacketId} = Key, _FileId) ->
 callback({ChanPid, PacketId} = Key, _FileId) ->
     fun(Result) ->
     fun(Result) ->
@@ -281,9 +292,34 @@ transfer(Msg, FileId) ->
     {ClientId, FileId}.
     {ClientId, FileId}.
 
 
 %% TODO: configure
 %% TODO: configure
+
 storage() ->
 storage() ->
-    filename:join(emqx:data_dir(), "file_transfer").
+    emqx_config:get([file_transfer, storage]).
 
 
 on_assemble_timeout({ChanPid, PacketId}) ->
 on_assemble_timeout({ChanPid, PacketId}) ->
     ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
     ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
     erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
     erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
+
+%%--------------------------------------------------------------------
+%% Context management
+%%--------------------------------------------------------------------
+
+get_context() ->
+    get_ft_data(self()).
+
+put_context(Context) ->
+    put_ft_data(self(), Context).
+
+get_ft_data(ChanPid) ->
+    case ets:lookup(?FT_TAB, ChanPid) of
+        [#emqx_ft{ft_data = FTData}] -> {ok, FTData};
+        [] -> none
+    end.
+
+delete_ft_data(ChanPid) ->
+    true = ets:delete(?FT_TAB, ChanPid),
+    ok.
+
+put_ft_data(ChanPid, FTData) ->
+    true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
+    ok.

+ 2 - 0
apps/emqx_ft/src/emqx_ft_app.erl

@@ -23,8 +23,10 @@
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_ft_sup:start_link(),
     {ok, Sup} = emqx_ft_sup:start_link(),
     ok = emqx_ft:hook(),
     ok = emqx_ft:hook(),
+    ok = emqx_ft_conf:load(),
     {ok, Sup}.
     {ok, Sup}.
 
 
 stop(_State) ->
 stop(_State) ->
+    ok = emqx_ft_conf:unload(),
     ok = emqx_ft:unhook(),
     ok = emqx_ft:unhook(),
     ok.
     ok.

+ 65 - 0
apps/emqx_ft/src/emqx_ft_conf.erl

@@ -0,0 +1,65 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc File Transfer configuration management module
+
+-module(emqx_ft_conf).
+
+-behaviour(emqx_config_handler).
+
+%% Load/Unload
+-export([
+    load/0,
+    unload/0
+]).
+
+%% callbacks for emqx_config_handler
+-export([
+    pre_config_update/3,
+    post_config_update/5
+]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec load() -> ok.
+load() ->
+    emqx_conf:add_handler([file_transfer], ?MODULE).
+
+-spec unload() -> ok.
+unload() ->
+    emqx_conf:remove_handler([file_transfer]).
+
+%%--------------------------------------------------------------------
+%% emqx_config_handler callbacks
+%%--------------------------------------------------------------------
+
+-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
+    {ok, emqx_config:update_request()} | {error, term()}.
+pre_config_update(_, _Req, Config) ->
+    {ok, Config}.
+
+-spec post_config_update(
+    list(atom()),
+    emqx_config:update_request(),
+    emqx_config:config(),
+    emqx_config:config(),
+    emqx_config:app_envs()
+) ->
+    ok | {ok, Result :: any()} | {error, Reason :: term()}.
+post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
+    ok.

+ 1 - 0
apps/emqx_ft/src/emqx_ft_responder.erl

@@ -79,6 +79,7 @@ handle_call({register, Key, DefaultAction, Timeout}, _From, State) ->
             {reply, {error, already_registered}, State}
             {reply, {error, already_registered}, State}
     end;
     end;
 handle_call({unregister, Key}, _From, State) ->
 handle_call({unregister, Key}, _From, State) ->
+    ?SLOG(warning, #{msg => "unregister", key => Key}),
     case ets:lookup(?TAB, Key) of
     case ets:lookup(?TAB, Key) of
         [] ->
         [] ->
             {reply, {error, not_found}, State};
             {reply, {error, not_found}, State};

+ 49 - 0
apps/emqx_ft/src/emqx_ft_schema.erl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_schema).
+
+-behaviour(hocon_schema).
+
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("typerefl/include/types.hrl").
+
+-export([namespace/0, roots/0, fields/1, tags/0]).
+
+namespace() -> file_transfer.
+
+tags() ->
+    [<<"File Transfer">>].
+
+roots() -> [file_transfer].
+
+fields(file_transfer) ->
+    [
+        {storage, #{
+            type => hoconsc:union([
+                hoconsc:ref(?MODULE, local_storage)
+            ])
+        }}
+    ];
+fields(local_storage) ->
+    [
+        {type, #{
+            type => local,
+            default => local,
+            required => false,
+            desc => ?DESC("local")
+        }}
+    ].

+ 75 - 0
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -0,0 +1,75 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage).
+
+-export(
+    [
+        store_filemeta/2,
+        store_segment/3,
+        assemble/3
+    ]
+).
+
+-type ctx() :: term().
+-type storage() :: emqx_config:config().
+
+-export_type([assemble_callback/0]).
+
+-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
+
+%%--------------------------------------------------------------------
+%% behaviour
+%%--------------------------------------------------------------------
+
+-callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
+    {ok, ctx()} | {error, term()}.
+-callback store_segment(storage(), ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
+    {ok, ctx()} | {error, term()}.
+-callback assemble(storage(), ctx(), emqx_ft:transfer(), assemble_callback()) ->
+    {ok, pid()} | {error, term()}.
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
+    {ok, ctx()} | {error, term()}.
+store_filemeta(Transfer, FileMeta) ->
+    Mod = mod(),
+    Mod:store_filemeta(storage(), Transfer, FileMeta).
+
+-spec store_segment(ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
+    {ok, ctx()} | {error, term()}.
+store_segment(Ctx, Transfer, Segment) ->
+    Mod = mod(),
+    Mod:store_segment(storage(), Ctx, Transfer, Segment).
+
+-spec assemble(ctx(), emqx_ft:transfer(), assemble_callback()) ->
+    {ok, pid()} | {error, term()}.
+assemble(Ctx, Transfer, Callback) ->
+    Mod = mod(),
+    Mod:assemble(storage(), Ctx, Transfer, Callback).
+
+mod() ->
+    case storage() of
+        #{type := local} ->
+            % emqx_ft_storage_fs
+            emqx_ft_storage_dummy
+    end.
+
+storage() ->
+    emqx_config:get([file_transfer, storage]).

+ 35 - 0
apps/emqx_ft/src/emqx_ft_storage_dummy.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_dummy).
+
+-behaviour(emqx_ft_storage).
+
+-export([
+    store_filemeta/3,
+    store_segment/4,
+    assemble/4
+]).
+
+store_filemeta(_Storage, _Transfer, _Meta) ->
+    {ok, #{}}.
+
+store_segment(_Storage, Ctx, _Transfer, _Segment) ->
+    {ok, Ctx}.
+
+assemble(_Storage, _Ctx, _Transfer, Callback) ->
+    Pid = spawn(fun() -> Callback({error, not_implemented}) end),
+    {ok, Pid}.

+ 18 - 53
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -19,10 +19,10 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
 
-% -compile(export_all).
+-behaviour(emqx_ft_storage).
 
 
 -export([store_filemeta/3]).
 -export([store_filemeta/3]).
--export([store_segment/3]).
+-export([store_segment/4]).
 -export([list/2]).
 -export([list/2]).
 -export([read_segment/5]).
 -export([read_segment/5]).
 -export([assemble/3]).
 -export([assemble/3]).
@@ -32,45 +32,10 @@
 -export([write/2]).
 -export([write/2]).
 -export([discard/1]).
 -export([discard/1]).
 
 
-% -behaviour(gen_server).
-% -export([init/1]).
-% -export([handle_call/3]).
-% -export([handle_cast/2]).
-
--type json_value() ::
-    null
-    | boolean()
-    | binary()
-    | number()
-    | [json_value()]
-    | #{binary() => json_value()}.
-
--reflect_type([json_value/0]).
-
 -type transfer() :: emqx_ft:transfer().
 -type transfer() :: emqx_ft:transfer().
 -type offset() :: emqx_ft:offset().
 -type offset() :: emqx_ft:offset().
 
 
-%% TODO: move to `emqx_ft` interface module
-% -type sha256_hex() :: <<_:512>>.
-
--type filemeta() :: #{
-    %% Display name
-    name := string(),
-    %% Size in bytes, as advertised by the client.
-    %% Client is free to specify here whatever it wants, which means we can end
-    %% up with a file of different size after assembly. It's not clear from
-    %% specification what that means (e.g. what are clients' expectations), we
-    %% currently do not condider that an error (or, specifically, a signal that
-    %% the resulting file is corrupted during transmission).
-    size => _Bytes :: non_neg_integer(),
-    checksum => {sha256, <<_:256>>},
-    expire_at := emqx_datetime:epoch_second(),
-    %% TTL of individual segments
-    %% Somewhat confusing that we won't know it on the nodes where the filemeta
-    %% is missing.
-    segments_ttl => _Seconds :: pos_integer(),
-    user_data => json_value()
-}.
+-type filemeta() :: emqx_ft:filemeta().
 
 
 -type segment() :: {offset(), _Content :: binary()}.
 -type segment() :: {offset(), _Content :: binary()}.
 
 
@@ -113,13 +78,14 @@
 %% Atomic operation.
 %% Atomic operation.
 -spec store_filemeta(storage(), transfer(), filemeta()) ->
 -spec store_filemeta(storage(), transfer(), filemeta()) ->
     % Quota? Some lower level errors?
     % Quota? Some lower level errors?
-    ok | {error, conflict} | {error, _TODO}.
+    {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
 store_filemeta(Storage, Transfer, Meta) ->
 store_filemeta(Storage, Transfer, Meta) ->
     Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
     Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
     case read_file(Filepath, fun decode_filemeta/1) of
     case read_file(Filepath, fun decode_filemeta/1) of
         {ok, Meta} ->
         {ok, Meta} ->
             _ = touch_file(Filepath),
             _ = touch_file(Filepath),
-            ok;
+            %% No context is needed for this implementation.
+            {ok, #{}};
         {ok, _Conflict} ->
         {ok, _Conflict} ->
             % TODO
             % TODO
             % We won't see conflicts in case of concurrent `store_filemeta`
             % We won't see conflicts in case of concurrent `store_filemeta`
@@ -132,13 +98,18 @@ store_filemeta(Storage, Transfer, Meta) ->
 
 
 %% Store a segment in the backing filesystem.
 %% Store a segment in the backing filesystem.
 %% Atomic operation.
 %% Atomic operation.
--spec store_segment(storage(), transfer(), segment()) ->
+-spec store_segment(storage(), emqx_ft_storage:ctx(), transfer(), segment()) ->
     % Where is the checksum gets verified? Upper level probably.
     % Where is the checksum gets verified? Upper level probably.
     % Quota? Some lower level errors?
     % Quota? Some lower level errors?
     ok | {error, _TODO}.
     ok | {error, _TODO}.
-store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
+store_segment(Storage, Ctx, Transfer, Segment = {_Offset, Content}) ->
     Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
     Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
-    write_file_atomic(Filepath, Content).
+    case write_file_atomic(Filepath, Content) of
+        ok ->
+            {ok, Ctx};
+        {error, _} = Error ->
+            Error
+    end.
 
 
 -spec list(storage(), transfer()) ->
 -spec list(storage(), transfer()) ->
     % Some lower level errors? {error, notfound}?
     % Some lower level errors? {error, notfound}?
@@ -178,13 +149,10 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
     end.
     end.
 
 
 -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
 -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
-    % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
     {ok, _Assembler :: pid()} | {error, _TODO}.
     {ok, _Assembler :: pid()} | {error, _TODO}.
-assemble(Storage, Transfer, Callback) ->
+assemble(Storage, _Ctx, Transfer, Callback) ->
     emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
     emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
 
 
-%%
-
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 
 
 -spec open_file(storage(), transfer(), filemeta()) ->
 -spec open_file(storage(), transfer(), filemeta()) ->
@@ -268,8 +236,7 @@ schema() ->
             {size, hoconsc:mk(non_neg_integer())},
             {size, hoconsc:mk(non_neg_integer())},
             {expire_at, hoconsc:mk(non_neg_integer())},
             {expire_at, hoconsc:mk(non_neg_integer())},
             {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
             {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
-            {segments_ttl, hoconsc:mk(pos_integer())},
-            {user_data, hoconsc:mk(json_value())}
+            {segments_ttl, hoconsc:mk(pos_integer())}
         ]
         ]
     }.
     }.
 
 
@@ -354,10 +321,8 @@ mk_filedir(Storage, {ClientId, FileId}) ->
 mk_filepath(Storage, Transfer, Filename) ->
 mk_filepath(Storage, Transfer, Filename) ->
     filename:join(mk_filedir(Storage, Transfer), Filename).
     filename:join(mk_filedir(Storage, Transfer), Filename).
 
 
-get_storage_root(Storage) ->
-    Storage.
-
-%%
+get_storage_root(_Storage) ->
+    filename:join(emqx:data_dir(), "file_transfer").
 
 
 -include_lib("kernel/include/file.hrl").
 -include_lib("kernel/include/file.hrl").