Просмотр исходного кода

Merge pull request #10822 from zhongwencool/authz-improve

feat: support emqx_config:update([authrization],Conf) update
zhongwencool 2 лет назад
Родитель
Сommit
9d8a5716ec

+ 22 - 6
apps/emqx/src/emqx.erl

@@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) ->
 
 -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
-reset_config([RootName | _] = KeyPath, Opts) ->
+reset_config([RootName | SubKeys] = KeyPath, Opts) ->
     case emqx_config:get_default_value(KeyPath) of
         {ok, Default} ->
-            emqx_config_handler:update_config(
-                emqx_config:get_schema_mod(RootName),
-                KeyPath,
-                {{update, Default}, Opts}
-            );
+            Mod = emqx_config:get_schema_mod(RootName),
+            case SubKeys =:= [] of
+                true ->
+                    emqx_config_handler:update_config(
+                        Mod,
+                        KeyPath,
+                        {{update, Default}, Opts}
+                    );
+                false ->
+                    NewConf =
+                        emqx_utils_maps:deep_put(
+                            SubKeys,
+                            emqx_config:get_raw([RootName], #{}),
+                            Default
+                        ),
+                    emqx_config_handler:update_config(
+                        Mod,
+                        [RootName],
+                        {{update, NewConf}, Opts}
+                    )
+            end;
         {error, _} = Error ->
             Error
     end.

+ 11 - 27
apps/emqx/src/emqx_config_handler.erl

@@ -31,8 +31,7 @@
     remove_handler/1,
     update_config/3,
     get_raw_cluster_override_conf/0,
-    info/0,
-    merge_to_old_config/2
+    info/0
 ]).
 
 %% gen_server callbacks
@@ -332,31 +331,16 @@ do_post_config_update(
     SubOldConf = get_sub_config(ConfKey, OldConf),
     SubNewConf = get_sub_config(ConfKey, NewConf),
     SubHandlers = get_sub_handlers(ConfKey, Handlers),
-    case
-        do_post_config_update(
-            SubConfKeyPath,
-            SubHandlers,
-            SubOldConf,
-            SubNewConf,
-            AppEnvs,
-            UpdateArgs,
-            Result,
-            ConfKeyPath
-        )
-    of
-        {ok, Result1} ->
-            call_post_config_update(
-                Handlers,
-                OldConf,
-                NewConf,
-                AppEnvs,
-                up_req(UpdateArgs),
-                Result1,
-                ConfKeyPath
-            );
-        Error ->
-            Error
-    end.
+    do_post_config_update(
+        SubConfKeyPath,
+        SubHandlers,
+        SubOldConf,
+        SubNewConf,
+        AppEnvs,
+        UpdateArgs,
+        Result,
+        ConfKeyPath
+    ).
 
 get_sub_handlers(ConfKey, Handlers) ->
     case maps:find(ConfKey, Handlers) of

+ 2 - 2
apps/emqx/test/emqx_common_test_helpers.erl

@@ -277,7 +277,7 @@ wait_for_app_processes(_) ->
 %% and stop others, and then the `application:start/2' callback is
 %% never called again for this application.
 perform_sanity_checks(emqx_rule_engine) ->
-    ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
+    ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']),
     ok;
 perform_sanity_checks(emqx_bridge) ->
     ensure_config_handler(emqx_bridge, [bridges]),
@@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) ->
     #{handlers := Handlers} = sys:get_state(emqx_config_handler),
     case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
         #{{mod} := Module} -> ok;
-        _NotFound -> error({config_handler_missing, ConfigPath, Module})
+        NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
     end,
     ok.
 

+ 2 - 52
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -130,7 +130,7 @@ t_root_key_update(_Config) ->
     ?assertEqual(
         {ok, #{
             config => 0.81,
-            post_config_update => #{?MODULE => ok},
+            post_config_update => #{},
             raw_config => <<"81%">>
         }},
         emqx:update_config(SubKey, "81%", Opts)
@@ -174,7 +174,7 @@ t_sub_key_update_remove(_Config) ->
 
     %% remove
     ?assertEqual(
-        {ok, #{post_config_update => #{emqx_config_handler_SUITE => ok}}},
+        {ok, #{post_config_update => #{?MODULE => ok}}},
         emqx:remove_config(KeyPath)
     ),
     ?assertError(
@@ -184,18 +184,6 @@ t_sub_key_update_remove(_Config) ->
     ?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)),
     ?assert(length(OSKey) > 0),
 
-    ?assertEqual(
-        {ok, #{
-            config => 60000,
-            post_config_update => #{?MODULE => ok},
-            raw_config => <<"60s">>
-        }},
-        emqx:reset_config(KeyPath, Opts)
-    ),
-    OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
-    ?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)),
-    ?assert(length(OSKey1) > 1),
-
     ok = emqx_config_handler:remove_handler(KeyPath),
     ok = emqx_config_handler:remove_handler(KeyPath2),
     ok.
@@ -292,44 +280,6 @@ t_get_raw_cluster_override_conf(_Config) ->
     ?assertEqual(OldInfo, NewInfo),
     ok.
 
-t_save_config_failed(_Config) ->
-    ok.
-
-t_update_sub(_Config) ->
-    PathKey = [sysmon],
-    Opts = #{rawconf_with_defaults => true},
-    ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
-    %% update sub key
-    #{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
-    {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
-    ?assertMatch(
-        #{
-            config := 120000,
-            post_config_update := #{?MODULE := ok},
-            raw_config := <<"120s">>
-        },
-        Res
-    ),
-    ?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)),
-    #{<<"os">> := OS2} = emqx:get_raw_config(PathKey),
-    ?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))),
-
-    %% update sub key
-    SubKey = PathKey ++ [os, cpu_high_watermark],
-    ?assertEqual(
-        {ok, #{
-            config => 0.81,
-            post_config_update => #{?MODULE => ok},
-            raw_config => <<"81%">>
-        }},
-        emqx:update_config(SubKey, "81%", Opts)
-    ),
-    ?assertEqual(0.81, emqx:get_config(SubKey)),
-    ?assertEqual("81%", emqx:get_raw_config(SubKey)),
-
-    ok = emqx_config_handler:remove_handler(PathKey),
-    ok.
-
 pre_config_update([sysmon], UpdateReq, _RawConf) ->
     {ok, UpdateReq};
 pre_config_update([sysmon, os], UpdateReq, _RawConf) ->

+ 1 - 0
apps/emqx_authz/include/emqx_authz.hrl

@@ -43,6 +43,7 @@
 -define(CMD_MOVE_BEFORE(Before), {before, Before}).
 -define(CMD_MOVE_AFTER(After), {'after', After}).
 
+-define(ROOT_KEY, [authorization]).
 -define(CONF_KEY_PATH, [authorization, sources]).
 
 -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}").

+ 1 - 1
apps/emqx_authz/src/emqx_authz.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authz, [
     {description, "An OTP application"},
-    {vsn, "0.1.21"},
+    {vsn, "0.1.22"},
     {registered, []},
     {mod, {emqx_authz_app, []}},
     {applications, [

+ 76 - 39
apps/emqx_authz/src/emqx_authz.erl

@@ -101,6 +101,7 @@ init() ->
     ok = register_metrics(),
     ok = init_metrics(client_info_source()),
     emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
+    emqx_conf:add_handler(?ROOT_KEY, ?MODULE),
     Sources = emqx_conf:get(?CONF_KEY_PATH, []),
     ok = check_dup_types(Sources),
     NSources = create_sources(Sources),
@@ -109,6 +110,7 @@ init() ->
 deinit() ->
     ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
     emqx_conf:remove_handler(?CONF_KEY_PATH),
+    emqx_conf:remove_handler(?ROOT_KEY),
     emqx_authz_utils:cleanup_resources().
 
 lookup() ->
@@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) ->
 update(Cmd, Sources) ->
     emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
 
-pre_config_update(_, Cmd, Sources) ->
-    try do_pre_config_update(Cmd, Sources) of
+pre_config_update(Path, Cmd, Sources) ->
+    try do_pre_config_update(Path, Cmd, Sources) of
         {error, Reason} -> {error, Reason};
         NSources -> {ok, NSources}
     catch
         _:Reason -> {error, Reason}
     end.
 
+do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
+    do_pre_config_update(Cmd, Sources);
+do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
+    do_pre_config_replace(NewConf, OldConf).
+
+%% override the entire config when updating the root key
+%% emqx_conf:update(?ROOT_KEY, Conf);
+do_pre_config_replace(Conf, Conf) ->
+    Conf;
+do_pre_config_replace(NewConf, OldConf) ->
+    #{<<"sources">> := NewSources} = NewConf,
+    #{<<"sources">> := OldSources} = OldConf,
+    NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
+    NewConf#{<<"sources">> := NewSources1}.
+
 do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
     do_move(Cmd, Sources);
 do_pre_config_update({?CMD_PREPEND, Source}, Sources) ->
@@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) ->
 
 post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
     ok;
-post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
-    Actions = do_post_config_update(Cmd, NewSources),
+post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) ->
+    Actions = do_post_config_update(Path, Cmd, NewSources),
     ok = update_authz_chain(Actions),
     ok = emqx_authz_cache:drain_cache().
 
-do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
-    InitedSources = lookup(),
-    do_move(Cmd, InitedSources);
-do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
-    InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
-    %% create metrics
+do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
+    do_move(Cmd, lookup());
+do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) ->
     TypeName = type(RawNewSource),
-    ok = emqx_metrics_worker:create_metrics(
-        authz_metrics,
-        TypeName,
-        [total, allow, deny, nomatch],
-        [total]
-    ),
-    [InitedNewSource] ++ lookup();
-do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
-    InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
-    lookup() ++ [InitedNewSource];
-do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
+    NewSources = create_sources([get_source_by_type(TypeName, Sources)]),
+    NewSources ++ lookup();
+do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) ->
+    NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]),
+    lookup() ++ NewSources;
+do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
     OldSources = lookup(),
     {OldSource, Front, Rear} = take(Type, OldSources),
     NewSource = get_source_by_type(type(RawNewSource), Sources),
     InitedSources = update_source(type(RawNewSource), OldSource, NewSource),
     Front ++ [InitedSources] ++ Rear;
-do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
+do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
     OldInitedSources = lookup(),
     {OldSource, Front, Rear} = take(Type, OldInitedSources),
-    %% delete metrics
-    ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type),
-    ok = ensure_resource_deleted(OldSource),
-    clear_certs(OldSource),
+    ok = ensure_deleted(OldSource, #{clear_metric => true}),
     Front ++ Rear;
-do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) ->
-    %% overwrite the entire config!
-    OldInitedSources = lookup(),
-    lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
-    lists:foreach(fun clear_certs/1, OldInitedSources),
+do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
+    overwrite_entire_sources(Sources);
+do_post_config_update(?ROOT_KEY, Conf, Conf) ->
+    #{sources := Sources} = Conf,
+    Sources;
+do_post_config_update(?ROOT_KEY, _Conf, NewConf) ->
+    #{sources := NewSources} = NewConf,
+    overwrite_entire_sources(NewSources).
+
+overwrite_entire_sources(Sources) ->
+    PrevSources = lookup(),
+    NewSourcesTypes = lists:map(fun type/1, Sources),
+    EnsureDelete = fun(S) ->
+        TypeName = type(S),
+        Opts =
+            case lists:member(TypeName, NewSourcesTypes) of
+                true -> #{clear_metric => false};
+                false -> #{clear_metric => true}
+            end,
+        ensure_deleted(S, Opts)
+    end,
+    lists:foreach(EnsureDelete, PrevSources),
     create_sources(Sources).
 
 %% @doc do source move
@@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) ->
     {S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
     Front2 ++ [S2, S1] ++ Rear2.
 
-ensure_resource_deleted(#{enable := false}) ->
+ensure_deleted(#{enable := false}, _) ->
     ok;
+ensure_deleted(Source, #{clear_metric := ClearMetric}) ->
+    TypeName = type(Source),
+    ensure_resource_deleted(Source),
+    clear_certs(Source),
+    ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName).
+
 ensure_resource_deleted(#{type := Type} = Source) ->
     Module = authz_module(Type),
     Module:destroy(Source).
@@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) ->
 
 init_metrics(Source) ->
     TypeName = type(Source),
-    emqx_metrics_worker:create_metrics(
-        authz_metrics,
-        TypeName,
-        [total, allow, deny, nomatch],
-        [total]
-    ).
+    case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of
+        %% Don't reset the metrics if it already exists
+        true ->
+            ok;
+        false ->
+            emqx_metrics_worker:create_metrics(
+                authz_metrics,
+                TypeName,
+                [total, allow, deny, nomatch],
+                [total]
+            )
+    end.
 
 %%--------------------------------------------------------------------
 %% AuthZ callbacks
@@ -487,7 +522,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) ->
     ok = check_acl_file_rules(AclPath, Rules),
     ok = write_file(AclPath, Rules),
     Source1 = maps:remove(<<"rules">>, Source0),
-    maps:put(<<"path">>, AclPath, Source1).
+    maps:put(<<"path">>, AclPath, Source1);
+write_acl_file(Source) ->
+    Source.
 
 %% @doc where the acl.conf file is stored.
 acl_conf_file() ->

+ 71 - 0
apps/emqx_authz/test/emqx_authz_SUITE.erl

@@ -272,9 +272,80 @@ t_update_source(_) ->
         ],
         emqx_conf:get([authorization, sources], [])
     ),
+    ?assertMatch(
+        [
+            #{type := http, enable := false},
+            #{type := mongodb, enable := false},
+            #{type := mysql, enable := false},
+            #{type := postgresql, enable := false},
+            #{type := redis, enable := false},
+            #{type := file, enable := false}
+        ],
+        emqx_authz:lookup()
+    ),
 
     {ok, _} = emqx_authz:update(?CMD_REPLACE, []).
 
+t_replace_all(_) ->
+    RootKey = [<<"authorization">>],
+    Conf = emqx:get_raw_config(RootKey),
+    emqx_authz_utils:update_config(RootKey, Conf#{
+        <<"sources">> => [
+            ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1
+        ]
+    }),
+    %% config
+    ?assertMatch(
+        [
+            #{type := file, enable := true},
+            #{type := redis, enable := true},
+            #{type := postgresql, enable := true},
+            #{type := mysql, enable := true},
+            #{type := mongodb, enable := true},
+            #{type := http, enable := true}
+        ],
+        emqx_conf:get([authorization, sources], [])
+    ),
+    %% hooks status
+    ?assertMatch(
+        [
+            #{type := file, enable := true},
+            #{type := redis, enable := true},
+            #{type := postgresql, enable := true},
+            #{type := mysql, enable := true},
+            #{type := mongodb, enable := true},
+            #{type := http, enable := true}
+        ],
+        emqx_authz:lookup()
+    ),
+    Ids = [http, mongodb, mysql, postgresql, redis, file],
+    %% metrics
+    lists:foreach(
+        fun(Id) ->
+            ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
+        end,
+        Ids
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_authz_utils:update_config(
+            RootKey,
+            Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]}
+        )
+    ),
+    %% hooks status
+    ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()),
+    %% metrics
+    ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)),
+    lists:foreach(
+        fun(Id) ->
+            ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
+        end,
+        Ids -- [http]
+    ),
+    ok.
+
 t_delete_source(_) ->
     {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),
 

+ 10 - 5
apps/emqx_authz/test/emqx_authz_test_lib.erl

@@ -25,21 +25,26 @@
 -define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
 
 reset_authorizers() ->
-    reset_authorizers(deny, false).
+    reset_authorizers(deny, false, []).
 
 restore_authorizers() ->
-    reset_authorizers(allow, true).
+    reset_authorizers(allow, true, []).
 
-reset_authorizers(Nomatch, ChacheEnabled) ->
+reset_authorizers(Nomatch, CacheEnabled, Source) ->
     {ok, _} = emqx:update_config(
         [authorization],
         #{
             <<"no_match">> => atom_to_binary(Nomatch),
-            <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)},
-            <<"sources">> => []
+            <<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)},
+            <<"sources">> => Source
         }
     ),
     ok.
+%% Don't reset sources
+reset_authorizers(Nomatch, CacheEnabled) ->
+    {ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch),
+    {ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled),
+    ok.
 
 setup_config(BaseConfig, SpecialParams) ->
     Config = maps:merge(BaseConfig, SpecialParams),

+ 6 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -39,7 +39,8 @@
     disable_enable/3,
     remove/2,
     check_deps_and_remove/3,
-    list/0
+    list/0,
+    reload_hook/1
 ]).
 
 -export([
@@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) ->
             })
     end.
 
+reload_hook(Bridges) ->
+    ok = unload_hook(),
+    ok = load_hook(Bridges).
+
 load_hook() ->
     Bridges = emqx:get_config([bridges], #{}),
     load_hook(Bridges).

+ 25 - 3
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
             {ok, ConfNew}
     end.
 
-post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
-    _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
-post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) ->
+post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) ->
+    _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf),
+    ok = emqx_bridge_resource:remove(BridgeType, BridgeName),
+    Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
+post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) ->
+    _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined),
+    ResOpts = emqx_resource:fetch_creation_opts(NewConf),
+    ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
+    ),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
+post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) ->
     _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf),
+    ResOpts = emqx_resource:fetch_creation_opts(NewConf),
+    ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
+    ),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
     ok.
 
 %% internal functions

+ 6 - 2
apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl

@@ -110,9 +110,13 @@ t_upload_error(Config) ->
     Name = "cool_name",
     Data = <<"data"/utf8>>,
 
-    {ok, _} = emqx_conf:update(
-        [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{}
+    Conf = emqx_conf:get_raw([file_transfer], #{}),
+    Conf1 = emqx_utils_maps:deep_put(
+        [<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>, <<"bucket">>],
+        Conf,
+        <<"invalid-bucket">>
     ),
+    {ok, _} = emqx_conf:update([file_transfer], Conf1, #{}),
 
     ?assertEqual(
         {error, unspecified_error},

+ 3 - 0
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -121,3 +121,6 @@
             false
     end)
 ).
+
+-define(KEY_PATH, [rule_engine, rules]).
+-define(RULE_PATH(RULE), [rule_engine, rules, RULE]).

+ 13 - 11
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -26,8 +26,7 @@
 -export([start_link/0]).
 
 -export([
-    post_config_update/5,
-    config_key_path/0
+    post_config_update/5
 ]).
 
 %% Rule Management
@@ -102,9 +101,6 @@
 
 -type action_name() :: binary() | #{function := binary()}.
 
-config_key_path() ->
-    [rule_engine, rules].
-
 -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
 start_link() ->
     gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
@@ -112,7 +108,13 @@ start_link() ->
 %%------------------------------------------------------------------------------
 %% The config handler for emqx_rule_engine
 %%------------------------------------------------------------------------------
-post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
+post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
+    create_rule(NewRule#{id => bin(RuleId)});
+post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
+    delete_rule(bin(RuleId));
+post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
+    update_rule(NewRule#{id => bin(RuleId)});
+post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Updated} =
         emqx_utils_maps:diff_maps(NewRules, OldRules),
     try
@@ -134,7 +136,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
             end,
             Added
         ),
-        {ok, get_rules()}
+        ok
     catch
         throw:#{kind := _} = Error ->
             {error, Error}
@@ -247,11 +249,11 @@ ensure_action_removed(RuleId, ActionName) ->
     case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
         not_found ->
             ok;
-        #{<<"actions">> := Acts} ->
+        #{<<"actions">> := Acts} = Conf ->
             NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
             {ok, _} = emqx_conf:update(
-                emqx_rule_engine:config_key_path() ++ [RuleId, actions],
-                NewActs,
+                ?RULE_PATH(RuleId),
+                Conf#{<<"actions">> => NewActs},
                 #{override_to => cluster}
             ),
             ok
@@ -372,7 +374,7 @@ init([]) ->
         {write_concurrency, true},
         {read_concurrency, true}
     ]),
-    ok = emqx_config_handler:add_handler(
+    ok = emqx_conf:add_handler(
         [rule_engine, jq_implementation_module],
         emqx_rule_engine_schema
     ),

+ 5 - 10
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -351,14 +351,13 @@ param_path_id() ->
             {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
         Id ->
             Params = filter_out_request_body(add_metadata(Params0)),
-            ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
             case emqx_rule_engine:get_rule(Id) of
                 {ok, _Rule} ->
                     {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
                 not_found ->
+                    ConfPath = ?RULE_PATH(Id),
                     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
-                        {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-                            [Rule] = get_one_rule(AllRules, Id),
+                        {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
                             {201, format_rule_info_resp(Rule)};
                         {error, Reason} ->
                             ?SLOG(error, #{
@@ -396,10 +395,9 @@ param_path_id() ->
     end;
 '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
     Params = filter_out_request_body(Params0),
-    ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+    ConfPath = ?RULE_PATH(Id),
     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
-        {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-            [Rule] = get_one_rule(AllRules, Id),
+        {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
             {200, format_rule_info_resp(Rule)};
         {error, Reason} ->
             ?SLOG(error, #{
@@ -412,7 +410,7 @@ param_path_id() ->
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
     case emqx_rule_engine:get_rule(Id) of
         {ok, _Rule} ->
-            ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+            ConfPath = ?RULE_PATH(Id),
             case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
                 {ok, _} ->
                     {204};
@@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) ->
         AllMetrics
     ).
 
-get_one_rule(AllRules, Id) ->
-    [R || R = #{id := Id0} <- AllRules, Id0 == Id].
-
 add_metadata(Params) ->
     Params#{
         <<"metadata">> => #{

+ 6 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -29,11 +29,15 @@ start(_Type, _Args) ->
     ok = emqx_rule_events:reload(),
     SupRet = emqx_rule_engine_sup:start_link(),
     ok = emqx_rule_engine:load_rules(),
-    emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
+    RulePath = [RuleEngine | _] = ?KEY_PATH,
+    emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine),
+    emqx_conf:add_handler([RuleEngine], emqx_rule_engine),
     emqx_rule_engine_cli:load(),
     SupRet.
 
 stop(_State) ->
     emqx_rule_engine_cli:unload(),
-    emqx_conf:remove_handler(emqx_rule_engine:config_key_path()),
+    RulePath = [RuleEngine | _] = ?KEY_PATH,
+    emqx_conf:remove_handler(RulePath ++ ['?']),
+    emqx_conf:remove_handler([RuleEngine]),
     ok = emqx_rule_events:unload().

+ 10 - 12
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -472,19 +472,17 @@ t_ensure_action_removed(_) ->
     Id = <<"t_ensure_action_removed">>,
     GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
     emqx:update_config(
-        [rule_engine, rules],
+        [rule_engine, rules, Id],
         #{
-            Id => #{
-                <<"actions">> => [
-                    #{<<"function">> => GetSelectedData},
-                    #{<<"function">> => <<"console">>},
-                    #{<<"function">> => <<"republish">>},
-                    <<"mysql:foo">>,
-                    <<"mqtt:bar">>
-                ],
-                <<"description">> => <<"">>,
-                <<"sql">> => <<"SELECT * FROM \"t/#\"">>
-            }
+            <<"actions">> => [
+                #{<<"function">> => GetSelectedData},
+                #{<<"function">> => <<"console">>},
+                #{<<"function">> => <<"republish">>},
+                <<"mysql:foo">>,
+                <<"mqtt:bar">>
+            ],
+            <<"description">> => <<"">>,
+            <<"sql">> => <<"SELECT * FROM \"t/#\"">>
         }
     ),
     ?assertMatch(

+ 1 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_schema_registry, [
     {description, "EMQX Schema Registry"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, [emqx_ee_schema_registry_sup]},
     {mod, {emqx_ee_schema_registry_app, []}},
     {applications, [

+ 18 - 24
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -104,38 +104,32 @@ list_schemas() ->
 %%-------------------------------------------------------------------------------------------------
 %% `emqx_config_handler' API
 %%-------------------------------------------------------------------------------------------------
-
+%% remove
 post_config_update(
-    [?CONF_KEY_ROOT, schemas] = _Path,
+    [?CONF_KEY_ROOT, schemas, Name],
+    '$remove',
+    _NewSchemas,
+    _OldSchemas,
+    _AppEnvs
+) ->
+    async_delete_serdes([Name]),
+    ok;
+%% add or update
+post_config_update(
+    [?CONF_KEY_ROOT, schemas, NewName],
     _Cmd,
-    NewConf = #{schemas := NewSchemas},
-    OldConf = #{},
+    NewSchemas,
+    %% undefined or OldSchemas
+    _,
     _AppEnvs
 ) ->
-    OldSchemas = maps:get(schemas, OldConf, #{}),
-    #{
-        added := Added,
-        changed := Changed0,
-        removed := Removed
-    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
-    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
-    RemovedNames = maps:keys(Removed),
-    case RemovedNames of
-        [] ->
-            ok;
-        _ ->
-            async_delete_serdes(RemovedNames)
-    end,
-    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
-    case build_serdes(SchemasToBuild) of
+    case build_serdes([{NewName, NewSchemas}]) of
         ok ->
-            {ok, NewConf};
+            {ok, #{NewName => NewSchemas}};
         {error, Reason, SerdesToRollback} ->
             lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
             {error, Reason}
-    end;
-post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
-    {ok, NewConf}.
+    end.
 
 %%-------------------------------------------------------------------------------------------------
 %% `gen_server' API

+ 2 - 2
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl

@@ -11,9 +11,9 @@
 
 start(_StartType, _StartArgs) ->
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
-    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
+    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
     emqx_ee_schema_registry_sup:start_link().
 
 stop(_State) ->
-    emqx_conf:remove_handler(?CONF_KEY_PATH),
+    emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
     ok.

+ 13 - 9
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -607,21 +607,25 @@ t_fail_rollback(Config) ->
     SerdeType = ?config(serde_type, Config),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
     BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
-    %% hopefully, for this small map, the key order is used.
-    Serdes = #{
-        <<"a">> => OkSchema,
-        <<"z">> => BrokenSchema
-    },
+
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [?CONF_KEY_ROOT, schemas, <<"a">>],
+            OkSchema,
+            #{}
+        )
+    ),
     ?assertMatch(
         {error, _},
         emqx_conf:update(
-            [?CONF_KEY_ROOT, schemas],
-            Serdes,
+            [?CONF_KEY_ROOT, schemas, <<"z">>],
+            BrokenSchema,
             #{}
         )
     ),
-    %% no serdes should be in the table
-    ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)),
+    ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)),
+    %% no z serdes should be in the table
     ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
     ok.