Просмотр исходного кода

feat: plugin config with avro schema and apis

JimMoen 1 год назад
Родитель
Сommit
8db5e51592

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -46,6 +46,7 @@
 {emqx_metrics,2}.
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_api_plugins,2}.
+{emqx_mgmt_api_plugins,3}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_cluster,2}.
 {emqx_mgmt_cluster,3}.

+ 148 - 14
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -19,7 +19,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
-%%-include_lib("emqx_plugins/include/emqx_plugins.hrl").
+-include_lib("emqx_plugins/include/emqx_plugins.hrl").
 
 -export([
     api_spec/0,
@@ -34,6 +34,8 @@
     upload_install/2,
     plugin/2,
     update_plugin/2,
+    plugin_config/2,
+    plugin_schema/2,
     update_boot_order/2
 ]).
 
@@ -43,7 +45,8 @@
     install_package/2,
     delete_package/1,
     describe_package/1,
-    ensure_action/2
+    ensure_action/2,
+    do_update_plugin_config/3
 ]).
 
 -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$").
@@ -52,7 +55,11 @@
 %% app_name must be a snake_case (no '-' allowed).
 -define(VSN_WILDCARD, "-*.tar.gz").
 
-namespace() -> "plugins".
+-define(CONTENT_PLUGIN, plugin).
+-define(CONTENT_CONFIG, config).
+
+namespace() ->
+    "plugins".
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@@ -64,6 +71,8 @@ paths() ->
         "/plugins/:name",
         "/plugins/install",
         "/plugins/:name/:action",
+        "/plugins/:name/config",
+        "/plugins/:name/schema",
         "/plugins/:name/move"
     ].
 
@@ -97,10 +106,10 @@ schema("/plugins/install") ->
                         schema => #{
                             type => object,
                             properties => #{
-                                plugin => #{type => string, format => binary}
+                                ?CONTENT_PLUGIN => #{type => string, format => binary}
                             }
                         },
-                        encoding => #{plugin => #{'contentType' => 'application/gzip'}}
+                        encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}}
                     }
                 }
             },
@@ -157,6 +166,70 @@ schema("/plugins/:name/:action") ->
             }
         }
     };
+schema("/plugins/:name/config") ->
+    #{
+        'operationId' => plugin_config,
+        get => #{
+            summary =>
+                <<"Get plugin config">>,
+            description =>
+                "Get plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.<br/>",
+            tags => ?TAGS,
+            parameters => [hoconsc:ref(name)],
+            responses => #{
+                %% binary avro encoded config
+                200 => hoconsc:mk(binary()),
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
+            }
+        },
+        put => #{
+            summary =>
+                <<"Update plugin config">>,
+            description =>
+                "Update plugin config by avro encoded binary config. Schema defined by user's schema.avsc file.<br/>",
+            tags => ?TAGS,
+            parameters => [hoconsc:ref(name)],
+            'requestBody' => #{
+                content => #{
+                    'multipart/form-data' => #{
+                        schema => #{
+                            type => object,
+                            properties => #{
+                                ?CONTENT_CONFIG => #{type => string, format => binary}
+                            }
+                        },
+                        encoding => #{?CONTENT_CONFIG => #{'contentType' => 'application/gzip'}}
+                    }
+                }
+            },
+            responses => #{
+                204 => <<"Config updated successfully">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    ['UNEXPECTED_ERROR'], <<"Update plugin config failed">>
+                ),
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
+            }
+        }
+    };
+schema("/plugins/:name/schema") ->
+    #{
+        'operationId' => plugin_schema,
+        get => #{
+            summary => <<"Get installed plugin's avro schema">>,
+            description =>
+                "Get plugin's config avro schema.",
+            tags => ?TAGS,
+            parameters => [hoconsc:ref(name)],
+            responses => #{
+                %% avro schema and i18n json object
+                200 => hoconsc:mk(binary()),
+                404 => emqx_dashboard_swagger:error_codes(
+                    ['NOT_FOUND', 'FILE_NOT_EXISTED'],
+                    <<"Plugin Not Found or Plugin not given a schema file">>
+                )
+            }
+        }
+    };
 schema("/plugins/:name/move") ->
     #{
         'operationId' => update_boot_order,
@@ -338,7 +411,7 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
     %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
     NameVsn = string:trim(FileName, trailing, ".tar.gz"),
     case emqx_plugins:describe(NameVsn) of
-        {error, #{error := "bad_info_file", return := {enoent, _}}} ->
+        {error, #{error_msg := "bad_info_file", reason := {enoent, _}}} ->
             case emqx_plugins:parse_name_vsn(FileName) of
                 {ok, AppName, _Vsn} ->
                     AppDir = filename:join(emqx_plugins:install_dir(), AppName),
@@ -394,7 +467,7 @@ do_install_package(FileName, Bin) ->
             ),
             Reason =
                 case hd(Filtered) of
-                    {error, #{error := Reason0}} -> Reason0;
+                    {error, #{error_msg := Reason0}} -> Reason0;
                     {error, #{reason := Reason0}} -> Reason0
                 end,
             {400, #{
@@ -418,6 +491,42 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
     Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
     return(204, Res).
 
+plugin_config(get, #{bindings := #{name := Name}}) ->
+    case emqx_plugins:get_plugin_config(Name, #{format => ?CONFIG_FORMAT_AVRO}) of
+        {ok, AvroBin} ->
+            {200, #{<<"content-type">> => <<"application/octet-stream">>}, AvroBin};
+        {error, _} ->
+            {400, #{
+                code => 'BAD_CONFIG',
+                message => <<"Failed to get plugin config">>
+            }}
+    end;
+plugin_config(put, #{bindings := #{name := Name}, body := #{<<"config">> := RawAvro}}) ->
+    case emqx_plugins:decode_plugin_avro_config(Name, RawAvro) of
+        {ok, Config} ->
+            Nodes = emqx:running_nodes(),
+            _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
+                Nodes, Name, RawAvro, Config
+            ),
+            {204};
+        {error, Reason} ->
+            {400, #{
+                code => 'BAD_CONFIG',
+                message => readable_error_msg(Reason)
+            }}
+    end.
+
+plugin_schema(get, #{bindings := #{name := NameVsn}}) ->
+    case emqx_plugins:describe(NameVsn) of
+        {ok, _Plugin} ->
+            {200, format_plugin_schema_with_i18n(NameVsn)};
+        _ ->
+            {404, #{
+                code => 'NOT_FOUND',
+                message => <<"Plugin Not Found">>
+            }}
+    end.
+
 update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
     case parse_position(Body, Name) of
         {error, Reason} ->
@@ -429,7 +538,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
                 {error, Reason} ->
                     {400, #{
                         code => 'MOVE_FAILED',
-                        message => iolist_to_binary(io_lib:format("~p", [Reason]))
+                        message => readable_error_msg(Reason)
                     }}
             end
     end.
@@ -443,7 +552,7 @@ install_package(FileName, Bin) ->
     ok = file:write_file(File, Bin),
     PackageName = string:trim(FileName, trailing, ".tar.gz"),
     case emqx_plugins:ensure_installed(PackageName) of
-        {error, #{return := not_found}} = NotFound ->
+        {error, #{reason := not_found}} = NotFound ->
             NotFound;
         {error, Reason} = Error ->
             ?SLOG(error, Reason#{msg => "failed_to_install_plugin"}),
@@ -454,9 +563,9 @@ install_package(FileName, Bin) ->
     end.
 
 %% For RPC plugin get
-describe_package(Name) ->
+describe_package(NameVsn) ->
     Node = node(),
-    case emqx_plugins:describe(Name) of
+    case emqx_plugins:describe(NameVsn) of
         {ok, Plugin} -> {Node, [Plugin]};
         _ -> {Node, []}
     end.
@@ -487,12 +596,25 @@ ensure_action(Name, restart) ->
     _ = emqx_plugins:restart(Name),
     ok.
 
+%% for RPC plugin avro encoded config update
+do_update_plugin_config(Name, Avro, PluginConfig) ->
+    emqx_plugins:put_plugin_config(Name, Avro, PluginConfig).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
 return(Code, ok) ->
     {Code};
-return(_, {error, #{error := "bad_info_file", return := {enoent, _} = Reason}}) ->
-    {404, #{code => 'NOT_FOUND', message => iolist_to_binary(io_lib:format("~p", [Reason]))}};
+return(_, {error, #{error_msg := "bad_info_file", reason := {enoent, _} = Reason}}) ->
+    {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
+return(_, {error, #{error_msg := "bad_avro_config_file", reason := {enoent, _} = Reason}}) ->
+    {404, #{code => 'NOT_FOUND', message => readable_error_msg(Reason)}};
 return(_, {error, Reason}) ->
-    {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}.
+    {400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}.
+
+readable_error_msg(Msg) ->
+    emqx_utils:readable_error_msg(Msg).
 
 parse_position(#{<<"position">> := <<"front">>}, _) ->
     front;
@@ -563,6 +685,18 @@ aggregate_status([{Node, Plugins} | List], Acc) ->
         ),
     aggregate_status(List, NewAcc).
 
+format_plugin_schema_with_i18n(NameVsn) ->
+    #{
+        avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end),
+        i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end)
+    }.
+
+try_read_file(Fun) ->
+    case Fun() of
+        {ok, Bin} -> Bin;
+        _ -> null
+    end.
+
 % running_status: running loaded, stopped
 %% config_status: not_configured disable enable
 plugin_status(#{running_status := running}) -> running;

+ 1 - 0
apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl

@@ -24,6 +24,7 @@
     describe_package/2,
     delete_package/1,
     ensure_action/2
+    %% plugin_config/2
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").

+ 65 - 0
apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v3.erl

@@ -0,0 +1,65 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_plugins_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_plugins/1,
+    install_package/3,
+    describe_package/2,
+    delete_package/1,
+    ensure_action/2,
+    update_plugin_config/4
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.7.0".
+
+-spec get_plugins([node()]) -> emqx_rpc:multicall_result().
+get_plugins(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000).
+
+-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result().
+install_package(Nodes, Filename, Bin) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).
+
+-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result().
+describe_package(Nodes, Name) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000).
+
+-spec delete_package(binary() | string()) -> ok | {error, any()}.
+delete_package(Name) ->
+    emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
+
+-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
+ensure_action(Name, Action) ->
+    emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).
+
+-spec update_plugin_config(
+    [node()],
+    binary() | string(),
+    binary(),
+    map()
+) ->
+    emqx_rpc:multicall_result().
+update_plugin_config(Nodes, Name, RawAvro, PluginConfig) ->
+    rpc:multicall(
+        Nodes, emqx_mgmt_api_plugins, do_update_plugin_config, [Name, RawAvro, PluginConfig], 10000
+    ).

+ 21 - 0
apps/emqx_plugins/include/emqx_plugins.hrl

@@ -19,4 +19,25 @@
 
 -define(CONF_ROOT, plugins).
 
+-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab).
+
+-define(CONFIG_FORMAT_AVRO, config_format_avro).
+-define(CONFIG_FORMAT_MAP, config_format_map).
+
+-type schema_name() :: binary().
+-type avsc() :: binary().
+
+-type encoded_data() :: iodata().
+-type decoded_data() :: map().
+
+-record(plugin_schema_serde, {
+    name :: schema_name(),
+    eval_context :: term(),
+    %% TODO: fields to mark schema import status
+    %% scheam_imported :: boolean(),
+    %% for future use
+    extra = []
+}).
+-type plugin_schema_serde() :: #plugin_schema_serde{}.
+
 -endif.

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.1.8"},
+    {vsn, "0.2.0"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx]},

+ 420 - 257
apps/emqx_plugins/src/emqx_plugins.erl

@@ -16,7 +16,6 @@
 
 -module(emqx_plugins).
 
--include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include("emqx_plugins.hrl").
 
@@ -24,6 +23,15 @@
 -include_lib("eunit/include/eunit.hrl").
 -endif.
 
+-export([
+    describe/1,
+    plugin_avsc/1,
+    plugin_i18n/1,
+    plugin_avro/1,
+    parse_name_vsn/1
+]).
+
+%% Package operations
 -export([
     ensure_installed/1,
     ensure_uninstalled/1,
@@ -35,21 +43,26 @@
     delete_package/1
 ]).
 
+%% Plugin runtime management
 -export([
     ensure_started/0,
     ensure_started/1,
     ensure_stopped/0,
     ensure_stopped/1,
+    get_plugin_config/1,
+    get_plugin_config/2,
+    put_plugin_config/3,
     restart/1,
-    list/0,
-    describe/1,
-    parse_name_vsn/1
+    list/0
 ]).
 
+%% Package utils
 -export([
+    decode_plugin_avro_config/2,
     get_config/2,
     put_config/2,
-    get_tar/1
+    get_tar/1,
+    install_dir/0
 ]).
 
 %% `emqx_config_handler' API
@@ -57,21 +70,26 @@
     post_config_update/5
 ]).
 
-%% internal
+%% Internal export
 -export([do_ensure_started/1]).
--export([
-    install_dir/0
-]).
 
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
 -endif.
 
+%% Defines
+-define(PLUGIN_PERSIS_CONFIG_KEY(NameVsn), {?MODULE, NameVsn}).
+
+%% Types
 %% "my_plugin-0.1.0"
 -type name_vsn() :: binary() | string().
 %% the parse result of the JSON info file
 -type plugin() :: map().
+-type schema_json() :: map().
+-type i18n_json() :: map().
+-type avro_binary() :: binary().
+-type plugin_config() :: map().
 -type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
 
 %%--------------------------------------------------------------------
@@ -80,12 +98,36 @@
 
 %% @doc Describe a plugin.
 -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
-describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
+describe(NameVsn) ->
+    read_plugin_info(NameVsn, #{fill_readme => true}).
+
+-spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
+plugin_avsc(NameVsn) ->
+    read_plugin_avsc(NameVsn).
+
+-spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
+plugin_i18n(NameVsn) ->
+    read_plugin_i18n(NameVsn).
+
+-spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
+plugin_avro(NameVsn) ->
+    read_plugin_avro(NameVsn).
+
+parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
+    parse_name_vsn(binary_to_list(NameVsn));
+parse_name_vsn(NameVsn) when is_list(NameVsn) ->
+    case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
+        {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
+        _ -> {error, "bad_name_vsn"}
+    end.
+
+%%--------------------------------------------------------------------
+%% Package operations
 
 %% @doc Install a .tar.gz package placed in install_dir.
 -spec ensure_installed(name_vsn()) -> ok | {error, map()}.
 ensure_installed(NameVsn) ->
-    case read_plugin(NameVsn, #{}) of
+    case read_plugin_info(NameVsn, #{}) of
         {ok, _} ->
             ok;
         {error, _} ->
@@ -93,33 +135,183 @@ ensure_installed(NameVsn) ->
             do_ensure_installed(NameVsn)
     end.
 
-do_ensure_installed(NameVsn) ->
-    TarGz = pkg_file(NameVsn),
-    case erl_tar:extract(TarGz, [compressed, memory]) of
-        {ok, TarContent} ->
-            ok = write_tar_file_content(install_dir(), TarContent),
-            case read_plugin(NameVsn, #{}) of
-                {ok, _} ->
-                    ok;
-                {error, Reason} ->
-                    ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
-                    ok = delete_tar_file_content(install_dir(), TarContent),
-                    {error, Reason}
-            end;
-        {error, {_, enoent}} ->
+%% @doc Ensure files and directories for the given plugin are being deleted.
+%% If a plugin is running, or enabled, an error is returned.
+-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
+ensure_uninstalled(NameVsn) ->
+    case read_plugin_info(NameVsn, #{}) of
+        {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
             {error, #{
-                reason => "failed_to_extract_plugin_package",
-                path => TarGz,
-                return => not_found
+                error_msg => "bad_plugin_running_status",
+                hint => "stop_the_plugin_first"
             }};
-        {error, Reason} ->
+        {ok, #{config_status := enabled}} ->
             {error, #{
-                reason => "bad_plugin_package",
-                path => TarGz,
-                return => Reason
-            }}
+                error_msg => "bad_plugin_config_status",
+                hint => "disable_the_plugin_first"
+            }};
+        _ ->
+            purge(NameVsn),
+            ensure_delete(NameVsn)
+    end.
+
+%% @doc Ensure a plugin is enabled to the end of the plugins list.
+-spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
+ensure_enabled(NameVsn) ->
+    ensure_enabled(NameVsn, no_move).
+
+%% @doc Ensure a plugin is enabled at the given position of the plugin list.
+-spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
+ensure_enabled(NameVsn, Position) ->
+    ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
+
+-spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
+ensure_enabled(NameVsn, Position, ConfLocation) when
+    ConfLocation =:= local; ConfLocation =:= global
+->
+    ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
+
+%% @doc Ensure a plugin is disabled.
+-spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
+ensure_disabled(NameVsn) ->
+    ensure_state(NameVsn, no_move, false, _ConfLocation = local).
+
+%% @doc Delete extracted dir
+%% In case one lib is shared by multiple plugins.
+%% it might be the case that purging one plugin's install dir
+%% will cause deletion of loaded beams.
+%% It should not be a problem, because shared lib should
+%% reside in all the plugin install dirs.
+-spec purge(name_vsn()) -> ok.
+purge(NameVsn) ->
+    _ = maybe_purge_plugin_config(NameVsn),
+    purge_plugin(NameVsn).
+
+%% @doc Delete the package file.
+-spec delete_package(name_vsn()) -> ok.
+delete_package(NameVsn) ->
+    File = pkg_file(NameVsn),
+    _ = emqx_plugins_serde:delete_schema(NameVsn),
+    case file:delete(File) of
+        ok ->
+            ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
+            ok;
+        {error, enoent} ->
+            ok;
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_delete_package_file",
+                path => File,
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------
+%% Plugin runtime management
+
+%% @doc Start all configured plugins are started.
+-spec ensure_started() -> ok.
+ensure_started() ->
+    ok = for_plugins(fun ?MODULE:do_ensure_started/1).
+
+%% @doc Start a plugin from Management API or CLI.
+%% the input is a <name>-<vsn> string.
+-spec ensure_started(name_vsn()) -> ok | {error, term()}.
+ensure_started(NameVsn) ->
+    case do_ensure_started(NameVsn) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            ?SLOG(alert, Reason#{msg => "failed_to_start_plugin"}),
+            {error, Reason}
+    end.
+
+%% @doc Stop all plugins before broker stops.
+-spec ensure_stopped() -> ok.
+ensure_stopped() ->
+    for_plugins(fun ?MODULE:ensure_stopped/1).
+
+%% @doc Stop a plugin from Management API or CLI.
+-spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
+ensure_stopped(NameVsn) ->
+    tryit(
+        "stop_plugin",
+        fun() ->
+            Plugin = do_read_plugin(NameVsn),
+            ensure_apps_stopped(Plugin)
+        end
+    ).
+
+-spec get_plugin_config(name_vsn()) ->
+    {ok, plugin_config()} | {error, term()}.
+get_plugin_config(NameVsn) ->
+    get_plugin_config(NameVsn, #{format => ?CONFIG_FORMAT_MAP}).
+
+-spec get_plugin_config(name_vsn(), Options :: map()) ->
+    {ok, avro_binary() | plugin_config()}
+    | {error, term()}.
+get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
+    case read_plugin_avro(NameVsn) of
+        {ok, _AvroBin} = Res -> Res;
+        {error, _Reason} = Err -> Err
+    end;
+get_plugin_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}) ->
+    persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), #{}).
+
+%% @doc Update plugin's config.
+%% RPC call from Management API or CLI.
+%% the avro binary and plugin config ALWAYS be valid before calling this function.
+put_plugin_config(NameVsn, RawAvro, PluginConfig) ->
+    ok = write_avro_bin(NameVsn, RawAvro),
+    ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), PluginConfig),
+    ok.
+
+%% @doc Stop and then start the plugin.
+restart(NameVsn) ->
+    case ensure_stopped(NameVsn) of
+        ok -> ensure_started(NameVsn);
+        {error, Reason} -> {error, Reason}
     end.
 
+%% @doc List all installed plugins.
+%% Including the ones that are installed, but not enabled in config.
+-spec list() -> [plugin()].
+list() ->
+    Pattern = filename:join([install_dir(), "*", "release.json"]),
+    All = lists:filtermap(
+        fun(JsonFilePath) ->
+            [_, NameVsn | _] = lists:reverse(filename:split(JsonFilePath)),
+            case read_plugin_info(NameVsn, #{}) of
+                {ok, Info} ->
+                    {true, Info};
+                {error, Reason} ->
+                    ?SLOG(warning, Reason),
+                    false
+            end
+        end,
+        filelib:wildcard(Pattern)
+    ),
+    do_list(configured(), All).
+
+%%--------------------------------------------------------------------
+%% Package utils
+
+-spec decode_plugin_avro_config(name_vsn(), binary()) -> {ok, map()} | {error, any()}.
+decode_plugin_avro_config(NameVsn, RawAvro) ->
+    case emqx_plugins_serde:decode(NameVsn, RawAvro) of
+        {ok, Config} -> {ok, Config};
+        {error, ReasonMap} -> {error, ReasonMap}
+    end.
+
+get_config(Key, Default) when is_atom(Key) ->
+    get_config([Key], Default);
+get_config(Path, Default) ->
+    emqx_conf:get([?CONF_ROOT | Path], Default).
+
+put_config(Key, Value) ->
+    do_put_config(Key, Value, _ConfLocation = local).
+
 -spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}.
 get_tar(NameVsn) ->
     TarGz = pkg_file(NameVsn),
@@ -135,10 +327,14 @@ get_tar(NameVsn) ->
             end
     end.
 
+%%--------------------------------------------------------------------
+%% Internal
+%%--------------------------------------------------------------------
+
 maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) ->
     maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir));
 maybe_create_tar(NameVsn, TarGzName, InstallDir) ->
-    case filelib:wildcard(filename:join(dir(NameVsn), "**")) of
+    case filelib:wildcard(filename:join(plugin_dir(NameVsn), "**")) of
         [_ | _] = PluginFiles ->
             InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/",
             PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles],
@@ -207,24 +403,32 @@ top_dir_test_() ->
     ].
 -endif.
 
-%% @doc Ensure files and directories for the given plugin are being deleted.
-%% If a plugin is running, or enabled, an error is returned.
--spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
-ensure_uninstalled(NameVsn) ->
-    case read_plugin(NameVsn, #{}) of
-        {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
+do_ensure_installed(NameVsn) ->
+    TarGz = pkg_file(NameVsn),
+    case erl_tar:extract(TarGz, [compressed, memory]) of
+        {ok, TarContent} ->
+            ok = write_tar_file_content(install_dir(), TarContent),
+            case read_plugin_info(NameVsn, #{}) of
+                {ok, _} ->
+                    ok = maybe_post_op_after_install(NameVsn),
+                    ok;
+                {error, Reason} ->
+                    ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
+                    ok = delete_tar_file_content(install_dir(), TarContent),
+                    {error, Reason}
+            end;
+        {error, {_, enoent}} ->
             {error, #{
-                reason => "bad_plugin_running_status",
-                hint => "stop_the_plugin_first"
+                error_msg => "failed_to_extract_plugin_package",
+                path => TarGz,
+                reason => not_found
             }};
-        {ok, #{config_status := enabled}} ->
+        {error, Reason} ->
             {error, #{
-                reason => "bad_plugin_config_status",
-                hint => "disable_the_plugin_first"
-            }};
-        _ ->
-            purge(NameVsn),
-            ensure_delete(NameVsn)
+                error_msg => "bad_plugin_package",
+                path => TarGz,
+                reason => Reason
+            }}
     end.
 
 ensure_delete(NameVsn0) ->
@@ -233,37 +437,19 @@ ensure_delete(NameVsn0) ->
     put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
     ok.
 
-%% @doc Ensure a plugin is enabled to the end of the plugins list.
--spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
-ensure_enabled(NameVsn) ->
-    ensure_enabled(NameVsn, no_move).
-
-%% @doc Ensure a plugin is enabled at the given position of the plugin list.
--spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
-ensure_enabled(NameVsn, Position) ->
-    ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
-
--spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
-ensure_enabled(NameVsn, Position, ConfLocation) when
-    ConfLocation =:= local; ConfLocation =:= global
-->
-    ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
-
-%% @doc Ensure a plugin is disabled.
--spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
-ensure_disabled(NameVsn) ->
-    ensure_state(NameVsn, no_move, false, _ConfLocation = local).
-
 ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
     ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
 ensure_state(NameVsn, Position, State, ConfLocation) ->
-    case read_plugin(NameVsn, #{}) of
+    case read_plugin_info(NameVsn, #{}) of
         {ok, _} ->
             Item = #{
                 name_vsn => NameVsn,
                 enable => State
             },
-            tryit("ensure_state", fun() -> ensure_configured(Item, Position, ConfLocation) end);
+            tryit(
+                "ensure_state",
+                fun() -> ensure_configured(Item, Position, ConfLocation) end
+            );
         {error, Reason} ->
             {error, Reason}
     end.
@@ -295,7 +481,7 @@ add_new_configured(Configured, {Action, NameVsn}, Item) ->
     {Front, Rear} = lists:splitwith(SplitFun, Configured),
     Rear =:= [] andalso
         throw(#{
-            error => "position_anchor_plugin_not_configured",
+            error_msg => "position_anchor_plugin_not_configured",
             hint => "maybe_install_and_configure",
             name_vsn => NameVsn
         }),
@@ -307,37 +493,21 @@ add_new_configured(Configured, {Action, NameVsn}, Item) ->
             Front ++ [Anchor, Item | Rear0]
     end.
 
-%% @doc Delete the package file.
--spec delete_package(name_vsn()) -> ok.
-delete_package(NameVsn) ->
-    File = pkg_file(NameVsn),
-    case file:delete(File) of
-        ok ->
-            ?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
-            ok;
-        {error, enoent} ->
-            ok;
-        {error, Reason} ->
-            ?SLOG(error, #{
-                msg => "failed_to_delete_package_file",
-                path => File,
-                reason => Reason
-            }),
-            {error, Reason}
-    end.
+maybe_purge_plugin_config(NameVsn) ->
+    _ = persistent_term:erase(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn)),
+    ok.
 
-%% @doc Delete extracted dir
-%% In case one lib is shared by multiple plugins.
-%% it might be the case that purging one plugin's install dir
-%% will cause deletion of loaded beams.
-%% It should not be a problem, because shared lib should
-%% reside in all the plugin install dirs.
--spec purge(name_vsn()) -> ok.
-purge(NameVsn) ->
-    Dir = dir(NameVsn),
+purge_plugin(NameVsn) ->
+    Dir = plugin_dir(NameVsn),
+    purge_plugin_dir(Dir).
+
+purge_plugin_dir(Dir) ->
     case file:del_dir_r(Dir) of
         ok ->
-            ?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
+            ?SLOG(info, #{
+                msg => "purged_plugin_dir",
+                dir => Dir
+            });
         {error, enoent} ->
             ok;
         {error, Reason} ->
@@ -349,72 +519,10 @@ purge(NameVsn) ->
             {error, Reason}
     end.
 
-%% @doc Start all configured plugins are started.
--spec ensure_started() -> ok.
-ensure_started() ->
-    ok = for_plugins(fun ?MODULE:do_ensure_started/1).
-
-%% @doc Start a plugin from Management API or CLI.
-%% the input is a <name>-<vsn> string.
--spec ensure_started(name_vsn()) -> ok | {error, term()}.
-ensure_started(NameVsn) ->
-    case do_ensure_started(NameVsn) of
-        ok ->
-            ok;
-        {error, Reason} ->
-            ?SLOG(alert, #{
-                msg => "failed_to_start_plugin",
-                reason => Reason
-            }),
-            {error, Reason}
-    end.
-
-%% @doc Stop all plugins before broker stops.
--spec ensure_stopped() -> ok.
-ensure_stopped() ->
-    for_plugins(fun ?MODULE:ensure_stopped/1).
-
-%% @doc Stop a plugin from Management API or CLI.
--spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
-ensure_stopped(NameVsn) ->
-    tryit(
-        "stop_plugin",
-        fun() ->
-            Plugin = do_read_plugin(NameVsn),
-            ensure_apps_stopped(Plugin)
-        end
-    ).
-
-%% @doc Stop and then start the plugin.
-restart(NameVsn) ->
-    case ensure_stopped(NameVsn) of
-        ok -> ensure_started(NameVsn);
-        {error, Reason} -> {error, Reason}
-    end.
-
-%% @doc List all installed plugins.
-%% Including the ones that are installed, but not enabled in config.
--spec list() -> [plugin()].
-list() ->
-    Pattern = filename:join([install_dir(), "*", "release.json"]),
-    All = lists:filtermap(
-        fun(JsonFile) ->
-            case read_plugin({file, JsonFile}, #{}) of
-                {ok, Info} ->
-                    {true, Info};
-                {error, Reason} ->
-                    ?SLOG(warning, Reason),
-                    false
-            end
-        end,
-        filelib:wildcard(Pattern)
-    ),
-    list(configured(), All).
-
 %% Make sure configured ones are ordered in front.
-list([], All) ->
+do_list([], All) ->
     All;
-list([#{name_vsn := NameVsn} | Rest], All) ->
+do_list([#{name_vsn := NameVsn} | Rest], All) ->
     SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
         bin([Name, "-", Vsn]) =/= bin(NameVsn)
     end,
@@ -424,9 +532,9 @@ list([#{name_vsn := NameVsn} | Rest], All) ->
                 msg => "configured_plugin_not_installed",
                 name_vsn => NameVsn
             }),
-            list(Rest, All);
+            do_list(Rest, All);
         {Front, [I | Rear]} ->
-            [I | list(Rest, Front ++ Rear)]
+            [I | do_list(Rest, Front ++ Rear)]
     end.
 
 do_ensure_started(NameVsn) ->
@@ -439,23 +547,26 @@ do_ensure_started(NameVsn) ->
                     ok = load_code_start_apps(NameVsn, Plugin);
                 {error, plugin_not_found} ->
                     ?SLOG(error, #{
-                        msg => "plugin_not_found",
+                        error_msg => "plugin_not_found",
                         name_vsn => NameVsn
-                    })
+                    }),
+                    ok
             end
         end
     ).
 
+%%--------------------------------------------------------------------
+
 %% try the function, catch 'throw' exceptions as normal 'error' return
 %% other exceptions with stacktrace logged.
 tryit(WhichOp, F) ->
     try
         F()
     catch
-        throw:Reason ->
+        throw:ReasonMap ->
             %% thrown exceptions are known errors
             %% translate to a return value without stacktrace
-            {error, Reason};
+            {error, ReasonMap};
         error:Reason:Stacktrace ->
             %% unexpected errors, log stacktrace
             ?SLOG(warning, #{
@@ -469,33 +580,44 @@ tryit(WhichOp, F) ->
 
 %% read plugin info from the JSON file
 %% returns {ok, Info} or {error, Reason}
-read_plugin(NameVsn, Options) ->
+read_plugin_info(NameVsn, Options) ->
     tryit(
-        "read_plugin_info",
-        fun() -> {ok, do_read_plugin(NameVsn, Options)} end
+        atom_to_list(?FUNCTION_NAME),
+        fun() -> {ok, do_read_plugin2(NameVsn, Options)} end
     ).
 
-do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
+do_read_plugin(NameVsn) ->
+    do_read_plugin2(NameVsn, #{}).
 
-do_read_plugin({file, InfoFile}, Options) ->
-    [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
-    case hocon:load(InfoFile, #{format => richmap}) of
-        {ok, RichMap} ->
-            Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
-            Info1 = plugins_readme(NameVsn, Options, Info0),
-            plugin_status(NameVsn, Info1);
-        {error, Reason} ->
-            throw(#{
-                error => "bad_info_file",
-                path => InfoFile,
-                return => Reason
-            })
-    end;
-do_read_plugin(NameVsn, Options) ->
-    do_read_plugin({file, info_file(NameVsn)}, Options).
+do_read_plugin2(NameVsn, Option) ->
+    do_read_plugin3(NameVsn, info_file(NameVsn), Option).
+
+do_read_plugin3(NameVsn, InfoFilePath, Options) ->
+    {ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file"))(),
+    Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath),
+    Info1 = plugins_readme(NameVsn, Options, Info0),
+    plugin_status(NameVsn, Info1).
+
+read_plugin_avsc(NameVsn) ->
+    tryit(
+        atom_to_list(?FUNCTION_NAME),
+        read_file_fun(schema_file(NameVsn), "bad_avsc_file")
+    ).
+
+read_plugin_i18n(NameVsn) ->
+    tryit(
+        atom_to_list(?FUNCTION_NAME),
+        read_file_fun(i18n_file(NameVsn), "bad_i18n_file")
+    ).
+
+read_plugin_avro(NameVsn) ->
+    tryit(
+        atom_to_list(?FUNCTION_NAME),
+        read_file_fun(schema_file(NameVsn), "bad_avro_file")
+    ).
 
 ensure_exists_and_installed(NameVsn) ->
-    case filelib:is_dir(dir(NameVsn)) of
+    case filelib:is_dir(plugin_dir(NameVsn)) of
         true ->
             ok;
         false ->
@@ -581,10 +703,6 @@ plugin_status(NameVsn, Info) ->
         config_status => ConfSt
     }.
 
-bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
-bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
-bin(B) when is_binary(B) -> B.
-
 check_plugin(
     #{
         <<"name">> := Name,
@@ -593,7 +711,7 @@ check_plugin(
         <<"description">> := _
     } = Info,
     NameVsn,
-    File
+    FilePath
 ) ->
     case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
         true ->
@@ -605,7 +723,7 @@ check_plugin(
             catch
                 _:_ ->
                     throw(#{
-                        error => "bad_rel_apps",
+                        error_msg => "bad_rel_apps",
                         rel_apps => Apps,
                         hint => "A non-empty string list of app_name-app_vsn format"
                     })
@@ -613,16 +731,16 @@ check_plugin(
             Info;
         false ->
             throw(#{
-                error => "name_vsn_mismatch",
+                error_msg => "name_vsn_mismatch",
                 name_vsn => NameVsn,
-                path => File,
+                path => FilePath,
                 name => Name,
                 rel_vsn => Vsn
             })
     end;
 check_plugin(_What, NameVsn, File) ->
     throw(#{
-        error => "bad_info_file_content",
+        error_msg => "bad_info_file_content",
         mandatory_fields => [rel_vsn, name, rel_apps, description],
         name_vsn => NameVsn,
         path => File
@@ -678,7 +796,7 @@ do_load_plugin_app(AppName, Ebin) ->
                     ok;
                 {error, Reason} ->
                     throw(#{
-                        error => "failed_to_load_plugin_beam",
+                        error_msg => "failed_to_load_plugin_beam",
                         path => BeamFile,
                         reason => Reason
                     })
@@ -693,7 +811,7 @@ do_load_plugin_app(AppName, Ebin) ->
             ok;
         {error, Reason} ->
             throw(#{
-                error => "failed_to_load_plugin_app",
+                error_msg => "failed_to_load_plugin_app",
                 name => AppName,
                 reason => Reason
             })
@@ -710,7 +828,7 @@ start_app(App) ->
             ok;
         {error, {ErrApp, Reason}} ->
             throw(#{
-                error => "failed_to_start_plugin_app",
+                error_msg => "failed_to_start_plugin_app",
                 app => App,
                 err_app => ErrApp,
                 reason => Reason
@@ -775,7 +893,7 @@ stop_app(App) ->
             ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
             ok = unload_moudle_and_app(App);
         {error, Reason} ->
-            throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
+            throw(#{error_msg => "failed_to_stop_app", app => App, reason => Reason})
     end.
 
 unload_moudle_and_app(App) ->
@@ -802,44 +920,58 @@ is_needed_by(AppToStop, RunningApp) ->
         undefined -> false
     end.
 
-put_config(Key, Value) ->
-    put_config(Key, Value, _ConfLocation = local).
-
-put_config(Key, Value, ConfLocation) when is_atom(Key) ->
-    put_config([Key], Value, ConfLocation);
-put_config(Path, Values, _ConfLocation = local) when is_list(Path) ->
+do_put_config(Key, Value, ConfLocation) when is_atom(Key) ->
+    do_put_config([Key], Value, ConfLocation);
+do_put_config(Path, Values, _ConfLocation = local) when is_list(Path) ->
     Opts = #{rawconf_with_defaults => true, override_to => cluster},
     %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
     case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
         {ok, _} -> ok;
         Error -> Error
     end;
-put_config(Path, Values, _ConfLocation = global) when is_list(Path) ->
+do_put_config(Path, Values, _ConfLocation = global) when is_list(Path) ->
     Opts = #{rawconf_with_defaults => true, override_to => cluster},
     case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
         {ok, _} -> ok;
         Error -> Error
     end.
 
-bin_key(Map) when is_map(Map) ->
-    maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
-bin_key(List = [#{} | _]) ->
-    lists:map(fun(M) -> bin_key(M) end, List);
-bin_key(Term) ->
-    Term.
+%%--------------------------------------------------------------------
+%% `emqx_config_handler' API
+%%--------------------------------------------------------------------
 
-get_config(Key, Default) when is_atom(Key) ->
-    get_config([Key], Default);
-get_config(Path, Default) ->
-    emqx_conf:get([?CONF_ROOT | Path], Default).
+post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
+    NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
+    OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
+    #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
+    maps:foreach(fun enable_disable_plugin/2, Changed),
+    ok;
+post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
+    ok.
 
-install_dir() -> get_config(install_dir, "").
+enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
+    %% errors are already logged in this fn
+    _ = ensure_stopped(NameVsn),
+    ok;
+enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
+    %% errors are already logged in this fn
+    _ = ensure_started(NameVsn),
+    ok;
+enable_disable_plugin(_NameVsn, _Diff) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+install_dir() ->
+    get_config(install_dir, "").
 
 put_configured(Configured) ->
     put_configured(Configured, _ConfLocation = local).
 
 put_configured(Configured, ConfLocation) ->
-    ok = put_config(states, bin_key(Configured), ConfLocation).
+    ok = do_put_config(states, bin_key(Configured), ConfLocation).
 
 configured() ->
     get_config(states, []).
@@ -862,25 +994,69 @@ for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
     }),
     [].
 
-parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
-    parse_name_vsn(binary_to_list(NameVsn));
-parse_name_vsn(NameVsn) when is_list(NameVsn) ->
-    case lists:splitwith(fun(X) -> X =/= $- end, NameVsn) of
-        {AppName, [$- | Vsn]} -> {ok, list_to_atom(AppName), Vsn};
-        _ -> {error, "bad_name_vsn"}
+maybe_post_op_after_install(NameVsn) ->
+    _ = maybe_load_config_schema(NameVsn),
+    _ = maybe_create_config_dir(NameVsn),
+    ok.
+
+maybe_load_config_schema(NameVsn) ->
+    case read_plugin_avsc(NameVsn) of
+        {ok, Avsc} ->
+            case emqx_plugins_serde:add_schema(NameVsn, Avsc) of
+                ok -> ok;
+                {error, already_exists} -> ok;
+                {error, Reason} -> {error, Reason}
+            end;
+        {error, Reason} ->
+            ?SLOG(warning, Reason)
     end.
 
-pkg_file(NameVsn) ->
-    filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
+maybe_create_config_dir(NameVsn) ->
+    case filelib:ensure_path(plugin_config_dir(NameVsn)) of
+        ok -> ok;
+        {error, Reason} -> ?SLOG(warning, Reason)
+    end.
 
-dir(NameVsn) ->
+write_avro_bin(NameVsn, AvroBin) ->
+    ok = file:write_file(avro_config_file(NameVsn), AvroBin).
+
+read_file_fun(Path, ErrMsg) ->
+    fun() ->
+        case hocon:load(Path, #{format => richmap}) of
+            {ok, RichMap} ->
+                {ok, hocon_maps:ensure_plain(RichMap)};
+            {error, Reason} ->
+                ErrMeta = #{error_msg => ErrMsg, reason => Reason},
+                ?SLOG(warning, ErrMeta),
+                throw(ErrMeta)
+        end
+    end.
+
+%% Directorys
+plugin_dir(NameVsn) ->
     filename:join([install_dir(), NameVsn]).
 
+plugin_config_dir(NameVsn) ->
+    filename:join([plugin_dir(NameVsn), "data", "configs"]).
+
+%% Files
+pkg_file(NameVsn) ->
+    filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
+
 info_file(NameVsn) ->
-    filename:join([dir(NameVsn), "release.json"]).
+    filename:join([plugin_dir(NameVsn), "release.json"]).
+
+schema_file(NameVsn) ->
+    filename:join([plugin_dir(NameVsn), "config_schema.avsc"]).
+
+avro_config_file(NameVsn) ->
+    filename:join([plugin_config_dir(NameVsn), "config.avro"]).
+
+i18n_file(NameVsn) ->
+    filename:join([plugin_dir(NameVsn), "i18n.json"]).
 
 readme_file(NameVsn) ->
-    filename:join([dir(NameVsn), "README.md"]).
+    filename:join([plugin_dir(NameVsn), "README.md"]).
 
 running_apps() ->
     lists:map(
@@ -890,26 +1066,13 @@ running_apps() ->
         application:which_applications(infinity)
     ).
 
-%%--------------------------------------------------------------------
-%% `emqx_config_handler' API
-%%--------------------------------------------------------------------
-
-post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
-    NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
-    OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
-    #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
-    maps:foreach(fun enable_disable_plugin/2, Changed),
-    ok;
-post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
-    ok.
+bin_key(Map) when is_map(Map) ->
+    maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
+bin_key(List = [#{} | _]) ->
+    lists:map(fun(M) -> bin_key(M) end, List);
+bin_key(Term) ->
+    Term.
 
-enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
-    %% errors are already logged in this fn
-    _ = ensure_stopped(NameVsn),
-    ok;
-enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
-    %% errors are already logged in this fn
-    _ = ensure_started(NameVsn),
-    ok;
-enable_disable_plugin(_NameVsn, _Diff) ->
-    ok.
+bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
+bin(B) when is_binary(B) -> B.

+ 274 - 0
apps/emqx_plugins/src/emqx_plugins_serde.erl

@@ -0,0 +1,274 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugins_serde).
+
+-include("emqx_plugins.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% API
+-export([
+    start_link/0,
+    get_serde/1,
+    add_schema/2,
+    get_schema/1,
+    delete_schema/1
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_continue/2,
+    terminate/2
+]).
+
+-export([
+    decode/2,
+    encode/2
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% API
+%%-------------------------------------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec get_serde(schema_name()) -> {ok, plugin_schema_serde()} | {error, not_found}.
+get_serde(SchemaName) ->
+    case ets:lookup(?PLUGIN_SERDE_TAB, to_bin(SchemaName)) of
+        [] ->
+            {error, not_found};
+        [Serde] ->
+            {ok, Serde}
+    end.
+
+-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}.
+add_schema(Name, Avsc) ->
+    case get_serde(Name) of
+        {ok, _Serde} ->
+            ?SLOG(warning, #{msg => "plugin_avsc_schema_already_exists", name_vsn => Name}),
+            {error, already_exists};
+        {error, not_found} ->
+            case gen_server:call(?MODULE, {build_serdes, to_bin(Name), Avsc}) of
+                ok ->
+                    ?SLOG(debug, #{msg => "plugin_avsc_schema_added", name_vsn => Name}),
+                    ok;
+                {error, Reason} = E ->
+                    ?SLOG(error, #{
+                        msg => "plugin_avsc_schema_added_failed",
+                        reason => emqx_utils:readable_error_msg(Reason)
+                    }),
+                    E
+            end
+    end.
+
+get_schema(NameVsn) ->
+    Path = emqx_plugins:schema_file(NameVsn),
+    case read_avsc_file(Path) of
+        {ok, Avsc} ->
+            {ok, Avsc};
+        {error, Reason} ->
+            ?SLOG(warning, Reason),
+            {error, Reason}
+    end.
+
+-spec delete_schema(schema_name()) -> ok | {error, term()}.
+delete_schema(NameVsn) ->
+    case get_serde(NameVsn) of
+        {ok, _Serde} ->
+            async_delete_serdes([NameVsn]),
+            ok;
+        {error, not_found} ->
+            {error, not_found}
+    end.
+
+-spec decode(schema_name(), encoded_data()) -> {ok, decoded_data()} | {error, any()}.
+decode(SerdeName, RawData) ->
+    with_serde(
+        "decode_avro_binary",
+        eval_serde_fun(?FUNCTION_NAME, "bad_avro_binary", SerdeName, [RawData])
+    ).
+
+-spec encode(schema_name(), decoded_data()) -> {ok, encoded_data()} | {error, any()}.
+encode(SerdeName, Data) ->
+    with_serde(
+        "encode_avro_data",
+        eval_serde_fun(?FUNCTION_NAME, "bad_avro_data", SerdeName, [Data])
+    ).
+
+%%-------------------------------------------------------------------------------------------------
+%% `gen_server' API
+%%-------------------------------------------------------------------------------------------------
+
+init(_) ->
+    process_flag(trap_exit, true),
+    ok = emqx_utils_ets:new(?PLUGIN_SERDE_TAB, [
+        public, ordered_set, {keypos, #plugin_schema_serde.name}
+    ]),
+    State = #{},
+    SchemasMap = read_plugin_avsc(),
+    {ok, State, {continue, {build_serdes, SchemasMap}}}.
+
+handle_continue({build_serdes, SchemasMap}, State) ->
+    _ = build_serdes(SchemasMap),
+    {noreply, State}.
+
+handle_call({build_serdes, {NameVsn, Avsc}}, _From, State) ->
+    BuildRes = do_build_serde(NameVsn, Avsc),
+    {reply, BuildRes, State};
+handle_call(_Call, _From, State) ->
+    {reply, {error, unknown_call}, State}.
+
+handle_cast({delete_serdes, Names}, State) ->
+    lists:foreach(fun ensure_serde_absent/1, Names),
+    {noreply, State};
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+read_plugin_avsc() ->
+    Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]),
+    lists:foldl(
+        fun(AvscPath, AccIn) ->
+            case read_avsc_file(AvscPath) of
+                {ok, Avsc} ->
+                    [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
+                    AccIn#{to_bin(NameVsn) => Avsc};
+                {error, Reason} ->
+                    ?SLOG(warning, Reason),
+                    AccIn
+            end
+        end,
+        _Acc0 = #{},
+        filelib:wildcard(Pattern)
+    ).
+
+build_serdes(Schemas) ->
+    maps:foreach(fun do_build_serde/2, Schemas).
+
+do_build_serde(NameVsn, Avsc) ->
+    try
+        Serde = make_serde(NameVsn, Avsc),
+        true = ets:insert(?PLUGIN_SERDE_TAB, Serde),
+        ok
+    catch
+        Kind:Error:Stacktrace ->
+            ?SLOG(
+                error,
+                #{
+                    msg => "error_building_plugin_schema_serde",
+                    name => NameVsn,
+                    kind => Kind,
+                    error => Error,
+                    stacktrace => Stacktrace
+                }
+            ),
+            {error, Error}
+    end.
+
+make_serde(NameVsn, Avsc) ->
+    Store0 = avro_schema_store:new([map]),
+    %% import the schema into the map store with an assigned name
+    %% if it's a named schema (e.g. struct), then Name is added as alias
+    Store = avro_schema_store:import_schema_json(NameVsn, Avsc, Store0),
+    #plugin_schema_serde{
+        name = NameVsn,
+        eval_context = Store
+    }.
+
+ensure_serde_absent(Name) when not is_binary(Name) ->
+    ensure_serde_absent(to_bin(Name));
+ensure_serde_absent(Name) ->
+    case get_serde(Name) of
+        {ok, _Serde} ->
+            _ = ets:delete(?PLUGIN_SERDE_TAB, Name),
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+async_delete_serdes(Names) ->
+    gen_server:cast(?MODULE, {delete_serdes, Names}).
+
+with_serde(WhichOp, Fun) ->
+    try
+        Fun()
+    catch
+        throw:Reason ->
+            ?SLOG(error, Reason#{
+                which_op => WhichOp,
+                reason => emqx_utils:readable_error_msg(Reason)
+            }),
+            {error, Reason};
+        error:Reason:Stacktrace ->
+            %% unexpected errors, log stacktrace
+            ?SLOG(warning, #{
+                msg => "plugin_schema_op_failed",
+                which_op => WhichOp,
+                exception => Reason,
+                stacktrace => Stacktrace
+            }),
+            {error, #{
+                which_op => WhichOp,
+                reason => Reason
+            }}
+    end.
+
+eval_serde_fun(Op, ErrMsg, SerdeName, Args) ->
+    fun() ->
+        case get_serde(SerdeName) of
+            {ok, Serde} ->
+                eval_serde(Op, Serde, Args);
+            {error, not_found} ->
+                throw(#{
+                    error_msg => ErrMsg,
+                    reason => plugin_serde_not_found,
+                    serde_name => SerdeName
+                })
+        end
+    end.
+
+eval_serde(decode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
+    Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
+    {ok, avro_binary_decoder:decode(Data, Name, Store, Opts)};
+eval_serde(encode, #plugin_schema_serde{name = Name, eval_context = Store}, [Data]) ->
+    {ok, avro_binary_encoder:encode(Store, Name, Data)};
+eval_serde(_, _, _) ->
+    throw(#{error_msg => "unexpected_plugin_avro_op"}).
+
+read_avsc_file(Path) ->
+    case file:read_file(Path) of
+        {ok, Bin} ->
+            {ok, Bin};
+        {error, _} ->
+            {error, #{
+                error => "failed_to_read_plugin_schema",
+                path => Path
+            }}
+    end.
+
+to_bin(A) when is_atom(A) -> atom_to_binary(A);
+to_bin(L) when is_list(L) -> iolist_to_binary(L);
+to_bin(B) when is_binary(B) -> B.

+ 11 - 1
apps/emqx_plugins/src/emqx_plugins_sup.erl

@@ -32,4 +32,14 @@ init([]) ->
             intensity => 100,
             period => 10
         },
-    {ok, {SupFlags, []}}.
+    ChildSpecs = [child_spec(emqx_plugins_serde)],
+    {ok, {SupFlags, ChildSpecs}}.
+
+child_spec(Mod) ->
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
+        restart => permanent,
+        shutdown => 5_000,
+        type => worker
+    }.

+ 12 - 12
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -346,7 +346,7 @@ t_enable_disable(Config) ->
     ?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
     ?assertMatch(
         {error, #{
-            reason := "bad_plugin_config_status",
+            error_msg := "bad_plugin_config_status",
             hint := "disable_the_plugin_first"
         }},
         emqx_plugins:ensure_uninstalled(NameVsn)
@@ -374,15 +374,15 @@ t_bad_tar_gz(Config) ->
     ok = file:write_file(FakeTarTz, "a\n"),
     ?assertMatch(
         {error, #{
-            reason := "bad_plugin_package",
-            return := eof
+            error_msg := "bad_plugin_package",
+            reason := eof
         }},
         emqx_plugins:ensure_installed("fake-vsn")
     ),
     ?assertMatch(
         {error, #{
-            reason := "failed_to_extract_plugin_package",
-            return := not_found
+            error_msg := "failed_to_extract_plugin_package",
+            reason := not_found
         }},
         emqx_plugins:ensure_installed("nonexisting")
     ),
@@ -412,7 +412,7 @@ t_bad_tar_gz2(Config) ->
     ?assert(filelib:is_regular(TarGz)),
     %% failed to install, it also cleans up the bad content of .tar.gz file
     ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)),
-    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))),
+    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))),
     %% but the tar.gz file is still around
     ?assert(filelib:is_regular(TarGz)),
     ok.
@@ -440,8 +440,8 @@ t_tar_vsn_content_mismatch(Config) ->
     %% failed to install, it also cleans up content of the bad .tar.gz file even
     %% if in other directory
     ?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)),
-    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))),
-    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir("foo-0.2"))),
+    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir(NameVsn))),
+    ?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:plugin_dir("foo-0.2"))),
     %% the tar.gz file is still around
     ?assert(filelib:is_regular(TarGz)),
     ok.
@@ -455,15 +455,15 @@ t_bad_info_json(Config) ->
     ok = write_info_file(Config, NameVsn, "bad-syntax"),
     ?assertMatch(
         {error, #{
-            error := "bad_info_file",
-            return := {parse_error, _}
+            error_msg := "bad_info_file",
+            reason := {parse_error, _}
         }},
         emqx_plugins:describe(NameVsn)
     ),
     ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"),
     ?assertMatch(
         {error, #{
-            error := "bad_info_file_content",
+            error_msg := "bad_info_file_content",
             mandatory_fields := _
         }},
         emqx_plugins:describe(NameVsn)
@@ -499,7 +499,7 @@ t_elixir_plugin(Config) ->
     ok = emqx_plugins:ensure_installed(NameVsn),
     %% idempotent
     ok = emqx_plugins:ensure_installed(NameVsn),
-    {ok, Info} = emqx_plugins:read_plugin(NameVsn, #{}),
+    {ok, Info} = emqx_plugins:read_plugin_info(NameVsn, #{}),
     ?assertEqual([Info], emqx_plugins:list()),
     %% start
     ok = emqx_plugins:ensure_started(NameVsn),

+ 2 - 2
apps/emqx_plugins/test/emqx_plugins_tests.erl

@@ -57,7 +57,7 @@ read_plugin_test() ->
                 ok = write_file(InfoFile, FakeInfo),
                 ?assertMatch(
                     {error, #{error := "bad_rel_apps"}},
-                    emqx_plugins:read_plugin(NameVsn, #{})
+                    emqx_plugins:read_plugin_info(NameVsn, #{})
                 )
             after
                 emqx_plugins:purge(NameVsn)
@@ -109,7 +109,7 @@ purge_test() ->
     with_rand_install_dir(
         fun(_Dir) ->
             File = emqx_plugins:info_file("a-1"),
-            Dir = emqx_plugins:dir("a-1"),
+            Dir = emqx_plugins:plugin_dir("a-1"),
             ok = filelib:ensure_dir(File),
             ?assertMatch({ok, _}, file:read_file_info(Dir)),
             ?assertEqual(ok, emqx_plugins:purge("a-1")),