Просмотр исходного кода

Merge pull request #11481 from savonarola/0817-simplify-async-responses

chore(ft): refactor async reply mechanism
Ilya Averyanov 2 лет назад
Родитель
Сommit
2ef60db147

+ 5 - 0
apps/emqx/include/emqx_channel.hrl

@@ -41,4 +41,9 @@
     will_msg
 ]).
 
+-define(REPLY_OUTGOING(Packets), {outgoing, Packets}).
+-define(REPLY_CONNACK(Packet), {connack, Packet}).
+-define(REPLY_EVENT(StateOrEvent), {event, StateOrEvent}).
+-define(REPLY_CLOSE(Reason), {close, Reason}).
+
 -define(EXPIRE_INTERVAL_INFINITE, 4294967295000).

+ 19 - 9
apps/emqx/src/emqx_channel.erl

@@ -122,6 +122,7 @@
 -type reply() ::
     {outgoing, emqx_types:packet()}
     | {outgoing, [emqx_types:packet()]}
+    | {connack, emqx_types:packet()}
     | {event, conn_state() | updated}
     | {close, Reason :: atom()}.
 
@@ -1023,7 +1024,7 @@ handle_out(publish, [], Channel) ->
     {ok, Channel};
 handle_out(publish, Publishes, Channel) ->
     {Packets, NChannel} = do_deliver(Publishes, Channel),
-    {ok, {outgoing, Packets}, NChannel};
+    {ok, ?REPLY_OUTGOING(Packets), NChannel};
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
@@ -1048,7 +1049,7 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel) ->
     handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel);
 handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) ->
     Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
-    {ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
+    {ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel};
 handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
     {ok, {close, ReasonName}, Channel};
 handle_out(auth, {ReasonCode, Properties}, Channel) ->
@@ -1062,7 +1063,7 @@ handle_out(Type, Data, Channel) ->
 %%--------------------------------------------------------------------
 
 return_connack(AckPacket, Channel) ->
-    Replies = [{event, connected}, {connack, AckPacket}],
+    Replies = [?REPLY_EVENT(connected), ?REPLY_CONNACK(AckPacket)],
     case maybe_resume_session(Channel) of
         ignore ->
             {ok, Replies, Channel};
@@ -1073,7 +1074,7 @@ return_connack(AckPacket, Channel) ->
                 session = NSession
             },
             {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
-            Outgoing = [{outgoing, Packets} || length(Packets) > 0],
+            Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0],
             {ok, Replies ++ Outgoing, NChannel2}
     end.
 
@@ -1121,7 +1122,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
 %%--------------------------------------------------------------------
 
 return_sub_unsub_ack(Packet, Channel) ->
-    {ok, [{outgoing, Packet}, {event, updated}], Channel}.
+    {ok, [?REPLY_OUTGOING(Packet), ?REPLY_EVENT(updated)], Channel}.
 
 %%--------------------------------------------------------------------
 %% Handle call
@@ -1235,7 +1236,7 @@ handle_info(
 ->
     Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
     case maybe_shutdown(Reason, Channel1) of
-        {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
+        {ok, Channel2} -> {ok, ?REPLY_EVENT(disconnected), Channel2};
         Shutdown -> Shutdown
     end;
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
@@ -1252,6 +1253,11 @@ handle_info({disconnect, ReasonCode, ReasonName, Props}, Channel) ->
     handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel);
 handle_info({puback, PacketId, PubRes, RC}, Channel) ->
     do_finish_publish(PacketId, PubRes, RC, Channel);
+handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) ->
+    case emqx_hooks:run_fold('client.monitored_process_down', [Ref, Pid, Reason], []) of
+        [] -> {ok, Channel};
+        Msgs -> {ok, Msgs, Channel}
+    end;
 handle_info(Info, Channel) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.
@@ -1358,9 +1364,13 @@ handle_timeout(
         {_, Quota2} ->
             {ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})}
     end;
-handle_timeout(_TRef, Msg, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
-    {ok, Channel}.
+handle_timeout(TRef, Msg, Channel) ->
+    case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
+        [] ->
+            {ok, Channel};
+        Msgs ->
+            {ok, Msgs, Channel}
+    end.
 
 %%--------------------------------------------------------------------
 %% Ensure timers

+ 1 - 1
apps/emqx/src/emqx_cm.erl

@@ -191,7 +191,7 @@ do_unregister_channel({_ClientId, ChanPid} = Chan) ->
     true = ets:delete(?CHAN_CONN_TAB, Chan),
     true = ets:delete(?CHAN_INFO_TAB, Chan),
     ets:delete_object(?CHAN_TAB, Chan),
-    ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
+    ok = emqx_hooks:run('cm.channel.unregistered', [ChanPid]),
     true.
 
 -spec connection_closed(emqx_types:clientid()) -> true.

+ 1 - 1
apps/emqx/test/emqx_connection_SUITE.erl

@@ -49,7 +49,7 @@ init_per_suite(Config) ->
     %% Meck Hooks
     ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
-    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end),
+    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
 
     ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
 

+ 94 - 91
apps/emqx_ft/src/emqx_ft.erl

@@ -18,7 +18,9 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_channel.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
+
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
@@ -28,7 +30,10 @@
 
 -export([
     on_message_publish/1,
-    on_message_puback/4
+    on_message_puback/4,
+    on_client_timeout/3,
+    on_process_down/4,
+    on_channel_unregistered/1
 ]).
 
 -export([
@@ -36,8 +41,6 @@
     encode_filemeta/1
 ]).
 
--export([on_complete/4]).
-
 -export_type([
     clientid/0,
     transfer/0,
@@ -85,17 +88,29 @@
     checksum => checksum()
 }.
 
+-define(FT_EVENT(EVENT), {?MODULE, EVENT}).
+
 %%--------------------------------------------------------------------
 %% API for app
 %%--------------------------------------------------------------------
 
 hook() ->
     ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
-    ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
+    ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
+    ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST),
+    ok = emqx_hooks:put(
+        'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST
+    ),
+    ok = emqx_hooks:put(
+        'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST
+    ).
 
 unhook() ->
     ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
-    ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
+    ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}),
+    ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}),
+    ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}),
+    ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}).
 
 %%--------------------------------------------------------------------
 %% API
@@ -145,6 +160,25 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
             ignore
     end.
 
+on_channel_unregistered(ChannelPid) ->
+    ok = emqx_ft_async_reply:deregister_all(ChannelPid).
+
+on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) ->
+    _ = erlang:demonitor(MRef, [flush]),
+    _ = emqx_ft_async_reply:take_by_mref(MRef),
+    {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]};
+on_client_timeout(_TRef, _Event, Acc) ->
+    {ok, Acc}.
+
+on_process_down(MRef, _Pid, Reason, Acc) ->
+    case emqx_ft_async_reply:take_by_mref(MRef) of
+        {ok, PacketId, TRef} ->
+            _ = emqx_utils:cancel_timer(TRef),
+            {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]};
+        not_found ->
+            {ok, Acc}
+    end.
+
 %%--------------------------------------------------------------------
 %% Handlers for transfer messages
 %%--------------------------------------------------------------------
@@ -208,24 +242,13 @@ on_init(PacketId, Msg, Transfer, Meta) ->
         transfer => Transfer,
         filemeta => Meta
     }),
-    PacketKey = {self(), PacketId},
-    Callback = fun(Result) ->
-        ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
-    end,
-    with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() ->
-        case store_filemeta(Transfer, Meta) of
-            % Stored, ack through the responder right away
-            ok ->
-                emqx_ft_responder:ack(PacketKey, ok);
-            % Storage operation started, packet will be acked by the responder
-            % {async, Pid} ->
-            %     ok = emqx_ft_responder:kickoff(PacketKey, Pid),
-            %     ok;
-            %% Storage operation failed, ack through the responder
-            {error, _} = Error ->
-                emqx_ft_responder:ack(PacketKey, Error)
-        end
-    end).
+    %% Currently synchronous.
+    %% If we want to make it async, we need to use `emqx_ft_async_reply`,
+    %% like in `on_fin`.
+    case store_filemeta(Transfer, Meta) of
+        ok -> ?RC_SUCCESS;
+        {error, _} -> ?RC_UNSPECIFIED_ERROR
+    end.
 
 on_abort(_Msg, _FileId) ->
     %% TODO
@@ -240,21 +263,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
         checksum => Checksum
     }),
     Segment = {Offset, Msg#message.payload},
-    PacketKey = {self(), PacketId},
-    Callback = fun(Result) ->
-        ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
-    end,
-    with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() ->
-        case store_segment(Transfer, Segment) of
-            ok ->
-                emqx_ft_responder:ack(PacketKey, ok);
-            % {async, Pid} ->
-            %     ok = emqx_ft_responder:kickoff(PacketKey, Pid),
-            %     ok;
-            {error, _} = Error ->
-                emqx_ft_responder:ack(PacketKey, Error)
-        end
-    end).
+    %% Currently synchronous.
+    %% If we want to make it async, we need to use `emqx_ft_async_reply`,
+    %% like in `on_fin`.
+    case store_segment(Transfer, Segment) of
+        ok -> ?RC_SUCCESS;
+        {error, _} -> ?RC_UNSPECIFIED_ERROR
+    end.
 
 on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
     ?tp(info, "file_transfer_fin", #{
@@ -265,37 +280,30 @@ on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
         checksum => FinalChecksum
     }),
     %% TODO: handle checksum? Do we need it?
-    FinPacketKey = {self(), PacketId},
-    Callback = fun(Result) ->
-        ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
-    end,
-    with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
-        case assemble(Transfer, FinalSize, FinalChecksum) of
-            %% Assembling completed, ack through the responder right away
-            ok ->
-                emqx_ft_responder:ack(FinPacketKey, ok);
-            %% Assembling started, packet will be acked by the responder
-            {async, Pid} ->
-                ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
-                ok;
-            %% Assembling failed, ack through the responder
-            {error, _} = Error ->
-                emqx_ft_responder:ack(FinPacketKey, Error)
-        end
-    end).
-
-with_responder(Key, Callback, Timeout, CriticalSection) ->
-    case emqx_ft_responder:start(Key, Callback, Timeout) of
-        %% We have new packet
-        {ok, _} ->
-            CriticalSection();
-        %% Packet already received.
-        %% Since we are still handling the previous one,
-        %% we probably have retransmit here
-        {error, {already_started, _}} ->
-            ok
-    end,
-    undefined.
+    emqx_ft_async_reply:with_new_packet(
+        PacketId,
+        fun() ->
+            case assemble(Transfer, FinalSize, FinalChecksum) of
+                ok ->
+                    ?RC_SUCCESS;
+                %% Assembling started, packet will be acked by monitor or timeout
+                {async, Pid} ->
+                    ok = register_async_reply(Pid, PacketId),
+                    ok = emqx_ft_storage:kickoff(Pid),
+                    undefined;
+                {error, _} ->
+                    ?RC_UNSPECIFIED_ERROR
+            end
+        end,
+        undefined
+    ).
+
+register_async_reply(Pid, PacketId) ->
+    MRef = erlang:monitor(process, Pid),
+    TRef = erlang:start_timer(
+        emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, PacketId})
+    ),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef).
 
 store_filemeta(Transfer, Segment) ->
     try
@@ -335,28 +343,6 @@ transfer(Msg, FileId) ->
     ClientId = Msg#message.from,
     {clientid_to_binary(ClientId), FileId}.
 
-on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
-    ?tp(debug, "on_complete", #{
-        operation => Op,
-        packet_id => PacketId,
-        transfer => Transfer
-    }),
-    case Result of
-        {Mode, ok} when Mode == ack orelse Mode == down ->
-            erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
-        {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
-            ?tp(error, Op ++ "_failed", #{
-                transfer => Transfer,
-                reason => Reason
-            }),
-            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
-        timeout ->
-            ?tp(error, Op ++ "_timed_out", #{
-                transfer => Transfer
-            }),
-            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
-    end.
-
 validate(Validations, Fun) ->
     case do_validate(Validations, []) of
         {ok, Parsed} ->
@@ -429,3 +415,20 @@ clientid_to_binary(A) when is_atom(A) ->
     atom_to_binary(A);
 clientid_to_binary(B) when is_binary(B) ->
     B.
+
+reason_to_rc(Reason) ->
+    case map_down_reason(Reason) of
+        ok -> ?RC_SUCCESS;
+        {error, _} -> ?RC_UNSPECIFIED_ERROR
+    end.
+
+map_down_reason(normal) ->
+    ok;
+map_down_reason(shutdown) ->
+    ok;
+map_down_reason({shutdown, Result}) ->
+    Result;
+map_down_reason(noproc) ->
+    {error, noproc};
+map_down_reason(Error) ->
+    {error, {internal_error, Error}}.

+ 114 - 0
apps/emqx_ft/src/emqx_ft_async_reply.erl

@@ -0,0 +1,114 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_async_reply).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/types.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([
+    create_tables/0,
+    info/0
+]).
+
+-export([
+    register/3,
+    take_by_mref/1,
+    with_new_packet/3,
+    deregister_all/1
+]).
+
+-type channel_pid() :: pid().
+-type mon_ref() :: reference().
+-type timer_ref() :: reference().
+-type packet_id() :: emqx_types:packet_id().
+
+%% packets waiting for async workers
+
+-define(MON_TAB, emqx_ft_async_mons).
+-define(MON_KEY(MRef), ?MON_KEY(self(), MRef)).
+-define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
+
+%% async worker monitors by packet ids
+
+-define(PACKET_TAB, emqx_ft_async_packets).
+-define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)).
+-define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}).
+
+%%--------------------------------------------------------------------
+%% API
+%% -------------------------------------------------------------------
+
+-spec create_tables() -> ok.
+create_tables() ->
+    EtsOptions = [
+        named_table,
+        public,
+        ordered_set,
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ],
+    ok = emqx_utils_ets:new(?MON_TAB, EtsOptions),
+    ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions),
+    ok.
+
+-spec register(packet_id(), mon_ref(), timer_ref()) -> ok.
+register(PacketId, MRef, TRef) ->
+    _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}),
+    _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}),
+    ok.
+
+-spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
+with_new_packet(PacketId, Fun, Default) ->
+    case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of
+        true -> Default;
+        false -> Fun()
+    end.
+
+-spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found.
+take_by_mref(MRef) ->
+    case ets:take(?MON_TAB, ?MON_KEY(MRef)) of
+        [{_, PacketId, TRef}] ->
+            _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
+            {ok, PacketId, TRef};
+        [] ->
+            not_found
+    end.
+
+-spec deregister_all(channel_pid()) -> ok.
+deregister_all(ChannelPid) ->
+    ok = deregister_packets(ChannelPid),
+    ok = deregister_mons(ChannelPid),
+    ok.
+
+-spec info() -> {non_neg_integer(), non_neg_integer()}.
+info() ->
+    {ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}.
+
+%%--------------------------------------------------------------------
+%% Internal
+%%-------------------------------------------------------------------
+
+deregister_packets(ChannelPid) when is_pid(ChannelPid) ->
+    MS = [{{?PACKET_KEY(ChannelPid, '_'), '_'}, [], [true]}],
+    _ = ets:select_delete(?PACKET_TAB, MS),
+    ok.
+
+deregister_mons(ChannelPid) ->
+    MS = [{{?MON_KEY(ChannelPid, '_'), '_', '_'}, [], [true]}],
+    _ = ets:select_delete(?MON_TAB, MS),
+    ok.

+ 0 - 116
apps/emqx_ft/src/emqx_ft_responder.erl

@@ -1,116 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ft_responder).
-
--behaviour(gen_server).
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/types.hrl").
-
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
-%% API
--export([start/3]).
--export([kickoff/2]).
--export([ack/2]).
-
-%% Supervisor API
--export([start_link/3]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-
--define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}).
-
--type key() :: term().
--type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect).
-
-%%--------------------------------------------------------------------
-%% API
-%% -------------------------------------------------------------------
-
--spec start(key(), respfun(), timeout()) -> startlink_ret().
-start(Key, RespFun, Timeout) ->
-    emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).
-
--spec kickoff(key(), pid()) -> ok.
-kickoff(Key, Pid) ->
-    gen_server:call(?REF(Key), {kickoff, Pid}).
-
--spec ack(key(), _Result) -> _Return.
-ack(Key, Result) ->
-    % TODO: it's possible to avoid term copy
-    gen_server:call(?REF(Key), {ack, Result}, infinity).
-
--spec start_link(key(), timeout(), respfun()) -> startlink_ret().
-start_link(Key, RespFun, Timeout) ->
-    gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []).
-
-%%--------------------------------------------------------------------
-%% gen_server callbacks
-%% -------------------------------------------------------------------
-
-init({Key, RespFun, Timeout}) ->
-    _ = erlang:process_flag(trap_exit, true),
-    _TRef = erlang:send_after(Timeout, self(), timeout),
-    {ok, {Key, RespFun}}.
-
-handle_call({kickoff, Pid}, _From, St) ->
-    % TODO: more state?
-    _MRef = erlang:monitor(process, Pid),
-    _ = Pid ! kickoff,
-    {reply, ok, St};
-handle_call({ack, Result}, _From, {Key, RespFun}) ->
-    Ret = apply(RespFun, [{ack, Result}]),
-    ?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}),
-    {stop, {shutdown, Ret}, Ret, undefined};
-handle_call(Msg, _From, State) ->
-    ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}),
-    {reply, {error, unknown_call}, State}.
-
-handle_cast(Msg, State) ->
-    ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}),
-    {noreply, State}.
-
-handle_info(timeout, {Key, RespFun}) ->
-    Ret = apply(RespFun, [timeout]),
-    ?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}),
-    {stop, {shutdown, Ret}, undefined};
-handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) ->
-    Ret = apply(RespFun, [{down, map_down_reason(Reason)}]),
-    ?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}),
-    {stop, {shutdown, Ret}, undefined};
-handle_info(Msg, State) ->
-    ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
-    {noreply, State}.
-
-terminate(_Reason, undefined) ->
-    ok;
-terminate(Reason, {Key, RespFun}) ->
-    Ret = apply(RespFun, [timeout]),
-    ?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
-    ok.
-
-map_down_reason(normal) ->
-    ok;
-map_down_reason(shutdown) ->
-    ok;
-map_down_reason({shutdown, Result}) ->
-    Result;
-map_down_reason(noproc) ->
-    {error, noproc};
-map_down_reason(Error) ->
-    {error, {internal_error, Error}}.

+ 0 - 48
apps/emqx_ft/src/emqx_ft_responder_sup.erl

@@ -1,48 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ft_responder_sup).
-
--export([start_link/0]).
--export([start_child/3]).
-
--behaviour(supervisor).
--export([init/1]).
-
--define(SUPERVISOR, ?MODULE).
-
-%%
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
-    supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
-
-start_child(Key, RespFun, Timeout) ->
-    supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]).
-
--spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
-init(_) ->
-    Flags = #{
-        strategy => simple_one_for_one,
-        intensity => 100,
-        period => 100
-    },
-    ChildSpec = #{
-        id => responder,
-        start => {emqx_ft_responder, start_link, []},
-        restart => temporary
-    },
-    {ok, {Flags, [ChildSpec]}}.

+ 8 - 0
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -23,6 +23,7 @@
         store_filemeta/2,
         store_segment/2,
         assemble/3,
+        kickoff/1,
 
         files/0,
         files/1,
@@ -121,6 +122,13 @@ store_segment(Transfer, Segment) ->
 assemble(Transfer, Size, FinOpts) ->
     dispatch(assemble, [Transfer, Size, FinOpts]).
 
+-spec kickoff(pid()) -> ok.
+kickoff(Pid) ->
+    _ = erlang:send(Pid, kickoff),
+    ok.
+
+%%
+
 -spec files() ->
     {ok, page(file_info(), _)} | {error, term()}.
 files() ->

+ 3 - 10
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -28,6 +28,8 @@ start_link() ->
     supervisor:start_link({local, ?SERVER}, ?MODULE, []).
 
 init([]) ->
+    ok = emqx_ft_async_reply:create_tables(),
+
     SupFlags = #{
         strategy => one_for_one,
         intensity => 100,
@@ -52,14 +54,5 @@ init([]) ->
         modules => [emqx_ft_storage_fs_reader_sup]
     },
 
-    Responder = #{
-        id => emqx_ft_responder_sup,
-        start => {emqx_ft_responder_sup, start_link, []},
-        restart => permanent,
-        shutdown => infinity,
-        type => worker,
-        modules => [emqx_ft_responder_sup]
-    },
-
-    ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
+    ChildSpecs = [AssemblerSup, FileReaderSup],
     {ok, {SupFlags, ChildSpecs}}.

+ 1 - 1
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -37,7 +37,7 @@ all() ->
 
 groups() ->
     [
-        {single_node, [parallel], [
+        {single_node, [], [
             t_assemble_crash,
             t_corrupted_segment_retry,
             t_invalid_checksum,

+ 247 - 0
apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_async_reply_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
+            {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"}
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{suite_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_Case, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Tests
+%%--------------------------------------------------------------------
+
+t_register(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    ?assertEqual(
+        undefined,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
+    ),
+
+    ?assertEqual(
+        {ok, PacketId, TRef},
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ).
+
+t_process_independence(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    Self = self(),
+
+    spawn_link(fun() ->
+        Self ! emqx_ft_async_reply:take_by_mref(MRef)
+    end),
+
+    Res1 =
+        receive
+            Msg1 -> Msg1
+        end,
+
+    ?assertEqual(
+        not_found,
+        Res1
+    ),
+
+    spawn_link(fun() ->
+        Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    end),
+
+    Res2 =
+        receive
+            Msg2 -> Msg2
+        end,
+
+    ?assertEqual(
+        ok,
+        Res2
+    ).
+
+t_take(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    ?assertEqual(
+        {ok, PacketId, TRef},
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ),
+
+    ?assertEqual(
+        not_found,
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
+    ).
+
+t_cleanup(_Config) ->
+    PacketId = 1,
+    MRef0 = make_ref(),
+    TRef0 = make_ref(),
+    MRef1 = make_ref(),
+    TRef1 = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0),
+
+    Self = self(),
+
+    Pid = spawn_link(fun() ->
+        ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1),
+        receive
+            kickoff ->
+                ?assertEqual(
+                    undefined,
+                    emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+                ),
+
+                ?assertEqual(
+                    {ok, PacketId, TRef1},
+                    emqx_ft_async_reply:take_by_mref(MRef1)
+                ),
+
+                Self ! done
+        end
+    end),
+
+    ?assertEqual(
+        undefined,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    ok = emqx_ft_async_reply:deregister_all(Self),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    Pid ! kickoff,
+
+    receive
+        done -> ok
+    end.
+
+t_reply_by_tiemout(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+
+    SleepForever = fun() ->
+        Ref = make_ref(),
+        receive
+            Ref -> ok
+        end
+    end,
+
+    ok = meck:new(emqx_ft_storage, [passthrough]),
+    meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end),
+
+    FinTopic = <<"$file/fakeid/fin/999999">>,
+
+    ?assertMatch(
+        {ok, #{reason_code_name := unspecified_error}},
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ),
+
+    meck:unload(emqx_ft_storage),
+    emqtt:stop(C).
+
+t_cleanup_by_cm(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+
+    ok = meck:new(emqx_ft_storage, [passthrough]),
+    meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end),
+
+    FinTopic = <<"$file/fakeid/fin/999999">>,
+
+    [ClientPid] = emqx_cm:lookup_channels(ClientId),
+
+    ?assertWaitEvent(
+        begin
+            emqtt:publish(C, FinTopic, <<>>, 1),
+            exit(ClientPid, kill)
+        end,
+        #{?snk_kind := emqx_cm_clean_down, client_id := ClientId},
+        1000
+    ),
+
+    ?assertEqual(
+        {0, 0},
+        emqx_ft_async_reply:info()
+    ),
+
+    meck:unload(emqx_ft_storage).
+
+t_unrelated_events(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+    [ClientPid] = emqx_cm:lookup_channels(ClientId),
+
+    erlang:monitor(process, ClientPid),
+
+    ClientPid ! {'DOWN', make_ref(), process, self(), normal},
+    ClientPid ! {timeout, make_ref(), unknown_timer_event},
+
+    ?assertNotReceive(
+        {'DOWN', _Ref, process, ClientPid, _Reason},
+        500
+    ),
+
+    emqtt:stop(C).

+ 0 - 84
apps/emqx_ft/test/emqx_ft_responder_SUITE.erl

@@ -1,84 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ft_responder_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("stdlib/include/assert.hrl").
-
-all() -> emqx_common_test_helpers:all(?MODULE).
-
-init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
-    Config.
-
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
-    ok.
-
-init_per_testcase(_Case, Config) ->
-    Config.
-
-end_per_testcase(_Case, _Config) ->
-    ok.
-
-t_start_ack(_Config) ->
-    Key = <<"test">>,
-    DefaultAction = fun({ack, Ref}) -> Ref end,
-    ?assertMatch(
-        {ok, _Pid},
-        emqx_ft_responder:start(Key, DefaultAction, 1000)
-    ),
-    ?assertMatch(
-        {error, {already_started, _Pid}},
-        emqx_ft_responder:start(Key, DefaultAction, 1000)
-    ),
-    Ref = make_ref(),
-    ?assertEqual(
-        Ref,
-        emqx_ft_responder:ack(Key, Ref)
-    ),
-    ?assertExit(
-        {noproc, _},
-        emqx_ft_responder:ack(Key, Ref)
-    ).
-
-t_timeout(_Config) ->
-    Key = <<"test">>,
-    Self = self(),
-    DefaultAction = fun(timeout) -> Self ! {timeout, Key} end,
-    {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
-    receive
-        {timeout, Key} ->
-            ok
-    after 100 ->
-        ct:fail("emqx_ft_responder not called")
-    end,
-    ?assertExit(
-        {noproc, _},
-        emqx_ft_responder:ack(Key, oops)
-    ).
-
-t_unknown_msgs(_Config) ->
-    {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100),
-    Pid ! {unknown_msg, <<"test">>},
-    ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
-    ?assertEqual(
-        {error, unknown_call},
-        gen_server:call(Pid, {unknown_call, <<"test">>})
-    ).