Просмотр исходного кода

fix(resource manager): break "remove channels when disconnected" invariant from state machine

Fixes https://emqx.atlassian.net/browse/EMQX-13599

Fixes https://github.com/emqx/emqx/issues/13979
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
331346fc5b

+ 4 - 0
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -280,10 +280,14 @@ lookup(ConfRootName, Type, Name) ->
             ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
             {DisplayBridgeV2Status, ErrorMsg} =
                 case {ChannelStatus, ConnectorStatus} of
+                    {_, ?status_disconnected} ->
+                        {?status_disconnected, <<"Resource not operational">>};
                     {#{status := ?status_connected}, _} ->
                         {?status_connected, <<"">>};
                     {#{error := resource_not_operational}, ?status_connecting} ->
                         {?status_connecting, <<"Not installed">>};
+                    {#{error := not_added_yet}, _} ->
+                        {?status_connecting, <<"Not installed">>};
                     {#{status := Status, error := undefined}, _} ->
                         {Status, <<"Unknown reason">>};
                     {#{status := Status, error := Error}, _} ->

+ 2 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -459,7 +459,8 @@ t_repeated_topics(Config) ->
                 emqx_bridge_v2_testlib:create_source_api([{source_name, Name2} | Config]),
             ?assertEqual(
                 match,
-                re:run(Error, <<"Topics .* already exist in other sources">>, [{capture, none}])
+                re:run(Error, <<"Topics .* already exist in other sources">>, [{capture, none}]),
+                #{error => Error}
             ),
             ok
         end,

+ 8 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1321,7 +1321,7 @@ extract_connector_id(Id) when is_binary(Id) ->
 %% There is no need to query the conncector if the channel is not
 %% installed as the query will fail anyway.
 pre_query_channel_check(Id, {Id, _} = _Request, ChanSt, IsSimpleQuery) ->
-    case emqx_resource_manager:channel_status_is_channel_added(ChanSt) of
+    case is_channel_apt_for_queries(ChanSt) of
         true ->
             ok;
         false ->
@@ -2365,6 +2365,13 @@ buffer_worker(_Tid) ->
 is_simple_query(#{simple_query := Bool}) ->
     Bool.
 
+is_channel_apt_for_queries(?status_connected) ->
+    true;
+is_channel_apt_for_queries(?status_connecting) ->
+    true;
+is_channel_apt_for_queries(_) ->
+    false.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 adjust_batch_time_test_() ->

+ 224 - 110
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -50,12 +50,12 @@
     is_exist/1,
     get_metrics/1,
     reset_metrics/1,
-    channel_status_is_channel_added/1,
     get_query_mode_and_last_error/2
 ]).
 
 -export([
-    set_resource_status_connecting/1
+    set_resource_status_connecting/1,
+    external_error/1
 ]).
 
 % Server
@@ -71,6 +71,13 @@
 -export([stop/2]).
 -endif.
 
+%%------------------------------------------------------------------------------
+%% Type definitions
+%%------------------------------------------------------------------------------
+
+-define(not_added_yet, {?MODULE, not_added_yet}).
+-define(add_channel_failed(REASON), {?MODULE, add_channel_failed, REASON}).
+
 % State record
 -record(data, {
     id,
@@ -101,8 +108,8 @@
     },
     %% Callers waiting on health check
     hc_pending_callers = #{resource => [], channel => #{}} :: #{
-        resource := [gen_server:from()],
-        channel := #{channel_id() => [gen_server:from()]}
+        resource := [gen_statem:from()],
+        channel := #{channel_id() => [gen_statem:from()]}
     },
     extra
 }).
@@ -146,11 +153,15 @@
 %% calls/casts/generic timeouts
 -record(add_channel, {channel_id :: channel_id(), config :: map()}).
 -record(start_channel_health_check, {channel_id :: channel_id()}).
+-record(retry_add_channel, {channel_id :: channel_id()}).
 
 -type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}.
 -type start_channel_health_check_action() :: generic_timeout(
     #start_channel_health_check{}, #start_channel_health_check{}
 ).
+-type retry_add_channel_action() :: generic_timeout(
+    #retry_add_channel{}, #retry_add_channel{}
+).
 
 %%------------------------------------------------------------------------------
 %% API
@@ -623,9 +634,10 @@ handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
 %%--------------------------
 %% State: CONNECTING
 %%--------------------------
-handle_event(enter, _OldState, ?state_connecting = State, Data) ->
+handle_event(enter, _OldState, ?state_connecting = State, Data0) ->
+    Data = abort_all_channel_health_checks(Data0),
     ok = log_status_consistency(State, Data),
-    {keep_state_and_data, [{state_timeout, 0, health_check}]};
+    {keep_state, Data, [{state_timeout, 0, health_check}]};
 handle_event(internal, start_resource, ?state_connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, ?state_connecting, Data) ->
@@ -643,7 +655,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ok = log_status_consistency(State, Data),
     _ = emqx_alarm:safe_deactivate(Data#data.id),
     ?tp(resource_connected_enter, #{}),
-    {keep_state_and_data, resource_health_check_actions(Data)};
+    {keep_state, Data, resource_health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
     start_resource_health_check(Data);
 handle_event(
@@ -664,13 +676,17 @@ handle_event(
     Data
 ) ->
     handle_start_channel_health_check(Data, ChannelId);
+handle_event(
+    {timeout, #retry_add_channel{channel_id = ChannelId}}, _, ?state_connected = _State, Data
+) ->
+    handle_retry_add_channel(Data, ChannelId);
 %%--------------------------
 %% State: DISCONNECTED
 %%--------------------------
 handle_event(enter, _OldState, ?state_disconnected = State, Data0) ->
     ok = log_status_consistency(State, Data0),
     ?tp(resource_disconnected_enter, #{}),
-    Data = handle_abort_all_channel_health_checks(Data0),
+    Data = abort_all_channel_health_checks(Data0),
     {keep_state, Data, retry_actions(Data)};
 handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
     ?tp(resource_auto_reconnect, #{}),
@@ -679,9 +695,10 @@ handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
 %%--------------------------
-handle_event(enter, _OldState, ?state_stopped = State, Data) ->
+handle_event(enter, _OldState, ?state_stopped = State, Data0) ->
+    Data = abort_all_channel_health_checks(Data0),
     ok = log_status_consistency(State, Data),
-    {keep_state_and_data, []};
+    {keep_state, Data};
 %%--------------------------
 %% The following events can be handled in any other state
 %%--------------------------
@@ -716,6 +733,9 @@ handle_event(
     is_map_key(Pid, CHCWorkers)
 ->
     handle_channel_health_check_worker_down(Data0, Pid, Res);
+handle_event({timeout, #retry_add_channel{channel_id = _}}, _, _State, _Data) ->
+    %% We only add channels to the resource state in the connected state.
+    {keep_state_and_data, [postpone]};
 handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) ->
     %% Stale health check action; currently, we only probe channel health when connected.
     keep_state_and_data;
@@ -814,11 +834,12 @@ start_resource(Data, From) ->
             ),
             _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error),
             %% Add channels and raise alarms
-            NewData1 = channels_health_check(?status_disconnected, add_channels(Data)),
+            {Actions0, NewData1} = channels_health_check(?status_disconnected, add_channels(Data)),
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
             NewData2 = NewData1#data{status = ?status_disconnected, error = Err},
-            Actions = maybe_reply(retry_actions(NewData2), From, Err),
+            Actions1 = maybe_reply(retry_actions(NewData2), From, Err),
+            Actions = Actions1 ++ Actions0,
             {next_state, ?state_disconnected, update_state(NewData2), Actions}
     end.
 
@@ -848,9 +869,12 @@ maybe_update_callback_mode(Data = #data{mod = ResourceType, state = ResourceStat
             Data#data{callback_mode = CallMode}
     end.
 
-add_channels_in_list([], Data) ->
-    Data;
-add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
+add_channels_in_list(ChannelsWithConfigs, Data) ->
+    add_channels_in_list(ChannelsWithConfigs, Data, _Actions = []).
+
+add_channels_in_list([], Data, Actions) ->
+    {Actions, Data};
+add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data, Actions) ->
     #data{
         id = ResId,
         mod = Mod,
@@ -872,6 +896,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
                 channel_status_new_waiting_for_health_check(ChannelConfig),
                 AddedChannelsMap
             ),
+            NewActions = Actions,
             NewData = Data#data{
                 state = NewState,
                 added_channels = NewAddedChannelsMap
@@ -890,16 +915,17 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
             ),
             NewAddedChannelsMap = maps:put(
                 ChannelID,
-                channel_status(Error, ChannelConfig),
+                channel_status(?add_channel_failed(Reason), ChannelConfig),
                 AddedChannelsMap
             ),
+            NewActions = [retry_add_channel_action(ChannelID, ChannelConfig, Data) | Actions],
             NewData = Data#data{
                 added_channels = NewAddedChannelsMap
             },
             %% Raise an alarm since the channel could not be added
             _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error)
     end,
-    add_channels_in_list(Rest, NewData).
+    add_channels_in_list(Rest, NewData, NewActions).
 
 maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_stopped ->
     stop_resource(Data);
@@ -928,11 +954,11 @@ stop_resource(#data{id = ResId} = Data) ->
 
 remove_channels(Data) ->
     Channels = maps:keys(Data#data.added_channels),
-    remove_channels_in_list(Channels, Data, false).
+    remove_channels_in_list(Channels, Data).
 
-remove_channels_in_list([], Data, _KeepInChannelMap) ->
+remove_channels_in_list([], Data) ->
     Data;
-remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
+remove_channels_in_list([ChannelID | Rest], Data) ->
     #data{
         id = ResId,
         added_channels = AddedChannelsMap,
@@ -942,14 +968,8 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
         type = Type
     } = Data,
     IsDryRun = emqx_resource:is_dry_run(ResId),
-    NewAddedChannelsMap =
-        case KeepInChannelMap of
-            true ->
-                AddedChannelsMap;
-            false ->
-                _ = maybe_clear_alarm(IsDryRun, ChannelID),
-                maps:remove(ChannelID, AddedChannelsMap)
-        end,
+    _ = maybe_clear_alarm(IsDryRun, ChannelID),
+    NewAddedChannelsMap = maps:remove(ChannelID, AddedChannelsMap),
     case safe_call_remove_channel(ResId, Mod, State, ChannelID) of
         {ok, NewState} ->
             NewData = Data#data{
@@ -974,7 +994,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
                 added_channels = NewAddedChannelsMap
             }
     end,
-    remove_channels_in_list(Rest, NewData, KeepInChannelMap).
+    remove_channels_in_list(Rest, NewData).
 
 safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
     {ok, State};
@@ -1042,7 +1062,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
     NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
     {keep_state, update_state(NewData), [{reply, From, ok}]}.
 
-handle_remove_channel(From, ChannelId, Data) ->
+handle_remove_channel(From, ChannelId, Data0) ->
+    Data = abort_health_checks_for_channel(Data0, ChannelId),
     Channels = Data#data.added_channels,
     IsDryRun = emqx_resource:is_dry_run(Data#data.id),
     _ = maybe_clear_alarm(IsDryRun, ChannelId),
@@ -1197,9 +1218,9 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
     case NewStatus of
         ?status_connected ->
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            Data2 = channels_health_check(?status_connected, Data1),
+            {Actions0, Data2} = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2),
-            Actions = Replies ++ resource_health_check_actions(Data),
+            Actions = Replies ++ Actions0 ++ resource_health_check_actions(Data),
             {keep_state, Data, Actions};
         _ ->
             #data{id = ResId, group = Group, type = Type} = Data0,
@@ -1218,8 +1239,8 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            Data = channels_health_check(NewStatus, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(NewStatus, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, NewStatus, Data, Actions}
     end.
 
@@ -1228,16 +1249,16 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
     {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
-            Data = channels_health_check(?status_connected, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(?status_connected, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, ?state_connected, Data, Actions};
         ?status_connecting ->
-            Data = channels_health_check(?status_connecting, Data1),
-            Actions = Replies ++ resource_health_check_actions(Data),
+            {Actions0, Data} = channels_health_check(?status_connecting, Data1),
+            Actions = Replies ++ Actions0 ++ resource_health_check_actions(Data),
             {next_state, ?status_connecting, Data, Actions};
         ?status_disconnected ->
-            Data = channels_health_check(?status_disconnected, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(?status_disconnected, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, ?state_disconnected, Data, Actions}
     end.
 
@@ -1277,7 +1298,9 @@ handle_manual_channel_health_check(
     is_map_key(ChannelId, Channels)
 ->
     %% No ongoing health check: reply with current status.
-    {keep_state_and_data, [{reply, From, without_channel_config(maps:get(ChannelId, Channels))}]};
+    {keep_state_and_data, [
+        {reply, From, to_external_channel_status(maps:get(ChannelId, Channels))}
+    ]};
 handle_manual_channel_health_check(
     From,
     _Data,
@@ -1287,22 +1310,21 @@ handle_manual_channel_health_check(
         {reply, From, channel_error_status(channel_not_found)}
     ]}.
 
--spec channels_health_check(resource_status(), data()) -> data().
+-spec channels_health_check(resource_status(), data()) -> {[gen_statem:action()], data()}.
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
-    %% All channels with a status different from connected or connecting are
-    %% not added
     ChannelsNotAdded = [
         ChannelId
      || {ChannelId, Status} <- Channels,
         not channel_status_is_channel_added(Status)
     ],
-    %% Attempt to add channels that are not added
+    %% Attempt to add channels to resource state that are not added yet
     ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded),
-    Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
-    %% Now that we have done the adding, we can get the status of all channels
+    {Actions, Data1} = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
+    %% Now that we have done the adding, we can get the status of all channels (execept
+    %% unhealthy ones)
     Data2 = trigger_health_check_for_added_channels(Data1),
-    update_state(Data2);
+    {Actions, update_state(Data2)};
 channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
     %% Whenever the resource is connecting:
     %% 1. Change the status of all added channels to connecting
@@ -1340,33 +1362,34 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
         ChannelsWithNewAndPrevErrorStatuses
     ),
     Data1 = Data0#data{added_channels = NewChannels},
-    update_state(Data1);
-channels_health_check(ConnectorStatus, Data0) ->
-    %% Whenever the resource is not connected and not connecting:
+    {_Actions = [], update_state(Data1)};
+channels_health_check(?status_disconnected = ConnectorStatus, Data1) ->
+    %% Whenever the resource is disconnected:
     %% 1. Remove all added channels
     %% 2. Change the status to an error status
     %% 3. Raise alarms
-    Channels = Data0#data.added_channels,
-    ChannelsToRemove = [
-        ChannelId
-     || {ChannelId, Status} <- maps:to_list(Channels),
-        channel_status_is_channel_added(Status)
-    ],
-    Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true),
+    Channels = Data1#data.added_channels,
     ChannelsWithNewAndOldStatuses =
-        [
-            {ChannelId, OldStatus,
-                channel_status(
-                    {error,
-                        resource_not_connected_channel_error_msg(
-                            ConnectorStatus,
-                            ChannelId,
-                            Data1
-                        )},
-                    Config
-                )}
-         || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
-        ],
+        lists:map(
+            fun
+                ({ChannelId, #{error := ?not_added_yet} = OldStatus}) ->
+                    {ChannelId, OldStatus, OldStatus};
+                ({ChannelId, #{error := ?add_channel_failed(_)} = OldStatus}) ->
+                    {ChannelId, OldStatus, OldStatus};
+                ({ChannelId, #{config := Config} = OldStatus}) ->
+                    {ChannelId, OldStatus,
+                        channel_status(
+                            {error,
+                                resource_not_connected_channel_error_msg(
+                                    ConnectorStatus,
+                                    ChannelId,
+                                    Data1
+                                )},
+                            Config
+                        )}
+            end,
+            maps:to_list(Data1#data.added_channels)
+        ),
     %% Raise alarms
     IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
     _ = lists:foreach(
@@ -1384,7 +1407,7 @@ channels_health_check(ConnectorStatus, Data0) ->
         ChannelsWithNewAndOldStatuses
     ),
     Data2 = Data1#data{added_channels = NewChannels},
-    update_state(Data2).
+    {_Actions = [], update_state(Data2)}.
 
 resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
     ResourceId = Data1#data.id,
@@ -1404,27 +1427,40 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
 generic_timeout_action(Id, Timeout, Content) ->
     {{timeout, Id}, Timeout, Content}.
 
--spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) ->
+-spec start_channel_health_check_action(channel_id(), map(), map(), data()) ->
     [start_channel_health_check_action()].
 start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) ->
-    Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data),
+    ConfigSources =
+        lists:map(
+            fun
+                (#{config := Config}) ->
+                    Config;
+                (_) ->
+                    #{}
+            end,
+            [NewChanStatus, PreviousChanStatus]
+        ),
+    Timeout = get_channel_health_check_interval(ChannelId, ConfigSources, Data),
     Event = #start_channel_health_check{channel_id = ChannelId},
     [generic_timeout_action(Event, Timeout, Event)].
 
-get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) ->
+-spec retry_add_channel_action(channel_id(), map(), data()) -> retry_add_channel_action().
+retry_add_channel_action(ChannelId, ChannelConfig, Data) ->
+    Timeout = get_channel_health_check_interval(ChannelId, [ChannelConfig], Data),
+    Event = #retry_add_channel{channel_id = ChannelId},
+    generic_timeout_action(Event, Timeout, Event).
+
+get_channel_health_check_interval(ChannelId, ConfigSources, Data) ->
     emqx_utils:foldl_while(
         fun
-            (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) ->
+            (#{resource_opts := #{health_check_interval := HCInterval}}, _Acc) ->
                 {halt, HCInterval};
             (_, Acc) ->
                 {cont, Acc}
         end,
         ?HEALTHCHECK_INTERVAL,
-        [
-            NewChanStatus,
-            PreviousChanStatus,
-            maps:get(ChannelId, Data#data.added_channels, #{})
-        ]
+        ConfigSources ++
+            [emqx_utils_maps:deep_get([ChannelId, config], Data#data.added_channels, #{})]
     ).
 
 %% Currently, we only call resource channel health checks when the underlying resource is
@@ -1437,7 +1473,7 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0})
     NewOngoing = maps:filter(
         fun(ChannelId, OldStatus) ->
             (not is_map_key(ChannelId, Ongoing0)) andalso
-                channel_status_is_channel_added(OldStatus)
+                is_channel_apt_for_health_check(OldStatus)
         end,
         Data0#data.added_channels
     ),
@@ -1470,12 +1506,10 @@ continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Dat
             Data1
     end.
 
-continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) ->
+continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data) ->
     %% Remove the added channels with a status different from connected or connecting
-    NewStatus = maps:get(ChannelId, Data1#data.added_channels),
-    ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
-    Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
-    IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
+    NewStatus = maps:get(ChannelId, Data#data.added_channels),
+    IsDryRun = emqx_resource:is_dry_run(Data#data.id),
     %% Raise/clear alarms
     case NewStatus of
         #{status := ?status_connected} ->
@@ -1585,7 +1619,7 @@ handle_channel_health_check_worker_down_new_channels_and_status(
 reply_pending_channel_health_check_callers(
     ChannelId, Status0, Data0 = #data{hc_pending_callers = Pending0}
 ) ->
-    Status = without_channel_config(Status0),
+    Status = to_external_channel_status(Status0),
     #{channel := CPending0} = Pending0,
     Pending = maps:get(ChannelId, CPending0, []),
     Actions = [{reply, From, Status} || From <- Pending],
@@ -1593,6 +1627,21 @@ reply_pending_channel_health_check_callers(
     Data = Data0#data{hc_pending_callers = Pending0#{channel := CPending}},
     {Actions, Data}.
 
+handle_retry_add_channel(Data0, ChannelId) ->
+    ?tp(retry_add_channel, #{channel_id => ChannelId}),
+    maybe
+        {ok, StatusMap} ?= maps:find(ChannelId, Data0#data.added_channels),
+        %% Must contain config map if in data.
+        #{config := #{} = ChannelConfig} = StatusMap,
+        {Actions, Data1} = add_channels_in_list([{ChannelId, ChannelConfig}], Data0),
+        Data = trigger_health_check_for_added_channels(Data1),
+        {keep_state, Data, Actions}
+    else
+        error ->
+            %% Channel has been removed since timer was set?
+            keep_state_and_data
+    end.
+
 get_config_for_channels(Data0, ChannelsWithoutConfig) ->
     ResId = Data0#data.id,
     Mod = Data0#data.mod,
@@ -1651,7 +1700,7 @@ maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
             {error, Reason} ->
                 emqx_utils:readable_error_msg(Reason);
             _ ->
-                Error1 = without_channel_config(Error),
+                Error1 = to_external_channel_status(Error),
                 emqx_utils:readable_error_msg(Error1)
         end,
     emqx_alarm:safe_activate(
@@ -1659,11 +1708,15 @@ maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
         #{resource_id => ResId, reason => resource_down},
         <<"resource down: ", HrError/binary>>
     ),
-    ?tp(resource_activate_alarm, #{resource_id => ResId}).
+    ?tp(resource_activate_alarm, #{resource_id => ResId, error => HrError}).
 
 without_channel_config(Map) ->
     maps:without([config], Map).
 
+to_external_channel_status(StatusMap0) ->
+    StatusMap = without_channel_config(StatusMap0),
+    maps:update_with(error, fun external_error/1, StatusMap).
+
 -spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok.
 maybe_resume_resource_workers(ResId, ?status_connected) ->
     lists:foreach(
@@ -1704,6 +1757,8 @@ status_to_error(_) ->
     {error, undefined}.
 
 %% Compatibility
+external_error(?not_added_yet) -> not_added_yet;
+external_error(?add_channel_failed(Reason)) -> external_error(Reason);
 external_error({error, Reason}) -> Reason;
 external_error(Other) -> Other.
 
@@ -1716,7 +1771,9 @@ maybe_reply(Actions, From, Reply) ->
 data_record_to_external_map(Data) ->
     AddedChannelsWithoutConfigs =
         maps:map(
-            fun(_ChanID, Status) -> without_channel_config(Status) end,
+            fun(_ChanID, Status) ->
+                to_external_channel_status(Status)
+            end,
             Data#data.added_channels
         ),
     #{
@@ -1776,7 +1833,7 @@ channel_status_not_added(ChannelConfig) ->
         %%                 connected and the on_channel_get_status callback has returned
         %%                 connected. The error field should be undefined.
         status => ?status_disconnected,
-        error => not_added_yet,
+        error => ?not_added_yet,
         config => ChannelConfig
     }.
 
@@ -1825,6 +1882,12 @@ channel_status({?status_connected, Error}, ChannelConfig) ->
         error => Error,
         config => ChannelConfig
     };
+channel_status(?add_channel_failed(_Reason) = Error, ChannelConfig) ->
+    #{
+        status => ?status_disconnected,
+        error => Error,
+        config => ChannelConfig
+    };
 channel_status({error, Reason}, ChannelConfig) ->
     S = channel_error_status(Reason),
     S#{config => ChannelConfig}.
@@ -1835,19 +1898,24 @@ channel_error_status(Reason) ->
         error => Reason
     }.
 
-channel_status_is_channel_added(#{status := St}) ->
-    channel_status_is_channel_added(St);
-channel_status_is_channel_added(?status_connected) ->
-    true;
-channel_status_is_channel_added(?status_connecting) ->
-    true;
-channel_status_is_channel_added(_Status) ->
-    false.
+is_channel_apt_for_health_check(#{error := {unhealthy_target, _}}) ->
+    false;
+is_channel_apt_for_health_check(#{error := unhealthy_target}) ->
+    false;
+is_channel_apt_for_health_check(StatusMap) ->
+    channel_status_is_channel_added(StatusMap).
+
+channel_status_is_channel_added(#{error := ?not_added_yet}) ->
+    false;
+channel_status_is_channel_added(#{error := ?add_channel_failed(_)}) ->
+    false;
+channel_status_is_channel_added(_StatusMap) ->
+    true.
 
 -spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data().
 add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
     Channels = Data#data.added_channels,
-    ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
+    ChannelStatus = channel_status_not_added(ChannelConfig),
     NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
     ResStatus = state_to_status(State),
     IsDryRun = emqx_resource:is_dry_run(ChannelId),
@@ -1866,10 +1934,18 @@ tag(Group, Type) ->
     Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type),
     string:uppercase(Str).
 
+%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
+%% might be lost for a running resource, and future attempts to bump them result in
+%% errors.  As mitigation, we ensure such metrics are created here so that restarting
+%% the resource or resetting its metrics can recreate them.
+ensure_metrics(ResId) ->
+    {ok, _} = emqx_resource:ensure_metrics(ResId),
+    ok.
+
 %% When a resource enters a `?status_disconnected' state, late channel health check
 %% replies are useless and could corrup state.
--spec handle_abort_all_channel_health_checks(data()) -> data().
-handle_abort_all_channel_health_checks(Data0) ->
+-spec abort_all_channel_health_checks(data()) -> data().
+abort_all_channel_health_checks(Data0) ->
     #data{
         hc_workers = #{channel := CHCWorkers} = HCWorkers0,
         hc_pending_callers = #{channel := CPending} = Pending0
@@ -1898,17 +1974,55 @@ handle_abort_all_channel_health_checks(Data0) ->
 
 abort_channel_health_check(Pid) ->
     %% We're already linked to the worker pids due to `spawn_link'.
+    MRef = monitor(process, Pid),
     exit(Pid, kill),
+    receive
+        {'DOWN', MRef, process, Pid, _} ->
+            ok
+    end,
     %% Clean the exit signal so it doesn't contaminate state handling.
     receive
         {'EXIT', Pid, _} ->
             ok
+    after 0 -> ok
     end.
 
-%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
-%% might be lost for a running resource, and future attempts to bump them result in
-%% errors.  As mitigation, we ensure such metrics are created here so that restarting
-%% the resource or resetting its metrics can recreate them.
-ensure_metrics(ResId) ->
-    {ok, _} = emqx_resource:ensure_metrics(ResId),
-    ok.
+map_take_or(Map, Key, Default) ->
+    maybe
+        error ?= maps:take(Key, Map),
+        {Default, Map}
+    end.
+
+abort_health_checks_for_channel(Data0, ChannelId) ->
+    #data{
+        hc_workers = #{channel := #{ongoing := Ongoing0} = CHCWorkers0} = HCWorkers0,
+        hc_pending_callers = #{channel := CPending0} = Pending0
+    } = Data0,
+    Ongoing = maps:remove(ChannelId, Ongoing0),
+    {Callers, CPending} = map_take_or(CPending0, ChannelId, []),
+    lists:foreach(
+        fun(From) ->
+            gen_statem:reply(From, {error, resource_disconnected})
+        end,
+        Callers
+    ),
+    CHCWorkers = maps:fold(
+        fun
+            (Pid, ChannelId0, Acc) when is_pid(Pid), ChannelId0 == ChannelId ->
+                ?tp(warning, "aborting_channel_hc", #{channel_id => ChannelId, pid => Pid}),
+                abort_channel_health_check(Pid),
+                maps:remove(Pid, Acc);
+            (ChannelId0, _Config, Acc) when ChannelId0 == ChannelId ->
+                maps:remove(ChannelId0, Acc);
+            (_, _, Acc) ->
+                Acc
+        end,
+        CHCWorkers0,
+        CHCWorkers0
+    ),
+    HCWorkers = HCWorkers0#{channel := CHCWorkers#{ongoing := Ongoing}},
+    Pending = Pending0#{channel := CPending},
+    Data0#data{
+        hc_workers = HCWorkers,
+        hc_pending_callers = Pending
+    }.