Ver código fonte

fix(resource): remove dry run resources asynchronously

Fixes https://emqx.atlassian.net/browse/EMQX-13583

Removal is now done asynchronously by the cache cleaner.  If the resource process is stuck
and not responding to calls, doing the removal synchronously here would take more time
than the defined timeout, possibly timing out HTTP API requests.
Thales Macedo Garitezi 1 ano atrás
pai
commit
acc3f255f6

+ 48 - 0
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -20,6 +20,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CONNECTOR, emqx_connector_dummy_impl).
 
@@ -327,6 +328,53 @@ t_no_buffer_workers(Config) ->
     ?assertEqual([], supervisor:which_children(emqx_resource_buffer_worker_sup)),
     ok.
 
+%% Checks that the maximum timeout (currently) set by `resource_opts.health_check_timeout'
+%% is respected when doing a dry run, even if the removal gets stuck because the resource
+%% process is unresponsive.
+t_dryrun_timeout({'init', Config}) ->
+    meck:new(emqx_connector_resource, [passthrough]),
+    meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR),
+    meck:new(?CONNECTOR, [non_strict]),
+    meck:expect(?CONNECTOR, resource_type, 0, dummy),
+    meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
+    %% hang forever
+    meck:expect(?CONNECTOR, on_start, fun(_ConnResId, _Opts) ->
+        receive
+            go -> ok
+        end
+    end),
+    meck:expect(?CONNECTOR, on_get_channels, 1, []),
+    meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_stop, 2, ok),
+    meck:expect(?CONNECTOR, on_get_status, 2, connected),
+    meck:expect(?CONNECTOR, query_mode, 1, sync),
+    Config;
+t_dryrun_timeout({'end', _Config}) ->
+    meck:unload(),
+    ok;
+t_dryrun_timeout(Config) when is_list(Config) ->
+    Type = kafka_producer,
+    Conf0 = connector_config(),
+    Timeout = 100,
+    Conf = Conf0#{<<"resource_opts">> => #{<<"health_check_interval">> => Timeout}},
+    %% Minimum timeout is capped at 5 s in `emqx_resource_manager'...  Plus, we need to
+    %% wait for removal of stuck process, which itself has another 5 s timeout.
+    ct:timetrap(15_000),
+    %% Cache cleaner is triggered when the process initiating the dry run dies.
+    Pid = spawn_link(fun() ->
+        Res = emqx_connector_resource:create_dry_run(Type, Conf),
+        ?assertEqual({error, timeout}, Res),
+        ok
+    end),
+    MRef = monitor(process, Pid),
+    receive
+        {'DOWN', MRef, _, _, _} ->
+            ok
+    end,
+    %% Should be removed asynchronously by cache cleaner.
+    ?retry(1_000, 7, ?assertEqual([], emqx_resource:list_instances())),
+    ok.
+
 %% helpers
 
 connector_config() ->

+ 1 - 0
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -126,6 +126,7 @@ handle_down_dry_run([ID | Rest], Pid, State0) ->
     %% No need to wait here: since it's a dry run resource, it won't be recreated,
     %% assuming the ID is random enough.
     spawn(fun() ->
+        _ = emqx_resource_manager:remove(ID),
         emqx_resource_manager_sup:delete_child(ID),
         ?tp("resource_cache_cleaner_deleted_child", #{id => ID})
     end),

+ 5 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -273,10 +273,13 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
                     Error
             end;
         {error, Reason} ->
-            _ = remove(ResId),
+            %% Removal is done asynchronously.  See comment below.
             {error, Reason};
         timeout ->
-            _ = remove(ResId),
+            %% Removal is done asynchronously by the cache cleaner.  If the resource
+            %% process is stuck and not responding to calls, doing the removal
+            %% synchronously here would take more time than the defined timeout, possibly
+            %% timing out HTTP API requests.
             {error, timeout}
     end.
 

+ 6 - 1
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -56,11 +56,16 @@ init([]) ->
     {ok, {SupFlags, ChildSpecs}}.
 
 child_spec(ResId, Group, ResourceType, Config, Opts) ->
+    RestartType =
+        case emqx_resource:is_dry_run(ResId) of
+            true -> temporary;
+            false -> transient
+        end,
     #{
         id => ResId,
         start =>
             {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]},
-        restart => transient,
+        restart => RestartType,
         %% never force kill a resource manager.
         %% because otherwise it may lead to release leak,
         %% resource_manager's terminate callback calls resource on_stop

+ 22 - 19
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -1120,29 +1120,32 @@ create_dry_run_local_succ() ->
 
 t_create_dry_run_local_failed(_) ->
     ct:timetrap({seconds, 120}),
-    ct:pal("creating with creation error"),
-    Res1 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{create_error => true}
-    ),
-    ?assertMatch({error, _}, Res1),
+    emqx_utils:nolink_apply(fun() ->
+        ct:pal("creating with creation error"),
+        Res1 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{create_error => true}
+        ),
+        ?assertMatch({error, _}, Res1),
 
-    ct:pal("creating with health check error"),
-    Res2 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{name => test_resource, health_check_error => true}
-    ),
-    ?assertMatch({error, _}, Res2),
+        ct:pal("creating with health check error"),
+        Res2 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{name => test_resource, health_check_error => true}
+        ),
+        ?assertMatch({error, _}, Res2),
 
-    ct:pal("creating with stop error"),
-    Res3 = emqx_resource:create_dry_run_local(
-        ?TEST_RESOURCE,
-        #{name => test_resource, stop_error => true}
-    ),
-    ?assertEqual(ok, Res3),
+        ct:pal("creating with stop error"),
+        Res3 = emqx_resource:create_dry_run_local(
+            ?TEST_RESOURCE,
+            #{name => test_resource, stop_error => true}
+        ),
+        ?assertEqual(ok, Res3),
+        ok
+    end),
     ?retry(
         100,
-        5,
+        50,
         ?assertEqual(
             [],
             emqx_resource:list_instances_verbose()