Explorar o código

Merge pull request #13464 from thalesmg/20240712-m-res-manager-shutdown-logs

chore: attempt to reduce race condition supervisor noproc shutdown error logs
Thales Macedo Garitezi hai 1 ano
pai
achega
8d535bbd24
Modificáronse 1 ficheiros con 41 adicións e 14 borrados
  1. 41 14
      apps/emqx_resource/src/emqx_resource_manager.erl

+ 41 - 14
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -59,7 +59,7 @@
 ]).
 
 % Server
--export([start_link/5]).
+-export([start_link/5, where/1]).
 
 % Behaviour
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
@@ -156,6 +156,9 @@
 %% API
 %%------------------------------------------------------------------------------
 
+where(ResId) ->
+    gproc:where(?NAME(ResId)).
+
 %% @doc Called from emqx_resource when starting a resource instance.
 %%
 %% Triggers the emqx_resource_manager_sup supervisor to actually create
@@ -277,17 +280,7 @@ remove(ResId) when is_binary(ResId) ->
 -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
 remove(ResId, ClearMetrics) when is_binary(ResId) ->
     try
-        case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
-            {error, timeout} ->
-                ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
-                    action => remove,
-                    resource_id => ResId
-                }),
-                force_kill(ResId),
-                ok;
-            Res ->
-                Res
-        end
+        do_remove(ResId, ClearMetrics)
     after
         %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
         %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
@@ -295,6 +288,31 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
         emqx_resource_manager_sup:delete_child(ResId)
     end.
 
+do_remove(ResId, ClearMetrics) ->
+    case gproc:whereis_name(?NAME(ResId)) of
+        undefined ->
+            ok;
+        Pid when is_pid(Pid) ->
+            MRef = monitor(process, Pid),
+            case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
+                {error, timeout} ->
+                    ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
+                        action => remove,
+                        resource_id => ResId
+                    }),
+                    force_kill(ResId, MRef),
+                    ok;
+                ok ->
+                    receive
+                        {'DOWN', MRef, process, Pid, _} ->
+                            ok
+                    end,
+                    ok;
+                Res ->
+                    Res
+            end
+    end.
+
 %% @doc Stops and then starts an instance that was already running
 -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
 restart(ResId, Opts) when is_binary(ResId) ->
@@ -332,7 +350,7 @@ stop(ResId, Timeout) ->
                 action => stop,
                 resource_id => ResId
             }),
-            force_kill(ResId),
+            force_kill(ResId, _MRef = undefined),
             ok;
         {error, _Reason} = Error ->
             Error
@@ -469,12 +487,21 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
 get_error(_ResId, #{error := Error}) ->
     Error.
 
-force_kill(ResId) ->
+force_kill(ResId, MRef0) ->
     case gproc:whereis_name(?NAME(ResId)) of
         undefined ->
             ok;
         Pid when is_pid(Pid) ->
+            MRef =
+                case MRef0 of
+                    undefined -> monitor(process, Pid);
+                    _ -> MRef0
+                end,
             exit(Pid, kill),
+            receive
+                {'DOWN', MRef, process, Pid, _} ->
+                    ok
+            end,
             try_clean_allocated_resources(ResId),
             ok
     end.