Parcourir la source

Merge pull request #12812 from thalesmg/async-res-manager-health-check-m-20240328

feat(resource manager): perform non-blocking health checks
Thales Macedo Garitezi il y a 1 an
Parent
commit
069cd4fbb4

+ 34 - 9
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -825,22 +825,47 @@ do_start_stop_bridges(Type, Config) ->
     %% Connecting to this endpoint should always timeout
     BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
     BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
+    CreateRes0 = request_json(
+        post,
+        uri(["bridges"]),
+        ?MQTT_BRIDGE(BadServer, BadName),
+        Config
+    ),
     ?assertMatch(
         {ok, 201, #{
             <<"type">> := ?BRIDGE_TYPE_MQTT,
             <<"name">> := BadName,
             <<"enable">> := true,
-            <<"server">> := BadServer,
-            <<"status">> := <<"connecting">>,
-            <<"node_status">> := [_ | _]
+            <<"server">> := BadServer
         }},
-        request_json(
-            post,
-            uri(["bridges"]),
-            ?MQTT_BRIDGE(BadServer, BadName),
-            Config
-        )
+        CreateRes0
     ),
+    {ok, 201, CreateRes1} = CreateRes0,
+    case CreateRes1 of
+        #{
+            <<"node_status">> := [
+                #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"status_reason">> := <<"connack_timeout">>
+                },
+                #{<<"status">> := <<"connecting">>}
+                | _
+            ],
+            %% `inconsistent': one node is `?status_disconnected' (because it has already
+            %% timed out), the other node is `?status_connecting' (started later and
+            %% haven't timed out yet)
+            <<"status">> := <<"inconsistent">>,
+            <<"status_reason">> := <<"connack_timeout">>
+        } ->
+            ok;
+        #{
+            <<"node_status">> := [_],
+            <<"status">> := <<"connecting">>
+        } ->
+            ok;
+        _ ->
+            error({unexpected_result, CreateRes1})
+    end,
     BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
     ?assertMatch(
         %% request from product: return 400 on such errors

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.28"},
+    {vsn, "0.1.29"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 160 - 75
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -60,6 +60,9 @@
 % Behaviour
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
+%% Internal exports.
+-export([worker_resource_health_check/1]).
+
 % State record
 -record(data, {
     id,
@@ -73,7 +76,15 @@
     state,
     error,
     pid,
-    added_channels,
+    added_channels = #{},
+    %% Reference to process performing resource health check.
+    hc_workers = #{resource => #{}, channel => #{}} :: #{
+        resource | channel := #{{pid(), reference()} => true}
+    },
+    %% Callers waiting on health check
+    hc_pending_callers = #{resource => [], channel => []} :: #{
+        resource | channel := [gen_server:from()]
+    },
     extra
 }).
 -type data() :: #data{}.
@@ -153,13 +164,13 @@ create(ResId, Group, ResourceType, Config, Opts) ->
     case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of
         true ->
             %% start resource workers as the query type requires them
-            ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
-            case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
-                true ->
-                    wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
-                false ->
-                    ok
-            end;
+            ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts);
+        false ->
+            ok
+    end,
+    case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+        true ->
+            wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
         false ->
             ok
     end.
@@ -455,7 +466,7 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
 handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
     Reply = {ok, Group, data_record_to_external_map(Data)},
     {keep_state_and_data, [{reply, From, Reply}]};
-% Called when doing a manually health check.
+% Called when doing a manual health check.
 handle_event({call, From}, health_check, ?state_stopped, _Data) ->
     Actions = [{reply, From, {error, resource_is_stopped}}],
     {keep_state_and_data, Actions};
@@ -463,9 +474,9 @@ handle_event({call, From}, {channel_health_check, _}, ?state_stopped, _Data) ->
     Actions = [{reply, From, {error, resource_is_stopped}}],
     {keep_state_and_data, Actions};
 handle_event({call, From}, health_check, _State, Data) ->
-    handle_manually_health_check(From, Data);
+    handle_manual_resource_health_check(From, Data);
 handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
-    handle_manually_channel_health_check(From, Data, ChannelId);
+    handle_manual_channel_health_check(From, Data, ChannelId);
 % State: CONNECTING
 handle_event(enter, _OldState, ?state_connecting = State, Data) ->
     ok = log_status_consistency(State, Data),
@@ -473,7 +484,7 @@ handle_event(enter, _OldState, ?state_connecting = State, Data) ->
 handle_event(internal, start_resource, ?state_connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, ?state_connecting, Data) ->
-    handle_connecting_health_check(Data);
+    start_resource_health_check(Data);
 handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
 ) ->
@@ -487,7 +498,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ?tp(resource_connected_enter, #{}),
     {keep_state_and_data, health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
-    handle_connected_health_check(Data);
+    start_resource_health_check(Data);
 handle_event(
     {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
 ) ->
@@ -523,6 +534,15 @@ handle_event(
 ) ->
     Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
     {keep_state_and_data, {reply, From, {ok, Channels}}};
+handle_event(
+    info,
+    {'DOWN', Ref, process, Pid, Res},
+    State0,
+    Data0 = #data{hc_workers = #{resource := HCWorkers}}
+) when
+    is_map_key({Pid, Ref}, HCWorkers)
+->
+    handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -835,18 +855,127 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
     _ = maybe_clear_alarm(ChannelId),
     {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
 
-handle_manually_health_check(From, Data) ->
-    with_health_check(
-        Data,
-        fun(Status, UpdatedData) ->
-            Actions = [{reply, From, {ok, Status}}],
-            {next_state, Status, channels_health_check(Status, UpdatedData), Actions}
-        end
-    ).
+handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when
+    map_size(HCWorkers) > 0
+->
+    %% ongoing health check
+    #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0,
+    Pending = Pending0#{resource := [From | RPending0]},
+    Data = Data0#data{hc_pending_callers = Pending},
+    {keep_state, Data};
+handle_manual_resource_health_check(From, Data0) ->
+    #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0,
+    Pending = Pending0#{resource := [From | RPending0]},
+    Data = Data0#data{hc_pending_callers = Pending},
+    start_resource_health_check(Data).
+
+reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) ->
+    #{resource := RPending} = Pending0,
+    Actions = [{reply, From, {ok, Status}} || From <- RPending],
+    Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
+    {Actions, Data}.
+
+start_resource_health_check(#data{state = undefined} = Data) ->
+    %% No resource running, thus disconnected.
+    %% A health check spawn when state is undefined can only happen when someone manually
+    %% asks for a health check and the resource could not initialize or has not had enough
+    %% time to do so.  Let's assume the continuation is as if we were `?status_connecting'.
+    continue_resource_health_check_not_connected(?status_disconnected, Data);
+start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
+    map_size(HCWorkers) > 0
+->
+    %% Already ongoing
+    keep_state_and_data;
+start_resource_health_check(#data{} = Data0) ->
+    #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
+    WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0),
+    HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
+    Data = Data0#data{hc_workers = HCWorkers},
+    {keep_state, Data}.
+
+-spec spawn_health_check_worker(data()) -> {pid(), reference()}.
+spawn_health_check_worker(#data{} = Data) ->
+    spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
+
+%% separated so it can be spec'ed and placate dialyzer tantrums...
+-spec worker_resource_health_check(data()) -> no_return().
+worker_resource_health_check(Data) ->
+    HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
+    exit({ok, HCRes}).
+
+handle_resource_health_check_worker_down(CurrentState, Data0, WorkerRef, ExitResult) ->
+    #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
+    HCWorkers = HCWorkers0#{resource := maps:remove(WorkerRef, RHCWorkers0)},
+    Data1 = Data0#data{hc_workers = HCWorkers},
+    case ExitResult of
+        {ok, HCRes} ->
+            continue_with_health_check(Data1, CurrentState, HCRes);
+        _ ->
+            %% Unexpected: `emqx_resource:call_health_check' catches all exceptions.
+            continue_with_health_check(Data1, CurrentState, {error, ExitResult})
+    end.
 
-handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
+continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
+    #data{
+        id = ResId,
+        error = PrevError
+    } = Data0,
+    {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0),
+    _ = maybe_alarm(NewStatus, ResId, Err, PrevError),
+    ok = maybe_resume_resource_workers(ResId, NewStatus),
+    Data1 = Data0#data{
+        state = NewState, status = NewStatus, error = Err
+    },
+    Data = update_state(Data1, Data0),
+    case CurrentState of
+        ?state_connected ->
+            continue_resource_health_check_connected(NewStatus, Data);
+        _ ->
+            %% `?state_connecting' | `?state_disconnected' | `?state_stopped'
+            continue_resource_health_check_not_connected(NewStatus, Data)
+    end.
+
+%% Continuation to be used when the current resource state is `?state_connected'.
+continue_resource_health_check_connected(NewStatus, Data0) ->
+    case NewStatus of
+        ?status_connected ->
+            {Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            Data2 = channels_health_check(?status_connected, Data1),
+            Data = update_state(Data2, Data0),
+            Actions = Replies ++ health_check_actions(Data),
+            {keep_state, Data, Actions};
+        _ ->
+            ?SLOG(warning, #{
+                msg => "health_check_failed",
+                id => Data0#data.id,
+                status => NewStatus
+            }),
+            %% Note: works because, coincidentally, channel/resource status is a
+            %% subset of resource manager state...  But there should be a conversion
+            %% between the two here, as resource manager also has `stopped', which is
+            %% not a valid status at the time of writing.
+            {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
+    end.
+
+%% Continuation to be used when the current resource state is not `?state_connected'.
+continue_resource_health_check_not_connected(NewStatus, Data0) ->
+    {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+    case NewStatus of
+        ?status_connected ->
+            {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
+        ?status_connecting ->
+            Actions = Replies ++ health_check_actions(Data),
+            {next_state, ?status_connecting, channels_health_check(?status_connecting, Data),
+                Actions};
+        ?status_disconnected ->
+            {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data),
+                Replies}
+    end.
+
+handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
     {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
-handle_manually_channel_health_check(
+handle_manual_channel_health_check(
     From,
     #data{added_channels = Channels} = _Data,
     ChannelId
@@ -854,7 +983,7 @@ handle_manually_channel_health_check(
     is_map_key(ChannelId, Channels)
 ->
     {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
-handle_manually_channel_health_check(
+handle_manual_channel_health_check(
     From,
     _Data,
     _ChannelId
@@ -865,56 +994,6 @@ get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, Ch
     RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
     channel_status(RawStatus).
 
-handle_connecting_health_check(Data) ->
-    with_health_check(
-        Data,
-        fun
-            (?status_connected, UpdatedData) ->
-                {next_state, ?state_connected,
-                    channels_health_check(?status_connected, UpdatedData)};
-            (?status_connecting, UpdatedData) ->
-                {keep_state, channels_health_check(?status_connecting, UpdatedData),
-                    health_check_actions(UpdatedData)};
-            (?status_disconnected, UpdatedData) ->
-                {next_state, ?state_disconnected,
-                    channels_health_check(?status_disconnected, UpdatedData)}
-        end
-    ).
-
-handle_connected_health_check(Data) ->
-    with_health_check(
-        Data,
-        fun
-            (?status_connected, UpdatedData0) ->
-                UpdatedData1 = channels_health_check(?status_connected, UpdatedData0),
-                {keep_state, UpdatedData1, health_check_actions(UpdatedData1)};
-            (Status, UpdatedData) ->
-                ?SLOG(warning, #{
-                    msg => "health_check_failed",
-                    id => Data#data.id,
-                    status => Status
-                }),
-                %% Note: works because, coincidentally, channel/resource status is a
-                %% subset of resource manager state...  But there should be a conversion
-                %% between the two here, as resource manager also has `stopped', which is
-                %% not a valid status at the time of writing.
-                {next_state, Status, channels_health_check(Status, UpdatedData)}
-        end
-    ).
-
-with_health_check(#data{state = undefined} = Data, Func) ->
-    Func(disconnected, Data);
-with_health_check(#data{error = PrevError} = Data, Func) ->
-    ResId = Data#data.id,
-    HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
-    {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
-    _ = maybe_alarm(Status, ResId, Err, PrevError),
-    ok = maybe_resume_resource_workers(ResId, Status),
-    UpdatedData = Data#data{
-        state = NewState, status = Status, error = Err
-    },
-    Func(Status, update_state(UpdatedData, Data)).
-
 -spec channels_health_check(resource_status(), data()) -> data().
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
@@ -1097,9 +1176,15 @@ update_state(Data) ->
 update_state(DataWas, DataWas) ->
     DataWas;
 update_state(Data, _DataWas) ->
-    _ = insert_cache(Data#data.id, Data),
+    _ = insert_cache(Data#data.id, remove_runtime_data(Data)),
     Data.
 
+remove_runtime_data(#data{} = Data0) ->
+    Data0#data{
+        hc_workers = #{resource => #{}, channel => #{}},
+        hc_pending_callers = #{resource => [], channel => []}
+    }.
+
 health_check_interval(Opts) ->
     maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
 

+ 12 - 4
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -18,6 +18,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -behaviour(emqx_resource).
 
@@ -276,15 +277,22 @@ batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) ->
 
 on_get_status(_InstId, #{health_check_error := true}) ->
     ?tp(connector_demo_health_check_error, #{}),
-    disconnected;
+    ?status_disconnected;
 on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
     ?tp(connector_demo_health_check_error, #{}),
-    {disconnected, State, Message};
+    {?status_disconnected, State, Message};
+on_get_status(_InstId, #{pid := Pid, health_check_error := {delay, Delay}}) ->
+    ?tp(connector_demo_health_check_delay, #{}),
+    timer:sleep(Delay),
+    case is_process_alive(Pid) of
+        true -> ?status_connected;
+        false -> ?status_disconnected
+    end;
 on_get_status(_InstId, #{pid := Pid}) ->
     timer:sleep(300),
     case is_process_alive(Pid) of
-        true -> connected;
-        false -> disconnected
+        true -> ?status_connected;
+        false -> ?status_disconnected
     end.
 
 spawn_counter_process(Name, Register) ->

+ 70 - 38
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -52,12 +52,20 @@ end_per_testcase(_, _Config) ->
 
 init_per_suite(Config) ->
     code:ensure_loaded(?TEST_RESOURCE),
-    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    {ok, _} = application:ensure_all_started(emqx_resource),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_resource
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
+end_per_suite(Config) ->
+    Apps = proplists:get_value(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Tests
@@ -115,10 +123,7 @@ t_create_remove(_) ->
 
             ?assertNot(is_process_alive(Pid))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_create_remove_local(_) ->
@@ -174,10 +179,7 @@ t_create_remove_local(_) ->
 
             ?assertNot(is_process_alive(Pid))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_do_not_start_after_created(_) ->
@@ -219,10 +221,7 @@ t_do_not_start_after_created(_) ->
 
             ?assertNot(is_process_alive(Pid2))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_query(_) ->
@@ -855,14 +854,12 @@ t_healthy_timeout(_) ->
             ),
             ?assertEqual(ok, emqx_resource:remove_local(?ID))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_healthy(_) ->
     ?check_trace(
+        #{timetrap => 10_000},
         begin
             ?assertMatch(
                 {ok, _},
@@ -873,10 +870,13 @@ t_healthy(_) ->
                     #{name => test_resource}
                 )
             ),
+            ct:pal("getting state"),
             {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
             timer:sleep(300),
+            ct:pal("setting state as `connecting`"),
             emqx_resource:set_resource_status_connecting(?ID),
 
+            ct:pal("health check"),
             ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
             ?assertMatch(
                 [#{status := connected}],
@@ -894,10 +894,7 @@ t_healthy(_) ->
 
             ?assertEqual(ok, emqx_resource:remove_local(?ID))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_unhealthy_target(_) ->
@@ -1005,11 +1002,7 @@ t_stop_start(_) ->
             ?assertEqual(ok, emqx_resource:stop(?ID)),
             ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
         end,
-
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_stop_start_local(_) ->
@@ -1064,10 +1057,7 @@ t_stop_start_local(_) ->
 
             ?assert(is_process_alive(Pid1))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_list_filter(_) ->
@@ -1269,10 +1259,7 @@ t_health_check_disconnected(_) ->
                 emqx_resource:health_check(?ID)
             )
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_unblock_only_required_buffer_workers(_) ->
@@ -3116,6 +3103,44 @@ t_telemetry_handler_crash(_Config) ->
     ),
     ok.
 
+t_non_blocking_resource_health_check(_Config) ->
+    ?check_trace(
+        begin
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{name => test_resource, health_check_error => {delay, 1_000}},
+                    #{health_check_interval => 100}
+                ),
+            %% concurrently attempt to health check the resource; should do it only once
+            %% for all callers
+            NumCallers = 20,
+            Expected = lists:duplicate(NumCallers, {ok, connected}),
+            ?assertEqual(
+                Expected,
+                emqx_utils:pmap(
+                    fun(_) -> emqx_resource:health_check(?ID) end,
+                    lists:seq(1, NumCallers)
+                )
+            ),
+
+            NumCallers
+        end,
+        [
+            log_consistency_prop(),
+            fun(NumCallers, Trace) ->
+                %% shouldn't have one health check per caller
+                SubTrace = ?of_kind(connector_demo_health_check_delay, Trace),
+                ?assertMatch([_ | _], SubTrace),
+                ?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3373,3 +3398,10 @@ create(Id, Group, Type, Config) ->
 
 create(Id, Group, Type, Config, Opts) ->
     emqx_resource:create_local(Id, Group, Type, Config, Opts).
+
+log_consistency_prop() ->
+    {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.
+log_consistency_prop(Trace) ->
+    ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
+    ?assertEqual([], ?of_kind("inconsistent_cache", Trace)),
+    ok.

+ 13 - 2
apps/emqx_utils/src/emqx_utils_redact.erl

@@ -65,8 +65,11 @@ redact(Term, Checker) ->
 redact_headers(Term) ->
     do_redact_headers(Term).
 
-do_redact(L, Checker) when is_list(L) ->
-    lists:map(fun(E) -> do_redact(E, Checker) end, L);
+do_redact([], _Checker) ->
+    [];
+do_redact([X | Xs], Checker) ->
+    %% Note: we could be dealing with an improper list
+    [do_redact(X, Checker) | do_redact(Xs, Checker)];
 do_redact(M, Checker) when is_map(M) ->
     maps:map(
         fun(K, V) ->
@@ -252,6 +255,14 @@ redact2_test_() ->
     Keys = [secret, passcode],
     [{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys].
 
+redact_improper_list_test_() ->
+    %% improper lists: check that we don't crash
+    %% may arise when we redact process states with pending `gen' requests
+    [
+        ?_assertEqual([alias | foo], redact([alias | foo])),
+        ?_assertEqual([1, 2 | foo], redact([1, 2 | foo]))
+    ].
+
 deobfuscate_test() ->
     NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>},
     ?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})),

+ 1 - 0
changes/ce/fix-12812.en.md

@@ -0,0 +1 @@
+Made resource health checks non-blocking operations.  This means that operations such as updating or removing a resource won't be blocked by a lengthy running health check.