Просмотр исходного кода

feat(ft): tie file transfer frontend and backend together

Ilya Averyanov 3 лет назад
Родитель
Сommit
d36ca18bff

+ 22 - 7
apps/emqx/src/emqx_channel.erl

@@ -744,9 +744,13 @@ do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
     {ok, NChannel};
 do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
     PubRes = emqx_broker:publish(Msg),
-    RC = puback_reason_code(PubRes),
-    NChannel = ensure_quota(PubRes, Channel),
-    handle_out(puback, {PacketId, RC}, NChannel);
+    RC = puback_reason_code(PacketId, Msg, PubRes),
+    case RC of
+        undefined ->
+            {ok, Channel};
+        _Value ->
+            do_finish_publish(PacketId, PubRes, RC, Channel)
+    end;
 do_publish(
     PacketId,
     Msg = #message{qos = ?QOS_2},
@@ -754,7 +758,7 @@ do_publish(
 ) ->
     case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
         {ok, PubRes, NSession} ->
-            RC = puback_reason_code(PubRes),
+            RC = pubrec_reason_code(PubRes),
             NChannel0 = set_session(NSession, Channel),
             NChannel1 = ensure_timer(await_timer, NChannel0),
             NChannel2 = ensure_quota(PubRes, NChannel1),
@@ -767,6 +771,10 @@ do_publish(
             handle_out(disconnect, RC, Channel)
     end.
 
+do_finish_publish(PacketId, PubRes, RC, Channel) ->
+    NChannel = ensure_quota(PubRes, Channel),
+    handle_out(puback, {PacketId, RC}, NChannel).
+
 ensure_quota(_, Channel = #channel{quota = undefined}) ->
     Channel;
 ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
@@ -786,9 +794,14 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
             ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter})
     end.
 
--compile({inline, [puback_reason_code/1]}).
-puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
-puback_reason_code([_ | _]) -> ?RC_SUCCESS.
+-compile({inline, [pubrec_reason_code/1]}).
+pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
+pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
+
+puback_reason_code(PacketId, Msg, [] = PubRes) ->
+    emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
+puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
+    emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
 
 -compile({inline, [after_message_acked/3]}).
 after_message_acked(ClientInfo, Msg, PubAckProps) ->
@@ -1283,6 +1296,8 @@ handle_info(die_if_test = Info, Channel) ->
     die_if_test_compiled(),
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel};
+handle_info({puback, PacketId, PubRes, RC}, Channel) ->
+    do_finish_publish(PacketId, PubRes, RC, Channel);
 handle_info(Info, Channel) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.

+ 1 - 1
apps/emqx/src/emqx_cm.erl

@@ -313,7 +313,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnM
                         Session1 = emqx_persistent_session:persist(
                             ClientInfo, ConnInfo, Session
                         ),
-                        ok = emqx_hooks:run('channel.takeovered', [NewConnMod, Self, TakoverData]),
+                        ok = emqx_hooks:run('channel.takenover', [NewConnMod, Self, TakoverData]),
                         {ok, #{
                             session => clean_session(Session1),
                             present => true,

+ 1 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -1133,7 +1133,7 @@ t_ws_cookie_init(_) ->
     ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
 
 %%--------------------------------------------------------------------
-%% Test cases for other mechnisms
+%% Test cases for other mechanisms
 %%--------------------------------------------------------------------
 
 t_flapping_detect(_) ->

+ 70 - 0
apps/emqx/test/emqx_channel_delayed_puback_SUITE.erl

@@ -0,0 +1,70 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_channel_delayed_puback_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:boot_modules(all),
+    emqx_common_test_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(Case, Config) ->
+    ?MODULE:Case({init, Config}).
+
+end_per_testcase(Case, Config) ->
+    ?MODULE:Case({'end', Config}).
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_delayed_puback({init, Config}) ->
+    emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
+    Config;
+t_delayed_puback({'end', _Config}) ->
+    emqx_hooks:del('message.puback', {?MODULE, on_message_puback});
+t_delayed_puback(_Config) ->
+    {ok, ConnPid} = emqtt:start_link([{clientid, <<"clientid">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    {ok, #{reason_code := ?RC_UNSPECIFIED_ERROR}} = emqtt:publish(
+        ConnPid, <<"topic">>, <<"hello">>, 1
+    ),
+    emqtt:disconnect(ConnPid).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+on_message_puback(PacketId, _Msg, PubRes, _RC) ->
+    erlang:send(self(), {puback, PacketId, PubRes, ?RC_UNSPECIFIED_ERROR}),
+    {stop, undefined}.

+ 130 - 9
apps/emqx_ft/src/emqx_ft.erl

@@ -17,6 +17,10 @@
 -module(emqx_ft).
 
 -include("emqx_ft.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
 
 -export([
     create_tab/0,
@@ -27,9 +31,14 @@
 -export([
     on_channel_unregistered/1,
     on_channel_takeover/3,
-    on_channel_takeovered/3
+    on_channel_takenover/3,
+    on_message_publish/1,
+    on_message_puback/4
 ]).
 
+%% For Debug
+-export([transfer/2, storage/0]).
+
 -export_type([clientid/0]).
 -export_type([transfer/0]).
 -export_type([offset/0]).
@@ -67,16 +76,20 @@ create_tab() ->
     ok.
 
 hook() ->
-    % ok = emqx_hooks:put('channel.registered', {?MODULE, on_channel_registered, []}),
-    ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}),
-    ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}),
-    ok = emqx_hooks:put('channel.takeovered', {?MODULE, on_channel_takeovered, []}).
+    ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST),
+    ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}, ?HP_LOWEST),
+    ok = emqx_hooks:put('channel.takenover', {?MODULE, on_channel_takenover, []}, ?HP_LOWEST),
+
+    ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
+    ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
 
 unhook() ->
-    % ok = emqx_hooks:del('channel.registered', {?MODULE, on_channel_registered}),
     ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}),
     ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}),
-    ok = emqx_hooks:del('channel.takeovered', {?MODULE, on_channel_takeovered}).
+    ok = emqx_hooks:del('channel.takenover', {?MODULE, on_channel_takenover}),
+
+    ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
+    ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
 
 %%--------------------------------------------------------------------
 %% Hooks
@@ -93,11 +106,30 @@ on_channel_takeover(_ConnMod, ChanPid, TakeoverData) ->
             ok
     end.
 
-on_channel_takeovered(_ConnMod, ChanPid, #{ft_data := FTData}) ->
+on_channel_takenover(_ConnMod, ChanPid, #{ft_data := FTData}) ->
     ok = put_ft_data(ChanPid, FTData);
-on_channel_takeovered(_ConnMod, _ChanPid, _) ->
+on_channel_takenover(_ConnMod, _ChanPid, _) ->
     ok.
 
+on_message_publish(
+    Msg = #message{
+        id = _Id,
+        topic = <<"$file/", _/binary>>
+    }
+) ->
+    Headers = Msg#message.headers,
+    {stop, Msg#message{headers = Headers#{allow_publish => false}}};
+on_message_publish(Msg) ->
+    {ok, Msg}.
+
+on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
+    case Topic of
+        <<"$file/", FileCommand/binary>> ->
+            {stop, on_file_command(PacketId, Msg, FileCommand)};
+        _ ->
+            ignore
+    end.
+
 %%--------------------------------------------------------------------
 %% Private funs
 %%--------------------------------------------------------------------
@@ -115,3 +147,92 @@ delete_ft_data(ChanPid) ->
 put_ft_data(ChanPid, FTData) ->
     true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
     ok.
+
+on_file_command(PacketId, Msg, FileCommand) ->
+    case string:split(FileCommand, <<"/">>, all) of
+        [FileId, <<"init">>] ->
+            on_init(Msg, FileId);
+        [FileId, <<"fin">>] ->
+            on_fin(PacketId, Msg, FileId, undefined);
+        [FileId, <<"fin">>, Checksum] ->
+            on_fin(PacketId, Msg, FileId, Checksum);
+        [FileId, <<"abort">>] ->
+            on_abort(Msg, FileId);
+        [FileId, Offset] ->
+            on_segment(Msg, FileId, Offset, undefined);
+        [FileId, Offset, Checksum] ->
+            on_segment(Msg, FileId, Offset, Checksum);
+        _ ->
+            ?RC_UNSPECIFIED_ERROR
+    end.
+
+on_init(Msg, FileId) ->
+    ?SLOG(info, #{
+        msg => "on_init",
+        mqtt_msg => Msg,
+        file_id => FileId
+    }),
+    % Payload = Msg#message.payload,
+    % %% Add validations here
+    % Meta = emqx_json:decode(Payload, [return_maps]),
+    % ok = emqx_ft_storage_fs:store_filemeta(storage(), transfer(Msg, FileId), Meta),
+    % ?RC_SUCCESS.
+    ?RC_UNSPECIFIED_ERROR.
+
+on_abort(_Msg, _FileId) ->
+    %% TODO
+    ?RC_SUCCESS.
+
+on_segment(Msg, FileId, Offset, Checksum) ->
+    ?SLOG(info, #{
+        msg => "on_segment",
+        mqtt_msg => Msg,
+        file_id => FileId,
+        offset => Offset,
+        checksum => Checksum
+    }),
+    % %% TODO: handle checksum
+    % Payload = Msg#message.payload,
+    % %% Add offset/checksum validations
+    % ok = emqx_ft_storage_fs:store_segment(
+    %     storage(),
+    %     transfer(Msg, FileId),
+    %     {binary_to_integer(Offset), Payload}
+    % ),
+    % ?RC_SUCCESS.
+    ?RC_UNSPECIFIED_ERROR.
+
+on_fin(PacketId, Msg, FileId, Checksum) ->
+    ?SLOG(info, #{
+        msg => "on_fin",
+        mqtt_msg => Msg,
+        file_id => FileId,
+        checksum => Checksum,
+        packet_id => PacketId
+    }),
+    % %% TODO: handle checksum? Do we need it?
+    % {ok, _} = emqx_ft_storage_fs:assemble(
+    %     storage(),
+    %     transfer(Msg, FileId),
+    %     callback(FileId, Msg)
+    % ),
+    Callback = callback(FileId, PacketId),
+    spawn(fun() -> Callback({error, not_implemented}) end),
+    undefined.
+
+callback(_FileId, PacketId) ->
+    ChanPid = self(),
+    fun
+        (ok) ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
+        ({error, _}) ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
+    end.
+
+transfer(Msg, FileId) ->
+    ClientId = Msg#message.from,
+    {ClientId, FileId}.
+
+%% TODO: configure
+storage() ->
+    filename:join(emqx:data_dir(), "file_transfer").

+ 13 - 9
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_ft_assembler).
 
--export([start_link/2]).
+-export([start_link/3]).
 
 -behaviour(gen_statem).
 -export([callback_mode/0]).
@@ -35,7 +35,8 @@
     transfer :: emqx_ft:transfer(),
     assembly :: _TODO,
     file :: io:device(),
-    hash
+    hash,
+    callback :: fun((ok | {error, term()}) -> any())
 }).
 
 -define(RPC_LIST_TIMEOUT, 1000).
@@ -43,8 +44,8 @@
 
 %%
 
-start_link(Storage, Transfer) ->
-    gen_server:start_link(?MODULE, {Storage, Transfer}, []).
+start_link(Storage, Transfer, Callback) ->
+    gen_server:start_link(?MODULE, {Storage, Transfer, Callback}, []).
 
 %%
 
@@ -53,12 +54,13 @@ start_link(Storage, Transfer) ->
 callback_mode() ->
     handle_event_function.
 
-init({Storage, Transfer}) ->
+init({Storage, Transfer, Callback}) ->
     St = #st{
         storage = Storage,
         transfer = Transfer,
         assembly = emqx_ft_assembly:new(),
-        hash = crypto:hash_init(sha256)
+        hash = crypto:hash_init(sha256),
+        callback = Callback
     },
     {ok, list_local_fragments, St, ?internal([])}.
 
@@ -91,7 +93,7 @@ handle_event({list_remote_fragments, Nodes}, internal, _, St) ->
             fun
                 ({Node, {ok, {ok, Fragments}}}, Asm) ->
                     emqx_ft_assembly:append(Asm, Node, Fragments);
-                ({Node, Result}, Asm) ->
+                ({_Node, _Result}, Asm) ->
                     % TODO: log?
                     Asm
             end,
@@ -128,9 +130,11 @@ handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) ->
     end;
 handle_event({assemble, []}, internal, _, St = #st{}) ->
     {next_state, complete, St, ?internal([])};
-handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle}) ->
+handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
     Filemeta = emqx_ft_assembly:filemeta(Asm),
-    ok = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+    Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+    %% TODO: safe apply
+    _ = Callback(Result),
     {stop, shutdown}.
 
 % handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->

+ 7 - 9
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -16,24 +16,22 @@
 
 -module(emqx_ft_assembler_sup).
 
--export([start_link/1]).
+-export([start_link/0]).
 -export([start_child/3]).
 
 -behaviour(supervisor).
 -export([init/1]).
 
--define(REF(ID), {via, gproc, {n, l, {?MODULE, ID}}}).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-start_link(ID) ->
-    supervisor:start_link(?REF(ID), ?MODULE, []).
-
-start_child(ID, Storage, Transfer) ->
+start_child(Storage, Transfer, Callback) ->
     Childspec = #{
         id => {Storage, Transfer},
-        start => {emqx_ft_assembler, start_link, [Storage, Transfer]},
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]},
         restart => transient
     },
-    supervisor:start_child(?REF(ID), Childspec).
+    supervisor:start_child(?MODULE, Childspec).
 
 init(_) ->
     SupFlags = #{
@@ -41,4 +39,4 @@ init(_) ->
         intensity => 100,
         period => 1000
     },
-    {ok, SupFlags, []}.
+    {ok, {SupFlags, []}}.

+ 20 - 20
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -24,7 +24,7 @@
 -export([store_filemeta/3]).
 -export([store_segment/3]).
 -export([list/2]).
--export([assemble/2]).
+-export([assemble/3]).
 
 -export([open_file/3]).
 -export([complete/4]).
@@ -101,12 +101,12 @@
 
 %%
 
--define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}).
+% -define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}).
 
--spec start_link(root()) ->
-    {ok, pid()} | {error, already_started}.
-start_link(Root) ->
-    gen_server:start_link(?PROCREF(Root), ?MODULE, [], []).
+% -spec start_link(root()) ->
+%     {ok, pid()} | {error, already_started}.
+% start_link(Root) ->
+%     gen_server:start_link(?PROCREF(Root), ?MODULE, [], []).
 
 %% Store manifest in the backing filesystem.
 %% Atomic operation.
@@ -119,13 +119,13 @@ store_filemeta(Storage, Transfer, Meta) ->
         {ok, Meta} ->
             _ = touch_file(Filepath),
             ok;
-        {ok, Conflict} ->
+        {ok, _Conflict} ->
             % TODO
             % We won't see conflicts in case of concurrent `store_filemeta`
             % requests. It's rather odd scenario so it's fine not to worry
             % about it too much now.
             {error, conflict};
-        {error, Reason} when Reason =:= notfound; Reason =:= corrupted ->
+        {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
             write_file_atomic(Filepath, encode_filemeta(Meta))
     end.
 
@@ -154,15 +154,15 @@ list(Storage, Transfer) ->
             Error
     end.
 
--spec assemble(storage(), transfer()) ->
+-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
     % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
     {ok, _Assembler :: pid()} | {error, _TODO}.
-assemble(Storage, Transfer) ->
-    emqx_ft_assembler_sup:start_child(Storage, Storage, Transfer).
+assemble(Storage, Transfer, Callback) ->
+    emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
 
 %%
 
--opaque handle() :: {file:name(), io:device(), crypto:hash_state()}.
+-type handle() :: {file:name(), io:device(), crypto:hash_state()}.
 
 -spec open_file(storage(), transfer(), filemeta()) ->
     {ok, handle()} | {error, _TODO}.
@@ -229,12 +229,12 @@ verify_checksum(undefined, _) ->
 
 %%
 
--spec init(root()) -> {ok, storage()}.
-init(Root) ->
-    % TODO: garbage_collect(...)
-    {ok, Root}.
+% -spec init(root()) -> {ok, storage()}.
+% init(Root) ->
+%     % TODO: garbage_collect(...)
+%     {ok, Root}.
 
-%%
+% %%
 
 -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
 
@@ -243,7 +243,7 @@ schema() ->
         roots => [
             {name, hoconsc:mk(string(), #{required => true})},
             {size, hoconsc:mk(non_neg_integer())},
-            {expire_at, hoconsc:mk(non_neg_integer(), #{required => true})},
+            {expire_at, hoconsc:mk(non_neg_integer())},
             {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
             {segments_ttl, hoconsc:mk(pos_integer())},
             {user_data, hoconsc:mk(json_value())}
@@ -353,7 +353,7 @@ safe_decode(Content, DecodeFun) ->
     try
         {ok, DecodeFun(Content)}
     catch
-        C:R:Stacktrace ->
+        _C:_R:_Stacktrace ->
             % TODO: Log?
             {error, corrupted}
     end.
@@ -414,7 +414,7 @@ mk_filefrag(Dirname, Filename, Fun) ->
                 timestamp => Fileinfo#file_info.mtime,
                 fragment => Frag
             }};
-        {error, Reason} ->
+        {error, _Reason} ->
             false
     end.
 

+ 11 - 1
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -43,7 +43,17 @@ init([]) ->
         intensity => 100,
         period => 10
     },
-    ChildSpecs = [],
+
+    AssemblerSup = #{
+        id => emqx_ft_assembler_sup,
+        start => {emqx_ft_assembler_sup, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor,
+        modules => [emqx_ft_assembler_sup]
+    },
+
+    ChildSpecs = [AssemblerSup],
     {ok, {SupFlags, ChildSpecs}}.
 
 %% internal functions

+ 2 - 1
rebar.config.erl

@@ -400,7 +400,8 @@ relx_apps(ReleaseType, Edition) ->
             emqx_prometheus,
             emqx_psk,
             emqx_slow_subs,
-            emqx_plugins
+            emqx_plugins,
+            emqx_ft
         ] ++
         [quicer || is_quicer_supported()] ++
         [bcrypt || provide_bcrypt_release(ReleaseType)] ++