Browse Source

fix(aggregator): refactor supervision tree

Instead of using a aggregator supervisor with a fixed local name, we should allow
specifying different names so each action app will spawn its own aggregator supervisor.
Thales Macedo Garitezi 1 year ago
parent
commit
3a29696a48

+ 1 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -19,6 +19,7 @@
             emqx_bridge_s3_connector_info
         ]}
     ]},
+    {mod, {emqx_bridge_s3_app, []}},
     {modules, []},
     {links, []}
 ]}.

+ 2 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_connector_aggreg_app).
+-module(emqx_bridge_s3_app).
 
 -behaviour(application).
 -export([start/2, stop/1]).
@@ -15,7 +15,7 @@
 %%------------------------------------------------------------------------------
 
 start(_StartType, _StartArgs) ->
-    emqx_connector_aggreg_sup:start_link().
+    emqx_bridge_s3_sup:start_link().
 
 stop(_State) ->
     ok.

+ 5 - 3
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -87,6 +87,8 @@
     channels := #{channel_id() => channel_state()}
 }.
 
+-define(AGGREG_SUP, emqx_bridge_s3_sup).
+
 %%
 
 -spec callback_mode() -> callback_mode().
@@ -224,8 +226,8 @@ start_channel(State, #{
         client_config => maps:get(client_config, State),
         uploader_config => maps:with([min_part_size, max_part_size], Parameters)
     },
-    _ = emqx_connector_aggreg_sup:delete_child(AggregId),
-    {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
+    _ = ?AGGREG_SUP:delete_child(AggregId),
+    {ok, SupPid} = ?AGGREG_SUP:start_child(#{
         id => AggregId,
         start =>
             {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
@@ -238,7 +240,7 @@ start_channel(State, #{
         aggreg_id => AggregId,
         bucket => Bucket,
         supervisor => SupPid,
-        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end
+        on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
     }.
 
 upload_options(Parameters) ->

+ 54 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_sup).
+
+%% API
+-export([
+    start_link/0,
+    start_child/1,
+    delete_child/1
+]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(ChildSpec) ->
+    supervisor:start_child(?MODULE, ChildSpec).
+
+delete_child(ChildId) ->
+    case supervisor:terminate_child(?MODULE, ChildId) of
+        ok ->
+            supervisor:delete_child(?MODULE, ChildId);
+        Error ->
+            Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 1
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------

+ 0 - 42
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl

@@ -1,42 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_aggreg_sup).
-
--export([
-    start_link/0,
-    start_child/1,
-    delete_child/1
-]).
-
--behaviour(supervisor).
--export([init/1]).
-
--define(SUPREF, ?MODULE).
-
-%%
-
-start_link() ->
-    supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
-
-start_child(ChildSpec) ->
-    supervisor:start_child(?SUPREF, ChildSpec).
-
-delete_child(ChildId) ->
-    case supervisor:terminate_child(?SUPREF, ChildId) of
-        ok ->
-            supervisor:delete_child(?SUPREF, ChildId);
-        Error ->
-            Error
-    end.
-
-%%
-
-init(root) ->
-    SupFlags = #{
-        strategy => one_for_one,
-        intensity => 1,
-        period => 1
-    },
-    {ok, {SupFlags, []}}.

+ 0 - 1
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src

@@ -7,7 +7,6 @@
         stdlib
     ]},
     {env, []},
-    {mod, {emqx_connector_aggreg_app, []}},
     {modules, []},
     {links, []}
 ]}.