Parcourir la source

Merge pull request #14172 from thalesmg/20241106-r583-zombie-connector-probe

fix(resource manager): avoid zombie resource after HTTP API request is forcefully killed
Thales Macedo Garitezi il y a 1 an
Parent
commit
3abd5c54fd

+ 87 - 18
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -16,7 +16,19 @@
 
 -module(emqx_resource_cache_cleaner).
 
--export([start_link/0]).
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+
+%% API
+-export([
+    start_link/0,
+
+    add_cache/2,
+    add_dry_run/2
+]).
+
+%% `gen_server' API
 -export([
     init/1,
     handle_call/3,
@@ -24,51 +36,108 @@
     handle_info/2,
     terminate/2
 ]).
--export([add/2]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
 
 -define(SERVER, ?MODULE).
 
+%% calls/casts/infos
+-record(add_cache, {id :: emqx_resource:resource_id(), pid :: pid()}).
+-record(add_dry_run, {id :: emqx_resource:resource_id(), pid :: pid()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
-add(ID, Pid) ->
-    gen_server:call(?SERVER, {add, ID, Pid}, infinity).
+add_cache(ID, Pid) ->
+    gen_server:call(?SERVER, #add_cache{id = ID, pid = Pid}, infinity).
+
+add_dry_run(ID, Pid) ->
+    gen_server:cast(?SERVER, #add_dry_run{id = ID, pid = Pid}).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
 
 init(_) ->
     process_flag(trap_exit, true),
-    {ok, #{pmon => emqx_pmon:new()}}.
+    State = #{
+        cache_pmon => emqx_pmon:new(),
+        dry_run_pmon => emqx_pmon:new()
+    },
+    {ok, State}.
 
-handle_call({add, ID, Pid}, _From, #{pmon := Pmon} = State) ->
+handle_call(#add_cache{id = ID, pid = Pid}, _From, #{cache_pmon := Pmon} = State) ->
     NewPmon = emqx_pmon:monitor(Pid, ID, Pmon),
-    {reply, ok, State#{pmon => NewPmon}};
+    {reply, ok, State#{cache_pmon := NewPmon}};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
+handle_cast(#add_dry_run{id = ID, pid = Pid}, #{dry_run_pmon := Pmon0} = State0) ->
+    Pmon = emqx_pmon:monitor(Pid, ID, Pmon0),
+    State = State0#{dry_run_pmon := Pmon},
+    {noreply, State};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, #{pmon := Pmon} = State) ->
-    NewPmon =
-        case emqx_pmon:find(Pid, Pmon) of
-            {ok, ID} ->
-                maybe_erase_cache(Pid, ID),
-                emqx_pmon:erase(Pid, Pmon);
-            error ->
-                Pmon
-        end,
-    {noreply, State#{pmon => NewPmon}};
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State0) ->
+    State = handle_down(Pid, State0),
+    {noreply, State};
 handle_info(_Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, _State) ->
     ok.
 
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+handle_down(Pid, State0) ->
+    #{
+        cache_pmon := CachePmon,
+        dry_run_pmon := DryrunPmon
+    } = State0,
+    case emqx_pmon:find(Pid, CachePmon) of
+        {ok, ID} ->
+            handle_down_cache(ID, Pid, State0);
+        error ->
+            case emqx_pmon:find(Pid, DryrunPmon) of
+                {ok, ID} ->
+                    handle_down_dry_run(ID, Pid, State0);
+                error ->
+                    State0
+            end
+    end.
+
+handle_down_cache(ID, Pid, State0) ->
+    #{cache_pmon := Pmon0} = State0,
+    maybe_erase_cache(Pid, ID),
+    Pmon = emqx_pmon:erase(Pid, Pmon0),
+    State0#{cache_pmon := Pmon}.
+
+handle_down_dry_run(ID, Pid, State0) ->
+    #{dry_run_pmon := Pmon0} = State0,
+    %% No need to wait here: since it's a dry run resource, it won't be recreated,
+    %% assuming the ID is random enough.
+    spawn(fun() ->
+        emqx_resource_manager_sup:delete_child(ID),
+        ?tp("resource_cache_cleaner_deleted_child", #{id => ID})
+    end),
+    Pmon = emqx_pmon:erase(Pid, Pmon0),
+    State0#{dry_run_pmon := Pmon}.
+
 maybe_erase_cache(DownManager, ID) ->
     case emqx_resource_cache:read_manager_pid(ID) =:= DownManager of
         true ->
             emqx_resource_cache:erase(ID);
         false ->
             %% already erased, or already replaced by another manager due to quick
-            %% retart by supervisor
+            %% restart by supervisor
             ok
     end.

+ 4 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -249,6 +249,9 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
             true -> maps:get(resource_opts, Config, #{});
             false -> #{}
         end,
+    %% Ensure that the dry run resource is terminated, even if this process is forcefully
+    %% killed (e.g.: cowboy / HTTP API request times out).
+    emqx_resource_cache_cleaner:add_dry_run(ResId, self()),
     ok = emqx_resource_manager_sup:ensure_child(
         ResId, <<"dry_run">>, ResourceType, Config, Opts
     ),
@@ -547,11 +550,11 @@ start_link(ResId, Group, ResourceType, Config, Opts) ->
 init({DataIn, Opts}) ->
     process_flag(trap_exit, true),
     Data = DataIn#data{pid = self()},
+    emqx_resource_cache_cleaner:add_cache(Data#data.id, self()),
     case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
         true ->
             %% init the cache so that lookup/1 will always return something
             UpdatedData = update_state(Data#data{status = ?status_connecting}),
-            emqx_resource_cache_cleaner:add(Data#data.id, self()),
             {ok, ?state_connecting, UpdatedData, {next_event, internal, start_resource}};
         false ->
             %% init the cache so that lookup/1 will always return something

+ 9 - 3
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -74,15 +74,16 @@ set_callback_mode(Mode) ->
 on_start(_InstId, #{create_error := true}) ->
     ?tp(connector_demo_start_error, #{}),
     error("some error");
-on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
+on_start(InstId, #{create_error := {delay, Delay, Agent}} = State0) ->
     ?tp(connector_demo_start_delay, #{}),
+    State = maps:remove(create_error, State0),
     case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
         not_called ->
             emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
             timer:sleep(Delay),
-            on_start(InstId, maps:remove(create_error, Opts));
+            on_start(InstId, State);
         called ->
-            on_start(InstId, maps:remove(create_error, Opts))
+            on_start(InstId, State)
     end;
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
@@ -99,6 +100,11 @@ on_stop(_InstId, undefined) ->
     ok;
 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
+on_stop(InstId, #{stop_error := {ask, HowToStop}} = State) ->
+    case HowToStop() of
+        continue ->
+            on_stop(InstId, maps:remove(stop_error, State))
+    end;
 on_stop(InstId, #{pid := Pid}) ->
     persistent_term:erase(?PT_CHAN_KEY(InstId)),
     stop_counter_process(Pid).

+ 82 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -3360,6 +3360,83 @@ t_resource_and_channel_health_check_race(_Config) ->
     ),
     ok.
 
+%% Simulates the race condition where a dry run request takes too long, and the HTTP API
+%% request process is then forcefully killed before it has the chance to properly cleanup
+%% and remove the dry run/probe resource.
+t_dryrun_timeout_then_force_kill_during_stop(_Config) ->
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            ?force_ordering(
+                #{?snk_kind := connector_demo_on_stop_will_delay},
+                #{?snk_kind := will_kill_request}
+            ),
+
+            %% Simulates a cowboy request process.
+            {ok, StartAgent} = emqx_utils_agent:start_link(not_called),
+            {ok, StopAgent} = emqx_utils_agent:start_link({delay, 1_000}),
+            HowToStop = fun() ->
+                %% Delay only the first time, so test cleanup is faster.
+                Action = emqx_utils_agent:get_and_update(StopAgent, fun
+                    (continue) ->
+                        {continue, continue};
+                    ({delay, _} = Delay) ->
+                        {Delay, continue}
+                end),
+                case Action of
+                    {delay, Delay} ->
+                        ?tp(connector_demo_on_stop_will_delay, #{}),
+                        timer:sleep(Delay),
+                        continue;
+                    continue ->
+                        continue
+                end
+            end,
+            {Pid, MRef} = spawn_monitor(fun() ->
+                Res = dryrun(
+                    ?ID,
+                    ?TEST_RESOURCE,
+                    #{
+                        name => test_resource,
+                        create_error => {delay, 1_000, StartAgent},
+                        stop_error => {ask, HowToStop},
+                        resource_opts => #{
+                            health_check_interval => 100,
+                            start_timeout => 100
+                        }
+                    }
+                ),
+                exit(Res)
+            end),
+            on_exit(fun() -> exit(Pid, kill) end),
+
+            %% Simulates cowboy forcefully killing the request after it takes too long and the caller
+            %% has already closed the connection.
+            spawn_link(fun() ->
+                ?tp(will_kill_request, #{}),
+                exit(Pid, kill)
+            end),
+
+            receive
+                {'DOWN', MRef, process, Pid, Reason} ->
+                    ct:pal("request ~p died: ~p", [Pid, Reason]),
+                    ?assertEqual(killed, Reason),
+                    ok
+            end,
+
+            ?block_until(#{?snk_kind := "resource_cache_cleaner_deleted_child"}),
+
+            %% No children should be lingering
+            ?assertEqual([], supervisor:which_children(emqx_resource_manager_sup)),
+            %% Cache should be clean too
+            ?assertEqual([], emqx_resource:list_instances()),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3618,6 +3695,11 @@ create(Id, Group, Type, Config, Opts) ->
     on_exit(fun() -> emqx_resource:remove_local(Id) end),
     Res.
 
+dryrun(Id, Type, Config) ->
+    TestPid = self(),
+    OnReady = fun(ResId) -> TestPid ! {resource_ready, ResId} end,
+    emqx_resource:create_dry_run_local(Id, Type, Config, OnReady).
+
 log_consistency_prop() ->
     {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.
 log_consistency_prop(Trace) ->

+ 1 - 0
changes/ce/fix-14172.en.md

@@ -0,0 +1 @@
+Fixed a potential race condition where testing a connector using the HTTP API could leave lingering resources if the HTTP request timed out.