فهرست منبع

fix(kafka consumer): ensure supervisor is started

Fixes https://emqx.atlassian.net/browse/EMQX-13619

When (re)starting a node with an existing and enabled Kafka Consumer source, crash logs
could be seen.

```
17:11:31.861 [warning] [reason: %{reason: {:noproc, {:gen_server, :call, [:emqx_bridge_sup, {:start_child, %{id: :emqx_bridge_kafka_consumer_sup, restart: :permanent, shutdown: :infinity, start: {:emqx_bridge_kafka_consumer_sup, :start_link, []}, type: :supervisor, modules: [:emqx_bridge_kafka_consumer_sup]}}, :infinity]}}, stacktrace: [{:gen_server, :call, 3, [file: ~c"gen_server.erl", line: 419]}, {:emqx_bridge_kafka_impl_consumer, :ensure_consumer_supervisor_started, 0, [file: ~c"src/emqx_bridge_kafka_impl_consumer.erl", line: 395]}, {:emqx_bridge_kafka_impl_consumer, :start_consumer, 5, [file: ~c"src/emqx_bridge_kafka_impl_consumer.erl", line: 427]}, {:emqx_bridge_kafka_impl_consumer, :on_add_channel, 4, [file: ~c"src/emqx_bridge_kafka_impl_consumer.erl", line: 254]}, {:emqx_resource, :call_add_channel, 5, [file: ~c"src/emqx_resource.erl", line: 565]}, {:emqx_resource_manager, :add_channels_in_list, 2, [file: ~c"src/emqx_resource_manager.erl", line: 872]}, {:emqx_resource_manager, :channels_health_check, 2, [file: ~c"src/emqx_resource_manager.erl", line: 1304]}, {:emqx_resource_manager, :continue_resource_health_check_not_connected, 2, [file: ~c"src/emqx_resource_manager.erl", line: 1235]}, {:gen_statem, :loop_state_callback, 11, [file: ~c"gen_statem.erl", line: 1397]}, {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}], exception: :exit}, msg: ~c"add_channel_failed", resource_id: "connector:kafka_consumer:a", channel_id: "source:kafka_consumer:b:connector:kafka_consumer:a"]
2024-12-03T17:11:31.861584+00:00 [warning] tag: CONNECTOR/KAFKA_CONSUMER, msg: add_channel_failed, reason: #{reason => {noproc,{gen_server,call,[emqx_bridge_sup,{start_child,#{id => emqx_bridge_kafka_consumer_sup,restart => permanent,shutdown => infinity,start => {emqx_bridge_kafka_consumer_sup,start_link,[]},type => supervisor,modules => [emqx_bridge_kafka_consumer_sup]}},infinity]}},stacktrace => [{gen_server,call,3,[{file,"gen_server.erl"},{line,419}]},{emqx_bridge_kafka_impl_consumer,ensure_consumer_supervisor_started,0,[{file,"src/emqx_bridge_kafka_impl_consumer.erl"},{line,395}]},{emqx_bridge_kafka_impl_consumer,start_consumer,5,[{file,"src/emqx_bridge_kafka_impl_consumer.erl"},{line,427}]},{emqx_bridge_kafka_impl_consumer,on_add_channel,4,[{file,"src/emqx_bridge_kafka_impl_consumer.erl"},{line,254}]},{emqx_resource,call_add_channel,5,[{file,"src/emqx_resource.erl"},{line,565}]},{emqx_resource_manager,add_channels_in_list,2,[{file,"src/emqx_resource_manager.erl"},{line,872}]},{emqx_resource_manager,channels_health_check,2,[{file,"src/emqx_resource_manager.erl"},{line,1304}]},{emqx_resource_manager,continue_resource_health_check_not_connected,2,[{file,"src/emqx_resource_manager.erl"},{line,1235}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1397}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}],exception => exit}, resource_id: <<"connector:kafka_consumer:a">>, channel_id: <<"source:kafka_consumer:b:connector:kafka_consumer:a">>
```
Thales Macedo Garitezi 1 سال پیش
والد
کامیت
c176fd1e2d

+ 4 - 1
apps/emqx_bridge_kafka/mix.exs

@@ -18,7 +18,10 @@ defmodule EMQXBridgeKafka.MixProject do
   end
 
   def application do
-    [extra_applications: UMP.extra_applications()]
+    [
+      extra_applications: UMP.extra_applications(),
+      mod: {:emqx_bridge_kafka_app, []}
+    ]
   end
 
   def deps() do

+ 3 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,8 +1,8 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.5.1"},
-    {registered, [emqx_bridge_kafka_consumer_sup]},
+    {vsn, "0.5.2"},
+    {registered, [emqx_bridge_kafka_sup, emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,
         stdlib,
@@ -12,6 +12,7 @@
         brod,
         brod_gssapi
     ]},
+    {mod, {emqx_bridge_kafka_app, []}},
     {env, [
         {emqx_action_info_modules, [
             emqx_bridge_kafka_producer_action_info,

+ 25 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_app.erl

@@ -0,0 +1,25 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_kafka_app).
+
+-behaviour(application).
+
+%% `application' API
+-export([start/2, stop/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `application' API
+%%------------------------------------------------------------------------------
+
+-spec start(application:start_type(), term()) -> {ok, pid()}.
+start(_Type, _Args) ->
+    emqx_bridge_kafka_sup:start_link().
+
+-spec stop(term()) -> ok.
+stop(_State) ->
+    ok.

+ 0 - 21
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -381,26 +381,6 @@ make_subscriber_id(BridgeName) ->
     BridgeNameBin = to_bin(BridgeName),
     <<"kafka_subscriber:", BridgeNameBin/binary>>.
 
-ensure_consumer_supervisor_started() ->
-    Mod = emqx_bridge_kafka_consumer_sup,
-    ChildSpec =
-        #{
-            id => Mod,
-            start => {Mod, start_link, []},
-            restart => permanent,
-            shutdown => infinity,
-            type => supervisor,
-            modules => [Mod]
-        },
-    case supervisor:start_child(emqx_bridge_sup, ChildSpec) of
-        {ok, _Pid} ->
-            ok;
-        {error, already_present} ->
-            ok;
-        {error, {already_started, _Pid}} ->
-            ok
-    end.
-
 -spec start_consumer(
     source_config(),
     connector_resource_id(),
@@ -424,7 +404,6 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID, ConnState) ->
             value_encoding_mode := ValueEncodingMode
         } = Params0
     } = Config,
-    ok = ensure_consumer_supervisor_started(),
     ?tp(kafka_consumer_sup_started, #{}),
     TopicMapping = ensure_topic_mapping(Params0),
     InitialState = #{

+ 46 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_sup.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_kafka_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    ConsumerSup = sup_spec(emqx_bridge_kafka_consumer_sup),
+    ChildSpecs = [ConsumerSup],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+sup_spec(Mod) ->
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
+    }.

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 7 - 0
changes/ee/fix-14345.en.md