Просмотр исходного кода

feat(ft): move out exporter concept into dedicated modules

Andrew Mayorov 2 лет назад
Родитель
Сommit
b6b044f429

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -13,6 +13,7 @@
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
 {emqx_exhook,1}.
+{emqx_ft_storage_exporter_fs,1}.
 {emqx_ft_storage_fs,1}.
 {emqx_ft_storage_fs_reader,1}.
 {emqx_gateway_api_listeners,1}.

+ 6 - 2
apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl

@@ -25,11 +25,12 @@
     request/4,
     multipart_formdata_request/3,
     multipart_formdata_request/4,
+    host/0,
     uri/0,
     uri/1
 ]).
 
--define(HOST, "http://127.0.0.1:18083/").
+-define(HOST, "http://127.0.0.1:18083").
 -define(API_VERSION, "v5").
 -define(BASE_PATH, "api").
 
@@ -94,10 +95,13 @@ request(Username, Method, Url, Body) ->
             {error, Reason}
     end.
 
+host() ->
+    ?HOST.
+
 uri() -> uri([]).
 uri(Parts) when is_list(Parts) ->
     NParts = [E || E <- Parts],
-    ?HOST ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
+    host() ++ "/" ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
 
 auth_header(Username) ->
     Password = <<"public">>,

+ 4 - 0
apps/emqx_ft/i18n/emqx_ft_api_i18n.conf

@@ -11,6 +11,10 @@ emqx_ft_api {
         }
     }
 
+}
+
+emqx_ft_storage_exporter_fs_api {
+
     file_get {
         desc {
             en: "Get a file by its id."

+ 3 - 130
apps/emqx_ft/src/emqx_ft_api.erl

@@ -30,18 +30,12 @@
 ]).
 
 -export([
-    fields/1,
     roots/0
 ]).
 
 %% API callbacks
 -export([
-    '/file_transfer/files'/2,
-    '/file_transfer/file'/2
-]).
-
--export([
-    mk_file_uri/3
+    '/file_transfer/files'/2
 ]).
 
 -import(hoconsc, [mk/2, ref/1, ref/2]).
@@ -53,8 +47,7 @@ api_spec() ->
 
 paths() ->
     [
-        "/file_transfer/files",
-        "/file_transfer/file"
+        "/file_transfer/files"
     ].
 
 schema("/file_transfer/files") ->
@@ -71,31 +64,6 @@ schema("/file_transfer/files") ->
                 )
             }
         }
-    };
-schema("/file_transfer/file") ->
-    % TODO
-    % This is conceptually another API, because this logic is inherent only to the
-    % local filesystem exporter. Ideally, we won't even publish it if `emqx_ft` is
-    % configured with another exporter. Accordingly, things that look too specific
-    % for this module (i.e. `parse_filepath/1`) should go away in another API module.
-    #{
-        'operationId' => '/file_transfer/file',
-        get => #{
-            tags => [<<"file_transfer">>],
-            summary => <<"Download a particular file">>,
-            description => ?DESC("file_get"),
-            parameters => [
-                ref(file_node),
-                ref(file_ref)
-            ],
-            responses => #{
-                200 => <<"Operation success">>,
-                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>),
-                503 => emqx_dashboard_swagger:error_codes(
-                    ['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
-                )
-            }
-        }
     }.
 
 '/file_transfer/files'(get, #{}) ->
@@ -106,76 +74,11 @@ schema("/file_transfer/file") ->
             {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
     end.
 
-'/file_transfer/file'(get, #{query_string := Query}) ->
-    try
-        Node = parse_node(maps:get(<<"node">>, Query)),
-        Filepath = parse_filepath(maps:get(<<"fileref">>, Query)),
-        case emqx_ft_storage_fs_proto_v1:read_export_file(Node, Filepath, self()) of
-            {ok, ReaderPid} ->
-                FileData = emqx_ft_storage_fs_reader:table(ReaderPid),
-                {200,
-                    #{
-                        <<"content-type">> => <<"application/data">>,
-                        <<"content-disposition">> => <<"attachment">>
-                    },
-                    FileData};
-            {error, enoent} ->
-                {404, error_msg('NOT_FOUND', <<"Not found">>)};
-            {error, Error} ->
-                ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
-                {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
-        end
-    catch
-        throw:{invalid, Param} ->
-            {404,
-                error_msg(
-                    'NOT_FOUND',
-                    iolist_to_binary(["Invalid query parameter: ", Param])
-                )};
-        error:{erpc, noconnection} ->
-            {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
-    end.
-
 error_msg(Code, Msg) ->
     #{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
 
--spec fields(hocon_schema:name()) -> hocon_schema:fields().
-fields(file_ref) ->
-    [
-        {fileref,
-            hoconsc:mk(binary(), #{
-                in => query,
-                desc => <<"File reference">>,
-                example => <<"file1">>,
-                required => true
-            })}
-    ];
-fields(file_node) ->
-    [
-        {node,
-            hoconsc:mk(binary(), #{
-                in => query,
-                desc => <<"Node under which the file is located">>,
-                example => atom_to_list(node()),
-                required => true
-            })}
-    ].
-
 roots() ->
-    [
-        file_node,
-        file_ref
-    ].
-
-mk_file_uri(_Options, Node, Filepath) ->
-    % TODO: qualify with `?BASE_PATH`
-    [
-        "/file_transfer/file?",
-        uri_string:compose_query([
-            {"node", atom_to_list(Node)},
-            {"fileref", Filepath}
-        ])
-    ].
+    [].
 
 %%--------------------------------------------------------------------
 %% Helpers
@@ -207,33 +110,3 @@ format_export_info(
 
 format_timestamp(Timestamp) ->
     iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
-
-parse_node(NodeBin) ->
-    case emqx_misc:safe_to_existing_atom(NodeBin) of
-        {ok, Node} ->
-            Node;
-        {error, _} ->
-            throw({invalid, NodeBin})
-    end.
-
-parse_filepath(PathBin) ->
-    case filename:pathtype(PathBin) of
-        relative ->
-            ok;
-        absolute ->
-            throw({invalid, PathBin})
-    end,
-    PathComponents = filename:split(PathBin),
-    case lists:any(fun is_special_component/1, PathComponents) of
-        false ->
-            filename:join(PathComponents);
-        true ->
-            throw({invalid, PathBin})
-    end.
-
-is_special_component(<<".", _/binary>>) ->
-    true;
-is_special_component([$. | _]) ->
-    true;
-is_special_component(_) ->
-    false.

+ 11 - 32
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -106,11 +106,15 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
         {error, _} = Error ->
             {stop, {shutdown, Error}}
     end;
-handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
-    Filemeta = emqx_ft_assembly:filemeta(Asm),
-    Coverage = emqx_ft_assembly:coverage(Asm),
+handle_event(internal, _, start_assembling, St = #st{}) ->
+    Filemeta = emqx_ft_assembly:filemeta(St#st.assembly),
+    Coverage = emqx_ft_assembly:coverage(St#st.assembly),
     % TODO: better error handling
-    {ok, Export} = export_start(Filemeta, St),
+    {ok, Export} = emqx_ft_storage_exporter:start_export(
+        St#st.storage,
+        St#st.transfer,
+        Filemeta
+    ),
     {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])};
 handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % TODO
@@ -119,17 +123,17 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % TODO: pipelining
     % TODO: better error handling
     {ok, Content} = pread(Node, Segment, St),
-    {ok, NExport} = export_write(St#st.export, Content),
+    {ok, NExport} = emqx_ft_storage_exporter:write(St#st.export, Content),
     {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])};
 handle_event(internal, _, {assemble, []}, St = #st{}) ->
     {next_state, complete, St, ?internal([])};
 handle_event(internal, _, complete, St = #st{}) ->
-    Result = export_complete(St#st.export),
+    Result = emqx_ft_storage_exporter:complete(St#st.export),
     ok = maybe_garbage_collect(Result, St),
     {stop, {shutdown, Result}, St#st{export = undefined}}.
 
 terminate(_Reason, _StateName, #st{export = Export}) ->
-    Export /= undefined andalso export_discard(Export).
+    Export /= undefined andalso emqx_ft_storage_exporter:discard(Export).
 
 pread(Node, Segment, St) when Node =:= node() ->
     emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
@@ -138,31 +142,6 @@ pread(Node, Segment, St) ->
 
 %%
 
-export_start(Filemeta, #st{storage = Storage, transfer = Transfer}) ->
-    {ExporterMod, Exporter} = emqx_ft_storage_fs:exporter(Storage),
-    case ExporterMod:start_export(Exporter, Transfer, Filemeta) of
-        {ok, Export} ->
-            {ok, {ExporterMod, Export}};
-        {error, _} = Error ->
-            Error
-    end.
-
-export_write({ExporterMod, Export}, Content) ->
-    case ExporterMod:write(Export, Content) of
-        {ok, ExportNext} ->
-            {ok, {ExporterMod, ExportNext}};
-        {error, _} = Error ->
-            Error
-    end.
-
-export_complete({ExporterMod, Export}) ->
-    ExporterMod:complete(Export).
-
-export_discard({ExporterMod, Export}) ->
-    ExporterMod:discard(Export).
-
-%%
-
 maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
     Nodes = emqx_ft_assembly:nodes(Asm),
     emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);

+ 12 - 2
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -26,6 +26,7 @@
 
         exports/0,
 
+        with_storage_type/2,
         with_storage_type/3
     ]
 ).
@@ -33,7 +34,9 @@
 -type storage() :: emqx_config:config().
 
 -export_type([assemble_callback/0]).
+-export_type([export_info/0]).
 -export_type([export_data/0]).
+-export_type([reader/0]).
 
 -type assemble_callback() :: fun((ok | {error, term()}) -> any()).
 
@@ -46,6 +49,7 @@
 }.
 
 -type export_data() :: binary() | qlc:query_handle().
+-type reader() :: pid().
 
 %%--------------------------------------------------------------------
 %% Behaviour
@@ -106,11 +110,17 @@ exports() ->
     Mod = mod(),
     Mod:exports(storage()).
 
--spec with_storage_type(atom(), atom(), list(term())) -> any().
+-spec with_storage_type(atom(), atom() | function()) -> any().
+with_storage_type(Type, Fun) ->
+    with_storage_type(Type, Fun, []).
+
+-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
 with_storage_type(Type, Fun, Args) ->
     Storage = storage(),
     case Storage of
-        #{type := Type} ->
+        #{type := Type} when is_function(Fun) ->
+            apply(Fun, [Storage | Args]);
+        #{type := Type} when is_atom(Fun) ->
             Mod = mod(Storage),
             apply(Mod, Fun, [Storage | Args]);
         disabled ->

+ 97 - 0
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -0,0 +1,97 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% Filesystem storage exporter
+%%
+%% This is conceptually a part of the Filesystem storage backend that defines
+%% how and where complete tranfers are assembled into files and stored.
+
+-module(emqx_ft_storage_exporter).
+
+%% Export API
+-export([start_export/3]).
+-export([write/2]).
+-export([complete/1]).
+-export([discard/1]).
+
+%% Listing API
+-export([list/1]).
+% TODO
+% -export([list/2]).
+
+-export([exporter/1]).
+
+-type storage() :: emxt_ft_storage_fs:storage().
+-type transfer() :: emqx_ft:transfer().
+-type filemeta() :: emqx_ft:filemeta().
+
+-type options() :: map().
+-type export() :: term().
+
+-callback start_export(options(), transfer(), filemeta()) ->
+    {ok, export()} | {error, _Reason}.
+
+-callback write(ExportSt :: export(), iodata()) ->
+    {ok, ExportSt :: export()} | {error, _Reason}.
+
+-callback complete(ExportSt :: export()) ->
+    ok | {error, _Reason}.
+
+-callback discard(ExportSt :: export()) ->
+    ok | {error, _Reason}.
+
+-callback list(options()) ->
+    {ok, [emqx_ft_storage:export_info()]} | {error, _Reason}.
+
+%%
+
+start_export(Storage, Transfer, Filemeta) ->
+    {ExporterMod, Exporter} = exporter(Storage),
+    case ExporterMod:start_export(Exporter, Transfer, Filemeta) of
+        {ok, ExportSt} ->
+            {ok, {ExporterMod, ExportSt}};
+        {error, _} = Error ->
+            Error
+    end.
+
+write({ExporterMod, ExportSt}, Content) ->
+    case ExporterMod:write(ExportSt, Content) of
+        {ok, ExportStNext} ->
+            {ok, {ExporterMod, ExportStNext}};
+        {error, _} = Error ->
+            Error
+    end.
+
+complete({ExporterMod, ExportSt}) ->
+    ExporterMod:complete(ExportSt).
+
+discard({ExporterMod, ExportSt}) ->
+    ExporterMod:discard(ExportSt).
+
+%%
+
+list(Storage) ->
+    {ExporterMod, ExporterOpts} = exporter(Storage),
+    ExporterMod:list(ExporterOpts).
+
+%%
+
+-spec exporter(storage()) -> {module(), _ExporterOptions}.
+exporter(Storage) ->
+    case maps:get(exporter, Storage) of
+        #{type := local} = Options ->
+            {emqx_ft_storage_exporter_fs, Options}
+    end.

+ 7 - 7
apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl

@@ -30,6 +30,7 @@
 -export([start_reader/3]).
 
 -export([list/1]).
+% TODO
 % -export([list/2]).
 
 -export_type([export/0]).
@@ -228,7 +229,7 @@ mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) ->
         #{
             transfer => Transfer,
             name => Filename,
-            uri => mk_export_uri(Options, RelFilepath),
+            uri => mk_export_uri(RelFilepath),
             timestamp => Fileinfo#file_info.mtime,
             size => Fileinfo#file_info.size,
             path => filename:join(Root, RelFilepath)
@@ -247,15 +248,14 @@ try_read_filemeta(Filepath, Info) ->
             Info
     end.
 
-mk_export_uri(Options, RelFilepath) ->
-    % emqx_ft_storage_exporter_fs_api:mk_export_uri(Options, RelFilepath).
-    emqx_ft_api:mk_file_uri(Options, node(), RelFilepath).
+mk_export_uri(RelFilepath) ->
+    emqx_ft_storage_exporter_fs_api:mk_export_uri(node(), RelFilepath).
 
 -spec start_reader(options(), file:name(), _Caller :: pid()) ->
     {ok, reader()} | {error, enoent}.
-start_reader(Options, Filepath, CallerPid) ->
+start_reader(Options, RelFilepath, CallerPid) ->
     Root = get_storage_root(Options),
-    case filelib:safe_relative_path(Filepath, Root) of
+    case filelib:safe_relative_path(RelFilepath, Root) of
         SafeFilepath when SafeFilepath /= unsafe ->
             AbsFilepath = filename:join(Root, SafeFilepath),
             emqx_ft_storage_fs_reader:start_supervised(CallerPid, AbsFilepath);
@@ -269,7 +269,7 @@ start_reader(Options, Filepath, CallerPid) ->
     {ok, [exportinfo(), ...]} | {error, file_error()}.
 list(_Options) ->
     Nodes = mria_mnesia:running_nodes(),
-    Results = emqx_ft_storage_fs_proto_v1:list_exports(Nodes),
+    Results = emqx_ft_storage_exporter_fs_proto_v1:list_exports(Nodes),
     {GoodResults, BadResults} = lists:partition(
         fun
             ({_Node, {ok, {ok, _}}}) -> true;

+ 180 - 0
apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl

@@ -0,0 +1,180 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_exporter_fs_api).
+
+-behaviour(minirest_api).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% Swagger specs from hocon schema
+-export([
+    api_spec/0,
+    paths/0,
+    schema/1,
+    namespace/0
+]).
+
+-export([
+    fields/1,
+    roots/0
+]).
+
+%% API callbacks
+-export([
+    '/file_transfer/file'/2
+]).
+
+-export([mk_export_uri/2]).
+
+%%
+
+namespace() -> "file_transfer".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/file_transfer/file"
+    ].
+
+schema("/file_transfer/file") ->
+    #{
+        'operationId' => '/file_transfer/file',
+        get => #{
+            tags => [<<"file_transfer">>],
+            summary => <<"Download a particular file">>,
+            description => ?DESC("file_get"),
+            parameters => [
+                hoconsc:ref(file_node),
+                hoconsc:ref(file_ref)
+            ],
+            responses => #{
+                200 => <<"Operation success">>,
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>),
+                503 => emqx_dashboard_swagger:error_codes(
+                    ['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
+                )
+            }
+        }
+    }.
+
+roots() ->
+    [
+        file_node,
+        file_ref
+    ].
+
+-spec fields(hocon_schema:name()) -> hocon_schema:fields().
+fields(file_ref) ->
+    [
+        {fileref,
+            hoconsc:mk(binary(), #{
+                in => query,
+                desc => <<"File reference">>,
+                example => <<"file1">>,
+                required => true
+            })}
+    ];
+fields(file_node) ->
+    [
+        {node,
+            hoconsc:mk(binary(), #{
+                in => query,
+                desc => <<"Node under which the file is located">>,
+                example => atom_to_list(node()),
+                required => true
+            })}
+    ].
+
+'/file_transfer/file'(get, #{query_string := Query}) ->
+    try
+        Node = parse_node(maps:get(<<"node">>, Query)),
+        Filepath = parse_filepath(maps:get(<<"fileref">>, Query)),
+        case emqx_ft_storage_exporter_fs_proto_v1:read_export_file(Node, Filepath, self()) of
+            {ok, ReaderPid} ->
+                FileData = emqx_ft_storage_fs_reader:table(ReaderPid),
+                {200,
+                    #{
+                        <<"content-type">> => <<"application/data">>,
+                        <<"content-disposition">> => <<"attachment">>
+                    },
+                    FileData};
+            {error, enoent} ->
+                {404, error_msg('NOT_FOUND', <<"Not found">>)};
+            {error, Error} ->
+                ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
+                {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
+        end
+    catch
+        throw:{invalid, Param} ->
+            {404,
+                error_msg(
+                    'NOT_FOUND',
+                    iolist_to_binary(["Invalid query parameter: ", Param])
+                )};
+        error:{erpc, noconnection} ->
+            {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
+    end.
+
+error_msg(Code, Msg) ->
+    #{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
+
+-spec mk_export_uri(node(), file:name()) ->
+    uri_string:uri_string().
+mk_export_uri(Node, Filepath) ->
+    emqx_dashboard_swagger:relative_uri([
+        "/file_transfer/file?",
+        uri_string:compose_query([
+            {"node", atom_to_list(Node)},
+            {"fileref", Filepath}
+        ])
+    ]).
+
+%%
+
+parse_node(NodeBin) ->
+    case emqx_misc:safe_to_existing_atom(NodeBin) of
+        {ok, Node} ->
+            Node;
+        {error, _} ->
+            throw({invalid, NodeBin})
+    end.
+
+parse_filepath(PathBin) ->
+    case filename:pathtype(PathBin) of
+        relative ->
+            ok;
+        absolute ->
+            throw({invalid, PathBin})
+    end,
+    PathComponents = filename:split(PathBin),
+    case lists:any(fun is_special_component/1, PathComponents) of
+        false ->
+            filename:join(PathComponents);
+        true ->
+            throw({invalid, PathBin})
+    end.
+
+is_special_component(<<".", _/binary>>) ->
+    true;
+is_special_component([$. | _]) ->
+    true;
+is_special_component(_) ->
+    false.

+ 46 - 0
apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% This methods are called via rpc by `emqx_ft_storage_exporter_fs`
+%% They populate the call with actual storage which may be configured differently
+%% on a concrete node.
+
+-module(emqx_ft_storage_exporter_fs_proxy).
+
+-export([
+    list_exports_local/0,
+    read_export_file_local/2
+]).
+
+list_exports_local() ->
+    emqx_ft_storage:with_storage_type(local, fun(Storage) ->
+        case emqx_ft_storage_exporter:exporter(Storage) of
+            {emqx_ft_storage_exporter_fs, Options} ->
+                emqx_ft_storage_exporter_fs:list_local(Options);
+            InvalidExporter ->
+                {error, {invalid_exporter, InvalidExporter}}
+        end
+    end).
+
+read_export_file_local(Filepath, CallerPid) ->
+    emqx_ft_storage:with_storage_type(local, fun(Storage) ->
+        case emqx_ft_storage_exporter:exporter(Storage) of
+            {emqx_ft_storage_exporter_fs, Options} ->
+                emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid);
+            InvalidExporter ->
+                {error, {invalid_exporter, InvalidExporter}}
+        end
+    end).

+ 1 - 22
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -44,12 +44,7 @@
 -export([get_subdir/2]).
 -export([get_subdir/3]).
 
--export([exporter/1]).
-
-% Exporter-specific API
 -export([exports/1]).
--export([exports_local/1]).
--export([exports_local/2]).
 
 -export_type([storage/0]).
 -export_type([filefrag/1]).
@@ -214,24 +209,8 @@ assemble(Storage, Transfer, Size) ->
 
 %%
 
--spec exporter(storage()) -> {module(), _ExporterOptions}.
-exporter(Storage) ->
-    case maps:get(exporter, Storage) of
-        #{type := local} = Options ->
-            {emqx_ft_storage_exporter_fs, Options}
-    end.
-
 exports(Storage) ->
-    {ExporterMod, ExporterOpts} = exporter(Storage),
-    ExporterMod:list(ExporterOpts).
-
-exports_local(Storage) ->
-    {ExporterMod, ExporterOpts} = exporter(Storage),
-    ExporterMod:list_local(ExporterOpts).
-
-exports_local(Storage, Transfer) ->
-    {ExporterMod, ExporterOpts} = exporter(Storage),
-    ExporterMod:list_local(ExporterOpts, Transfer).
+    emqx_ft_storage_exporter:list(Storage).
 
 %%
 

+ 1 - 19
apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl

@@ -22,9 +22,7 @@
 
 -export([
     list_local/2,
-    pread_local/4,
-    list_exports_local/0,
-    read_export_file_local/2
+    pread_local/4
 ]).
 
 list_local(Transfer, What) ->
@@ -32,19 +30,3 @@ list_local(Transfer, What) ->
 
 pread_local(Transfer, Frag, Offset, Size) ->
     emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
-
-list_exports_local() ->
-    case emqx_ft_storage:with_storage_type(local, exporter, []) of
-        {emqx_ft_storage_exporter_fs, Options} ->
-            emqx_ft_storage_exporter_fs:list_local(Options);
-        InvalidExporter ->
-            {error, {invalid_exporter, InvalidExporter}}
-    end.
-
-read_export_file_local(Filepath, CallerPid) ->
-    case emqx_ft_storage:with_storage_type(local, exporter, []) of
-        {emqx_ft_storage_exporter_fs, Options} ->
-            emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid);
-        InvalidExporter ->
-            {error, {invalid_exporter, InvalidExporter}}
-    end.

+ 51 - 0
apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl

@@ -0,0 +1,51 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_exporter_fs_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([introduced_in/0]).
+
+-export([list_exports/1]).
+-export([read_export_file/3]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.17".
+
+-spec list_exports([node()]) ->
+    emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]).
+list_exports(Nodes) ->
+    erpc:multicall(
+        Nodes,
+        emqx_ft_storage_exporter_fs_proxy,
+        list_exports_local,
+        []
+    ).
+
+-spec read_export_file(node(), file:name(), pid()) ->
+    {ok, emqx_ft_storage:reader()}
+    | {error, term()}
+    | no_return().
+read_export_file(Node, Filepath, CallerPid) ->
+    erpc:call(
+        Node,
+        emqx_ft_storage_exporter_fs_proxy,
+        read_export_file_local,
+        [Filepath, CallerPid]
+    ).

+ 0 - 18
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -23,10 +23,6 @@
 -export([multilist/3]).
 -export([pread/5]).
 
-%% TODO: These should be defined in a separate BPAPI
--export([list_exports/1]).
--export([read_export_file/3]).
-
 -type offset() :: emqx_ft:offset().
 -type transfer() :: emqx_ft:transfer().
 -type filefrag() :: emqx_ft_storage_fs:filefrag().
@@ -45,17 +41,3 @@ multilist(Nodes, Transfer, What) ->
     {ok, [filefrag()]} | {error, term()} | no_return().
 pread(Node, Transfer, Frag, Offset, Size) ->
     erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
-
-%%
-
--spec list_exports([node()]) ->
-    emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]).
-list_exports(Nodes) ->
-    erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_exports_local, []).
-
--spec read_export_file(node(), file:name(), pid()) ->
-    {ok, emqx_ft_storage:export_data()}
-    | {error, term()}
-    | no_return().
-read_export_file(Node, Filepath, CallerPid) ->
-    erpc:call(Node, emqx_ft_storage_fs_proxy, read_export_file_local, [Filepath, CallerPid]).

+ 2 - 2
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -24,7 +24,7 @@
 
 -include_lib("emqx/include/asserts.hrl").
 
--import(emqx_mgmt_api_test_util, [uri/1]).
+-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
@@ -111,7 +111,7 @@ t_download_transfer(Config) ->
     {ok, 200, #{<<"files">> := [File]}} =
         request(get, uri(["file_transfer", "files"]), fun json/1),
 
-    {ok, 200, Response} = request(get, uri([]) ++ maps:get(<<"uri">>, File)),
+    {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)),
 
     ?assertEqual(
         <<"data">>,

+ 15 - 7
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -86,8 +86,7 @@ t_assemble_empty_transfer(Config) ->
     ),
     Status = complete_assemble(Storage, Transfer, 0),
     ?assertEqual({shutdown, ok}, Status),
-    {ok, [_Result = #{size := _Size = 0}]} =
-        emqx_ft_storage_fs:exports_local(Storage, Transfer),
+    {ok, [_Result = #{size := _Size = 0}]} = list_exports(Config, Transfer),
     % ?assertEqual(
     %     {error, eof},
     %     emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
@@ -138,10 +137,9 @@ t_assemble_complete_local_transfer(Config) ->
                 meta := #{}
             }
         ]},
-        emqx_ft_storage_fs:exports_local(Storage, Transfer)
+        list_exports(Config, Transfer)
     ),
-    {ok, [#{path := AssemblyFilename}]} =
-        emqx_ft_storage_fs:exports_local(Storage, Transfer),
+    {ok, [#{path := AssemblyFilename}]} = list_exports(Config, Transfer),
     ?assertMatch(
         {ok, #file_info{type = regular, size = TransferSize}},
         file:read_file_info(AssemblyFilename)
@@ -193,8 +191,7 @@ complete_assemble(Storage, Transfer, Size, Timeout) ->
 %%
 
 t_list_transfers(Config) ->
-    Storage = storage(Config),
-    {ok, Exports} = emqx_ft_storage_fs:exports_local(Storage),
+    {ok, Exports} = list_exports(Config),
     ?assertMatch(
         [
             #{
@@ -237,6 +234,17 @@ inspect_file(Filename) ->
 mk_fileid() ->
     integer_to_binary(erlang:system_time(millisecond)).
 
+list_exports(Config) ->
+    {emqx_ft_storage_exporter_fs, Options} = exporter(Config),
+    emqx_ft_storage_exporter_fs:list_local(Options).
+
+list_exports(Config, Transfer) ->
+    {emqx_ft_storage_exporter_fs, Options} = exporter(Config),
+    emqx_ft_storage_exporter_fs:list_local(Options, Transfer).
+
+exporter(Config) ->
+    emqx_ft_storage_exporter:exporter(storage(Config)).
+
 storage(Config) ->
     #{
         type => local,