Browse Source

feat(ft): add streaming of file content when downloading

Ilya Averyanov 3 years atrás
parent
commit
0aefd4a8c7

+ 8 - 2
apps/emqx_ft/src/emqx_ft_api.erl

@@ -107,10 +107,16 @@ schema("/file_transfer/file") ->
 '/file_transfer/file'(get, #{query_string := Query}) ->
     case emqx_ft_storage:get_ready_transfer(Query) of
         {ok, FileData} ->
-            {200, #{<<"content-type">> => <<"application/data">>}, FileData};
+            {200,
+                #{
+                    <<"content-type">> => <<"application/data">>,
+                    <<"content-disposition">> => <<"attachment">>
+                },
+                FileData};
         {error, enoent} ->
             {404, error_msg('NOT_FOUND', <<"Not found">>)};
-        {error, _} ->
+        {error, Error} ->
+            ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
             {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
     end.
 

+ 1 - 16
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -22,8 +22,6 @@
         store_segment/2,
         assemble/2,
 
-        parse_id/1,
-
         ready_transfers/0,
         get_ready_transfer/1,
 
@@ -39,7 +37,7 @@
 
 -type ready_transfer_id() :: term().
 -type ready_transfer_info() :: map().
--type ready_transfer_data() :: binary().
+-type ready_transfer_data() :: binary() | qlc:query_handle().
 
 %%--------------------------------------------------------------------
 %% Behaviour
@@ -88,19 +86,6 @@ get_ready_transfer(ReadyTransferId) ->
     Mod = mod(),
     Mod:get_ready_transfer(storage(), ReadyTransferId).
 
--spec parse_id(map()) -> {ok, ready_transfer_id()} | {error, term()}.
-parse_id(#{
-    <<"type">> := local, <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"id">> := Id
-}) ->
-    case emqx_misc:safe_to_existing_atom(NodeBin) of
-        {ok, Node} ->
-            {ok, {local, Node, ClientId, Id}};
-        {error, _} ->
-            {error, {invalid_node, NodeBin}}
-    end;
-parse_id(#{}) ->
-    {error, invalid_file_id}.
-
 -spec with_storage_type(atom(), atom(), list(term())) -> any().
 with_storage_type(Type, Fun, Args) ->
     Storage = storage(),

+ 29 - 8
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -29,7 +29,7 @@
 -export([transfers/1]).
 
 -export([ready_transfers_local/1]).
--export([get_ready_transfer_local/2]).
+-export([get_ready_transfer_local/3]).
 
 -export([ready_transfers/1]).
 -export([get_ready_transfer/2]).
@@ -175,22 +175,43 @@ get_ready_transfer(_Storage, ReadyTransferId) ->
     case parse_ready_transfer_id(ReadyTransferId) of
         {ok, {Node, Transfer}} ->
             try
-                emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, Transfer)
+                case emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, self(), Transfer) of
+                    {ok, ReaderPid} ->
+                        {ok, emqx_ft_storage_fs_reader:table(ReaderPid)};
+                    {error, _} = Error ->
+                        Error
+                end
             catch
-                error:Error ->
-                    {error, Error};
-                C:Error ->
-                    {error, {C, Error}}
+                error:Exc:Stacktrace ->
+                    ?SLOG(warning, #{
+                        msg => "get_ready_transfer_error",
+                        node => Node,
+                        transfer => Transfer,
+                        exception => Exc,
+                        stacktrace => Stacktrace
+                    }),
+                    {error, Exc};
+                C:Exc:Stacktrace ->
+                    ?SLOG(warning, #{
+                        msg => "get_ready_transfer_fail",
+                        class => C,
+                        node => Node,
+                        transfer => Transfer,
+                        exception => Exc,
+                        stacktrace => Stacktrace
+                    }),
+                    {error, {C, Exc}}
             end;
         {error, _} = Error ->
             Error
     end.
 
-get_ready_transfer_local(Storage, Transfer) ->
+get_ready_transfer_local(Storage, CallerPid, Transfer) ->
     Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
     case file:list_dir(Dirname) of
         {ok, [Filename | _]} ->
-            file:read_file(filename:join([Dirname, Filename]));
+            FullFilename = filename:join([Dirname, Filename]),
+            emqx_ft_storage_fs_reader:start_supervised(CallerPid, FullFilename);
         {error, _} = Error ->
             Error
     end.

+ 3 - 3
apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl

@@ -23,7 +23,7 @@
 -export([
     list_local/2,
     pread_local/4,
-    get_ready_transfer_local/1,
+    get_ready_transfer_local/2,
     ready_transfers_local/0
 ]).
 
@@ -33,8 +33,8 @@ list_local(Transfer, What) ->
 pread_local(Transfer, Frag, Offset, Size) ->
     emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
 
-get_ready_transfer_local(Transfer) ->
-    emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]).
+get_ready_transfer_local(CallerPid, Transfer) ->
+    emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [CallerPid, Transfer]).
 
 ready_transfers_local() ->
     emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).

+ 125 - 0
apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl

@@ -0,0 +1,125 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_reader).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-export([
+    start_link/2,
+    start_link/3,
+    start_supervised/2,
+    start_supervised/3,
+    read/1
+]).
+
+-export([
+    table/1
+]).
+
+-define(DEFAULT_CHUNK_SIZE, 1024).
+
+table(ReaderPid) ->
+    NextFun = fun NextFun(Pid) ->
+        try
+            case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid) of
+                eof ->
+                    [];
+                {ok, Data} ->
+                    [Data | fun() -> NextFun(Pid) end];
+                {error, Reason} ->
+                    ?SLOG(warning, #{msg => "file_read_error", reason => Reason}),
+                    []
+            end
+        catch
+            Class:Error:Stacktrace ->
+                ?SLOG(warning, #{
+                    msg => "file_read_error",
+                    class => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace
+                }),
+                []
+        end
+    end,
+    qlc:table(fun() -> NextFun(ReaderPid) end, []).
+
+start_link(CallerPid, Filename) ->
+    start_link(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE).
+
+start_link(CallerPid, Filename, ChunkSize) ->
+    gen_server:start_link(?MODULE, [CallerPid, Filename, ChunkSize], []).
+
+start_supervised(CallerPid, Filename) ->
+    start_supervised(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE).
+
+start_supervised(CallerPid, Filename, ChunkSize) ->
+    emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename, ChunkSize).
+
+read(Pid) ->
+    gen_server:call(Pid, read).
+
+init([CallerPid, Filename, ChunkSize]) ->
+    true = link(CallerPid),
+    case file:open(Filename, [read, raw, binary]) of
+        {ok, File} ->
+            {ok, #{
+                filename => Filename,
+                file => File,
+                chunk_size => ChunkSize
+            }};
+        {error, Reason} ->
+            {stop, Reason}
+    end.
+
+handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) ->
+    case file:read(File, ChunkSize) of
+        {ok, Data} ->
+            ?SLOG(warning, #{msg => "read", bytes => byte_size(Data)}),
+            {reply, {ok, Data}, State};
+        eof ->
+            ?SLOG(warning, #{msg => "read", eof => true}),
+            {stop, normal, eof, State};
+        {error, Reason} = Error ->
+            {stop, Reason, Error, State}
+    end;
+handle_call(Msg, _From, State) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, State}.
+
+handle_info(Msg, State) ->
+    ?SLOG(warning, #{msg => "unexpected_message", info_msg => Msg}),
+    {noreply, State}.
+
+handle_cast(Msg, State) ->
+    ?SLOG(warning, #{msg => "unexpected_message", case_msg => Msg}),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.

+ 44 - 0
apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl

@@ -0,0 +1,44 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_reader_sup).
+
+-behaviour(supervisor).
+
+-export([
+    init/1,
+    start_link/0,
+    start_child/3
+]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(CallerPid, Filename, ChunkSize) ->
+    Childspec = #{
+        id => {CallerPid, Filename},
+        start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename, ChunkSize]},
+        restart => temporary
+    },
+    supervisor:start_child(?MODULE, Childspec).
+
+init(_) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 1000
+    },
+    {ok, {SupFlags, []}}.

+ 10 - 1
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -43,6 +43,15 @@ init([]) ->
         modules => [emqx_ft_assembler_sup]
     },
 
+    FileReaderSup = #{
+        id => emqx_ft_storage_fs_reader_sup,
+        start => {emqx_ft_storage_fs_reader_sup, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor,
+        modules => [emqx_ft_storage_fs_reader_sup]
+    },
+
     Responder = #{
         id => emqx_ft_responder,
         start => {emqx_ft_responder, start_link, []},
@@ -52,5 +61,5 @@ init([]) ->
         modules => [emqx_ft_responder]
     },
 
-    ChildSpecs = [Responder, AssemblerSup],
+    ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
     {ok, {SupFlags, ChildSpecs}}.

+ 4 - 4
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -24,7 +24,7 @@
 -export([multilist/3]).
 -export([pread/5]).
 -export([ready_transfers/1]).
--export([get_ready_transfer/2]).
+-export([get_ready_transfer/3]).
 
 -type offset() :: emqx_ft:offset().
 -type transfer() :: emqx_ft:transfer().
@@ -56,9 +56,9 @@ pread(Node, Transfer, Frag, Offset, Size) ->
 ready_transfers(Nodes) ->
     erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).
 
--spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) ->
+-spec get_ready_transfer(node(), pid(), emqx_ft_storage:ready_transfer_id()) ->
     {ok, emqx_ft_storage:ready_transfer_data()}
     | {error, term()}
     | no_return().
-get_ready_transfer(Node, ReadyTransferId) ->
-    erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [ReadyTransferId]).
+get_ready_transfer(Node, CallerPid, ReadyTransferId) ->
+    erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [CallerPid, ReadyTransferId]).

+ 33 - 0
apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_reader_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([introduced_in/0]).
+
+-export([read/2]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.17".
+
+-spec read(node(), pid()) ->
+    {ok, binary()} | eof | {error, term()} | no_return().
+read(Node, Pid) ->
+    erpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]).

+ 1 - 1
mix.exs

@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
       {:ekka, github: "emqx/ekka", tag: "0.14.6", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
-      {:minirest, github: "emqx/minirest", tag: "1.3.8", override: true},
+      {:minirest, github: "emqx/minirest", tag: "1.3.9", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},

+ 1 - 1
rebar.config

@@ -65,9 +65,9 @@
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.6"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.5"}}}