|
|
@@ -18,6 +18,7 @@
|
|
|
-module(emqx_config_handler).
|
|
|
|
|
|
-include("logger.hrl").
|
|
|
+-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
@@ -28,6 +29,7 @@
|
|
|
, remove_handler/1
|
|
|
, update_config/3
|
|
|
, get_raw_cluster_override_conf/0
|
|
|
+ , info/0
|
|
|
, merge_to_old_config/2
|
|
|
]).
|
|
|
|
|
|
@@ -74,9 +76,11 @@ update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
|
|
|
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
|
|
|
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
|
|
|
|
|
|
--spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
|
|
|
+-spec add_handler(emqx_config:config_key_path(), handler_name()) ->
|
|
|
+ ok | {error, {conflict, list()}}.
|
|
|
add_handler(ConfKeyPath, HandlerName) ->
|
|
|
- gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}, infinity).
|
|
|
+ assert_callback_function(HandlerName),
|
|
|
+ gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}).
|
|
|
|
|
|
%% @doc Remove handler asynchronously
|
|
|
-spec remove_handler(emqx_config:config_key_path()) -> ok.
|
|
|
@@ -86,19 +90,21 @@ remove_handler(ConfKeyPath) ->
|
|
|
get_raw_cluster_override_conf() ->
|
|
|
gen_server:call(?MODULE, get_raw_cluster_override_conf).
|
|
|
|
|
|
+info() ->
|
|
|
+ gen_server:call(?MODULE, info).
|
|
|
+
|
|
|
%%============================================================================
|
|
|
|
|
|
-spec init(term()) -> {ok, state()}.
|
|
|
init(_) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
- {ok, #{handlers => #{?MOD => ?MODULE}}}.
|
|
|
+ Handlers = load_prev_handlers(),
|
|
|
+ {ok, #{handlers => Handlers#{?MOD => ?MODULE}}}.
|
|
|
|
|
|
handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers := Handlers}) ->
|
|
|
case deep_put_handler(ConfKeyPath, Handlers, HandlerName) of
|
|
|
- {ok, NewHandlers} ->
|
|
|
- {reply, ok, State#{handlers => NewHandlers}};
|
|
|
- Error ->
|
|
|
- {reply, Error, State}
|
|
|
+ {ok, NewHandlers} -> {reply, ok, State#{handlers => NewHandlers}};
|
|
|
+ {error, _Reason} = Error -> {reply, Error, State}
|
|
|
end;
|
|
|
|
|
|
handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
|
|
@@ -108,48 +114,62 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
|
|
handle_call(get_raw_cluster_override_conf, _From, State) ->
|
|
|
Reply = emqx_config:read_override_conf(#{override_to => cluster}),
|
|
|
{reply, Reply, State};
|
|
|
+handle_call(info, _From, State) ->
|
|
|
+ {reply, State, State};
|
|
|
handle_call(_Request, _From, State) ->
|
|
|
- Reply = ok,
|
|
|
- {reply, Reply, State}.
|
|
|
+ {reply, ok, State}.
|
|
|
|
|
|
-handle_cast({remove_handler, ConfKeyPath},
|
|
|
- State = #{handlers := Handlers}) ->
|
|
|
- {noreply, State#{handlers => emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers)}};
|
|
|
+handle_cast({remove_handler, ConfKeyPath}, State = #{handlers := Handlers}) ->
|
|
|
+ NewHandlers = do_remove_handler(ConfKeyPath, Handlers),
|
|
|
+ {noreply, State#{handlers => NewHandlers}};
|
|
|
handle_cast(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_info(_Info, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
-terminate(_Reason, _State) ->
|
|
|
+terminate(_Reason, #{handlers := Handlers}) ->
|
|
|
+ save_handlers(Handlers),
|
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
-deep_put_handler([], Handlers, Mod) when is_map(Handlers) ->
|
|
|
+deep_put_handler([], Handlers, Mod) ->
|
|
|
{ok, Handlers#{?MOD => Mod}};
|
|
|
-deep_put_handler([], _Handlers, Mod) ->
|
|
|
- {ok, #{?MOD => Mod}};
|
|
|
-deep_put_handler([?WKEY | KeyPath], Handlers, Mod) ->
|
|
|
- deep_put_handler2(?WKEY, KeyPath, Handlers, Mod);
|
|
|
deep_put_handler([Key | KeyPath], Handlers, Mod) ->
|
|
|
- case maps:find(?WKEY, Handlers) of
|
|
|
- error ->
|
|
|
- deep_put_handler2(Key, KeyPath, Handlers, Mod);
|
|
|
- {ok, _SubHandlers} ->
|
|
|
- {error, {cannot_override_a_wildcard_path, [?WKEY | KeyPath]}}
|
|
|
- end.
|
|
|
-
|
|
|
-deep_put_handler2(Key, KeyPath, Handlers, Mod) ->
|
|
|
SubHandlers = maps:get(Key, Handlers, #{}),
|
|
|
case deep_put_handler(KeyPath, SubHandlers, Mod) of
|
|
|
- {ok, SubHandlers1} ->
|
|
|
- {ok, Handlers#{Key => SubHandlers1}};
|
|
|
- Error ->
|
|
|
- Error
|
|
|
+ {ok, NewSubHandlers} ->
|
|
|
+ NewHandlers = Handlers#{Key => NewSubHandlers},
|
|
|
+ case check_handler_conflict(NewHandlers) of
|
|
|
+ ok -> {ok, NewHandlers};
|
|
|
+ {error, Reason} -> {error, Reason}
|
|
|
+ end;
|
|
|
+ {error, _Reason} = Error -> Error
|
|
|
+ end.
|
|
|
+
|
|
|
+%% Make sure that Specify Key and ?WKEY cannot be on the same level.
|
|
|
+%%
|
|
|
+%% [k1, ?, ?], [k1, ?], [k1] is allow.
|
|
|
+%% [K1, ?, k2], [k1, ?, k3] is allow.
|
|
|
+%% [k1, ?, ?], [k1, ?, k2] is not allow.
|
|
|
+check_handler_conflict(Handlers) ->
|
|
|
+ Keys = filter_top_level_handlers(Handlers),
|
|
|
+ case lists:member(?WKEY, Keys) of
|
|
|
+ true when length(Keys) =:= 1 -> ok;
|
|
|
+ true -> {error, {conflict, Keys}};
|
|
|
+ false -> ok
|
|
|
end.
|
|
|
|
|
|
+filter_top_level_handlers(Handlers) ->
|
|
|
+ maps:fold(
|
|
|
+ fun
|
|
|
+ (K, #{?MOD := _}, Acc) -> [K | Acc];
|
|
|
+ (_K, #{}, Acc) -> Acc;
|
|
|
+ (?MOD, _, Acc) -> Acc
|
|
|
+ end, [], Handlers).
|
|
|
+
|
|
|
handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
|
|
try
|
|
|
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs)
|
|
|
@@ -157,9 +177,12 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
|
|
throw : Reason ->
|
|
|
{error, Reason};
|
|
|
Error : Reason : ST ->
|
|
|
- ?SLOG(error, #{msg => "change_config_failed",
|
|
|
+ ?SLOG(error, #{msg => "change_config_crashed",
|
|
|
exception => Error,
|
|
|
reason => Reason,
|
|
|
+ update_req => UpdateArgs,
|
|
|
+ module => SchemaModule,
|
|
|
+ key_path => ConfKeyPath,
|
|
|
stacktrace => ST
|
|
|
}),
|
|
|
{error, config_update_crashed}
|
|
|
@@ -174,11 +197,12 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
|
|
{error, Result}
|
|
|
end.
|
|
|
|
|
|
+process_update_request([_], _Handlers, {remove, _Opts}) ->
|
|
|
+ {error, "remove_root_is_forbidden"};
|
|
|
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
|
|
|
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
|
|
|
BinKeyPath = bin_path(ConfKeyPath),
|
|
|
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
|
|
|
- _ = remove_from_local_if_cluster_change(BinKeyPath, Opts),
|
|
|
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
|
|
|
{ok, NewRawConf, OverrideConf, Opts};
|
|
|
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
|
|
|
@@ -198,25 +222,22 @@ do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
|
|
do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
|
|
|
UpdateReq, ConfKeyPath0) ->
|
|
|
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
|
|
|
- SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
|
|
|
+ ConfKeyBin = bin(ConfKey),
|
|
|
+ SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf),
|
|
|
SubHandlers = get_sub_handlers(ConfKey, Handlers),
|
|
|
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
|
|
|
- {ok, NewUpdateReq} ->
|
|
|
- call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
|
|
|
- ConfKeyPath);
|
|
|
- Error ->
|
|
|
- Error
|
|
|
+ {ok, NewUpdateReq} -> merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf);
|
|
|
+ Error -> Error
|
|
|
end.
|
|
|
|
|
|
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf,
|
|
|
UpdateArgs, Opts) ->
|
|
|
OldConf = emqx_config:get_root(ConfKeyPath),
|
|
|
- FullRawConf = with_full_raw_confs(NewRawConf),
|
|
|
- {AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, FullRawConf),
|
|
|
- NewConf = maps:with(maps:keys(OldConf), CheckedConf),
|
|
|
- _ = remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
|
|
+ Schema = schema(SchemaModule, ConfKeyPath),
|
|
|
+ {AppEnvs, #{root := NewConf}} = emqx_config:check_config(Schema, #{<<"root">> => NewRawConf}),
|
|
|
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
|
|
{ok, Result0} ->
|
|
|
+ remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
|
|
case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf,
|
|
|
UpdateArgs, Opts) of
|
|
|
{ok, Result1} ->
|
|
|
@@ -259,8 +280,7 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
|
|
|
get_sub_config(_, _Conf) -> %% the Conf is a primitive
|
|
|
undefined.
|
|
|
|
|
|
-call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
|
|
- HandlerName = maps:get(?MOD, Handlers, undefined),
|
|
|
+call_pre_config_update(#{?MOD := HandlerName}, OldRawConf, UpdateReq, ConfKeyPath) ->
|
|
|
case erlang:function_exported(HandlerName, pre_config_update, 3) of
|
|
|
true ->
|
|
|
case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
|
|
|
@@ -268,21 +288,25 @@ call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
|
|
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
|
|
|
end;
|
|
|
false -> merge_to_old_config(UpdateReq, OldRawConf)
|
|
|
- end.
|
|
|
+ end;
|
|
|
+call_pre_config_update(_Handlers, OldRawConf, UpdateReq, _ConfKeyPath) ->
|
|
|
+ merge_to_old_config(UpdateReq, OldRawConf).
|
|
|
|
|
|
-call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
|
|
|
- HandlerName = maps:get(?MOD, Handlers, undefined),
|
|
|
+call_post_config_update(#{?MOD := HandlerName}, OldConf, NewConf,
|
|
|
+ AppEnvs, UpdateReq, Result, ConfKeyPath) ->
|
|
|
case erlang:function_exported(HandlerName, post_config_update, 5) of
|
|
|
true ->
|
|
|
- case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
|
|
|
- AppEnvs) of
|
|
|
+ case HandlerName:post_config_update(ConfKeyPath, UpdateReq,
|
|
|
+ NewConf, OldConf, AppEnvs) of
|
|
|
ok -> {ok, Result};
|
|
|
- {ok, Result1} ->
|
|
|
- {ok, Result#{HandlerName => Result1}};
|
|
|
+ {ok, Result1} -> {ok, Result#{HandlerName => Result1}};
|
|
|
{error, Reason} -> {error, {post_config_update, HandlerName, Reason}}
|
|
|
end;
|
|
|
false -> {ok, Result}
|
|
|
- end.
|
|
|
+ end;
|
|
|
+call_post_config_update(_Handlers, _OldConf, _NewConf, _AppEnvs,
|
|
|
+ _UpdateReq, Result, _ConfKeyPath) ->
|
|
|
+ {ok, Result}.
|
|
|
|
|
|
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
|
|
|
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
|
|
|
@@ -295,6 +319,7 @@ save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, Update
|
|
|
%% 1. the old config is undefined
|
|
|
%% 2. either the old or the new config is not of map type
|
|
|
%% the behaviour is merging the new the config to the old config if they are maps.
|
|
|
+
|
|
|
merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf) ->
|
|
|
{ok, maps:merge(RawConf, UpdateReq)};
|
|
|
merge_to_old_config(UpdateReq, _RawConf) ->
|
|
|
@@ -302,13 +327,12 @@ merge_to_old_config(UpdateReq, _RawConf) ->
|
|
|
|
|
|
%% local-override.conf priority is higher than cluster-override.conf
|
|
|
%% If we want cluster to take effect, we must remove the local.
|
|
|
-remove_from_local_if_cluster_change(BinKeyPath, Opts) ->
|
|
|
- case maps:get(override, Opts, local) of
|
|
|
- local -> ok;
|
|
|
- cluster ->
|
|
|
- Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
|
|
|
- emqx_config:save_to_override_conf(Local, Opts)
|
|
|
- end.
|
|
|
+remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
|
|
|
+ Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
|
|
|
+ _ = emqx_config:save_to_override_conf(Local, Opts),
|
|
|
+ ok;
|
|
|
+remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
|
|
|
+ ok.
|
|
|
|
|
|
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
|
|
|
undefined;
|
|
|
@@ -337,9 +361,6 @@ return_rawconf(ConfKeyPath, #{rawconf_with_defaults := true}) ->
|
|
|
return_rawconf(ConfKeyPath, _) ->
|
|
|
emqx_config:get_raw(ConfKeyPath).
|
|
|
|
|
|
-with_full_raw_confs(PartialConf) ->
|
|
|
- maps:merge(emqx_config:get_raw([]), PartialConf).
|
|
|
-
|
|
|
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
|
|
|
|
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
|
|
@@ -351,3 +372,43 @@ atom(Str) when is_list(Str) ->
|
|
|
list_to_atom(Str);
|
|
|
atom(Atom) when is_atom(Atom) ->
|
|
|
Atom.
|
|
|
+
|
|
|
+-dialyzer({nowarn_function, do_remove_handler/2}).
|
|
|
+do_remove_handler(ConfKeyPath, Handlers) ->
|
|
|
+ NewHandlers = emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers),
|
|
|
+ remove_empty_leaf(ConfKeyPath, NewHandlers).
|
|
|
+
|
|
|
+remove_empty_leaf([], Handlers) -> Handlers;
|
|
|
+remove_empty_leaf(KeyPath, Handlers) ->
|
|
|
+ case emqx_map_lib:deep_find(KeyPath, Handlers) =:= {ok, #{}} of
|
|
|
+ true -> %% empty leaf
|
|
|
+ Handlers1 = emqx_map_lib:deep_remove(KeyPath, Handlers),
|
|
|
+ SubKeyPath = lists:sublist(KeyPath, length(KeyPath) - 1),
|
|
|
+ remove_empty_leaf(SubKeyPath, Handlers1);
|
|
|
+ false -> Handlers
|
|
|
+ end.
|
|
|
+
|
|
|
+assert_callback_function(Mod) ->
|
|
|
+ case erlang:function_exported(Mod, pre_config_update, 3) orelse
|
|
|
+ erlang:function_exported(Mod, post_config_update, 5) of
|
|
|
+ true -> ok;
|
|
|
+ false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod})
|
|
|
+ end,
|
|
|
+ ok.
|
|
|
+
|
|
|
+schema(SchemaModule, [RootKey | _]) ->
|
|
|
+ Roots = hocon_schema:roots(SchemaModule),
|
|
|
+ Field =
|
|
|
+ case lists:keyfind(bin(RootKey), 1, Roots) of
|
|
|
+ {_, {Ref, ?REF(Ref)}} -> {Ref, ?R_REF(SchemaModule, Ref)};
|
|
|
+ {_, Field0} -> Field0
|
|
|
+ end,
|
|
|
+ #{roots => [root], fields => #{root => [Field]}}.
|
|
|
+
|
|
|
+load_prev_handlers() ->
|
|
|
+ Handlers = application:get_env(emqx, ?MODULE, #{}),
|
|
|
+ application:unset_env(emqx, ?MODULE),
|
|
|
+ Handlers.
|
|
|
+
|
|
|
+save_handlers(Handlers) ->
|
|
|
+ application:set_env(emqx, ?MODULE, Handlers).
|