Quellcode durchsuchen

Merge pull request #10778 from thalesmg/refactor-pulsar-on-stop-v50

feat(pulsar): ensure allocated resources are removed on failures (v5.0)
Thales Macedo Garitezi vor 2 Jahren
Ursprung
Commit
37061b484a

+ 26 - 9
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -81,6 +81,7 @@ on_start(InstanceId, Config) ->
     } = Config,
     Servers = format_servers(Servers0),
     ClientId = make_client_id(InstanceId, BridgeName),
+    ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
     SSLOpts = emqx_tls_lib:to_client_opts(SSL),
     ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
     ClientOpts = #{
@@ -116,15 +117,29 @@ on_start(InstanceId, Config) ->
     start_producer(Config, InstanceId, ClientId, ClientOpts).
 
 -spec on_stop(resource_id(), state()) -> ok.
-on_stop(_InstanceId, State) ->
-    #{
-        pulsar_client_id := ClientId,
-        producers := Producers
-    } = State,
-    stop_producers(ClientId, Producers),
-    stop_client(ClientId),
-    ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
-    ok.
+on_stop(InstanceId, _State) ->
+    case emqx_resource:get_allocated_resources(InstanceId) of
+        #{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
+            stop_producers(ClientId, Producers),
+            stop_client(ClientId),
+            ?tp(pulsar_bridge_stopped, #{
+                instance_id => InstanceId,
+                pulsar_client_id => ClientId,
+                pulsar_producers => Producers
+            }),
+            ok;
+        #{pulsar_client_id := ClientId} ->
+            stop_client(ClientId),
+            ?tp(pulsar_bridge_stopped, #{
+                instance_id => InstanceId,
+                pulsar_client_id => ClientId,
+                pulsar_producers => undefined
+            }),
+            ok;
+        _ ->
+            ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
+            ok
+    end.
 
 -spec on_get_status(resource_id(), state()) -> connected | disconnected.
 on_get_status(_InstanceId, State = #{}) ->
@@ -325,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
     ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
     try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
         {ok, Producers} ->
+            ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
+            ?tp(pulsar_producer_producers_allocated, #{}),
             State = #{
                 pulsar_client_id => ClientId,
                 producers => Producers,

+ 97 - 10
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -43,7 +43,9 @@ only_once_tests() ->
         t_send_when_down,
         t_send_when_timeout,
         t_failure_to_start_producer,
-        t_producer_process_crash
+        t_producer_process_crash,
+        t_resource_manager_crash_after_producers_started,
+        t_resource_manager_crash_before_producers_started
     ].
 
 init_per_suite(Config) ->
@@ -429,7 +431,19 @@ wait_until_producer_connected() ->
     wait_until_connected(pulsar_producers_sup, pulsar_producer).
 
 wait_until_connected(SupMod, Mod) ->
-    Pids = [
+    Pids = get_pids(SupMod, Mod),
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
+    ),
+    ok.
+
+get_pulsar_producers() ->
+    get_pids(pulsar_producers_sup, pulsar_producer).
+
+get_pids(SupMod, Mod) ->
+    [
         P
      || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
         P <- element(2, process_info(SupPid, links)),
@@ -437,13 +451,7 @@ wait_until_connected(SupMod, Mod) ->
             {Mod, init, _} -> true;
             _ -> false
         end
-    ],
-    ?retry(
-        _Sleep = 300,
-        _Attempts0 = 20,
-        lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
-    ),
-    ok.
+    ].
 
 create_rule_and_action_http(Config) ->
     PulsarName = ?config(pulsar_name, Config),
@@ -528,6 +536,18 @@ start_cluster(Cluster) ->
     end),
     Nodes.
 
+kill_resource_managers() ->
+    ct:pal("gonna kill resource managers"),
+    lists:foreach(
+        fun({_, Pid, _, _}) ->
+            ct:pal("terminating resource manager ~p", [Pid]),
+            %% sys:terminate(Pid, stop),
+            exit(Pid, kill),
+            ok
+        end,
+        supervisor:which_children(emqx_resource_manager_sup)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -921,7 +941,11 @@ t_producer_process_crash(Config) ->
                     ok
             after 1_000 -> ct:fail("pid didn't die")
             end,
-            ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
+            ?retry(
+                _Sleep0 = 50,
+                _Attempts0 = 50,
+                ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
+            ),
             %% Should recover given enough time.
             ?retry(
                 _Sleep = 1_000,
@@ -952,6 +976,69 @@ t_producer_process_crash(Config) ->
     ),
     ok.
 
+t_resource_manager_crash_after_producers_started(Config) ->
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := pulsar_producer_producers_allocated},
+                #{?snk_kind := will_kill_resource_manager}
+            ),
+            ?force_ordering(
+                #{?snk_kind := resource_manager_killed},
+                #{?snk_kind := pulsar_producer_bridge_started}
+            ),
+            spawn_link(fun() ->
+                ?tp(will_kill_resource_manager, #{}),
+                kill_resource_managers(),
+                ?tp(resource_manager_killed, #{}),
+                ok
+            end),
+            %% even if the resource manager is dead, we can still
+            %% clear the allocated resources.
+            {{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when
+                        Producers =/= undefined,
+                    10_000
+                ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_resource_manager_crash_before_producers_started(Config) ->
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := pulsar_producer_capture_name},
+                #{?snk_kind := will_kill_resource_manager}
+            ),
+            ?force_ordering(
+                #{?snk_kind := resource_manager_killed},
+                #{?snk_kind := pulsar_producer_about_to_start_producers}
+            ),
+            spawn_link(fun() ->
+                ?tp(will_kill_resource_manager, #{}),
+                kill_resource_managers(),
+                ?tp(resource_manager_killed, #{}),
+                ok
+            end),
+            %% even if the resource manager is dead, we can still
+            %% clear the allocated resources.
+            {{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
+                    10_000
+                ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_cluster(Config) ->
     MQTTTopic = ?config(mqtt_topic, Config),
     ResourceId = resource_id(Config),

+ 2 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -121,3 +121,5 @@
 
 -define(TEST_ID_PREFIX, "_probe_:").
 -define(RES_METRICS, resource_metrics).
+
+-define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).

+ 50 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -79,7 +79,13 @@
     query/2,
     query/3,
     %% query the instance without batching and queuing messages.
-    simple_sync_query/2
+    simple_sync_query/2,
+    %% functions used by connectors to register resources that must be
+    %% freed when stopping or even when a resource manager crashes.
+    allocate_resource/3,
+    has_allocated_resources/1,
+    get_allocated_resources/1,
+    forget_allocated_resources/1
 ]).
 
 %% Direct calls to the callback module
@@ -372,6 +378,9 @@ is_buffer_supported(Module) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(ResId, Mod, Config) ->
     try
+        %% If the previous manager process crashed without cleaning up
+        %% allocated resources, clean them up.
+        clean_allocated_resources(ResId, Mod),
         Mod:on_start(ResId, Config)
     catch
         throw:Error ->
@@ -390,7 +399,16 @@ call_health_check(ResId, Mod, ResourceState) ->
 
 -spec call_stop(resource_id(), module(), resource_state()) -> term().
 call_stop(ResId, Mod, ResourceState) ->
-    ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)).
+    ?SAFE_CALL(begin
+        Res = Mod:on_stop(ResId, ResourceState),
+        case Res of
+            ok ->
+                emqx_resource:forget_allocated_resources(ResId);
+            _ ->
+                ok
+        end,
+        Res
+    end).
 
 -spec check_config(resource_type(), raw_resource_config()) ->
     {ok, resource_config()} | {error, term()}.
@@ -486,7 +504,37 @@ apply_reply_fun({F, A}, Result) when is_function(F) ->
 apply_reply_fun(From, Result) ->
     gen_server:reply(From, Result).
 
+-spec allocate_resource(resource_id(), any(), term()) -> ok.
+allocate_resource(InstanceId, Key, Value) ->
+    true = ets:insert(?RESOURCE_ALLOCATION_TAB, {InstanceId, Key, Value}),
+    ok.
+
+-spec has_allocated_resources(resource_id()) -> boolean().
+has_allocated_resources(InstanceId) ->
+    ets:member(?RESOURCE_ALLOCATION_TAB, InstanceId).
+
+-spec get_allocated_resources(resource_id()) -> map().
+get_allocated_resources(InstanceId) ->
+    Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
+    maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]).
+
+-spec forget_allocated_resources(resource_id()) -> ok.
+forget_allocated_resources(InstanceId) ->
+    true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId),
+    ok.
+
 %% =================================================================================
 
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
+
+clean_allocated_resources(ResourceId, ResourceMod) ->
+    case emqx_resource:has_allocated_resources(ResourceId) of
+        true ->
+            %% The resource entries in the ETS table are erased inside
+            %% `call_stop' if the call is successful.
+            ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
+            ok;
+        false ->
+            ok
+    end.

+ 3 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -500,8 +500,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
     %% We don't care the return value of the Mod:on_stop/2.
     %% The callback mod should make sure the resource is stopped after on_stop/2
     %% is returned.
-    case ResState /= undefined of
+    HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),
+    case ResState =/= undefined orelse HasAllocatedResources of
         true ->
+            %% we clear the allocated resources after stop is successful
             emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState);
         false ->
             ok

+ 8 - 0
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -17,6 +17,8 @@
 
 -behaviour(supervisor).
 
+-include("emqx_resource.hrl").
+
 -export([ensure_child/5, delete_child/1]).
 
 -export([start_link/0]).
@@ -36,6 +38,12 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
+    %% Maps resource_id() to one or more allocated resources.
+    emqx_utils_ets:new(?RESOURCE_ALLOCATION_TAB, [
+        bag,
+        public,
+        {read_concurrency, true}
+    ]),
     ChildSpecs = [
         #{
             id => emqx_resource_manager,

+ 1 - 0
changes/ee/feat-10778.en.md

@@ -0,0 +1 @@
+Refactored Pulsar Producer bridge to avoid leaking resources during crashes.