Просмотр исходного кода

feat(ft): make storage backend fully async-aware

Introduce an ad-hoc concept of tasks that need to be kicked off
manually. Rework filesystem backend to accommodate this change.
Adapt responder logic for that "kickoff" protocol.
Andrew Mayorov 3 года назад
Родитель
Коммит
f896fefa59

+ 121 - 56
apps/emqx_ft/src/emqx_ft.erl

@@ -35,7 +35,7 @@
     decode_filemeta/1
 ]).
 
--export([on_assemble/2]).
+-export([on_complete/4]).
 
 -export_type([
     clientid/0,
@@ -76,7 +76,8 @@
 
 -type segment() :: {offset(), _Content :: binary()}.
 
--define(ASSEMBLE_TIMEOUT, 5000).
+-define(STORE_SEGMENT_TIMEOUT, 10000).
+-define(ASSEMBLE_TIMEOUT, 60000).
 
 %%--------------------------------------------------------------------
 %% API for app
@@ -143,52 +144,59 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
 on_file_command(PacketId, Msg, FileCommand) ->
     case string:split(FileCommand, <<"/">>, all) of
         [FileId, <<"init">>] ->
-            on_init(Msg, FileId);
+            on_init(PacketId, Msg, transfer(Msg, FileId));
         [FileId, <<"fin">>] ->
-            on_fin(PacketId, Msg, FileId, undefined);
+            on_fin(PacketId, Msg, transfer(Msg, FileId), undefined);
         [FileId, <<"fin">>, Checksum] ->
-            on_fin(PacketId, Msg, FileId, Checksum);
+            on_fin(PacketId, Msg, transfer(Msg, FileId), Checksum);
         [FileId, <<"abort">>] ->
-            on_abort(Msg, FileId);
+            on_abort(Msg, transfer(Msg, FileId));
         [FileId, OffsetBin] ->
             validate([{offset, OffsetBin}], fun([Offset]) ->
-                on_segment(Msg, FileId, Offset, undefined)
+                on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, undefined)
             end);
         [FileId, OffsetBin, ChecksumBin] ->
             validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) ->
-                on_segment(Msg, FileId, Offset, Checksum)
+                on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, Checksum)
             end);
         _ ->
             ?RC_UNSPECIFIED_ERROR
     end.
 
-on_init(Msg, FileId) ->
+on_init(PacketId, Msg, Transfer) ->
     ?SLOG(info, #{
         msg => "on_init",
         mqtt_msg => Msg,
-        file_id => FileId
+        packet_id => PacketId,
+        transfer => Transfer
     }),
     Payload = Msg#message.payload,
+    PacketKey = {self(), PacketId},
     % %% Add validations here
     case decode_filemeta(Payload) of
         {ok, Meta} ->
-            case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
-                ok ->
-                    ?RC_SUCCESS;
-                {error, Reason} ->
-                    ?SLOG(warning, #{
-                        msg => "store_filemeta_failed",
-                        mqtt_msg => Msg,
-                        file_id => FileId,
-                        reason => Reason
-                    }),
-                    ?RC_UNSPECIFIED_ERROR
-            end;
+            Callback = fun(Result) ->
+                ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
+            end,
+            with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
+                case store_filemeta(Transfer, Meta) of
+                    % Stored, ack through the responder right away
+                    ok ->
+                        emqx_ft_responder:ack(PacketKey, ok);
+                    % Storage operation started, packet will be acked by the responder
+                    {async, Pid} ->
+                        ok = emqx_ft_responder:kickoff(PacketKey, Pid),
+                        ok;
+                    %% Storage operation failed, ack through the responder
+                    {error, _} = Error ->
+                        emqx_ft_responder:ack(PacketKey, Error)
+                end
+            end);
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "on_init: invalid filemeta",
                 mqtt_msg => Msg,
-                file_id => FileId,
+                transfer => Transfer,
                 reason => Reason
             }),
             ?RC_UNSPECIFIED_ERROR
@@ -198,48 +206,69 @@ on_abort(_Msg, _FileId) ->
     %% TODO
     ?RC_SUCCESS.
 
-on_segment(Msg, FileId, Offset, Checksum) ->
+on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
     ?SLOG(info, #{
         msg => "on_segment",
         mqtt_msg => Msg,
-        file_id => FileId,
+        packet_id => PacketId,
+        transfer => Transfer,
         offset => Offset,
         checksum => Checksum
     }),
     %% TODO: handle checksum
     Payload = Msg#message.payload,
     Segment = {Offset, Payload},
+    PacketKey = {self(), PacketId},
+    Callback = fun(Result) ->
+        ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
+    end,
     %% Add offset/checksum validations
-    case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of
-        ok ->
-            ?RC_SUCCESS;
-        {error, _Reason} ->
-            ?RC_UNSPECIFIED_ERROR
-    end.
-
-on_fin(PacketId, Msg, FileId, Checksum) ->
+    with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
+        case store_segment(Transfer, Segment) of
+            ok ->
+                emqx_ft_responder:ack(PacketKey, ok);
+            {async, Pid} ->
+                ok = emqx_ft_responder:kickoff(PacketKey, Pid),
+                ok;
+            {error, _} = Error ->
+                emqx_ft_responder:ack(PacketKey, Error)
+        end
+    end).
+
+on_fin(PacketId, Msg, Transfer, Checksum) ->
     ?SLOG(info, #{
         msg => "on_fin",
         mqtt_msg => Msg,
-        file_id => FileId,
-        checksum => Checksum,
-        packet_id => PacketId
+        packet_id => PacketId,
+        transfer => Transfer,
+        checksum => Checksum
     }),
     %% TODO: handle checksum? Do we need it?
     FinPacketKey = {self(), PacketId},
-    case emqx_ft_responder:start(FinPacketKey, fun ?MODULE:on_assemble/2, ?ASSEMBLE_TIMEOUT) of
-        %% We have new fin packet
+    Callback = fun(Result) ->
+        ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
+    end,
+    with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() ->
+        case assemble(Transfer) of
+            %% Assembling completed, ack through the responder right away
+            ok ->
+                emqx_ft_responder:ack(FinPacketKey, ok);
+            %% Assembling started, packet will be acked by the responder
+            {async, Pid} ->
+                ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
+                ok;
+            %% Assembling failed, ack through the responder
+            {error, _} = Error ->
+                emqx_ft_responder:ack(FinPacketKey, Error)
+        end
+    end).
+
+with_responder(Key, Callback, Timeout, CriticalSection) ->
+    case emqx_ft_responder:start(Key, Callback, Timeout) of
+        %% We have new packet
         {ok, _} ->
-            Callback = fun(Result) -> emqx_ft_responder:ack(FinPacketKey, Result) end,
-            case assemble(transfer(Msg, FileId), Callback) of
-                %% Assembling started, packet will be acked by the callback or the responder
-                {ok, _} ->
-                    ok;
-                %% Assembling failed, ack through the responder
-                {error, _} = Error ->
-                    emqx_ft_responder:ack(FinPacketKey, Error)
-            end;
-        %% Fin packet already received.
+            CriticalSection();
+        %% Packet already received.
         %% Since we are still handling the previous one,
         %% we probably have retransmit here
         {error, {already_started, _}} ->
@@ -247,13 +276,35 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
     end,
     undefined.
 
-assemble(Transfer, Callback) ->
+store_filemeta(Transfer, Segment) ->
     try
-        emqx_ft_storage:assemble(Transfer, Callback)
+        emqx_ft_storage:store_filemeta(Transfer, Segment)
     catch
         C:E:S ->
-            ?SLOG(warning, #{
-                msg => "file_assemble_failed", class => C, reason => E, stacktrace => S
+            ?SLOG(error, #{
+                msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S
+            }),
+            {error, {internal_error, E}}
+    end.
+
+store_segment(Transfer, Segment) ->
+    try
+        emqx_ft_storage:store_segment(Transfer, Segment)
+    catch
+        C:E:S ->
+            ?SLOG(error, #{
+                msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S
+            }),
+            {error, {internal_error, E}}
+    end.
+
+assemble(Transfer) ->
+    try
+        emqx_ft_storage:assemble(Transfer)
+    catch
+        C:E:S ->
+            ?SLOG(error, #{
+                msg => "start_assemble_failed", class => C, reason => E, stacktrace => S
             }),
             {error, {internal_error, E}}
     end.
@@ -262,14 +313,28 @@ transfer(Msg, FileId) ->
     ClientId = Msg#message.from,
     {ClientId, FileId}.
 
-on_assemble({ChanPid, PacketId}, Result) ->
-    ?SLOG(debug, #{msg => "on_assemble", packet_id => PacketId, result => Result}),
+on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
+    ?SLOG(debug, #{
+        msg => "on_complete",
+        operation => Op,
+        packet_id => PacketId,
+        transfer => Transfer
+    }),
     case Result of
-        {ack, ok} ->
+        {Mode, ok} when Mode == ack orelse Mode == down ->
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
-        {ack, {error, _}} ->
+        {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
+            ?SLOG(error, #{
+                msg => Op ++ "_failed",
+                transfer => Transfer,
+                reason => Reason
+            }),
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
         timeout ->
+            ?SLOG(error, #{
+                msg => Op ++ "_timed_out",
+                transfer => Transfer
+            }),
             erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
     end.
 

+ 32 - 64
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -18,36 +18,31 @@
 
 -include_lib("emqx/include/logger.hrl").
 
--export([start_link/3]).
+-export([start_link/2]).
 
 -behaviour(gen_statem).
 -export([callback_mode/0]).
 -export([init/1]).
-% -export([list_local_fragments/3]).
-% -export([list_remote_fragments/3]).
-% -export([start_assembling/3]).
 -export([handle_event/4]).
 
-% -export([handle_continue/2]).
-% -export([handle_call/3]).
-% -export([handle_cast/2]).
-
 -record(st, {
     storage :: _Storage,
     transfer :: emqx_ft:transfer(),
     assembly :: _TODO,
     file :: {file:filename(), io:device(), term()} | undefined,
-    hash,
-    callback :: fun((ok | {error, term()}) -> any())
+    hash
 }).
 
--define(RPC_LIST_TIMEOUT, 1000).
--define(RPC_READSEG_TIMEOUT, 5000).
+-define(NAME(Transfer), {n, l, {?MODULE, Transfer}}).
+-define(REF(Transfer), {via, gproc, ?NAME(Transfer)}).
 
 %%
 
-start_link(Storage, Transfer, Callback) ->
-    gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []).
+start_link(Storage, Transfer) ->
+    %% TODO
+    %% Additional callbacks? They won't survive restarts by the supervisor, which raises the
+    %% question of whether we even need to retry with the help of the supervisor.
+    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer}, []).
 
 %%
 
@@ -56,16 +51,21 @@ start_link(Storage, Transfer, Callback) ->
 callback_mode() ->
     handle_event_function.
 
-init({Storage, Transfer, Callback}) ->
+init({Storage, Transfer}) ->
     St = #st{
         storage = Storage,
         transfer = Transfer,
         assembly = emqx_ft_assembly:new(),
-        hash = crypto:hash_init(sha256),
-        callback = Callback
+        hash = crypto:hash_init(sha256)
     },
-    {ok, list_local_fragments, St, ?internal([])}.
-
+    {ok, idle, St}.
+
+handle_event(info, kickoff, idle, St) ->
+    % NOTE
+    % Someone's told us to start the work, which usually means that it has set up a monitor.
+    % We could wait for this message and handle it at the end of the assembling rather than at
+    % the beginning, however it would make error handling much messier.
+    {next_state, list_local_fragments, St, ?internal([])};
 handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
     % TODO: what do we do with non-transient errors here (e.g. `eacces`)?
     {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
@@ -76,10 +76,10 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
             {next_state, start_assembling, NSt, ?internal([])};
         {incomplete, _} ->
             Nodes = mria_mnesia:running_nodes() -- [node()],
-            {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}
+            {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])};
         % TODO: recovery?
-        % {error, _} = Reason ->
-        %     {stop, Reason}
+        {error, _} = Error ->
+            {stop, {shutdown, Error}}
     end;
 handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
     % TODO
@@ -107,12 +107,14 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
             {next_state, start_assembling, NSt, ?internal([])};
         % TODO: retries / recovery?
         {incomplete, _} = Status ->
-            {next_state, {failure, {error, Status}}, NSt, ?internal([])}
+            {stop, {shutdown, {error, Status}}};
+        {error, _} = Error ->
+            {stop, {shutdown, Error}}
     end;
 handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
     Coverage = emqx_ft_assembly:coverage(Asm),
-    % TODO: errors
+    % TODO: better error handling
     {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
     {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
 handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
@@ -120,50 +122,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % Currently, race is possible between getting segment info from the remote node and
     % this node garbage collecting the segment itself.
     % TODO: pipelining
-    case pread(Node, Segment, St) of
-        {ok, Content} ->
-            case emqx_ft_storage_fs:write(St#st.file, Content) of
-                {ok, NHandle} ->
-                    {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
-                %% TODO: better error handling
-                {error, _} = Error ->
-                    {next_state, {failure, Error}, St, ?internal([])}
-            end;
-        {error, _} = Error ->
-            %% TODO: better error handling
-            {next_state, {failure, Error}, St, ?internal([])}
-    end;
+    % TODO: better error handling
+    {ok, Content} = pread(Node, Segment, St),
+    {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
+    {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
 handle_event(internal, _, {assemble, []}, St = #st{}) ->
     {next_state, complete, St, ?internal([])};
-handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
+handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
     Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
-    _ = safe_apply(Callback, Result),
-    {stop, shutdown};
-handle_event(internal, _, {failure, Error}, #st{callback = Callback}) ->
-    _ = safe_apply(Callback, Error),
-    {stop, Error}.
-
-% handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
-%     % TODO: what we do with non-transients errors here (e.g. `eacces`)?
-%     {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
-%     NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
-%     NSt = St#st{assembly = NAsm},
-%     case emqx_ft_assembly:status(NAsm) of
-%         complete ->
-%             {noreply, NSt, {continue}};
-%         {more, _} ->
-%             error(noimpl);
-%         {error, _} ->
-%             error(noimpl)
-%     end,
-%     {noreply, St}.
-
-% handle_call(_Call, _From, St) ->
-%     {reply, {error, badcall}, St}.
-
-% handle_cast(_Cast, St) ->
-%     {noreply, St}.
+    {stop, {shutdown, Result}}.
 
 pread(Node, Segment, St) when Node =:= node() ->
     emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));

+ 10 - 5
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -17,7 +17,7 @@
 -module(emqx_ft_assembler_sup).
 
 -export([start_link/0]).
--export([start_child/3]).
+-export([ensure_child/2]).
 
 -behaviour(supervisor).
 -export([init/1]).
@@ -25,13 +25,18 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-start_child(Storage, Transfer, Callback) ->
+ensure_child(Storage, Transfer) ->
     Childspec = #{
-        id => {Storage, Transfer},
-        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]},
+        id => Transfer,
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer]},
         restart => temporary
     },
-    supervisor:start_child(?MODULE, Childspec).
+    case supervisor:start_child(?MODULE, Childspec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {error, {already_started, Pid}} ->
+            {ok, Pid}
+    end.
 
 init(_) ->
     SupFlags = #{

+ 29 - 4
apps/emqx_ft/src/emqx_ft_responder.erl

@@ -25,6 +25,7 @@
 
 %% API
 -export([start/3]).
+-export([kickoff/2]).
 -export([ack/2]).
 
 %% Supervisor API
@@ -35,7 +36,7 @@
 -define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}).
 
 -type key() :: term().
--type respfun() :: fun(({ack, _Result} | timeout) -> _SideEffect).
+-type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect).
 
 %%--------------------------------------------------------------------
 %% API
@@ -45,6 +46,10 @@
 start(Key, RespFun, Timeout) ->
     emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).
 
+-spec kickoff(key(), pid()) -> ok.
+kickoff(Key, Pid) ->
+    gen_server:call(?REF(Key), {kickoff, Pid}).
+
 -spec ack(key(), _Result) -> _Return.
 ack(Key, Result) ->
     % TODO: it's possible to avoid term copy
@@ -63,8 +68,13 @@ init({Key, RespFun, Timeout}) ->
     _TRef = erlang:send_after(Timeout, self(), timeout),
     {ok, {Key, RespFun}}.
 
+handle_call({kickoff, Pid}, _From, St) ->
+    % TODO: more state?
+    _MRef = erlang:monitor(process, Pid),
+    _ = Pid ! kickoff,
+    {reply, ok, St};
 handle_call({ack, Result}, _From, {Key, RespFun}) ->
-    Ret = apply(RespFun, [Key, {ack, Result}]),
+    Ret = apply(RespFun, [{ack, Result}]),
     ?tp(ft_responder_ack, #{key => Key, result => Result, return => Ret}),
     {stop, {shutdown, Ret}, Ret, undefined};
 handle_call(Msg, _From, State) ->
@@ -76,9 +86,13 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info(timeout, {Key, RespFun}) ->
-    Ret = apply(RespFun, [Key, timeout]),
+    Ret = apply(RespFun, [timeout]),
     ?tp(ft_responder_timeout, #{key => Key, return => Ret}),
     {stop, {shutdown, Ret}, undefined};
+handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) ->
+    Ret = apply(RespFun, [{down, map_down_reason(Reason)}]),
+    ?tp(ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
+    {stop, {shutdown, Ret}, undefined};
 handle_info(Msg, State) ->
     ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
     {noreply, State}.
@@ -86,6 +100,17 @@ handle_info(Msg, State) ->
 terminate(_Reason, undefined) ->
     ok;
 terminate(Reason, {Key, RespFun}) ->
-    Ret = apply(RespFun, [Key, timeout]),
+    Ret = apply(RespFun, [timeout]),
     ?tp(ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
     ok.
+
+map_down_reason(normal) ->
+    ok;
+map_down_reason(shutdown) ->
+    ok;
+map_down_reason({shutdown, Result}) ->
+    Result;
+map_down_reason(noproc) ->
+    {error, noproc};
+map_down_reason(Error) ->
+    {error, {internal_error, Error}}.

+ 17 - 11
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -20,7 +20,7 @@
     [
         store_filemeta/2,
         store_segment/2,
-        assemble/2,
+        assemble/1,
 
         ready_transfers/0,
         get_ready_transfer/1,
@@ -43,12 +43,18 @@
 %% Behaviour
 %%--------------------------------------------------------------------
 
+%% NOTE
+%% An async task will wait for a `kickoff` message to start processing, to give some time
+%% to set up monitors, etc. Async task will not explicitly report the processing result,
+%% you are expected to receive and handle exit reason of the process, which is
+%% -type result() :: `{shutdown, ok | {error, _}}`.
+
 -callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
-    ok | {error, term()}.
+    ok | {async, pid()} | {error, term()}.
 -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
-    ok | {error, term()}.
--callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) ->
-    {ok, pid()} | {error, term()}.
+    ok | {async, pid()} | {error, term()}.
+-callback assemble(storage(), emqx_ft:transfer()) ->
+    ok | {async, pid()} | {error, term()}.
 -callback ready_transfers(storage()) ->
     {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
 -callback get_ready_transfer(storage(), ready_transfer_id()) ->
@@ -59,22 +65,22 @@
 %%--------------------------------------------------------------------
 
 -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
-    ok | {error, term()}.
+    ok | {async, pid()} | {error, term()}.
 store_filemeta(Transfer, FileMeta) ->
     Mod = mod(),
     Mod:store_filemeta(storage(), Transfer, FileMeta).
 
 -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
-    ok | {error, term()}.
+    ok | {async, pid()} | {error, term()}.
 store_segment(Transfer, Segment) ->
     Mod = mod(),
     Mod:store_segment(storage(), Transfer, Segment).
 
--spec assemble(emqx_ft:transfer(), assemble_callback()) ->
-    {ok, pid()} | {error, term()}.
-assemble(Transfer, Callback) ->
+-spec assemble(emqx_ft:transfer()) ->
+    ok | {async, pid()} | {error, term()}.
+assemble(Transfer) ->
     Mod = mod(),
-    Mod:assemble(storage(), Transfer, Callback).
+    Mod:assemble(storage(), Transfer).
 
 -spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
 ready_transfers() ->

+ 7 - 5
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -24,7 +24,7 @@
 -export([store_segment/3]).
 -export([list/3]).
 -export([pread/5]).
--export([assemble/3]).
+-export([assemble/2]).
 
 -export([transfers/1]).
 
@@ -168,11 +168,13 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
             {error, Reason}
     end.
 
--spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
+-spec assemble(storage(), transfer()) ->
     % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
-    {ok, _Assembler :: pid()} | {error, _TODO}.
-assemble(Storage, Transfer, Callback) ->
-    emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
+    {async, _Assembler :: pid()} | {error, _TODO}.
+assemble(Storage, Transfer) ->
+    % TODO: ask cluster if the transfer is already assembled
+    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer),
+    {async, Pid}.
 
 get_ready_transfer(_Storage, ReadyTransferId) ->
     case parse_ready_transfer_id(ReadyTransferId) of

+ 1 - 1
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -361,7 +361,7 @@ t_assemble_crash(Config) ->
     C = ?config(client, Config),
 
     meck:new(emqx_ft_storage_fs),
-    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
+    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _) -> meck:exception(error, oops) end),
 
     ?assertRCName(
         unspecified_error,

+ 22 - 28
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -37,7 +37,8 @@ all() ->
     ].
 
 init_per_suite(Config) ->
-    Config.
+    Apps = application:ensure_all_started(gproc),
+    [{suite_apps, Apps} | Config].
 
 end_per_suite(_Config) ->
     ok.
@@ -83,9 +84,8 @@ t_assemble_empty_transfer(Config) ->
         ]},
         emqx_ft_storage_fs:list(Storage, Transfer, fragment)
     ),
-    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
-    {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
-    ?assertMatch(#{result := ok}, Event),
+    Status = complete_assemble(Storage, Transfer),
+    ?assertEqual({shutdown, ok}, Status),
     ?assertEqual(
         {ok, <<>>},
         % TODO
@@ -132,9 +132,8 @@ t_assemble_complete_local_transfer(Config) ->
         Fragments
     ),
 
-    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
-    {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
-    ?assertMatch(#{result := ok}, Event),
+    Status = complete_assemble(Storage, Transfer),
+    ?assertEqual({shutdown, ok}, Status),
 
     AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
     ?assertMatch(
@@ -172,37 +171,32 @@ t_assemble_incomplete_transfer(Config) ->
         expire_at => 42
     },
     ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
-    Self = self(),
-    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) ->
-        Self ! {test_assembly_finished, Result}
-    end),
-    receive
-        {test_assembly_finished, Result} ->
-            ?assertMatch({error, _}, Result)
-    after 1000 ->
-        ct:fail("Assembler did not called callback")
-    end.
+    Status = complete_assemble(Storage, Transfer),
+    ?assertMatch({shutdown, {error, _}}, Status).
 
 t_assemble_no_meta(Config) ->
     Storage = storage(Config),
     Transfer = {?CLIENTID2, ?config(file_id, Config)},
-    Self = self(),
-    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) ->
-        Self ! {test_assembly_finished, Result}
-    end),
+    Status = complete_assemble(Storage, Transfer),
+    ?assertMatch({shutdown, {error, {incomplete, _}}}, Status).
+
+complete_assemble(Storage, Transfer) ->
+    complete_assemble(Storage, Transfer, 1000).
+
+complete_assemble(Storage, Transfer, Timeout) ->
+    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer),
+    MRef = erlang:monitor(process, Pid),
+    Pid ! kickoff,
     receive
-        {test_assembly_finished, Result} ->
-            ?assertMatch({error, _}, Result)
-    after 1000 ->
-        ct:fail("Assembler did not called callback")
+        {'DOWN', MRef, process, Pid, Result} ->
+            Result
+    after Timeout ->
+        ct:fail("Assembler did not finish in time")
     end.
 
 mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
     filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]).
 
-on_assembly_finished(Result) ->
-    ?tp(test_assembly_finished, #{result => Result}).
-
 %%
 
 t_list_transfers(Config) ->

+ 3 - 3
apps/emqx_ft/test/emqx_ft_responder_SUITE.erl

@@ -40,7 +40,7 @@ end_per_testcase(_Case, _Config) ->
 
 t_start_ack(_Config) ->
     Key = <<"test">>,
-    DefaultAction = fun(_Key, {ack, Ref}) -> Ref end,
+    DefaultAction = fun({ack, Ref}) -> Ref end,
     ?assertMatch(
         {ok, _Pid},
         emqx_ft_responder:start(Key, DefaultAction, 1000)
@@ -62,7 +62,7 @@ t_start_ack(_Config) ->
 t_timeout(_Config) ->
     Key = <<"test">>,
     Self = self(),
-    DefaultAction = fun(K, timeout) -> Self ! {timeout, K} end,
+    DefaultAction = fun(timeout) -> Self ! {timeout, Key} end,
     {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
     receive
         {timeout, Key} ->
@@ -89,7 +89,7 @@ t_timeout(_Config) ->
 %     ).
 
 t_unknown_msgs(_Config) ->
-    {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_, _) -> ok end, 100),
+    {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100),
     Pid ! {unknown_msg, <<"test">>},
     ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
     ?assertEqual(