Просмотр исходного кода

feat(ft-fs): allow to list all transfers in storage

This is rather simplistic and thus, temporary solution.
Andrew Mayorov 3 лет назад
Родитель
Сommit
7ed06b0a2a
2 измененных файлов с 104 добавлено и 14 удалено
  1. 67 3
      apps/emqx_ft/src/emqx_ft_storage_fs.erl
  2. 37 11
      apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

+ 67 - 3
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -24,6 +24,8 @@
 -export([pread/5]).
 -export([assemble/3]).
 
+-export([transfers/1]).
+
 -export([open_file/3]).
 -export([complete/4]).
 -export([write/2]).
@@ -31,16 +33,19 @@
 
 -type transfer() :: emqx_ft:transfer().
 -type offset() :: emqx_ft:offset().
-
 -type filemeta() :: emqx_ft:filemeta().
-
--type segment() :: {offset(), _Content :: binary()}.
+-type segment() :: emqx_ft:segment().
 
 -type segmentinfo() :: #{
     offset := offset(),
     size := _Bytes :: non_neg_integer()
 }.
 
+-type transferinfo() :: #{
+    status := complete | incomplete,
+    result => [filefrag({result, #{}})]
+}.
+
 % TODO naming
 -type filefrag(T) :: #{
     path := file:name(),
@@ -85,6 +90,7 @@
     % Quota? Some lower level errors?
     {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
 store_filemeta(Storage, Transfer, Meta) ->
+    % TODO safeguard against bad clientids / fileids.
     Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
     case read_file(Filepath, fun decode_filemeta/1) of
         {ok, Meta} ->
@@ -170,6 +176,52 @@ assemble(Storage, Transfer, Callback) ->
 
 %%
 
+-spec transfers(storage()) ->
+    {ok, #{transfer() => transferinfo()}}.
+transfers(Storage) ->
+    % TODO `Continuation`
+    % There might be millions of transfers on the node, we need a protocol and
+    % storage schema to iterate through them effectively.
+    ClientIds = try_list_dir(get_storage_root(Storage)),
+    {ok,
+        lists:foldl(
+            fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
+            #{},
+            ClientIds
+        )}.
+
+transfers(Storage, ClientId, AccIn) ->
+    Dirname = mk_client_filedir(Storage, ClientId),
+    case file:list_dir(Dirname) of
+        {ok, FileIds} ->
+            lists:foldl(
+                fun(FileId, Acc) ->
+                    Transfer = {filename_to_binary(ClientId), filename_to_binary(FileId)},
+                    read_transferinfo(Storage, Transfer, Acc)
+                end,
+                AccIn,
+                FileIds
+            );
+        {error, _Reason} ->
+            % TODO worth logging
+            AccIn
+    end.
+
+read_transferinfo(Storage, Transfer, Acc) ->
+    case list(Storage, Transfer, result) of
+        {ok, Result = [_ | _]} ->
+            Info = #{status => complete, result => Result},
+            Acc#{Transfer => Info};
+        {ok, []} ->
+            Info = #{status => incomplete},
+            Acc#{Transfer => Info};
+        {error, _Reason} ->
+            % TODO worth logging
+            Acc
+    end.
+
+%%
+
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 
 -spec open_file(storage(), transfer(), filemeta()) ->
@@ -312,9 +364,18 @@ break_segment_filename(Filename) ->
 mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
     filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]).
 
+mk_client_filedir(Storage, ClientId) ->
+    filename:join([get_storage_root(Storage), ClientId]).
+
 mk_filepath(Storage, Transfer, SubDirs, Filename) ->
     filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename).
 
+try_list_dir(Dirname) ->
+    case file:list_dir(Dirname) of
+        {ok, List} -> List;
+        {error, _} -> []
+    end.
+
 get_storage_root(Storage) ->
     maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
 
@@ -421,3 +482,6 @@ read_filemeta(_Filename, Filepath) ->
 
 read_segmentinfo(Filename, _Filepath) ->
     break_segment_filename(Filename).
+
+filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S);
+filename_to_binary(B) when is_binary(B) -> B.

+ 37 - 11
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -24,24 +24,31 @@
 -include_lib("kernel/include/file.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        t_assemble_empty_transfer,
+        t_assemble_complete_local_transfer,
+
+        % NOTE
+        % It depends on the side effects of all previous testcases.
+        t_list_transfers
+    ].
 
 init_per_suite(Config) ->
-    % {ok, Apps} = application:ensure_all_started(emqx_ft),
-    % [{suite_apps, Apps} | Config].
-    % ok = emqx_common_test_helpers:start_apps([emqx_ft]),
     Config.
 
 end_per_suite(_Config) ->
-    % lists:foreach(fun application:stop/1, lists:reverse(?config(suite_apps, Config))).
-    % ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
     ok.
 
 init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
-    Root = filename:join(["roots", TC]),
     {ok, Pid} = emqx_ft_assembler_sup:start_link(),
-    [{storage_root, Root}, {assembler_sup, Pid} | Config].
+    [
+        {storage_root, "file_transfer_root"},
+        {file_id, atom_to_binary(TC)},
+        {assembler_sup, Pid}
+        | Config
+    ].
 
 end_per_testcase(_TC, Config) ->
     ok = inspect_storage_root(Config),
@@ -51,11 +58,12 @@ end_per_testcase(_TC, Config) ->
 
 %%
 
--define(CLIENTID, <<"thatsme">>).
+-define(CLIENTID1, <<"thatsme">>).
+-define(CLIENTID2, <<"thatsnotme">>).
 
 t_assemble_empty_transfer(Config) ->
     Storage = storage(Config),
-    Transfer = {?CLIENTID, mk_fileid()},
+    Transfer = {?CLIENTID1, ?config(file_id, Config)},
     Filename = "important.pdf",
     Meta = #{
         name => Filename,
@@ -90,7 +98,7 @@ t_assemble_empty_transfer(Config) ->
 
 t_assemble_complete_local_transfer(Config) ->
     Storage = storage(Config),
-    Transfer = {?CLIENTID, mk_fileid()},
+    Transfer = {?CLIENTID2, ?config(file_id, Config)},
     Filename = "topsecret.pdf",
     TransferSize = 10000 + rand:uniform(50000),
     SegmentSize = 4096,
@@ -155,6 +163,24 @@ on_assembly_finished(Result) ->
 
 %%
 
+t_list_transfers(Config) ->
+    Storage = storage(Config),
+    ?assertMatch(
+        {ok, #{
+            {?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
+                status := complete,
+                result := [#{path := _, size := 0, fragment := {result, _}}]
+            },
+            {?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
+                status := complete,
+                result := [#{path := _, size := Size, fragment := {result, _}}]
+            }
+        }} when Size > 0,
+        emqx_ft_storage_fs:transfers(Storage)
+    ).
+
+%%
+
 -include_lib("kernel/include/file.hrl").
 
 inspect_storage_root(Config) ->