Jelajahi Sumber

Merge pull request #14319 from thalesmg/20241129-r584-resource-manager-abort-chcs

fix(resource manager): break "remove channels when disconnected" invariant from state machine
Thales Macedo Garitezi 1 tahun lalu
induk
melakukan
d252768262

+ 4 - 0
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -280,10 +280,14 @@ lookup(ConfRootName, Type, Name) ->
             ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
             {DisplayBridgeV2Status, ErrorMsg} =
                 case {ChannelStatus, ConnectorStatus} of
+                    {_, ?status_disconnected} ->
+                        {?status_disconnected, <<"Resource not operational">>};
                     {#{status := ?status_connected}, _} ->
                         {?status_connected, <<"">>};
                     {#{error := resource_not_operational}, ?status_connecting} ->
                         {?status_connecting, <<"Not installed">>};
+                    {#{error := not_added_yet}, _} ->
+                        {?status_connecting, <<"Not installed">>};
                     {#{status := Status, error := undefined}, _} ->
                         {Status, <<"Unknown reason">>};
                     {#{status := Status, error := Error}, _} ->

+ 2 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -459,7 +459,8 @@ t_repeated_topics(Config) ->
                 emqx_bridge_v2_testlib:create_source_api([{source_name, Name2} | Config]),
             ?assertEqual(
                 match,
-                re:run(Error, <<"Topics .* already exist in other sources">>, [{capture, none}])
+                re:run(Error, <<"Topics .* already exist in other sources">>, [{capture, none}]),
+                #{error => Error}
             ),
             ok
         end,

+ 10 - 7
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -127,6 +127,8 @@ common_init_per_testcase(TestCase, Config0) ->
     ),
     ok = snabbkaffe:start_trace(),
     [
+        {bridge_type, ?BRIDGE_TYPE_BIN},
+        {bridge_name, Name},
         {oracle_name, Name},
         {oracle_config_string, ConfigString},
         {oracle_config, OracleConfig}
@@ -730,18 +732,20 @@ t_no_sid_nor_service_name(Config0) ->
     ok.
 
 t_missing_table(Config) ->
-    ResourceId = resource_id(Config),
+    Name = ?config(bridge_name, Config),
     ?check_trace(
         begin
             drop_table_if_exists(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
-            ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertMatch(
-                    {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
-                    emqx_resource_manager:health_check(ResourceId)
+                    {ok, #{
+                        <<"status">> := <<"disconnected">>,
+                        <<"status_reason">> := <<"{unhealthy_target,", _/binary>>
+                    }},
+                    emqx_bridge_testlib:get_bridge_api(Config)
                 )
             ),
             ?block_until(#{?snk_kind := oracle_undefined_table}),
@@ -752,10 +756,9 @@ t_missing_table(Config) ->
                 payload => ?config(oracle_name, Config),
                 retain => true
             },
-            Message = {ActionId, Params},
             ?assertMatch(
-                {error, {resource_error, #{reason := not_connected}}},
-                emqx_resource:simple_sync_query(ResourceId, Message)
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, _QueryOpts = #{})
             ),
             ok
         end,

+ 21 - 1
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -803,17 +803,37 @@ t_table_removed(Config) ->
     BridgeType = ?config(pgsql_bridge_type, Config),
     ?check_trace(
         begin
+            ct:pal("creating table"),
             connect_and_create_table(Config),
-            ?assertMatch({ok, _}, create_bridge(Config)),
+            ct:pal("creating bridge"),
+            ?assertMatch(
+                {ok, _},
+                create_bridge(Config, #{
+                    <<"resource_opts">> => #{
+                        <<"health_check_interval">> => <<"1s">>
+                    }
+                })
+            ),
+            ct:pal("checking bridge health"),
             ?retry(
                 _Sleep = 100,
                 _Attempts = 200,
                 ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
             ),
+            ct:pal("dropping table"),
             connect_and_drop_table(Config),
             Val = integer_to_binary(erlang:unique_integer()),
             SentData = #{payload => Val, timestamp => 1668602148000},
             ActionId = emqx_bridge_v2:id(BridgeType, Name),
+            ?retry(
+                _Sleep = 100,
+                _Attempts = 200,
+                ?assertMatch(
+                    #{error := {unhealthy_target, _}, status := disconnected},
+                    emqx_bridge_v2:health_check(BridgeType, Name)
+                )
+            ),
+            ct:pal("sending query"),
             case query_resource_sync(Config, {ActionId, SentData}) of
                 {error, {unrecoverable_error, _}} ->
                     ok;

+ 8 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1321,7 +1321,7 @@ extract_connector_id(Id) when is_binary(Id) ->
 %% There is no need to query the connector if the channel is not
 %% installed as the query will fail anyway.
 pre_query_channel_check(Id, {Id, _} = _Request, ChanSt, IsSimpleQuery) ->
-    case emqx_resource_manager:channel_status_is_channel_added(ChanSt) of
+    case is_channel_apt_for_queries(ChanSt) of
         true ->
             ok;
         false ->
@@ -2365,6 +2365,13 @@ buffer_worker(_Tid) ->
 is_simple_query(#{simple_query := Bool}) ->
     Bool.
 
+is_channel_apt_for_queries(?status_connected) ->
+    true;
+is_channel_apt_for_queries(?status_connecting) ->
+    true;
+is_channel_apt_for_queries(_) ->
+    false.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 adjust_batch_time_test_() ->

+ 228 - 113
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -50,12 +50,12 @@
     is_exist/1,
     get_metrics/1,
     reset_metrics/1,
-    channel_status_is_channel_added/1,
     get_query_mode_and_last_error/2
 ]).
 
 -export([
-    set_resource_status_connecting/1
+    set_resource_status_connecting/1,
+    external_error/1
 ]).
 
 % Server
@@ -71,6 +71,13 @@
 -export([stop/2]).
 -endif.
 
+%%------------------------------------------------------------------------------
+%% Type definitions
+%%------------------------------------------------------------------------------
+
+-define(not_added_yet, {?MODULE, not_added_yet}).
+-define(add_channel_failed(REASON), {?MODULE, add_channel_failed, REASON}).
+
 % State record
 -record(data, {
     id,
@@ -101,8 +108,8 @@
     },
     %% Callers waiting on health check
     hc_pending_callers = #{resource => [], channel => #{}} :: #{
-        resource := [gen_server:from()],
-        channel := #{channel_id() => [gen_server:from()]}
+        resource := [gen_statem:from()],
+        channel := #{channel_id() => [gen_statem:from()]}
     },
     extra
 }).
@@ -146,11 +153,15 @@
 %% calls/casts/generic timeouts
 -record(add_channel, {channel_id :: channel_id(), config :: map()}).
 -record(start_channel_health_check, {channel_id :: channel_id()}).
+-record(retry_add_channel, {channel_id :: channel_id()}).
 
 -type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}.
 -type start_channel_health_check_action() :: generic_timeout(
     #start_channel_health_check{}, #start_channel_health_check{}
 ).
+-type retry_add_channel_action() :: generic_timeout(
+    #retry_add_channel{}, #retry_add_channel{}
+).
 
 %%------------------------------------------------------------------------------
 %% API
@@ -623,9 +634,10 @@ handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
 %%--------------------------
 %% State: CONNECTING
 %%--------------------------
-handle_event(enter, _OldState, ?state_connecting = State, Data) ->
+handle_event(enter, _OldState, ?state_connecting = State, Data0) ->
+    Data = abort_all_channel_health_checks(Data0),
     ok = log_status_consistency(State, Data),
-    {keep_state_and_data, [{state_timeout, 0, health_check}]};
+    {keep_state, Data, [{state_timeout, 0, health_check}]};
 handle_event(internal, start_resource, ?state_connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, ?state_connecting, Data) ->
@@ -643,7 +655,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ok = log_status_consistency(State, Data),
     _ = emqx_alarm:safe_deactivate(Data#data.id),
     ?tp(resource_connected_enter, #{}),
-    {keep_state_and_data, resource_health_check_actions(Data)};
+    {keep_state, Data, resource_health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
     start_resource_health_check(Data);
 handle_event(
@@ -664,13 +676,17 @@ handle_event(
     Data
 ) ->
     handle_start_channel_health_check(Data, ChannelId);
+handle_event(
+    {timeout, #retry_add_channel{channel_id = ChannelId}}, _, ?state_connected = _State, Data
+) ->
+    handle_retry_add_channel(Data, ChannelId);
 %%--------------------------
 %% State: DISCONNECTED
 %%--------------------------
 handle_event(enter, _OldState, ?state_disconnected = State, Data0) ->
     ok = log_status_consistency(State, Data0),
     ?tp(resource_disconnected_enter, #{}),
-    Data = handle_abort_all_channel_health_checks(Data0),
+    Data = abort_all_channel_health_checks(Data0),
     {keep_state, Data, retry_actions(Data)};
 handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
     ?tp(resource_auto_reconnect, #{}),
@@ -679,9 +695,10 @@ handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
 %%--------------------------
-handle_event(enter, _OldState, ?state_stopped = State, Data) ->
+handle_event(enter, _OldState, ?state_stopped = State, Data0) ->
+    Data = abort_all_channel_health_checks(Data0),
     ok = log_status_consistency(State, Data),
-    {keep_state_and_data, []};
+    {keep_state, Data};
 %%--------------------------
 %% The following events can be handled in any other state
 %%--------------------------
@@ -716,6 +733,9 @@ handle_event(
     is_map_key(Pid, CHCWorkers)
 ->
     handle_channel_health_check_worker_down(Data0, Pid, Res);
+handle_event({timeout, #retry_add_channel{channel_id = _}}, _, _State, _Data) ->
+    %% We only add channels to the resource state in the connected state.
+    {keep_state_and_data, [postpone]};
 handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) ->
     %% Stale health check action; currently, we only probe channel health when connected.
     keep_state_and_data;
@@ -814,11 +834,12 @@ start_resource(Data, From) ->
             ),
             _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error),
             %% Add channels and raise alarms
-            NewData1 = channels_health_check(?status_disconnected, add_channels(Data)),
+            {Actions0, NewData1} = channels_health_check(?status_disconnected, add_channels(Data)),
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
             NewData2 = NewData1#data{status = ?status_disconnected, error = Err},
-            Actions = maybe_reply(retry_actions(NewData2), From, Err),
+            Actions1 = maybe_reply(retry_actions(NewData2), From, Err),
+            Actions = Actions1 ++ Actions0,
             {next_state, ?state_disconnected, update_state(NewData2), Actions}
     end.
 
@@ -848,9 +869,12 @@ maybe_update_callback_mode(Data = #data{mod = ResourceType, state = ResourceStat
             Data#data{callback_mode = CallMode}
     end.
 
-add_channels_in_list([], Data) ->
-    Data;
-add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
+add_channels_in_list(ChannelsWithConfigs, Data) ->
+    add_channels_in_list(ChannelsWithConfigs, Data, _Actions = []).
+
+add_channels_in_list([], Data, Actions) ->
+    {Actions, Data};
+add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data, Actions) ->
     #data{
         id = ResId,
         mod = Mod,
@@ -872,6 +896,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
                 channel_status_new_waiting_for_health_check(ChannelConfig),
                 AddedChannelsMap
             ),
+            NewActions = Actions,
             NewData = Data#data{
                 state = NewState,
                 added_channels = NewAddedChannelsMap
@@ -890,16 +915,17 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
             ),
             NewAddedChannelsMap = maps:put(
                 ChannelID,
-                channel_status(Error, ChannelConfig),
+                channel_status(?add_channel_failed(Reason), ChannelConfig),
                 AddedChannelsMap
             ),
+            NewActions = [retry_add_channel_action(ChannelID, ChannelConfig, Data) | Actions],
             NewData = Data#data{
                 added_channels = NewAddedChannelsMap
             },
             %% Raise an alarm since the channel could not be added
             _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error)
     end,
-    add_channels_in_list(Rest, NewData).
+    add_channels_in_list(Rest, NewData, NewActions).
 
 maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_stopped ->
     stop_resource(Data);
@@ -928,11 +954,11 @@ stop_resource(#data{id = ResId} = Data) ->
 
 remove_channels(Data) ->
     Channels = maps:keys(Data#data.added_channels),
-    remove_channels_in_list(Channels, Data, false).
+    remove_channels_in_list(Channels, Data).
 
-remove_channels_in_list([], Data, _KeepInChannelMap) ->
+remove_channels_in_list([], Data) ->
     Data;
-remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
+remove_channels_in_list([ChannelID | Rest], Data) ->
     #data{
         id = ResId,
         added_channels = AddedChannelsMap,
@@ -942,14 +968,8 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
         type = Type
     } = Data,
     IsDryRun = emqx_resource:is_dry_run(ResId),
-    NewAddedChannelsMap =
-        case KeepInChannelMap of
-            true ->
-                AddedChannelsMap;
-            false ->
-                _ = maybe_clear_alarm(IsDryRun, ChannelID),
-                maps:remove(ChannelID, AddedChannelsMap)
-        end,
+    _ = maybe_clear_alarm(IsDryRun, ChannelID),
+    NewAddedChannelsMap = maps:remove(ChannelID, AddedChannelsMap),
     case safe_call_remove_channel(ResId, Mod, State, ChannelID) of
         {ok, NewState} ->
             NewData = Data#data{
@@ -974,7 +994,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
                 added_channels = NewAddedChannelsMap
             }
     end,
-    remove_channels_in_list(Rest, NewData, KeepInChannelMap).
+    remove_channels_in_list(Rest, NewData).
 
 safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
     {ok, State};
@@ -1042,7 +1062,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
     NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
     {keep_state, update_state(NewData), [{reply, From, ok}]}.
 
-handle_remove_channel(From, ChannelId, Data) ->
+handle_remove_channel(From, ChannelId, Data0) ->
+    Data = abort_health_checks_for_channel(Data0, ChannelId),
     Channels = Data#data.added_channels,
     IsDryRun = emqx_resource:is_dry_run(Data#data.id),
     _ = maybe_clear_alarm(IsDryRun, ChannelId),
@@ -1197,9 +1218,9 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
     case NewStatus of
         ?status_connected ->
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            Data2 = channels_health_check(?status_connected, Data1),
+            {Actions0, Data2} = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2),
-            Actions = Replies ++ resource_health_check_actions(Data),
+            Actions = Replies ++ Actions0 ++ resource_health_check_actions(Data),
             {keep_state, Data, Actions};
         _ ->
             #data{id = ResId, group = Group, type = Type} = Data0,
@@ -1218,8 +1239,8 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            Data = channels_health_check(NewStatus, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(NewStatus, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, NewStatus, Data, Actions}
     end.
 
@@ -1228,16 +1249,16 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
     {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
-            Data = channels_health_check(?status_connected, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(?status_connected, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, ?state_connected, Data, Actions};
         ?status_connecting ->
-            Data = channels_health_check(?status_connecting, Data1),
-            Actions = Replies ++ resource_health_check_actions(Data),
+            {Actions0, Data} = channels_health_check(?status_connecting, Data1),
+            Actions = Replies ++ Actions0 ++ resource_health_check_actions(Data),
             {next_state, ?status_connecting, Data, Actions};
         ?status_disconnected ->
-            Data = channels_health_check(?status_disconnected, Data1),
-            Actions = Replies,
+            {Actions0, Data} = channels_health_check(?status_disconnected, Data1),
+            Actions = Replies ++ Actions0,
             {next_state, ?state_disconnected, Data, Actions}
     end.
 
@@ -1277,7 +1298,9 @@ handle_manual_channel_health_check(
     is_map_key(ChannelId, Channels)
 ->
     %% No ongoing health check: reply with current status.
-    {keep_state_and_data, [{reply, From, without_channel_config(maps:get(ChannelId, Channels))}]};
+    {keep_state_and_data, [
+        {reply, From, to_external_channel_status(maps:get(ChannelId, Channels))}
+    ]};
 handle_manual_channel_health_check(
     From,
     _Data,
@@ -1287,22 +1310,21 @@ handle_manual_channel_health_check(
         {reply, From, channel_error_status(channel_not_found)}
     ]}.
 
--spec channels_health_check(resource_status(), data()) -> data().
+-spec channels_health_check(resource_status(), data()) -> {[gen_statem:action()], data()}.
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
-    %% All channels with a status different from connected or connecting are
-    %% not added
     ChannelsNotAdded = [
         ChannelId
      || {ChannelId, Status} <- Channels,
         not channel_status_is_channel_added(Status)
     ],
-    %% Attempt to add channels that are not added
+    %% Attempt to add channels to resource state that are not added yet
     ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded),
-    Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
-    %% Now that we have done the adding, we can get the status of all channels
+    {Actions, Data1} = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
+    %% Now that we have done the adding, we can get the status of all channels (except
+    %% unhealthy ones)
     Data2 = trigger_health_check_for_added_channels(Data1),
-    update_state(Data2);
+    {Actions, update_state(Data2)};
 channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
     %% Whenever the resource is connecting:
     %% 1. Change the status of all added channels to connecting
@@ -1340,33 +1362,35 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
         ChannelsWithNewAndPrevErrorStatuses
     ),
     Data1 = Data0#data{added_channels = NewChannels},
-    update_state(Data1);
-channels_health_check(ConnectorStatus, Data0) ->
-    %% Whenever the resource is not connected and not connecting:
-    %% 1. Remove all added channels
-    %% 2. Change the status to an error status
-    %% 3. Raise alarms
-    Channels = Data0#data.added_channels,
-    ChannelsToRemove = [
-        ChannelId
-     || {ChannelId, Status} <- maps:to_list(Channels),
-        channel_status_is_channel_added(Status)
-    ],
-    Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true),
+    {_Actions = [], update_state(Data1)};
+channels_health_check(?status_disconnected = ConnectorStatus, Data1) ->
+    %% Whenever the resource is disconnected:
+    %% 1. Change the status of channels to an error status
+    %%    - Except for channels yet to be added to the resource state.  Those must keep
+    %%    their special error markers so that they are still added or retried later.
+    %% 2. Raise alarms
+    Channels = Data1#data.added_channels,
     ChannelsWithNewAndOldStatuses =
-        [
-            {ChannelId, OldStatus,
-                channel_status(
-                    {error,
-                        resource_not_connected_channel_error_msg(
-                            ConnectorStatus,
-                            ChannelId,
-                            Data1
-                        )},
-                    Config
-                )}
-         || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
-        ],
+        lists:map(
+            fun
+                ({ChannelId, #{error := ?not_added_yet} = OldStatus}) ->
+                    {ChannelId, OldStatus, OldStatus};
+                ({ChannelId, #{error := ?add_channel_failed(_)} = OldStatus}) ->
+                    {ChannelId, OldStatus, OldStatus};
+                ({ChannelId, #{config := Config} = OldStatus}) ->
+                    {ChannelId, OldStatus,
+                        channel_status(
+                            {error,
+                                resource_not_connected_channel_error_msg(
+                                    ConnectorStatus,
+                                    ChannelId,
+                                    Data1
+                                )},
+                            Config
+                        )}
+            end,
+            maps:to_list(Data1#data.added_channels)
+        ),
     %% Raise alarms
     IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
     _ = lists:foreach(
@@ -1384,7 +1408,7 @@ channels_health_check(ConnectorStatus, Data0) ->
         ChannelsWithNewAndOldStatuses
     ),
     Data2 = Data1#data{added_channels = NewChannels},
-    update_state(Data2).
+    {_Actions = [], update_state(Data2)}.
 
 resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
     ResourceId = Data1#data.id,
@@ -1404,27 +1428,40 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
 generic_timeout_action(Id, Timeout, Content) ->
     {{timeout, Id}, Timeout, Content}.
 
--spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) ->
+-spec start_channel_health_check_action(channel_id(), map(), map(), data()) ->
     [start_channel_health_check_action()].
 start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) ->
-    Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data),
+    ConfigSources =
+        lists:map(
+            fun
+                (#{config := Config}) ->
+                    Config;
+                (_) ->
+                    #{}
+            end,
+            [NewChanStatus, PreviousChanStatus]
+        ),
+    Timeout = get_channel_health_check_interval(ChannelId, ConfigSources, Data),
     Event = #start_channel_health_check{channel_id = ChannelId},
     [generic_timeout_action(Event, Timeout, Event)].
 
-get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) ->
+-spec retry_add_channel_action(channel_id(), map(), data()) -> retry_add_channel_action().
+retry_add_channel_action(ChannelId, ChannelConfig, Data) ->
+    Timeout = get_channel_health_check_interval(ChannelId, [ChannelConfig], Data),
+    Event = #retry_add_channel{channel_id = ChannelId},
+    generic_timeout_action(Event, Timeout, Event).
+
+get_channel_health_check_interval(ChannelId, ConfigSources, Data) ->
     emqx_utils:foldl_while(
         fun
-            (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) ->
+            (#{resource_opts := #{health_check_interval := HCInterval}}, _Acc) ->
                 {halt, HCInterval};
             (_, Acc) ->
                 {cont, Acc}
         end,
         ?HEALTHCHECK_INTERVAL,
-        [
-            NewChanStatus,
-            PreviousChanStatus,
-            maps:get(ChannelId, Data#data.added_channels, #{})
-        ]
+        ConfigSources ++
+            [emqx_utils_maps:deep_get([ChannelId, config], Data#data.added_channels, #{})]
     ).
 
 %% Currently, we only call resource channel health checks when the underlying resource is
@@ -1437,7 +1474,7 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0})
     NewOngoing = maps:filter(
         fun(ChannelId, OldStatus) ->
             (not is_map_key(ChannelId, Ongoing0)) andalso
-                channel_status_is_channel_added(OldStatus)
+                is_channel_apt_for_health_check(OldStatus)
         end,
         Data0#data.added_channels
     ),
@@ -1470,12 +1507,10 @@ continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Dat
             Data1
     end.
 
-continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) ->
+continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data) ->
     %% Remove the added channels with a status different from connected or connecting
-    NewStatus = maps:get(ChannelId, Data1#data.added_channels),
-    ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
-    Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
-    IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
+    NewStatus = maps:get(ChannelId, Data#data.added_channels),
+    IsDryRun = emqx_resource:is_dry_run(Data#data.id),
     %% Raise/clear alarms
     case NewStatus of
         #{status := ?status_connected} ->
@@ -1585,7 +1620,7 @@ handle_channel_health_check_worker_down_new_channels_and_status(
 reply_pending_channel_health_check_callers(
     ChannelId, Status0, Data0 = #data{hc_pending_callers = Pending0}
 ) ->
-    Status = without_channel_config(Status0),
+    Status = to_external_channel_status(Status0),
     #{channel := CPending0} = Pending0,
     Pending = maps:get(ChannelId, CPending0, []),
     Actions = [{reply, From, Status} || From <- Pending],
@@ -1593,6 +1628,21 @@ reply_pending_channel_health_check_callers(
     Data = Data0#data{hc_pending_callers = Pending0#{channel := CPending}},
     {Actions, Data}.
 
+handle_retry_add_channel(Data0, ChannelId) ->
+    ?tp(retry_add_channel, #{channel_id => ChannelId}),
+    maybe
+        {ok, StatusMap} ?= maps:find(ChannelId, Data0#data.added_channels),
+        %% Must contain config map if in data.
+        #{config := #{} = ChannelConfig} = StatusMap,
+        {Actions, Data1} = add_channels_in_list([{ChannelId, ChannelConfig}], Data0),
+        Data = trigger_health_check_for_added_channels(Data1),
+        {keep_state, Data, Actions}
+    else
+        error ->
+            %% Channel has been removed since timer was set?
+            keep_state_and_data
+    end.
+
 get_config_for_channels(Data0, ChannelsWithoutConfig) ->
     ResId = Data0#data.id,
     Mod = Data0#data.mod,
@@ -1651,7 +1701,7 @@ maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
             {error, Reason} ->
                 emqx_utils:readable_error_msg(Reason);
             _ ->
-                Error1 = without_channel_config(Error),
+                Error1 = to_external_channel_status(Error),
                 emqx_utils:readable_error_msg(Error1)
         end,
     emqx_alarm:safe_activate(
@@ -1659,11 +1709,15 @@ maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
         #{resource_id => ResId, reason => resource_down},
         <<"resource down: ", HrError/binary>>
     ),
-    ?tp(resource_activate_alarm, #{resource_id => ResId}).
+    ?tp(resource_activate_alarm, #{resource_id => ResId, error => HrError}).
 
 without_channel_config(Map) ->
     maps:without([config], Map).
 
+to_external_channel_status(StatusMap0) ->
+    StatusMap = without_channel_config(StatusMap0),
+    maps:update_with(error, fun external_error/1, StatusMap).
+
 -spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok.
 maybe_resume_resource_workers(ResId, ?status_connected) ->
     lists:foreach(
@@ -1704,6 +1758,8 @@ status_to_error(_) ->
     {error, undefined}.
 
 %% Compatibility
+external_error(?not_added_yet) -> not_added_yet;
+external_error(?add_channel_failed(Reason)) -> external_error(Reason);
 external_error({error, Reason}) -> Reason;
 external_error(Other) -> Other.
 
@@ -1716,7 +1772,9 @@ maybe_reply(Actions, From, Reply) ->
 data_record_to_external_map(Data) ->
     AddedChannelsWithoutConfigs =
         maps:map(
-            fun(_ChanID, Status) -> without_channel_config(Status) end,
+            fun(_ChanID, Status) ->
+                to_external_channel_status(Status)
+            end,
             Data#data.added_channels
         ),
     #{
@@ -1776,7 +1834,7 @@ channel_status_not_added(ChannelConfig) ->
         %%                 connected and the on_channel_get_status callback has returned
         %%                 connected. The error field should be undefined.
         status => ?status_disconnected,
-        error => not_added_yet,
+        error => ?not_added_yet,
         config => ChannelConfig
     }.
 
@@ -1825,6 +1883,12 @@ channel_status({?status_connected, Error}, ChannelConfig) ->
         error => Error,
         config => ChannelConfig
     };
+channel_status(?add_channel_failed(_Reason) = Error, ChannelConfig) ->
+    #{
+        status => ?status_disconnected,
+        error => Error,
+        config => ChannelConfig
+    };
 channel_status({error, Reason}, ChannelConfig) ->
     S = channel_error_status(Reason),
     S#{config => ChannelConfig}.
@@ -1835,19 +1899,24 @@ channel_error_status(Reason) ->
         error => Reason
     }.
 
-channel_status_is_channel_added(#{status := St}) ->
-    channel_status_is_channel_added(St);
-channel_status_is_channel_added(?status_connected) ->
-    true;
-channel_status_is_channel_added(?status_connecting) ->
-    true;
-channel_status_is_channel_added(_Status) ->
-    false.
+is_channel_apt_for_health_check(#{error := {unhealthy_target, _}}) -> % `unhealthy_target' with details: not apt
+    false;
+is_channel_apt_for_health_check(#{error := unhealthy_target}) -> % bare `unhealthy_target': not apt
+    false;
+is_channel_apt_for_health_check(StatusMap) -> % otherwise apt exactly when the channel counts as added
+    channel_status_is_channel_added(StatusMap).
+
+channel_status_is_channel_added(#{error := ?not_added_yet}) -> % error marks the channel as not added yet
+    false;
+channel_status_is_channel_added(#{error := ?add_channel_failed(_)}) -> % error records a failed add attempt
+    false;
+channel_status_is_channel_added(_StatusMap) -> % every other status map counts as added
+    true.
 
 -spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data().
 add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
     Channels = Data#data.added_channels,
-    ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
+    ChannelStatus = channel_status_not_added(ChannelConfig),
     NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
     ResStatus = state_to_status(State),
     IsDryRun = emqx_resource:is_dry_run(ChannelId),
@@ -1866,10 +1935,18 @@ tag(Group, Type) ->
     Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type),
     string:uppercase(Str).
 
+%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
+%% might be lost for a running resource, and future attempts to bump them result in
+%% errors.  As mitigation, we ensure such metrics are created here so that restarting
+%% the resource or resetting its metrics can recreate them.
+ensure_metrics(ResId) ->
+    {ok, _} = emqx_resource:ensure_metrics(ResId), % assertive match: crashes with badmatch on any non-`{ok, _}' result
+    ok.
+
 %% When a resource enters a `?status_disconnected' state, late channel health check
 %% replies are useless and could corrupt state.
--spec handle_abort_all_channel_health_checks(data()) -> data().
-handle_abort_all_channel_health_checks(Data0) ->
+-spec abort_all_channel_health_checks(data()) -> data().
+abort_all_channel_health_checks(Data0) ->
     #data{
         hc_workers = #{channel := CHCWorkers} = HCWorkers0,
         hc_pending_callers = #{channel := CPending} = Pending0
@@ -1898,17 +1975,55 @@ handle_abort_all_channel_health_checks(Data0) ->
 
 %% Kills the channel health check worker `Pid', waits for the matching
 %% `DOWN' message, and then removes a queued `EXIT' message from `Pid' if one
 %% is present.  Always returns `ok'.
 abort_channel_health_check(Pid) ->
     %% We're already linked to the worker pids due to `spawn_link'.
+    MRef = monitor(process, Pid), % monitor first so the `DOWN' below is guaranteed to arrive
     exit(Pid, kill),
+    receive
+        {'DOWN', MRef, process, Pid, _} -> % blocks until the worker is confirmed down
+            ok
+    end,
     %% Clean the exit signal so it doesn't contaminate state handling.
     receive
         {'EXIT', Pid, _} ->
             ok
+    after 0 -> ok % do not block when no `EXIT' message from `Pid' is queued
     end.
 
-%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
-%% might be lost for a running resource, and future attempts to bump them result in
-%% errors.  As mitigation, we ensure such metrics are created here so that restarting
-%% the resource or resetting its metrics can recreate them.
-ensure_metrics(ResId) ->
-    {ok, _} = emqx_resource:ensure_metrics(ResId),
-    ok.
+map_take_or(Map, Key, Default) -> % like maps:take/2, but yields `{Default, Map}' when `Key' is absent
+    maybe
+        error ?= maps:take(Key, Map), % on a hit, `{Value, Map1}' fails this match and becomes the result
+        {Default, Map}
+    end.
+
+abort_health_checks_for_channel(Data0, ChannelId) -> % drops all health check bookkeeping for one channel
+    #data{
+        hc_workers = #{channel := #{ongoing := Ongoing0} = CHCWorkers0} = HCWorkers0,
+        hc_pending_callers = #{channel := CPending0} = Pending0
+    } = Data0,
+    Ongoing = maps:remove(ChannelId, Ongoing0), % forget this channel in `ongoing', if present
+    {Callers, CPending} = map_take_or(CPending0, ChannelId, []), % no pending entry => empty caller list
+    lists:foreach(
+        fun(From) ->
+            gen_statem:reply(From, {error, resource_disconnected}) % answer every pending caller with an error
+        end,
+        Callers
+    ),
+    CHCWorkers = maps:fold(
+        fun
+            (Pid, ChannelId0, Acc) when is_pid(Pid), ChannelId0 == ChannelId -> % pid-keyed entry whose value is this channel
+                ?tp(warning, "aborting_channel_hc", #{channel_id => ChannelId, pid => Pid}),
+                abort_channel_health_check(Pid),
+                maps:remove(Pid, Acc);
+            (ChannelId0, _Config, Acc) when ChannelId0 == ChannelId -> % entry keyed by the channel id itself
+                maps:remove(ChannelId0, Acc);
+            (_, _, Acc) -> % all other entries are kept as they are
+                Acc
+        end,
+        CHCWorkers0,
+        CHCWorkers0
+    ),
+    HCWorkers = HCWorkers0#{channel := CHCWorkers#{ongoing := Ongoing}}, % store the pruned `ongoing' map back
+    Pending = Pending0#{channel := CPending},
+    Data0#data{
+        hc_workers = HCWorkers,
+        hc_pending_callers = Pending
+    }.

+ 5 - 0
changes/ce/fix-14319.en.md

@@ -0,0 +1,5 @@
+Refactored the internal state machine of the resource manager.  As a consequence, some race condition bugs have been eliminated.  One such example is the HTTP action, which, when under incoming traffic and when its health check flaps, may produce errors like the following:
+
+```
+2024-11-29T14:58:17.994119+00:00 [error] msg: action_not_found, connector: <<"connector:http:a">>, action_id: <<"action:http:a:connector:http:a">>
+```