Просмотр исходного кода

Merge pull request #14113 from thalesmg/20241030-m-fix-join-while-starting

fix(machine boot): serialize start/stop apps
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
c55fd028ba

+ 0 - 106
apps/emqx/test/emqx_common_test_helpers.erl

@@ -72,8 +72,6 @@
 ]).
 
 -export([
-    emqx_cluster/1,
-    emqx_cluster/2,
     start_ekka/0,
     start_epmd/0,
     start_peer/2,
@@ -685,9 +683,6 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
 %% Clusterisation and multi-node testing
 %%
 
--type cluster_spec() :: [node_spec()].
--type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
--type role() :: core | replicant.
 -type shortname() :: atom().
 -type nodename() :: atom().
 -type node_opts() :: #{
@@ -722,58 +717,6 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
     listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
 }.
 
--spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
-emqx_cluster(Specs) ->
-    emqx_cluster(Specs, #{}).
-
--spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
-emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
-    emqx_cluster(Specs, maps:from_list(CommonOpts));
-emqx_cluster(Specs0, CommonOpts) ->
-    Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
-    Specs = expand_node_specs(Specs1, CommonOpts),
-    %% Assign grpc ports
-    GenRpcPorts = maps:from_list([
-        {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
-     || {{_, Name, _}, Num} <- Specs
-    ]),
-    %% Set the default node of the cluster:
-    CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
-    JoinTo =
-        case CoreNodes of
-            [First | _] -> First;
-            _ -> undefined
-        end,
-    NodeOpts = fun(Number) ->
-        #{
-            base_port => base_port(Number),
-            env => [
-                {mria, core_nodes, CoreNodes},
-                {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
-            ]
-        }
-    end,
-    RoleOpts = fun
-        (core) ->
-            #{
-                join_to => JoinTo,
-                env => [
-                    {mria, node_role, core}
-                ]
-            };
-        (replicant) ->
-            #{
-                env => [
-                    {mria, node_role, replicant},
-                    {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
-                ]
-            }
-    end,
-    [
-        {Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)}
-     || {{Role, Name, Opts}, Number} <- Specs
-    ].
-
 %% Lower level starting API
 
 -spec start_peer(shortname(), node_opts()) -> nodename().
@@ -999,26 +942,10 @@ node_name(Name) ->
             list_to_atom(atom_to_list(Name) ++ "@" ++ host())
     end.
 
-gen_node_name(Num) ->
-    list_to_atom("autocluster_node" ++ integer_to_list(Num)).
-
 host() ->
     [_, Host] = string:tokens(atom_to_list(node()), "@"),
     Host.
 
-merge_opts(Opts1, Opts2) ->
-    maps:merge_with(
-        fun
-            (env, Env1, Env2) -> lists:usort(Env2 ++ Env1);
-            (conf, Conf1, Conf2) -> lists:usort(Conf2 ++ Conf1);
-            (apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
-            (load_apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
-            (_Option, _Old, Value) -> Value
-        end,
-        Opts1,
-        Opts2
-    ).
-
 set_envs(Node, Env) ->
     lists:foreach(
         fun({Application, Key, Value}) ->
@@ -1040,9 +967,6 @@ is_lib(Path) ->
 
 %% Ports
 
-base_port(Number) ->
-    10000 + Number * 100.
-
 gen_rpc_port(BasePort) ->
     BasePort - 1.
 
@@ -1060,36 +984,6 @@ listener_port(BasePort, ws) ->
 listener_port(BasePort, wss) ->
     BasePort + 4.
 
-%% Autocluster helpers
-
-expand_node_specs(Specs, CommonOpts) ->
-    lists:map(
-        fun({Spec, Num}) ->
-            {
-                case Spec of
-                    core ->
-                        {core, gen_node_name(Num), CommonOpts};
-                    replicant ->
-                        {replicant, gen_node_name(Num), CommonOpts};
-                    {Role, Name} when is_atom(Name) ->
-                        {Role, Name, CommonOpts};
-                    {Role, Opts} when is_list(Opts) ->
-                        Opts1 = maps:from_list(Opts),
-                        {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts1)};
-                    {Role, Name, Opts} when is_list(Opts) ->
-                        Opts1 = maps:from_list(Opts),
-                        {Role, Name, merge_opts(CommonOpts, Opts1)};
-                    {Role, Opts} ->
-                        {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts)};
-                    {Role, Name, Opts} ->
-                        {Role, Name, merge_opts(CommonOpts, Opts)}
-                end,
-                Num
-            }
-        end,
-        Specs
-    ).
-
 %% Useful when iterating on the tests in a loop, to get rid of all the garbage printed
 %% before the test itself begins.
 %% Only actually does anything if the environment variable `CLEAR_SCREEN' is set to `true'

+ 15 - 64
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -589,65 +589,6 @@ projection_optional_span(Trace) ->
      || #{?snk_kind := K} = Evt <- Trace
     ].
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_gcp_pubsub, emqx_bridge]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster: ~p", [Cluster]),
-    Cluster.
-
-start_cluster(Cluster) ->
-    Nodes = lists:map(
-        fun({Name, Opts}) ->
-            ct:pal("starting ~p", [Name]),
-            emqx_common_test_helpers:start_peer(Name, Opts)
-        end,
-        Cluster
-    ),
-    NumNodes = length(Nodes),
-    on_exit(fun() ->
-        emqx_utils:pmap(
-            fun(N) ->
-                ct:pal("stopping ~p", [N]),
-                emqx_common_test_helpers:stop_peer(N)
-            end,
-            Nodes
-        )
-    end),
-    {ok, _} = snabbkaffe:block_until(
-        %% -1 because only those that join the first node will emit the event.
-        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-        30_000
-    ),
-    Nodes.
-
-wait_for_cluster_rpc(Node) ->
-    %% need to wait until the config handler is ready after
-    %% restarting during the cluster join.
-    ?retry(
-        _Sleep0 = 100,
-        _Attempts0 = 50,
-        true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
-    ).
-
 setup_and_start_listeners(Node, NodeOpts) ->
     erpc:call(
         Node,
@@ -686,6 +627,10 @@ dedup(_X, [Y | Rest]) ->
 dedup(_X, []) ->
     [].
 
+get_mqtt_port(Node) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
+
 %%------------------------------------------------------------------------------
 %% Trace properties
 %%------------------------------------------------------------------------------
@@ -2278,12 +2223,19 @@ t_cluster_subscription(Config) ->
         }
     ] = ?config(topic_mapping, Config),
     BridgeId = bridge_id(Config),
-    Cluster = [{_N1, Opts1} | _] = cluster(Config),
+    AppSpecs = [emqx_conf, emqx_rule_engine, emqx_bridge_gcp_pubsub, emqx_bridge],
     ?check_trace(
         begin
-            Nodes = [N1, N2] = start_cluster(Cluster),
+            Nodes =
+                [N1, N2] = emqx_cth_cluster:start(
+                    [
+                        {gcp_pubsub_consumer_subscription1, #{apps => AppSpecs}},
+                        {gcp_pubsub_consumer_subscription2, #{apps => AppSpecs}}
+                    ],
+                    #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+                ),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             NumNodes = length(Nodes),
-            lists:foreach(fun wait_for_cluster_rpc/1, Nodes),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             lists:foreach(
                 fun(N) ->
@@ -2303,8 +2255,7 @@ t_cluster_subscription(Config) ->
                 10_000
             ),
 
-            setup_and_start_listeners(N1, Opts1),
-            TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp),
+            TCPPort1 = get_mqtt_port(N1),
             {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C1) end),
             {ok, _} = emqtt:connect(C1),

+ 38 - 97
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -1009,7 +1009,7 @@ setup_group_subscriber_spy_fn() ->
 
 setup_group_subscriber_spy(TestPid) ->
     ok = meck:new(brod_group_subscriber_v2, [
-        passthrough, no_link, no_history, non_strict
+        passthrough, no_link, no_history
     ]),
     ok = meck:expect(
         brod_group_subscriber_v2,
@@ -1035,15 +1035,6 @@ setup_group_subscriber_spy(TestPid) ->
 kill_group_subscriber_spy() ->
     meck:unload(brod_group_subscriber_v2).
 
-wait_for_cluster_rpc(Node) ->
-    %% need to wait until the config handler is ready after
-    %% restarting during the cluster join.
-    ?retry(
-        _Sleep0 = 100,
-        _Attempts0 = 50,
-        true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
-    ).
-
 setup_and_start_listeners(Node, NodeOpts) ->
     erpc:call(
         Node,
@@ -1068,39 +1059,25 @@ setup_and_start_listeners(Node, NodeOpts) ->
         end
     ).
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
+cluster(TestCase, Config) ->
+    AppSpecs = [
+        emqx_conf,
+        emqx_rule_engine,
+        {emqx_bridge_kafka, #{after_start => setup_group_subscriber_spy_fn()}},
+        emqx_bridge
+    ],
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(
         [
-            {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {load_apps, [emqx_machine]},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ExtraEnvHandlerHook(),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
+            {node_name(TestCase, 1), #{apps => AppSpecs}},
+            {node_name(TestCase, 2), #{apps => AppSpecs}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
-    ct:pal("cluster: ~p", [Cluster]),
-    Cluster.
+    ct:pal("cluster: ~p", [NodeSpecs]),
+    NodeSpecs.
 
-start_peer(Name, Opts) ->
-    Node = emqx_common_test_helpers:start_peer(Name, Opts),
-    % Make it possible to call `ct:pal` and friends (if running under rebar3)
-    _ = emqx_cth_cluster:share_load_module(Node, cthr),
-    Node.
+node_name(TestCase, N) ->
+    binary_to_atom(iolist_to_binary(io_lib:format("~s_~b", [TestCase, N]))).
 
 start_async_publisher(Config, KafkaTopic) ->
     TId = ets:new(kafka_payloads, [public, ordered_set]),
@@ -1156,6 +1133,10 @@ health_check(Node, Config) ->
         {ok, Status}
     end).
 
+get_mqtt_port(Node) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1763,29 +1744,16 @@ t_cluster_group(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            Nodes =
-                [_N1, N2 | _] = [
-                    start_peer(Name, Opts)
-                 || {Name, Opts} <- Cluster
-                ],
-            on_exit(fun() ->
-                emqx_utils:pmap(
-                    fun(N) ->
-                        ct:pal("stopping ~p", [N]),
-                        ok = emqx_common_test_helpers:stop_peer(N)
-                    end,
-                    Nodes
-                )
-            end),
+            Nodes = [_N1, N2 | _] = emqx_cth_cluster:start(Cluster),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
                 15_000
             ),
-            wait_for_cluster_rpc(N2),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -1845,15 +1813,15 @@ t_node_joins_existing_cluster(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
-            ct:pal("starting ~p", [Name1]),
-            N1 = start_peer(Name1, Opts1),
+            [NodeSpec1, NodeSpec2 | _] = Cluster,
+            ct:pal("starting ~p", [NodeSpec1]),
+            [N1] = emqx_cth_cluster:start([NodeSpec1]),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N1]),
-                ok = emqx_common_test_helpers:stop_peer(N1)
+                ok = emqx_cth_cluster:stop([N1])
             end),
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
@@ -1880,8 +1848,7 @@ t_node_joins_existing_cluster(Config) ->
             ),
 
             %% Now, we start the second node and have it join the cluster.
-            setup_and_start_listeners(N1, Opts1),
-            TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp),
+            TCPPort1 = get_mqtt_port(N1),
             {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C1) end),
             {ok, _} = emqtt:connect(C1),
@@ -1892,14 +1859,13 @@ t_node_joins_existing_cluster(Config) ->
                 1,
                 30_000
             ),
-            ct:pal("starting ~p", [Name2]),
-            N2 = start_peer(Name2, Opts2),
+            ct:pal("starting ~p", [NodeSpec2]),
+            [N2] = emqx_cth_cluster:start([NodeSpec2]),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N2]),
-                ok = emqx_common_test_helpers:stop_peer(N2)
+                ok = emqx_cth_cluster:stop([N2])
             end),
             Nodes = [N1, N2],
-            wait_for_cluster_rpc(N2),
 
             {ok, _} = snabbkaffe:receive_events(SRef0),
             ?retry(
@@ -1977,40 +1943,16 @@ t_cluster_node_down(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            {_N2, Opts2} = lists:nth(2, Cluster),
-            NumNodes = length(Cluster),
-            Nodes =
-                [N1, N2 | _] =
-                lists:map(
-                    fun({Name, Opts}) ->
-                        ct:pal("starting ~p", [Name]),
-                        start_peer(Name, Opts)
-                    end,
-                    Cluster
-                ),
-            on_exit(fun() ->
-                emqx_utils:pmap(
-                    fun(N) ->
-                        ct:pal("stopping ~p", [N]),
-                        ok = emqx_common_test_helpers:stop_peer(N)
-                    end,
-                    Nodes
-                )
-            end),
+            Nodes = [N1, N2 | _] = emqx_cth_cluster:start(Cluster),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
                 15_000
             ),
-            wait_for_cluster_rpc(N2),
-            {ok, _} = snabbkaffe:block_until(
-                %% -1 because only those that join the first node will emit the event.
-                ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-                30_000
-            ),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -2031,8 +1973,7 @@ t_cluster_node_down(Config) ->
 
             %% Now, we stop one of the nodes and watch the group
             %% rebalance.
-            setup_and_start_listeners(N2, Opts2),
-            TCPPort = emqx_common_test_helpers:listener_port(Opts2, tcp),
+            TCPPort = get_mqtt_port(N2),
             {ok, C} = emqtt:start_link([{port, TCPPort}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C) end),
             {ok, _} = emqtt:connect(C),
@@ -2040,7 +1981,7 @@ t_cluster_node_down(Config) ->
             {TId, Pid} = start_async_publisher(Config, KafkaTopic),
 
             ct:pal("stopping node ~p", [N1]),
-            ok = emqx_common_test_helpers:stop_peer(N1),
+            ok = emqx_cth_cluster:stop([N1]),
 
             %% Give some time for the consumers in remaining node to
             %% rebalance.

+ 141 - 69
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -18,19 +18,37 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--export([post_boot/0]).
--export([stop_apps/0, ensure_apps_started/0]).
--export([sorted_reboot_apps/0]).
--export([start_autocluster/0]).
--export([stop_port_apps/0]).
--export([read_apps/0]).
+%% API
+-export([
+    start_link/0,
+
+    stop_apps/0,
+    ensure_apps_started/0,
+    sorted_reboot_apps/0,
+    stop_port_apps/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% Internal exports (for `emqx_machine_terminator' only)
+-export([do_stop_apps/0]).
 
 -dialyzer({no_match, [basic_reboot_apps/0]}).
 
 -ifdef(TEST).
--export([sorted_reboot_apps/1, reboot_apps/0]).
+-export([read_apps/0, sorted_reboot_apps/1, reboot_apps/0, start_autocluster/0]).
 -endif.
 
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
 %% These apps are always (re)started by emqx_machine:
 -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
 
@@ -41,34 +59,19 @@
 %% release, depending on the build flags:
 -define(OPTIONAL_APPS, [bcrypt, observer]).
 
-post_boot() ->
-    ok = ensure_apps_started(),
-    ok = print_vsn(),
-    ok = start_autocluster(),
-    ignore.
+%% calls/casts/infos
+-record(start_apps, {}).
+-record(stop_apps, {}).
 
--ifdef(TEST).
-print_vsn() -> ok.
-% TEST
--else.
-print_vsn() ->
-    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-% TEST
--endif.
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
 
-start_autocluster() ->
-    ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
-    ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
-    %% returns 'ok' or a pid or 'any()' as in spec
-    _ = ekka:autocluster(emqx),
-    ok.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 stop_apps() ->
-    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
-    _ = emqx_alarm_handler:unload(),
-    ok = emqx_conf_app:unset_config_loaded(),
-    ok = emqx_plugins:ensure_stopped(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
+    gen_server:call(?MODULE, #stop_apps{}, infinity).
 
 %% Those port apps are terminated after the main apps
 %% Don't need to stop when reboot.
@@ -84,34 +87,60 @@ stop_port_apps() ->
         [os_mon, jq]
     ).
 
-stop_one_app(App) ->
-    ?SLOG(debug, #{msg => "stopping_app", app => App}),
-    try
-        _ = application:stop(App)
-    catch
-        C:E ->
-            ?SLOG(error, #{
-                msg => "failed_to_stop_app",
-                app => App,
-                exception => C,
-                reason => E
-            })
-    end.
-
 ensure_apps_started() ->
-    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
-    lists:foreach(fun start_one_app/1, sorted_reboot_apps()),
-    ?tp(emqx_machine_boot_apps_started, #{}).
+    gen_server:call(?MODULE, #start_apps{}, infinity).
 
-start_one_app(App) ->
-    ?SLOG(debug, #{msg => "starting_app", app => App}),
-    case application:ensure_all_started(App, restart_type(App)) of
-        {ok, Apps} ->
-            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
-        {error, Reason} ->
-            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
-            error({failed_to_start_app, App, Reason})
-    end.
+sorted_reboot_apps() ->
+    RebootApps = reboot_apps(),
+    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
+    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
+    sorted_reboot_apps(Apps).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    %% Ensure that the stop callback is set, so that join requests concurrent to the
+    %% startup are serialized here.
+    %% It would still be problematic if a join request arrives before this process is
+    %% started, though.
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    do_start_apps(),
+    ok = print_vsn(),
+    ok = start_autocluster(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#start_apps{}, _From, State) ->
+    do_start_apps(),
+    {reply, ok, State};
+handle_call(#stop_apps{}, _From, State) ->
+    do_stop_apps(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+%% Callers who wish to stop applications should not call this directly, and instead use
+%% `stop_apps/0'.  The only exception is `emqx_machine_terminator': it should call this
+%% function directly inside its own try-catch block, so that a failure cannot crash the
+%% `emqx_machine_boot' server process.
+do_stop_apps() ->
+    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
+    _ = emqx_alarm_handler:unload(),
+    ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
+    lists:foreach(fun stop_one_app/1, lists:reverse(?MODULE:sorted_reboot_apps())).
 
 restart_type(App) ->
     PermanentApps =
@@ -123,6 +152,26 @@ restart_type(App) ->
             temporary
     end.
 
+start_autocluster() ->
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    ekka:callback(start, fun ?MODULE:ensure_apps_started/0),
+    %% returns 'ok' or a pid or 'any()' as in spec
+    _ = ekka:autocluster(emqx),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+-ifdef(TEST).
+print_vsn() -> ok.
+% TEST
+-else.
+print_vsn() ->
+    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
+% TEST
+-endif.
+
 %% list of app names which should be rebooted when:
 %% 1. due to static config change
 %% 2. after join a cluster
@@ -149,13 +198,6 @@ basic_reboot_apps() ->
     BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
     ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()).
 
-%% @doc Read business apps belonging to the current profile/edition.
-read_apps() ->
-    PrivDir = code:priv_dir(emqx_machine),
-    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
-    {ok, [Apps]} = file:consult(RebootListPath),
-    Apps.
-
 excluded_apps() ->
     %% Optional apps _should_ be (re)started automatically, but only
     %% when they are found in the release:
@@ -168,11 +210,34 @@ is_app(Name) ->
         _ -> false
     end.
 
-sorted_reboot_apps() ->
-    RebootApps = reboot_apps(),
-    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
-    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
-    sorted_reboot_apps(Apps).
+do_start_apps() ->
+    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
+    lists:foreach(fun start_one_app/1, ?MODULE:sorted_reboot_apps()),
+    ?tp(emqx_machine_boot_apps_started, #{}).
+
+start_one_app(App) ->
+    ?SLOG(debug, #{msg => "starting_app", app => App}),
+    case application:ensure_all_started(App, restart_type(App)) of
+        {ok, Apps} ->
+            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
+        {error, Reason} ->
+            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
+            error({failed_to_start_app, App, Reason})
+    end.
+
+stop_one_app(App) ->
+    ?SLOG(debug, #{msg => "stopping_app", app => App}),
+    try
+        _ = application:stop(App)
+    catch
+        C:E ->
+            ?SLOG(error, #{
+                msg => "failed_to_stop_app",
+                app => App,
+                exception => C,
+                reason => E
+            })
+    end.
 
 app_deps(App, RebootApps) ->
     case application:get_key(App, applications) of
@@ -180,6 +245,13 @@ app_deps(App, RebootApps) ->
         {ok, List} -> lists:filter(fun(A) -> lists:member(A, RebootApps) end, List)
     end.
 
+%% @doc Read business apps belonging to the current profile/edition.
+read_apps() ->
+    PrivDir = code:priv_dir(emqx_machine),
+    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
+    {ok, [Apps]} = file:consult(RebootListPath),
+    Apps.
+
 runtime_deps() ->
     [
         %% `emqx_bridge' is special in that it needs all the bridges apps to

+ 9 - 3
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -29,10 +29,16 @@ start_link() ->
 
 init([]) ->
     Terminator = child_worker(emqx_machine_terminator, [], transient),
-    BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
-    GlobalGC = child_worker(emqx_global_gc, [], permanent),
+    %% Must start before `emqx_machine_boot'.
     ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
-    Children = [Terminator, ReplicantHealthProbe, BootApps, GlobalGC],
+    Booter = child_worker(emqx_machine_boot, [], permanent),
+    GlobalGC = child_worker(emqx_global_gc, [], permanent),
+    Children = [
+        Terminator,
+        ReplicantHealthProbe,
+        Booter,
+        GlobalGC
+    ],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 100,

+ 1 - 1
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -96,7 +96,7 @@ handle_call(?DO_IT, _From, State) ->
     try
         %% stop port apps before stopping other apps.
         emqx_machine_boot:stop_port_apps(),
-        emqx_machine_boot:stop_apps()
+        emqx_machine_boot:do_stop_apps()
     catch
         C:E:St ->
             Apps = [element(1, A) || A <- application:which_applications()],

+ 8 - 0
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -56,6 +56,7 @@ end_per_suite(Config) ->
 
 app_specs() ->
     [
+        emqx,
         emqx_conf,
         emqx_prometheus,
         emqx_modules,
@@ -113,8 +114,15 @@ t_shutdown_reboot(Config) ->
         [{machine_reboot_SUITE1, #{role => core, apps => app_specs()}}],
         #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
+    SortedApps = app_specs(),
     try
         erpc:call(Node, fun() ->
+            %% Since `emqx_cth_*' starts applications without going through
+            %% `emqx_machine', we need to start this manually.
+            {ok, _} = emqx_machine_boot:start_link(),
+            ok = meck:new(emqx_machine_boot, [passthrough]),
+            ok = meck:expect(emqx_machine_boot, sorted_reboot_apps, 0, SortedApps),
+
             true = emqx:is_running(node()),
             emqx_machine_boot:stop_apps(),
             false = emqx:is_running(node()),

+ 1 - 1
apps/emqx_message_transformation/src/emqx_message_transformation.app.src

@@ -1,6 +1,6 @@
 {application, emqx_message_transformation, [
     {description, "EMQX Message Transformation"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
     {mod, {emqx_message_transformation_app, []}},
     {applications, [

+ 2 - 2
apps/emqx_message_transformation/src/emqx_message_transformation_app.erl

@@ -21,14 +21,14 @@ start(_Type, _Args) ->
     {ok, Sup} = emqx_message_transformation_sup:start_link(),
     ok = emqx_variform:inject_allowed_module(emqx_message_transformation_bif),
     ok = emqx_message_transformation_config:add_handler(),
-    ok = emqx_message_transformation:register_hooks(),
     ok = emqx_message_transformation_config:load(),
+    ok = emqx_message_transformation:register_hooks(),
     {ok, Sup}.
 
 -spec stop(term()) -> ok.
 stop(_State) ->
-    ok = emqx_message_transformation_config:unload(),
     ok = emqx_message_transformation:unregister_hooks(),
+    ok = emqx_message_transformation_config:unload(),
     ok = emqx_message_transformation_config:remove_handler(),
     ok = emqx_variform:erase_allowed_module(emqx_message_transformation_bif),
     ok.

+ 4 - 0
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -914,6 +914,10 @@ t_start_node_with_plugin_enabled({init, Config}) ->
         emqx_conf,
         emqx_ctl,
         {emqx_plugins, #{
+            after_start => fun() ->
+                {ok, Pid} = emqx_machine_boot:start_link(),
+                unlink(Pid)
+            end,
             config =>
                 #{
                     plugins =>

+ 15 - 55
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -388,58 +388,6 @@ receive_published(Line) ->
         ct:fail("publish not received, line ~b", [Line])
     end.
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [
-                emqx_conf,
-                emqx_rule_engine,
-                emqx_schema_registry
-            ]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {load_apps, [emqx_machine]},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster:\n  ~p", [Cluster]),
-    Cluster.
-
-start_cluster(Cluster) ->
-    Nodes = [
-        emqx_common_test_helpers:start_peer(Name, Opts)
-     || {Name, Opts} <- Cluster
-    ],
-    NumNodes = length(Nodes),
-    on_exit(fun() ->
-        emqx_utils:pmap(
-            fun(N) ->
-                ct:pal("stopping ~p", [N]),
-                ok = emqx_common_test_helpers:stop_peer(N)
-            end,
-            Nodes
-        )
-    end),
-    {ok, _} = snabbkaffe:block_until(
-        %% -1 because only those that join the first node will emit the event.
-        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-        30_000
-    ),
-    Nodes.
-
 wait_for_cluster_rpc(Node) ->
     %% need to wait until the config handler is ready after
     %% restarting during the cluster join.
@@ -703,7 +651,15 @@ t_fail_rollback(Config) ->
 
 t_cluster_serde_build(Config) ->
     SerdeType = ?config(serde_type, Config),
-    Cluster = cluster(Config),
+    AppSpecs = [
+        emqx_conf,
+        emqx_rule_engine,
+        emqx_schema_registry
+    ],
+    ClusterSpec = [
+        {cluster_serde_build1, #{apps => AppSpecs}},
+        {cluster_serde_build2, #{apps => AppSpecs}}
+    ],
     SerdeName = my_serde,
     Schema = schema_params(SerdeType),
     #{
@@ -712,9 +668,13 @@ t_cluster_serde_build(Config) ->
     } = test_params_for(SerdeType, encode_decode1),
     ?check_trace(
         begin
-            Nodes = [N1, N2 | _] = start_cluster(Cluster),
+            Nodes =
+                [N1, N2 | _] = emqx_cth_cluster:start(
+                    ClusterSpec,
+                    #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+                ),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             NumNodes = length(Nodes),
-            wait_for_cluster_rpc(N2),
             ?assertMatch(
                 ok,
                 erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])

+ 1 - 0
changes/ce/fix-14113.en.md

@@ -0,0 +1 @@
+Fixed a potential race condition where a node that is still starting could crash if it attempted to join a cluster.