Explorar o código

Merge pull request #12847 from thalesmg/fix-kill-hc-procs-m-20240408

fix(resource manager): clean up any running health checks when terminating, account for ongoing channel health checks, update data and reply immediately when receiving an update
Thales Macedo Garitezi hai 1 ano
pai
achega
bd7ff8a03f
Modificáronse 1 ficheiros con 94 adicións e 57 borrados
  1. 94 57
      apps/emqx_resource/src/emqx_resource_manager.erl

+ 94 - 57
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -81,15 +81,15 @@
     hc_workers = #{
         resource => #{},
         channel => #{
-            pending => [],
-            previous_status => #{}
+            ongoing => #{},
+            pending => []
         }
     } :: #{
         resource := #{{pid(), reference()} => true},
         channel := #{
             {pid(), reference()} => channel_id(),
-            pending := [channel_id()],
-            previous_status := #{channel_id() => channel_status_map()}
+            ongoing := #{channel_id() => channel_status_map()},
+            pending := [channel_id()]
         }
     },
     %% Callers waiting on health check
@@ -446,6 +446,7 @@ init({DataIn, Opts}) ->
 terminate({shutdown, removed}, _State, _Data) ->
     ok;
 terminate(_Reason, _State, Data) ->
+    ok = terminate_health_check_workers(Data),
     _ = maybe_stop_resource(Data),
     _ = erase_cache(Data),
     ok.
@@ -555,22 +556,22 @@ handle_event(
     {keep_state_and_data, {reply, From, {ok, Channels}}};
 handle_event(
     info,
-    {'DOWN', Ref, process, Pid, Res},
+    {'EXIT', Pid, Res},
     State0,
     Data0 = #data{hc_workers = #{resource := RHCWorkers}}
 ) when
-    is_map_key({Pid, Ref}, RHCWorkers)
+    is_map_key(Pid, RHCWorkers)
 ->
-    handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
+    handle_resource_health_check_worker_down(State0, Data0, Pid, Res);
 handle_event(
     info,
-    {'DOWN', Ref, process, Pid, Res},
+    {'EXIT', Pid, Res},
     _State,
     Data0 = #data{hc_workers = #{channel := CHCWorkers}}
 ) when
-    is_map_key({Pid, Ref}, CHCWorkers)
+    is_map_key(Pid, CHCWorkers)
 ->
-    handle_channel_health_check_worker_down(Data0, {Pid, Ref}, Res);
+    handle_channel_health_check_worker_down(Data0, Pid, Res);
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -634,6 +635,7 @@ health_check_actions(Data) ->
 handle_remove_event(From, ClearMetrics, Data) ->
     %% stop the buffer workers first, brutal_kill, so it should be fast
     ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
+    ok = terminate_health_check_workers(Data),
     %% now stop the resource, this can be slow
     _ = stop_resource(Data),
     case ClearMetrics of
@@ -793,6 +795,35 @@ safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
 safe_call_remove_channel(ResId, Mod, State, ChannelID) ->
     emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID).
 
+%% For cases where we need to terminate and there are running health checks.
+terminate_health_check_workers(Data) ->
+    #data{
+        hc_workers = #{resource := RHCWorkers, channel := CHCWorkers},
+        hc_pending_callers = #{resource := RPending, channel := CPending}
+    } = Data,
+    maps:foreach(
+        fun(Pid, _) ->
+            exit(Pid, kill)
+        end,
+        RHCWorkers
+    ),
+    maps:foreach(
+        fun
+            (Pid, _) when is_pid(Pid) ->
+                exit(Pid, kill);
+            (_, _) ->
+                ok
+        end,
+        CHCWorkers
+    ),
+    Pending = lists:flatten([RPending, maps:values(CPending)]),
+    lists:foreach(
+        fun(From) ->
+            gen_statem:reply(From, {error, resource_shutting_down})
+        end,
+        Pending
+    ).
+
 make_test_id() ->
     RandId = iolist_to_binary(emqx_utils:gen_id(16)),
     <<?TEST_ID_PREFIX, RandId/binary>>.
@@ -916,14 +947,14 @@ start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
     keep_state_and_data;
 start_resource_health_check(#data{} = Data0) ->
     #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
-    WorkerRef = {_Pid, _Ref} = spawn_resource_health_check_worker(Data0),
-    HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
+    WorkerPid = spawn_resource_health_check_worker(Data0),
+    HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerPid => true}},
     Data = Data0#data{hc_workers = HCWorkers},
     {keep_state, Data}.
 
--spec spawn_resource_health_check_worker(data()) -> {pid(), reference()}.
+-spec spawn_resource_health_check_worker(data()) -> pid().
 spawn_resource_health_check_worker(#data{} = Data) ->
-    spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
+    spawn_link(?MODULE, worker_resource_health_check, [Data]).
 
 %% separated so it can be spec'ed and placate dialyzer tantrums...
 -spec worker_resource_health_check(data()) -> no_return().
@@ -1008,12 +1039,12 @@ handle_manual_channel_health_check(
     #data{
         added_channels = Channels,
         hc_pending_callers = #{channel := CPending0} = Pending0,
-        hc_workers = #{channel := #{previous_status := PreviousStatus}}
+        hc_workers = #{channel := #{ongoing := Ongoing}}
     } = Data0,
     ChannelId
 ) when
     is_map_key(ChannelId, Channels),
-    is_map_key(ChannelId, PreviousStatus)
+    is_map_key(ChannelId, Ongoing)
 ->
     %% Ongoing health check.
     CPending = maps:update_with(
@@ -1158,65 +1189,66 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
 %% `?status_connected'.
 -spec trigger_health_check_for_added_channels(data()) -> data().
 trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
-    #{channel := CHCWorkers0} = HCWorkers0,
-    PreviousStatus = maps:from_list([
-        {ChannelId, OldStatus}
-     || {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels),
-        channel_status_is_channel_added(OldStatus)
-    ]),
-    ChannelsToCheck = maps:keys(PreviousStatus),
+    #{
+        channel := CHCWorkers0 =
+            #{
+                pending := CPending0,
+                ongoing := Ongoing0
+            }
+    } = HCWorkers0,
+    NewOngoing = maps:filter(
+        fun(ChannelId, OldStatus) ->
+            not is_map_key(ChannelId, Ongoing0) and
+                channel_status_is_channel_added(OldStatus)
+        end,
+        Data0#data.added_channels
+    ),
+    ChannelsToCheck = maps:keys(NewOngoing),
     case ChannelsToCheck of
         [] ->
             %% Nothing to do.
             Data0;
         [ChannelId | Rest] ->
             %% Shooting one check at a time.  We could increase concurrency in the future.
-            CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus},
+            CHCWorkers = CHCWorkers0#{
+                pending := CPending0 ++ Rest,
+                ongoing := maps:merge(Ongoing0, NewOngoing)
+            },
             Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
             start_channel_health_check(Data1, ChannelId)
     end.
 
--spec continue_channel_health_check_connected(data()) -> data().
-continue_channel_health_check_connected(Data0) ->
+-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data().
+continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
     #data{hc_workers = HCWorkers0} = Data0,
-    #{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0,
-    CHCWorkers = CHCWorkers0#{previous_status := #{}},
+    #{channel := CHCWorkers0} = HCWorkers0,
+    CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
     Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
     %% Remove the added channels with a a status different from connected or connecting
-    CheckedChannels = [
-        {ChannelId, NewStatus}
-     || {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels),
-        is_map_key(ChannelId, PreviousStatus)
-    ],
-    ChannelsToRemove = [
-        ChannelId
-     || {ChannelId, NewStatus} <- CheckedChannels,
-        not channel_status_is_channel_added(NewStatus)
-    ],
+    NewStatus = maps:get(ChannelId, Data0#data.added_channels),
+    ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
     Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
     %% Raise/clear alarms
-    lists:foreach(
-        fun
-            ({ID, #{status := ?status_connected}}) ->
-                _ = maybe_clear_alarm(ID);
-            ({ID, NewStatus}) ->
-                OldStatus = maps:get(ID, PreviousStatus),
-                _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
-        end,
-        CheckedChannels
-    ),
+    case NewStatus of
+        #{status := ?status_connected} ->
+            _ = maybe_clear_alarm(ChannelId),
+            ok;
+        _ ->
+            _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus),
+            ok
+    end,
     Data.
 
 -spec start_channel_health_check(data(), channel_id()) -> data().
 start_channel_health_check(#data{} = Data0, ChannelId) ->
     #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
-    WorkerRef = {_Pid, _Ref} = spawn_channel_health_check_worker(Data0, ChannelId),
-    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerRef => ChannelId}},
+    WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId),
+    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}},
     Data0#data{hc_workers = HCWorkers}.
 
--spec spawn_channel_health_check_worker(data(), channel_id()) -> {pid(), reference()}.
+-spec spawn_channel_health_check_worker(data(), channel_id()) -> pid().
 spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
-    spawn_monitor(?MODULE, worker_channel_health_check, [Data, ChannelId]).
+    spawn_link(?MODULE, worker_channel_health_check, [Data, ChannelId]).
 
 %% separated so it can be spec'ed and placate dialyzer tantrums...
 -spec worker_channel_health_check(data(), channel_id()) -> no_return().
@@ -1240,19 +1272,24 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
             %% `emqx_resource:call_channel_health_check' catches all exceptions.
             AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
     end,
+    #{ongoing := Ongoing0} = CHCWorkers1,
+    {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
+    CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
+    CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2),
     Data1 = Data0#data{added_channels = AddedChannels},
     {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
     case CHCWorkers1 of
         #{pending := [NextChannelId | Rest]} ->
-            CHCWorkers = CHCWorkers1#{pending := Rest},
+            CHCWorkers = CHCWorkers3#{pending := Rest},
             HCWorkers = HCWorkers0#{channel := CHCWorkers},
             Data3 = Data2#data{hc_workers = HCWorkers},
-            Data = start_channel_health_check(Data3, NextChannelId),
+            Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
+            Data = start_channel_health_check(Data4, NextChannelId),
             {keep_state, update_state(Data, Data0), Replies};
         #{pending := []} ->
-            HCWorkers = HCWorkers0#{channel := CHCWorkers1},
+            HCWorkers = HCWorkers0#{channel := CHCWorkers3},
             Data3 = Data2#data{hc_workers = HCWorkers},
-            Data = continue_channel_health_check_connected(Data3),
+            Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
             {keep_state, update_state(Data, Data0), Replies}
     end.
 
@@ -1308,7 +1345,7 @@ remove_runtime_data(#data{} = Data0) ->
     Data0#data{
         hc_workers = #{
             resource => #{},
-            channel => #{pending => [], previous_status => #{}}
+            channel => #{pending => [], ongoing => #{}}
         },
         hc_pending_callers = #{
             resource => [],