Explorar o código

fix(resource manager): disentangle connector and channel health check frequencies

Fixes https://emqx.atlassian.net/browse/EMQX-12674
Thales Macedo Garitezi hai 1 ano
pai
achega
9f8a1885a7
Modificáronse 2 ficheiros con 123 adicións e 60 borrados
  1. 122 60
      apps/emqx_resource/src/emqx_resource_manager.erl
  2. 1 0
      changes/ce/fix-13442.en.md

+ 122 - 60
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -143,6 +143,15 @@
     perform_health_check => boolean()
 }.
 
+%% calls/casts/generic timeouts
+-record(add_channel, {channel_id :: channel_id(), config :: map()}).
+-record(start_channel_health_check, {channel_id :: channel_id()}).
+
+-type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}.
+-type start_channel_health_check_action() :: generic_timeout(
+    #start_channel_health_check{}, #start_channel_health_check{}
+).
+
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -405,7 +414,7 @@ add_channel(ResId, ChannelId, Config) ->
 ) ->
     ok | {error, term()}.
 add_channel(ResId, ChannelId, Config, Opts) ->
-    Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
+    Result = safe_call(ResId, #add_channel{channel_id = ChannelId, config = Config}, ?T_OPERATION),
     maybe
         true ?= maps:get(perform_health_check, Opts, true),
         %% Wait for health_check to finish
@@ -570,7 +579,9 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manual_resource_health_check(From, Data);
 handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
     handle_manual_channel_health_check(From, Data, ChannelId);
-% State: CONNECTING
+%%--------------------------
+%% State: CONNECTING
+%%--------------------------
 handle_event(enter, _OldState, ?state_connecting = State, Data) ->
     ok = log_status_consistency(State, Data),
     {keep_state_and_data, [{state_timeout, 0, health_check}]};
@@ -582,25 +593,39 @@ handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
 ) ->
     handle_remove_channel(From, ChannelId, Data);
+%%--------------------------
 %% State: CONNECTED
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
+%%--------------------------
 handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ok = log_status_consistency(State, Data),
     _ = emqx_alarm:safe_deactivate(Data#data.id),
     ?tp(resource_connected_enter, #{}),
-    {keep_state_and_data, health_check_actions(Data)};
+    {keep_state_and_data, resource_health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
     start_resource_health_check(Data);
 handle_event(
-    {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
+    {call, From},
+    #add_channel{channel_id = ChannelId, config = Config},
+    ?state_connected = _State,
+    Data
 ) ->
     handle_add_channel(From, Data, ChannelId, Config);
 handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connected = _State, Data
 ) ->
     handle_remove_channel(From, ChannelId, Data);
+handle_event(
+    {timeout, #start_channel_health_check{channel_id = ChannelId}},
+    _,
+    ?state_connected = _State,
+    Data
+) ->
+    handle_start_channel_health_check(Data, ChannelId);
+%%--------------------------
 %% State: DISCONNECTED
+%%--------------------------
 handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
     ok = log_status_consistency(State, Data),
     ?tp(resource_disconnected_enter, #{}),
@@ -608,14 +633,18 @@ handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
 handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
     ?tp(resource_auto_reconnect, #{}),
     start_resource(Data, undefined);
+%%--------------------------
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
+%%--------------------------
 handle_event(enter, _OldState, ?state_stopped = State, Data) ->
     ok = log_status_consistency(State, Data),
     {keep_state_and_data, []};
+%%--------------------------
 %% The following events can be handled in any other state
+%%--------------------------
 handle_event(
-    {call, From}, {add_channel, ChannelId, Config}, State, Data
+    {call, From}, #add_channel{channel_id = ChannelId, config = Config}, State, Data
 ) ->
     handle_not_connected_add_channel(From, ChannelId, Config, State, Data);
 handle_event(
@@ -645,6 +674,9 @@ handle_event(
     is_map_key(Pid, CHCWorkers)
 ->
     handle_channel_health_check_worker_down(Data0, Pid, Res);
+handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) ->
+    %% Stale health check action; currently, we only probe channel health when connected.
+    keep_state_and_data;
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -702,7 +734,7 @@ retry_actions(Data) ->
             [{state_timeout, RetryInterval, auto_retry}]
     end.
 
-health_check_actions(Data) ->
+resource_health_check_actions(Data) ->
     [{state_timeout, health_check_interval(Data#data.opts), health_check}].
 
 handle_remove_event(From, ClearMetrics, Data) ->
@@ -1079,7 +1111,7 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             Data2 = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2, Data0),
-            Actions = Replies ++ health_check_actions(Data),
+            Actions = Replies ++ resource_health_check_actions(Data),
             {keep_state, Data, Actions};
         _ ->
             ?SLOG(warning, #{
@@ -1091,23 +1123,28 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% subset of resource manager state...  But there should be a conversion
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
-            {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
+            {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
+            Data = channels_health_check(NewStatus, Data1),
+            Actions = Replies,
+            {next_state, NewStatus, Data, Actions}
     end.
 
 %% Continuation to be used when the current resource state is not `?state_connected'.
 continue_resource_health_check_not_connected(NewStatus, Data0) ->
-    {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
+    {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
-            {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
+            Data = channels_health_check(?status_connected, Data1),
+            Actions = Replies,
+            {next_state, ?state_connected, Data, Actions};
         ?status_connecting ->
-            Actions = Replies ++ health_check_actions(Data),
-            {next_state, ?status_connecting, channels_health_check(?status_connecting, Data),
-                Actions};
+            Data = channels_health_check(?status_connecting, Data1),
+            Actions = Replies ++ resource_health_check_actions(Data),
+            {next_state, ?status_connecting, Data, Actions};
         ?status_disconnected ->
-            {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data),
-                Replies}
+            Data = channels_health_check(?status_disconnected, Data1),
+            Actions = Replies,
+            {next_state, ?state_disconnected, Data, Actions}
     end.
 
 handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
@@ -1269,38 +1306,60 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
         )
     ).
 
+-spec generic_timeout_action(Id, timeout(), Content) -> generic_timeout(Id, Content).
+generic_timeout_action(Id, Timeout, Content) ->
+    {{timeout, Id}, Timeout, Content}.
+
+-spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) ->
+    [start_channel_health_check_action()].
+start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) ->
+    Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data),
+    Event = #start_channel_health_check{channel_id = ChannelId},
+    [generic_timeout_action(Event, Timeout, Event)].
+
+get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) ->
+    emqx_utils:foldl_while(
+        fun
+            (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) ->
+                {halt, HCInterval};
+            (_, Acc) ->
+                {cont, Acc}
+        end,
+        ?HEALTHCHECK_INTERVAL,
+        [
+            NewChanStatus,
+            PreviousChanStatus,
+            maps:get(ChannelId, Data#data.added_channels, #{})
+        ]
+    ).
+
 %% Currently, we only call resource channel health checks when the underlying resource is
 %% `?status_connected'.
 -spec trigger_health_check_for_added_channels(data()) -> data().
 trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
     #{
-        channel := CHCWorkers0 =
+        channel :=
             #{
-                pending := CPending0,
+                %% TODO: rm pending
+                %% pending := CPending0,
                 ongoing := Ongoing0
             }
     } = HCWorkers0,
     NewOngoing = maps:filter(
         fun(ChannelId, OldStatus) ->
-            not is_map_key(ChannelId, Ongoing0) and
+            (not is_map_key(ChannelId, Ongoing0)) andalso
                 channel_status_is_channel_added(OldStatus)
         end,
         Data0#data.added_channels
     ),
     ChannelsToCheck = maps:keys(NewOngoing),
-    case ChannelsToCheck of
-        [] ->
-            %% Nothing to do.
-            Data0;
-        [ChannelId | Rest] ->
-            %% Shooting one check at a time.  We could increase concurrency in the future.
-            CHCWorkers = CHCWorkers0#{
-                pending := CPending0 ++ Rest,
-                ongoing := maps:merge(Ongoing0, NewOngoing)
-            },
-            Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
-            start_channel_health_check(Data1, ChannelId)
-    end.
+    lists:foldl(
+        fun(ChannelId, Acc) ->
+            start_channel_health_check(Acc, ChannelId)
+        end,
+        Data0,
+        ChannelsToCheck
+    ).
 
 -spec continue_channel_health_check_connected(
     channel_id(), channel_status_map(), channel_status_map(), data()
@@ -1338,12 +1397,29 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta
     end,
     Data.
 
+-spec handle_start_channel_health_check(data(), channel_id()) ->
+    gen_statem:event_handler_result(state(), data()).
+handle_start_channel_health_check(Data0, ChannelId) ->
+    Data = start_channel_health_check(Data0, ChannelId),
+    {keep_state, Data}.
+
 -spec start_channel_health_check(data(), channel_id()) -> data().
-start_channel_health_check(#data{} = Data0, ChannelId) ->
+start_channel_health_check(
+    #data{added_channels = AddedChannels, hc_workers = #{channel := #{ongoing := CHCOngoing0}}} =
+        Data0,
+    ChannelId
+) when
+    is_map_key(ChannelId, AddedChannels) andalso (not is_map_key(ChannelId, CHCOngoing0))
+->
     #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
     WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId),
-    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}},
-    Data0#data{hc_workers = HCWorkers}.
+    ChannelStatus = maps:get(ChannelId, AddedChannels),
+    CHCOngoing = CHCOngoing0#{ChannelId => ChannelStatus},
+    CHCWorkers = CHCWorkers0#{WorkerPid => ChannelId, ongoing := CHCOngoing},
+    HCWorkers = HCWorkers0#{channel := CHCWorkers},
+    Data0#data{hc_workers = HCWorkers};
+start_channel_health_check(Data, _ChannelId) ->
+    Data.
 
 -spec spawn_channel_health_check_worker(data(), channel_id()) -> pid().
 spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
@@ -1380,33 +1456,19 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
     #{ongoing := Ongoing0} = CHCWorkers1,
     {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
     CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
-    CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2),
     Data1 = Data0#data{added_channels = AddedChannels},
     {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
-    case CHCWorkers1 of
-        #{pending := [NextChannelId | Rest]} ->
-            CHCWorkers = CHCWorkers3#{pending := Rest},
-            HCWorkers = HCWorkers0#{channel := CHCWorkers},
-            Data3 = Data2#data{hc_workers = HCWorkers},
-            Data4 = continue_channel_health_check_connected(
-                ChannelId,
-                PreviousChanStatus,
-                CurrentStatus,
-                Data3
-            ),
-            Data = start_channel_health_check(Data4, NextChannelId),
-            {keep_state, update_state(Data, Data0), Replies};
-        #{pending := []} ->
-            HCWorkers = HCWorkers0#{channel := CHCWorkers3},
-            Data3 = Data2#data{hc_workers = HCWorkers},
-            Data = continue_channel_health_check_connected(
-                ChannelId,
-                PreviousChanStatus,
-                CurrentStatus,
-                Data3
-            ),
-            {keep_state, update_state(Data, Data0), Replies}
-    end.
+    HCWorkers = HCWorkers0#{channel := CHCWorkers2},
+    Data3 = Data2#data{hc_workers = HCWorkers},
+    Data = continue_channel_health_check_connected(
+        ChannelId,
+        PreviousChanStatus,
+        CurrentStatus,
+        Data3
+    ),
+    CHCActions = start_channel_health_check_action(ChannelId, NewStatus, PreviousChanStatus, Data),
+    Actions = Replies ++ CHCActions,
+    {keep_state, update_state(Data, Data0), Actions}.
 
 handle_channel_health_check_worker_down_new_channels_and_status(
     ChannelId,

+ 1 - 0
changes/ce/fix-13442.en.md

@@ -0,0 +1 @@
+Fixed an issue where the health check interval values of actions/sources were not being taken into account.