Просмотр исходного кода

Merge pull request #10813 from thalesmg/refactor-kafka-on-stop-v50

feat(kafka): ensure allocated resources are removed on failures
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
67e182e0c9

+ 6 - 2
Makefile

@@ -104,7 +104,7 @@ APPS=$(shell $(SCRIPTS)/find-apps.sh)
 
 .PHONY: $(APPS:%=%-ct)
 define gen-app-ct-target
-$1-ct: $(REBAR) merge-config
+$1-ct: $(REBAR) merge-config clean-test-cluster-config
 	$(eval SUITES := $(shell $(SCRIPTS)/find-suites.sh $1))
 ifneq ($(SUITES),)
 		@ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \
@@ -127,7 +127,7 @@ endef
 $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
 
 .PHONY: ct-suite
-ct-suite: $(REBAR) merge-config
+ct-suite: $(REBAR) merge-config clean-test-cluster-config
 ifneq ($(TESTCASE),)
 ifneq ($(GROUP),)
 	$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE)  --case $(TESTCASE) --group $(GROUP)
@@ -294,3 +294,7 @@ fmt: $(REBAR)
 	@$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}'
 	@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
 	@mix format
+
+.PHONY: clean-test-cluster-config
+clean-test-cluster-config:
+	@rm -f apps/emqx_conf/data/configs/cluster.hocon || true

+ 2 - 0
apps/emqx/test/emqx_authentication_SUITE.erl

@@ -98,6 +98,8 @@ init_per_suite(Config) ->
     LogLevel = emqx_logger:get_primary_log_level(),
     ok = emqx_logger:set_log_level(debug),
     application:set_env(ekka, strict_mode, true),
+    emqx_config:erase_all(),
+    emqx_common_test_helpers:stop_apps([]),
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([]),
     [{log_level, LogLevel} | Config].

+ 22 - 5
apps/emqx_bridge/src/emqx_bridge.erl

@@ -227,9 +227,13 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
         diff_confs(NewConf, OldConf),
     %% The config update will be failed if any task in `perform_bridge_changes` failed.
     Result = perform_bridge_changes([
-        {fun emqx_bridge_resource:remove/4, Removed},
-        {fun emqx_bridge_resource:create/4, Added},
-        {fun emqx_bridge_resource:update/4, Updated}
+        #{action => fun emqx_bridge_resource:remove/4, data => Removed},
+        #{
+            action => fun emqx_bridge_resource:create/4,
+            data => Added,
+            on_exception_fn => fun emqx_bridge_resource:remove/4
+        },
+        #{action => fun emqx_bridge_resource:update/4, data => Updated}
     ]),
     ok = unload_hook(),
     ok = load_hook(NewConf),
@@ -345,7 +349,8 @@ perform_bridge_changes(Tasks) ->
 
 perform_bridge_changes([], Result) ->
     Result;
-perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
+perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) ->
+    OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
     Result = maps:fold(
         fun
             ({_Type, _Name}, _Conf, {error, Reason}) ->
@@ -359,9 +364,21 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
                 end;
             ({Type, Name}, Conf, _) ->
                 ResOpts = emqx_resource:fetch_creation_opts(Conf),
-                case Action(Type, Name, Conf, ResOpts) of
+                try Action(Type, Name, Conf, ResOpts) of
                     {error, Reason} -> {error, Reason};
                     Return -> Return
+                catch
+                    Kind:Error:Stacktrace ->
+                        ?SLOG(error, #{
+                            msg => "bridge_config_update_exception",
+                            kind => Kind,
+                            error => Error,
+                            type => Type,
+                            name => Name,
+                            stacktrace => Stacktrace
+                        }),
+                        OnException(Type, Name, Conf, ResOpts),
+                        erlang:raise(Kind, Error, Stacktrace)
                 end
         end,
         Result0,

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 46 - 12
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -101,6 +101,10 @@
     " the connection parameters."
 ).
 
+%% Allocatable resources
+-define(kafka_client_id, kafka_client_id).
+-define(kafka_subscriber_id, kafka_subscriber_id).
+
 %%-------------------------------------------------------------------------------------
 %% `emqx_resource' API
 %%-------------------------------------------------------------------------------------
@@ -140,6 +144,7 @@ on_start(ResourceId, Config) ->
             Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
         end,
     ClientOpts = add_ssl_opts(ClientOpts0, SSL),
+    ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
     case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
         ok ->
             ?tp(
@@ -163,7 +168,21 @@ on_start(ResourceId, Config) ->
     start_consumer(Config, ResourceId, ClientID).
 
 -spec on_stop(resource_id(), state()) -> ok.
-on_stop(_ResourceID, State) ->
+on_stop(ResourceId, _State = undefined) ->
+    case emqx_resource:get_allocated_resources(ResourceId) of
+        #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} ->
+            stop_subscriber(SubscriberId),
+            stop_client(ClientID),
+            ?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
+            ok;
+        #{?kafka_client_id := ClientID} ->
+            stop_client(ClientID),
+            ?tp(kafka_consumer_just_client_stopped, #{}),
+            ok;
+        _ ->
+            ok
+    end;
+on_stop(_ResourceId, State) ->
     #{
         subscriber_id := SubscriberId,
         kafka_client_id := ClientID
@@ -333,6 +352,9 @@ start_consumer(Config, ResourceId, ClientID) ->
     %% spawns one worker for each assigned topic-partition
     %% automatically, so we should not spawn duplicate workers.
     SubscriberId = make_subscriber_id(BridgeName),
+    ?tp(kafka_consumer_about_to_start_subscriber, #{}),
+    ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId),
+    ?tp(kafka_consumer_subscriber_allocated, #{}),
     case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
         {ok, _ConsumerPid} ->
             ?tp(
@@ -359,7 +381,13 @@ start_consumer(Config, ResourceId, ClientID) ->
 stop_subscriber(SubscriberId) ->
     _ = log_when_error(
         fun() ->
-            emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
+            try
+                emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
+            catch
+                exit:{noproc, _} ->
+                    %% may happen when node is shutting down
+                    ok
+            end
         end,
         #{
             msg => "failed_to_delete_kafka_subscriber",
@@ -443,16 +471,22 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
     end.
 
 are_subscriber_workers_alive(SubscriberId) ->
-    Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
-    case lists:keyfind(SubscriberId, 1, Children) of
-        false ->
-            false;
-        {_, Pid, _, _} ->
-            Workers = brod_group_subscriber_v2:get_workers(Pid),
-            %% we can't enforce the number of partitions on a single
-            %% node, as the group might be spread across an emqx
-            %% cluster.
-            lists:all(fun is_process_alive/1, maps:values(Workers))
+    try
+        Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
+        case lists:keyfind(SubscriberId, 1, Children) of
+            false ->
+                false;
+            {_, Pid, _, _} ->
+                Workers = brod_group_subscriber_v2:get_workers(Pid),
+                %% we can't enforce the number of partitions on a single
+                %% node, as the group might be spread across an emqx
+                %% cluster.
+                lists:all(fun is_process_alive/1, maps:values(Workers))
+        end
+    catch
+        exit:{shutdown, _} ->
+            %% may happen if node is shutting down
+            false
     end.
 
 log_when_error(Fun, Log) ->

+ 64 - 21
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -23,6 +23,11 @@
 
 -include_lib("emqx/include/logger.hrl").
 
+%% Allocatable resources
+-define(kafka_resource_id, kafka_resource_id).
+-define(kafka_client_id, kafka_client_id).
+-define(kafka_producers, kafka_producers).
+
 %% TODO: rename this to `kafka_producer' after alias support is added
 %% to hocon; keeping this as just `kafka' for backwards compatibility.
 -define(BRIDGE_TYPE, kafka).
@@ -46,9 +51,11 @@ on_start(InstId, Config) ->
     } = Config,
     BridgeType = ?BRIDGE_TYPE,
     ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
     _ = maybe_install_wolff_telemetry_handlers(ResourceId),
     Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
     ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
+    ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
     ClientConfig = #{
         min_metadata_refresh_interval => MinMetaRefreshInterval,
         connect_timeout => ConnTimeout,
@@ -86,6 +93,7 @@ on_start(InstId, Config) ->
     WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
     case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
         {ok, Producers} ->
+            ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers),
             {ok, #{
                 message_template => compile_message_template(MessageTemplate),
                 client_id => ClientId,
@@ -120,28 +128,63 @@ on_start(InstId, Config) ->
             )
     end.
 
-on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) ->
-    _ = with_log_at_error(
-        fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
-        #{
-            msg => "failed_to_delete_kafka_producer",
-            client_id => ClientID
-        }
-    ),
-    _ = with_log_at_error(
-        fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
-        #{
-            msg => "failed_to_delete_kafka_client",
-            client_id => ClientID
-        }
-    ),
-    with_log_at_error(
-        fun() -> uninstall_telemetry_handlers(ResourceID) end,
+on_stop(InstanceId, _State) ->
+    case emqx_resource:get_allocated_resources(InstanceId) of
         #{
-            msg => "failed_to_uninstall_telemetry_handlers",
-            client_id => ClientID
-        }
-    ).
+            ?kafka_client_id := ClientId,
+            ?kafka_producers := Producers,
+            ?kafka_resource_id := ResourceId
+        } ->
+            _ = with_log_at_error(
+                fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
+                #{
+                    msg => "failed_to_delete_kafka_producer",
+                    client_id => ClientId
+                }
+            ),
+            _ = with_log_at_error(
+                fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
+                #{
+                    msg => "failed_to_delete_kafka_client",
+                    client_id => ClientId
+                }
+            ),
+            _ = with_log_at_error(
+                fun() -> uninstall_telemetry_handlers(ResourceId) end,
+                #{
+                    msg => "failed_to_uninstall_telemetry_handlers",
+                    resource_id => ResourceId
+                }
+            ),
+            ok;
+        #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} ->
+            _ = with_log_at_error(
+                fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
+                #{
+                    msg => "failed_to_delete_kafka_client",
+                    client_id => ClientId
+                }
+            ),
+            _ = with_log_at_error(
+                fun() -> uninstall_telemetry_handlers(ResourceId) end,
+                #{
+                    msg => "failed_to_uninstall_telemetry_handlers",
+                    resource_id => ResourceId
+                }
+            ),
+            ok;
+        #{?kafka_resource_id := ResourceId} ->
+            _ = with_log_at_error(
+                fun() -> uninstall_telemetry_handlers(ResourceId) end,
+                #{
+                    msg => "failed_to_uninstall_telemetry_handlers",
+                    resource_id => ResourceId
+                }
+            ),
+            ok;
+        _ ->
+            ok
+    end.
 
 on_query(
     _InstId,

+ 137 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -59,7 +59,9 @@ only_once_tests() ->
         t_cluster_group,
         t_node_joins_existing_cluster,
         t_cluster_node_down,
-        t_multiple_topic_mappings
+        t_multiple_topic_mappings,
+        t_resource_manager_crash_after_subscriber_started,
+        t_resource_manager_crash_before_subscriber_started
     ].
 
 init_per_suite(Config) ->
@@ -333,6 +335,7 @@ init_per_testcase(TestCase, Config) ->
 common_init_per_testcase(TestCase, Config0) ->
     ct:timetrap(timer:seconds(60)),
     delete_all_bridges(),
+    emqx_config:delete_override_conf_files(),
     KafkaTopic =
         <<
             (atom_to_binary(TestCase))/binary,
@@ -1117,6 +1120,24 @@ stop_async_publisher(Pid) ->
     end,
     ok.
 
+kill_resource_managers() ->
+    ct:pal("gonna kill resource managers"),
+    lists:foreach(
+        fun({_, Pid, _, _}) ->
+            ct:pal("terminating resource manager ~p", [Pid]),
+            Ref = monitor(process, Pid),
+            exit(Pid, kill),
+            receive
+                {'DOWN', Ref, process, Pid, killed} ->
+                    ok
+            after 500 ->
+                ct:fail("pid ~p didn't die!", [Pid])
+            end,
+            ok
+        end,
+        supervisor:which_children(emqx_resource_manager_sup)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -2019,3 +2040,118 @@ t_begin_offset_earliest(Config) ->
         end
     ),
     ok.
+
+t_resource_manager_crash_after_subscriber_started(Config) ->
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := kafka_consumer_subscriber_allocated},
+                #{?snk_kind := will_kill_resource_manager}
+            ),
+            ?force_ordering(
+                #{?snk_kind := resource_manager_killed},
+                #{?snk_kind := kafka_consumer_subscriber_started}
+            ),
+            spawn_link(fun() ->
+                ?tp(will_kill_resource_manager, #{}),
+                kill_resource_managers(),
+                ?tp(resource_manager_killed, #{}),
+                ok
+            end),
+
+            %% even if the resource manager is dead, we can still
+            %% clear the allocated resources.
+
+            %% We avoid asserting only the `config_update_crashed'
+            %% error here because there's a race condition (just a
+            %% problem for the test assertion below) in which the
+            %% `emqx_resource_manager:create/5' call returns a failure
+            %% (not checked) and then `lookup' in that module is
+            %% delayed enough so that the manager supervisor has time
+            %% to restart the manager process and for the latter to
+            %% startup successfully.  Occurs frequently in CI...
+
+            {Res, {ok, _}} =
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := kafka_consumer_subcriber_and_client_stopped},
+                    10_000
+                ),
+            case Res of
+                {error, {config_update_crashed, {killed, _}}} ->
+                    ok;
+                {ok, _} ->
+                    %% the new manager may have had time to startup
+                    %% before the resource status cache is read...
+                    ok;
+                _ ->
+                    ct:fail("unexpected result: ~p", [Res])
+            end,
+            ?assertMatch({ok, _}, delete_bridge(Config)),
+            ?retry(
+                _Sleep = 50,
+                _Attempts = 50,
+                ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup))
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_resource_manager_crash_before_subscriber_started(Config) ->
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := kafka_consumer_client_started},
+                #{?snk_kind := will_kill_resource_manager}
+            ),
+            ?force_ordering(
+                #{?snk_kind := resource_manager_killed},
+                #{?snk_kind := kafka_consumer_about_to_start_subscriber}
+            ),
+            spawn_link(fun() ->
+                ?tp(will_kill_resource_manager, #{}),
+                kill_resource_managers(),
+                ?tp(resource_manager_killed, #{}),
+                ok
+            end),
+
+            %% even if the resource manager is dead, we can still
+            %% clear the allocated resources.
+
+            %% We avoid asserting only the `config_update_crashed'
+            %% error here because there's a race condition (just a
+            %% problem for the test assertion below) in which the
+            %% `emqx_resource_manager:create/5' call returns a failure
+            %% (not checked) and then `lookup' in that module is
+            %% delayed enough so that the manager supervisor has time
+            %% to restart the manager process and for the latter to
+            %% startup successfully.  Occurs frequently in CI...
+            {Res, {ok, _}} =
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := kafka_consumer_just_client_stopped},
+                    10_000
+                ),
+            case Res of
+                {error, {config_update_crashed, {killed, _}}} ->
+                    ok;
+                {ok, _} ->
+                    %% the new manager may have had time to startup
+                    %% before the resource status cache is read...
+                    ok;
+                _ ->
+                    ct:fail("unexpected result: ~p", [Res])
+            end,
+            ?assertMatch({ok, _}, delete_bridge(Config)),
+            ?retry(
+                _Sleep = 50,
+                _Attempts = 50,
+                ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup))
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 2 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -446,6 +446,8 @@ t_failed_creation_then_fix(Config) ->
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
     ok = ?PRODUCER:on_stop(ResourceId, State),
+    ?assertEqual([], supervisor:which_children(wolff_client_sup)),
+    ?assertEqual([], supervisor:which_children(wolff_producers_sup)),
     ok = emqx_bridge_resource:remove(BridgeId),
     delete_all_bridges(),
     ok.

+ 8 - 4
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -60,6 +60,10 @@
     sync_timeout := emqx_schema:duration_ms()
 }.
 
+%% Allocatable resources
+-define(pulsar_client_id, pulsar_client_id).
+-define(pulsar_producers, pulsar_producers).
+
 %%-------------------------------------------------------------------------------------
 %% `emqx_resource' API
 %%-------------------------------------------------------------------------------------
@@ -81,7 +85,7 @@ on_start(InstanceId, Config) ->
     } = Config,
     Servers = format_servers(Servers0),
     ClientId = make_client_id(InstanceId, BridgeName),
-    ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
+    ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
     SSLOpts = emqx_tls_lib:to_client_opts(SSL),
     ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
     ClientOpts = #{
@@ -119,7 +123,7 @@ on_start(InstanceId, Config) ->
 -spec on_stop(resource_id(), state()) -> ok.
 on_stop(InstanceId, _State) ->
     case emqx_resource:get_allocated_resources(InstanceId) of
-        #{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
+        #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} ->
             stop_producers(ClientId, Producers),
             stop_client(ClientId),
             ?tp(pulsar_bridge_stopped, #{
@@ -128,7 +132,7 @@ on_stop(InstanceId, _State) ->
                 pulsar_producers => Producers
             }),
             ok;
-        #{pulsar_client_id := ClientId} ->
+        #{?pulsar_client_id := ClientId} ->
             stop_client(ClientId),
             ?tp(pulsar_bridge_stopped, #{
                 instance_id => InstanceId,
@@ -340,7 +344,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
     ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
     try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
         {ok, Producers} ->
-            ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
+            ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers),
             ?tp(pulsar_producer_producers_allocated, #{}),
             State = #{
                 pulsar_client_id => ClientId,

+ 16 - 1
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -285,6 +285,11 @@ create_bridge(Config, Overrides) ->
     PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
     emqx_bridge:create(Type, Name, PulsarConfig).
 
+delete_bridge(Config) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    emqx_bridge:remove(Type, Name).
+
 create_bridge_api(Config) ->
     create_bridge_api(Config, _Overrides = #{}).
 
@@ -541,8 +546,14 @@ kill_resource_managers() ->
     lists:foreach(
         fun({_, Pid, _, _}) ->
             ct:pal("terminating resource manager ~p", [Pid]),
-            %% sys:terminate(Pid, stop),
+            Ref = monitor(process, Pid),
             exit(Pid, kill),
+            receive
+                {'DOWN', Ref, process, Pid, killed} ->
+                    ok
+            after 500 ->
+                ct:fail("pid ~p didn't die!", [Pid])
+            end,
             ok
         end,
         supervisor:which_children(emqx_resource_manager_sup)
@@ -1002,6 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) ->
                         Producers =/= undefined,
                     10_000
                 ),
+            ?assertMatch({ok, _}, delete_bridge(Config)),
+            ?assertEqual([], get_pulsar_producers()),
             ok
         end,
         []
@@ -1033,6 +1046,8 @@ t_resource_manager_crash_before_producers_started(Config) ->
                     #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
                     10_000
                 ),
+            ?assertMatch({ok, _}, delete_bridge(Config)),
+            ?assertEqual([], get_pulsar_producers()),
             ok
         end,
         []

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -533,7 +533,7 @@ clean_allocated_resources(ResourceId, ResourceMod) ->
         true ->
             %% The resource entries in the ETS table are erased inside
             %% `call_stop' if the call is successful.
-            ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
+            ok = call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
             ok;
         false ->
             ok

+ 1 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -192,14 +192,13 @@ remove(ResId) when is_binary(ResId) ->
 %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
 -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
 remove(ResId, ClearMetrics) when is_binary(ResId) ->
-    ResourceManagerPid = gproc:whereis_name(?NAME(ResId)),
     try
         safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
     after
         %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
         %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition.
         %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long.
-        emqx_resource_manager_sup:delete_child(ResourceManagerPid)
+        emqx_resource_manager_sup:delete_child(ResId)
     end.
 
 %% @doc Stops and then starts an instance that was already running

+ 19 - 18
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -26,12 +26,12 @@
 -export([init/1]).
 
 ensure_child(ResId, Group, ResourceType, Config, Opts) ->
-    _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
+    _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)),
     ok.
 
-delete_child(Pid) ->
-    _ = supervisor:terminate_child(?MODULE, Pid),
-    _ = supervisor:delete_child(?MODULE, Pid),
+delete_child(ResId) ->
+    _ = supervisor:terminate_child(?MODULE, ResId),
+    _ = supervisor:delete_child(?MODULE, ResId),
     ok.
 
 start_link() ->
@@ -44,18 +44,19 @@ init([]) ->
         public,
         {read_concurrency, true}
     ]),
-    ChildSpecs = [
-        #{
-            id => emqx_resource_manager,
-            start => {emqx_resource_manager, start_link, []},
-            restart => transient,
-            %% never force kill a resource manager.
-            %% becasue otherwise it may lead to release leak,
-            %% resource_manager's terminate callback calls resource on_stop
-            shutdown => infinity,
-            type => worker,
-            modules => [emqx_resource_manager]
-        }
-    ],
-    SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
+    ChildSpecs = [],
+    SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     {ok, {SupFlags, ChildSpecs}}.
+
+child_spec(ResId, Group, ResourceType, Config, Opts) ->
+    #{
+        id => ResId,
+        start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]},
+        restart => transient,
+        %% never force kill a resource manager.
+        %% becasue otherwise it may lead to release leak,
+        %% resource_manager's terminate callback calls resource on_stop
+        shutdown => infinity,
+        type => worker,
+        modules => [emqx_resource_manager]
+    }.

+ 1 - 1
changes/ee/feat-10778.en.md

@@ -1 +1 @@
-Refactored Pulsar Producer bridge to avoid leaking resources during crashes.
+Refactored Pulsar Producer bridge to avoid leaking resources during crashes at creation.

+ 1 - 0
changes/ee/feat-10813.en.md

@@ -0,0 +1 @@
+Refactored Kafka Producer and Consumer bridges to avoid leaking resources during crashes at creation.