Browse Source

feat(ft): add proxy module for emqx_ft_storage_fs

Ilya Averyanov 3 years ago
parent
commit
197ce32669

+ 12 - 1
apps/emqx_ft/src/emqx_ft.erl

@@ -189,7 +189,7 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
             %% We have new fin packet
             ok ->
                 Callback = callback(FinPacketKey, FileId),
-                case emqx_ft_storage:assemble(transfer(Msg, FileId), Callback) of
+                case assemble(transfer(Msg, FileId), Callback) of
                     %% Assembling started, packet will be acked by the callback or the responder
                     {ok, _} ->
                         undefined;
@@ -214,6 +214,17 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
                 undefined
         end.
 
+assemble(Transfer, Callback) ->
+    try
+        emqx_ft_storage:assemble(Transfer, Callback)
+    catch
+        C:E:S ->
+            ?SLOG(warning, #{
+                msg => "file_assemble_failed", class => C, reason => E, stacktrace => S
+            }),
+            {error, {internal_error, E}}
+    end.
+
 callback({ChanPid, PacketId} = Key, _FileId) ->
     fun(Result) ->
         case emqx_ft_responder:unregister(Key) of

+ 2 - 20
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -28,10 +28,8 @@
 
 -export([transfers/1]).
 
--export([pread_local/4]).
--export([list_local/2]).
--export([ready_transfers_local/0, ready_transfers_local/1]).
--export([get_ready_transfer_local/1, get_ready_transfer_local/2]).
+-export([ready_transfers_local/1]).
+-export([get_ready_transfer_local/2]).
 
 -export([ready_transfers/1]).
 -export([get_ready_transfer/2]).
@@ -173,16 +171,6 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
 assemble(Storage, Transfer, Callback) ->
     emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
 
--spec list_local(transfer(), fragment | result) ->
-    {ok, [filefrag()]} | {error, term()}.
-list_local(Transfer, What) ->
-    emqx_ft_storage:with_storage_type(local, list, [Transfer, What]).
-
--spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
-    {ok, [filefrag()]} | {error, term()}.
-pread_local(Transfer, Frag, Offset, Size) ->
-    emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
-
 get_ready_transfer(_Storage, ReadyTransferId) ->
     case parse_ready_transfer_id(ReadyTransferId) of
         {ok, {Node, Transfer}} ->
@@ -198,9 +186,6 @@ get_ready_transfer(_Storage, ReadyTransferId) ->
             Error
     end.
 
-get_ready_transfer_local(Transfer) ->
-    emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]).
-
 get_ready_transfer_local(Storage, Transfer) ->
     Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
     case file:list_dir(Dirname) of
@@ -223,9 +208,6 @@ ready_transfers(_Storage) ->
     ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
     {ok, [File || {ok, Files} <- GoodResults, File <- Files]}.
 
-ready_transfers_local() ->
-    emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).
-
 ready_transfers_local(Storage) ->
     {ok, Transfers} = transfers(Storage),
     lists:filtermap(

+ 40 - 0
apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl

@@ -0,0 +1,40 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% This methods are called via rpc by `emqx_ft_storage_fs`
+%% They populate the call with actual storage which may be configured differently
+%% on a concrete node.
+
+-module(emqx_ft_storage_fs_proxy).
+
+-export([
+    list_local/2,
+    pread_local/4,
+    get_ready_transfer_local/1,
+    ready_transfers_local/0
+]).
+
+list_local(Transfer, What) ->
+    emqx_ft_storage:with_storage_type(local, list, [Transfer, What]).
+
+pread_local(Transfer, Frag, Offset, Size) ->
+    emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
+
+get_ready_transfer_local(Transfer) ->
+    emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]).
+
+ready_transfers_local() ->
+    emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).

+ 9 - 8
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -36,28 +36,29 @@ introduced_in() ->
     "5.0.17".
 
 -spec list(node(), transfer(), fragment | result) ->
-    {ok, [filefrag()]} | {error, term()}.
+    {ok, [filefrag()]} | {error, term()} | no_return().
 list(Node, Transfer, What) ->
-    erpc:call(Node, emqx_ft_storage_fs, list_local, [Transfer, What]).
+    erpc:call(Node, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]).
 
 -spec multilist([node()], transfer(), fragment | result) ->
     emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
 multilist(Nodes, Transfer, What) ->
-    erpc:multicall(Nodes, emqx_ft_storage_fs, list_local, [Transfer, What]).
+    erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]).
 
 -spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
-    {ok, [filefrag()]} | {error, term()}.
+    {ok, [filefrag()]} | {error, term()} | no_return().
 pread(Node, Transfer, Frag, Offset, Size) ->
-    erpc:call(Node, emqx_ft_storage_fs, pread_local, [Transfer, Frag, Offset, Size]).
+    erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
 
 -spec ready_transfers([node()]) ->
     {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
     | {error, term()}.
 ready_transfers(Nodes) ->
-    erpc:multicall(Nodes, emqx_ft_storage_fs, ready_transfers_local, []).
+    erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).
 
 -spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) ->
     {ok, emqx_ft_storage:ready_transfer_data()}
-    | {error, term()}.
+    | {error, term()}
+    | no_return().
 get_ready_transfer(Node, ReadyTransferId) ->
-    erpc:call(Node, emqx_ft_storage_fs, get_ready_transfer_local, [ReadyTransferId]).
+    erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [ReadyTransferId]).