Просмотр исходного кода

fix(machine boot): serialize start/stop apps

Fixes https://emqx.atlassian.net/browse/EMQX-11702

The original issue was that a join request (`ekka:join/1`) was issued while the node was
still starting all its applications.  When joining a cluster, mria calls its `stop`
callback before stopping itself.  However, previous to this patch, that callback is empty
when the node has not yet started.  Hence, mria stopped itself while application were
being started.  When one of these attempted to create its tables, it crashed the whole
node:

```
{"Kernel pid terminated",application_controller,"{application_start_failure,emqx_machine,{{shutdown,{failed_to_start_child,emqx_machine_boot,{'EXIT',{{failed_to_start_app,emqx_gcp_device,{emqx_gcp_device,{bad_return,{{emqx_gcp_device_app,start,[normal,[]]},{'EXIT',{noproc,{gen_server,call,[mria_schema,{create_table,emqx_gcp_device,[...
```

To fix this, here we introduce a process to serialize starting/stopping all applications,
and also register the stop mria callback sooner.  This process starts all applications in
an uninterruptible fashion, meaning that the startup sequence must complete before it's
stopped by the join request.
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
38586a4c5e

+ 129 - 0
apps/emqx_machine/src/emqx_machine_app_booter.erl

@@ -0,0 +1,129 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_machine_app_booter).
+
+%% @doc This process serves as a serialization point for starting and stopping
+%% applications as part of the boot process or when joining a new cluster.  One motivation
+%% for this is that a join request might start while the node is still starting its list
+%% of applications (e.g. when booting the first time).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% API
+-export([
+    start_link/0,
+
+    start_apps/0,
+    stop_apps/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%% calls/casts/infos
+-record(start_apps, {}).
+-record(stop_apps, {}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+start_apps() ->
+    gen_server:call(?MODULE, #start_apps{}, infinity).
+
+stop_apps() ->
+    gen_server:call(?MODULE, #stop_apps{}, infinity).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    %% Ensure that the stop callback is set, so that join requests concurrent to the
+    %% startup are serialized here.
+    %% It would still be problematic if a join request arrives before this process is
+    %% started, though.
+    ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
+    State = #{},
+    {ok, State}.
+
+handle_call(#start_apps{}, _From, State) ->
+    handle_start_apps(),
+    {reply, ok, State};
+handle_call(#stop_apps{}, _From, State) ->
+    handle_stop_apps(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+handle_start_apps() ->
+    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
+    lists:foreach(fun start_one_app/1, emqx_machine_boot:sorted_reboot_apps()),
+    ?tp(emqx_machine_boot_apps_started, #{}).
+
+handle_stop_apps() ->
+    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
+    _ = emqx_alarm_handler:unload(),
+    ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
+    lists:foreach(fun stop_one_app/1, lists:reverse(emqx_machine_boot:sorted_reboot_apps())).
+
+start_one_app(App) ->
+    ?SLOG(debug, #{msg => "starting_app", app => App}),
+    case application:ensure_all_started(App, emqx_machine_boot:restart_type(App)) of
+        {ok, Apps} ->
+            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
+        {error, Reason} ->
+            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
+            error({failed_to_start_app, App, Reason})
+    end.
+
+stop_one_app(App) ->
+    ?SLOG(debug, #{msg => "stopping_app", app => App}),
+    try
+        _ = application:stop(App)
+    catch
+        C:E ->
+            ?SLOG(error, #{
+                msg => "failed_to_stop_app",
+                app => App,
+                exception => C,
+                reason => E
+            })
+    end.

+ 3 - 18
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -24,6 +24,7 @@
 -export([start_autocluster/0]).
 -export([stop_port_apps/0]).
 -export([read_apps/0]).
+-export([restart_type/1]).
 
 -dialyzer({no_match, [basic_reboot_apps/0]}).
 
@@ -64,11 +65,7 @@ start_autocluster() ->
     ok.
 
 stop_apps() ->
-    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
-    _ = emqx_alarm_handler:unload(),
-    ok = emqx_conf_app:unset_config_loaded(),
-    ok = emqx_plugins:ensure_stopped(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
+    emqx_machine_app_booter:stop_apps().
 
 %% Those port apps are terminated after the main apps
 %% Don't need to stop when reboot.
@@ -99,19 +96,7 @@ stop_one_app(App) ->
     end.
 
 ensure_apps_started() ->
-    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
-    lists:foreach(fun start_one_app/1, sorted_reboot_apps()),
-    ?tp(emqx_machine_boot_apps_started, #{}).
-
-start_one_app(App) ->
-    ?SLOG(debug, #{msg => "starting_app", app => App}),
-    case application:ensure_all_started(App, restart_type(App)) of
-        {ok, Apps} ->
-            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
-        {error, Reason} ->
-            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
-            error({failed_to_start_app, App, Reason})
-    end.
+    emqx_machine_app_booter:start_apps().
 
 restart_type(App) ->
     PermanentApps =

+ 10 - 2
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -29,10 +29,18 @@ start_link() ->
 
 init([]) ->
     Terminator = child_worker(emqx_machine_terminator, [], transient),
+    ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
+    %% Must start before `post_boot'.
+    Booter = child_worker(emqx_machine_app_booter, [], permanent),
     BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
-    ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
-    Children = [Terminator, ReplicantHealthProbe, BootApps, GlobalGC],
+    Children = [
+        Terminator,
+        ReplicantHealthProbe,
+        Booter,
+        BootApps,
+        GlobalGC
+    ],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 100,

+ 8 - 0
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -56,6 +56,7 @@ end_per_suite(Config) ->
 
 app_specs() ->
     [
+        emqx,
         emqx_conf,
         emqx_prometheus,
         emqx_modules,
@@ -113,8 +114,15 @@ t_shutdown_reboot(Config) ->
         [{machine_reboot_SUITE1, #{role => core, apps => app_specs()}}],
         #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
+    SortedApps = app_specs(),
     try
         erpc:call(Node, fun() ->
+            %% Since `emqx_cth_*' starts applications without going through
+            %% `emqx_machine', we need to start this manually.
+            {ok, _} = emqx_machine_app_booter:start_link(),
+            ok = meck:new(emqx_machine_boot, [passthrough]),
+            ok = meck:expect(emqx_machine_boot, sorted_reboot_apps, 0, SortedApps),
+
             true = emqx:is_running(node()),
             emqx_machine_boot:stop_apps(),
             false = emqx:is_running(node()),

+ 4 - 0
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -914,6 +914,10 @@ t_start_node_with_plugin_enabled({init, Config}) ->
         emqx_conf,
         emqx_ctl,
         {emqx_plugins, #{
+            after_start => fun() ->
+                {ok, Pid} = emqx_machine_app_booter:start_link(),
+                unlink(Pid)
+            end,
             config =>
                 #{
                     plugins =>

+ 1 - 0
changes/ce/fix-14113.en.md

@@ -0,0 +1 @@
+Fixed a potential race condition where a node that is still starting could crash if it attempted to join a cluster.