Procházet zdrojové kódy

Merge pull request #10296 from ft/EMQX-9416/fs-iterators

feat(ft-api): add paging support through cursors
Andrew Mayorov před 2 roky
rodič
revize
4100c31d4a

+ 119 - 10
apps/emqx_ft/src/emqx_ft_api.erl

@@ -19,7 +19,6 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
--include_lib("emqx/include/logger.hrl").
 
 %% Swagger specs from hocon schema
 -export([
@@ -30,12 +29,14 @@
 ]).
 
 -export([
-    roots/0
+    roots/0,
+    fields/1
 ]).
 
 %% API callbacks
 -export([
-    '/file_transfer/files'/2
+    '/file_transfer/files'/2,
+    '/file_transfer/files/:clientid/:fileid'/2
 ]).
 
 -import(hoconsc, [mk/2, ref/1, ref/2]).
@@ -47,7 +48,8 @@ api_spec() ->
 
 paths() ->
     [
-        "/file_transfer/files"
+        "/file_transfer/files",
+        "/file_transfer/files/:clientid/:fileid"
     ].
 
 schema("/file_transfer/files") ->
@@ -57,29 +59,133 @@ schema("/file_transfer/files") ->
             tags => [<<"file_transfer">>],
             summary => <<"List all uploaded files">>,
             description => ?DESC("file_list"),
+            parameters => [
+                ref(following),
+                ref(emqx_dashboard_swagger, limit)
+            ],
             responses => #{
                 200 => <<"Operation success">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    ['BAD_REQUEST'], <<"Invalid cursor">>
+                ),
                 503 => emqx_dashboard_swagger:error_codes(
-                    ['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
+                    ['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE')
+                )
+            }
+        }
+    };
+schema("/file_transfer/files/:clientid/:fileid") ->
+    #{
+        'operationId' => '/file_transfer/files/:clientid/:fileid',
+        get => #{
+            tags => [<<"file_transfer">>],
+            summary => <<"List files uploaded in a specific transfer">>,
+            description => ?DESC("file_list_transfer"),
+            parameters => [
+                ref(client_id),
+                ref(file_id)
+            ],
+            responses => #{
+                200 => <<"Operation success">>,
+                404 => emqx_dashboard_swagger:error_codes(
+                    ['FILES_NOT_FOUND'], error_desc('FILES_NOT_FOUND')
+                ),
+                503 => emqx_dashboard_swagger:error_codes(
+                    ['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE')
                 )
             }
         }
     }.
 
-'/file_transfer/files'(get, #{}) ->
-    case emqx_ft_storage:files() of
-        {ok, Files} ->
-            {200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}};
+'/file_transfer/files'(get, #{
+    query_string := QueryString
+}) ->
+    try
+        Limit = limit(QueryString),
+        Query =
+            case maps:get(<<"following">>, QueryString, undefined) of
+                undefined ->
+                    #{limit => Limit};
+                Cursor ->
+                    #{limit => Limit, following => Cursor}
+            end,
+        case emqx_ft_storage:files(Query) of
+            {ok, Page} ->
+                {200, format_page(Page)};
+            {error, _} ->
+                {503, error_msg('SERVICE_UNAVAILABLE')}
+        end
+    catch
+        error:{badarg, cursor} ->
+            {400, error_msg('BAD_REQUEST', <<"Invalid cursor">>)}
+    end.
+
+'/file_transfer/files/:clientid/:fileid'(get, #{
+    bindings := #{clientid := ClientId, fileid := FileId}
+}) ->
+    Transfer = {ClientId, FileId},
+    case emqx_ft_storage:files(#{transfer => Transfer}) of
+        {ok, Page} ->
+            {200, format_page(Page)};
+        {error, [{_Node, enoent} | _]} ->
+            {404, error_msg('FILES_NOT_FOUND')};
         {error, _} ->
-            {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
+            {503, error_msg('SERVICE_UNAVAILABLE')}
     end.
 
+format_page(#{items := Files, cursor := Cursor}) ->
+    #{
+        <<"files">> => lists:map(fun format_file_info/1, Files),
+        <<"cursor">> => Cursor
+    };
+format_page(#{items := Files}) ->
+    #{
+        <<"files">> => lists:map(fun format_file_info/1, Files)
+    }.
+
+error_msg(Code) ->
+    #{code => Code, message => error_desc(Code)}.
+
 error_msg(Code, Msg) ->
     #{code => Code, message => emqx_utils:readable_error_msg(Msg)}.
 
+error_desc('FILES_NOT_FOUND') ->
+    <<"Files requested for this transfer could not be found">>;
+error_desc('SERVICE_UNAVAILABLE') ->
+    <<"Service unavailable">>.
+
 roots() ->
     [].
 
+-spec fields(hocon_schema:name()) -> [hoconsc:field()].
+fields(client_id) ->
+    [
+        {clientid,
+            mk(binary(), #{
+                in => path,
+                desc => <<"MQTT Client ID">>,
+                required => true
+            })}
+    ];
+fields(file_id) ->
+    [
+        {fileid,
+            mk(binary(), #{
+                in => path,
+                desc => <<"File ID">>,
+                required => true
+            })}
+    ];
+fields(following) ->
+    [
+        {following,
+            mk(binary(), #{
+                in => query,
+                desc => <<"Cursor to start listing files from">>,
+                required => false
+            })}
+    ].
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
@@ -115,3 +221,6 @@ format_name(NameBin) when is_binary(NameBin) ->
     NameBin;
 format_name(Name) when is_list(Name) ->
     iolist_to_binary(Name).
+
+limit(QueryString) ->
+    maps:get(<<"limit">>, QueryString, emqx_mgmt:default_row_limit()).

+ 235 - 0
apps/emqx_ft/src/emqx_ft_fs_iterator.erl

@@ -0,0 +1,235 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_fs_iterator).
+
+-export([new/2]).
+-export([next/1]).
+-export([next_leaf/1]).
+
+-export([seek/3]).
+
+-export([fold/3]).
+-export([fold_n/4]).
+
+-export_type([t/0]).
+-export_type([glob/0]).
+-export_type([pathstack/0]).
+
+-type root() :: file:name().
+-type glob() :: ['*' | globfun()].
+-type globfun() ::
+    fun((_Filename :: file:name()) -> boolean())
+    | fun((_Filename :: file:name(), pathstack()) -> boolean()).
+
+% A path stack is a list of path components, in reverse order.
+-type pathstack() :: [file:name(), ...].
+
+-opaque t() :: #{
+    root := root(),
+    queue := [_PathStack :: [file:name()]],
+    head := glob(),
+    stack := [{[pathstack()], glob()}]
+}.
+
+-type entry() :: entry_leaf() | entry_node().
+-type entry_leaf() ::
+    {leaf, file:name(), file:file_info() | {error, file:posix()}, pathstack()}.
+-type entry_node() ::
+    {node, file:name(), {error, file:posix()}, pathstack()}.
+
+-spec new(root(), glob()) ->
+    t().
+new(Root, Glob) ->
+    #{
+        root => Root,
+        queue => [[]],
+        head => Glob,
+        stack => []
+    }.
+
+-spec next(t()) ->
+    {entry(), t()} | none.
+next(It = #{queue := [PathStack | Rest], head := []}) ->
+    {emit(PathStack, It), It#{queue => Rest}};
+next(It = #{queue := [PathStack | Rest], head := [Pat | _], root := Root}) ->
+    Filepath = mk_filepath(PathStack),
+    case emqx_ft_fs_util:list_dir(filename:join(Root, Filepath)) of
+        {ok, Filenames} ->
+            Sorted = lists:sort(Filenames),
+            Matches = [[Fn | PathStack] || Fn <- Sorted, matches_glob(Pat, Fn, [Fn | PathStack])],
+            ItNext = windup(It),
+            next(ItNext#{queue => Matches});
+        {error, _} = Error ->
+            {{node, Filepath, Error, PathStack}, It#{queue => Rest}}
+    end;
+next(It = #{queue := []}) ->
+    unwind(It).
+
+windup(It = #{queue := [_ | Rest], head := [Pat | Glob], stack := Stack}) ->
+    % NOTE
+    % Preserve unfinished paths and glob in the stack, so that we can resume traversal
+    % when the lower levels of the tree are exhausted.
+    It#{
+        head => Glob,
+        stack => [{Rest, [Pat | Glob]} | Stack]
+    }.
+
+unwind(It = #{stack := [{Queue, Glob} | StackRest]}) ->
+    % NOTE
+    % Resume traversal of unfinished paths from the upper levels of the tree.
+    next(It#{
+        queue => Queue,
+        head => Glob,
+        stack => StackRest
+    });
+unwind(#{stack := []}) ->
+    none.
+
+emit(PathStack, #{root := Root}) ->
+    Filepath = mk_filepath(PathStack),
+    case emqx_ft_fs_util:read_info(filename:join(Root, Filepath)) of
+        {ok, Fileinfo} ->
+            {leaf, Filepath, Fileinfo, PathStack};
+        {error, _} = Error ->
+            {leaf, Filepath, Error, PathStack}
+    end.
+
+mk_filepath([]) ->
+    "";
+mk_filepath(PathStack) ->
+    filename:join(lists:reverse(PathStack)).
+
+matches_glob('*', _, _) ->
+    true;
+matches_glob(FilterFun, Filename, _PathStack) when is_function(FilterFun, 1) ->
+    FilterFun(Filename);
+matches_glob(FilterFun, Filename, PathStack) when is_function(FilterFun, 2) ->
+    FilterFun(Filename, PathStack).
+
+%%
+
+-spec next_leaf(t()) ->
+    {entry_leaf(), t()} | none.
+next_leaf(It) ->
+    case next(It) of
+        {{leaf, _, _, _} = Leaf, ItNext} ->
+            {Leaf, ItNext};
+        {{node, _Filename, _Error, _PathStack}, ItNext} ->
+            % NOTE
+            % Intentionally skipping intermediate traversal errors here, for simplicity.
+            next_leaf(ItNext);
+        none ->
+            none
+    end.
+
+%%
+
+-spec seek([file:name()], root(), glob()) ->
+    t().
+seek(PathSeek, Root, Glob) ->
+    SeekGlob = mk_seek_glob(PathSeek, Glob),
+    SeekStack = lists:reverse(PathSeek),
+    case next_leaf(new(Root, SeekGlob)) of
+        {{leaf, _Filepath, _Info, SeekStack}, It} ->
+            fixup_glob(Glob, It);
+        {{leaf, _Filepath, _Info, Successor}, It = #{queue := Queue}} ->
+            fixup_glob(Glob, It#{queue => [Successor | Queue]});
+        none ->
+            none(Root)
+    end.
+
+mk_seek_glob(PathSeek, Glob) ->
+    % NOTE
+    % The seek glob is a glob that skips all the nodes / leaves that are lexicographically
+    % smaller than the seek path. For example, if the seek path is ["a", "b", "c"], and
+    % the glob is ['*', '*', '*', '*'], then the seek glob is:
+    % [ fun(Path) -> Path >= ["a"] end,
+    %   fun(Path) -> Path >= ["a", "b"] end,
+    %   fun(Path) -> Path >= ["a", "b", "c"] end,
+    %   '*'
+    % ]
+    L = min(length(PathSeek), length(Glob)),
+    merge_glob([mk_seek_pat(lists:sublist(PathSeek, N)) || N <- lists:seq(1, L)], Glob).
+
+mk_seek_pat(PathSeek) ->
+    % NOTE
+    % The `PathStack` and `PathSeek` are of the same length here.
+    fun(_Filename, PathStack) -> lists:reverse(PathStack) >= PathSeek end.
+
+merge_glob([Pat | SeekRest], [PatOrig | Rest]) ->
+    [merge_pat(Pat, PatOrig) | merge_glob(SeekRest, Rest)];
+merge_glob([], [PatOrig | Rest]) ->
+    [PatOrig | merge_glob([], Rest)];
+merge_glob([], []) ->
+    [].
+
+merge_pat(Pat, PatOrig) ->
+    fun(Filename, PathStack) ->
+        Pat(Filename, PathStack) andalso matches_glob(PatOrig, Filename, PathStack)
+    end.
+
+fixup_glob(Glob, It = #{head := [], stack := Stack}) ->
+    % NOTE
+    % Restoring original glob through the stack. Strictly speaking, this is not usually
+    % necessary, it's a kind of optimization.
+    fixup_glob(Glob, lists:reverse(Stack), It#{stack => []}).
+
+fixup_glob(Glob = [_ | Rest], [{Queue, _} | StackRest], It = #{stack := Stack}) ->
+    fixup_glob(Rest, StackRest, It#{stack => [{Queue, Glob} | Stack]});
+fixup_glob(Rest, [], It) ->
+    It#{head => Rest}.
+
+%%
+
+-spec fold(fun((entry(), Acc) -> Acc), Acc, t()) ->
+    Acc.
+fold(FoldFun, Acc, It) ->
+    case next(It) of
+        {Entry, ItNext} ->
+            fold(FoldFun, FoldFun(Entry, Acc), ItNext);
+        none ->
+            Acc
+    end.
+
+%% NOTE
+%% Passing negative `N` is allowed, in which case the iterator will be exhausted
+%% completely, like in `fold/3`.
+-spec fold_n(fun((entry(), Acc) -> Acc), Acc, t(), _N :: integer()) ->
+    {Acc, {more, t()} | none}.
+fold_n(_FoldFun, Acc, It, 0) ->
+    {Acc, {more, It}};
+fold_n(FoldFun, Acc, It, N) ->
+    case next(It) of
+        {Entry, ItNext} ->
+            fold_n(FoldFun, FoldFun(Entry, Acc), ItNext, N - 1);
+        none ->
+            {Acc, none}
+    end.
+
+%%
+
+-spec none(root()) ->
+    t().
+none(Root) ->
+    % NOTE
+    % The _none_ iterator is a valid iterator, but it will never yield any entries.
+    #{
+        root => Root,
+        queue => [],
+        head => [],
+        stack => []
+    }.

+ 21 - 45
apps/emqx_ft/src/emqx_ft_fs_util.erl

@@ -25,18 +25,16 @@
 
 -export([read_decode_file/2]).
 -export([read_info/1]).
+-export([list_dir/1]).
 
 -export([fold/4]).
 
--type glob() :: ['*' | globfun()].
--type globfun() ::
-    fun((_Filename :: file:name()) -> boolean()).
 -type foldfun(Acc) ::
     fun(
         (
             _Filepath :: file:name(),
-            _Info :: file:file_info() | {error, _IoError},
-            _Stack :: [file:name()],
+            _Info :: file:file_info() | {error, file:posix()},
+            _Stack :: emqx_ft_fs_iterator:pathstack(),
             Acc
         ) -> Acc
     ).
@@ -153,46 +151,8 @@ read_info(AbsPath) ->
     % Be aware that this function is occasionally mocked in `emqx_ft_fs_util_SUITE`.
     file:read_link_info(AbsPath, [{time, posix}, raw]).
 
--spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) ->
-    Acc.
-fold(Fun, Acc, Root, Glob) ->
-    fold(Fun, Acc, [], Root, Glob, []).
-
-fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) ->
-    case list_dir(filename:join(Root, Path)) of
-        {ok, Filenames} ->
-            lists:foldl(
-                fun(FN, Acc) ->
-                    case matches_glob(Glob, FN) of
-                        true when Path == [] ->
-                            fold(Fun, Acc, FN, Root, Rest, [FN | Stack]);
-                        true ->
-                            fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]);
-                        false ->
-                            Acc
-                    end
-                end,
-                AccIn,
-                Filenames
-            );
-        {error, enotdir} ->
-            AccIn;
-        {error, Reason} ->
-            Fun(Path, {error, Reason}, Stack, AccIn)
-    end;
-fold(Fun, AccIn, Filepath, Root, [], Stack) ->
-    case ?MODULE:read_info(filename:join(Root, Filepath)) of
-        {ok, Info} ->
-            Fun(Filepath, Info, Stack, AccIn);
-        {error, Reason} ->
-            Fun(Filepath, {error, Reason}, Stack, AccIn)
-    end.
-
-matches_glob('*', _) ->
-    true;
-matches_glob(FilterFun, Filename) when is_function(FilterFun) ->
-    FilterFun(Filename).
-
+-spec list_dir(file:name_all()) ->
+    {ok, [file:name()]} | {error, file:posix() | badarg}.
 list_dir(AbsPath) ->
     case ?MODULE:read_info(AbsPath) of
         {ok, #file_info{type = directory}} ->
@@ -202,3 +162,19 @@ list_dir(AbsPath) ->
         {error, Reason} ->
             {error, Reason}
     end.
+
+-spec fold(foldfun(Acc), Acc, _Root :: file:name(), emqx_ft_fs_iterator:glob()) ->
+    Acc.
+fold(FoldFun, Acc, Root, Glob) ->
+    fold(FoldFun, Acc, emqx_ft_fs_iterator:new(Root, Glob)).
+
+fold(FoldFun, Acc, It) ->
+    case emqx_ft_fs_iterator:next(It) of
+        {{node, _Path, {error, enotdir}, _PathStack}, ItNext} ->
+            fold(FoldFun, Acc, ItNext);
+        {{_Type, Path, Info, PathStack}, ItNext} ->
+            AccNext = FoldFun(Path, Info, PathStack, Acc),
+            fold(FoldFun, AccNext, ItNext);
+        none ->
+            Acc
+    end.

+ 25 - 4
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -25,6 +25,7 @@
         assemble/2,
 
         files/0,
+        files/1,
 
         with_storage_type/2,
         with_storage_type/3,
@@ -36,12 +37,27 @@
 -type storage() :: emqx_config:config().
 
 -export_type([assemble_callback/0]).
+
+-export_type([query/1]).
+-export_type([page/2]).
 -export_type([file_info/0]).
 -export_type([export_data/0]).
 -export_type([reader/0]).
 
 -type assemble_callback() :: fun((ok | {error, term()}) -> any()).
 
+-type query(Cursor) ::
+    #{transfer => emqx_ft:transfer()}
+    | #{
+        limit => non_neg_integer(),
+        following => Cursor
+    }.
+
+-type page(Item, Cursor) :: #{
+    items := [Item],
+    cursor => Cursor
+}.
+
 -type file_info() :: #{
     transfer := emqx_ft:transfer(),
     name := file:name(),
@@ -71,8 +87,8 @@
 -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
     ok | {async, pid()} | {error, term()}.
 
--callback files(storage()) ->
-    {ok, [file_info()]} | {error, term()}.
+-callback files(storage(), query(Cursor)) ->
+    {ok, page(file_info(), Cursor)} | {error, term()}.
 
 %%--------------------------------------------------------------------
 %% API
@@ -105,9 +121,14 @@ assemble(Transfer, Size) ->
     with_storage(assemble, [Transfer, Size]).
 
 -spec files() ->
-    {ok, [file_info()]} | {error, term()}.
+    {ok, page(file_info(), _)} | {error, term()}.
 files() ->
-    with_storage(files, []).
+    files(#{}).
+
+-spec files(query(Cursor)) ->
+    {ok, page(file_info(), Cursor)} | {error, term()}.
+files(Query) ->
+    with_storage(files, [Query]).
 
 -spec with_storage(atom() | function()) -> any().
 with_storage(Fun) ->

+ 8 - 8
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -17,7 +17,7 @@
 %% Filesystem storage exporter
 %%
 %% This is conceptually a part of the Filesystem storage backend that defines
-%% how and where complete tranfers are assembled into files and stored.
+%% how and where complete transfers are assembled into files and stored.
 
 -module(emqx_ft_storage_exporter).
 
@@ -28,7 +28,7 @@
 -export([discard/1]).
 
 %% Listing API
--export([list/1]).
+-export([list/2]).
 
 %% Lifecycle API
 -export([on_config_update/2]).
@@ -70,8 +70,8 @@
 -callback discard(ExportSt :: export_st()) ->
     ok | {error, _Reason}.
 
--callback list(storage()) ->
-    {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
+-callback list(exporter_conf(), emqx_ft_storage:query(Cursor)) ->
+    {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
 
 %% Lifecycle callbacks
 
@@ -133,11 +133,11 @@ complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemet
 discard(#{mod := ExporterMod, st := ExportSt}) ->
     ExporterMod:discard(ExportSt).
 
--spec list(storage()) ->
-    {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
-list(Storage) ->
+-spec list(storage(), emqx_ft_storage:query(Cursor)) ->
+    {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
+list(Storage, Query) ->
     {ExporterMod, ExporterOpts} = exporter(Storage),
-    ExporterMod:list(ExporterOpts).
+    ExporterMod:list(ExporterOpts, Query).
 
 %% Lifecycle
 

+ 208 - 93
apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl

@@ -37,12 +37,23 @@
 %% Internal API for RPC
 -export([list_local/1]).
 -export([list_local/2]).
+-export([list_local_transfer/2]).
 -export([start_reader/3]).
 
-% TODO
-% -export([list/2]).
+-export([list/2]).
+
+-export_type([export_st/0]).
+-export_type([options/0]).
+
+-type options() :: #{
+    root => file:name(),
+    _ => _
+}.
+
+-type query() :: emqx_ft_storage:query(cursor()).
+-type page(T) :: emqx_ft_storage:page(T, cursor()).
+-type cursor() :: iodata().
 
--type options() :: _TODO.
 -type transfer() :: emqx_ft:transfer().
 -type filemeta() :: emqx_ft:filemeta().
 -type exportinfo() :: emqx_ft_storage:file_info().
@@ -70,22 +81,6 @@
 %% 2 symbols = at most 256 directories on the second level
 -define(BUCKET2_LEN, 2).
 
--define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
-    ?SLOG(notice, "filesystem_object_unexpected", #{
-        relpath => RelFilepath,
-        fileinfo => Fileinfo,
-        options => Options
-    })
-).
-
--define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
-    ?SLOG(warning, "filesystem_object_inaccessible", #{
-        relpath => RelFilepath,
-        reason => Reason,
-        options => Options
-    })
-).
-
 %%--------------------------------------------------------------------
 %% Exporter behaviour
 %%--------------------------------------------------------------------
@@ -162,33 +157,33 @@ update(_OldOptions, _NewOptions) -> ok.
 %% Internal API
 %%--------------------------------------------------------------------
 
--spec list_local(options(), transfer()) ->
-    {ok, [exportinfo(), ...]} | {error, file_error()}.
-list_local(Options, Transfer) ->
-    TransferRoot = mk_absdir(Options, Transfer, result),
-    case
-        emqx_ft_fs_util:fold(
-            fun
-                (_Path, {error, Reason}, [], []) ->
-                    {error, Reason};
-                (_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) ->
-                    RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
-                    Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
-                    [Info | Acc];
-                (RelFilepath, Fileinfo = #file_info{}, _, Acc) ->
-                    ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
-                    Acc;
-                (RelFilepath, {error, Reason}, _, Acc) ->
-                    ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
-                    Acc
-            end,
-            [],
-            TransferRoot,
-            [fun filter_manifest/1]
-        )
-    of
+-type local_query() :: emqx_ft_storage:query({transfer(), file:name()}).
+
+-spec list_local_transfer(options(), transfer()) ->
+    {ok, [exportinfo()]} | {error, file_error()}.
+list_local_transfer(Options, Transfer) ->
+    It = emqx_ft_fs_iterator:new(
+        mk_absdir(Options, Transfer, result),
+        [fun filter_manifest/1]
+    ),
+    Result = emqx_ft_fs_iterator:fold(
+        fun
+            ({leaf, _Path, Fileinfo = #file_info{type = regular}, [Filename | _]}, Acc) ->
+                RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
+                Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
+                [Info | Acc];
+            ({node, _Path, {error, Reason}, []}, []) ->
+                {error, Reason};
+            (Entry, Acc) ->
+                ok = log_invalid_entry(Options, Entry),
+                Acc
+        end,
+        [],
+        It
+    ),
+    case Result of
         Infos = [_ | _] ->
-            {ok, Infos};
+            {ok, lists:reverse(Infos)};
         [] ->
             {error, enoent};
         {error, Reason} ->
@@ -196,9 +191,17 @@ list_local(Options, Transfer) ->
     end.
 
 -spec list_local(options()) ->
-    {ok, #{transfer() => [exportinfo(), ...]}}.
+    {ok, [exportinfo()]} | {error, file_error()}.
 list_local(Options) ->
-    Pattern = [
+    list_local(Options, #{}).
+
+-spec list_local(options(), local_query()) ->
+    {ok, [exportinfo()]} | {error, file_error()}.
+list_local(Options, #{transfer := Transfer}) ->
+    list_local_transfer(Options, Transfer);
+list_local(Options, #{} = Query) ->
+    Root = get_storage_root(Options),
+    Glob = [
         _Bucket1 = '*',
         _Bucket2 = '*',
         _Rest = '*',
@@ -206,16 +209,30 @@ list_local(Options) ->
         _FileId = '*',
         fun filter_manifest/1
     ],
-    Root = get_storage_root(Options),
-    {ok,
-        emqx_ft_fs_util:fold(
-            fun(RelFilepath, Info, Stack, Acc) ->
-                read_exportinfo(Options, RelFilepath, Info, Stack, Acc)
-            end,
-            [],
-            Root,
-            Pattern
-        )}.
+    It =
+        case Query of
+            #{following := Cursor} ->
+                emqx_ft_fs_iterator:seek(mk_path_seek(Cursor), Root, Glob);
+            #{} ->
+                emqx_ft_fs_iterator:new(Root, Glob)
+        end,
+    % NOTE
+    % In the rare case when some transfer contain more than one file, the paging mechanic
+    % here may skip over some files, when the cursor is transfer-only.
+    Limit = maps:get(limit, Query, -1),
+    {Exports, _} = emqx_ft_fs_iterator:fold_n(
+        fun(Entry, Acc) -> read_exportinfo(Options, Entry, Acc) end,
+        [],
+        It,
+        Limit
+    ),
+    {ok, Exports}.
+
+mk_path_seek(#{transfer := Transfer, name := Filename}) ->
+    mk_result_reldir(Transfer) ++ [Filename];
+mk_path_seek(#{transfer := Transfer}) ->
+    % NOTE: Any bitstring is greater than any list.
+    mk_result_reldir(Transfer) ++ [<<>>].
 
 %%--------------------------------------------------------------------
 %% Helpers
@@ -227,16 +244,21 @@ filter_manifest(?MANIFEST) ->
 filter_manifest(Filename) ->
     ?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing).
 
-read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) ->
-    [Filename, FileId, ClientId | _] = Stack,
+read_exportinfo(
+    Options,
+    {leaf, RelFilepath, Fileinfo = #file_info{type = regular}, [Filename, FileId, ClientId | _]},
+    Acc
+) ->
+    % NOTE
+    % There might be more than one file for a single transfer (though
+    % extremely bad luck is needed for that, e.g. concurrent assemblers with
+    % different filemetas from different nodes). This might be unexpected for a
+    % client given the current protocol, yet might be helpful in the future.
     Transfer = dirnames_to_transfer(ClientId, FileId),
     Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
     [Info | Acc];
-read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) ->
-    ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
-    Acc;
-read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) ->
-    ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
+read_exportinfo(Options, Entry, Acc) ->
+    ok = log_invalid_entry(Options, Entry),
     Acc.
 
 mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) ->
@@ -268,6 +290,19 @@ try_read_filemeta(Filepath, Info) ->
 mk_export_uri(RelFilepath) ->
     emqx_ft_storage_exporter_fs_api:mk_export_uri(node(), RelFilepath).
 
+log_invalid_entry(Options, {_Type, RelFilepath, Fileinfo = #file_info{}, _Stack}) ->
+    ?SLOG(notice, "filesystem_object_unexpected", #{
+        relpath => RelFilepath,
+        fileinfo => Fileinfo,
+        options => Options
+    });
+log_invalid_entry(Options, {_Type, RelFilepath, {error, Reason}, _Stack}) ->
+    ?SLOG(warning, "filesystem_object_inaccessible", #{
+        relpath => RelFilepath,
+        reason => Reason,
+        options => Options
+    }).
+
 -spec start_reader(options(), file:name(), _Caller :: pid()) ->
     {ok, reader()} | {error, enoent}.
 start_reader(Options, RelFilepath, CallerPid) ->
@@ -282,32 +317,112 @@ start_reader(Options, RelFilepath, CallerPid) ->
 
 %%
 
--spec list(options()) ->
-    {ok, [exportinfo(), ...]} | {error, [{node(), _Reason}]}.
-list(_Options) ->
-    Nodes = mria_mnesia:running_nodes(),
-    Replies = emqx_ft_storage_exporter_fs_proto_v1:list_exports(Nodes),
-    {Results, Errors} = lists:foldl(
-        fun
-            ({_Node, {ok, {ok, Files}}}, {Acc, Errors}) ->
-                {Files ++ Acc, Errors};
-            ({Node, {ok, {error, _} = Error}}, {Acc, Errors}) ->
-                {Acc, [{Node, Error} | Errors]};
-            ({Node, Error}, {Acc, Errors}) ->
-                {Acc, [{Node, Error} | Errors]}
-        end,
-        {[], []},
-        lists:zip(Nodes, Replies)
-    ),
-    length(Errors) > 0 andalso
-        ?SLOG(warning, #{msg => "list_remote_exports_failed", errors => Errors}),
-    case Results of
-        [_ | _] ->
-            {ok, Results};
-        [] when Errors =:= [] ->
-            {ok, Results};
-        [] ->
-            {error, Errors}
+-spec list(options(), query()) ->
+    {ok, page(exportinfo())} | {error, [{node(), _Reason}]}.
+list(_Options, Query = #{transfer := _Transfer}) ->
+    case list(Query) of
+        #{items := Exports = [_ | _]} ->
+            {ok, #{items => Exports}};
+        #{items := [], errors := NodeErrors} ->
+            {error, NodeErrors}
+    end;
+list(_Options, Query) ->
+    Result = list(Query),
+    case Result of
+        #{errors := NodeErrors} ->
+            ?SLOG(warning, "list_exports_errors", #{
+                query => Query,
+                errors => NodeErrors
+            });
+        #{} ->
+            ok
+    end,
+    case Result of
+        #{items := Exports, cursor := Cursor} ->
+            {ok, #{items => lists:reverse(Exports), cursor => encode_cursor(Cursor)}};
+        #{items := Exports} ->
+            {ok, #{items => lists:reverse(Exports)}}
+    end.
+
+list(QueryIn) ->
+    {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())),
+    list_nodes(NodeQuery, Nodes, #{items => []}).
+
+list_nodes(Query, Nodes = [Node | Rest], Acc) ->
+    case emqx_ft_storage_exporter_fs_proto_v1:list_exports([Node], Query) of
+        [{ok, Result}] ->
+            list_accumulate(Result, Query, Nodes, Acc);
+        [Failure] ->
+            ?SLOG(warning, #{
+                msg => "list_remote_exports_failed",
+                node => Node,
+                query => Query,
+                failure => Failure
+            }),
+            list_next(Query, Rest, Acc)
+    end;
+list_nodes(_Query, [], Acc) ->
+    Acc.
+
+list_accumulate({ok, Exports}, Query, [Node | Rest], Acc = #{items := EAcc}) ->
+    NExports = length(Exports),
+    AccNext = Acc#{items := Exports ++ EAcc},
+    case Query of
+        #{limit := Limit} when NExports < Limit ->
+            list_next(Query#{limit => Limit - NExports}, Rest, AccNext);
+        #{limit := _} ->
+            AccNext#{cursor => mk_cursor(Node, Exports)};
+        #{} ->
+            list_next(Query, Rest, AccNext)
+    end;
+list_accumulate({error, Reason}, Query, [Node | Rest], Acc) ->
+    EAcc = maps:get(errors, Acc, []),
+    list_next(Query, Rest, Acc#{errors => [{Node, Reason} | EAcc]}).
+
+list_next(Query, Nodes, Acc) ->
+    list_nodes(maps:remove(following, Query), Nodes, Acc).
+
+decode_query(Query = #{following := Cursor}, Nodes) ->
+    {Node, NodeCursor} = decode_cursor(Cursor),
+    {skip_query_nodes(Node, Nodes), Query#{following => NodeCursor}};
+decode_query(Query = #{}, Nodes) ->
+    {Nodes, Query}.
+
+skip_query_nodes(CNode, Nodes) ->
+    lists:dropwhile(fun(N) -> N < CNode end, Nodes).
+
+mk_cursor(Node, [_Last = #{transfer := Transfer, name := Name} | _]) ->
+    {Node, #{transfer => Transfer, name => Name}}.
+
+encode_cursor({Node, #{transfer := {ClientId, FileId}, name := Name}}) ->
+    emqx_utils_json:encode(#{
+        <<"n">> => Node,
+        <<"cid">> => ClientId,
+        <<"fid">> => FileId,
+        <<"fn">> => unicode:characters_to_binary(Name)
+    }).
+
+decode_cursor(Cursor) ->
+    try
+        #{
+            <<"n">> := NodeIn,
+            <<"cid">> := ClientId,
+            <<"fid">> := FileId,
+            <<"fn">> := NameIn
+        } = emqx_utils_json:decode(Cursor),
+        true = is_binary(ClientId),
+        true = is_binary(FileId),
+        Node = binary_to_existing_atom(NodeIn),
+        Name = unicode:characters_to_list(NameIn),
+        true = is_list(Name),
+        {Node, #{transfer => {ClientId, FileId}, name => Name}}
+    catch
+        error:{_, invalid_json} ->
+            error({badarg, cursor});
+        error:{badmatch, _} ->
+            error({badarg, cursor});
+        error:badarg ->
+            error({badarg, cursor})
     end.
 
 %%
@@ -352,9 +467,9 @@ mk_result_reldir(Transfer = {ClientId, FileId}) ->
         BucketRest/binary
     >> = binary:encode_hex(Hash),
     [
-        Bucket1,
-        Bucket2,
-        BucketRest,
+        binary_to_list(Bucket1),
+        binary_to_list(Bucket2),
+        binary_to_list(BucketRest),
         emqx_ft_fs_util:escape_filename(ClientId),
         emqx_ft_fs_util:escape_filename(FileId)
     ].

+ 3 - 3
apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl

@@ -21,15 +21,15 @@
 -module(emqx_ft_storage_exporter_fs_proxy).
 
 -export([
-    list_exports_local/0,
+    list_exports_local/1,
     read_export_file_local/2
 ]).
 
-list_exports_local() ->
+list_exports_local(Query) ->
     emqx_ft_storage:with_storage_type(local, fun(Storage) ->
         case emqx_ft_storage_exporter:exporter(Storage) of
             {emqx_ft_storage_exporter_fs, Options} ->
-                emqx_ft_storage_exporter_fs:list_local(Options)
+                emqx_ft_storage_exporter_fs:list_local(Options, Query)
             % NOTE
             % This case clause is currently deemed unreachable by dialyzer.
             % InvalidExporter ->

+ 81 - 35
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -23,7 +23,7 @@
 -export([write/2]).
 -export([complete/2]).
 -export([discard/1]).
--export([list/1]).
+-export([list/2]).
 
 -export([
     start/1,
@@ -43,6 +43,10 @@
     filemeta => filemeta()
 }.
 
+-type query() :: emqx_ft_storage:query(cursor()).
+-type page(T) :: emqx_ft_storage:page(T, cursor()).
+-type cursor() :: iodata().
+
 -type export_st() :: #{
     pid := pid(),
     filemeta := filemeta(),
@@ -92,10 +96,10 @@ complete(#{pid := Pid} = _ExportSt, _Checksum) ->
 discard(#{pid := Pid} = _ExportSt) ->
     emqx_s3_uploader:abort(Pid).
 
--spec list(options()) ->
-    {ok, [exportinfo()]} | {error, term()}.
-list(Options) ->
-    emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end).
+-spec list(options(), query()) ->
+    {ok, page(exportinfo())} | {error, term()}.
+list(Options, Query) ->
+    emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options, Query) end).
 
 %%--------------------------------------------------------------------
 %% Exporter behaviour (lifecycle)
@@ -117,12 +121,11 @@ update(_OldOptions, NewOptions) ->
 %% Internal functions
 %% -------------------------------------------------------------------
 
-s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) ->
-    filename:join([
-        emqx_ft_fs_util:escape_filename(ClientId),
-        emqx_ft_fs_util:escape_filename(FileId),
-        Filename
-    ]).
+s3_key(Transfer, #{name := Filename}) ->
+    s3_prefix(Transfer) ++ "/" ++ Filename.
+
+s3_prefix({ClientId, FileId} = _Transfer) ->
+    emqx_ft_fs_util:escape_filename(ClientId) ++ "/" ++ emqx_ft_fs_util:escape_filename(FileId).
 
 s3_headers({ClientId, FileId}, Filemeta) ->
     #{
@@ -137,54 +140,97 @@ s3_headers({ClientId, FileId}, Filemeta) ->
 s3_header_filemeta(Filemeta) ->
     emqx_utils_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
 
-list(Client, Options) ->
-    case list_key_info(Client, Options) of
-        {ok, KeyInfos} ->
-            MaybeExportInfos = lists:map(
-                fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
-            ),
-            ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos],
-            {ok, ExportInfos};
+list(Client, _Options, #{transfer := Transfer}) ->
+    case list_key_info(Client, [{prefix, s3_prefix(Transfer)}, {max_keys, ?S3_LIST_LIMIT}]) of
+        {ok, {Exports, _Marker}} ->
+            {ok, #{items => Exports}};
+        {error, _Reason} = Error ->
+            Error
+    end;
+list(Client, _Options, Query) ->
+    Limit = maps:get(limit, Query, undefined),
+    Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)),
+    case list_pages(Client, Marker, Limit, []) of
+        {ok, {Exports, undefined}} ->
+            {ok, #{items => Exports}};
+        {ok, {Exports, NextMarker}} ->
+            {ok, #{items => Exports, cursor => encode_cursor(NextMarker)}};
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+list_pages(Client, Marker, Limit, Acc) ->
+    MaxKeys = min(?S3_LIST_LIMIT, Limit),
+    ListOptions = [{marker, Marker} || Marker =/= undefined],
+    case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of
+        {ok, {Exports, NextMarker}} ->
+            list_accumulate(Client, Limit, NextMarker, [Exports | Acc]);
         {error, _Reason} = Error ->
             Error
     end.
 
-list_key_info(Client, Options) ->
-    list_key_info(Client, Options, _Marker = [], _Acc = []).
+list_accumulate(_Client, _Limit, undefined, Acc) ->
+    {ok, {flatten_pages(Acc), undefined}};
+list_accumulate(Client, undefined, Marker, Acc) ->
+    list_pages(Client, Marker, undefined, Acc);
+list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) ->
+    case Limit - length(Exports) of
+        0 ->
+            {ok, {flatten_pages(Acc), Marker}};
+        Left ->
+            list_pages(Client, Marker, Left, Acc)
+    end.
+
+flatten_pages(Pages) ->
+    lists:append(lists:reverse(Pages)).
 
-list_key_info(Client, Options, Marker, Acc) ->
-    ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker,
+list_key_info(Client, ListOptions) ->
     case emqx_s3_client:list(Client, ListOptions) of
         {ok, Result} ->
             ?SLOG(debug, #{msg => "list_key_info", result => Result}),
             KeyInfos = proplists:get_value(contents, Result, []),
-            case proplists:get_value(is_truncated, Result, false) of
-                true ->
-                    NewMarker = next_marker(KeyInfos),
-                    list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]);
-                false ->
-                    {ok, lists:append(lists:reverse([KeyInfos | Acc]))}
-            end;
+            Exports = lists:filtermap(
+                fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo) end, KeyInfos
+            ),
+            Marker =
+                case proplists:get_value(is_truncated, Result, false) of
+                    true ->
+                        next_marker(KeyInfos);
+                    false ->
+                        undefined
+                end,
+            {ok, {Exports, Marker}};
         {error, _Reason} = Error ->
             Error
     end.
 
+encode_cursor(Key) ->
+    unicode:characters_to_binary(Key).
+
+decode_cursor(Cursor) ->
+    case unicode:characters_to_list(Cursor) of
+        Key when is_list(Key) ->
+            Key;
+        _ ->
+            error({badarg, cursor})
+    end.
+
 next_marker(KeyInfos) ->
-    [{marker, proplists:get_value(key, lists:last(KeyInfos))}].
+    proplists:get_value(key, lists:last(KeyInfos)).
 
-key_info_to_exportinfo(Client, KeyInfo, _Options) ->
+key_info_to_exportinfo(Client, KeyInfo) ->
     Key = proplists:get_value(key, KeyInfo),
     case parse_transfer_and_name(Key) of
         {ok, {Transfer, Name}} ->
-            {ok, #{
+            {true, #{
                 transfer => Transfer,
                 name => unicode:characters_to_binary(Name),
                 uri => emqx_s3_client:uri(Client, Key),
                 timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
                 size => proplists:get_value(size, KeyInfo)
             }};
-        {error, _Reason} = Error ->
-            Error
+        {error, _Reason} ->
+            false
     end.
 
 -define(EPOCH_START, 62167219200).

+ 3 - 3
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -45,7 +45,7 @@
 -export([get_subdir/2]).
 -export([get_subdir/3]).
 
--export([files/1]).
+-export([files/2]).
 
 -export([on_config_update/2]).
 
@@ -217,8 +217,8 @@ assemble(Storage, Transfer, Size) ->
 
 %%
 
-files(Storage) ->
-    emqx_ft_storage_exporter:list(Storage).
+files(Storage, Query) ->
+    emqx_ft_storage_exporter:list(Storage, Query).
 
 %%
 

+ 8 - 5
apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl

@@ -20,7 +20,7 @@
 
 -export([introduced_in/0]).
 
--export([list_exports/1]).
+-export([list_exports/2]).
 -export([read_export_file/3]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -28,14 +28,17 @@
 introduced_in() ->
     "5.0.17".
 
--spec list_exports([node()]) ->
-    emqx_rpc:erpc_multicall([emqx_ft_storage:file_info()]).
-list_exports(Nodes) ->
+-spec list_exports([node()], emqx_ft_storage:query(_LocalCursor)) ->
+    emqx_rpc:erpc_multicall(
+        {ok, [emqx_ft_storage:file_info()]}
+        | {error, file:posix() | disabled | {invalid_storage_type, _}}
+    ).
+list_exports(Nodes, Query) ->
     erpc:multicall(
         Nodes,
         emqx_ft_storage_exporter_fs_proxy,
         list_exports_local,
-        []
+        [Query]
     ).
 
 -spec read_export_file(node(), file:name(), pid()) ->

+ 1 - 1
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -671,7 +671,7 @@ encode_meta(Meta) ->
     emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)).
 
 list_files(ClientId) ->
-    {ok, Files} = emqx_ft_storage:files(),
+    {ok, #{items := Files}} = emqx_ft_storage:files(),
     [File || File = #{transfer := {CId, _}} <- Files, CId == ClientId].
 
 read_export(#{path := AbsFilepath}) ->

+ 147 - 18
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -22,11 +22,19 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 
--include_lib("emqx/include/asserts.hrl").
-
 -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        {group, single},
+        {group, cluster}
+    ].
+
+groups() ->
+    [
+        {single, [], emqx_common_test_helpers:all(?MODULE)},
+        {cluster, [], emqx_common_test_helpers:all(?MODULE)}
+    ].
 
 init_per_suite(Config) ->
     ok = emqx_mgmt_api_test_util:init_suite(
@@ -38,6 +46,41 @@ end_per_suite(_Config) ->
     ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
     ok.
 
+init_per_group(Group = cluster, Config) ->
+    Cluster = mk_cluster_specs(Config),
+    ct:pal("Starting ~p", [Cluster]),
+    Nodes = [
+        emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()})
+     || {Name, Opts} <- Cluster
+    ],
+    [{group, Group}, {cluster_nodes, Nodes} | Config];
+init_per_group(Group, Config) ->
+    [{group, Group} | Config].
+
+end_per_group(cluster, Config) ->
+    ok = lists:foreach(
+        fun emqx_ft_test_helpers:stop_additional_node/1,
+        ?config(cluster_nodes, Config)
+    );
+end_per_group(_Group, _Config) ->
+    ok.
+
+mk_cluster_specs(Config) ->
+    Specs = [
+        {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
+        {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}
+    ],
+    CommOpts = [
+        {env, [{emqx, boot_modules, [broker, listeners]}]},
+        {apps, [emqx_ft]},
+        {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
+        {env_handler, emqx_ft_test_helpers:env_handler(Config)}
+    ],
+    emqx_common_test_helpers:emqx_cluster(
+        Specs,
+        CommOpts
+    ).
+
 init_per_testcase(Case, Config) ->
     [{tc, Case} | Config].
 end_per_testcase(_Case, _Config) ->
@@ -47,30 +90,46 @@ end_per_testcase(_Case, _Config) ->
 %% Tests
 %%--------------------------------------------------------------------
 
-t_list_ready_transfers(Config) ->
+t_list_files(Config) ->
     ClientId = client_id(Config),
+    FileId = <<"f1">>,
 
-    ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, "f1", <<"data">>),
+    Node = lists:last(cluster(Config)),
+    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
 
     {ok, 200, #{<<"files">> := Files}} =
-        request(get, uri(["file_transfer", "files"]), fun json/1),
+        request_json(get, uri(["file_transfer", "files"])),
+
+    ?assertMatch(
+        [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
+        [File || File = #{<<"clientid">> := CId} <- Files, CId == ClientId]
+    ),
+
+    {ok, 200, #{<<"files">> := FilesTransfer}} =
+        request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
 
-    ?assertInclude(
-        #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>},
-        Files
+    ?assertMatch(
+        [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
+        FilesTransfer
+    ),
+
+    ?assertMatch(
+        {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
+        request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]))
     ).
 
 t_download_transfer(Config) ->
     ClientId = client_id(Config),
+    FileId = <<"f1">>,
 
-    ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, "f1", <<"data">>),
+    Node = lists:last(cluster(Config)),
+    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
-        request(
+        request_json(
             get,
-            uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
-            fun json/1
+            uri(["file_transfer", "file"]) ++ query(#{fileref => FileId})
         )
     ),
 
@@ -80,7 +139,7 @@ t_download_transfer(Config) ->
             get,
             uri(["file_transfer", "file"]) ++
                 query(#{
-                    fileref => <<"f1">>,
+                    fileref => FileId,
                     node => <<"nonode@nohost">>
                 })
         )
@@ -99,7 +158,7 @@ t_download_transfer(Config) ->
     ),
 
     {ok, 200, #{<<"files">> := [File]}} =
-        request(get, uri(["file_transfer", "files"]), fun json/1),
+        request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
 
     {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)),
 
@@ -108,20 +167,85 @@ t_download_transfer(Config) ->
         Response
     ).
 
+t_list_files_paging(Config) ->
+    ClientId = client_id(Config),
+    NFiles = 20,
+    Nodes = cluster(Config),
+    Uploads = [
+        {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)}
+     || N <- lists:seq(1, NFiles)
+    ],
+    ok = lists:foreach(
+        fun({FileId, Name, Node}) ->
+            ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, <<"data">>, Node)
+        end,
+        Uploads
+    ),
+
+    ?assertMatch(
+        {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}))
+    ),
+
+    {ok, 200, #{<<"files">> := Files}} =
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})),
+
+    ?assert(length(Files) >= NFiles),
+
+    ?assertNotMatch(
+        {ok, 200, #{<<"cursor">> := _}},
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}))
+    ),
+
+    ?assertMatch(
+        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
+    ),
+
+    ?assertMatch(
+        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
+        request_json(
+            get,
+            uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>})
+        )
+    ),
+
+    PageThrough = fun PageThrough(Query, Acc) ->
+        case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of
+            {ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
+                PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
+            {ok, 200, #{<<"files">> := FilesPage}} ->
+                Acc ++ FilesPage
+        end
+    end,
+
+    ?assertEqual(Files, PageThrough(#{limit => 1}, [])),
+    ?assertEqual(Files, PageThrough(#{limit => 8}, [])),
+    ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
 
+cluster(Config) ->
+    [node() | proplists:get_value(cluster_nodes, Config, [])].
+
 client_id(Config) ->
-    atom_to_binary(?config(tc, Config), utf8).
+    iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])).
+
+mk_file_id(Prefix, N) ->
+    iolist_to_binary([Prefix, integer_to_list(N)]).
+
+mk_file_name(N) ->
+    "file." ++ integer_to_list(N).
 
 request(Method, Url) ->
     emqx_mgmt_api_test_util:request(Method, Url, []).
 
-request(Method, Url, Decoder) when is_function(Decoder) ->
+request_json(Method, Url) ->
     case emqx_mgmt_api_test_util:request(Method, Url, []) of
         {ok, Code, Body} ->
-            {ok, Code, Decoder(Body)};
+            {ok, Code, json(Body)};
         Otherwise ->
             Otherwise
     end.
@@ -138,7 +262,12 @@ uri_encode(T) ->
 
 to_list(A) when is_atom(A) ->
     atom_to_list(A);
+to_list(A) when is_integer(A) ->
+    integer_to_list(A);
 to_list(B) when is_binary(B) ->
     binary_to_list(B);
 to_list(L) when is_list(L) ->
     L.
+
+pick(N, List) ->
+    lists:nth(1 + (N rem length(List)), List).

+ 1 - 1
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -240,7 +240,7 @@ list_exports(Config) ->
 
 list_exports(Config, Transfer) ->
     {emqx_ft_storage_exporter_fs, Options} = exporter(Config),
-    emqx_ft_storage_exporter_fs:list_local(Options, Transfer).
+    emqx_ft_storage_exporter_fs:list_local_transfer(Options, Transfer).
 
 exporter(Config) ->
     emqx_ft_storage_exporter:exporter(storage(Config)).

+ 102 - 11
apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl

@@ -34,7 +34,7 @@ t_fold_single_level(Config) ->
             {"c", #file_info{type = directory}, ["c"]},
             {"d", #file_info{type = directory}, ["d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*']))
+        sort(fold(fun cons/4, [], Root, ['*']))
     ).
 
 t_fold_multi_level(Config) ->
@@ -45,7 +45,7 @@ t_fold_multi_level(Config) ->
             {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
             {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
     ),
     ?assertMatch(
         [
@@ -53,32 +53,32 @@ t_fold_multi_level(Config) ->
             {"c/bar/中文", #file_info{type = regular}, ["中文", "bar", "c"]},
             {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*']))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*']))
     ).
 
 t_fold_no_glob(Config) ->
     Root = ?config(data_dir, Config),
     ?assertMatch(
         [{"", #file_info{type = directory}, []}],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, []))
+        sort(fold(fun cons/4, [], Root, []))
     ).
 
 t_fold_glob_too_deep(Config) ->
     Root = ?config(data_dir, Config),
     ?assertMatch(
         [],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*']))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*']))
     ).
 
 t_fold_invalid_root(Config) ->
     Root = ?config(data_dir, Config),
     ?assertMatch(
         [],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*']))
+        sort(fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*']))
     ),
     ?assertMatch(
         [],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*']))
+        sort(fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*']))
     ).
 
 t_fold_filter_unicode(Config) ->
@@ -88,13 +88,13 @@ t_fold_filter_unicode(Config) ->
             {"a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]},
             {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1]))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1]))
     ),
     ?assertMatch(
         [
             {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)]))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)]))
     ).
 
 t_fold_filter_levels(Config) ->
@@ -104,7 +104,7 @@ t_fold_filter_levels(Config) ->
             {"a/b/foo", #file_info{type = directory}, ["foo", "b", "a"]},
             {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*']))
+        sort(fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*']))
     ).
 
 t_fold_errors(Config) ->
@@ -128,11 +128,99 @@ t_fold_errors(Config) ->
             {"c/link", {error, enotsup}, ["link", "c"]},
             {"d/e/baz/needle", {error, ebusy}, ["needle", "baz", "e", "d"]}
         ],
-        sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
+        sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
+    ).
+
+t_seek_fold(Config) ->
+    Root = ?config(data_dir, Config),
+    ?assertMatch(
+        [
+            {leaf, "a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]},
+            {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        sort(
+            emqx_ft_fs_iterator:fold(
+                fun cons/2,
+                [],
+                emqx_ft_fs_iterator:seek(["a", "a"], Root, ['*', '*', '*', '*'])
+            )
+        )
+    ),
+    ?assertMatch(
+        [
+            {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        sort(
+            emqx_ft_fs_iterator:fold(
+                fun cons/2,
+                [],
+                emqx_ft_fs_iterator:seek(["a", "b", "foo", "42"], Root, ['*', '*', '*', '*'])
+            )
+        )
+    ),
+    ?assertMatch(
+        [
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        sort(
+            emqx_ft_fs_iterator:fold(
+                fun cons/2,
+                [],
+                emqx_ft_fs_iterator:seek(["c", "d", "e", "f"], Root, ['*', '*', '*', '*'])
+            )
+        )
+    ).
+
+t_seek_empty(Config) ->
+    Root = ?config(data_dir, Config),
+    ?assertEqual(
+        emqx_ft_fs_iterator:fold(
+            fun cons/2,
+            [],
+            emqx_ft_fs_iterator:new(Root, ['*', '*', '*', '*'])
+        ),
+        emqx_ft_fs_iterator:fold(
+            fun cons/2,
+            [],
+            emqx_ft_fs_iterator:seek([], Root, ['*', '*', '*', '*'])
+        )
+    ).
+
+t_seek_past_end(Config) ->
+    Root = ?config(data_dir, Config),
+    ?assertEqual(
+        none,
+        emqx_ft_fs_iterator:next(
+            emqx_ft_fs_iterator:seek(["g", "h"], Root, ['*', '*', '*', '*'])
+        )
+    ).
+
+t_seek_with_filter(Config) ->
+    Root = ?config(data_dir, Config),
+    ?assertMatch(
+        [
+            {leaf, "d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
+            | _Nodes
+        ],
+        sort(
+            emqx_ft_fs_iterator:fold(
+                fun cons/2,
+                [],
+                emqx_ft_fs_iterator:seek(["a", "link"], Root, ['*', fun is_letter/1, '*'])
+            )
+        )
     ).
 
 %%
 
+fold(FoldFun, Acc, Root, Glob) ->
+    emqx_ft_fs_util:fold(FoldFun, Acc, Root, Glob).
+
 is_not(F) ->
     fun(X) -> not F(X) end.
 
@@ -155,5 +243,8 @@ is_letter(Filename) ->
 cons(Path, Info, Stack, Acc) ->
     [{Path, Info, Stack} | Acc].
 
+cons(Entry, Acc) ->
+    [Entry | Acc].
+
 sort(L) when is_list(L) ->
     lists:sort(L).

+ 3 - 3
apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl

@@ -27,7 +27,7 @@ all() ->
         {group, cluster}
     ].
 
--define(CLUSTER_CASES, [t_multinode_ready_transfers]).
+-define(CLUSTER_CASES, [t_multinode_exports]).
 
 groups() ->
     [
@@ -61,7 +61,7 @@ end_per_group(_Group, _Config) ->
 %% Tests
 %%--------------------------------------------------------------------
 
-t_multinode_ready_transfers(Config) ->
+t_multinode_exports(Config) ->
     Node1 = ?config(additional_node, Config),
     ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1),
 
@@ -87,5 +87,5 @@ storage(Config) ->
     emqx_ft_test_helpers:local_storage(Config).
 
 list_files(Config) ->
-    {ok, Files} = emqx_ft_storage_fs:files(storage(Config)),
+    {ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}),
     Files.

+ 3 - 0
rel/i18n/emqx_ft_api.hocon

@@ -3,6 +3,9 @@ emqx_ft_api {
 file_list.desc:
 """List all uploaded files."""
 
+file_list_transfer.desc
+"""List a file uploaded during specified transfer, identified by client id and file id."""
+
 }
 
 emqx_ft_storage_exporter_fs_api {