Przeglądaj źródła

feat(ft): add additional operation status report channel

Ilya Averyanov 2 lat temu
rodzic
commit
534c9bdc13

+ 155 - 88
apps/emqx_ft/src/emqx_ft.erl

@@ -90,6 +90,10 @@
 
 -define(FT_EVENT(EVENT), {?MODULE, EVENT}).
 
+-define(ACK_AND_PUBLISH(Result), {true, Result}).
+-define(ACK(Result), {false, Result}).
+-define(DELAY_ACK, delay).
+
 %%--------------------------------------------------------------------
 %% API for app
 %%--------------------------------------------------------------------
@@ -116,46 +120,34 @@ unhook() ->
 %% API
 %%--------------------------------------------------------------------
 
-decode_filemeta(Payload) when is_binary(Payload) ->
-    case emqx_utils_json:safe_decode(Payload, [return_maps]) of
-        {ok, Map} ->
-            decode_filemeta(Map);
-        {error, Error} ->
-            {error, {invalid_filemeta_json, Error}}
-    end;
-decode_filemeta(Map) when is_map(Map) ->
-    Schema = emqx_ft_schema:schema(filemeta),
-    try
-        Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}),
-        {ok, Meta}
-    catch
-        throw:{_Schema, Errors} ->
-            {error, {invalid_filemeta, Errors}}
-    end.
+decode_filemeta(Payload) ->
+    emqx_ft_schema:decode(filemeta, Payload).
 
 encode_filemeta(Meta = #{}) ->
-    Schema = emqx_ft_schema:schema(filemeta),
-    hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Meta), #{}).
+    emqx_ft_schema:encode(filemeta, Meta).
+
+encode_response(Response) ->
+    emqx_ft_schema:encode(command_response, Response).
 
 %%--------------------------------------------------------------------
 %% Hooks
 %%--------------------------------------------------------------------
 
-on_message_publish(
-    Msg = #message{
-        id = _Id,
-        topic = <<"$file/", _/binary>>
-    }
-) ->
+on_message_publish(Msg = #message{topic = <<"$file-async/", _/binary>>}) ->
+    Headers = Msg#message.headers,
+    {stop, Msg#message{headers = Headers#{allow_publish => false}}};
+on_message_publish(Msg = #message{topic = <<"$file/", _/binary>>}) ->
     Headers = Msg#message.headers,
     {stop, Msg#message{headers = Headers#{allow_publish => false}}};
 on_message_publish(Msg) ->
     {ok, Msg}.
 
-on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
+on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes, _RC) ->
     case Topic of
-        <<"$file/", FileCommand/binary>> ->
-            {stop, on_file_command(PacketId, Msg, FileCommand)};
+        <<"$file/", _/binary>> ->
+            {stop, on_file_command(sync, From, PacketId, Msg, Topic)};
+        <<"$file-async/", _/binary>> ->
+            {stop, on_file_command(async, From, PacketId, Msg, Topic)};
         _ ->
             ignore
     end.
@@ -163,18 +155,33 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
 on_channel_unregistered(ChannelPid) ->
     ok = emqx_ft_async_reply:deregister_all(ChannelPid).
 
-on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) ->
+on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) ->
     _ = erlang:demonitor(MRef, [flush]),
-    _ = emqx_ft_async_reply:take_by_mref(MRef),
-    {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]};
+    Result = {error, timeout},
+    _ = publish_response(Result, TopicReplyData),
+    case emqx_ft_async_reply:take_by_mref(MRef) of
+        {ok, undefined, _TRef1, _TopicReplyData} ->
+            {stop, Acc};
+        {ok, PacketId, _TRef1, _TopicReplyData} ->
+            {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]};
+        not_found ->
+            {ok, Acc}
+    end;
 on_client_timeout(_TRef, _Event, Acc) ->
     {ok, Acc}.
 
-on_process_down(MRef, _Pid, Reason, Acc) ->
+on_process_down(MRef, _Pid, DownReason, Acc) ->
     case emqx_ft_async_reply:take_by_mref(MRef) of
-        {ok, PacketId, TRef} ->
+        {ok, PacketId, TRef, TopicReplyData} ->
             _ = emqx_utils:cancel_timer(TRef),
-            {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]};
+            Result = down_reason_to_result(DownReason),
+            _ = publish_response(Result, TopicReplyData),
+            case PacketId of
+                undefined ->
+                    {stop, Acc};
+                _ ->
+                    {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}
+            end;
         not_found ->
             {ok, Acc}
     end.
@@ -185,24 +192,27 @@ on_process_down(MRef, _Pid, Reason, Acc) ->
 
 %% TODO Move to emqx_ft_mqtt?
 
-on_file_command(PacketId, Msg, FileCommand) ->
-    case emqx_topic:tokens(FileCommand) of
-        [FileIdIn | Rest] ->
-            validate([{fileid, FileIdIn}], fun([FileId]) ->
-                on_file_command(PacketId, FileId, Msg, Rest)
-            end);
-        [] ->
-            ?RC_UNSPECIFIED_ERROR
-    end.
+on_file_command(Mode, From, PacketId, Msg, Topic) ->
+    TopicReplyData = topic_reply_data(Mode, From, PacketId, Msg),
+    Result =
+        case emqx_topic:tokens(Topic) of
+            [_FTPrefix, FileIdIn | Rest] ->
+                validate([{fileid, FileIdIn}], fun([FileId]) ->
+                    do_on_file_command(TopicReplyData, FileId, Msg, Rest)
+                end);
+            [] ->
+                ?ACK_AND_PUBLISH({error, {invalid_topic, Topic}})
+        end,
+    maybe_publish_response(Result, TopicReplyData).
 
-on_file_command(PacketId, FileId, Msg, FileCommand) ->
+do_on_file_command(TopicReplyData, FileId, Msg, FileCommand) ->
     Transfer = transfer(Msg, FileId),
     case FileCommand of
         [<<"init">>] ->
             validate(
                 [{filemeta, Msg#message.payload}],
                 fun([Meta]) ->
-                    on_init(PacketId, Msg, Transfer, Meta)
+                    on_init(TopicReplyData, Msg, Transfer, Meta)
                 end
             );
         [<<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 ->
@@ -210,14 +220,14 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
             validate(
                 [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}],
                 fun([FinalSize, FinalChecksum]) ->
-                    on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum)
+                    on_fin(TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum)
                 end
             );
         [<<"abort">>] ->
-            on_abort(Msg, Transfer);
+            on_abort(TopicReplyData, Msg, Transfer);
         [OffsetBin] ->
             validate([{offset, OffsetBin}], fun([Offset]) ->
-                on_segment(PacketId, Msg, Transfer, Offset, undefined)
+                on_segment(TopicReplyData, Msg, Transfer, Offset, undefined)
             end);
         [OffsetBin, ChecksumBin] ->
             validate(
@@ -226,16 +236,16 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
                     validate(
                         [{integrity, Msg#message.payload, Checksum}],
                         fun(_) ->
-                            on_segment(PacketId, Msg, Transfer, Offset, Checksum)
+                            on_segment(TopicReplyData, Msg, Transfer, Offset, Checksum)
                         end
                     )
                 end
             );
         _ ->
-            ?RC_UNSPECIFIED_ERROR
+            ?ACK_AND_PUBLISH({error, {invalid_file_command, FileCommand}})
     end.
 
-on_init(PacketId, Msg, Transfer, Meta) ->
+on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) ->
     ?tp(info, "file_transfer_init", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
@@ -245,16 +255,13 @@ on_init(PacketId, Msg, Transfer, Meta) ->
     %% Currently synchronous.
     %% If we want to make it async, we need to use `emqx_ft_async_reply`,
     %% like in `on_fin`.
-    case store_filemeta(Transfer, Meta) of
-        ok -> ?RC_SUCCESS;
-        {error, _} -> ?RC_UNSPECIFIED_ERROR
-    end.
+    ?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)).
 
-on_abort(_Msg, _FileId) ->
+on_abort(_TopicReplyData, _Msg, _FileId) ->
     %% TODO
-    ?RC_SUCCESS.
+    ?ACK_AND_PUBLISH(ok).
 
-on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
+on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) ->
     ?tp(info, "file_transfer_segment", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
@@ -266,12 +273,9 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
     %% Currently synchronous.
     %% If we want to make it async, we need to use `emqx_ft_async_reply`,
     %% like in `on_fin`.
-    case store_segment(Transfer, Segment) of
-        ok -> ?RC_SUCCESS;
-        {error, _} -> ?RC_UNSPECIFIED_ERROR
-    end.
+    ?ACK_AND_PUBLISH(store_segment(Transfer, Segment)).
 
-on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
+on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) ->
     ?tp(info, "file_transfer_fin", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
@@ -280,30 +284,94 @@ on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
         checksum => FinalChecksum
     }),
     %% TODO: handle checksum? Do we need it?
-    emqx_ft_async_reply:with_new_packet(
+    with_new_packet(
+        TopicReplyData,
         PacketId,
         fun() ->
             case assemble(Transfer, FinalSize, FinalChecksum) of
                 ok ->
-                    ?RC_SUCCESS;
-                %% Assembling started, packet will be acked by monitor or timeout
+                    ?ACK_AND_PUBLISH(ok);
+                %% Assembling started, packet will be acked/replied by monitor or timeout
                 {async, Pid} ->
-                    ok = register_async_reply(Pid, PacketId),
-                    ok = emqx_ft_storage:kickoff(Pid),
-                    undefined;
-                {error, _} ->
-                    ?RC_UNSPECIFIED_ERROR
+                    register_async_worker(Pid, TopicReplyData);
+                {error, _} = Error ->
+                    ?ACK_AND_PUBLISH(Error)
             end
-        end,
-        undefined
+        end
     ).
 
-register_async_reply(Pid, PacketId) ->
+register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) ->
     MRef = erlang:monitor(process, Pid),
     TRef = erlang:start_timer(
-        emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, PacketId})
+        emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData})
+    ),
+    case Mode of
+        async ->
+            ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData),
+            ok = emqx_ft_storage:kickoff(Pid),
+            ?ACK(ok);
+        sync ->
+            ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData),
+            ok = emqx_ft_storage:kickoff(Pid),
+            ?DELAY_ACK
+    end.
+
+topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) ->
+    Props = maps:get(properties, Headers, #{}),
+    #{
+        mode => Mode,
+        clientid => From,
+        command_topic => Topic,
+        correlation_data => maps:get('Correlation-Data', Props, undefined),
+        response_topic => maps:get('Response-Topic', Props, undefined),
+        packet_id => PacketId
+    }.
+
+maybe_publish_response(?DELAY_ACK, _TopicReplyData) ->
+    undefined;
+maybe_publish_response(?ACK(Result), _TopicReplyData) ->
+    result_to_rc(Result);
+maybe_publish_response(?ACK_AND_PUBLISH(Result), TopicReplyData) ->
+    publish_response(Result, TopicReplyData).
+
+publish_response(Result, #{
+    clientid := ClientId,
+    command_topic := CommandTopic,
+    correlation_data := CorrelationData,
+    response_topic := ResponseTopic,
+    packet_id := PacketId
+}) ->
+    ResultCode = result_to_rc(Result),
+    Response = encode_response(#{
+        topic => CommandTopic,
+        packet_id => PacketId,
+        reason_code => ResultCode,
+        reason_description => emqx_ft_error:format(Result)
+    }),
+    Payload = emqx_utils_json:encode(Response),
+    Topic = emqx_maybe:define(ResponseTopic, response_topic(ClientId)),
+    Msg = emqx_message:make(
+        emqx_guid:gen(),
+        undefined,
+        ?QOS_1,
+        Topic,
+        Payload,
+        #{},
+        #{properties => response_properties(CorrelationData)}
     ),
-    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef).
+    _ = emqx_broker:publish(Msg),
+    ResultCode.
+
+response_properties(undefined) -> #{};
+response_properties(CorrelationData) -> #{'Correlation-Data' => CorrelationData}.
+
+response_topic(ClientId) ->
+    <<"$file-response/", (clientid_to_binary(ClientId))/binary>>.
+
+result_to_rc(ok) ->
+    ?RC_SUCCESS;
+result_to_rc({error, _}) ->
+    ?RC_UNSPECIFIED_ERROR.
 
 store_filemeta(Transfer, Segment) ->
     try
@@ -347,9 +415,9 @@ validate(Validations, Fun) ->
     case do_validate(Validations, []) of
         {ok, Parsed} ->
             Fun(Parsed);
-        {error, Reason} ->
+        {error, Reason} = Error ->
             ?tp(info, "client_violated_protocol", #{reason => Reason}),
-            ?RC_UNSPECIFIED_ERROR
+            ?ACK_AND_PUBLISH(Error)
     end.
 
 do_validate([], Parsed) ->
@@ -416,19 +484,18 @@ clientid_to_binary(A) when is_atom(A) ->
 clientid_to_binary(B) when is_binary(B) ->
     B.
 
-reason_to_rc(Reason) ->
-    case map_down_reason(Reason) of
-        ok -> ?RC_SUCCESS;
-        {error, _} -> ?RC_UNSPECIFIED_ERROR
-    end.
-
-map_down_reason(normal) ->
+down_reason_to_result(normal) ->
     ok;
-map_down_reason(shutdown) ->
+down_reason_to_result(shutdown) ->
     ok;
-map_down_reason({shutdown, Result}) ->
+down_reason_to_result({shutdown, Result}) ->
     Result;
-map_down_reason(noproc) ->
+down_reason_to_result(noproc) ->
     {error, noproc};
-map_down_reason(Error) ->
+down_reason_to_result(Error) ->
     {error, {internal_error, Error}}.
+
+with_new_packet(#{mode := async}, _PacketId, Fun) ->
+    Fun();
+with_new_packet(#{mode := sync}, PacketId, Fun) ->
+    emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined).

+ 10 - 6
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -156,12 +156,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
     % Currently, race is possible between getting segment info from the remote node and
     % this node garbage collecting the segment itself.
     % TODO: pipelining
-    % TODO: better error handling
-    {ok, Content} = pread(Node, Segment, St),
-    case emqx_ft_storage_exporter:write(Export, Content) of
-        {ok, NExport} ->
-            {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])};
-        {error, _} = Error ->
+    case pread(Node, Segment, St) of
+        {ok, Content} ->
+            case emqx_ft_storage_exporter:write(Export, Content) of
+                {ok, NExport} ->
+                    {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])};
+                {error, _} = Error ->
+                    {stop, {shutdown, Error}, maps:remove(export, St)}
+            end;
+        {error, ReadError} ->
+            Error = {error, {read_segment, ReadError}},
             {stop, {shutdown, Error}, maps:remove(export, St)}
     end;
 handle_event(internal, _, {assemble, []}, St = #{}) ->

+ 18 - 10
apps/emqx_ft/src/emqx_ft_async_reply.erl

@@ -27,6 +27,7 @@
 
 -export([
     register/3,
+    register/4,
     take_by_mref/1,
     with_new_packet/3,
     deregister_all/1
@@ -42,12 +43,14 @@
 -define(MON_TAB, emqx_ft_async_mons).
 -define(MON_KEY(MRef), ?MON_KEY(self(), MRef)).
 -define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
+-define(MON_RECORD(KEY, PACKET_ID, TREF, DATA), {KEY, PACKET_ID, TREF, DATA}).
 
 %% async worker monitors by packet ids
 
 -define(PACKET_TAB, emqx_ft_async_packets).
 -define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)).
 -define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}).
+-define(PACKET_RECORD(KEY, MREF, DATA), {KEY, MREF, DATA}).
 
 %%--------------------------------------------------------------------
 %% API
@@ -66,10 +69,15 @@ create_tables() ->
     ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions),
     ok.
 
--spec register(packet_id(), mon_ref(), timer_ref()) -> ok.
-register(PacketId, MRef, TRef) ->
-    _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}),
-    _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}),
+-spec register(packet_id(), mon_ref(), timer_ref(), term()) -> ok.
+register(PacketId, MRef, TRef, Data) ->
+    _ = ets:insert(?PACKET_TAB, ?PACKET_RECORD(?PACKET_KEY(PacketId), MRef, Data)),
+    _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), PacketId, TRef, Data)),
+    ok.
+
+-spec register(mon_ref(), timer_ref(), term()) -> ok.
+register(MRef, TRef, Data) ->
+    _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), undefined, TRef, Data)),
     ok.
 
 -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
@@ -79,12 +87,12 @@ with_new_packet(PacketId, Fun, Default) ->
         false -> Fun()
     end.
 
--spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found.
+-spec take_by_mref(mon_ref()) -> {ok, packet_id() | undefined, timer_ref(), term()} | not_found.
 take_by_mref(MRef) ->
     case ets:take(?MON_TAB, ?MON_KEY(MRef)) of
-        [{_, PacketId, TRef}] ->
-            _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
-            {ok, PacketId, TRef};
+        [?MON_RECORD(_, PacketId, TRef, Data)] ->
+            PacketId =/= undefined andalso ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
+            {ok, PacketId, TRef, Data};
         [] ->
             not_found
     end.
@@ -104,11 +112,11 @@ info() ->
 %%-------------------------------------------------------------------
 
 deregister_packets(ChannelPid) when is_pid(ChannelPid) ->
-    MS = [{{?PACKET_KEY(ChannelPid, '_'), '_'}, [], [true]}],
+    MS = [{?PACKET_RECORD(?PACKET_KEY(ChannelPid, '_'), '_', '_'), [], [true]}],
     _ = ets:select_delete(?PACKET_TAB, MS),
     ok.
 
 deregister_mons(ChannelPid) ->
-    MS = [{{?MON_KEY(ChannelPid, '_'), '_', '_'}, [], [true]}],
+    MS = [{?MON_RECORD(?MON_KEY(ChannelPid, '_'), '_', '_', '_'), [], [true]}],
     _ = ets:select_delete(?MON_TAB, MS),
     ok.

+ 39 - 0
apps/emqx_ft/src/emqx_ft_error.erl

@@ -0,0 +1,39 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc File Transfer error description module
+
+-module(emqx_ft_error).
+
+-export([format/1]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+format(ok) -> <<"success">>;
+format({error, Reason}) -> format_error_reson(Reason).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+format_error_reson(Reason) when is_atom(Reason) ->
+    atom_to_binary(Reason, utf8);
+format_error_reson({ErrorKind, _}) when is_atom(ErrorKind) ->
+    atom_to_binary(ErrorKind, utf8);
+format_error_reson(_Reason) ->
+    <<"internal_error">>.

+ 34 - 3
apps/emqx_ft/src/emqx_ft_schema.erl

@@ -26,7 +26,7 @@
 -export([schema/1]).
 
 %% Utilities
--export([backend/1]).
+-export([backend/1, encode/2, decode/2]).
 
 %% Test-only helpers
 -export([translate/1]).
@@ -76,7 +76,7 @@ fields(file_transfer) ->
                 #{
                     desc => ?DESC("init_timeout"),
                     required => false,
-                    importance => ?IMPORTANCE_LOW,
+                    importance => ?IMPORTANCE_HIDDEN,
                     default => "10s"
                 }
             )},
@@ -86,7 +86,7 @@ fields(file_transfer) ->
                 #{
                     desc => ?DESC("store_segment_timeout"),
                     required => false,
-                    importance => ?IMPORTANCE_LOW,
+                    importance => ?IMPORTANCE_HIDDEN,
                     default => "5m"
                 }
             )},
@@ -282,6 +282,16 @@ schema(filemeta) ->
             {segments_ttl, hoconsc:mk(pos_integer())},
             {user_data, hoconsc:mk(json_value())}
         ]
+    };
+schema(command_response) ->
+    #{
+        roots => [
+            {vsn, hoconsc:mk(string(), #{default => <<"0.1">>})},
+            {topic, hoconsc:mk(string())},
+            {packet_id, hoconsc:mk(pos_integer())},
+            {reason_code, hoconsc:mk(non_neg_integer())},
+            {reason_description, hoconsc:mk(binary())}
+        ]
     }.
 
 validator(filename) ->
@@ -345,6 +355,27 @@ backend(Config) ->
 emit_enabled(Type, BConf = #{enable := Enabled}) ->
     Enabled andalso throw({Type, BConf}).
 
+decode(SchemaName, Payload) when is_binary(Payload) ->
+    case emqx_utils_json:safe_decode(Payload, [return_maps]) of
+        {ok, Map} ->
+            decode(SchemaName, Map);
+        {error, Error} ->
+            {error, {invalid_filemeta_json, Error}}
+    end;
+decode(SchemaName, Map) when is_map(Map) ->
+    Schema = schema(SchemaName),
+    try
+        Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}),
+        {ok, Meta}
+    catch
+        throw:{_Schema, Errors} ->
+            {error, {invalid_filemeta, Errors}}
+    end.
+
+encode(SchemaName, Map = #{}) ->
+    Schema = schema(SchemaName),
+    hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Map), #{}).
+
 %% Test-only helpers
 
 -spec translate(emqx_config:raw_config()) ->

+ 1 - 1
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -195,7 +195,7 @@ verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
         Digest ->
             {ok, Checksum};
         Mismatch ->
-            {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
+            {error, {checksum_mismatch, Algo, binary:encode_hex(Mismatch)}}
     end;
 verify_checksum(Ctx, undefined) ->
     Digest = crypto:hash_final(Ctx),

+ 1 - 1
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -145,7 +145,7 @@ store_filemeta(Storage, Transfer, Meta) ->
             % We won't see conflicts in case of concurrent `store_filemeta`
             % requests. It's rather odd scenario so it's fine not to worry
             % about it too much now.
-            {error, conflict};
+            {error, filemeta_conflict};
         {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
             write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
         {error, _} = Error ->

+ 199 - 104
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -31,7 +31,8 @@
 
 all() ->
     [
-        {group, single_node},
+        {group, async_mode},
+        {group, sync_mode},
         {group, cluster}
     ].
 
@@ -50,7 +51,14 @@ groups() ->
             t_nasty_filenames,
             t_no_meta,
             t_no_segment,
-            t_simple_transfer
+            t_simple_transfer,
+            t_assemble_timeout
+        ]},
+        {async_mode, [], [
+            {group, single_node}
+        ]},
+        {sync_mode, [], [
+            {group, single_node}
         ]},
         {cluster, [], [
             t_switch_node,
@@ -72,9 +80,10 @@ init_per_suite(Config) ->
         emqx_ft_test_helpers:local_storage(Config),
         #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
     ),
+    FTConfig = emqx_ft_test_helpers:config(Storage, #{<<"assemble_timeout">> => <<"2s">>}),
     Apps = emqx_cth_suite:start(
         [
-            {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}}
+            {emqx_ft, #{config => FTConfig}}
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
@@ -85,7 +94,10 @@ end_per_suite(Config) ->
     ok.
 
 init_per_testcase(Case, Config) ->
-    ClientId = atom_to_binary(Case),
+    ClientId = iolist_to_binary([
+        atom_to_binary(Case), <<"-">>, emqx_ft_test_helpers:unique_binary_string()
+    ]),
+    ok = set_client_specific_ft_dirs(ClientId, Config),
     case ?config(group, Config) of
         cluster ->
             [{clientid, ClientId} | Config];
@@ -103,6 +115,10 @@ init_per_group(Group = cluster, Config) ->
     Cluster = mk_cluster_specs(Config),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
     [{group, Group}, {cluster_nodes, Nodes} | Config];
+init_per_group(_Group = async_mode, Config) ->
+    [{mode, sync} | Config];
+init_per_group(_Group = sync_mode, Config) ->
+    [{mode, async} | Config];
 init_per_group(Group, Config) ->
     [{group, Group} | Config].
 
@@ -127,7 +143,7 @@ mk_cluster_specs(_Config) ->
     ].
 
 %%--------------------------------------------------------------------
-%% Tests
+%% Single node tests
 %%--------------------------------------------------------------------
 
 t_invalid_topic_format(Config) ->
@@ -171,32 +187,32 @@ t_invalid_fileid(Config) ->
     C = ?config(client, Config),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, <<"$file//init">>, <<>>, 1)
+        emqtt:publish(C, mk_init_topic(Config, <<>>), <<>>, 1)
     ).
 
 t_invalid_filename(Config) ->
     C = ?config(client, Config),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(<<"f1">>), encode_meta(meta(".", <<>>)), 1)
+        emqtt:publish(C, mk_init_topic(Config, <<"f1">>), encode_meta(meta(".", <<>>)), 1)
     ),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("..", <<>>)), 1)
+        emqtt:publish(C, mk_init_topic(Config, <<"f2">>), encode_meta(meta("..", <<>>)), 1)
     ),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("../nice", <<>>)), 1)
+        emqtt:publish(C, mk_init_topic(Config, <<"f2">>), encode_meta(meta("../nice", <<>>)), 1)
     ),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(<<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1)
+        emqtt:publish(C, mk_init_topic(Config, <<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1)
     ),
     ?assertRCName(
         unspecified_error,
         emqtt:publish(
             C,
-            mk_init_topic(<<"f4">>),
+            mk_init_topic(Config, <<"f4">>),
             encode_meta(meta(lists:duplicate(1000, $A), <<>>)),
             1
         )
@@ -204,6 +220,7 @@ t_invalid_filename(Config) ->
 
 t_simple_transfer(Config) ->
     C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
 
     Filename = "topsecret.pdf",
     FileId = <<"f1">>,
@@ -214,22 +231,24 @@ t_simple_transfer(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
     ),
 
     lists:foreach(
         fun({Chunk, Offset}) ->
             ?assertRCName(
                 success,
-                emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
+                emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
             )
         end,
         with_offsets(Data)
     ),
 
-    ?assertRCName(
-        success,
-        emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
+    ?assertEqual(
+        ok,
+        emqx_ft_test_helpers:fin_result(
+            mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize)
+        )
     ),
 
     [Export] = list_files(?config(clientid, Config)),
@@ -238,7 +257,7 @@ t_simple_transfer(Config) ->
         read_export(Export)
     ).
 
-t_nasty_clientids_fileids(_Config) ->
+t_nasty_clientids_fileids(Config) ->
     Transfers = [
         {<<".">>, <<".">>},
         {<<"🌚"/utf8>>, <<"🌝"/utf8>>},
@@ -249,15 +268,16 @@ t_nasty_clientids_fileids(_Config) ->
 
     ok = lists:foreach(
         fun({ClientId, FileId}) ->
-            ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId),
+            Data = ClientId,
+            ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, "justfile", Data),
             [Export] = list_files(ClientId),
             ?assertMatch(#{meta := #{name := "justfile"}}, Export),
-            ?assertEqual({ok, ClientId}, read_export(Export))
+            ?assertEqual({ok, Data}, read_export(Export))
         end,
         Transfers
     ).
 
-t_nasty_filenames(_Config) ->
+t_nasty_filenames(Config) ->
     Filenames = [
         {<<"nasty1">>, "146%"},
         {<<"nasty2">>, "🌚"},
@@ -267,7 +287,7 @@ t_nasty_filenames(_Config) ->
     ok = lists:foreach(
         fun({ClientId, Filename}) ->
             FileId = unicode:characters_to_binary(Filename),
-            ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
+            ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, Filename, FileId),
             [Export] = list_files(ClientId),
             ?assertMatch(#{meta := #{name := Filename}}, Export),
             ?assertEqual({ok, FileId}, read_export(Export))
@@ -285,34 +305,36 @@ t_meta_conflict(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
     ),
 
     ConflictMeta = Meta#{name => "conflict.pdf"},
 
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(FileId), encode_meta(ConflictMeta), 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(ConflictMeta), 1)
     ).
 
 t_no_meta(Config) ->
     C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
 
     FileId = <<"f1">>,
     Data = <<"first">>,
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_segment_topic(FileId, 0), Data, 1)
+        emqtt:publish(C, mk_segment_topic(Config, FileId, 0), Data, 1)
     ),
 
-    ?assertRCName(
-        unspecified_error,
-        emqtt:publish(C, mk_fin_topic(FileId, 42), <<>>, 1)
+    ?assertEqual(
+        {error, unspecified_error},
+        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, C, mk_fin_topic(Config, FileId, 42))
     ).
 
 t_no_segment(Config) ->
     C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
 
     Filename = "topsecret.pdf",
     FileId = <<"f1">>,
@@ -323,23 +345,25 @@ t_no_segment(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
     ),
 
     lists:foreach(
         fun({Chunk, Offset}) ->
             ?assertRCName(
                 success,
-                emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
+                emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
             )
         end,
         %% Skip the first segment
         tl(with_offsets(Data))
     ),
 
-    ?assertRCName(
-        unspecified_error,
-        emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
+    ?assertEqual(
+        {error, unspecified_error},
+        emqx_ft_test_helpers:fin_result(
+            mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize)
+        )
     ).
 
 t_invalid_meta(Config) ->
@@ -352,17 +376,18 @@ t_invalid_meta(Config) ->
     MetaPayload = emqx_utils_json:encode(Meta),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), MetaPayload, 1)
     ),
 
     %% Invalid JSON
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_init_topic(FileId), <<"{oops;">>, 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), <<"{oops;">>, 1)
     ).
 
 t_invalid_checksum(Config) ->
     C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
 
     Filename = "topsecret.pdf",
     FileId = <<"f1">>,
@@ -374,35 +399,39 @@ t_invalid_checksum(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
+        emqtt:publish(C, mk_init_topic(Config, FileId), MetaPayload, 1)
     ),
 
     lists:foreach(
         fun({Chunk, Offset}) ->
             ?assertRCName(
                 success,
-                emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
+                emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
             )
         end,
         with_offsets(Data)
     ),
 
     % Send `fin` w/o checksum, should fail since filemeta checksum is invalid
-    FinTopic = mk_fin_topic(FileId, Filesize),
-    ?assertRCName(
-        unspecified_error,
-        emqtt:publish(C, FinTopic, <<>>, 1)
+    FinTopic = mk_fin_topic(Config, FileId, Filesize),
+
+    ?assertEqual(
+        {error, unspecified_error},
+        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, C, FinTopic)
     ),
 
     % Send `fin` with the correct checksum
     Checksum = binary:encode_hex(sha256(Data)),
-    ?assertRCName(
-        success,
-        emqtt:publish(C, <<FinTopic/binary, "/", Checksum/binary>>, <<>>, 1)
+    ?assertEqual(
+        ok,
+        emqx_ft_test_helpers:fin_result(
+            mode(Config), ClientId, C, <<FinTopic/binary, "/", Checksum/binary>>
+        )
     ).
 
 t_corrupted_segment_retry(Config) ->
     C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
 
     Filename = "corruption.pdf",
     FileId = <<"4242-4242">>,
@@ -421,35 +450,89 @@ t_corrupted_segment_retry(Config) ->
 
     Meta = #{size := Filesize} = meta(Filename, Data),
 
-    ?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)),
+    ?assertRCName(success, emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)),
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_segment_topic(FileId, Offset1, Checksum1), Seg1, 1)
+        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset1, Checksum1), Seg1, 1)
     ),
 
     % segment is corrupted
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), <<Seg2/binary, 42>>, 1)
+        emqtt:publish(
+            C, mk_segment_topic(Config, FileId, Offset2, Checksum2), <<Seg2/binary, 42>>, 1
+        )
     ),
 
     % retry
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), Seg2, 1)
+        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset2, Checksum2), Seg2, 1)
     ),
 
     ?assertRCName(
         success,
-        emqtt:publish(C, mk_segment_topic(FileId, Offset3, Checksum3), Seg3, 1)
+        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset3, Checksum3), Seg3, 1)
     ),
 
-    ?assertRCName(
-        success,
-        emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
+    ?assertEqual(
+        ok,
+        emqx_ft_test_helpers:fin_result(
+            mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize)
+        )
     ).
 
+t_assemble_crash(Config) ->
+    C = ?config(client, Config),
+
+    meck:new(emqx_ft_storage_fs),
+    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),
+
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
+    ),
+
+    meck:unload(emqx_ft_storage_fs).
+
+t_assemble_timeout(Config) ->
+    C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
+
+    SleepForever = fun() ->
+        Ref = make_ref(),
+        receive
+            Ref -> ok
+        end
+    end,
+
+    ok = meck:new(emqx_ft_storage, [passthrough]),
+    ok = meck:expect(emqx_ft_storage, assemble, fun(_, _, _) ->
+        {async, spawn_link(SleepForever)}
+    end),
+
+    {Time, Res} = timer:tc(
+        fun() ->
+            emqx_ft_test_helpers:fin_result(
+                mode(Config), ClientId, C, <<"$file/someid/fin/9999999">>
+            )
+        end
+    ),
+
+    ok = meck:unload(emqx_ft_storage),
+
+    ?assertEqual(
+        {error, unspecified_error},
+        Res
+    ),
+
+    ?assert(2_000_000 < Time).
+
+%%--------------------------------------------------------------------
+%% Cluster tests
+%%--------------------------------------------------------------------
+
 t_switch_node(Config) ->
     [Node | _] = ?config(cluster_nodes, Config),
     AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node),
@@ -471,11 +554,11 @@ t_switch_node(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C1, mk_init_topic(FileId), encode_meta(Meta), 1)
+        emqtt:publish(C1, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
     ),
     ?assertRCName(
         success,
-        emqtt:publish(C1, mk_segment_topic(FileId, Offset0), Data0, 1)
+        emqtt:publish(C1, mk_segment_topic(Config, FileId, Offset0), Data0, 1)
     ),
 
     %% Then, switch the client to the main node
@@ -487,16 +570,16 @@ t_switch_node(Config) ->
 
     ?assertRCName(
         success,
-        emqtt:publish(C2, mk_segment_topic(FileId, Offset1), Data1, 1)
+        emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset1), Data1, 1)
     ),
     ?assertRCName(
         success,
-        emqtt:publish(C2, mk_segment_topic(FileId, Offset2), Data2, 1)
+        emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset2), Data2, 1)
     ),
 
     ?assertRCName(
         success,
-        emqtt:publish(C2, mk_fin_topic(FileId, Filesize), <<>>, 1)
+        emqtt:publish(C2, mk_fin_topic(Config, FileId, Filesize), <<>>, 1)
     ),
 
     ok = emqtt:stop(C2),
@@ -509,17 +592,6 @@ t_switch_node(Config) ->
         read_export(Export)
     ).
 
-t_assemble_crash(Config) ->
-    C = ?config(client, Config),
-
-    meck:new(emqx_ft_storage_fs),
-    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),
-
-    ?assertRCName(
-        unspecified_error,
-        emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
-    ).
-
 t_unreliable_migrating_client(Config) ->
     NodeSelf = node(),
     [Node1, Node2] = ?config(cluster_nodes, Config),
@@ -543,10 +615,10 @@ t_unreliable_migrating_client(Config) ->
         {fun connect_mqtt_client/2, [NodeSelf]},
         % Send filemeta and 3 initial segments
         % (assuming client chose 100 bytes as a desired segment size)
-        {fun send_filemeta/2, [Meta]},
-        {fun send_segment/3, [0, 100]},
-        {fun send_segment/3, [100, 100]},
-        {fun send_segment/3, [200, 100]},
+        {fun send_filemeta/3, [Config, Meta]},
+        {fun send_segment/4, [Config, 0, 100]},
+        {fun send_segment/4, [Config, 100, 100]},
+        {fun send_segment/4, [Config, 200, 100]},
         % Disconnect the client cleanly
         {fun stop_mqtt_client/1, []},
         % Connect to the broker on `Node1`
@@ -555,27 +627,27 @@ t_unreliable_migrating_client(Config) ->
         % Client forgot the state for some reason and started the transfer again.
         % (assuming this is usual for a client on a device that was rebooted)
         {fun connect_mqtt_client/2, [Node2]},
-        {fun send_filemeta/2, [Meta]},
+        {fun send_filemeta/3, [Config, Meta]},
         % This time it chose 200 bytes as a segment size
-        {fun send_segment/3, [0, 200]},
-        {fun send_segment/3, [200, 200]},
+        {fun send_segment/4, [Config, 0, 200]},
+        {fun send_segment/4, [Config, 200, 200]},
         % But now it downscaled back to 100 bytes segments
-        {fun send_segment/3, [400, 100]},
+        {fun send_segment/4, [Config, 400, 100]},
         % Client lost connectivity and reconnected
         % (also had last few segments unacked and decided to resend them)
         {fun connect_mqtt_client/2, [Node2]},
-        {fun send_segment/3, [200, 200]},
-        {fun send_segment/3, [400, 200]},
+        {fun send_segment/4, [Config, 200, 200]},
+        {fun send_segment/4, [Config, 400, 200]},
         % Client lost connectivity and reconnected, this time to another node
         % (also had last segment unacked and decided to resend it)
         {fun connect_mqtt_client/2, [Node1]},
-        {fun send_segment/3, [400, 200]},
-        {fun send_segment/3, [600, eof]},
-        {fun send_finish/1, []},
+        {fun send_segment/4, [Config, 400, 200]},
+        {fun send_segment/4, [Config, 600, eof]},
+        {fun send_finish/2, [Config]},
         % Client lost connectivity and reconnected, this time to the current node
         % (client had `fin` unacked and decided to resend it)
         {fun connect_mqtt_client/2, [NodeSelf]},
-        {fun send_finish/1, []}
+        {fun send_finish/2, [Config]}
     ],
     _Context = run_commands(Commands, Context),
 
@@ -621,8 +693,8 @@ t_concurrent_fins(Config) ->
     Context1 = run_commands(
         [
             {fun connect_mqtt_client/2, [Node1]},
-            {fun send_filemeta/2, [Meta]},
-            {fun send_segment/3, [0, 100]},
+            {fun send_filemeta/3, [Config, Meta]},
+            {fun send_segment/4, [Config, 0, 100]},
             {fun stop_mqtt_client/1, []}
         ],
         Context0
@@ -634,7 +706,7 @@ t_concurrent_fins(Config) ->
         run_commands(
             [
                 {fun connect_mqtt_client/2, [Node]},
-                {fun send_finish/1, []}
+                {fun send_finish/2, [Config]}
             ],
             Context1
         )
@@ -708,14 +780,16 @@ disown_mqtt_client(Context = #{client := Client}) ->
 disown_mqtt_client(Context = #{}) ->
     Context.
 
-send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) ->
+send_filemeta(Config, Meta, Context = #{client := Client, fileid := FileId}) ->
     ?assertRCName(
         success,
-        emqtt:publish(Client, mk_init_topic(FileId), encode_meta(Meta), 1)
+        emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
     ),
     Context.
 
-send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) ->
+send_segment(
+    Config, Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}
+) ->
     Data =
         case Size of
             eof ->
@@ -725,14 +799,14 @@ send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, paylo
         end,
     ?assertRCName(
         success,
-        emqtt:publish(Client, mk_segment_topic(FileId, Offset), Data, 1)
+        emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Data, 1)
     ),
     Context.
 
-send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) ->
+send_finish(Config, Context = #{client := Client, fileid := FileId, filesize := Filesize}) ->
     ?assertRCName(
         success,
-        emqtt:publish(Client, mk_fin_topic(FileId, Filesize), <<>>, 1)
+        emqtt:publish(Client, mk_fin_topic(Config, FileId, Filesize), <<>>, 1)
     ),
     Context.
 
@@ -749,23 +823,30 @@ fs_exported_file_attributes(FSExports) ->
         lists:sort(FSExports)
     ).
 
-mk_init_topic(FileId) ->
-    <<"$file/", FileId/binary, "/init">>.
+mk_init_topic(Config, FileId) ->
+    RequestTopicPrefix = request_topic_prefix(Config, FileId),
+    <<RequestTopicPrefix/binary, "/init">>.
+
+mk_segment_topic(Config, FileId, Offset) when is_integer(Offset) ->
+    mk_segment_topic(Config, FileId, integer_to_binary(Offset));
+mk_segment_topic(Config, FileId, Offset) when is_binary(Offset) ->
+    RequestTopicPrefix = request_topic_prefix(Config, FileId),
+    <<RequestTopicPrefix/binary, "/", Offset/binary>>.
 
-mk_segment_topic(FileId, Offset) when is_integer(Offset) ->
-    mk_segment_topic(FileId, integer_to_binary(Offset));
-mk_segment_topic(FileId, Offset) when is_binary(Offset) ->
-    <<"$file/", FileId/binary, "/", Offset/binary>>.
+mk_segment_topic(Config, FileId, Offset, Checksum) when is_integer(Offset) ->
+    mk_segment_topic(Config, FileId, integer_to_binary(Offset), Checksum);
+mk_segment_topic(Config, FileId, Offset, Checksum) when is_binary(Offset) ->
+    RequestTopicPrefix = request_topic_prefix(Config, FileId),
+    <<RequestTopicPrefix/binary, "/", Offset/binary, "/", Checksum/binary>>.
 
-mk_segment_topic(FileId, Offset, Checksum) when is_integer(Offset) ->
-    mk_segment_topic(FileId, integer_to_binary(Offset), Checksum);
-mk_segment_topic(FileId, Offset, Checksum) when is_binary(Offset) ->
-    <<"$file/", FileId/binary, "/", Offset/binary, "/", Checksum/binary>>.
+mk_fin_topic(Config, FileId, Size) when is_integer(Size) ->
+    mk_fin_topic(Config, FileId, integer_to_binary(Size));
+mk_fin_topic(Config, FileId, Size) when is_binary(Size) ->
+    RequestTopicPrefix = request_topic_prefix(Config, FileId),
+    <<RequestTopicPrefix/binary, "/fin/", Size/binary>>.
 
-mk_fin_topic(FileId, Size) when is_integer(Size) ->
-    mk_fin_topic(FileId, integer_to_binary(Size));
-mk_fin_topic(FileId, Size) when is_binary(Size) ->
-    <<"$file/", FileId/binary, "/fin/", Size/binary>>.
+request_topic_prefix(Config, FileId) ->
+    emqx_ft_test_helpers:request_topic_prefix(mode(Config), FileId).
 
 with_offsets(Items) ->
     {List, _} = lists:mapfoldl(
@@ -799,3 +880,17 @@ list_files(ClientId) ->
 read_export(#{path := AbsFilepath}) ->
     % TODO: only works for the local filesystem exporter right now
     file:read_file(AbsFilepath).
+
+set_client_specific_ft_dirs(ClientId, Config) ->
+    FTRoot = emqx_ft_test_helpers:ft_root(Config),
+    ok = emqx_config:put(
+        [file_transfer, storage, local, segments, root],
+        filename:join([FTRoot, ClientId, segments])
+    ),
+    ok = emqx_config:put(
+        [file_transfer, storage, local, exporter, local, root],
+        filename:join([FTRoot, ClientId, exports])
+    ).
+
+mode(Config) ->
+    proplists:get_value(mode, Config, sync).

+ 3 - 3
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -85,7 +85,7 @@ t_list_files(Config) ->
     FileId = <<"f1">>,
 
     Node = lists:last(test_nodes(Config)),
-    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
+    ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, Node),
 
     {ok, 200, #{<<"files">> := Files}} =
         request_json(get, uri(["file_transfer", "files"]), Config),
@@ -114,7 +114,7 @@ t_download_transfer(Config) ->
 
     Nodes = [Node | _] = test_nodes(Config),
     NodeUpload = lists:last(Nodes),
-    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, NodeUpload),
+    ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, NodeUpload),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
@@ -185,7 +185,7 @@ t_list_files_paging(Config) ->
     ],
     ok = lists:foreach(
         fun({FileId, Name, Node}) ->
-            ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, <<"data">>, Node)
+            ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, Name, <<"data">>, Node)
         end,
         Uploads
     ),

+ 8 - 8
apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl

@@ -55,7 +55,7 @@ t_register(_Config) ->
     PacketId = 1,
     MRef = make_ref(),
     TRef = make_ref(),
-    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
 
     ?assertEqual(
         undefined,
@@ -68,7 +68,7 @@ t_register(_Config) ->
     ),
 
     ?assertEqual(
-        {ok, PacketId, TRef},
+        {ok, PacketId, TRef, somedata},
         emqx_ft_async_reply:take_by_mref(MRef)
     ).
 
@@ -76,7 +76,7 @@ t_process_independence(_Config) ->
     PacketId = 1,
     MRef = make_ref(),
     TRef = make_ref(),
-    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
 
     Self = self(),
 
@@ -112,10 +112,10 @@ t_take(_Config) ->
     PacketId = 1,
     MRef = make_ref(),
     TRef = make_ref(),
-    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
 
     ?assertEqual(
-        {ok, PacketId, TRef},
+        {ok, PacketId, TRef, somedata},
         emqx_ft_async_reply:take_by_mref(MRef)
     ),
 
@@ -135,12 +135,12 @@ t_cleanup(_Config) ->
     TRef0 = make_ref(),
     MRef1 = make_ref(),
     TRef1 = make_ref(),
-    ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0),
+    ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0, somedata0),
 
     Self = self(),
 
     Pid = spawn_link(fun() ->
-        ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1),
+        ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1, somedata1),
         receive
             kickoff ->
                 ?assertEqual(
@@ -149,7 +149,7 @@ t_cleanup(_Config) ->
                 ),
 
                 ?assertEqual(
-                    {ok, PacketId, TRef1},
+                    {ok, PacketId, TRef1, somedata1},
                     emqx_ft_async_reply:take_by_mref(MRef1)
                 ),
 

+ 2 - 2
apps/emqx_ft/test/emqx_ft_conf_SUITE.erl

@@ -39,10 +39,10 @@ init_per_testcase(Case, Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Case, Config)}
     ),
-    [{suite_apps, Apps} | Config].
+    [{apps, Apps} | Config].
 
 end_per_testcase(_Case, Config) ->
-    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
 
 %%--------------------------------------------------------------------

+ 113 - 0
apps/emqx_ft/test/emqx_ft_request_SUITE.erl

@@ -0,0 +1,113 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_request_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s}"}
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{suite_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    Config.
+
+end_per_testcase(_Case, _Config) ->
+    ok.
+
+%%-------------------------------------------------------------------
+%% Tests
+%%-------------------------------------------------------------------
+
+t_upload_via_requests(_Config) ->
+    C = emqx_ft_test_helpers:start_client(<<"client">>),
+
+    FileId = <<"f1">>,
+    Data = <<"hello world">>,
+    Size = byte_size(Data),
+    Meta = #{
+        name => "test.txt",
+        expire_at => erlang:system_time(_Unit = second) + 3600,
+        size => Size
+    },
+    MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)),
+    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+
+    ?assertMatch(
+        {ok, #{<<"reason_code">> := 0, <<"topic">> := MetaTopic}},
+        request(C, MetaTopic, MetaPayload)
+    ),
+
+    SegmentTopic = <<"$file/", FileId/binary, "/0">>,
+
+    ?assertMatch(
+        {ok, #{<<"reason_code">> := 0, <<"topic">> := SegmentTopic}},
+        request(C, SegmentTopic, Data)
+    ),
+
+    FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
+
+    ?assertMatch(
+        {ok, #{<<"reason_code">> := 0, <<"topic">> := FinTopic}},
+        request(C, FinTopic, <<>>)
+    ).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+request(C, Topic, Request) ->
+    CorrelaionData = emqx_ft_test_helpers:unique_binary_string(),
+    ResponseTopic = emqx_ft_test_helpers:unique_binary_string(),
+
+    Properties = #{
+        'Correlation-Data' => CorrelaionData,
+        'Response-Topic' => ResponseTopic
+    },
+    Opts = [{qos, 1}],
+
+    {ok, _, _} = emqtt:subscribe(C, ResponseTopic, 1),
+    {ok, _} = emqtt:publish(C, Topic, Properties, Request, Opts),
+
+    try
+        receive
+            {publish, #{
+                topic := ResponseTopic,
+                payload := Payload,
+                properties := #{'Correlation-Data' := CorrelaionData}
+            }} ->
+                {ok, emqx_utils_json:decode(Payload)}
+        after 1000 ->
+            {error, timeout}
+        end
+    after
+        emqtt:unsubscribe(C, ResponseTopic)
+    end.

+ 15 - 17
apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl

@@ -38,25 +38,23 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     ok.
 
-set_special_configs(Config) ->
-    fun
-        (emqx_ft) ->
-            Storage = emqx_ft_test_helpers:local_storage(Config, #{
-                exporter => s3, bucket_name => ?config(bucket_name, Config)
-            }),
-            emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage});
-        (_) ->
-            ok
-    end.
-
-init_per_testcase(Case, Config0) ->
+init_per_testcase(Case, Config) ->
     ClientId = atom_to_binary(Case),
     BucketName = create_bucket(),
-    Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0],
-    ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)),
-    Config1.
-end_per_testcase(_Case, _Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+    Storage = emqx_ft_test_helpers:local_storage(Config, #{
+        exporter => s3, bucket_name => BucketName
+    }),
+    WorkDir = filename:join(?config(priv_dir, Config), atom_to_list(Case)),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    [{apps, Apps}, {bucket_name, BucketName}, {clientid, ClientId} | Config].
+end_per_testcase(_Case, Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl

@@ -81,8 +81,8 @@ end_per_group(_Group, _Config) ->
 
 t_multinode_exports(Config) ->
     [Node1, Node2 | _] = ?config(cluster, Config),
-    ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1),
-    ok = emqx_ft_test_helpers:upload_file(<<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2),
+    ok = emqx_ft_test_helpers:upload_file(sync, <<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1),
+    ok = emqx_ft_test_helpers:upload_file(sync, <<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2),
     ?assertMatch(
         [
             #{transfer := {<<"c/1">>, <<"f:1">>}, name := "fn1"},

+ 11 - 4
apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl

@@ -25,11 +25,18 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
-    Config.
+    WorkDir = ?config(priv_dir, Config),
+    Storage = emqx_ft_test_helpers:local_storage(Config),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
 init_per_testcase(_Case, Config) ->

+ 64 - 24
apps/emqx_ft/test/emqx_ft_test_helpers.erl

@@ -24,16 +24,15 @@
 -define(S3_HOST, <<"minio">>).
 -define(S3_PORT, 9000).
 
-env_handler(Config) ->
-    fun
-        (emqx_ft) ->
-            load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)});
-        (_) ->
-            ok
-    end.
-
 config(Storage) ->
-    #{<<"file_transfer">> => #{<<"enable">> => true, <<"storage">> => Storage}}.
+    config(Storage, #{}).
+
+config(Storage, FTOptions0) ->
+    FTOptions1 = maps:merge(
+        #{<<"enable">> => true, <<"storage">> => Storage},
+        FTOptions0
+    ),
+    #{<<"file_transfer">> => FTOptions1}.
 
 local_storage(Config) ->
     local_storage(Config, #{exporter => local}).
@@ -73,7 +72,13 @@ tcp_port(Node) ->
     Port.
 
 root(Config, Node, Tail) ->
-    iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])).
+    iolist_to_binary(filename:join([ft_root(Config), Node | Tail])).
+
+ft_root(Config) ->
+    filename:join([?config(priv_dir, Config), "file_transfer"]).
+
+cleanup_ft_root(Config) ->
+    file:del_dir_r(emqx_ft_test_helpers:ft_root(Config)).
 
 start_client(ClientId) ->
     start_client(ClientId, node()).
@@ -85,11 +90,15 @@ start_client(ClientId, Node) ->
     Client.
 
 upload_file(ClientId, FileId, Name, Data) ->
-    upload_file(ClientId, FileId, Name, Data, node()).
+    upload_file(sync, ClientId, FileId, Name, Data).
 
-upload_file(ClientId, FileId, Name, Data, Node) ->
+upload_file(Mode, ClientId, FileId, Name, Data) ->
+    upload_file(Mode, ClientId, FileId, Name, Data, node()).
+
+upload_file(Mode, ClientId, FileId, Name, Data, Node) ->
     C1 = start_client(ClientId, Node),
 
+    ReqTopicPrefix = request_topic_prefix(Mode, FileId),
     Size = byte_size(Data),
     Meta = #{
         name => Name,
@@ -98,25 +107,53 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
     },
     MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)),
 
-    ct:pal("MetaPayload = ~ts", [MetaPayload]),
-
-    MetaTopic = <<"$file/", FileId/binary, "/init">>,
+    MetaTopic = <<ReqTopicPrefix/binary, "/init">>,
     {ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
     {ok, #{reason_code_name := success}} = emqtt:publish(
-        C1, <<"$file/", FileId/binary, "/0">>, Data, 1
+        C1, <<ReqTopicPrefix/binary, "/0">>, Data, 1
     ),
 
-    FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
-    FinResult =
-        case emqtt:publish(C1, FinTopic, <<>>, 1) of
-            {ok, #{reason_code_name := success}} ->
-                ok;
-            {ok, #{reason_code_name := Error}} ->
-                {error, Error}
-        end,
+    FinTopic = <<ReqTopicPrefix/binary, "/fin/", (integer_to_binary(Size))/binary>>,
+    FinResult = fin_result(Mode, ClientId, C1, FinTopic),
     ok = emqtt:stop(C1),
     FinResult.
 
+fin_result(Mode, ClientId, C, FinTopic) ->
+    {ok, _, _} = emqtt:subscribe(C, response_topic(ClientId), 1),
+    case emqtt:publish(C, FinTopic, <<>>, 1) of
+        {ok, #{reason_code_name := success}} ->
+            maybe_wait_for_assemble(Mode, ClientId, FinTopic);
+        {ok, #{reason_code_name := Error}} ->
+            {error, Error}
+    end.
+
+maybe_wait_for_assemble(sync, _ClientId, _FinTopic) ->
+    ok;
+maybe_wait_for_assemble(async, ClientId, FinTopic) ->
+    ResponseTopic = response_topic(ClientId),
+    receive
+        {publish, #{payload := Payload, topic := ResponseTopic}} ->
+            case emqx_utils_json:decode(Payload) of
+                #{<<"topic">> := FinTopic, <<"reason_code">> := 0} ->
+                    ok;
+                #{<<"topic">> := FinTopic, <<"reason_code">> := Code} ->
+                    {error, emqx_reason_codes:name(Code)};
+                _ ->
+                    maybe_wait_for_assemble(async, ClientId, FinTopic)
+            end
+    end.
+
+response_topic(ClientId) ->
+    <<"$file-response/", (to_bin(ClientId))/binary>>.
+
+request_topic_prefix(sync, FileId) ->
+    <<"$file/", (to_bin(FileId))/binary>>;
+request_topic_prefix(async, FileId) ->
+    <<"$file-async/", (to_bin(FileId))/binary>>.
+
+to_bin(Val) ->
+    iolist_to_binary(Val).
+
 aws_config() ->
     emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).
 
@@ -129,3 +166,6 @@ pem_privkey() ->
         "ju0VBj6tOX1y6C0U+85VOM0UU5xqvw==\n"
         "-----END EC PRIVATE KEY-----\n"
     >>.
+
+unique_binary_string() ->
+    emqx_guid:to_hexstr(emqx_guid:gen()).

+ 3 - 0
changes/ee/feat-11541.en.md

@@ -0,0 +1,3 @@
+Introduced additional way of file transfer interactions. Now client may send file transfer commands to `$file-async/...` topic instead of `$file/...` and receive command execution results as messages to `$file-response/{clientId}` topic.
+This simplifies file transfer feature usage in certain cases, for example, when a client uses MQTTv3 or when the broker is behind an MQTT bridge.
+See the [EIP-0021](https://github.com/emqx/eip) for more details.