Просмотр исходного кода

fix(resource manager): handle resource and channel concurrent results race condition

Fixes https://emqx.atlassian.net/browse/EMQX-13284

0) Connector and channel are initially healthy and in resource manager state.
1) Connector and channel HCs fire concurrently.
2) Connector HC returns `disconnected`.  This makes manager call `on_remove_channel` for
   each channel, removing them from the connector state.
3) Channel HC returns `connected`.
4) A new connector HC returns `connected`.
5) Should contain action both in connector state and in resource manager's
`added_channels`.  Original bug: connector state didn't contain the channel state, but
`added_channels` did.
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
6b85bbf44b

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.34"},
+    {vsn, "0.1.35"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 60 - 32
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -90,15 +90,13 @@
     hc_workers = #{
         resource => #{},
         channel => #{
-            ongoing => #{},
-            pending => []
+            ongoing => #{}
         }
     } :: #{
-        resource := #{{pid(), reference()} => true},
+        resource := #{pid() => true},
         channel := #{
-            {pid(), reference()} => channel_id(),
-            ongoing := #{channel_id() => channel_status_map()},
-            pending := [channel_id()]
+            pid() => channel_id(),
+            ongoing := #{channel_id() => channel_status_map()}
         }
     },
     %% Callers waiting on health check
@@ -662,10 +660,11 @@ handle_event(
 %%--------------------------
 %% State: DISCONNECTED
 %%--------------------------
-handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
-    ok = log_status_consistency(State, Data),
+handle_event(enter, _OldState, ?state_disconnected = State, Data0) ->
+    ok = log_status_consistency(State, Data0),
     ?tp(resource_disconnected_enter, #{}),
-    {keep_state_and_data, retry_actions(Data)};
+    Data = handle_abort_all_channel_health_checks(Data0),
+    {keep_state, Data, retry_actions(Data)};
 handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
     ?tp(resource_auto_reconnect, #{}),
     start_resource(Data, undefined);
@@ -792,7 +791,6 @@ start_resource(Data, From) ->
             %% Perform an initial health_check immediately before transitioning into a connected state
             UpdatedData2 = add_channels(UpdatedData1),
             UpdatedData3 = maybe_update_callback_mode(UpdatedData2),
-
             Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
             {next_state, ?state_connecting, update_state(UpdatedData3), Actions};
         {error, Reason} = Err ->
@@ -869,8 +867,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
             NewData = Data#data{
                 state = NewState,
                 added_channels = NewAddedChannelsMap
-            },
-            add_channels_in_list(Rest, NewData);
+            };
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(ResId),
             ?SLOG(
@@ -883,7 +880,6 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
                 },
                 #{tag => tag(Group, Type)}
             ),
-            AddedChannelsMap = Data#data.added_channels,
             NewAddedChannelsMap = maps:put(
                 ChannelID,
                 channel_status(Error, ChannelConfig),
@@ -893,9 +889,9 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
                 added_channels = NewAddedChannelsMap
             },
             %% Raise an alarm since the channel could not be added
-            _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error),
-            add_channels_in_list(Rest, NewData)
-    end.
+            _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error)
+    end,
+    add_channels_in_list(Rest, NewData).
 
 maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_stopped ->
     stop_resource(Data);
@@ -951,8 +947,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
             NewData = Data#data{
                 state = NewState,
                 added_channels = NewAddedChannelsMap
-            },
-            remove_channels_in_list(Rest, NewData, KeepInChannelMap);
+            };
         {error, Reason} ->
             ?SLOG(
                 log_level(IsDryRun),
@@ -968,9 +963,9 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
             ),
             NewData = Data#data{
                 added_channels = NewAddedChannelsMap
-            },
-            remove_channels_in_list(Rest, NewData, KeepInChannelMap)
-    end.
+            }
+    end,
+    remove_channels_in_list(Rest, NewData, KeepInChannelMap).
 
 safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
     {ok, State};
@@ -1293,8 +1288,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
         not channel_status_is_channel_added(Status)
     ],
     %% Attempt to add channels that are not added
-    ChannelsNotAddedWithConfigs =
-        get_config_for_channels(Data0, ChannelsNotAdded),
+    ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded),
     Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
     %% Now that we have done the adding, we can get the status of all channels
     Data2 = trigger_health_check_for_added_channels(Data1),
@@ -1428,12 +1422,7 @@ get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus,
 -spec trigger_health_check_for_added_channels(data()) -> data().
 trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
     #{
-        channel :=
-            #{
-                %% TODO: rm pending
-                %% pending := CPending0,
-                ongoing := Ongoing0
-            }
+        channel := #{ongoing := Ongoing0}
     } = HCWorkers0,
     NewOngoing = maps:filter(
         fun(ChannelId, OldStatus) ->
@@ -1526,15 +1515,15 @@ worker_channel_health_check(Data, ChannelId) ->
     exit({ok, channel_status(RawStatus, ChannelConfig)}).
 
 -spec handle_channel_health_check_worker_down(
-    data(), {pid(), reference()}, {ok, channel_status_map()}
+    data(), pid(), {ok, channel_status_map()}
 ) ->
     gen_statem:event_handler_result(state(), data()).
-handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
+handle_channel_health_check_worker_down(Data0, Pid, ExitResult) ->
     #data{
         hc_workers = HCWorkers0 = #{channel := CHCWorkers0},
         added_channels = AddedChannels0
     } = Data0,
-    {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
+    {ChannelId, CHCWorkers1} = maps:take(Pid, CHCWorkers0),
     %% The channel might have got removed while the health check was going on
     CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added),
     {AddedChannels, NewStatus} =
@@ -1863,3 +1852,42 @@ log_level(false) -> warning.
 tag(Group, Type) ->
     Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type),
     string:uppercase(Str).
+
+%% When a resource enters a `?status_disconnected' state, late channel health check
+%% replies are useless and could corrup state.
+-spec handle_abort_all_channel_health_checks(data()) -> data().
+handle_abort_all_channel_health_checks(Data0) ->
+    #data{
+        hc_workers = #{channel := CHCWorkers} = HCWorkers0,
+        hc_pending_callers = #{channel := CPending} = Pending0
+    } = Data0,
+    lists:foreach(
+        fun(From) ->
+            gen_statem:reply(From, {error, resource_disconnected})
+        end,
+        lists:flatten(maps:values(CPending))
+    ),
+    maps:foreach(
+        fun
+            (Pid, _ChannelId) when is_pid(Pid) ->
+                abort_channel_health_check(Pid);
+            (_, _) ->
+                ok
+        end,
+        CHCWorkers
+    ),
+    HCWorkers = HCWorkers0#{channel := #{ongoing => #{}}},
+    Pending = Pending0#{channel := #{}},
+    Data0#data{
+        hc_workers = HCWorkers,
+        hc_pending_callers = Pending
+    }.
+
+abort_channel_health_check(Pid) ->
+    %% We're already linked to the worker pids due to `spawn_link'.
+    exit(Pid, kill),
+    %% Clean the exit signal so it doesn't contaminate state handling.
+    receive
+        {'EXIT', Pid, _} ->
+            ok
+    end.

+ 36 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -312,6 +312,18 @@ on_get_status(_InstId, #{pid := Pid, health_check_error := {delay, Delay}}) ->
         true -> ?status_connected;
         false -> ?status_disconnected
     end;
+on_get_status(ConnResId, #{health_check_agent := Agent}) ->
+    case get_agent_health_check_action(Agent, resource_health_check) of
+        {ask, Pid} ->
+            Alias = alias([reply]),
+            Pid ! {waiting_health_check_result, Alias, resource, ConnResId},
+            receive
+                {Alias, Result} ->
+                    Result
+            end;
+        Result ->
+            Result
+    end;
 on_get_status(_InstId, #{pid := Pid}) ->
     timer:sleep(300),
     case is_process_alive(Pid) of
@@ -332,6 +344,18 @@ on_remove_channel(ConnResId, ConnSt0, ChanId) ->
 on_get_channels(ConnResId) ->
     persistent_term:get(?PT_CHAN_KEY(ConnResId), []).
 
+on_get_channel_status(ConnResId, ChanId, #{health_check_agent := Agent}) ->
+    case get_agent_health_check_action(Agent, channel_health_check) of
+        {ask, Pid} ->
+            Alias = alias([reply]),
+            Pid ! {waiting_health_check_result, Alias, channel, ConnResId, ChanId},
+            receive
+                {Alias, Result} ->
+                    Result
+            end;
+        Result ->
+            Result
+    end;
 on_get_channel_status(_ConnResId, ChanId, #{channels := Chans}) ->
     case Chans of
         #{ChanId := #{health_check_delay := Delay}} ->
@@ -512,3 +536,15 @@ do_add_channel(ConnResId, ChanId, ChanCfg) ->
 do_remove_channel(ConnResId, ChanId) ->
     Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
     persistent_term:put(?PT_CHAN_KEY(ConnResId), proplists:delete(ChanId, Chans)).
+
+get_agent_health_check_action(Agent, Key) ->
+    emqx_utils_agent:get_and_update(Agent, fun(Old) ->
+        case Old of
+            #{Key := [Action]} ->
+                {Action, Old};
+            #{Key := [Action | Actions]} ->
+                {Action, Old#{Key := Actions}};
+            #{Key := Action} when not is_list(Action) ->
+                {Action, Old}
+        end
+    end).

+ 110 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -32,6 +32,10 @@
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
     {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
 ).
+-define(tpal(MSG), begin
+    ct:pal(MSG),
+    ?tp(notice, MSG, #{})
+end).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -3241,6 +3245,109 @@ t_force_stop(_Config) ->
     ),
     ok.
 
+%% https://emqx.atlassian.net/browse/EEC-1101
+t_resource_and_channel_health_check_race(_Config) ->
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            %% 0) Connector and channel are initially healthy and in resource manager
+            %%    state.
+            AgentState0 = #{
+                resource_health_check => connected,
+                channel_health_check => connected
+            },
+            {ok, Agent} = emqx_utils_agent:start_link(AgentState0),
+            ConnName = <<"cname">>,
+            %% Needs to have this form to satifisfy internal, implicit requirements of
+            %% `emqx_resource_cache'.
+            ConnResId = <<"connector:ctype:", ConnName/binary>>,
+            {ok, _} =
+                create(
+                    ConnResId,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{
+                        name => test_resource,
+                        health_check_agent => Agent
+                    },
+                    #{
+                        health_check_interval => 100,
+                        start_timeout => 100
+                    }
+                ),
+            %% Needs to have this form to satifisfy internal, implicit requirements of
+            %% `emqx_resource_cache'.
+            ChanId = <<"action:atype:aname:", ConnResId/binary>>,
+            ok =
+                emqx_resource_manager:add_channel(
+                    ConnResId,
+                    ChanId,
+                    #{resource_opts => #{health_check_interval => 100}}
+                ),
+            ?assertMatch({ok, connected}, emqx_resource:health_check(ConnResId)),
+            ?assertMatch(
+                #{status := connected}, emqx_resource:channel_health_check(ConnResId, ChanId)
+            ),
+
+            %% 1) Connector and channel HCs fire concurrently
+            ?tpal("1) Connector and channel HCs fire concurrently"),
+            Me = self(),
+            _ = emqx_utils_agent:get_and_update(Agent, fun(Old) ->
+                {Old, Old#{
+                    resource_health_check := {ask, Me},
+                    channel_health_check := [{ask, Me}, connected]
+                }}
+            end),
+            receive
+                {waiting_health_check_result, ConnHCAlias1, resource, _ConnResId1} ->
+                    ?tpal("received connector hc request"),
+                    ok
+            end,
+            receive
+                {waiting_health_check_result, ChanHCAlias1, channel, _ConnResId2, _ChanId} ->
+                    ?tpal("received channel hc request"),
+                    ok
+            end,
+            %% 2) Connector HC returns `disconnected'.  This makes manager call
+            %%    `on_remove_channel' for each channel, removing them from the connector
+            %%    state.
+            ?tpal("2) Connector HC returns `disconnected'"),
+            _ = emqx_utils_agent:get_and_update(Agent, fun(Old) ->
+                {Old, Old#{resource_health_check := connected}}
+            end),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    ConnHCAlias1 ! {ConnHCAlias1, disconnected},
+                    #{?snk_kind := resource_disconnected_enter}
+                ),
+            %% 3) Channel HC returns `connected'.
+            ?tpal("3) Channel HC returns `connected'"),
+            ChanHCAlias1 ! {ChanHCAlias1, connected},
+            %% 4) A new connector HC returns `connected'.
+            ?tpal("4) A new connector HC returns `connected'"),
+            ?assertMatch({ok, connected}, emqx_resource:health_check(ConnResId)),
+            ?assertMatch(
+                #{status := connected}, emqx_resource:channel_health_check(ConnResId, ChanId)
+            ),
+            %% 5) Should contain action both in connector state and in resource manager's
+            %% `added_channels'.  Original bug: connector state didn't contain the channel
+            %% state, but `added_channels' did.
+            ?assertMatch(
+                [
+                    {?DEFAULT_RESOURCE_GROUP, #{
+                        status := connected,
+                        state := #{channels := #{ChanId := _}},
+                        added_channels := #{ChanId := _}
+                    }}
+                ],
+                emqx_resource_cache:read(ConnResId)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3495,7 +3602,9 @@ create(Id, Group, Type, Config) ->
     emqx_resource:create_local(Id, Group, Type, Config, #{}).
 
 create(Id, Group, Type, Config, Opts) ->
-    emqx_resource:create_local(Id, Group, Type, Config, Opts).
+    Res = emqx_resource:create_local(Id, Group, Type, Config, Opts),
+    on_exit(fun() -> emqx_resource:remove_local(Id) end),
+    Res.
 
 log_consistency_prop() ->
     {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.

+ 1 - 0
changes/ce/fix-14001.en.md

@@ -0,0 +1 @@
+Fixed a race condition where a resource (connector/action/source/authentication/authorization) could report having a connected, healthy channel where in actuality it was broken, after a moment of disconnection.