Sfoglia il codice sorgente

feat(ft): simplify responder mechanism

Make responder short-lived process responsible for a single task, and
manage them with supervisor + gproc.
Andrew Mayorov 3 anni fa
parent
commit
2cdf486bf4

+ 30 - 58
apps/emqx_ft/src/emqx_ft.erl

@@ -35,7 +35,7 @@
     decode_filemeta/1
 ]).
 
--export([on_assemble_timeout/1]).
+-export([on_assemble/2]).
 
 -export_type([
     clientid/0,
@@ -227,45 +227,25 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
     }),
     %% TODO: handle checksum? Do we need it?
     FinPacketKey = {self(), PacketId},
-    _ =
-        case
-            emqx_ft_responder:register(
-                FinPacketKey, fun ?MODULE:on_assemble_timeout/1, ?ASSEMBLE_TIMEOUT
-            )
-        of
-            %% We have new fin packet
-            ok ->
-                Callback = callback(FinPacketKey, FileId),
-                case assemble(transfer(Msg, FileId), Callback) of
-                    %% Assembling started, packet will be acked by the callback or the responder
-                    {ok, _} ->
-                        undefined;
-                    %% Assembling failed, unregister the packet key
-                    {error, Reason} ->
-                        ?SLOG(warning, #{
-                            msg => "assemble_not_started",
-                            mqtt_msg => Msg,
-                            file_id => FileId,
-                            reason => Reason
-                        }),
-                        case emqx_ft_responder:unregister(FinPacketKey) of
-                            %% We successfully unregistered the packet key,
-                            %% so we can send the error code at once
-                            ok ->
-                                ?RC_UNSPECIFIED_ERROR;
-                            %% Someone else already unregistered the key,
-                            %% that is, either responder or someone else acked the packet,
-                            %% we do not have to ack
-                            {error, not_found} ->
-                                undefined
-                        end
-                end;
-            %% Fin packet already received.
-            %% Since we are still handling the previous one,
-            %% we probably have retransmit here
-            {error, already_registered} ->
-                undefined
-        end.
+    case emqx_ft_responder:start(FinPacketKey, fun ?MODULE:on_assemble/2, ?ASSEMBLE_TIMEOUT) of
+        %% We have new fin packet
+        {ok, _} ->
+            Callback = fun(Result) -> emqx_ft_responder:ack(FinPacketKey, Result) end,
+            case assemble(transfer(Msg, FileId), Callback) of
+                %% Assembling started, packet will be acked by the callback or the responder
+                {ok, _} ->
+                    ok;
+                %% Assembling failed, ack through the responder
+                {error, _} = Error ->
+                    emqx_ft_responder:ack(FinPacketKey, Error)
+            end;
+        %% Fin packet already received.
+        %% Since we are still handling the previous one,
+        %% we probably have retransmit here
+        {error, {already_started, _}} ->
+            ok
+    end,
+    undefined.
 
 assemble(Transfer, Callback) ->
     try
@@ -278,28 +258,20 @@ assemble(Transfer, Callback) ->
             {error, {internal_error, E}}
     end.
 
-callback({ChanPid, PacketId} = Key, _FileId) ->
-    fun(Result) ->
-        case emqx_ft_responder:unregister(Key) of
-            ok ->
-                case Result of
-                    ok ->
-                        erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
-                    {error, _} ->
-                        erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
-                end;
-            {error, not_found} ->
-                ok
-        end
-    end.
-
 transfer(Msg, FileId) ->
     ClientId = Msg#message.from,
     {ClientId, FileId}.
 
-on_assemble_timeout({ChanPid, PacketId}) ->
-    ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
-    erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
+on_assemble({ChanPid, PacketId}, Result) ->
+    ?SLOG(debug, #{msg => "on_assemble", packet_id => PacketId, result => Result}),
+    case Result of
+        {ack, ok} ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
+        {ack, {error, _}} ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
+        timeout ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
+    end.
 
 validate(Validations, Fun) ->
     case do_validate(Validations, []) of

+ 38 - 84
apps/emqx_ft/src/emqx_ft_responder.erl

@@ -23,73 +23,50 @@
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--export([start_link/0]).
+%% API
+-export([start/3]).
+-export([ack/2]).
 
--export([
-    register/3,
-    unregister/1
-]).
+%% Supervisor API
+-export([start_link/3]).
 
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
--define(SERVER, ?MODULE).
--define(TAB, ?MODULE).
+-define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}).
 
 -type key() :: term().
+-type respfun() :: fun(({ack, _Result} | timeout) -> _SideEffect).
 
 %%--------------------------------------------------------------------
 %% API
 %% -------------------------------------------------------------------
 
--spec start_link() -> startlink_ret().
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
--spec register(Key, DefaultAction, Timeout) -> ok | {error, already_registered} when
-    Key :: key(),
-    DefaultAction :: fun((Key) -> any()),
-    Timeout :: timeout().
-register(Key, DefaultAction, Timeout) ->
-    case ets:lookup(?TAB, Key) of
-        [] ->
-            gen_server:call(?SERVER, {register, Key, DefaultAction, Timeout});
-        [{Key, _Action, _Ref}] ->
-            {error, already_registered}
-    end.
-
--spec unregister(Key) -> ok | {error, not_found} when
-    Key :: key().
-unregister(Key) ->
-    gen_server:call(?SERVER, {unregister, Key}).
+-spec start(key(), timeout(), respfun()) -> startlink_ret().
+start(Key, RespFun, Timeout) ->
+    emqx_ft_responder_sup:start_child(Key, RespFun, Timeout).
+
+-spec ack(key(), _Result) -> _Return.
+ack(Key, Result) ->
+    % TODO: it's possible to avoid term copy
+    gen_server:call(?REF(Key), {ack, Result}, infinity).
+
+-spec start_link(key(), timeout(), respfun()) -> startlink_ret().
+start_link(Key, RespFun, Timeout) ->
+    gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% -------------------------------------------------------------------
 
-init([]) ->
-    _ = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]),
-    {ok, #{}}.
-
-handle_call({register, Key, DefaultAction, Timeout}, _From, State) ->
-    ?SLOG(warning, #{msg => "register", key => Key, timeout => Timeout}),
-    case ets:lookup(?TAB, Key) of
-        [] ->
-            TRef = erlang:start_timer(Timeout, self(), {timeout, Key}),
-            true = ets:insert(?TAB, {Key, DefaultAction, TRef}),
-            {reply, ok, State};
-        [{_, _Action, _Ref}] ->
-            {reply, {error, already_registered}, State}
-    end;
-handle_call({unregister, Key}, _From, State) ->
-    ?SLOG(warning, #{msg => "unregister", key => Key}),
-    case ets:lookup(?TAB, Key) of
-        [] ->
-            {reply, {error, not_found}, State};
-        [{_, _Action, TRef}] ->
-            _ = erlang:cancel_timer(TRef),
-            true = ets:delete(?TAB, Key),
-            {reply, ok, State}
-    end;
+init({Key, RespFun, Timeout}) ->
+    _ = erlang:process_flag(trap_exit, true),
+    _TRef = erlang:send_after(Timeout, self(), timeout),
+    {ok, {Key, RespFun}}.
+
+handle_call({ack, Result}, _From, {Key, RespFun}) ->
+    Ret = apply(RespFun, [Key, {ack, Result}]),
+    ?tp(ft_responder_ack, #{key => Key, result => Result, return => Ret}),
+    {stop, {shutdown, Ret}, Ret, undefined};
 handle_call(Msg, _From, State) ->
     ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}),
     {reply, {error, unknown_call}, State}.
@@ -98,40 +75,17 @@ handle_cast(Msg, State) ->
     ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}),
     {noreply, State}.
 
-handle_info({timeout, TRef, {timeout, Key}}, State) ->
-    case ets:lookup(?TAB, Key) of
-        [] ->
-            {noreply, State};
-        [{_, Action, TRef}] ->
-            _ = erlang:cancel_timer(TRef),
-            true = ets:delete(?TAB, Key),
-            ok = safe_apply(Action, [Key]),
-            ?tp(ft_timeout_action_applied, #{key => Key}),
-            {noreply, State}
-    end;
+handle_info(timeout, {Key, RespFun}) ->
+    Ret = apply(RespFun, [Key, timeout]),
+    ?tp(ft_responder_timeout, #{key => Key, return => Ret}),
+    {stop, {shutdown, Ret}, undefined};
 handle_info(Msg, State) ->
     ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
     {noreply, State}.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-terminate(_Reason, _State) ->
+terminate(_Reason, undefined) ->
+    ok;
+terminate(Reason, {Key, RespFun}) ->
+    Ret = apply(RespFun, [Key, timeout]),
+    ?tp(ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
     ok.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-safe_apply(Fun, Args) ->
-    try apply(Fun, Args) of
-        _ -> ok
-    catch
-        Class:Reason:Stacktrace ->
-            ?SLOG(error, #{
-                msg => "safe_apply_failed",
-                class => Class,
-                reason => Reason,
-                stacktrace => Stacktrace
-            })
-    end.

+ 48 - 0
apps/emqx_ft/src/emqx_ft_responder_sup.erl

@@ -0,0 +1,48 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_responder_sup).
+
+-export([start_link/0]).
+-export([start_child/3]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPERVISOR, ?MODULE).
+
+%%
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
+
+start_child(Key, RespFun, Timeout) ->
+    supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]).
+
+-spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 100
+    },
+    ChildSpec = #{
+        id => responder,
+        start => {emqx_ft_responder, start_link, []},
+        restart => temporary
+    },
+    {ok, {Flags, [ChildSpec]}}.

+ 3 - 3
apps/emqx_ft/src/emqx_ft_sup.erl

@@ -53,12 +53,12 @@ init([]) ->
     },
 
     Responder = #{
-        id => emqx_ft_responder,
-        start => {emqx_ft_responder, start_link, []},
+        id => emqx_ft_responder_sup,
+        start => {emqx_ft_responder_sup, start_link, []},
         restart => permanent,
         shutdown => infinity,
         type => worker,
-        modules => [emqx_ft_responder]
+        modules => [emqx_ft_responder_sup]
     },
 
     ChildSpecs = [Responder, AssemblerSup, FileReaderSup],

+ 32 - 33
apps/emqx_ft/test/emqx_ft_responder_SUITE.erl

@@ -19,7 +19,6 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
@@ -39,58 +38,58 @@ init_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, _Config) ->
     ok.
 
-t_register_unregister(_Config) ->
+t_start_ack(_Config) ->
     Key = <<"test">>,
-    DefaultAction = fun(_) -> ok end,
-    ?assertEqual(
-        ok,
-        emqx_ft_responder:register(Key, DefaultAction, 1000)
+    DefaultAction = fun(_Key, {ack, Ref}) -> Ref end,
+    ?assertMatch(
+        {ok, _Pid},
+        emqx_ft_responder:start(Key, DefaultAction, 1000)
     ),
-    ?assertEqual(
-        {error, already_registered},
-        emqx_ft_responder:register(Key, DefaultAction, 1000)
+    ?assertMatch(
+        {error, {already_started, _Pid}},
+        emqx_ft_responder:start(Key, DefaultAction, 1000)
     ),
+    Ref = make_ref(),
     ?assertEqual(
-        ok,
-        emqx_ft_responder:unregister(Key)
+        Ref,
+        emqx_ft_responder:ack(Key, Ref)
     ),
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
+    ?assertExit(
+        {noproc, _},
+        emqx_ft_responder:ack(Key, Ref)
     ).
 
 t_timeout(_Config) ->
     Key = <<"test">>,
     Self = self(),
-    DefaultAction = fun(K) -> Self ! {timeout, K} end,
-    ok = emqx_ft_responder:register(Key, DefaultAction, 20),
+    DefaultAction = fun(K, timeout) -> Self ! {timeout, K} end,
+    {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
     receive
         {timeout, Key} ->
             ok
     after 100 ->
         ct:fail("emqx_ft_responder not called")
     end,
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
+    ?assertExit(
+        {noproc, _},
+        emqx_ft_responder:ack(Key, oops)
     ).
 
-t_action_exception(_Config) ->
-    Key = <<"test">>,
-    DefaultAction = fun(K) -> error({oops, K}) end,
-
-    ?assertWaitEvent(
-        emqx_ft_responder:register(Key, DefaultAction, 10),
-        #{?snk_kind := ft_timeout_action_applied, key := <<"test">>},
-        1000
-    ),
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
-    ).
+% t_action_exception(_Config) ->
+%     Key = <<"test">>,
+%     DefaultAction = fun(K) -> error({oops, K}) end,
+%     ?assertWaitEvent(
+%         emqx_ft_responder:start(Key, DefaultAction, 10),
+%         #{?snk_kind := ft_timeout_action_applied, key := <<"test">>},
+%         1000
+%     ),
+%     ?assertEqual(
+%         {error, not_found},
+%         emqx_ft_responder:ack(Key, oops)
+%     ).
 
 t_unknown_msgs(_Config) ->
-    Pid = whereis(emqx_ft_responder),
+    {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_, _) -> ok end, 100),
     Pid ! {unknown_msg, <<"test">>},
     ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
     ?assertEqual(