Przeglądaj źródła

Merge pull request #13063 from JimMoen/improve-plugin-config-behavior

fix(plugin): serde not found on new-joined nodes
JianBo He 1 rok temu
rodzic
commit
4053356597

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -64,6 +64,7 @@
 {emqx_node_rebalance_status,2}.
 {emqx_persistent_session_ds,1}.
 {emqx_plugins,1}.
+{emqx_plugins,2}.
 {emqx_prometheus,1}.
 {emqx_prometheus,2}.
 {emqx_resource,1}.

+ 21 - 10
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -21,6 +21,8 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_plugins/include/emqx_plugins.hrl").
 
+-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}).
+
 -export([
     api_spec/0,
     fields/1,
@@ -178,6 +180,9 @@ schema("/plugins/:name/config") ->
             responses => #{
                 %% avro data, json encoded
                 200 => hoconsc:mk(binary()),
+                400 => emqx_dashboard_swagger:error_codes(
+                    ['BAD_CONFIG'], <<"Plugin Config Not Found">>
+                ),
                 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
             }
         },
@@ -488,13 +493,13 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
 plugin_config(get, #{bindings := #{name := NameVsn}}) ->
     case emqx_plugins:describe(NameVsn) of
         {ok, _} ->
-            case emqx_plugins:get_config(NameVsn) of
-                {ok, AvroJson} ->
+            case emqx_plugins:get_config(NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found) of
+                {ok, AvroJson} when is_map(AvroJson) ->
                     {200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson};
-                {error, _} ->
+                {ok, ?plugin_conf_not_found} ->
                     {400, #{
                         code => 'BAD_CONFIG',
-                        message => <<"Failed to get plugin config">>
+                        message => <<"Plugin Config Not Found">>
                     }}
             end;
         _ ->
@@ -503,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) ->
 plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
     case emqx_plugins:describe(NameVsn) of
         {ok, _} ->
-            case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of
+            case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
                 {ok, AvroValueConfig} ->
                     Nodes = emqx:running_nodes(),
                     %% cluster call with config in map (binary key-value)
@@ -534,7 +539,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
         {error, Reason} ->
             {400, #{code => 'BAD_POSITION', message => Reason}};
         Position ->
-            case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of
+            case emqx_plugins:ensure_enabled(Name, Position, global) of
                 ok ->
                     {204};
                 {error, Reason} ->
@@ -599,9 +604,9 @@ ensure_action(Name, restart) ->
     ok.
 
 %% for RPC plugin avro encoded config update
-do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) ->
+do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) ->
     %% TODO: maybe use `PluginConfigMap` to validate config
-    emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap).
+    emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap).
 
 %%--------------------------------------------------------------------
 %% Helper functions
@@ -694,10 +699,11 @@ aggregate_status([{Node, Plugins} | List], Acc) ->
         ),
     aggregate_status(List, NewAcc).
 
+-if(?EMQX_RELEASE_EDITION == ee).
 format_plugin_avsc_and_i18n(NameVsn) ->
     #{
-        avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end),
-        i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end)
+        avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end),
+        i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end)
     }.
 
 try_read_file(Fun) ->
@@ -706,6 +712,11 @@ try_read_file(Fun) ->
         _ -> null
     end.
 
+-else.
+format_plugin_avsc_and_i18n(_NameVsn) ->
+    #{avsc => null, i18n => null}.
+-endif.
+
 % running_status: running loaded, stopped
 %% config_status: not_configured disable enable
 plugin_status(#{running_status := running}) -> running;

+ 13 - 1
apps/emqx_plugins/include/emqx_plugins.hrl

@@ -21,15 +21,27 @@
 
 -define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab).
 
--define(CONFIG_FORMAT_AVRO, config_format_avro).
+-define(CONFIG_FORMAT_BIN, config_format_bin).
 -define(CONFIG_FORMAT_MAP, config_format_map).
 
+-define(plugin_conf_not_found, plugin_conf_not_found).
+
 -type schema_name() :: binary().
 -type avsc_path() :: string().
 
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
 
+%% "my_plugin-0.1.0"
+-type name_vsn() :: binary() | string().
+%% the parse result of the JSON info file
+-type plugin_info() :: map().
+-type schema_json_map() :: map().
+-type i18n_json_map() :: map().
+-type raw_plugin_config_content() :: binary().
+-type plugin_config_map() :: map().
+-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
+
 -record(plugin_schema_serde, {
     name :: schema_name(),
     eval_context :: term(),

+ 0 - 8
apps/emqx_plugins/src/emqx_plugins.appup.src

@@ -1,8 +0,0 @@
-%% -*- mode: erlang -*-
-{"0.1.0",
- [ {<<".*">>, []}
- ],
- [
-   {<<".*">>, []}
- ]
-}.

+ 298 - 116
apps/emqx_plugins/src/emqx_plugins.erl

@@ -16,8 +16,11 @@
 
 -module(emqx_plugins).
 
--include_lib("emqx/include/logger.hrl").
+-feature(maybe_expr, enable).
+
 -include("emqx_plugins.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -25,15 +28,16 @@
 
 -export([
     describe/1,
-    plugin_avsc/1,
-    plugin_i18n/1,
-    plugin_avro/1,
+    plugin_schema_json/1,
+    plugin_i18n_json/1,
+    raw_plugin_config_content/1,
     parse_name_vsn/1,
     make_name_vsn_string/2
 ]).
 
 %% Package operations
 -export([
+    ensure_installed/0,
     ensure_installed/1,
     ensure_uninstalled/1,
     ensure_enabled/1,
@@ -65,9 +69,10 @@
 
 %% Package utils
 -export([
-    decode_plugin_avro_config/2,
+    decode_plugin_config_map/2,
     install_dir/0,
-    avsc_file_path/1
+    avsc_file_path/1,
+    with_plugin_avsc/1
 ]).
 
 %% `emqx_config_handler' API
@@ -79,7 +84,10 @@
 -export([get_tar/1]).
 
 %% Internal export
--export([do_ensure_started/1]).
+-export([
+    ensure_config_map/1,
+    do_ensure_started/1
+]).
 %% for test cases
 -export([put_config_internal/2]).
 
@@ -96,36 +104,26 @@
 
 -define(MAX_KEEP_BACKUP_CONFIGS, 10).
 
-%% "my_plugin-0.1.0"
--type name_vsn() :: binary() | string().
-%% the parse result of the JSON info file
--type plugin() :: map().
--type schema_json() :: map().
--type i18n_json() :: map().
--type avro_binary() :: binary().
--type plugin_config() :: map().
--type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
 %% @doc Describe a plugin.
--spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
+-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
 describe(NameVsn) ->
     read_plugin_info(NameVsn, #{fill_readme => true}).
 
--spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
-plugin_avsc(NameVsn) ->
+-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
+plugin_schema_json(NameVsn) ->
     read_plugin_avsc(NameVsn).
 
--spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
-plugin_i18n(NameVsn) ->
+-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
+plugin_i18n_json(NameVsn) ->
     read_plugin_i18n(NameVsn).
 
--spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
-plugin_avro(NameVsn) ->
-    read_plugin_avro(NameVsn).
+-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
+raw_plugin_config_content(NameVsn) ->
+    read_plugin_hocon(NameVsn).
 
 parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
     parse_name_vsn(binary_to_list(NameVsn));
@@ -141,15 +139,32 @@ make_name_vsn_string(Name, Vsn) ->
 %%--------------------------------------------------------------------
 %% Package operations
 
+%% @doc Start all configured plugins are started.
+-spec ensure_installed() -> ok.
+ensure_installed() ->
+    Fun = fun(#{name_vsn := NameVsn}) ->
+        case ensure_installed(NameVsn) of
+            ok -> [];
+            {error, Reason} -> [{NameVsn, Reason}]
+        end
+    end,
+    ok = for_plugins(Fun).
+
 %% @doc Install a .tar.gz package placed in install_dir.
 -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
 ensure_installed(NameVsn) ->
     case read_plugin_info(NameVsn, #{}) of
         {ok, _} ->
-            ok;
+            ok,
+            _ = maybe_ensure_plugin_config(NameVsn);
         {error, _} ->
             ok = purge(NameVsn),
-            do_ensure_installed(NameVsn)
+            case ensure_exists_and_installed(NameVsn) of
+                ok ->
+                    maybe_post_op_after_installed(NameVsn);
+                {error, _Reason} = Err ->
+                    Err
+            end
     end.
 
 %% @doc Ensure files and directories for the given plugin are being deleted.
@@ -230,7 +245,17 @@ delete_package(NameVsn) ->
 %% @doc Start all configured plugins are started.
 -spec ensure_started() -> ok.
 ensure_started() ->
-    ok = for_plugins(fun ?MODULE:do_ensure_started/1).
+    Fun = fun
+        (#{name_vsn := NameVsn, enable := true}) ->
+            case do_ensure_started(NameVsn) of
+                ok -> [];
+                {error, Reason} -> [{NameVsn, Reason}]
+            end;
+        (#{name_vsn := NameVsn, enable := false}) ->
+            ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
+            []
+    end,
+    ok = for_plugins(Fun).
 
 %% @doc Start a plugin from Management API or CLI.
 %% the input is a <name>-<vsn> string.
@@ -247,7 +272,17 @@ ensure_started(NameVsn) ->
 %% @doc Stop all plugins before broker stops.
 -spec ensure_stopped() -> ok.
 ensure_stopped() ->
-    for_plugins(fun ?MODULE:ensure_stopped/1).
+    Fun = fun
+        (#{name_vsn := NameVsn, enable := true}) ->
+            case ensure_stopped(NameVsn) of
+                ok -> [];
+                {error, Reason} -> [{NameVsn, Reason}]
+            end;
+        (#{name_vsn := NameVsn, enable := false}) ->
+            ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
+            []
+    end,
+    ok = for_plugins(Fun).
 
 %% @doc Stop a plugin from Management API or CLI.
 -spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
@@ -260,37 +295,48 @@ ensure_stopped(NameVsn) ->
         end
     ).
 
-get_config(Name, Vsn, Options, Default) ->
-    get_config(make_name_vsn_string(Name, Vsn), Options, Default).
+get_config(Name, Vsn, Opt, Default) ->
+    get_config(make_name_vsn_string(Name, Vsn), Opt, Default).
 
 -spec get_config(name_vsn()) ->
-    {ok, plugin_config()}
+    {ok, plugin_config_map() | any()}
     | {error, term()}.
 get_config(NameVsn) ->
-    get_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}).
+    get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}).
 
--spec get_config(name_vsn(), Options :: map()) ->
-    {ok, avro_binary() | plugin_config()}
+-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
+    {ok, raw_plugin_config_content() | plugin_config_map() | any()}
+    | {error, term()}.
+get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
+    get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{});
+get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
+    get_config_bin(NameVsn).
+
+%% Present default config value only in map format.
+-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
+    {ok, plugin_config_map() | any()}
     | {error, term()}.
-get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
+get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
+    {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}.
+
+get_config_bin(NameVsn) ->
     %% no default value when get raw binary config
-    case read_plugin_avro(NameVsn) of
-        {ok, _AvroJson} = Res -> Res;
+    case read_plugin_hocon(NameVsn) of
+        {ok, _Map} = Res -> Res;
         {error, _Reason} = Err -> Err
-    end;
-get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) ->
-    get_config(NameVsn, Options, #{}).
-
-get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
-    {ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}.
+    end.
 
 %% @doc Update plugin's config.
 %% RPC call from Management API or CLI.
-%% the avro Json Map and plugin config ALWAYS be valid before calling this function.
-put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
-    AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
-    ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin),
-    ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
+%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function.
+put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
+    put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig);
+put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) ->
+    HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
+    ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
+    %% TODO: callback in plugin's on_config_changed (config update by mgmt API)
+    %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
+    ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap),
     ok.
 
 %% @doc Stop and then start the plugin.
@@ -302,7 +348,7 @@ restart(NameVsn) ->
 
 %% @doc List all installed plugins.
 %% Including the ones that are installed, but not enabled in config.
--spec list() -> [plugin()].
+-spec list() -> [plugin_info()].
 list() ->
     Pattern = filename:join([install_dir(), "*", "release.json"]),
     All = lists:filtermap(
@@ -323,15 +369,24 @@ list() ->
 %%--------------------------------------------------------------------
 %% Package utils
 
--spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
-decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
-    decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap));
-decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
+-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
+decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
+    decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
+decode_plugin_config_map(NameVsn, AvroJsonBin) ->
     case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
         {ok, Config} -> {ok, Config};
         {error, ReasonMap} -> {error, ReasonMap}
     end.
 
+-spec with_plugin_avsc(name_vsn()) -> boolean().
+with_plugin_avsc(NameVsn) ->
+    case read_plugin_info(NameVsn, #{fill_readme => false}) of
+        {ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) ->
+            WithAvsc;
+        _ ->
+            false
+    end.
+
 get_config_interal(Key, Default) when is_atom(Key) ->
     get_config_interal([Key], Default);
 get_config_interal(Path, Default) ->
@@ -438,7 +493,6 @@ do_ensure_installed(NameVsn) ->
             ok = write_tar_file_content(install_dir(), TarContent),
             case read_plugin_info(NameVsn, #{}) of
                 {ok, _} ->
-                    ok = maybe_post_op_after_install(NameVsn),
                     ok;
                 {error, Reason} ->
                     ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
@@ -603,7 +657,11 @@ tryit(WhichOp, F) ->
                 exception => Reason,
                 stacktrace => Stacktrace
             }),
-            {error, {failed, WhichOp}}
+            {error, #{
+                which_op => WhichOp,
+                exception => Reason,
+                stacktrace => Stacktrace
+            }}
     end.
 
 %% read plugin info from the JSON file
@@ -611,16 +669,16 @@ tryit(WhichOp, F) ->
 read_plugin_info(NameVsn, Options) ->
     tryit(
         atom_to_list(?FUNCTION_NAME),
-        fun() -> {ok, do_read_plugin2(NameVsn, Options)} end
+        fun() -> {ok, do_read_plugin(NameVsn, Options)} end
     ).
 
 do_read_plugin(NameVsn) ->
-    do_read_plugin2(NameVsn, #{}).
+    do_read_plugin(NameVsn, #{}).
 
-do_read_plugin2(NameVsn, Option) ->
-    do_read_plugin3(NameVsn, info_file_path(NameVsn), Option).
+do_read_plugin(NameVsn, Option) ->
+    do_read_plugin(NameVsn, info_file_path(NameVsn), Option).
 
-do_read_plugin3(NameVsn, InfoFilePath, Options) ->
+do_read_plugin(NameVsn, InfoFilePath, Options) ->
     {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(),
     Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath),
     Info1 = plugins_readme(NameVsn, Options, Info0),
@@ -642,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) ->
         read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options)
     ).
 
-read_plugin_avro(NameVsn) ->
-    read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}).
-read_plugin_avro(NameVsn, Options) ->
+read_plugin_hocon(NameVsn) ->
+    read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}).
+read_plugin_hocon(NameVsn, Options) ->
     tryit(
         atom_to_list(?FUNCTION_NAME),
-        read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options)
+        read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options)
     ).
 
 ensure_exists_and_installed(NameVsn) ->
@@ -659,7 +717,7 @@ ensure_exists_and_installed(NameVsn) ->
             case get_tar(NameVsn) of
                 {ok, TarContent} ->
                     ok = file:write_file(pkg_file_path(NameVsn), TarContent),
-                    ok = do_ensure_installed(NameVsn);
+                    do_ensure_installed(NameVsn);
                 _ ->
                     %% If not, try to get it from the cluster.
                     do_get_from_cluster(NameVsn)
@@ -668,33 +726,51 @@ ensure_exists_and_installed(NameVsn) ->
 
 do_get_from_cluster(NameVsn) ->
     Nodes = [N || N <- mria:running_nodes(), N /= node()],
-    case get_from_any_node(Nodes, NameVsn, []) of
+    case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
         {ok, TarContent} ->
             ok = file:write_file(pkg_file_path(NameVsn), TarContent),
             ok = do_ensure_installed(NameVsn);
         {error, NodeErrors} when Nodes =/= [] ->
-            ?SLOG(error, #{
-                msg => "failed_to_copy_plugin_from_other_nodes",
+            ErrMeta = #{
+                error_msg => "failed_to_copy_plugin_from_other_nodes",
                 name_vsn => NameVsn,
-                node_errors => NodeErrors
-            }),
-            {error, plugin_not_found};
+                node_errors => NodeErrors,
+                reason => not_found
+            },
+            ?SLOG(error, ErrMeta),
+            {error, ErrMeta};
         {error, _} ->
-            ?SLOG(error, #{
-                msg => "no_nodes_to_copy_plugin_from",
-                name_vsn => NameVsn
-            }),
-            {error, plugin_not_found}
+            ErrMeta = #{
+                error_msg => "no_nodes_to_copy_plugin_from",
+                name_vsn => NameVsn,
+                reason => not_found
+            },
+            ?SLOG(error, ErrMeta),
+            {error, ErrMeta}
     end.
 
-get_from_any_node([], _NameVsn, Errors) ->
+get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
     {error, Errors};
-get_from_any_node([Node | T], NameVsn, Errors) ->
+get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
     case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
         {ok, _} = Res ->
             Res;
         Err ->
-            get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
+            get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
+    end.
+
+get_plugin_config_from_any_node([], _NameVsn, Errors) ->
+    {error, Errors};
+get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
+    case
+        emqx_plugins_proto_v2:get_config(
+            Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
+        )
+    of
+        {ok, _} = Res ->
+            Res;
+        Err ->
+            get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
     end.
 
 plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
@@ -1011,29 +1087,31 @@ configured() ->
     get_config_interal(states, []).
 
 for_plugins(ActionFun) ->
-    case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
-        [] -> ok;
-        Errors -> erlang:error(#{function => ActionFun, errors => Errors})
+    case lists:flatmap(ActionFun, configured()) of
+        [] ->
+            ok;
+        Errors ->
+            ErrMeta = #{function => ActionFun, errors => Errors},
+            ?tp(
+                for_plugins_action_error_occurred,
+                ErrMeta
+            ),
+            ?SLOG(error, ErrMeta),
+            ok
     end.
 
-for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
-    case Fun(NameVsn) of
-        ok -> [];
-        {error, Reason} -> [{NameVsn, Reason}]
-    end;
-for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
-    ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
-    [].
-
-maybe_post_op_after_install(NameVsn) ->
+maybe_post_op_after_installed(NameVsn) ->
     _ = maybe_load_config_schema(NameVsn),
-    _ = maybe_create_config_dir(NameVsn),
+    _ = ensure_state(NameVsn, no_move, false, global),
     ok.
 
 maybe_load_config_schema(NameVsn) ->
     AvscPath = avsc_file_path(NameVsn),
-    filelib:is_regular(AvscPath) andalso
-        do_load_config_schema(NameVsn, AvscPath).
+    _ =
+        with_plugin_avsc(NameVsn) andalso
+            filelib:is_regular(AvscPath) andalso
+            do_load_config_schema(NameVsn, AvscPath),
+    _ = maybe_create_config_dir(NameVsn).
 
 do_load_config_schema(NameVsn, AvscPath) ->
     case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
@@ -1043,28 +1121,107 @@ do_load_config_schema(NameVsn, AvscPath) ->
     end.
 
 maybe_create_config_dir(NameVsn) ->
-    ConfigDir = plugin_config_dir(NameVsn),
-    case filelib:ensure_path(ConfigDir) of
-        ok ->
-            ok;
+    with_plugin_avsc(NameVsn) andalso
+        do_create_config_dir(NameVsn).
+
+do_create_config_dir(NameVsn) ->
+    case plugin_config_dir(NameVsn) of
         {error, Reason} ->
-            ?SLOG(warning, #{
-                msg => "failed_to_create_plugin_config_dir",
-                dir => ConfigDir,
-                reason => Reason
-            }),
-            {error, {mkdir_failed, ConfigDir, Reason}}
+            {error, {gen_config_dir_failed, Reason}};
+        ConfigDir ->
+            case filelib:ensure_path(ConfigDir) of
+                ok ->
+                    %% get config from other nodes or get from tarball
+                    _ = maybe_ensure_plugin_config(NameVsn),
+                    ok;
+                {error, Reason} ->
+                    ?SLOG(warning, #{
+                        msg => "failed_to_create_plugin_config_dir",
+                        dir => ConfigDir,
+                        reason => Reason
+                    }),
+                    {error, {mkdir_failed, ConfigDir, Reason}}
+            end
+    end.
+
+maybe_ensure_plugin_config(NameVsn) ->
+    maybe
+        true ?= with_plugin_avsc(NameVsn),
+        _ = ensure_plugin_config(NameVsn)
+    else
+        _ -> ok
+    end.
+
+ensure_plugin_config(NameVsn) ->
+    %% fetch plugin hocon config from cluster
+    Nodes = [N || N <- mria:running_nodes(), N /= node()],
+    ensure_plugin_config(NameVsn, Nodes).
+ensure_plugin_config(NameVsn, []) ->
+    ?SLOG(debug, #{
+        msg => "default_plugin_config_used",
+        name_vsn => NameVsn,
+        reason => "no_other_running_nodes"
+    }),
+    cp_default_config_file(NameVsn);
+ensure_plugin_config(NameVsn, Nodes) ->
+    case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
+        {ok, ConfigMap} when is_map(ConfigMap) ->
+            HoconBin = hocon_pp:do(ConfigMap, #{}),
+            ok = file:write_file(plugin_config_file(NameVsn), HoconBin),
+            ensure_config_map(NameVsn);
+        _ ->
+            ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
+            %% otherwise cp default hocon file
+            %% i.e. Clean installation
+            cp_default_config_file(NameVsn)
+    end.
+
+cp_default_config_file(NameVsn) ->
+    %% always copy default hocon file into config dir when can not get config from other nodes
+    Source = default_plugin_config_file(NameVsn),
+    Destination = plugin_config_file(NameVsn),
+    maybe
+        true ?= filelib:is_regular(Source),
+        %% destination path not existed (not configured)
+        true ?= (not filelib:is_regular(Destination)),
+        case file:copy(Source, Destination) of
+            {ok, _} ->
+                ok;
+            {error, Reason} ->
+                ?SLOG(warning, #{
+                    msg => "failed_to_copy_plugin_default_hocon_config",
+                    source => Source,
+                    destination => Destination,
+                    reason => Reason
+                })
+        end
+    else
+        _ -> ensure_config_map(NameVsn)
+    end.
+
+ensure_config_map(NameVsn) ->
+    with_plugin_avsc(NameVsn) andalso
+        do_ensure_config_map(NameVsn).
+
+do_ensure_config_map(NameVsn) ->
+    case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
+        {ok, ConfigJsonMap} ->
+            {ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap),
+            put_config(NameVsn, ConfigJsonMap, Config);
+        _ ->
+            ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
+            ok
     end.
 
 %% @private Backup the current config to a file with a timestamp suffix and
 %% then save the new config to the config file.
-backup_and_write_avro_bin(NameVsn, AvroBin) ->
+backup_and_write_hocon_bin(NameVsn, HoconBin) ->
     %% this may fail, but we don't care
     %% e.g. read-only file system
-    Path = avro_config_file(NameVsn),
+    Path = plugin_config_file(NameVsn),
     _ = filelib:ensure_dir(Path),
     TmpFile = Path ++ ".tmp",
-    case file:write_file(TmpFile, AvroBin) of
+    case file:write_file(TmpFile, HoconBin) of
         ok ->
             backup_and_replace(Path, TmpFile);
         {error, Reason} ->
@@ -1146,9 +1303,29 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
 plugin_dir(NameVsn) ->
     wrap_list_path(filename:join([install_dir(), NameVsn])).
 
--spec plugin_config_dir(name_vsn()) -> string().
+-spec plugin_priv_dir(name_vsn()) -> string().
+plugin_priv_dir(NameVsn) ->
+    case read_plugin_info(NameVsn, #{fill_readme => false}) of
+        {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
+            AppDir = make_name_vsn_string(Name, Vsn),
+            wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
+        _ ->
+            wrap_list_path(filename:join([install_dir(), NameVsn, "priv"]))
+    end.
+
+-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
 plugin_config_dir(NameVsn) ->
-    wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])).
+    case parse_name_vsn(NameVsn) of
+        {ok, NameAtom, _Vsn} ->
+            wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]));
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "failed_to_generate_plugin_config_dir_for_plugin",
+                plugin_namevsn => NameVsn,
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
 
 %% Files
 -spec pkg_file_path(name_vsn()) -> string().
@@ -1161,15 +1338,20 @@ info_file_path(NameVsn) ->
 
 -spec avsc_file_path(name_vsn()) -> string().
 avsc_file_path(NameVsn) ->
-    wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])).
+    wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])).
+
+-spec plugin_config_file(name_vsn()) -> string().
+plugin_config_file(NameVsn) ->
+    wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.hocon"])).
 
--spec avro_config_file(name_vsn()) -> string().
-avro_config_file(NameVsn) ->
-    wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])).
+%% should only used when plugin installing
+-spec default_plugin_config_file(name_vsn()) -> string().
+default_plugin_config_file(NameVsn) ->
+    wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])).
 
 -spec i18n_file_path(name_vsn()) -> string().
 i18n_file_path(NameVsn) ->
-    wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])).
+    wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])).
 
 -spec readme_file(name_vsn()) -> string().
 readme_file(NameVsn) ->

+ 2 - 1
apps/emqx_plugins/src/emqx_plugins_app.erl

@@ -27,8 +27,9 @@
 
 start(_Type, _Args) ->
     %% load all pre-configured
-    ok = emqx_plugins:ensure_started(),
     {ok, Sup} = emqx_plugins_sup:start_link(),
+    ok = emqx_plugins:ensure_installed(),
+    ok = emqx_plugins:ensure_started(),
     ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins),
     {ok, Sup}.
 

+ 5 - 7
apps/emqx_plugins/src/emqx_plugins_serde.erl

@@ -33,7 +33,6 @@
     init/1,
     handle_call/3,
     handle_cast/2,
-    handle_continue/2,
     terminate/2
 ]).
 
@@ -126,11 +125,10 @@ init(_) ->
     ]),
     State = #{},
     AvscPaths = get_plugin_avscs(),
-    {ok, State, {continue, {build_serdes, AvscPaths}}}.
-
-handle_continue({build_serdes, AvscPaths}, State) ->
+    %% force build all schemas at startup
+    %% otherwise plugin schema may not be available when needed
     _ = build_serdes(AvscPaths),
-    {noreply, State}.
+    {ok, State}.
 
 handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
     BuildRes = do_build_serde({NameVsn, AvscPath}),
@@ -153,10 +151,10 @@ terminate(_Reason, _State) ->
 
 -spec get_plugin_avscs() -> [{string(), string()}].
 get_plugin_avscs() ->
-    Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]),
+    Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]),
     lists:foldl(
         fun(AvscPath, AccIn) ->
-            [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
+            [_, _, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
             [{to_bin(NameVsn), AvscPath} | AccIn]
         end,
         _Acc0 = [],

+ 41 - 0
apps/emqx_plugins/src/proto/emqx_plugins_proto_v2.erl

@@ -0,0 +1,41 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugins_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_tar/3,
+    get_config/5
+]).
+
+-include("emqx_plugins.hrl").
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.7.0".
+
+-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any()}.
+get_tar(Node, NameVsn, Timeout) ->
+    rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).
+
+-spec get_config(
+    node(), name_vsn(), ?CONFIG_FORMAT_MAP, any(), timeout()
+) -> {ok, map() | any()} | {error, any()}.
+get_config(Node, NameVsn, Opt, Default, Timeout) ->
+    rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opt, Default], Timeout).

+ 37 - 6
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin).
 -define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)).
@@ -273,9 +274,15 @@ t_start_restart_and_stop(Config) ->
     %% fake enable bar-2
     ok = ensure_state(Bar2, rear, true),
     %% should cause an error
-    ?assertError(
-        #{function := _, errors := [_ | _]},
-        emqx_plugins:ensure_started()
+    ?check_trace(
+        emqx_plugins:ensure_started(),
+        fun(Trace) ->
+            ?assertMatch(
+                [#{function := _, errors := [_ | _]}],
+                ?of_kind(for_plugins_action_error_occurred, Trace)
+            ),
+            ok
+        end
     ),
     %% but demo plugin should still be running
     assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
@@ -337,7 +344,7 @@ t_enable_disable({'end', Config}) ->
 t_enable_disable(Config) ->
     NameVsn = proplists:get_value(name_vsn, Config),
     ok = emqx_plugins:ensure_installed(NameVsn),
-    ?assertEqual([], emqx_plugins:configured()),
+    ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
     ok = emqx_plugins:ensure_enabled(NameVsn),
     ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
     ok = emqx_plugins:ensure_disabled(NameVsn),
@@ -379,9 +386,10 @@ t_bad_tar_gz(Config) ->
         }},
         emqx_plugins:ensure_installed("fake-vsn")
     ),
+    %% the plugin tarball can not be found on any nodes
     ?assertMatch(
         {error, #{
-            error_msg := "failed_to_extract_plugin_package",
+            error_msg := "no_nodes_to_copy_plugin_from",
             reason := not_found
         }},
         emqx_plugins:ensure_installed("nonexisting")
@@ -556,7 +564,7 @@ t_load_config_from_cli({'end', Config}) ->
 t_load_config_from_cli(Config) when is_list(Config) ->
     NameVsn = ?config(name_vsn, Config),
     ok = emqx_plugins:ensure_installed(NameVsn),
-    ?assertEqual([], emqx_plugins:configured()),
+    ?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
     ok = emqx_plugins:ensure_enabled(NameVsn),
     ok = emqx_plugins:ensure_started(NameVsn),
     Params0 = unused,
@@ -687,6 +695,14 @@ group_t_copy_plugin_to_a_new_node(Config) ->
     %% see: emqx_conf_app:init_conf/0
     ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]),
     {ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]),
+
+    %% Plugin config should be synced from `CopyFromNode`
+    %% by application `emqx` and `emqx_conf`
+    %% FIXME: in test case, we manually do it here
+    ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]),
+    ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []),
+    ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []),
+
     ?assertMatch(
         {ok, #{running_status := running, config_status := enabled}},
         rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])
@@ -739,6 +755,16 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
     ct:pal("~p install_dir:\n  ~p", [
         CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir])
     ]),
+
+    %% Plugin config should be synced from `CopyFromNode`
+    %% by application `emqx` and `emqx_conf`
+    %% FIXME: in test case, we manually do it here
+    ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [
+        [states], [#{enable => true, name_vsn => NameVsn}]
+    ]),
+    ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []),
+    ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []),
+
     ?assertMatch(
         {ok, #{running_status := running, config_status := enabled}},
         rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])
@@ -785,6 +811,11 @@ group_t_cluster_leave(Config) ->
     ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
     ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
     ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
+
+    ok = erpc:call(N2, emqx_plugins, ensure_installed, [NameVsn]),
+    ok = erpc:call(N2, emqx_plugins, ensure_started, [NameVsn]),
+    ok = erpc:call(N2, emqx_plugins, ensure_enabled, [NameVsn]),
+
     Params = unused,
     %% 2 nodes running
     ?assertMatch(

+ 1 - 10
scripts/ensure-rebar3.sh

@@ -4,17 +4,8 @@ set -euo pipefail
 
 [ "${DEBUG:-0}" -eq 1 ] && set -x
 
-## rebar3 tag 3.19.0-emqx-1 is compiled using latest official OTP-24 image.
-## we have to use an otp24-compiled rebar3 because the defination of record #application{}
-## in systools.hrl is changed in otp24.
 OTP_VSN="${OTP_VSN:-$(./scripts/get-otp-vsn.sh)}"
 case ${OTP_VSN} in
-    23*)
-        VERSION="3.16.1-emqx-1"
-        ;;
-    24*)
-        VERSION="3.18.0-emqx-1"
-        ;;
     25*)
         VERSION="3.19.0-emqx-9"
         ;;
@@ -22,7 +13,7 @@ case ${OTP_VSN} in
         VERSION="3.20.0-emqx-1"
         ;;
     *)
-        echo "Unsupporetd Erlang/OTP version $OTP_VSN"
+        echo "Unsupported Erlang/OTP version $OTP_VSN"
         exit 1
         ;;
 esac

+ 1 - 1
scripts/get-otp-vsn.sh

@@ -2,4 +2,4 @@
 
 set -euo pipefail
 
-erl  -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().'
+erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().'