Просмотр исходного кода

refactor(machine boot): avoid cyclic module dependency

Thales Macedo Garitezi 1 год назад
Родитель
Коммит
47c1b8c2e1

+ 1 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -908,7 +908,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
                     %% setting extra apps to be restarted after
                     %% setting extra apps to be restarted after
                     %% joining.
                     %% joining.
                     set_envs(Node, Env),
                     set_envs(Node, Env),
-                    ok = erpc:call(Node, emqx_machine_app_booter, start_autocluster, [])
+                    ok = erpc:call(Node, emqx_machine_boot, start_autocluster, [])
                 end,
                 end,
             case rpc:call(Node, ekka, join, [JoinTo]) of
             case rpc:call(Node, ekka, join, [JoinTo]) of
                 ok ->
                 ok ->

+ 0 - 161
apps/emqx_machine/src/emqx_machine_app_booter.erl

@@ -1,161 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_machine_app_booter).
-
-%% @doc This process serves as a serialization point for starting and stopping
-%% applications as part of the boot process or when joining a new cluster.  One motivation
-%% for this is that a join request might start while the node is still starting its list
-%% of applications (e.g. when booting the first time).
-
--include_lib("emqx/include/logger.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
-%% API
--export([
-    start_link/0,
-
-    start_apps/0,
-    stop_apps/0
-]).
-
-%% `gen_server' API
--export([
-    init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2
-]).
-
-%% Internal exports
--export([start_autocluster/0]).
-%% Internal exports (for `emqx_machine_terminator' only)
--export([do_stop_apps/0]).
-
-%%------------------------------------------------------------------------------
-%% Type declarations
-%%------------------------------------------------------------------------------
-
-%% calls/casts/infos
--record(start_apps, {}).
--record(stop_apps, {}).
-
-%%------------------------------------------------------------------------------
-%% API
-%%------------------------------------------------------------------------------
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-start_apps() ->
-    gen_server:call(?MODULE, #start_apps{}, infinity).
-
-stop_apps() ->
-    gen_server:call(?MODULE, #stop_apps{}, infinity).
-
-%%------------------------------------------------------------------------------
-%% `gen_server' API
-%%------------------------------------------------------------------------------
-
-init(_Opts) ->
-    %% Ensure that the stop callback is set, so that join requests concurrent to the
-    %% startup are serialized here.
-    %% It would still be problematic if a join request arrives before this process is
-    %% started, though.
-    ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
-    do_start_apps(),
-    ok = print_vsn(),
-    ok = start_autocluster(),
-    State = #{},
-    {ok, State}.
-
-handle_call(#start_apps{}, _From, State) ->
-    do_start_apps(),
-    {reply, ok, State};
-handle_call(#stop_apps{}, _From, State) ->
-    do_stop_apps(),
-    {reply, ok, State};
-handle_call(_Call, _From, State) ->
-    {reply, ignored, State}.
-
-handle_cast(_Cast, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-%%------------------------------------------------------------------------------
-%% Internal exports
-%%------------------------------------------------------------------------------
-
-%% Callers who wish to stop applications should not call this directly, and instead use
-%% `stop_apps/0`.  The only exception is `emqx_machine_terminator': it should call this
-%% block directly in its try-catch block, without the possibility of crashing this
-%% process.
-do_stop_apps() ->
-    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
-    _ = emqx_alarm_handler:unload(),
-    ok = emqx_conf_app:unset_config_loaded(),
-    ok = emqx_plugins:ensure_stopped(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(emqx_machine_boot:sorted_reboot_apps())).
-
-start_autocluster() ->
-    ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
-    ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
-    %% returns 'ok' or a pid or 'any()' as in spec
-    _ = ekka:autocluster(emqx),
-    ok.
-
-%%------------------------------------------------------------------------------
-%% Internal fns
-%%------------------------------------------------------------------------------
-
--ifdef(TEST).
-print_vsn() -> ok.
-% TEST
--else.
-print_vsn() ->
-    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-% TEST
--endif.
-
-do_start_apps() ->
-    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
-    lists:foreach(fun start_one_app/1, emqx_machine_boot:sorted_reboot_apps()),
-    ?tp(emqx_machine_boot_apps_started, #{}).
-
-start_one_app(App) ->
-    ?SLOG(debug, #{msg => "starting_app", app => App}),
-    case application:ensure_all_started(App, emqx_machine_boot:restart_type(App)) of
-        {ok, Apps} ->
-            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
-        {error, Reason} ->
-            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
-            error({failed_to_start_app, App, Reason})
-    end.
-
-stop_one_app(App) ->
-    ?SLOG(debug, #{msg => "stopping_app", app => App}),
-    try
-        _ = application:stop(App)
-    catch
-        C:E ->
-            ?SLOG(error, #{
-                msg => "failed_to_stop_app",
-                app => App,
-                exception => C,
-                reason => E
-            })
-    end.

+ 145 - 34
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -18,18 +18,37 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
--export([stop_apps/0, ensure_apps_started/0]).
--export([sorted_reboot_apps/0]).
--export([stop_port_apps/0]).
--export([read_apps/0]).
--export([restart_type/1]).
+%% API
+-export([
+    start_link/0,
+
+    stop_apps/0,
+    ensure_apps_started/0,
+    sorted_reboot_apps/0,
+    stop_port_apps/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% Internal exports (for `emqx_machine_terminator' only)
+-export([do_stop_apps/0]).
 
 
 -dialyzer({no_match, [basic_reboot_apps/0]}).
 -dialyzer({no_match, [basic_reboot_apps/0]}).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
--export([sorted_reboot_apps/1, reboot_apps/0]).
+-export([read_apps/0, sorted_reboot_apps/1, reboot_apps/0, start_autocluster/0]).
 -endif.
 -endif.
 
 
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
 %% These apps are always (re)started by emqx_machine:
 %% These apps are always (re)started by emqx_machine:
 -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
 -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
 
 
@@ -40,8 +59,19 @@
 %% release, depending on the build flags:
 %% release, depending on the build flags:
 -define(OPTIONAL_APPS, [bcrypt, observer]).
 -define(OPTIONAL_APPS, [bcrypt, observer]).
 
 
+%% calls/casts/infos
+-record(start_apps, {}).
+-record(stop_apps, {}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
 stop_apps() ->
 stop_apps() ->
-    emqx_machine_app_booter:stop_apps().
+    gen_server:call(?MODULE, #stop_apps{}, infinity).
 
 
 %% Those port apps are terminated after the main apps
 %% Those port apps are terminated after the main apps
 %% Don't need to stop when reboot.
 %% Don't need to stop when reboot.
@@ -57,22 +87,60 @@ stop_port_apps() ->
         [os_mon, jq]
         [os_mon, jq]
     ).
     ).
 
 
-stop_one_app(App) ->
-    ?SLOG(debug, #{msg => "stopping_app", app => App}),
-    try
-        _ = application:stop(App)
-    catch
-        C:E ->
-            ?SLOG(error, #{
-                msg => "failed_to_stop_app",
-                app => App,
-                exception => C,
-                reason => E
-            })
-    end.
-
 ensure_apps_started() ->
 ensure_apps_started() ->
-    emqx_machine_app_booter:start_apps().
+    gen_server:call(?MODULE, #start_apps{}, infinity).
+
+sorted_reboot_apps() ->
+    RebootApps = reboot_apps(),
+    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
+    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
+    sorted_reboot_apps(Apps).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    %% Ensure that the stop callback is set, so that join requests concurrent to the
+    %% startup are serialized here.
+    %% It would still be problematic if a join request arrives before this process is
+    %% started, though.
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    do_start_apps(),
+    ok = print_vsn(),
+    ok = start_autocluster(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#start_apps{}, _From, State) ->
+    do_start_apps(),
+    {reply, ok, State};
+handle_call(#stop_apps{}, _From, State) ->
+    do_stop_apps(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+%% Callers who wish to stop applications should not call this directly, and should
+%% instead use `stop_apps/0'.  The only exception is `emqx_machine_terminator': it should
+%% call this function directly inside its own try-catch block, so that a failure there
+%% cannot crash this process.
+do_stop_apps() ->
+    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
+    _ = emqx_alarm_handler:unload(),
+    ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
+    lists:foreach(fun stop_one_app/1, lists:reverse(?MODULE:sorted_reboot_apps())).
 
 
 restart_type(App) ->
 restart_type(App) ->
     PermanentApps =
     PermanentApps =
@@ -84,6 +152,26 @@ restart_type(App) ->
             temporary
             temporary
     end.
     end.
 
 
+start_autocluster() ->
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    ekka:callback(start, fun ?MODULE:ensure_apps_started/0),
+    %% returns 'ok' or a pid or 'any()' as in spec
+    _ = ekka:autocluster(emqx),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+-ifdef(TEST).
+print_vsn() -> ok.
+% TEST
+-else.
+print_vsn() ->
+    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
+% TEST
+-endif.
+
 %% list of app names which should be rebooted when:
 %% list of app names which should be rebooted when:
 %% 1. due to static config change
 %% 1. due to static config change
 %% 2. after join a cluster
 %% 2. after join a cluster
@@ -110,13 +198,6 @@ basic_reboot_apps() ->
     BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
     BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
     ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()).
     ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()).
 
 
-%% @doc Read business apps belonging to the current profile/edition.
-read_apps() ->
-    PrivDir = code:priv_dir(emqx_machine),
-    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
-    {ok, [Apps]} = file:consult(RebootListPath),
-    Apps.
-
 excluded_apps() ->
 excluded_apps() ->
     %% Optional apps _should_ be (re)started automatically, but only
     %% Optional apps _should_ be (re)started automatically, but only
     %% when they are found in the release:
     %% when they are found in the release:
@@ -129,11 +210,34 @@ is_app(Name) ->
         _ -> false
         _ -> false
     end.
     end.
 
 
-sorted_reboot_apps() ->
-    RebootApps = reboot_apps(),
-    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
-    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
-    sorted_reboot_apps(Apps).
+do_start_apps() ->
+    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
+    lists:foreach(fun start_one_app/1, ?MODULE:sorted_reboot_apps()),
+    ?tp(emqx_machine_boot_apps_started, #{}).
+
+start_one_app(App) ->
+    ?SLOG(debug, #{msg => "starting_app", app => App}),
+    case application:ensure_all_started(App, restart_type(App)) of
+        {ok, Apps} ->
+            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
+        {error, Reason} ->
+            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
+            error({failed_to_start_app, App, Reason})
+    end.
+
+stop_one_app(App) ->
+    ?SLOG(debug, #{msg => "stopping_app", app => App}),
+    try
+        _ = application:stop(App)
+    catch
+        C:E ->
+            ?SLOG(error, #{
+                msg => "failed_to_stop_app",
+                app => App,
+                exception => C,
+                reason => E
+            })
+    end.
 
 
 app_deps(App, RebootApps) ->
 app_deps(App, RebootApps) ->
     case application:get_key(App, applications) of
     case application:get_key(App, applications) of
@@ -141,6 +245,13 @@ app_deps(App, RebootApps) ->
         {ok, List} -> lists:filter(fun(A) -> lists:member(A, RebootApps) end, List)
         {ok, List} -> lists:filter(fun(A) -> lists:member(A, RebootApps) end, List)
     end.
     end.
 
 
+%% @doc Read business apps belonging to the current profile/edition.
+read_apps() ->
+    PrivDir = code:priv_dir(emqx_machine),
+    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
+    {ok, [Apps]} = file:consult(RebootListPath),
+    Apps.
+
 runtime_deps() ->
 runtime_deps() ->
     [
     [
         %% `emqx_bridge' is special in that it needs all the bridges apps to
         %% `emqx_bridge' is special in that it needs all the bridges apps to

+ 2 - 2
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -29,9 +29,9 @@ start_link() ->
 
 
 init([]) ->
 init([]) ->
     Terminator = child_worker(emqx_machine_terminator, [], transient),
     Terminator = child_worker(emqx_machine_terminator, [], transient),
-    %% Must start before `app_booter'.
+    %% Must start before `emqx_machine_boot'.
     ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
     ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
-    Booter = child_worker(emqx_machine_app_booter, [], permanent),
+    Booter = child_worker(emqx_machine_boot, [], permanent),
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
     Children = [
     Children = [
         Terminator,
         Terminator,

+ 1 - 1
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -96,7 +96,7 @@ handle_call(?DO_IT, _From, State) ->
     try
     try
         %% stop port apps before stopping other apps.
         %% stop port apps before stopping other apps.
         emqx_machine_boot:stop_port_apps(),
         emqx_machine_boot:stop_port_apps(),
-        emqx_machine_app_booter:do_stop_apps()
+        emqx_machine_boot:do_stop_apps()
     catch
     catch
         C:E:St ->
         C:E:St ->
             Apps = [element(1, A) || A <- application:which_applications()],
             Apps = [element(1, A) || A <- application:which_applications()],

+ 1 - 1
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -119,7 +119,7 @@ t_shutdown_reboot(Config) ->
         erpc:call(Node, fun() ->
         erpc:call(Node, fun() ->
             %% Since `emqx_cth_*' starts applications without going through
             %% Since `emqx_cth_*' starts applications without going through
             %% `emqx_machine', we need to start this manually.
             %% `emqx_machine', we need to start this manually.
-            {ok, _} = emqx_machine_app_booter:start_link(),
+            {ok, _} = emqx_machine_boot:start_link(),
             ok = meck:new(emqx_machine_boot, [passthrough]),
             ok = meck:new(emqx_machine_boot, [passthrough]),
             ok = meck:expect(emqx_machine_boot, sorted_reboot_apps, 0, SortedApps),
             ok = meck:expect(emqx_machine_boot, sorted_reboot_apps, 0, SortedApps),
 
 

+ 2 - 2
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -915,7 +915,7 @@ t_start_node_with_plugin_enabled({init, Config}) ->
         emqx_ctl,
         emqx_ctl,
         {emqx_plugins, #{
         {emqx_plugins, #{
             after_start => fun() ->
             after_start => fun() ->
-                {ok, Pid} = emqx_machine_app_booter:start_link(),
+                {ok, Pid} = emqx_machine_boot:start_link(),
                 unlink(Pid)
                 unlink(Pid)
             end,
             end,
             config =>
             config =>
@@ -978,7 +978,7 @@ t_start_node_with_plugin_enabled(Config) when is_list(Config) ->
             %% order, and also we need to override the config loader to emulate what
             %% order, and also we need to override the config loader to emulate what
             %% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
             %% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
             %% keys.
             %% keys.
-            ok = ?ON(N2, emqx_machine_app_booter:start_autocluster()),
+            ok = ?ON(N2, emqx_machine_boot:start_autocluster()),
             ?ON(N2, begin
             ?ON(N2, begin
                 StartCallback0 =
                 StartCallback0 =
                     case ekka:env({callback, start}) of
                     case ekka:env({callback, start}) of