ソースを参照

Merge pull request #10117 from SergeTupchiy/EMQX-8889_copy_plugins_on_joining_a_cluster

fix: copy plugins to a new node joining a cluster
SergeTupchiy 2 年 前
コミット
9f9d16dd48

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -25,6 +25,7 @@
 {emqx_mgmt_trace,2}.
 {emqx_persistent_session,1}.
 {emqx_plugin_libs,1}.
+{emqx_plugins,1}.
 {emqx_prometheus,1}.
 {emqx_resource,1}.
 {emqx_retainer,1}.

+ 1 - 3
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -323,8 +323,6 @@ get_plugins() ->
 upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) ->
     [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
     %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
-    %% TODO what happens when a new node join in?
-    %% emqx_plugins_monitor should copy plugins from other core node when boot-up.
     NameVsn = string:trim(FileName, trailing, ".tar.gz"),
     case emqx_plugins:describe(NameVsn) of
         {error, #{error := "bad_info_file", return := {enoent, _}}} ->
@@ -456,8 +454,8 @@ delete_package(Name) ->
 
 %% for RPC plugin update
 ensure_action(Name, start) ->
-    _ = emqx_plugins:ensure_enabled(Name),
     _ = emqx_plugins:ensure_started(Name),
+    _ = emqx_plugins:ensure_enabled(Name),
     ok;
 ensure_action(Name, stop) ->
     _ = emqx_plugins:ensure_stopped(Name),

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx]},

+ 60 - 1
apps/emqx_plugins/src/emqx_plugins.erl

@@ -47,7 +47,8 @@
 
 -export([
     get_config/2,
-    put_config/2
+    put_config/2,
+    get_tar/1
 ]).
 
 %% internal
@@ -113,6 +114,33 @@ do_ensure_installed(NameVsn) ->
             }}
     end.
 
+-spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
+get_tar(NameVsn) ->
+    TarGz = pkg_file(NameVsn),
+    case file:read_file(TarGz) of
+        {ok, Content} ->
+            {ok, Content};
+        {error, _} ->
+            case maybe_create_tar(NameVsn, TarGz, install_dir()) of
+                ok ->
+                    file:read_file(TarGz);
+                Err ->
+                    Err
+            end
+    end.
+
+maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
+    maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
+maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
+    case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
+        [_ | _] = PluginFiles ->
+            InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
+            PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
+            erl_tar:create(TarGzName, PluginFiles1, [compressed]);
+        _ ->
+            {error, plugin_not_found}
+    end.
+
 write_tar_file_content(BaseDir, TarContent) ->
     lists:foreach(
         fun({Name, Bin}) ->
@@ -393,6 +421,7 @@ do_ensure_started(NameVsn) ->
     tryit(
         "start_plugins",
         fun() ->
+            ok = ensure_exists_and_installed(NameVsn),
             Plugin = do_read_plugin(NameVsn),
             ok = load_code_start_apps(NameVsn, Plugin)
         end
@@ -446,6 +475,36 @@ do_read_plugin({file, InfoFile}, Options) ->
 do_read_plugin(NameVsn, Options) ->
     do_read_plugin({file, info_file(NameVsn)}, Options).
 
+ensure_exists_and_installed(NameVsn) ->
+    case filelib:is_dir(dir(NameVsn)) of
+        true ->
+            ok;
+        _ ->
+            Nodes = [N || N <- mria:running_nodes(), N /= node()],
+            case get_from_any_node(Nodes, NameVsn, []) of
+                {ok, TarContent} ->
+                    ok = file:write_file(pkg_file(NameVsn), TarContent),
+                    ok = do_ensure_installed(NameVsn);
+                {error, NodeErrors} ->
+                    ?SLOG(error, #{
+                        msg => "failed_to_copy_plugin_from_other_nodes",
+                        name_vsn => NameVsn,
+                        node_errors => NodeErrors
+                    }),
+                    {error, plugin_not_found}
+            end
+    end.
+
+get_from_any_node([], _NameVsn, Errors) ->
+    {error, Errors};
+get_from_any_node([Node | T], NameVsn, Errors) ->
+    case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
+        {ok, _} = Res ->
+            Res;
+        Err ->
+            get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
+    end.
+
 plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
     case file:read_file(readme_file(NameVsn)) of
         {ok, Bin} -> Info#{readme => Bin};

+ 35 - 0
apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugins_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_tar/3
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-type name_vsn() :: binary() | string().
+
+introduced_in() ->
+    "5.0.21".
+
+-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}.
+get_tar(Node, NameVsn, Timeout) ->
+    rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).

+ 133 - 11
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -36,10 +36,30 @@
 -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2").
 -define(PACKAGE_SUFFIX, ".tar.gz").
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        {group, copy_plugin},
+        {group, create_tar_copy_plugin},
+        emqx_common_test_helpers:all(?MODULE)
+    ].
+
+groups() ->
+    [
+        {copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]},
+        {create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}
+    ].
+
+init_per_group(copy_plugin, Config) ->
+    Config;
+init_per_group(create_tar_copy_plugin, Config) ->
+    [{remove_tar, true} | Config].
+
+end_per_group(_Group, _Config) ->
+    ok.
 
 init_per_suite(Config) ->
     WorkDir = proplists:get_value(data_dir, Config),
+    filelib:ensure_path(WorkDir),
     OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
     emqx_common_test_helpers:start_apps([emqx_conf]),
     emqx_plugins:put_config(install_dir, WorkDir),
@@ -71,15 +91,7 @@ end_per_testcase(TestCase, Config) ->
     ?MODULE:TestCase({'end', Config}).
 
 get_demo_plugin_package() ->
-    get_demo_plugin_package(
-        #{
-            release_name => ?EMQX_PLUGIN_TEMPLATE_RELEASE_NAME,
-            git_url => ?EMQX_PLUGIN_TEMPLATE_URL,
-            vsn => ?EMQX_PLUGIN_TEMPLATE_VSN,
-            tag => ?EMQX_PLUGIN_TEMPLATE_TAG,
-            shdir => emqx_plugins:install_dir()
-        }
-    ).
+    get_demo_plugin_package(emqx_plugins:install_dir()).
 
 get_demo_plugin_package(
     #{
@@ -98,7 +110,17 @@ get_demo_plugin_package(
         TargetName
     ]),
     ok = file:write_file(Pkg, PluginBin),
-    Opts#{package => Pkg}.
+    Opts#{package => Pkg};
+get_demo_plugin_package(Dir) ->
+    get_demo_plugin_package(
+        #{
+            release_name => ?EMQX_PLUGIN_TEMPLATE_RELEASE_NAME,
+            git_url => ?EMQX_PLUGIN_TEMPLATE_URL,
+            vsn => ?EMQX_PLUGIN_TEMPLATE_VSN,
+            tag => ?EMQX_PLUGIN_TEMPLATE_TAG,
+            shdir => Dir
+        }
+    ).
 
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
@@ -479,6 +501,106 @@ t_elixir_plugin(Config) ->
     ?assertEqual([], emqx_plugins:list()),
     ok.
 
+group_t_copy_plugin_to_a_new_node({init, Config}) ->
+    WorkDir = proplists:get_value(data_dir, Config),
+    FromInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_from)),
+    file:del_dir_r(FromInstallDir),
+    ok = filelib:ensure_path(FromInstallDir),
+    ToInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_to)),
+    file:del_dir_r(ToInstallDir),
+    ok = filelib:ensure_path(ToInstallDir),
+    #{package := Package, release_name := PluginName} = get_demo_plugin_package(FromInstallDir),
+    [{CopyFrom, CopyFromOpts}, {CopyTo, CopyToOpts}] =
+        emqx_common_test_helpers:emqx_cluster(
+            [
+                {core, plugins_copy_from},
+                {core, plugins_copy_to}
+            ],
+            #{
+                apps => [emqx_conf, emqx_plugins],
+                env => [
+                    {emqx, init_config_load_done, false},
+                    {emqx, boot_modules, []}
+                ],
+                load_schema => false
+            }
+        ),
+    CopyFromNode = emqx_common_test_helpers:start_slave(
+        CopyFrom, maps:remove(join_to, CopyFromOpts)
+    ),
+    ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]),
+    CopyToNode = emqx_common_test_helpers:start_slave(CopyTo, maps:remove(join_to, CopyToOpts)),
+    ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]),
+    NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
+    ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]),
+    ok = rpc:call(CopyFromNode, emqx_plugins, ensure_started, [NameVsn]),
+    ok = rpc:call(CopyFromNode, emqx_plugins, ensure_enabled, [NameVsn]),
+    case proplists:get_bool(remove_tar, Config) of
+        true ->
+            %% Test the case when a plugin is installed, but its original tar file is removed
+            %% and must be re-created
+            ok = file:delete(filename:join(FromInstallDir, NameVsn ++ ?PACKAGE_SUFFIX));
+        false ->
+            ok
+    end,
+    [
+        {from_install_dir, FromInstallDir},
+        {to_install_dir, ToInstallDir},
+        {copy_from_node, CopyFromNode},
+        {copy_to_node, CopyToNode},
+        {name_vsn, NameVsn},
+        {plugin_name, PluginName}
+        | Config
+    ];
+group_t_copy_plugin_to_a_new_node({'end', Config}) ->
+    CopyFromNode = proplists:get_value(copy_from_node, Config),
+    CopyToNode = proplists:get_value(copy_to_node, Config),
+    ok = rpc:call(CopyFromNode, emqx_config, delete_override_conf_files, []),
+    ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
+    rpc:call(CopyToNode, ekka, leave, []),
+    rpc:call(CopyFromNode, ekka, leave, []),
+    {ok, _} = emqx_common_test_helpers:stop_slave(CopyToNode),
+    {ok, _} = emqx_common_test_helpers:stop_slave(CopyFromNode),
+    ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
+    ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
+group_t_copy_plugin_to_a_new_node(Config) ->
+    CopyFromNode = proplists:get_value(copy_from_node, Config),
+    CopyToNode = proplists:get_value(copy_to_node, Config),
+    CopyToDir = proplists:get_value(to_install_dir, Config),
+    CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config, [[states], []]),
+    NameVsn = proplists:get_value(name_vsn, Config),
+    PluginName = proplists:get_value(plugin_name, Config),
+    PluginApp = list_to_atom(PluginName),
+    ?assertMatch([#{enable := true, name_vsn := NameVsn}], CopyFromPluginsState),
+    ?assert(
+        proplists:is_defined(
+            PluginApp,
+            rpc:call(CopyFromNode, application, which_applications, [])
+        )
+    ),
+    ?assertEqual([], filelib:wildcard(filename:join(CopyToDir, "**"))),
+    %% Check that a new node doesn't have this plugin before it joins the cluster
+    ?assertEqual([], rpc:call(CopyToNode, emqx_conf, get, [[plugins, states], []])),
+    ?assertMatch({error, _}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])),
+    ?assertNot(
+        proplists:is_defined(
+            PluginApp,
+            rpc:call(CopyToNode, application, which_applications, [])
+        )
+    ),
+    ok = rpc:call(CopyToNode, ekka, join, [CopyFromNode]),
+    %% Mimic cluster-override conf copying
+    ok = rpc:call(CopyToNode, emqx_plugins, put_config, [[states], CopyFromPluginsState]),
+    %% Plugin copying is triggered upon app restart on a new node.
+    %% This is similar to emqx_conf, which copies cluster-override conf upon start,
+    %% see: emqx_conf_app:init_conf/0
+    ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]),
+    {ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]),
+    ?assertMatch(
+        {ok, #{running_status := running, config_status := enabled}},
+        rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])
+    ).
+
 make_tar(Cwd, NameWithVsn) ->
     make_tar(Cwd, NameWithVsn, NameWithVsn).
 

+ 2 - 0
changes/ce/fix-10117.en.md

@@ -0,0 +1,2 @@
+Fix an error occurring when a joining node doesn't have plugins that are installed on other nodes in the cluster.
+After this change, the joining node will copy all the necessary plugins from other nodes.