Просмотр исходного кода

fix(plugins): update plugin order on whole cluster

Fixes https://emqx.atlassian.net/browse/EMQX-10879
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
4dfbc859f9

+ 1 - 0
apps/emqx/test/emqx_cth_cluster.erl

@@ -20,6 +20,7 @@
 -export([stop/1]).
 
 -export([share_load_module/2]).
+-export([node_name/1]).
 
 -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
 

+ 9 - 3
apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl

@@ -28,7 +28,8 @@
     multipart_formdata_request/4,
     host/0,
     uri/0,
-    uri/1
+    uri/1,
+    uri/2
 ]).
 
 -define(HOST, "http://127.0.0.1:18083").
@@ -96,10 +97,15 @@ request(Username, Method, Url, Body) ->
 host() ->
     ?HOST.
 
-uri() -> uri([]).
+uri() ->
+    uri([]).
+
 uri(Parts) when is_list(Parts) ->
+    uri(host(), Parts).
+
+uri(Host, Parts) when is_list(Host), is_list(Parts) ->
     NParts = [E || E <- Parts],
-    host() ++ "/" ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
+    Host ++ "/" ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
 
 auth_header(Username) ->
     Password = <<"public">>,

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -420,7 +420,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
         {error, Reason} ->
             {400, #{code => 'BAD_POSITION', message => Reason}};
         Position ->
-            case emqx_plugins:ensure_enabled(Name, Position) of
+            case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of
                 ok ->
                     {200};
                 {error, Reason} ->

+ 182 - 8
apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl

@@ -19,11 +19,14 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 -define(EMQX_PLUGIN_TEMPLATE_NAME, "emqx_plugin_template").
 -define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0").
 -define(PACKAGE_SUFFIX, ".tar.gz").
 
+-define(CLUSTER_API_SERVER(PORT), ("http://127.0.0.1:" ++ (integer_to_list(PORT)))).
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -48,6 +51,25 @@ end_per_suite(Config) ->
     emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
     ok.
 
+init_per_testcase(t_cluster_update_order = TestCase, Config0) ->
+    Config = [{api_port, 18085} | Config0],
+    Cluster = [Node1 | _] = cluster(TestCase, Config),
+    {ok, API} = init_api(Node1),
+    [
+        {api, API},
+        {cluster, Cluster}
+        | Config
+    ];
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(t_cluster_update_order, Config) ->
+    Cluster = ?config(cluster, Config),
+    emqx_cth_cluster:stop(Cluster),
+    ok;
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 t_plugins(Config) ->
     DemoShDir = proplists:get_value(demo_sh_dir, Config),
     PackagePath = get_demo_plugin_package(DemoShDir),
@@ -141,9 +163,83 @@ t_delete_non_existing(_Config) ->
     ),
     ok.
 
-list_plugins() ->
-    Path = emqx_mgmt_api_test_util:api_path(["plugins"]),
-    case emqx_mgmt_api_test_util:request_api(get, Path) of
+t_cluster_update_order(Config) ->
+    DemoShDir = proplists:get_value(demo_sh_dir, Config),
+    PackagePath1 = get_demo_plugin_package(DemoShDir),
+    NameVsn1 = filename:basename(PackagePath1, ?PACKAGE_SUFFIX),
+    Name2Str = ?EMQX_PLUGIN_TEMPLATE_NAME ++ "_a",
+    NameVsn2 = Name2Str ++ "-" ++ ?EMQX_PLUGIN_TEMPLATE_VSN,
+    PackagePath2 = create_renamed_package(PackagePath1, NameVsn2),
+    Name1 = list_to_binary(?EMQX_PLUGIN_TEMPLATE_NAME),
+    Name2 = list_to_binary(Name2Str),
+
+    ok = install_plugin(Config, PackagePath1),
+    ok = install_plugin(Config, PackagePath2),
+    %% to get them configured...
+    {ok, _} = update_plugin(Config, NameVsn1, "start"),
+    {ok, _} = update_plugin(Config, NameVsn2, "start"),
+
+    ?assertMatch(
+        {ok, [
+            #{<<"name">> := Name1},
+            #{<<"name">> := Name2}
+        ]},
+        list_plugins(Config)
+    ),
+
+    ct:pal("moving to rear"),
+    ?assertMatch({ok, _}, update_boot_order(NameVsn1, #{position => rear}, Config)),
+    ?assertMatch(
+        {ok, [
+            #{<<"name">> := Name2},
+            #{<<"name">> := Name1}
+        ]},
+        list_plugins(Config)
+    ),
+
+    ct:pal("moving to front"),
+    ?assertMatch({ok, _}, update_boot_order(NameVsn1, #{position => front}, Config)),
+    ?assertMatch(
+        {ok, [
+            #{<<"name">> := Name1},
+            #{<<"name">> := Name2}
+        ]},
+        list_plugins(Config)
+    ),
+
+    ct:pal("moving after"),
+    NameVsn2Bin = list_to_binary(NameVsn2),
+    ?assertMatch(
+        {ok, _},
+        update_boot_order(NameVsn1, #{position => <<"after:", NameVsn2Bin/binary>>}, Config)
+    ),
+    ?assertMatch(
+        {ok, [
+            #{<<"name">> := Name2},
+            #{<<"name">> := Name1}
+        ]},
+        list_plugins(Config)
+    ),
+
+    ct:pal("moving before"),
+    ?assertMatch(
+        {ok, _},
+        update_boot_order(NameVsn1, #{position => <<"before:", NameVsn2Bin/binary>>}, Config)
+    ),
+    ?assertMatch(
+        {ok, [
+            #{<<"name">> := Name1},
+            #{<<"name">> := Name2}
+        ]},
+        list_plugins(Config)
+    ),
+
+    ok.
+
+list_plugins(Config) ->
+    #{host := Host, auth := Auth} = get_host_and_auth(Config),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["plugins"]),
+    case emqx_mgmt_api_test_util:request_api(get, Path, Auth) of
         {ok, Apps} -> {ok, emqx_utils_json:decode(Apps, [return_maps])};
         Error -> Error
     end.
@@ -172,16 +268,46 @@ install_plugin(FilePath) ->
         Error -> Error
     end.
 
+install_plugin(Config, FilePath) ->
+    #{host := Host, auth := Auth} = get_host_and_auth(Config),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["plugins", "install"]),
+    case
+        emqx_mgmt_api_test_util:upload_request(
+            Path,
+            FilePath,
+            "plugin",
+            <<"application/gzip">>,
+            [],
+            Auth
+        )
+    of
+        {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok;
+        Error -> Error
+    end.
+
 update_plugin(Name, Action) ->
     Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, Action]),
     emqx_mgmt_api_test_util:request_api(put, Path).
 
-update_boot_order(Name, MoveBody) ->
-    Auth = emqx_mgmt_api_test_util:auth_header_(),
-    Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, "move"]),
+update_plugin(Config, Name, Action) when is_list(Config) ->
+    #{host := Host, auth := Auth} = get_host_and_auth(Config),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["plugins", Name, Action]),
+    emqx_mgmt_api_test_util:request_api(put, Path, Auth).
+
+update_boot_order(Name, MoveBody, Config) ->
+    #{host := Host, auth := Auth} = get_host_and_auth(Config),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["plugins", Name, "move"]),
     case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
-        Error -> Error
+        {ok, Res} ->
+            Resp =
+                case emqx_utils_json:safe_decode(Res, [return_maps]) of
+                    {ok, Decoded} -> Decoded;
+                    {error, _} -> Res
+                end,
+            ct:pal("update_boot_order response:\n  ~p", [Resp]),
+            {ok, Resp};
+        Error ->
+            Error
     end.
 
 uninstall_plugin(Name) ->
@@ -218,3 +344,51 @@ update_release_json(["release.json"], FileContent, NewName) ->
     emqx_utils_json:encode(ContentMap#{<<"name">> => NewName});
 update_release_json(_FileName, FileContent, _NewName) ->
     FileContent.
+
+cluster(TestCase, Config) ->
+    APIPort = ?config(api_port, Config),
+    AppSpecs = app_specs(Config),
+    Node1Apps = AppSpecs ++ [app_spec_dashboard(APIPort)],
+    Node2Apps = AppSpecs,
+    Node1Name = emqx_mgmt_api_plugins_SUITE1,
+    Node1 = emqx_cth_cluster:node_name(Node1Name),
+    emqx_cth_cluster:start(
+        [
+            {Node1Name, #{role => core, apps => Node1Apps, join_to => Node1}},
+            {emqx_mgmt_api_plugins_SUITE2, #{role => core, apps => Node2Apps, join_to => Node1}}
+        ],
+        #{work_dir => filename:join(?config(priv_dir, Config), TestCase)}
+    ).
+
+app_specs(_Config) ->
+    [
+        emqx_conf,
+        emqx,
+        emqx_management,
+        emqx_plugins
+    ].
+
+app_spec_dashboard(APIPort) ->
+    {emqx_dashboard, #{
+        config =>
+            #{
+                dashboard =>
+                    #{
+                        listeners =>
+                            #{
+                                http =>
+                                    #{bind => APIPort}
+                            }
+                    }
+            }
+    }}.
+
+init_api(Node) ->
+    erpc:call(Node, emqx_common_test_http, create_default_app, []).
+
+get_host_and_auth(Config) when is_list(Config) ->
+    API = ?config(api, Config),
+    APIPort = ?config(api_port, Config),
+    Host = ?CLUSTER_API_SERVER(APIPort),
+    Auth = emqx_common_test_http:auth_header(API),
+    #{host => Host, auth => Auth}.

+ 16 - 3
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -61,6 +61,9 @@ request(Method, Url, Body) ->
 uri(Parts) ->
     emqx_dashboard_api_test_helpers:uri(Parts).
 
+uri(Host, Parts) ->
+    emqx_dashboard_api_test_helpers:uri(Host, Parts).
+
 %% compatible_mode will return as same as 'emqx_dashboard_api_test_helpers:request'
 request_api_with_body(Method, Url, Body) ->
     Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
@@ -144,9 +147,15 @@ build_http_header(X) when is_list(X) ->
 build_http_header(X) ->
     [X].
 
+default_server() ->
+    ?SERVER.
+
 api_path(Parts) ->
     join_http_path([?SERVER, ?BASE_PATH | Parts]).
 
+api_path(Host, Parts) ->
+    join_http_path([Host, ?BASE_PATH | Parts]).
+
 api_path_without_base_path(Parts) ->
     join_http_path([?SERVER | Parts]).
 
@@ -193,9 +202,13 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -
     ContentLength = integer_to_list(length(binary_to_list(RequestBody))),
     Headers = [
         {"Content-Length", ContentLength},
-        case AuthorizationToken =/= undefined of
-            true -> {"Authorization", "Bearer " ++ binary_to_list(AuthorizationToken)};
-            false -> {}
+        case AuthorizationToken of
+            _ when is_tuple(AuthorizationToken) ->
+                AuthorizationToken;
+            _ when is_binary(AuthorizationToken) ->
+                {"Authorization", "Bearer " ++ binary_to_list(AuthorizationToken)};
+            _ ->
+                {}
         end
     ],
     HTTPOptions = [],

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx]},

+ 31 - 12
apps/emqx_plugins/src/emqx_plugins.erl

@@ -29,6 +29,7 @@
     ensure_uninstalled/1,
     ensure_enabled/1,
     ensure_enabled/2,
+    ensure_enabled/3,
     ensure_disabled/1,
     purge/1,
     delete_package/1
@@ -240,28 +241,34 @@ ensure_enabled(NameVsn) ->
 %% @doc Ensure a plugin is enabled at the given position of the plugin list.
 -spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
 ensure_enabled(NameVsn, Position) ->
-    ensure_state(NameVsn, Position, true).
+    ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
+
+-spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
+ensure_enabled(NameVsn, Position, ConfLocation) when
+    ConfLocation =:= local; ConfLocation =:= global
+->
+    ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
 
 %% @doc Ensure a plugin is disabled.
 -spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
 ensure_disabled(NameVsn) ->
-    ensure_state(NameVsn, no_move, false).
+    ensure_state(NameVsn, no_move, false, _ConfLocation = local).
 
-ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
-    ensure_state(binary_to_list(NameVsn), Position, State);
-ensure_state(NameVsn, Position, State) ->
+ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
+    ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
+ensure_state(NameVsn, Position, State, ConfLocation) ->
     case read_plugin(NameVsn, #{}) of
         {ok, _} ->
             Item = #{
                 name_vsn => NameVsn,
                 enable => State
             },
-            tryit("ensure_state", fun() -> ensure_configured(Item, Position) end);
+            tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end);
         {error, Reason} ->
             {error, Reason}
     end.
 
-ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
+ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
     Configured = configured(),
     SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
     {Front, Rear} = lists:splitwith(SplitFun, Configured),
@@ -274,7 +281,7 @@ ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
             [] ->
                 add_new_configured(Configured, Position, Item)
         end,
-    ok = put_configured(NewConfigured).
+    ok = put_configured(NewConfigured, ConfLocation).
 
 add_new_configured(Configured, no_move, Item) ->
     %% default to rear
@@ -787,14 +794,23 @@ is_needed_by(AppToStop, RunningApp) ->
         undefined -> false
     end.
 
-put_config(Key, Value) when is_atom(Key) ->
-    put_config([Key], Value);
-put_config(Path, Values) when is_list(Path) ->
+put_config(Key, Value) ->
+    put_config(Key, Value, _ConfLocation = local).
+
+put_config(Key, Value, ConfLocation) when is_atom(Key) ->
+    put_config([Key], Value, ConfLocation);
+put_config(Path, Values, _ConfLocation = local) when is_list(Path) ->
     Opts = #{rawconf_with_defaults => true, override_to => cluster},
     %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
     case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
         {ok, _} -> ok;
         Error -> Error
+    end;
+put_config(Path, Values, _ConfLocation = global) when is_list(Path) ->
+    Opts = #{rawconf_with_defaults => true, override_to => cluster},
+    case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
+        {ok, _} -> ok;
+        Error -> Error
     end.
 
 bin_key(Map) when is_map(Map) ->
@@ -812,7 +828,10 @@ get_config(Path, Default) ->
 install_dir() -> get_config(install_dir, "").
 
 put_configured(Configured) ->
-    ok = put_config(states, bin_key(Configured)).
+    put_configured(Configured, _ConfLocation = local).
+
+put_configured(Configured, ConfLocation) ->
+    ok = put_config(states, bin_key(Configured), ConfLocation).
 
 configured() ->
     get_config(states, []).

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins_cli.erl

@@ -70,7 +70,7 @@ restart(NameVsn, LogFun) ->
     ?PRINT(emqx_plugins:restart(NameVsn), LogFun).
 
 ensure_enabled(NameVsn, Position, LogFun) ->
-    ?PRINT(emqx_plugins:ensure_enabled(NameVsn, Position), LogFun).
+    ?PRINT(emqx_plugins:ensure_enabled(NameVsn, Position, _ConfLocation = global), LogFun).
 
 ensure_disabled(NameVsn, LogFun) ->
     ?PRINT(emqx_plugins:ensure_disabled(NameVsn), LogFun).

+ 8 - 4
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -216,7 +216,7 @@ t_position(Config) ->
     PosApp2 = <<"position-2">>,
     ok = write_info_file(Config, PosApp2, FakeInfo),
     %% fake a disabled plugin in config
-    ok = emqx_plugins:ensure_state(PosApp2, {before, NameVsn}, false),
+    ok = ensure_state(PosApp2, {before, NameVsn}, false),
     ListFun = fun() ->
         lists:map(
             fun(
@@ -255,14 +255,14 @@ t_start_restart_and_stop(Config) ->
     Bar2 = <<"bar-2">>,
     ok = write_info_file(Config, Bar2, FakeInfo),
     %% fake a disabled plugin in config
-    ok = emqx_plugins:ensure_state(Bar2, front, false),
+    ok = ensure_state(Bar2, front, false),
 
     assert_app_running(emqx_plugin_template, false),
     ok = emqx_plugins:ensure_started(),
     assert_app_running(emqx_plugin_template, true),
 
     %% fake enable bar-2
-    ok = emqx_plugins:ensure_state(Bar2, rear, true),
+    ok = ensure_state(Bar2, rear, true),
     %% should cause an error
     ?assertError(
         #{function := _, errors := [_ | _]},
@@ -274,7 +274,7 @@ t_start_restart_and_stop(Config) ->
     %% stop all
     ok = emqx_plugins:ensure_stopped(),
     assert_app_running(emqx_plugin_template, false),
-    ok = emqx_plugins:ensure_state(Bar2, rear, false),
+    ok = ensure_state(Bar2, rear, false),
 
     ok = emqx_plugins:restart(NameVsn),
     assert_app_running(emqx_plugin_template, true),
@@ -826,3 +826,7 @@ make_tar(Cwd, NameWithVsn, TarfileVsn) ->
     after
         file:set_cwd(OriginalCwd)
     end.
+
+ensure_state(NameVsn, Position, Enabled) ->
+    %% NOTE: this is an internal function that is (legacy) exported in test builds only...
+    emqx_plugins:ensure_state(NameVsn, Position, Enabled, _ConfLocation = local).

+ 1 - 0
changes/ce/fix-11548.en.md

@@ -0,0 +1 @@
+Fixed an issue that prevented the plugin order to be updated on the whole cluster.