ソースを参照

fix(resource manager): force kill process if stuck when stopping/removing

Fixes https://emqx.atlassian.net/browse/EMQX-12357
Thales Macedo Garitezi 1 年間 前
コミット
3013189cd7

+ 1 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -85,6 +85,7 @@
     get_allocated_resources_list/1,
     forget_allocated_resources/1,
     deallocate_resource/2,
+    clean_allocated_resources/2,
     %% Get channel config from resource
     call_get_channel_config/3,
     % Call the format query result function

+ 47 - 3
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -63,6 +63,10 @@
 %% Internal exports.
 -export([worker_resource_health_check/1, worker_channel_health_check/2]).
 
+-ifdef(TEST).
+-export([stop/2]).
+-endif.
+
 % State record
 -record(data, {
     id,
@@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) ->
 -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
 remove(ResId, ClearMetrics) when is_binary(ResId) ->
     try
-        safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
+        case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
+            {error, timeout} ->
+                ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
+                    action => remove,
+                    resource_id => ResId
+                }),
+                force_kill(ResId),
+                ok;
+            Res ->
+                Res
+        end
     after
         %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
         %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
@@ -287,9 +301,20 @@ start(ResId, Opts) ->
 %% @doc Stop the resource
 -spec stop(resource_id()) -> ok | {error, Reason :: term()}.
 stop(ResId) ->
-    case safe_call(ResId, stop, ?T_OPERATION) of
+    stop(ResId, ?T_OPERATION).
+
+-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}.
+stop(ResId, Timeout) ->
+    case safe_call(ResId, stop, Timeout) of
         ok ->
             ok;
+        {error, timeout} ->
+            ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
+                action => stop,
+                resource_id => ResId
+            }),
+            force_kill(ResId),
+            ok;
         {error, _Reason} = Error ->
             Error
     end.
@@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
 get_error(_ResId, #{error := Error}) ->
     Error.
 
+force_kill(ResId) ->
+    case gproc:whereis_name(?NAME(ResId)) of
+        undefined ->
+            ok;
+        Pid when is_pid(Pid) ->
+            exit(Pid, kill),
+            try_clean_allocated_resources(ResId),
+            ok
+    end.
+
+try_clean_allocated_resources(ResId) ->
+    case try_read_cache(ResId) of
+        #data{mod = Mod} ->
+            catch emqx_resource:clean_allocated_resources(ResId, Mod),
+            ok;
+        _ ->
+            ok
+    end.
+
 %% Server start/stop callbacks
 
 %% @doc Function called from the supervisor to actually start the server
@@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
     Data.
 
 stop_resource(#data{state = ResState, id = ResId} = Data) ->
-    %% We don't care the return value of the Mod:on_stop/2.
+    %% We don't care about the return value of `Mod:on_stop/2'.
     %% The callback mod should make sure the resource is stopped after on_stop/2
     %% is returned.
     HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),

+ 13 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -71,6 +71,16 @@ set_callback_mode(Mode) ->
 on_start(_InstId, #{create_error := true}) ->
     ?tp(connector_demo_start_error, #{}),
     error("some error");
+on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
+    ?tp(connector_demo_start_delay, #{}),
+    case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
+        not_called ->
+            emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
+            timer:sleep(Delay),
+            on_start(InstId, maps:remove(create_error, Opts));
+        called ->
+            on_start(InstId, maps:remove(create_error, Opts))
+    end;
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
     StopError = maps:get(stop_error, Opts, false),
@@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) ->
         pid => spawn_counter_process(Name, Register)
     }}.
 
+on_stop(_InstId, undefined) ->
+    ?tp(connector_demo_free_resources_without_state, #{}),
+    ok;
 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
 on_stop(InstId, #{pid := Pid}) ->

+ 37 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) ->
     ),
     ok.
 
+%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync
+%% call such as `on_start', and that the claimed resources, if any, are freed.
+t_force_stop(_Config) ->
+    ?check_trace(
+        begin
+            {ok, Agent} = emqx_utils_agent:start_link(not_called),
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{
+                        name => test_resource,
+                        create_error => {delay, 30_000, Agent}
+                    },
+                    #{
+                        health_check_interval => 100,
+                        start_timeout => 100
+                    }
+                ),
+            ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)),
+            ok
+        end,
+        [
+            log_consistency_prop(),
+            fun(Trace) ->
+                ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)),
+                ?assertMatch(
+                    [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace)
+                ),
+                ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 66 - 0
apps/emqx_utils/test/emqx_utils_agent.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html).
+
+-module(emqx_utils_agent).
+
+%% API
+-export([start_link/1, get/1, get_and_update/2]).
+
+%% `gen_server' API
+-export([init/1, handle_call/3]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-type state() :: term().
+
+-type get_and_update_fn() :: fun((state()) -> {term(), state()}).
+
+-record(get_and_update, {fn :: get_and_update_fn()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link(state()) -> gen_server:start_ret().
+start_link(InitState) ->
+    gen_server:start_link(?MODULE, InitState, []).
+
+-spec get(gen_server:server_ref()) -> term().
+get(ServerRef) ->
+    Fn = fun(St) -> {St, St} end,
+    gen_server:call(ServerRef, #get_and_update{fn = Fn}).
+
+-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term().
+get_and_update(ServerRef, Fn) ->
+    gen_server:call(ServerRef, #get_and_update{fn = Fn}).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(InitState) ->
+    {ok, InitState}.
+
+handle_call(#get_and_update{fn = Fn}, _From, State0) ->
+    {Reply, State} = Fn(State0),
+    {reply, Reply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------