Explorar o código

feat(ft-api): support paging in S3 storage exporter

Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
a9866fede4
Modificáronse 1 ficheiros con 81 adicións e 35 borrados
  1. 81 35
      apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

+ 81 - 35
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -23,7 +23,7 @@
 -export([write/2]).
 -export([complete/2]).
 -export([discard/1]).
--export([list/1]).
+-export([list/2]).
 
 -export([
     start/1,
@@ -43,6 +43,10 @@
     filemeta => filemeta()
 }.
 
+-type query() :: emqx_ft_storage:query(cursor()).
+-type page(T) :: emqx_ft_storage:page(T, cursor()).
+-type cursor() :: iodata().
+
 -type export_st() :: #{
     pid := pid(),
     filemeta := filemeta(),
@@ -92,10 +96,10 @@ complete(#{pid := Pid} = _ExportSt, _Checksum) ->
 discard(#{pid := Pid} = _ExportSt) ->
     emqx_s3_uploader:abort(Pid).
 
--spec list(options()) ->
-    {ok, [exportinfo()]} | {error, term()}.
-list(Options) ->
-    emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end).
+-spec list(options(), query()) ->
+    {ok, page(exportinfo())} | {error, term()}.
+list(Options, Query) ->
+    emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options, Query) end).
 
 %%--------------------------------------------------------------------
 %% Exporter behaviour (lifecycle)
@@ -117,12 +121,11 @@ update(_OldOptions, NewOptions) ->
 %% Internal functions
 %% -------------------------------------------------------------------
 
-s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) ->
-    filename:join([
-        emqx_ft_fs_util:escape_filename(ClientId),
-        emqx_ft_fs_util:escape_filename(FileId),
-        Filename
-    ]).
+s3_key(Transfer, #{name := Filename}) ->
+    s3_prefix(Transfer) ++ "/" ++ Filename.
+
+s3_prefix({ClientId, FileId} = _Transfer) ->
+    emqx_ft_fs_util:escape_filename(ClientId) ++ "/" ++ emqx_ft_fs_util:escape_filename(FileId).
 
 s3_headers({ClientId, FileId}, Filemeta) ->
     #{
@@ -137,54 +140,97 @@ s3_headers({ClientId, FileId}, Filemeta) ->
 s3_header_filemeta(Filemeta) ->
     emqx_utils_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
 
-list(Client, Options) ->
-    case list_key_info(Client, Options) of
-        {ok, KeyInfos} ->
-            MaybeExportInfos = lists:map(
-                fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos
-            ),
-            ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos],
-            {ok, ExportInfos};
+list(Client, _Options, #{transfer := Transfer}) ->
+    case list_key_info(Client, [{prefix, s3_prefix(Transfer)}, {max_keys, ?S3_LIST_LIMIT}]) of
+        {ok, {Exports, _Marker}} ->
+            {ok, #{items => Exports}};
+        {error, _Reason} = Error ->
+            Error
+    end;
+list(Client, _Options, Query) ->
+    Limit = maps:get(limit, Query, undefined),
+    Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)),
+    case list_pages(Client, Marker, Limit, []) of
+        {ok, {Exports, undefined}} ->
+            {ok, #{items => Exports}};
+        {ok, {Exports, NextMarker}} ->
+            {ok, #{items => Exports, cursor => encode_cursor(NextMarker)}};
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+list_pages(Client, Marker, Limit, Acc) ->
+    MaxKeys = min(?S3_LIST_LIMIT, Limit),
+    ListOptions = [{marker, Marker} || Marker =/= undefined],
+    case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of
+        {ok, {Exports, NextMarker}} ->
+            list_accumulate(Client, Limit, NextMarker, [Exports | Acc]);
         {error, _Reason} = Error ->
             Error
     end.
 
-list_key_info(Client, Options) ->
-    list_key_info(Client, Options, _Marker = [], _Acc = []).
+list_accumulate(_Client, _Limit, undefined, Acc) ->
+    {ok, {flatten_pages(Acc), undefined}};
+list_accumulate(Client, undefined, Marker, Acc) ->
+    list_pages(Client, Marker, undefined, Acc);
+list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) ->
+    case Limit - length(Exports) of
+        0 ->
+            {ok, {flatten_pages(Acc), Marker}};
+        Left ->
+            list_pages(Client, Marker, Left, Acc)
+    end.
+
+flatten_pages(Pages) ->
+    lists:append(lists:reverse(Pages)).
 
-list_key_info(Client, Options, Marker, Acc) ->
-    ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker,
+list_key_info(Client, ListOptions) ->
     case emqx_s3_client:list(Client, ListOptions) of
         {ok, Result} ->
             ?SLOG(debug, #{msg => "list_key_info", result => Result}),
             KeyInfos = proplists:get_value(contents, Result, []),
-            case proplists:get_value(is_truncated, Result, false) of
-                true ->
-                    NewMarker = next_marker(KeyInfos),
-                    list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]);
-                false ->
-                    {ok, lists:append(lists:reverse([KeyInfos | Acc]))}
-            end;
+            Exports = lists:filtermap(
+                fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo) end, KeyInfos
+            ),
+            Marker =
+                case proplists:get_value(is_truncated, Result, false) of
+                    true ->
+                        next_marker(KeyInfos);
+                    false ->
+                        undefined
+                end,
+            {ok, {Exports, Marker}};
         {error, _Reason} = Error ->
             Error
     end.
 
+encode_cursor(Key) ->
+    unicode:characters_to_binary(Key).
+
+decode_cursor(Cursor) ->
+    case unicode:characters_to_list(Cursor) of
+        Key when is_list(Key) ->
+            Key;
+        _ ->
+            error({badarg, cursor})
+    end.
+
 next_marker(KeyInfos) ->
-    [{marker, proplists:get_value(key, lists:last(KeyInfos))}].
+    proplists:get_value(key, lists:last(KeyInfos)).
 
-key_info_to_exportinfo(Client, KeyInfo, _Options) ->
+key_info_to_exportinfo(Client, KeyInfo) ->
     Key = proplists:get_value(key, KeyInfo),
     case parse_transfer_and_name(Key) of
         {ok, {Transfer, Name}} ->
-            {ok, #{
+            {true, #{
                 transfer => Transfer,
                 name => unicode:characters_to_binary(Name),
                 uri => emqx_s3_client:uri(Client, Key),
                 timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)),
                 size => proplists:get_value(size, KeyInfo)
             }};
-        {error, _Reason} = Error ->
-            Error
+        {error, _Reason} ->
+            false
     end.
 
 -define(EPOCH_START, 62167219200).