فهرست منبع

Merge pull request #14345 from thalesmg/20241204-r584-fix-kconsu-sup-tree

fix(kafka consumer): ensure supervisor is started
Thales Macedo Garitezi 1 سال پیش
والد
کامیت
90ef4a94ba

+ 4 - 1
apps/emqx_bridge_kafka/mix.exs

@@ -18,7 +18,10 @@ defmodule EMQXBridgeKafka.MixProject do
   end
 
   def application do
-    [extra_applications: UMP.extra_applications()]
+    [
+      extra_applications: UMP.extra_applications(),
+      mod: {:emqx_bridge_kafka_app, []}
+    ]
   end
 
   def deps() do

+ 3 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,8 +1,8 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.5.1"},
-    {registered, [emqx_bridge_kafka_consumer_sup]},
+    {vsn, "0.5.2"},
+    {registered, [emqx_bridge_kafka_sup, emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,
         stdlib,
@@ -12,6 +12,7 @@
         brod,
         brod_gssapi
     ]},
+    {mod, {emqx_bridge_kafka_app, []}},
     {env, [
         {emqx_action_info_modules, [
             emqx_bridge_kafka_producer_action_info,

+ 25 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_app.erl

@@ -0,0 +1,25 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_kafka_app).
+
+-behaviour(application).
+
+%% `application' API
+-export([start/2, stop/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `application' API
+%%------------------------------------------------------------------------------
+
+-spec start(application:start_type(), term()) -> {ok, pid()}.
+start(_Type, _Args) ->
+    emqx_bridge_kafka_sup:start_link().
+
+-spec stop(term()) -> ok.
+stop(_State) ->
+    ok.

+ 0 - 21
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -381,26 +381,6 @@ make_subscriber_id(BridgeName) ->
     BridgeNameBin = to_bin(BridgeName),
     <<"kafka_subscriber:", BridgeNameBin/binary>>.
 
-ensure_consumer_supervisor_started() ->
-    Mod = emqx_bridge_kafka_consumer_sup,
-    ChildSpec =
-        #{
-            id => Mod,
-            start => {Mod, start_link, []},
-            restart => permanent,
-            shutdown => infinity,
-            type => supervisor,
-            modules => [Mod]
-        },
-    case supervisor:start_child(emqx_bridge_sup, ChildSpec) of
-        {ok, _Pid} ->
-            ok;
-        {error, already_present} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok
-    end.
-
 -spec start_consumer(
     source_config(),
     connector_resource_id(),
@@ -424,7 +404,6 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID, ConnState) ->
             value_encoding_mode := ValueEncodingMode
         } = Params0
     } = Config,
-    ok = ensure_consumer_supervisor_started(),
     ?tp(kafka_consumer_sup_started, #{}),
     TopicMapping = ensure_topic_mapping(Params0),
     InitialState = #{

+ 46 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_sup.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_kafka_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    ConsumerSup = sup_spec(emqx_bridge_kafka_consumer_sup),
+    ChildSpecs = [ConsumerSup],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+sup_spec(Mod) ->
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
+    }.

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 7 - 0
changes/ee/fix-14345.en.md