瀏覽代碼

fix(emqx_machine): Fix start/stop callbacks (#5969)

* fix(emqx_machine): Fix start/stop callbacks

* chore(ekka): Bump version to 0.11.1

* fix(router): Wait for the tables

* fix(emqx_cluster_rpc): Stop cluster RPC when joining a cluster

* fix(emqx_app): Fix a deadlock when joining the cluster

* fix(emqx_telemetry): Wait for mnesia tables

* test(ct_helper): Start ekka before emqx
k32 4 年之前
父節點
當前提交
9fdd5e6a7e

+ 1 - 1
apps/emqx/rebar.config

@@ -15,7 +15,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.3"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.3"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

+ 2 - 3
apps/emqx/src/emqx_app.erl

@@ -41,7 +41,7 @@
 start(_Type, _Args) ->
 start(_Type, _Args) ->
     ok = maybe_load_config(),
     ok = maybe_load_config(),
     ok = maybe_start_quicer(),
     ok = maybe_start_quicer(),
-    ensure_ekka_started(),
+    wait_boot_shards(),
     {ok, Sup} = emqx_sup:start_link(),
     {ok, Sup} = emqx_sup:start_link(),
     ok = maybe_start_listeners(),
     ok = maybe_start_listeners(),
     ok = emqx_alarm_handler:load(),
     ok = emqx_alarm_handler:load(),
@@ -55,8 +55,7 @@ prep_stop(_State) ->
 
 
 stop(_State) -> ok.
 stop(_State) -> ok.
 
 
-ensure_ekka_started() ->
-    ekka:start(),
+wait_boot_shards() ->
     ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
     ok = mria_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
 
 
 %% @doc Call this function to make emqx boot without loading config,
 %% @doc Call this function to make emqx boot without loading config,

+ 1 - 0
apps/emqx/src/emqx_router_helper.erl

@@ -92,6 +92,7 @@ monitor(Node) when is_atom(Node) ->
 
 
 init([]) ->
 init([]) ->
     ok = ekka:monitor(membership),
     ok = ekka:monitor(membership),
+    _ = mria:wait_for_tables([?ROUTING_NODE]),
     {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
     {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
     Nodes = lists:foldl(
     Nodes = lists:foldl(
               fun(Node, Acc) ->
               fun(Node, Acc) ->

+ 2 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -133,6 +133,7 @@ start_apps(Apps, Handler) when is_function(Handler) ->
     %% Load all application code to beam vm first
     %% Load all application code to beam vm first
     %% Because, minirest, ekka etc.. application will scan these modules
     %% Because, minirest, ekka etc.. application will scan these modules
     lists:foreach(fun load/1, [emqx | Apps]),
     lists:foreach(fun load/1, [emqx | Apps]),
+    ekka:start(),
     lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
     lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
 
 
 load(App) ->
 load(App) ->
@@ -195,7 +196,7 @@ generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
 
 
 -spec(stop_apps(list()) -> ok).
 -spec(stop_apps(list()) -> ok).
 stop_apps(Apps) ->
 stop_apps(Apps) ->
-    [application:stop(App) || App <- Apps ++ [emqx, mria, mnesia]],
+    [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
     ok.
     ok.
 
 
 %% backward compatible
 %% backward compatible

+ 1 - 0
apps/emqx_machine/src/emqx_cluster_rpc.erl

@@ -135,6 +135,7 @@ skip_failed_commit(Node) ->
 
 
 %% @private
 %% @private
 init([Node, RetryMs]) ->
 init([Node, RetryMs]) ->
+    _ = mria:wait_for_tables([?CLUSTER_MFA]),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.
     {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.
 
 

+ 11 - 7
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -18,7 +18,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 
 -export([post_boot/0]).
 -export([post_boot/0]).
--export([stop_apps/1, ensure_apps_started/0]).
+-export([stop_apps/0, ensure_apps_started/0]).
 -export([sorted_reboot_apps/0]).
 -export([sorted_reboot_apps/0]).
 -export([start_autocluster/0]).
 -export([start_autocluster/0]).
 
 
@@ -42,15 +42,16 @@ print_vsn() ->
 
 
 
 
 start_autocluster() ->
 start_autocluster() ->
-    ekka:callback(prepare, fun ?MODULE:stop_apps/1),
-    ekka:callback(reboot,  fun ?MODULE:ensure_apps_started/0),
+    ekka:callback(stop,  fun emqx_machine_boot:stop_apps/0),
+    ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
     _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
     _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
     ok.
     ok.
 
 
-stop_apps(Reason) ->
-    ?SLOG(info, #{msg => "stopping_apps", reason => Reason}),
+stop_apps() ->
+    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
     _ = emqx_alarm_handler:unload(),
     _ = emqx_alarm_handler:unload(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
+    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
+    emqx_machine_sup:stop_cluster_rpc().
 
 
 stop_one_app(App) ->
 stop_one_app(App) ->
     ?SLOG(debug, #{msg => "stopping_app", app => App}),
     ?SLOG(debug, #{msg => "stopping_app", app => App}),
@@ -64,8 +65,11 @@ stop_one_app(App) ->
                 reason => E})
                 reason => E})
     end.
     end.
 
 
-
 ensure_apps_started() ->
 ensure_apps_started() ->
+    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
+    %% FIXME: Hack spawning the cluster RPC asynchronously to avoid a
+    %% deadlock somewhere in EMQ X startup
+    spawn_link(fun() -> emqx_machine_sup:start_cluster_rpc() end),
     lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
     lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
 
 
 start_one_app(App) ->
 start_one_app(App) ->

+ 32 - 0
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -21,6 +21,8 @@
 -behaviour(supervisor).
 -behaviour(supervisor).
 
 
 -export([ start_link/0
 -export([ start_link/0
+        , stop_cluster_rpc/0
+        , start_cluster_rpc/0
         ]).
         ]).
 
 
 -export([init/1]).
 -export([init/1]).
@@ -28,6 +30,26 @@
 start_link() ->
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
+stop_cluster_rpc() ->
+    case whereis(?MODULE) of
+        undefined ->
+            ok;
+        _ ->
+            _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc_handler),
+            _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc),
+            ok
+    end.
+
+start_cluster_rpc() ->
+    case whereis(?MODULE) of
+        undefined ->
+            ok;
+        _ ->
+            ensure_running(emqx_cluster_rpc),
+            ensure_running(emqx_cluster_rpc_handler),
+            ok
+    end.
+
 init([]) ->
 init([]) ->
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
     GlobalGC = child_worker(emqx_global_gc, [], permanent),
     Terminator = child_worker(emqx_machine_terminator, [], transient),
     Terminator = child_worker(emqx_machine_terminator, [], transient),
@@ -52,3 +74,13 @@ child_worker(M, Func, Args, Restart) ->
       type     => worker,
       type     => worker,
       modules  => [M]
       modules  => [M]
      }.
      }.
+
+ensure_running(Id) ->
+    %% Assuming Id == locally registered name
+    case whereis(Id) of
+        undefined ->
+            _ = supervisor:restart_child(?MODULE, Id),
+            ok;
+        _ ->
+            ok
+    end.

+ 1 - 1
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -80,7 +80,7 @@ handle_cast(_Cast, State) ->
 
 
 handle_call(?DO_IT, _From, State) ->
 handle_call(?DO_IT, _From, State) ->
     try
     try
-        emqx_machine_boot:stop_apps(normal)
+        emqx_machine_boot:stop_apps()
     catch
     catch
         C : E : St ->
         C : E : St ->
             Apps = [element(1, A) || A <- application:which_applications()],
             Apps = [element(1, A) || A <- application:which_applications()],

+ 2 - 2
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -33,9 +33,9 @@ end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([]).
     emqx_common_test_helpers:stop_apps([]).
 
 
 t_shutdown_reboot(_Config) ->
 t_shutdown_reboot(_Config) ->
-    emqx_machine_boot:stop_apps(normal),
+    emqx_machine_boot:stop_apps(),
     false = emqx:is_running(node()),
     false = emqx:is_running(node()),
     emqx_machine_boot:ensure_apps_started(),
     emqx_machine_boot:ensure_apps_started(),
     true = emqx:is_running(node()),
     true = emqx:is_running(node()),
-    ok = emqx_machine_boot:stop_apps(for_test),
+    ok = emqx_machine_boot:stop_apps(),
     false = emqx:is_running(node()).
     false = emqx:is_running(node()).

+ 1 - 0
apps/emqx_modules/src/emqx_telemetry.erl

@@ -102,6 +102,7 @@ mnesia(boot) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 start_link() ->
 start_link() ->
+    _ = mria:wait_for_tables([?TELEMETRY]),
     Opts = emqx:get_config([telemetry], #{}),
     Opts = emqx:get_config([telemetry], #{}),
     gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
     gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
 
 

+ 1 - 1
rebar.config

@@ -50,7 +50,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.0"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}