Просмотр исходного кода

feat(ft-fs): make `list` / `read` more generic

As a consequence, they become usable in wider contexts, for example querying
and fetching resulting files from remote nodes.
Andrew Mayorov 3 года назад
Родитель
Сommit
429eeaf029

+ 3 - 3
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -66,7 +66,7 @@ init({Storage, Transfer, Callback}) ->
 
 handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
     % TODO: what do we do with non-transient errors here (e.g. `eacces`)?
-    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
+    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
     NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
     NSt = St#st{assembly = NAsm},
     case emqx_ft_assembly:status(NAsm) of
@@ -81,7 +81,7 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
     end;
 handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
     % TODO: portable "storage" ref
-    Args = [St#st.storage, St#st.transfer],
+    Args = [St#st.storage, St#st.transfer, fragment],
     % TODO
     % Async would be better because we would not need to wait for some lagging nodes if
     % the coverage is already complete.
@@ -121,7 +121,7 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % this node garbage collecting the segment itself.
     Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
     % TODO: pipelining
-    case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of
+    case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of
         {ok, Content} ->
             {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
             {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}

+ 45 - 15
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -20,8 +20,8 @@
 
 -export([store_filemeta/3]).
 -export([store_segment/3]).
--export([list/2]).
--export([read_segment/5]).
+-export([list/3]).
+-export([pread/5]).
 -export([assemble/3]).
 
 -export([open_file/3]).
@@ -41,13 +41,19 @@
     size := _Bytes :: non_neg_integer()
 }.
 
+% TODO naming
 -type filefrag(T) :: #{
     path := file:name(),
     timestamp := emqx_datetime:epoch_second(),
+    size := _Bytes :: non_neg_integer(),
     fragment := T
 }.
 
--type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
+-type filefrag() :: filefrag(
+    {filemeta, filemeta()}
+    | {segment, segmentinfo()}
+    | {result, #{}}
+).
 
 -define(FRAGDIR, frags).
 -define(TEMPDIR, tmp).
@@ -104,29 +110,44 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
     Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
     write_file_atomic(Storage, Transfer, Filepath, Content).
 
--spec list(storage(), transfer()) ->
+-spec list(storage(), transfer(), _What :: fragment | result) ->
     % Some lower level errors? {error, notfound}?
     % Result will contain zero or only one filemeta.
-    {ok, list(filefrag())} | {error, _TODO}.
-list(Storage, Transfer) ->
-    Dirname = mk_filedir(Storage, Transfer, [?FRAGDIR]),
+    {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}.
+list(Storage, Transfer, What) ->
+    Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
     case file:list_dir(Dirname) of
         {ok, Filenames} ->
-            {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
+            % TODO
+            % In case of `What = result` there might be more than one file (though
+            % extremely bad luck is needed for that, e.g. concurrent assemblers with
+            % different filemetas from different nodes). This might be unexpected for a
+            % client given the current protocol, yet might be helpful in the future.
+            {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
         {error, enoent} ->
             {ok, []};
         {error, _} = Error ->
             Error
     end.
 
--spec read_segment(
-    storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer()
-) ->
+get_subdirs_for(fragment) ->
+    [?FRAGDIR];
+get_subdirs_for(result) ->
+    [?RESULTDIR].
+
+get_filefrag_fun_for(fragment) ->
+    fun mk_filefrag/2;
+get_filefrag_fun_for(result) ->
+    fun mk_result_filefrag/2.
+
+-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
     {ok, _Content :: iodata()} | {error, _TODO}.
-read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
-    Filepath = maps:get(path, Segment),
-    case file:open(Filepath, [raw, read]) of
+pread(_Storage, _Transfer, Frag, Offset, Size) ->
+    Filepath = maps:get(path, Frag),
+    case file:open(Filepath, [read, raw, binary]) of
         {ok, IoDevice} ->
+            % NOTE
+            % Reading empty file is always `eof`.
             Read = file:pread(IoDevice, Offset, Size),
             ok = file:close(IoDevice),
             case Read of
@@ -147,6 +168,8 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
 assemble(Storage, Transfer, Callback) ->
     emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
 
+%%
+
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 
 -spec open_file(storage(), transfer(), filemeta()) ->
@@ -155,7 +178,7 @@ open_file(Storage, Transfer, Filemeta) ->
     Filename = maps:get(name, Filemeta),
     TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
     _ = filelib:ensure_dir(TempFilepath),
-    case file:open(TempFilepath, [write, raw]) of
+    case file:open(TempFilepath, [write, raw, binary]) of
         {ok, Handle} ->
             _ = file:truncate(Handle),
             {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
@@ -370,6 +393,12 @@ mk_filefrag(_Dirname, _Filename) ->
     % TODO this is unexpected, worth logging?
     false.
 
+mk_result_filefrag(Dirname, Filename) ->
+    % NOTE
+    % Any file in the `?RESULTDIR` subdir is currently considered the result of
+    % the file transfer.
+    mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
+
 mk_filefrag(Dirname, Filename, Tag, Fun) ->
     Filepath = filename:join(Dirname, Filename),
     % TODO error handling?
@@ -379,6 +408,7 @@ mk_filefrag(Dirname, Filename, Tag, Fun) ->
             {true, #{
                 path => Filepath,
                 timestamp => Fileinfo#file_info.mtime,
+                size => Fileinfo#file_info.size,
                 fragment => {Tag, Frag}
             }};
         {error, _Reason} ->

+ 17 - 2
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -71,7 +71,7 @@ t_assemble_empty_transfer(Config) ->
                 fragment := {filemeta, Meta}
             }
         ]},
-        emqx_ft_storage_fs:list(Storage, Transfer)
+        emqx_ft_storage_fs:list(Storage, Transfer, fragment)
     ),
     {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
     {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
@@ -81,6 +81,11 @@ t_assemble_empty_transfer(Config) ->
         % TODO
         file:read_file(mk_assembly_filename(Config, Transfer, Filename))
     ),
+    {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
+    ?assertEqual(
+        {error, eof},
+        emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
+    ),
     ok.
 
 t_assemble_complete_local_transfer(Config) ->
@@ -109,7 +114,7 @@ t_assemble_complete_local_transfer(Config) ->
         end
     ),
 
-    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
+    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
     ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
     ?assertEqual(
         [Meta],
@@ -122,6 +127,16 @@ t_assemble_complete_local_transfer(Config) ->
     ?assertMatch(#{result := ok}, Event),
 
     AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
+    ?assertMatch(
+        {ok, [
+            #{
+                path := AssemblyFilename,
+                size := TransferSize,
+                fragment := {result, #{}}
+            }
+        ]},
+        emqx_ft_storage_fs:list(Storage, Transfer, result)
+    ),
     ?assertMatch(
         {ok, #file_info{type = regular, size = TransferSize}},
         file:read_file_info(AssemblyFilename)