浏览代码

Merge pull request #12830 from thalesmg/async-channel-hc-m-20240404

feat(resource): non-blocking channel health checks
Thales Macedo Garitezi 1 年之前
父节点
当前提交
04ba2aaf8a

+ 6 - 0
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -858,6 +858,12 @@ do_start_stop_bridges(Type, Config) ->
             <<"status_reason">> := <<"connack_timeout">>
         } ->
             ok;
+        #{
+            <<"node_status">> := [_, _ | _],
+            <<"status">> := <<"disconnected">>,
+            <<"status_reason">> := <<"connack_timeout">>
+        } ->
+            ok;
         #{
             <<"node_status">> := [_],
             <<"status">> := <<"connecting">>

+ 175 - 45
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -61,7 +61,7 @@
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
 %% Internal exports.
--export([worker_resource_health_check/1]).
+-export([worker_resource_health_check/1, worker_channel_health_check/2]).
 
 % State record
 -record(data, {
@@ -78,12 +78,24 @@
     pid,
     added_channels = #{},
     %% Reference to process performing resource health check.
-    hc_workers = #{resource => #{}, channel => #{}} :: #{
-        resource | channel := #{{pid(), reference()} => true}
+    hc_workers = #{
+        resource => #{},
+        channel => #{
+            pending => [],
+            previous_status => #{}
+        }
+    } :: #{
+        resource := #{{pid(), reference()} => true},
+        channel := #{
+            {pid(), reference()} => channel_id(),
+            pending := [channel_id()],
+            previous_status := #{channel_id() => channel_status_map()}
+        }
     },
     %% Callers waiting on health check
-    hc_pending_callers = #{resource => [], channel => []} :: #{
-        resource | channel := [gen_server:from()]
+    hc_pending_callers = #{resource => [], channel => #{}} :: #{
+        resource := [gen_server:from()],
+        channel := #{channel_id() => [gen_server:from()]}
     },
     extra
 }).
@@ -107,6 +119,12 @@
 -define(state_disconnected, disconnected).
 -define(state_stopped, stopped).
 
+-type state() ::
+    ?state_stopped
+    | ?state_disconnected
+    | ?state_connecting
+    | ?state_connected.
+
 -define(IS_STATUS(ST),
     ST =:= ?status_connecting; ST =:= ?status_connected; ST =:= ?status_disconnected
 ).
@@ -339,6 +357,7 @@ add_channel(ResId, ChannelId, Config) ->
     Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
     %% Wait for health_check to finish
     _ = health_check(ResId),
+    _ = channel_health_check(ResId, ChannelId),
     Result.
 
 remove_channel(ResId, ChannelId) ->
@@ -538,11 +557,20 @@ handle_event(
     info,
     {'DOWN', Ref, process, Pid, Res},
     State0,
-    Data0 = #data{hc_workers = #{resource := HCWorkers}}
+    Data0 = #data{hc_workers = #{resource := RHCWorkers}}
 ) when
-    is_map_key({Pid, Ref}, HCWorkers)
+    is_map_key({Pid, Ref}, RHCWorkers)
 ->
     handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
+handle_event(
+    info,
+    {'DOWN', Ref, process, Pid, Res},
+    _State,
+    Data0 = #data{hc_workers = #{channel := CHCWorkers}}
+) when
+    is_map_key({Pid, Ref}, CHCWorkers)
+->
+    handle_channel_health_check_worker_down(Data0, {Pid, Ref}, Res);
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -558,7 +586,7 @@ handle_event(EventType, EventData, State, Data) ->
     keep_state_and_data.
 
 log_status_consistency(Status, #data{status = Status} = Data) ->
-    log_cache_consistency(read_cache(Data#data.id), Data);
+    log_cache_consistency(read_cache(Data#data.id), remove_runtime_data(Data));
 log_status_consistency(Status, Data) ->
     ?tp(warning, "inconsistent_status", #{
         status => Status,
@@ -869,7 +897,7 @@ handle_manual_resource_health_check(From, Data0) ->
     Data = Data0#data{hc_pending_callers = Pending},
     start_resource_health_check(Data).
 
-reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) ->
+reply_pending_resource_health_check_callers(Status, Data0 = #data{hc_pending_callers = Pending0}) ->
     #{resource := RPending} = Pending0,
     Actions = [{reply, From, {ok, Status}} || From <- RPending],
     Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
@@ -888,13 +916,13 @@ start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
     keep_state_and_data;
 start_resource_health_check(#data{} = Data0) ->
     #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
-    WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0),
+    WorkerRef = {_Pid, _Ref} = spawn_resource_health_check_worker(Data0),
     HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
     Data = Data0#data{hc_workers = HCWorkers},
     {keep_state, Data}.
 
--spec spawn_health_check_worker(data()) -> {pid(), reference()}.
-spawn_health_check_worker(#data{} = Data) ->
+-spec spawn_resource_health_check_worker(data()) -> {pid(), reference()}.
+spawn_resource_health_check_worker(#data{} = Data) ->
     spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
 
 %% separated so it can be spec'ed and placate dialyzer tantrums...
@@ -939,7 +967,7 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
 continue_resource_health_check_connected(NewStatus, Data0) ->
     case NewStatus of
         ?status_connected ->
-            {Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             Data2 = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2, Data0),
             Actions = Replies ++ health_check_actions(Data),
@@ -954,13 +982,13 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% subset of resource manager state...  But there should be a conversion
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
-            {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
     end.
 
 %% Continuation to be used when the current resource state is not `?state_connected'.
 continue_resource_health_check_not_connected(NewStatus, Data0) ->
-    {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+    {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
             {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
@@ -975,6 +1003,30 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
 
 handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
     {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
+handle_manual_channel_health_check(
+    From,
+    #data{
+        added_channels = Channels,
+        hc_pending_callers = #{channel := CPending0} = Pending0,
+        hc_workers = #{channel := #{previous_status := PreviousStatus}}
+    } = Data0,
+    ChannelId
+) when
+    is_map_key(ChannelId, Channels),
+    is_map_key(ChannelId, PreviousStatus)
+->
+    %% Ongoing health check.
+    CPending = maps:update_with(
+        ChannelId,
+        fun(OtherCallers) ->
+            [From | OtherCallers]
+        end,
+        [From],
+        CPending0
+    ),
+    Pending = Pending0#{channel := CPending},
+    Data = Data0#data{hc_pending_callers = Pending},
+    {keep_state, Data};
 handle_manual_channel_health_check(
     From,
     #data{added_channels = Channels} = _Data,
@@ -982,6 +1034,7 @@ handle_manual_channel_health_check(
 ) when
     is_map_key(ChannelId, Channels)
 ->
+    %% No ongoing health check: reply with current status.
     {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
 handle_manual_channel_health_check(
     From,
@@ -990,10 +1043,6 @@ handle_manual_channel_health_check(
 ) ->
     {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
 
-get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
-    RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
-    channel_status(RawStatus).
-
 -spec channels_health_check(resource_status(), data()) -> data().
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
@@ -1009,8 +1058,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
         get_config_for_channels(Data0, ChannelsNotAdded),
     Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
     %% Now that we have done the adding, we can get the status of all channels
-    Data2 = channel_status_for_all_channels(Data1),
-    update_state(Data2, Data0);
+    trigger_health_check_for_added_channels(Data1);
 channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
     %% Whenever the resource is connecting:
     %% 1. Change the status of all added channels to connecting
@@ -1105,41 +1153,117 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
         )
     ).
 
-channel_status_for_all_channels(Data) ->
-    Channels = maps:to_list(Data#data.added_channels),
-    AddedChannelsWithOldAndNewStatus = [
-        {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
-     || {ChannelId, OldStatus} <- Channels,
+%% Currently, we only call resource channel health checks when the underlying resource is
+%% `?status_connected'.
+-spec trigger_health_check_for_added_channels(data()) -> data().
+trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
+    #{channel := CHCWorkers0} = HCWorkers0,
+    PreviousStatus = maps:from_list([
+        {ChannelId, OldStatus}
+     || {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels),
         channel_status_is_channel_added(OldStatus)
-    ],
+    ]),
+    ChannelsToCheck = maps:keys(PreviousStatus),
+    case ChannelsToCheck of
+        [] ->
+            %% Nothing to do.
+            Data0;
+        [ChannelId | Rest] ->
+            %% Shooting one check at a time.  We could increase concurrency in the future.
+            CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus},
+            Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
+            start_channel_health_check(Data1, ChannelId)
+    end.
+
+-spec continue_channel_health_check_connected(data()) -> data().
+continue_channel_health_check_connected(Data0) ->
+    #data{hc_workers = HCWorkers0} = Data0,
+    #{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0,
+    CHCWorkers = CHCWorkers0#{previous_status := #{}},
+    Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
     %% Remove the added channels with a a status different from connected or connecting
+    CheckedChannels = [
+        {ChannelId, NewStatus}
+     || {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels),
+        is_map_key(ChannelId, PreviousStatus)
+    ],
     ChannelsToRemove = [
         ChannelId
-     || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus,
+     || {ChannelId, NewStatus} <- CheckedChannels,
         not channel_status_is_channel_added(NewStatus)
     ],
-    Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
+    Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
     %% Raise/clear alarms
     lists:foreach(
         fun
-            ({ID, _OldStatus, #{status := ?status_connected}}) ->
+            ({ID, #{status := ?status_connected}}) ->
                 _ = maybe_clear_alarm(ID);
-            ({ID, OldStatus, NewStatus}) ->
+            ({ID, NewStatus}) ->
+                OldStatus = maps:get(ID, PreviousStatus),
                 _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
         end,
-        AddedChannelsWithOldAndNewStatus
+        CheckedChannels
     ),
-    %% Update the ChannelsMap
-    ChannelsMap = Data1#data.added_channels,
-    NewChannelsMap =
-        lists:foldl(
-            fun({ChannelId, _, NewStatus}, Acc) ->
-                maps:put(ChannelId, NewStatus, Acc)
-            end,
-            ChannelsMap,
-            AddedChannelsWithOldAndNewStatus
-        ),
-    Data1#data{added_channels = NewChannelsMap}.
+    Data.
+
+-spec start_channel_health_check(data(), channel_id()) -> data().
+start_channel_health_check(#data{} = Data0, ChannelId) ->
+    #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
+    WorkerRef = {_Pid, _Ref} = spawn_channel_health_check_worker(Data0, ChannelId),
+    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerRef => ChannelId}},
+    Data0#data{hc_workers = HCWorkers}.
+
+-spec spawn_channel_health_check_worker(data(), channel_id()) -> {pid(), reference()}.
+spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
+    spawn_monitor(?MODULE, worker_channel_health_check, [Data, ChannelId]).
+
+%% separated so it can be spec'ed and placate dialyzer tantrums...
+-spec worker_channel_health_check(data(), channel_id()) -> no_return().
+worker_channel_health_check(Data, ChannelId) ->
+    #data{id = ResId, mod = Mod, state = State} = Data,
+    RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
+    exit({ok, channel_status(RawStatus)}).
+
+-spec handle_channel_health_check_worker_down(
+    data(), {pid(), reference()}, {ok, channel_status_map()}
+) ->
+    gen_statem:event_handler_result(state(), data()).
+handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
+    #data{
+        hc_workers = HCWorkers0 = #{channel := CHCWorkers0},
+        added_channels = AddedChannels0
+    } = Data0,
+    {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
+    case ExitResult of
+        {ok, NewStatus} ->
+            %% `emqx_resource:call_channel_health_check' catches all exceptions.
+            AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
+    end,
+    Data1 = Data0#data{added_channels = AddedChannels},
+    {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
+    case CHCWorkers1 of
+        #{pending := [NextChannelId | Rest]} ->
+            CHCWorkers = CHCWorkers1#{pending := Rest},
+            HCWorkers = HCWorkers0#{channel := CHCWorkers},
+            Data3 = Data2#data{hc_workers = HCWorkers},
+            Data = start_channel_health_check(Data3, NextChannelId),
+            {keep_state, update_state(Data, Data0), Replies};
+        #{pending := []} ->
+            HCWorkers = HCWorkers0#{channel := CHCWorkers1},
+            Data3 = Data2#data{hc_workers = HCWorkers},
+            Data = continue_channel_health_check_connected(Data3),
+            {keep_state, update_state(Data, Data0), Replies}
+    end.
+
+reply_pending_channel_health_check_callers(
+    ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
+) ->
+    #{channel := CPending0} = Pending0,
+    Pending = maps:get(ChannelId, CPending0, []),
+    Actions = [{reply, From, Status} || From <- Pending],
+    CPending = maps:remove(ChannelId, CPending0),
+    Data = Data0#data{hc_pending_callers = Pending0#{channel := CPending}},
+    {Actions, Data}.
 
 get_config_for_channels(Data0, ChannelsWithoutConfig) ->
     ResId = Data0#data.id,
@@ -1181,8 +1305,14 @@ update_state(Data, _DataWas) ->
 
 remove_runtime_data(#data{} = Data0) ->
     Data0#data{
-        hc_workers = #{resource => #{}, channel => #{}},
-        hc_pending_callers = #{resource => [], channel => []}
+        hc_workers = #{
+            resource => #{},
+            channel => #{pending => [], previous_status => #{}}
+        },
+        hc_pending_callers = #{
+            resource => [],
+            channel => #{}
+        }
     }.
 
 health_check_interval(Opts) ->

+ 43 - 2
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -31,7 +31,12 @@
     on_query_async/4,
     on_batch_query/3,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 -export([counter_loop/0, set_callback_mode/1]).
@@ -40,6 +45,7 @@
 -export([roots/0]).
 
 -define(CM_KEY, {?MODULE, callback_mode}).
+-define(PT_CHAN_KEY(CONN_RES_ID), {?MODULE, chans, CONN_RES_ID}).
 
 roots() ->
     [
@@ -71,12 +77,14 @@ on_start(InstId, #{name := Name} = Opts) ->
     {ok, Opts#{
         id => InstId,
         stop_error => StopError,
+        channels => #{},
         pid => spawn_counter_process(Name, Register)
     }}.
 
 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
-on_stop(_InstId, #{pid := Pid}) ->
+on_stop(InstId, #{pid := Pid}) ->
+    persistent_term:erase(?PT_CHAN_KEY(InstId)),
     stop_counter_process(Pid).
 
 on_query(_InstId, get_state, State) ->
@@ -295,6 +303,31 @@ on_get_status(_InstId, #{pid := Pid}) ->
         false -> ?status_disconnected
     end.
 
+on_add_channel(ConnResId, ConnSt0, ChanId, ChanCfg) ->
+    ConnSt = emqx_utils_maps:deep_put([channels, ChanId], ConnSt0, ChanCfg),
+    do_add_channel(ConnResId, ChanId, ChanCfg),
+    {ok, ConnSt}.
+
+on_remove_channel(ConnResId, ConnSt0, ChanId) ->
+    ConnSt = emqx_utils_maps:deep_remove([channels, ChanId], ConnSt0),
+    do_remove_channel(ConnResId, ChanId),
+    {ok, ConnSt}.
+
+on_get_channels(ConnResId) ->
+    persistent_term:get(?PT_CHAN_KEY(ConnResId), []).
+
+on_get_channel_status(_ConnResId, ChanId, #{channels := Chans}) ->
+    case Chans of
+        #{ChanId := #{health_check_delay := Delay}} ->
+            ?tp(connector_demo_channel_health_check_delay, #{}),
+            timer:sleep(Delay),
+            ?status_connected;
+        #{ChanId := _ChanCfg} ->
+            ?status_connected;
+        #{} ->
+            ?status_disconnected
+    end.
+
 spawn_counter_process(Name, Register) ->
     Pid = spawn_link(?MODULE, counter_loop, []),
     true = maybe_register(Name, Pid, Register),
@@ -455,3 +488,11 @@ make_random_reply(N) ->
         3 ->
             {error, {unrecoverable_error, N}}
     end.
+
+do_add_channel(ConnResId, ChanId, ChanCfg) ->
+    Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
+    persistent_term:put(?PT_CHAN_KEY(ConnResId), [{ChanId, ChanCfg} | Chans]).
+
+do_remove_channel(ConnResId, ChanId) ->
+    Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
+    persistent_term:put(?PT_CHAN_KEY(ConnResId), proplists:delete(ChanId, Chans)).

+ 49 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -3141,6 +3141,55 @@ t_non_blocking_resource_health_check(_Config) ->
     ),
     ok.
 
+t_non_blocking_channel_health_check(_Config) ->
+    ?check_trace(
+        begin
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{name => test_resource, health_check_error => {delay, 500}},
+                    #{health_check_interval => 100}
+                ),
+            ChanId = <<"chan">>,
+            ok =
+                emqx_resource_manager:add_channel(
+                    ?ID,
+                    ChanId,
+                    #{health_check_delay => 500}
+                ),
+
+            %% concurrently attempt to health check the resource; should do it only once
+            %% for all callers
+            NumCallers = 20,
+            Expected = lists:duplicate(
+                NumCallers,
+                #{error => undefined, status => connected}
+            ),
+            ?assertEqual(
+                Expected,
+                emqx_utils:pmap(
+                    fun(_) -> emqx_resource_manager:channel_health_check(?ID, ChanId) end,
+                    lists:seq(1, NumCallers)
+                )
+            ),
+
+            NumCallers
+        end,
+        [
+            log_consistency_prop(),
+            fun(NumCallers, Trace) ->
+                %% shouldn't have one health check per caller
+                SubTrace = ?of_kind(connector_demo_channel_health_check_delay, Trace),
+                ?assertMatch([_ | _], SubTrace),
+                ?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 1 - 0
changes/ce/fix-12830.en.md

@@ -0,0 +1 @@
+Made channel (action/source) health checks non-blocking operations.  This means that operations such as updating or removing an action/source data integration won't be blocked by a lengthy running health check.