Просмотр исходного кода

feat: do not fail on other nodes when the RPC succeeds on the first node

zhongwencool 1 год назад
Родитель
Сommit
e1c3b7587d

+ 6 - 0
apps/emqx/include/emqx.hrl

@@ -95,4 +95,10 @@
     until :: integer()
 }).
 
+%%--------------------------------------------------------------------
+%% Configurations
+%%--------------------------------------------------------------------
+-define(KIND_REPLICATE, replicate).
+-define(KIND_INITIATE, initiate).
+
 -endif.

+ 28 - 9
apps/emqx/src/emqx.erl

@@ -61,9 +61,12 @@
     get_raw_config/2,
     update_config/2,
     update_config/3,
+    update_config/4,
     remove_config/1,
     remove_config/2,
+    remove_config/3,
     reset_config/2,
+    reset_config/3,
     data_dir/0,
     etc_file/1,
     cert_file/1,
@@ -195,38 +198,52 @@ get_raw_config(KeyPath, Default) ->
 -spec update_config(emqx_utils_maps:config_key_path(), emqx_config:update_request()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update_config(KeyPath, UpdateReq) ->
-    update_config(KeyPath, UpdateReq, #{}).
+    update_config(KeyPath, UpdateReq, #{}, #{}).
+
+update_config(KeyPath, UpdateReq, Opts) ->
+    update_config(KeyPath, UpdateReq, Opts, #{}).
 
 -spec update_config(
     emqx_utils_maps:config_key_path(),
     emqx_config:update_request(),
-    emqx_config:update_opts()
+    emqx_config:update_opts(),
+    map()
 ) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
-update_config([RootName | _] = KeyPath, UpdateReq, Opts) ->
+update_config([RootName | _] = KeyPath, UpdateReq, Opts, ClusterRpcOpts) ->
     emqx_config_handler:update_config(
         emqx_config:get_schema_mod(RootName),
         KeyPath,
-        {{update, UpdateReq}, Opts}
+        {{update, UpdateReq}, Opts},
+        ClusterRpcOpts
     ).
 
 -spec remove_config(emqx_utils_maps:config_key_path()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 remove_config(KeyPath) ->
-    remove_config(KeyPath, #{}).
+    remove_config(KeyPath, #{}, #{}).
 
 -spec remove_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
-remove_config([RootName | _] = KeyPath, Opts) ->
+remove_config([_RootName | _] = KeyPath, Opts) ->
+    remove_config(KeyPath, Opts, #{}).
+
+-spec remove_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts(), map()) ->
+    {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
+remove_config([RootName | _] = KeyPath, Opts, ClusterRpcOpts) ->
     emqx_config_handler:update_config(
         emqx_config:get_schema_mod(RootName),
         KeyPath,
-        {remove, Opts}
+        {remove, Opts},
+        ClusterRpcOpts
     ).
 
 -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 reset_config([RootName | SubKeys] = KeyPath, Opts) ->
+    reset_config([RootName | SubKeys] = KeyPath, Opts, #{}).
+
+reset_config([RootName | SubKeys] = KeyPath, Opts, ClusterRpcOpts) ->
     case emqx_config:get_default_value(KeyPath) of
         {ok, Default} ->
             Mod = emqx_config:get_schema_mod(RootName),
@@ -235,7 +252,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) ->
                     emqx_config_handler:update_config(
                         Mod,
                         KeyPath,
-                        {{update, Default}, Opts}
+                        {{update, Default}, Opts},
+                        ClusterRpcOpts
                     );
                 false ->
                     NewConf =
@@ -247,7 +265,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) ->
                     emqx_config_handler:update_config(
                         Mod,
                         [RootName],
-                        {{update, NewConf}, Opts}
+                        {{update, NewConf}, Opts},
+                        ClusterRpcOpts
                     )
             end;
         {error, _} = Error ->

+ 159 - 69
apps/emqx/src/emqx_config_handler.erl

@@ -18,6 +18,7 @@
 -module(emqx_config_handler).
 
 -include("logger.hrl").
+-include("emqx.hrl").
 -include("emqx_schema.hrl").
 -include_lib("hocon/include/hocon_types.hrl").
 
@@ -30,6 +31,7 @@
     add_handler/2,
     remove_handler/1,
     update_config/3,
+    update_config/4,
     get_raw_cluster_override_conf/0,
     info/0
 ]).
@@ -50,12 +52,17 @@
 -define(WKEY, '?').
 
 -type handler_name() :: module().
+-type cluster_rpc_opts() :: map().
 
 -optional_callbacks([
     pre_config_update/3,
+    pre_config_update/4,
     propagated_pre_config_update/3,
+    propagated_pre_config_update/4,
     post_config_update/5,
-    propagated_post_config_update/5
+    post_config_update/6,
+    propagated_post_config_update/5,
+    propagated_post_config_update/6
 ]).
 
 -callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) ->
@@ -83,6 +90,35 @@
 ) ->
     ok | {ok, Result :: any()} | {error, Reason :: term()}.
 
+-callback pre_config_update(
+    [atom()], emqx_config:update_request(), emqx_config:raw_config(), cluster_rpc_opts()
+) ->
+    ok | {ok, emqx_config:update_request()} | {error, term()}.
+-callback propagated_pre_config_update(
+    [binary()], emqx_config:update_request(), emqx_config:raw_config(), cluster_rpc_opts()
+) ->
+    ok | {ok, emqx_config:update_request()} | {error, term()}.
+
+-callback post_config_update(
+    [atom()],
+    emqx_config:update_request(),
+    emqx_config:config(),
+    emqx_config:config(),
+    emqx_config:app_envs(),
+    cluster_rpc_opts()
+) ->
+    ok | {ok, Result :: any()} | {error, Reason :: term()}.
+
+-callback propagated_post_config_update(
+    [atom()],
+    emqx_config:update_request(),
+    emqx_config:config(),
+    emqx_config:config(),
+    emqx_config:app_envs(),
+    cluster_rpc_opts()
+) ->
+    ok | {ok, Result :: any()} | {error, Reason :: term()}.
+
 -type state() :: #{handlers := any()}.
 -type config_key_path() :: emqx_utils_maps:config_key_path().
 
@@ -92,12 +128,17 @@ start_link() ->
 stop() ->
     gen_server:stop(?MODULE).
 
--spec update_config(module(), config_key_path(), emqx_config:update_args()) ->
-    {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
+    update_config(SchemaModule, ConfKeyPath, UpdateArgs, #{}).
+
+-spec update_config(module(), config_key_path(), emqx_config:update_args(), map()) ->
+    {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
+update_config(SchemaModule, ConfKeyPath, UpdateArgs, ClusterOpts) ->
     %% force convert the path to a list of atoms, as there maybe some wildcard names/ids in the path
     AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
-    gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
+    gen_server:call(
+        ?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs, ClusterOpts}, infinity
+    ).
 
 -spec add_handler(config_key_path(), handler_name()) ->
     ok | {error, {conflict, list()}}.
@@ -130,11 +171,11 @@ handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers :
         {error, _Reason} = Error -> {reply, Error, State}
     end;
 handle_call(
-    {change_config, SchemaModule, ConfKeyPath, UpdateArgs},
+    {change_config, SchemaModule, ConfKeyPath, UpdateArgs, ClusterRpcOpts},
     _From,
     #{handlers := Handlers} = State
 ) ->
-    Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs),
+    Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts),
     {reply, Result, State};
 handle_call(get_raw_cluster_override_conf, _From, State) ->
     Reply = emqx_config:read_override_conf(#{override_to => cluster}),
@@ -203,9 +244,9 @@ filter_top_level_handlers(Handlers) ->
         Handlers
     ).
 
-handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
+handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts) ->
     try
-        do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs)
+        do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts)
     catch
         throw:Reason ->
             {error, Reason};
@@ -217,13 +258,14 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
                 update_req => UpdateArgs,
                 module => SchemaModule,
                 key_path => ConfKeyPath,
+                cluster_rpc_opts => ClusterRpcOpts,
                 stacktrace => ST
             }),
             {error, {config_update_crashed, Reason}}
     end.
 
-do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
-    case process_update_request(ConfKeyPath, Handlers, UpdateArgs) of
+do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) ->
+    case process_update_request(ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) of
         {ok, NewRawConf, OverrideConf, Opts} ->
             check_and_save_configs(
                 SchemaModule,
@@ -232,23 +274,24 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
                 NewRawConf,
                 OverrideConf,
                 UpdateArgs,
-                Opts
+                Opts,
+                ClusterOpts
             );
         {error, Result} ->
             {error, Result}
     end.
 
-process_update_request([_], _Handlers, {remove, _Opts}) ->
+process_update_request([_], _Handlers, {remove, _Opts}, _ClusterRpcOpts) ->
     {error, "remove_root_is_forbidden"};
-process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
+process_update_request(ConfKeyPath, _Handlers, {remove, Opts}, _ClusterRpcOpts) ->
     OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
     BinKeyPath = bin_path(ConfKeyPath),
     NewRawConf = emqx_utils_maps:deep_remove(BinKeyPath, OldRawConf),
     OverrideConf = remove_from_override_config(BinKeyPath, Opts),
     {ok, NewRawConf, OverrideConf, Opts};
-process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
+process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}, ClusterRpcOpts) ->
     OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
-    case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of
+    case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) of
         {ok, NewRawConf} ->
             OverrideConf = merge_to_override_config(NewRawConf, Opts),
             {ok, NewRawConf, OverrideConf, Opts};
@@ -256,15 +299,16 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
             Error
     end.
 
-do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) ->
-    do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []).
+do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) ->
+    do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, []).
 
-do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
+do_update_config([], Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath) ->
     call_pre_config_update(#{
         handlers => Handlers,
         old_raw_conf => OldRawConf,
         update_req => UpdateReq,
         conf_key_path => ConfKeyPath,
+        cluster_rpc_opts => ClusterRpcOpts,
         callback => pre_config_update,
         is_propagated => false
     });
@@ -273,13 +317,18 @@ do_update_config(
     Handlers,
     OldRawConf,
     UpdateReq,
+    ClusterRpcOpts,
     ConfKeyPath0
 ) ->
     ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
     ConfKeyBin = bin(ConfKey),
     SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf),
     SubHandlers = get_sub_handlers(ConfKey, Handlers),
-    case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
+    case
+        do_update_config(
+            SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath
+        )
+    of
         {ok, NewUpdateReq} ->
             merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf);
         Error ->
@@ -293,12 +342,18 @@ check_and_save_configs(
     NewRawConf,
     OverrideConf,
     UpdateArgs,
-    Opts
+    Opts,
+    ClusterOpts
 ) ->
     Schema = schema(SchemaModule, ConfKeyPath),
+    Kind = maps:get(kind, ClusterOpts, ?KIND_INITIATE),
     {AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf),
     OldConf = emqx_config:get_root(ConfKeyPath),
-    case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
+    case
+        do_post_config_update(
+            ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, #{}
+        )
+    of
         {ok, Result0} ->
             post_update_ok(
                 AppEnvs,
@@ -310,21 +365,24 @@ check_and_save_configs(
                 UpdateArgs,
                 Result0
             );
-        {error, {post_config_update, HandlerName, Reason}} ->
-            HandlePostFailureFun =
-                fun() ->
-                    post_update_ok(
-                        AppEnvs,
-                        NewConf,
-                        NewRawConf,
-                        OverrideConf,
-                        Opts,
-                        ConfKeyPath,
-                        UpdateArgs,
-                        #{}
-                    )
-                end,
-            {error, {post_config_update, HandlerName, {Reason, HandlePostFailureFun}}}
+        {error, {post_config_update, HandlerName, Reason}} when Kind =/= ?KIND_INITIATE ->
+            ?SLOG(critical, #{
+                msg => "post_config_update_failed_but_save_the_config_anyway",
+                handler => HandlerName,
+                reason => Reason
+            }),
+            post_update_ok(
+                AppEnvs,
+                NewConf,
+                NewRawConf,
+                OverrideConf,
+                Opts,
+                ConfKeyPath,
+                UpdateArgs,
+                #{}
+            );
+        {error, _} = Error ->
+            Error
     end.
 
 post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, UpdateArgs, Result0) ->
@@ -332,7 +390,9 @@ post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, Up
     Result1 = return_change_result(ConfKeyPath, UpdateArgs),
     {ok, Result1#{post_config_update => Result0}}.
 
-do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
+do_post_config_update(
+    ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, Result
+) ->
     do_post_config_update(
         ConfKeyPath,
         Handlers,
@@ -340,6 +400,7 @@ do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateAr
         NewConf,
         AppEnvs,
         UpdateArgs,
+        ClusterOpts,
         Result,
         []
     ).
@@ -352,6 +413,7 @@ do_post_config_update(
     AppEnvs,
     UpdateArgs,
     Result,
+    ClusterOpts,
     ConfKeyPath
 ) ->
     call_post_config_update(#{
@@ -362,6 +424,7 @@ do_post_config_update(
         update_req => up_req(UpdateArgs),
         result => Result,
         conf_key_path => ConfKeyPath,
+        cluster_rpc_opts => ClusterOpts,
         callback => post_config_update
     });
 do_post_config_update(
@@ -371,6 +434,7 @@ do_post_config_update(
     NewConf,
     AppEnvs,
     UpdateArgs,
+    ClusterOpts,
     Result,
     ConfKeyPath0
 ) ->
@@ -385,6 +449,7 @@ do_post_config_update(
         SubNewConf,
         AppEnvs,
         UpdateArgs,
+        ClusterOpts,
         Result,
         ConfKeyPath
     ).
@@ -428,37 +493,41 @@ call_proper_pre_config_update(
     #{
         handlers := #{?MOD := Module},
         callback := Callback,
-        update_req := UpdateReq,
-        old_raw_conf := OldRawConf
+        update_req := UpdateReq
     } = Ctx
 ) ->
-    case erlang:function_exported(Module, Callback, 3) of
-        true ->
-            case apply_pre_config_update(Module, Ctx) of
-                {ok, NewUpdateReq} ->
-                    {ok, NewUpdateReq};
-                ok ->
-                    {ok, UpdateReq};
-                {error, Reason} ->
-                    {error, {pre_config_update, Module, Reason}}
-            end;
-        false ->
-            merge_to_old_config(UpdateReq, OldRawConf)
+    Arity = get_function_arity(Module, Callback, [3, 4]),
+    case apply_pre_config_update(Module, Callback, Arity, Ctx) of
+        {ok, NewUpdateReq} ->
+            {ok, NewUpdateReq};
+        ok ->
+            {ok, UpdateReq};
+        {error, Reason} ->
+            {error, {pre_config_update, Module, Reason}}
     end;
 call_proper_pre_config_update(
     #{update_req := UpdateReq}
 ) ->
     {ok, UpdateReq}.
 
-apply_pre_config_update(Module, #{
+apply_pre_config_update(Module, Callback, 3, #{
+    conf_key_path := ConfKeyPath,
+    update_req := UpdateReq,
+    old_raw_conf := OldRawConf
+}) ->
+    Module:Callback(ConfKeyPath, UpdateReq, OldRawConf);
+apply_pre_config_update(Module, Callback, 4, #{
     conf_key_path := ConfKeyPath,
     update_req := UpdateReq,
     old_raw_conf := OldRawConf,
-    callback := Callback
+    cluster_rpc_opts := ClusterRpcOpts
 }) ->
-    Module:Callback(
-        ConfKeyPath, UpdateReq, OldRawConf
-    ).
+    Module:Callback(ConfKeyPath, UpdateReq, OldRawConf, ClusterRpcOpts);
+apply_pre_config_update(_Module, _Callback, false, #{
+    update_req := UpdateReq,
+    old_raw_conf := OldRawConf
+}) ->
+    merge_to_old_config(UpdateReq, OldRawConf).
 
 propagate_pre_config_updates_to_subconf(
     #{handlers := #{?WKEY := _}} = Ctx
@@ -560,28 +629,23 @@ call_proper_post_config_update(
         result := Result
     } = Ctx
 ) ->
-    case erlang:function_exported(Module, Callback, 5) of
-        true ->
-            case apply_post_config_update(Module, Ctx) of
-                ok -> {ok, Result};
-                {ok, Result1} -> {ok, Result#{Module => Result1}};
-                {error, Reason} -> {error, {post_config_update, Module, Reason}}
-            end;
-        false ->
-            {ok, Result}
+    Arity = get_function_arity(Module, Callback, [5, 6]),
+    case apply_post_config_update(Module, Callback, Arity, Ctx) of
+        ok -> {ok, Result};
+        {ok, Result1} -> {ok, Result#{Module => Result1}};
+        {error, Reason} -> {error, {post_config_update, Module, Reason}}
     end;
 call_proper_post_config_update(
     #{result := Result} = _Ctx
 ) ->
     {ok, Result}.
 
-apply_post_config_update(Module, #{
+apply_post_config_update(Module, Callback, 5, #{
     conf_key_path := ConfKeyPath,
     update_req := UpdateReq,
     new_conf := NewConf,
     old_conf := OldConf,
-    app_envs := AppEnvs,
-    callback := Callback
+    app_envs := AppEnvs
 }) ->
     Module:Callback(
         ConfKeyPath,
@@ -589,7 +653,25 @@ apply_post_config_update(Module, #{
         NewConf,
         OldConf,
         AppEnvs
-    ).
+    );
+apply_post_config_update(Module, Callback, 6, #{
+    conf_key_path := ConfKeyPath,
+    update_req := UpdateReq,
+    cluster_rpc_opts := ClusterRpcOpts,
+    new_conf := NewConf,
+    old_conf := OldConf,
+    app_envs := AppEnvs
+}) ->
+    Module:Callback(
+        ConfKeyPath,
+        UpdateReq,
+        NewConf,
+        OldConf,
+        AppEnvs,
+        ClusterRpcOpts
+    );
+apply_post_config_update(_Module, _Callback, false, _Ctx) ->
+    ok.
 
 propagate_post_config_updates_to_subconf(
     #{handlers := #{?WKEY := _}} = Ctx
@@ -811,3 +893,11 @@ load_prev_handlers() ->
 
 save_handlers(Handlers) ->
     application:set_env(emqx, ?MODULE, Handlers).
+
+get_function_arity(_Module, _Callback, []) ->
+    false;
+get_function_arity(Module, Callback, [Arity | Opts]) ->
+    case erlang:function_exported(Module, Callback, Arity) of
+        true -> Arity;
+        false -> get_function_arity(Module, Callback, Opts)
+    end.

+ 1 - 8
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -415,14 +415,7 @@ assert_update_result(FailedPath, Update, Expect) ->
 
 assert_update_result(Paths, UpdatePath, Update, Expect) ->
     with_update_result(Paths, UpdatePath, Update, fun(Old, Result) ->
-        case Expect of
-            {error, {post_config_update, ?MODULE, post_config_update_error}} ->
-                ?assertMatch(
-                    {error, {post_config_update, ?MODULE, {post_config_update_error, _}}}, Result
-                );
-            _ ->
-                ?assertEqual(Expect, Result)
-        end,
+        ?assertEqual(Expect, Result),
         New = emqx:get_raw_config(UpdatePath, undefined),
         ?assertEqual(Old, New)
     end).

+ 1 - 1
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -593,7 +593,7 @@ t_quic_update_opts_fail(Config) ->
 
         %% THEN: Reload failed but old listener is rollbacked.
         ?assertMatch(
-            {error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}},
+            {error, {post_config_update, emqx_listeners, {rollbacked, {error, tls_error}}}},
             UpdateResult1
         ),
 

+ 38 - 28
apps/emqx_auth/src/emqx_authz/emqx_authz.erl

@@ -51,7 +51,7 @@
     set_feature_available/2
 ]).
 
--export([post_config_update/5, pre_config_update/3]).
+-export([pre_config_update/4, post_config_update/5]).
 
 -export([
     maybe_read_source_files/1,
@@ -194,8 +194,8 @@ update({?CMD_DELETE, Type}, Sources) ->
 update(Cmd, Sources) ->
     emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
 
-pre_config_update(Path, Cmd, Sources) ->
-    try do_pre_config_update(Path, Cmd, Sources) of
+pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) ->
+    try do_pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) of
         {error, Reason} -> {error, Reason};
         NSources -> {ok, NSources}
     catch
@@ -215,58 +215,64 @@ pre_config_update(Path, Cmd, Sources) ->
             {error, Reason}
     end.
 
-do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
-    do_pre_config_update(Cmd, Sources);
-do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf) ->
-    do_pre_config_merge(NewConf, OldConf);
-do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
-    do_pre_config_replace(NewConf, OldConf).
+do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources, Opts) ->
+    do_pre_config_update(Cmd, Sources, Opts);
+do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf, Opts) ->
+    do_pre_config_merge(NewConf, OldConf, Opts);
+do_pre_config_update(?ROOT_KEY, NewConf, OldConf, Opts) ->
+    do_pre_config_replace(NewConf, OldConf, Opts).
 
-do_pre_config_merge(NewConf, OldConf) ->
+do_pre_config_merge(NewConf, OldConf, Opts) ->
     MergeConf = emqx_utils_maps:deep_merge(OldConf, NewConf),
     NewSources = merge_sources(OldConf, NewConf),
-    do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf).
+    do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf, Opts).
 
 %% override the entire config when updating the root key
 %% emqx_conf:update(?ROOT_KEY, Conf);
-do_pre_config_replace(Conf, Conf) ->
+do_pre_config_replace(Conf, Conf, _Opts) ->
     Conf;
-do_pre_config_replace(NewConf, OldConf) ->
+do_pre_config_replace(NewConf, OldConf, Opts) ->
     NewSources = get_sources(NewConf),
     OldSources = get_sources(OldConf),
-    ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
+    ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources, Opts),
     NewConf#{<<"sources">> => ReplaceSources}.
 
-do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
+do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources, _Opts) ->
     do_move(Cmd, Sources);
-do_pre_config_update({?CMD_PREPEND, Source}, Sources) ->
+do_pre_config_update({?CMD_PREPEND, Source}, Sources, _Opts) ->
     NSource = maybe_write_source_files(Source),
     NSources = [NSource] ++ Sources,
     ok = check_dup_types(NSources),
     NSources;
-do_pre_config_update({?CMD_APPEND, Source}, Sources) ->
+do_pre_config_update({?CMD_APPEND, Source}, Sources, _Opts) ->
     NSource = maybe_write_source_files(Source),
     NSources = Sources ++ [NSource],
     ok = check_dup_types(NSources),
     NSources;
-do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources) ->
+do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources, _Opts) ->
     NSource = maybe_write_source_files(Source),
     {_Old, Front, Rear} = take(Type, Sources),
     NSources = Front ++ [NSource | Rear],
     ok = check_dup_types(NSources),
     NSources;
-do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources) ->
-    {_Old, Front, Rear} = take(Type, Sources),
-    NSources = Front ++ Rear,
-    NSources;
-do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources) ->
+do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources, Opts) ->
+    case type_take(Type, Sources) of
+        not_found ->
+            case emqx_cluster_rpc:is_initiator(Opts) of
+                true -> throw({not_found_source, Type});
+                false -> Sources
+            end;
+        {_Found, NSources} ->
+            NSources
+    end;
+do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources, _Opts) ->
     %% overwrite the entire config!
     NSources = lists:map(fun maybe_write_source_files/1, Sources),
     ok = check_dup_types(NSources),
     NSources;
-do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources) ->
+do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources, _Opts) ->
     reorder_sources(NewSourcesOrder, OldSources);
-do_pre_config_update({Op, Source}, Sources) ->
+do_pre_config_update({Op, Source}, Sources, _Opts) ->
     throw({bad_request, #{op => Op, source => Source, sources => Sources}}).
 
 post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
@@ -293,9 +299,13 @@ do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sour
     Front ++ [InitedSources] ++ Rear;
 do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
     OldInitedSources = lookup(),
-    {OldSource, Front, Rear} = take(Type, OldInitedSources),
-    ok = ensure_deleted(OldSource, #{clear_metric => true}),
-    Front ++ Rear;
+    case type_take(Type, OldInitedSources) of
+        not_found ->
+            OldInitedSources;
+        {Found, NSources} ->
+            ok = ensure_deleted(Found, #{clear_metric => true}),
+            NSources
+    end;
 do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
     overwrite_entire_sources(Sources);
 do_post_config_update(?CONF_KEY_PATH, {?CMD_REORDER, NewSourcesOrder}, _Sources) ->

+ 9 - 22
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -27,6 +27,7 @@
     query/1,
     reset/0,
     status/0,
+    is_initiator/1,
     skip_failed_commit/1,
     fast_forward_to_commit/2,
     on_mria_stop/1,
@@ -66,6 +67,7 @@
 -export_type([tnx_id/0, succeed_num/0]).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include("emqx_conf.hrl").
 
@@ -78,8 +80,6 @@
 -define(INITIATE(MFA), {initiate, MFA}).
 -define(CATCH_UP, catch_up).
 -define(TIMEOUT, timer:minutes(1)).
--define(APPLY_KIND_REPLICATE, replicate).
--define(APPLY_KIND_INITIATE, initiate).
 -define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).
 
 -type tnx_id() :: pos_integer().
@@ -224,6 +224,9 @@ reset() -> gen_server:call(?MODULE, reset).
 status() ->
     transaction(fun ?MODULE:trans_status/0, []).
 
+is_initiator(Opts) ->
+    ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE).
+
 %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560.
 on_leave_clean() ->
     on_leave_clean(node()).
@@ -398,7 +401,7 @@ catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State
             ?tp(cluster_rpc_caught_up, #{}),
             ?TIMEOUT;
         {atomic, {still_lagging, NextId, MFA}} ->
-            {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),
+            {Succeed, _} = apply_mfa(NextId, MFA, ?KIND_REPLICATE),
             case Succeed orelse SkipResult of
                 true ->
                     case transaction(fun ?MODULE:commit/2, [Node, NextId]) of
@@ -520,7 +523,7 @@ init_mfa(Node, MFA) ->
             },
             ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
             ok = commit(Node, TnxId),
-            case apply_mfa(TnxId, MFA, ?APPLY_KIND_INITIATE) of
+            case apply_mfa(TnxId, MFA, ?KIND_INITIATE) of
                 {true, Result} -> {ok, TnxId, Result};
                 {false, Error} -> mnesia:abort(Error)
             end;
@@ -571,23 +574,7 @@ trans_query(TnxId) ->
 apply_mfa(TnxId, {M, F, A}, Kind) ->
     Res =
         try
-            case erlang:apply(M, F, A) of
-                {error, {post_config_update, HandlerName, {Reason0, PostFailureFun}}} when
-                    Kind =/= ?APPLY_KIND_INITIATE
-                ->
-                    ?SLOG(error, #{
-                        msg => "post_config_update_failed",
-                        handler => HandlerName,
-                        reason => Reason0
-                    }),
-                    PostFailureFun();
-                {error, {post_config_update, HandlerName, {Reason0, _Fun}}} when
-                    Kind =:= ?APPLY_KIND_INITIATE
-                ->
-                    {error, {post_config_update, HandlerName, Reason0}};
-                Result ->
-                    Result
-            end
+            erlang:apply(M, F, A ++ [#{kind => Kind}])
         catch
             throw:Reason ->
                 {error, #{reason => Reason}};
@@ -607,7 +594,7 @@ is_success(ok) -> true;
 is_success({ok, _}) -> true;
 is_success(_) -> false.
 
-log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) ->
+log_and_alarm(IsSuccess, Res, #{kind := ?KIND_INITIATE} = Meta) ->
     %% no alarm or error log in case of failure at originating a new cluster-call
     %% because nothing is committed
     case IsSuccess of

+ 12 - 9
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -234,7 +234,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
     ?assertEqual([], L),
     ets:insert(test, {other_mfa_result, ok}),
-    {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
+    {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
     ct:sleep(1000),
     {atomic, NewStatus} = emqx_cluster_rpc:status(),
     ?assertEqual(3, length(NewStatus)),
@@ -251,7 +251,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
 
 t_del_stale_mfa(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
-    MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]},
+    MFA = {M, F, A} = {?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]]},
     Keys = lists:seq(1, 50),
     Keys2 = lists:seq(51, 150),
     Ids =
@@ -292,7 +292,7 @@ t_del_stale_mfa(_Config) ->
 
 t_skip_failed_commit(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
-    {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
+    {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
     ct:sleep(180),
     {atomic, List1} = emqx_cluster_rpc:status(),
     Node = node(),
@@ -312,7 +312,7 @@ t_skip_failed_commit(_Config) ->
 
 t_fast_forward_commit(_Config) ->
     {atomic, []} = emqx_cluster_rpc:status(),
-    {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
+    {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
     ct:sleep(180),
     {atomic, List1} = emqx_cluster_rpc:status(),
     Node = node(),
@@ -393,22 +393,25 @@ receive_msg(Count, Msg) when Count > 0 ->
         {Msg, flush_msg([])}
     end.
 
-echo(Pid, Msg) ->
+echo(Pid, Msg, _) ->
     erlang:send(Pid, Msg),
     ok.
 
-echo_delay(Pid, Msg) ->
+format(Fmt, Args, _Opts) ->
+    io:format(Fmt, Args).
+
+echo_delay(Pid, Msg, _) ->
     timer:sleep(rand:uniform(150)),
     erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}),
     ok.
 
-failed_on_node(Pid) ->
+failed_on_node(Pid, _) ->
     case Pid =:= self() of
         true -> ok;
         false -> "MFA return not ok"
     end.
 
-failed_on_node_by_odd(Pid) ->
+failed_on_node_by_odd(Pid, _) ->
     case Pid =:= self() of
         true ->
             ok;
@@ -421,7 +424,7 @@ failed_on_node_by_odd(Pid) ->
             end
     end.
 
-failed_on_other_recover_after_retry(Pid) ->
+failed_on_other_recover_after_retry(Pid, _) ->
     case Pid =:= self() of
         true ->
             ok;

+ 20 - 9
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -47,8 +47,10 @@
     get_plugins/0,
     install_package/2,
     delete_package/1,
+    delete_package/2,
     describe_package/1,
     ensure_action/2,
+    ensure_action/3,
     do_update_plugin_config/3
 ]).
 
@@ -399,7 +401,7 @@ validate_name(Name) ->
 %% API CallBack Begin
 list_plugins(get, _) ->
     Nodes = emqx:running_nodes(),
-    {Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes),
+    {Plugins, []} = emqx_mgmt_api_plugins_proto_v3:get_plugins(Nodes),
     {200, format_plugins(Plugins)}.
 
 get_plugins() ->
@@ -451,7 +453,7 @@ upload_install(post, #{}) ->
 do_install_package(FileName, Bin) ->
     %% TODO: handle bad nodes
     Nodes = emqx:running_nodes(),
-    {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin),
+    {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v3:install_package(Nodes, FileName, Bin),
     case lists:filter(fun(R) -> R =/= ok end, Res) of
         [] ->
             {204};
@@ -477,17 +479,17 @@ do_install_package(FileName, Bin) ->
 
 plugin(get, #{bindings := #{name := Name}}) ->
     Nodes = emqx:running_nodes(),
-    {Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name),
+    {Plugins, _} = emqx_mgmt_api_plugins_proto_v3:describe_package(Nodes, Name),
     case format_plugins(Plugins) of
         [Plugin] -> {200, Plugin};
         [] -> {404, #{code => 'NOT_FOUND', message => Name}}
     end;
 plugin(delete, #{bindings := #{name := Name}}) ->
-    Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name),
+    Res = emqx_mgmt_api_plugins_proto_v3:delete_package(Name),
     return(204, Res).
 
 update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
-    Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
+    Res = emqx_mgmt_api_plugins_proto_v3:ensure_action(Name, Action),
     return(204, Res).
 
 plugin_config(get, #{bindings := #{name := NameVsn}}) ->
@@ -583,8 +585,12 @@ describe_package(NameVsn) ->
         _ -> {Node, []}
     end.
 
-%% For RPC plugin delete
+%% Tip: Don't delete delete_package/1, use before v571 cluster_rpc
 delete_package(Name) ->
+    delete_package(Name, #{}).
+
+%% For RPC plugin delete
+delete_package(Name, _Opts) ->
     case emqx_plugins:ensure_stopped(Name) of
         ok ->
             _ = emqx_plugins:ensure_disabled(Name),
@@ -595,19 +601,24 @@ delete_package(Name) ->
             Error
     end.
 
+%% Tip: Don't delete ensure_action/2, use before v571 cluster_rpc
+ensure_action(Name, Action) ->
+    ensure_action(Name, Action, #{}).
+
 %% for RPC plugin update
 %% TODO: catch thrown error to return 400
 %% - plugin_not_found
 %% - otp vsn assertion failed
-ensure_action(Name, start) ->
+
+ensure_action(Name, start, _Opts) ->
     _ = emqx_plugins:ensure_started(Name),
     _ = emqx_plugins:ensure_enabled(Name),
     ok;
-ensure_action(Name, stop) ->
+ensure_action(Name, stop, _Opts) ->
     _ = emqx_plugins:ensure_stopped(Name),
     _ = emqx_plugins:ensure_disabled(Name),
     ok;
-ensure_action(Name, restart) ->
+ensure_action(Name, restart, _Opts) ->
     _ = emqx_plugins:ensure_enabled(Name),
     _ = emqx_plugins:restart(Name),
     ok.

+ 3 - 1
apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl

@@ -97,7 +97,7 @@ t_broker(_Config) ->
 t_cluster(_Config) ->
     SelfNode = node(),
     FakeNode = 'fake@127.0.0.1',
-    MFA = {io, format, [""]},
+    MFA = {?MODULE, format, [""]},
     meck:new(mria_mnesia, [non_strict, passthrough, no_link]),
     meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]),
     {atomic, {ok, TnxId, _}} =
@@ -353,3 +353,5 @@ t_autocluster_leave(Config) ->
             10_000
         )
     ).
+
+format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]).

+ 6 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -50,6 +50,7 @@
     remove_local/1,
     reset_metrics/1,
     reset_metrics_local/1,
+    reset_metrics_local/2,
     %% Create metrics for a resource ID
     create_metrics/1,
     %% Delete metrics for a resource ID
@@ -337,8 +338,13 @@ remove_local(ResId) ->
             ok
     end.
 
+%% Tip: Don't delete reset_metrics_local/1, use before v571 rpc
 -spec reset_metrics_local(resource_id()) -> ok.
 reset_metrics_local(ResId) ->
+    reset_metrics_local(ResId, #{}).
+
+-spec reset_metrics_local(resource_id(), map()) -> ok.
+reset_metrics_local(ResId, _ClusterOpts) ->
     emqx_resource_manager:reset_metrics(ResId).
 
 -spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.

+ 7 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -56,7 +56,8 @@
     unload_hooks_for_rule/1,
     maybe_add_metrics_for_rule/1,
     clear_metrics_for_rule/1,
-    reset_metrics_for_rule/1
+    reset_metrics_for_rule/1,
+    reset_metrics_for_rule/2
 ]).
 
 %% exported for `emqx_telemetry'
@@ -302,8 +303,13 @@ maybe_add_metrics_for_rule(Id) ->
 clear_metrics_for_rule(Id) ->
     ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
 
+%% Tip: Don't delete reset_metrics_for_rule/1, use before v571 rpc
 -spec reset_metrics_for_rule(rule_id()) -> ok.
 reset_metrics_for_rule(Id) ->
+    reset_metrics_for_rule(Id, #{}).
+
+-spec reset_metrics_for_rule(rule_id(), map()) -> ok.
+reset_metrics_for_rule(Id, _Opts) ->
     emqx_metrics_worker:reset_metrics(rule_metrics, Id).
 
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->