Просмотр исходного кода

Merge pull request #14297 from thalesmg/20241127-r584-async-resource-removal

fix(resource): remove dry run resources asynchronously
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
b0bdb220a1

+ 48 - 0
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -20,6 +20,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CONNECTOR, emqx_connector_dummy_impl).
 
@@ -327,6 +328,53 @@ t_no_buffer_workers(Config) ->
     ?assertEqual([], supervisor:which_children(emqx_resource_buffer_worker_sup)),
     ok.
 
+%% Checks that the maximum timeout (currently) set by `resource_opts.health_check_timeout'
+%% is respected when doing a dry run, even if the removal gets stuck because the resource
+%% process is unresponsive.
+t_dryrun_timeout({'init', Config}) ->
+    meck:new(emqx_connector_resource, [passthrough]),
+    meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
+    meck:new(?CONNECTOR, [non_strict]),
+    meck:expect(?CONNECTOR, resource_type, 0, dummy),
+    meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
+    %% hang forever
+    meck:expect(?CONNECTOR, on_start, fun(_ConnResId, _Opts) ->
+        receive
+            go -> ok
+        end
+    end),
+    meck:expect(?CONNECTOR, on_get_channels, 1, []),
+    meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_stop, 2, ok),
+    meck:expect(?CONNECTOR, on_get_status, 2, connected),
+    meck:expect(?CONNECTOR, query_mode, 1, sync),
+    Config;
+t_dryrun_timeout({'end', _Config}) ->
+    meck:unload(),
+    ok;
+t_dryrun_timeout(Config) when is_list(Config) ->
+    Type = kafka_producer,
+    Conf0 = connector_config(),
+    Timeout = 100,
+    Conf = Conf0#{<<"resource_opts">> => #{<<"health_check_interval">> => Timeout}},
+    %% Minimum timeout is capped at 5 s in `emqx_resource_manager'...  Plus, we need to
+    %% wait for removal of stuck process, which itself has another 5 s timeout.
+    ct:timetrap(15_000),
+    %% Cache cleaner is triggered when the process initiating the dry run dies.
+    Pid = spawn_link(fun() ->
+        Res = emqx_connector_resource:create_dry_run(Type, Conf),
+        ?assertEqual({error, timeout}, Res),
+        ok
+    end),
+    MRef = monitor(process, Pid),
+    receive
+        {'DOWN', MRef, _, _, _} ->
+            ok
+    end,
+    %% Should be removed asynchronously by cache cleaner.
+    ?retry(1_000, 7, ?assertEqual([], emqx_resource:list_instances())),
+    ok.
+
 %% helpers
 
 connector_config() ->

+ 18 - 5
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -79,7 +79,7 @@ handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
 handle_cast(#add_dry_run{id = ID, pid = Pid}, #{dry_run_pmon := Pmon0} = State0) ->
-    Pmon = emqx_pmon:monitor(Pid, ID, Pmon0),
+    Pmon = append_monitor(Pmon0, Pid, ID),
     State = State0#{dry_run_pmon := Pmon},
     {noreply, State};
 handle_cast(_Msg, State) ->
@@ -108,8 +108,8 @@ handle_down(Pid, State0) ->
             handle_down_cache(ID, Pid, State0);
         error ->
             case emqx_pmon:find(Pid, DryrunPmon) of
-                {ok, ID} ->
-                    handle_down_dry_run(ID, Pid, State0);
+                {ok, IDs} ->
+                    handle_down_dry_run(IDs, Pid, State0);
                 error ->
                     State0
             end
@@ -121,16 +121,20 @@ handle_down_cache(ID, Pid, State0) ->
     Pmon = emqx_pmon:erase(Pid, Pmon0),
     State0#{cache_pmon := Pmon}.
 
-handle_down_dry_run(ID, Pid, State0) ->
+handle_down_dry_run([ID | Rest], Pid, State0) ->
     #{dry_run_pmon := Pmon0} = State0,
     %% No need to wait here: since it's a dry run resource, it won't be recreated,
     %% assuming the ID is random enough.
     spawn(fun() ->
+        _ = emqx_resource_manager:remove(ID),
         emqx_resource_manager_sup:delete_child(ID),
         ?tp("resource_cache_cleaner_deleted_child", #{id => ID})
     end),
     Pmon = emqx_pmon:erase(Pid, Pmon0),
-    State0#{dry_run_pmon := Pmon}.
+    State = State0#{dry_run_pmon := Pmon},
+    handle_down_dry_run(Rest, Pid, State);
+handle_down_dry_run([], _Pid, State) ->
+    State.
 
 maybe_erase_cache(DownManager, ID) ->
     case emqx_resource_cache:read_manager_pid(ID) =:= DownManager of
@@ -141,3 +145,12 @@ maybe_erase_cache(DownManager, ID) ->
             %% restart by supervisor
             ok
     end.
+
+append_monitor(Pmon0, Pid, Value) ->
+    case emqx_pmon:find(Pid, Pmon0) of
+        error ->
+            emqx_pmon:monitor(Pid, [Value], Pmon0);
+        {ok, Values} ->
+            Pmon = emqx_pmon:demonitor(Pid, Pmon0),
+            emqx_pmon:monitor(Pid, [Value | Values], Pmon)
+    end.

+ 8 - 3
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -273,10 +273,13 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
                     Error
             end;
         {error, Reason} ->
-            _ = remove(ResId),
+            %% Removal is done asynchronously.  See comment below.
             {error, Reason};
         timeout ->
-            _ = remove(ResId),
+            %% Removal is done asynchronously by the cache cleaner.  If the resource
+            %% process is stuck and not responding to calls, doing the removal
+            %% synchronously here would take more time than the defined timeout, possibly
+            %% timing out HTTP API requests.
             {error, timeout}
     end.
 
@@ -1750,7 +1753,9 @@ safe_call(ResId, Message, Timeout) ->
         exit:{R, _} when R == noproc; R == normal; R == shutdown ->
             {error, not_found};
         exit:{timeout, _} ->
-            {error, timeout}
+            {error, timeout};
+        exit:{{shutdown, removed}, _} ->
+            {error, not_found}
     end.
 
 %% Helper functions for chanel status data

+ 6 - 1
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -56,11 +56,16 @@ init([]) ->
     {ok, {SupFlags, ChildSpecs}}.
 
 child_spec(ResId, Group, ResourceType, Config, Opts) ->
+    RestartType =
+        case emqx_resource:is_dry_run(ResId) of
+            true -> temporary;
+            false -> transient
+        end,
     #{
         id => ResId,
         start =>
             {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]},
-        restart => transient,
+        restart => RestartType,
         %% never force kill a resource manager.
         %% because otherwise it may lead to release leak,
         %% resource_manager's terminate callback calls resource on_stop

+ 22 - 19
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1120,29 +1120,32 @@ create_dry_run_local_succ() ->
 
 t_create_dry_run_local_failed(_) ->
     ct:timetrap({seconds, 120}),
-    ct:pal("creating with creation error"),
-    Res1 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{create_error => true}
-    ),
-    ?assertMatch({error, _}, Res1),
+    emqx_utils:nolink_apply(fun() ->
+        ct:pal("creating with creation error"),
+        Res1 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{create_error => true}
+        ),
+        ?assertMatch({error, _}, Res1),
 
-    ct:pal("creating with health check error"),
-    Res2 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{name => test_resource, health_check_error => true}
-    ),
-    ?assertMatch({error, _}, Res2),
+        ct:pal("creating with health check error"),
+        Res2 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{name => test_resource, health_check_error => true}
+        ),
+        ?assertMatch({error, _}, Res2),
 
-    ct:pal("creating with stop error"),
-    Res3 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{name => test_resource, stop_error => true}
-    ),
-    ?assertEqual(ok, Res3),
+        ct:pal("creating with stop error"),
+        Res3 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{name => test_resource, stop_error => true}
+        ),
+        ?assertEqual(ok, Res3),
+        ok
+    end),
     ?retry(
         100,
-        5,
+        50,
         ?assertEqual(
             [],
             emqx_resource:list_instances_verbose()