Forráskód Böngészése

Merge pull request #13393 from thalesmg/20240702-r57-test-plugin-start-enabled

fix(plugins): ensure plugin apps are restarted when restarting `emqx_plugins`
Thales Macedo Garitezi 1 éve
szülő
commit
35f1ddc0eb

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.3.2"},
+    {vsn, "0.3.3"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl, redbug]},

+ 1 - 0
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -66,6 +66,7 @@ stop_apps() ->
     ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
     _ = emqx_alarm_handler:unload(),
     ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
     lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
 
 %% Those port apps are terminated after the main apps

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx, erlavro]},

+ 7 - 5
apps/emqx_plugins/src/emqx_plugins.erl

@@ -299,8 +299,10 @@ ensure_stopped() ->
     Fun = fun
         (#{name_vsn := NameVsn, enable := true}) ->
             case ensure_stopped(NameVsn) of
-                ok -> [];
-                {error, Reason} -> [{NameVsn, Reason}]
+                ok ->
+                    [];
+                {error, Reason} ->
+                    [{NameVsn, Reason}]
             end;
         (#{name_vsn := NameVsn, enable := false}) ->
             ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
@@ -1077,15 +1079,15 @@ stop_app(App) ->
     case application:stop(App) of
         ok ->
             ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
-            ok = unload_moudle_and_app(App);
+            ok = unload_module_and_app(App);
         {error, {not_started, App}} ->
             ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
-            ok = unload_moudle_and_app(App);
+            ok = unload_module_and_app(App);
         {error, Reason} ->
             throw(#{msg => "failed_to_stop_app", app => App, reason => Reason})
     end.
 
-unload_moudle_and_app(App) ->
+unload_module_and_app(App) ->
     case application:get_key(App, modules) of
         {ok, Modules} ->
             lists:foreach(fun code:soft_purge/1, Modules);

+ 2 - 0
apps/emqx_plugins/src/emqx_plugins_app.erl

@@ -19,6 +19,7 @@
 -behaviour(application).
 
 -include("emqx_plugins.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
     start/2,
@@ -31,6 +32,7 @@ start(_Type, _Args) ->
     ok = emqx_plugins:ensure_installed(),
     ok = emqx_plugins:ensure_started(),
     ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins),
+    ?tp("emqx_plugins_app_started", #{}),
     {ok, Sup}.
 
 stop(_State) ->

+ 150 - 0
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -48,6 +48,8 @@
 -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2").
 -define(PACKAGE_SUFFIX, ".tar.gz").
 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). % Wraps BODY in a fun and evaluates it on NODE via erpc:call/2.
+
 all() ->
     [
         {group, copy_plugin},
@@ -140,6 +142,39 @@ bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
 bin(B) when is_binary(B) -> B.
 
+hookpoints() -> % Hookpoint names that get_hook_modules/0 looks up.
+    [
+        'client.connect',
+        'client.connack',
+        'client.connected',
+        'client.disconnected',
+        'client.authenticate',
+        'client.authorize',
+        'client.subscribe',
+        'client.unsubscribe',
+        'session.created',
+        'session.subscribed',
+        'session.unsubscribed',
+        'session.resumed',
+        'session.discarded',
+        'session.takenover',
+        'session.terminated',
+        'message.publish',
+        'message.puback',
+        'message.delivered',
+        'message.acked',
+        'message.dropped'
+    ].
+
+get_hook_modules() -> % Returns the callback modules found on every hookpoint in hookpoints/0, as one flat list (duplicates kept).
+    lists:flatmap(
+        fun(HookPoint) ->
+            CBs = emqx_hooks:lookup(HookPoint),
+            [Mod || {callback, {Mod, _Fn, _Args}, _Filter, _Prio} <- CBs] % entries not matching this tuple shape are skipped
+        end,
+        hookpoints()
+    ).
+
 t_demo_install_start_stop_uninstall({init, Config}) ->
     Opts = #{package := Package} = get_demo_plugin_package(),
     NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
@@ -256,9 +291,18 @@ t_start_restart_and_stop({init, Config}) ->
 t_start_restart_and_stop({'end', _Config}) ->
     ok;
 t_start_restart_and_stop(Config) ->
+    %% pre-condition
+    Hooks0 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks0), #{hooks => Hooks0}),
+
     NameVsn = proplists:get_value(name_vsn, Config),
     ok = emqx_plugins:ensure_installed(NameVsn),
     ok = emqx_plugins:ensure_enabled(NameVsn),
+
+    %% Application is not yet started.
+    Hooks1 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks1), #{hooks => Hooks1}),
+
     FakeInfo =
         "name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"],"
         "description=\"desc bar\"",
@@ -271,6 +315,10 @@ t_start_restart_and_stop(Config) ->
     ok = emqx_plugins:ensure_started(),
     assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
 
+    %% Should have called the application start callback, which in turn adds hooks.
+    Hooks2 = get_hook_modules(),
+    ?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks2), #{hooks => Hooks2}),
+
     %% fake enable bar-2
     ok = ensure_state(Bar2, rear, true),
     %% should cause an error
@@ -292,6 +340,10 @@ t_start_restart_and_stop(Config) ->
     assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
     ok = ensure_state(Bar2, rear, false),
 
+    %% Should have called the application stop callback, which removes the hooks.
+    Hooks3 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks3), #{hooks => Hooks3}),
+
     ok = emqx_plugins:restart(NameVsn),
     assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
     %% repeat
@@ -371,6 +423,15 @@ assert_app_running(Name, false) ->
     AllApps = application:which_applications(),
     ?assertEqual(false, lists:keyfind(Name, 1, AllApps)).
 
+assert_started_and_hooks_loaded() -> % Asserts one listed plugin, a running plugin app, and the plugin name among the hook modules.
+    PluginConfig = emqx_plugins:list(),
+    ct:pal("plugin config:\n  ~p", [PluginConfig]),
+    ?assertMatch([_], PluginConfig), % exactly one entry is expected
+    assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
+    Hooks = get_hook_modules(),
+    ?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks), #{hooks => Hooks}), % NOTE(review): presumably the hook callback module is named like the app - confirm
+    ok.
+
 t_bad_tar_gz({init, Config}) ->
     Config;
 t_bad_tar_gz({'end', _Config}) ->
@@ -841,6 +902,95 @@ group_t_cluster_leave(Config) ->
     ),
     ok.
 
+%% Checks that starting a node with a plugin enabled starts it correctly, and that the
+%% hooks added by the plugin's `application:start/2' callback are indeed in place.
+%% See also: https://github.com/emqx/emqx/issues/13378
+t_start_node_with_plugin_enabled({init, Config}) -> % init clause: builds two node specs and stores them, the node names and NameVsn in Config
+    #{package := Package, shdir := InstallDir} = get_demo_plugin_package(),
+    NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
+    AppSpecs = [
+        emqx,
+        emqx_conf,
+        emqx_ctl,
+        {emqx_plugins, #{
+            config =>
+                #{
+                    plugins =>
+                        #{
+                            install_dir => InstallDir,
+                            states =>
+                                [
+                                    #{
+                                        enable => true, % the plugin is configured as enabled before the node starts
+                                        name_vsn => NameVsn
+                                    }
+                                ]
+                        }
+                }
+        }}
+    ],
+    Name1 = t_cluster_start_enabled1,
+    Name2 = t_cluster_start_enabled2,
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {Name1, #{role => core, apps => AppSpecs, join_to => undefined}}, % join_to is undefined here; the join is done explicitly in the test body
+            {Name2, #{role => core, apps => AppSpecs, join_to => undefined}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    Names = [Name1, Name2],
+    Nodes = [emqx_cth_cluster:node_name(N) || N <- Names],
+    [
+        {node_specs, Specs},
+        {nodes, Nodes},
+        {name_vsn, NameVsn}
+        | Config
+    ];
+t_start_node_with_plugin_enabled({'end', Config}) -> % end clause: stops the nodes recorded by the init clause
+    Nodes = ?config(nodes, Config),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
+t_start_node_with_plugin_enabled(Config) when is_list(Config) -> % test body: checks both nodes before and after N2 joins N1
+    NodeSpecs = ?config(node_specs, Config),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            [N1, N2 | _] = emqx_cth_cluster:start(NodeSpecs),
+            ?ON(N1, assert_started_and_hooks_loaded()), % state before the join
+            ?ON(N2, assert_started_and_hooks_loaded()),
+            %% Now make them join.
+            %% N.B.: We need to start autocluster so that applications are restarted in
+            %% order, and also we need to override the config loader to emulate what
+            %% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
+            %% keys.
+            ok = ?ON(N2, emqx_machine_boot:start_autocluster()),
+            ?ON(N2, begin
+                StartCallback0 =
+                    case ekka:env({callback, start}) of
+                        {ok, SC0} -> SC0;
+                        _ -> fun() -> ok end % no start callback found: fall back to a no-op
+                    end,
+                StartCallback = fun() -> % sets the config loader first, then runs the previous start callback
+                    ok = emqx_app:set_config_loader(emqx_cth_suite),
+                    StartCallback0()
+                end,
+                ekka:callback(start, StartCallback)
+            end),
+            {ok, {ok, _}} = % NOTE(review): presumably {JoinResult, {ok, TraceEvent}} - confirm against snabbkaffe
+                ?wait_async_action(
+                    ?ON(N2, ekka:join(N1)),
+                    #{?snk_kind := "emqx_plugins_app_started"}
+                ),
+            ct:pal("checking N1 state"),
+            ?ON(N1, assert_started_and_hooks_loaded()), % state after the join
+            ct:pal("checking N2 state"),
+            ?ON(N2, assert_started_and_hooks_loaded()),
+            ok
+        end,
+        [] % no additional trace checks are supplied
+    ),
+    ok.
+
 make_tar(Cwd, NameWithVsn) ->
     make_tar(Cwd, NameWithVsn, NameWithVsn).
 

+ 1 - 0
changes/ce/fix-13393.en.md

@@ -0,0 +1 @@
+Fixed an issue where plugin applications were not restarted after a node joined a cluster, leading to an inconsistent state where hooks were not properly installed.