|
|
@@ -30,10 +30,7 @@
|
|
|
|
|
|
-export([
|
|
|
on_message_publish/1,
|
|
|
- on_message_puback/4,
|
|
|
- on_client_timeout/3,
|
|
|
- on_process_down/4,
|
|
|
- on_channel_unregistered/1
|
|
|
+ on_message_puback/4
|
|
|
]).
|
|
|
|
|
|
-export([
|
|
|
@@ -88,8 +85,6 @@
|
|
|
checksum => checksum()
|
|
|
}.
|
|
|
|
|
|
--define(FT_EVENT(EVENT), {?MODULE, EVENT}).
|
|
|
-
|
|
|
-define(ACK_AND_PUBLISH(Result), {true, Result}).
|
|
|
-define(ACK(Result), {false, Result}).
|
|
|
-define(DELAY_ACK, delay).
|
|
|
@@ -100,21 +95,11 @@
|
|
|
|
|
|
hook() ->
|
|
|
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
|
|
|
- ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
|
|
|
- ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST),
|
|
|
- ok = emqx_hooks:put(
|
|
|
- 'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST
|
|
|
- ),
|
|
|
- ok = emqx_hooks:put(
|
|
|
- 'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST
|
|
|
- ).
|
|
|
+ ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
|
|
|
|
|
|
unhook() ->
|
|
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
|
|
- ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}),
|
|
|
- ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}),
|
|
|
- ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}),
|
|
|
- ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}).
|
|
|
+ ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% API
|
|
|
@@ -152,40 +137,6 @@ on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes,
|
|
|
ignore
|
|
|
end.
|
|
|
|
|
|
-on_channel_unregistered(ChannelPid) ->
|
|
|
- ok = emqx_ft_async_reply:deregister_all(ChannelPid).
|
|
|
-
|
|
|
-on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) ->
|
|
|
- _ = erlang:demonitor(MRef, [flush]),
|
|
|
- Result = {error, timeout},
|
|
|
- _ = publish_response(Result, TopicReplyData),
|
|
|
- case emqx_ft_async_reply:take_by_mref(MRef) of
|
|
|
- {ok, undefined, _TRef1, _TopicReplyData} ->
|
|
|
- {stop, Acc};
|
|
|
- {ok, PacketId, _TRef1, _TopicReplyData} ->
|
|
|
- {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]};
|
|
|
- not_found ->
|
|
|
- {ok, Acc}
|
|
|
- end;
|
|
|
-on_client_timeout(_TRef, _Event, Acc) ->
|
|
|
- {ok, Acc}.
|
|
|
-
|
|
|
-on_process_down(MRef, _Pid, DownReason, Acc) ->
|
|
|
- case emqx_ft_async_reply:take_by_mref(MRef) of
|
|
|
- {ok, PacketId, TRef, TopicReplyData} ->
|
|
|
- _ = emqx_utils:cancel_timer(TRef),
|
|
|
- Result = down_reason_to_result(DownReason),
|
|
|
- _ = publish_response(Result, TopicReplyData),
|
|
|
- case PacketId of
|
|
|
- undefined ->
|
|
|
- {stop, Acc};
|
|
|
- _ ->
|
|
|
- {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}
|
|
|
- end;
|
|
|
- not_found ->
|
|
|
- {ok, Acc}
|
|
|
- end.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Handlers for transfer messages
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -253,7 +204,7 @@ on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) ->
|
|
|
filemeta => Meta
|
|
|
}),
|
|
|
%% Currently synchronous.
|
|
|
- %% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
|
|
+ %% If we want to make it async, we need to use `with_responder`,
|
|
|
%% like in `on_fin`.
|
|
|
?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)).
|
|
|
|
|
|
@@ -271,11 +222,13 @@ on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) ->
|
|
|
}),
|
|
|
Segment = {Offset, Msg#message.payload},
|
|
|
%% Currently synchronous.
|
|
|
- %% If we want to make it async, we need to use `emqx_ft_async_reply`,
|
|
|
+ %% If we want to make it async, we need to use `with_responder`,
|
|
|
%% like in `on_fin`.
|
|
|
?ACK_AND_PUBLISH(store_segment(Transfer, Segment)).
|
|
|
|
|
|
-on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) ->
|
|
|
+on_fin(
|
|
|
+ #{packet_id := PacketId, mode := Mode} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum
|
|
|
+) ->
|
|
|
?tp(info, "file_transfer_fin", #{
|
|
|
mqtt_msg => Msg,
|
|
|
packet_id => PacketId,
|
|
|
@@ -283,39 +236,78 @@ on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, Fina
|
|
|
final_size => FinalSize,
|
|
|
checksum => FinalChecksum
|
|
|
}),
|
|
|
- %% TODO: handle checksum? Do we need it?
|
|
|
- with_new_packet(
|
|
|
- TopicReplyData,
|
|
|
- PacketId,
|
|
|
- fun() ->
|
|
|
- case assemble(Transfer, FinalSize, FinalChecksum) of
|
|
|
- ok ->
|
|
|
- ?ACK_AND_PUBLISH(ok);
|
|
|
- %% Assembling started, packet will be acked/replied by monitor or timeout
|
|
|
- {async, Pid} ->
|
|
|
- register_async_worker(Pid, TopicReplyData);
|
|
|
- {error, _} = Error ->
|
|
|
- ?ACK_AND_PUBLISH(Error)
|
|
|
- end
|
|
|
+ FinPacketKey = {self(), PacketId},
|
|
|
+ Callback = fun(Result) ->
|
|
|
+ on_complete("assemble", TopicReplyData, FinPacketKey, Transfer, Result)
|
|
|
+ end,
|
|
|
+ with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
|
|
|
+ case assemble(Transfer, FinalSize, FinalChecksum) of
|
|
|
+ %% Assembling completed, ack through the responder right away
|
|
|
+ ok ->
|
|
|
+ emqx_ft_responder:ack(FinPacketKey, ok),
|
|
|
+ ?DELAY_ACK;
|
|
|
+ %% Assembling started, packet will be acked by the responder
|
|
|
+ {async, Pid} ->
|
|
|
+ ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
|
|
|
+ ack_if_async(Mode);
|
|
|
+ %% Assembling failed, ack through the responder
|
|
|
+ {error, _} = Error ->
|
|
|
+ emqx_ft_responder:ack(FinPacketKey, Error),
|
|
|
+ ?DELAY_ACK
|
|
|
end
|
|
|
- ).
|
|
|
-
|
|
|
-register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) ->
|
|
|
- MRef = erlang:monitor(process, Pid),
|
|
|
- TRef = erlang:start_timer(
|
|
|
- emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData})
|
|
|
- ),
|
|
|
- case Mode of
|
|
|
- async ->
|
|
|
- ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData),
|
|
|
- ok = emqx_ft_storage:kickoff(Pid),
|
|
|
- ?ACK(ok);
|
|
|
- sync ->
|
|
|
- ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData),
|
|
|
- ok = emqx_ft_storage:kickoff(Pid),
|
|
|
+ end).
|
|
|
+
|
|
|
+with_responder(Key, Callback, Timeout, CriticalSection) ->
|
|
|
+ case emqx_ft_responder:start(Key, Callback, Timeout) of
|
|
|
+ %% We have new packet
|
|
|
+ {ok, _} ->
|
|
|
+ CriticalSection();
|
|
|
+ %% Packet already received.
|
|
|
+ %% Since we are still handling the previous one,
|
|
|
+ %% we probably have retransmit here
|
|
|
+ {error, {already_started, _}} ->
|
|
|
?DELAY_ACK
|
|
|
end.
|
|
|
|
|
|
+ack_if_async(sync) ->
|
|
|
+ ?DELAY_ACK;
|
|
|
+ack_if_async(async) ->
|
|
|
+ ?ACK(ok).
|
|
|
+
|
|
|
+on_complete(Op, #{mode := Mode} = TopicReplyData, {ChanPid, PacketId}, Transfer, ResponderResult) ->
|
|
|
+ ?tp(debug, "on_complete", #{
|
|
|
+ operation => Op,
|
|
|
+ packet_id => PacketId,
|
|
|
+ transfer => Transfer
|
|
|
+ }),
|
|
|
+ Result =
|
|
|
+ case ResponderResult of
|
|
|
+ {RespMode, ok} when RespMode == ack orelse RespMode == down ->
|
|
|
+ ok;
|
|
|
+ {RespMode, {error, _} = Reason} when RespMode == ack orelse RespMode == down ->
|
|
|
+ ?tp(error, Op ++ "_failed", #{
|
|
|
+ transfer => Transfer,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
+ Reason;
|
|
|
+ timeout ->
|
|
|
+ ?tp(error, Op ++ "_timed_out", #{
|
|
|
+ transfer => Transfer
|
|
|
+ }),
|
|
|
+ {error, timeout}
|
|
|
+ end,
|
|
|
+ NeedAck =
|
|
|
+ case {ResponderResult, Mode} of
|
|
|
+ {{down, _}, async} -> false;
|
|
|
+ {timeout, async} -> false;
|
|
|
+ _ -> true
|
|
|
+ end,
|
|
|
+ NeedAck andalso ack_packet(ChanPid, PacketId, Result),
|
|
|
+ _ = publish_response(Result, TopicReplyData).
|
|
|
+
|
|
|
+ack_packet(ChanPid, PacketId, Result) ->
|
|
|
+ erlang:send(ChanPid, {puback, PacketId, [], result_to_rc(Result)}).
|
|
|
+
|
|
|
topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) ->
|
|
|
Props = maps:get(properties, Headers, #{}),
|
|
|
#{
|
|
|
@@ -483,19 +475,3 @@ clientid_to_binary(A) when is_atom(A) ->
|
|
|
atom_to_binary(A);
|
|
|
clientid_to_binary(B) when is_binary(B) ->
|
|
|
B.
|
|
|
-
|
|
|
-down_reason_to_result(normal) ->
|
|
|
- ok;
|
|
|
-down_reason_to_result(shutdown) ->
|
|
|
- ok;
|
|
|
-down_reason_to_result({shutdown, Result}) ->
|
|
|
- Result;
|
|
|
-down_reason_to_result(noproc) ->
|
|
|
- {error, noproc};
|
|
|
-down_reason_to_result(Error) ->
|
|
|
- {error, {internal_error, Error}}.
|
|
|
-
|
|
|
-with_new_packet(#{mode := async}, _PacketId, Fun) ->
|
|
|
- Fun();
|
|
|
-with_new_packet(#{mode := sync}, PacketId, Fun) ->
|
|
|
- emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined).
|