Просмотр исходного кода

feat(ft): add fs storage bpapi and use it in assembler

Andrew Mayorov 3 лет назад
Родитель
Сommit
92670bfe3d

+ 8 - 6
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -80,13 +80,11 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
         %     {stop, Reason}
     end;
 handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
-    % TODO: portable "storage" ref
-    Args = [St#st.storage, St#st.transfer, fragment],
     % TODO
     % Async would better because we would not need to wait for some lagging nodes if
     % the coverage is already complete.
-    % TODO: BP API?
-    Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT),
+    % TODO: portable "storage" ref
+    Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment),
     NodeResults = lists:zip(Nodes, Results),
     NAsm = emqx_ft_assembly:update(
         lists:foldl(
@@ -119,9 +117,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % TODO
     % Currently, race is possible between getting segment info from the remote node and
     % this node garbage collecting the segment itself.
-    Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
     % TODO: pipelining
-    case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of
+    case pread(Node, Segment, St) of
         {ok, Content} ->
             {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
             {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}
@@ -158,6 +155,11 @@ handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, call
 % handle_cast(_Cast, St) ->
 %     {noreply, St}.
 
+pread(Node, Segment, St) when Node =:= node() ->
+    emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
+pread(Node, Segment, St) ->
+    emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)).
+
 %%
 
 segsize(#{fragment := {segment, Info}}) ->

+ 50 - 1
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -24,6 +24,14 @@
     ]
 ).
 
+-export([list_local/2]).
+-export([pread_local/4]).
+
+-export([local_transfers/0]).
+
+-type offset() :: emqx_ft:offset().
+-type transfer() :: emqx_ft:transfer().
+
 -type storage() :: emqx_config:config().
 
 -export_type([assemble_callback/0]).
@@ -63,8 +71,41 @@ assemble(Transfer, Callback) ->
     Mod = mod(),
     Mod:assemble(storage(), Transfer, Callback).
 
+%%--------------------------------------------------------------------
+%% Local FS API
+%%--------------------------------------------------------------------
+
+-type filefrag() :: emqx_ft_storage_fs:filefrag().
+-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
+
+-spec list_local(transfer(), fragment | result) ->
+    {ok, [filefrag()]} | {error, term()}.
+list_local(Transfer, What) ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end
+    ).
+
+-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
+    {ok, [filefrag()]} | {error, term()}.
+pread_local(Transfer, Frag, Offset, Size) ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end
+    ).
+
+-spec local_transfers() ->
+    {ok, node(), #{transfer() => transferinfo()}} | {error, term()}.
+local_transfers() ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:transfers(Storage) end
+    ).
+
+%%
+
 mod() ->
-    case storage() of
+    mod(storage()).
+
+mod(Storage) ->
+    case Storage of
         #{type := local} ->
             emqx_ft_storage_fs
         % emqx_ft_storage_dummy
@@ -72,3 +113,11 @@ mod() ->
 
 storage() ->
     emqx_config:get([file_transfer, storage]).
+
+with_local_storage(Fun) ->
+    case storage() of
+        #{type := local} = Storage ->
+            Fun(mod(Storage), Storage);
+        #{type := Type} ->
+            {error, {unsupported_storage_type, Type}}
+    end.

+ 4 - 0
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -31,6 +31,10 @@
 -export([write/2]).
 -export([discard/1]).
 
+-export_type([filefrag/1]).
+-export_type([filefrag/0]).
+-export_type([transferinfo/0]).
+
 -type transfer() :: emqx_ft:transfer().
 -type offset() :: emqx_ft:offset().
 -type filemeta() :: emqx_ft:filemeta().

+ 56 - 0
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([introduced_in/0]).
+
+-export([list/3]).
+-export([multilist/3]).
+-export([pread/5]).
+-export([transfers/1]).
+
+-type offset() :: emqx_ft:offset().
+-type transfer() :: emqx_ft:transfer().
+-type filefrag() :: emqx_ft_storage_fs:filefrag().
+-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.17".
+
+-spec list(node(), transfer(), fragment | result) ->
+    {ok, [filefrag()]} | {error, term()}.
+list(Node, Transfer, What) ->
+    erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]).
+
+-spec multilist([node()], transfer(), fragment | result) ->
+    emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
+multilist(Nodes, Transfer, What) ->
+    erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]).
+
+-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
+    {ok, [filefrag()]} | {error, term()}.
+pread(Node, Transfer, Frag, Offset, Size) ->
+    erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]).
+
+-spec transfers([node()]) ->
+    emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}).
+transfers(Nodes) ->
+    erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []).