|
|
@@ -23,6 +23,9 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
+-define(SERVERS, [exhook, servers]).
|
|
|
+-define(EXHOOK, [exhook]).
|
|
|
+
|
|
|
%% APIs
|
|
|
-export([start_link/0]).
|
|
|
|
|
|
@@ -148,7 +151,7 @@ update_config(KeyPath, UpdateReq) ->
|
|
|
Error
|
|
|
end.
|
|
|
|
|
|
-pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
|
|
|
+pre_config_update(?SERVERS, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
|
|
|
case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= Name end, OldConf) of
|
|
|
true ->
|
|
|
throw(already_exists);
|
|
|
@@ -156,48 +159,36 @@ pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
|
|
|
NConf = maybe_write_certs(Conf),
|
|
|
{ok, OldConf ++ [NConf]}
|
|
|
end;
|
|
|
-pre_config_update(_, {update, Name, Conf}, OldConf) ->
|
|
|
- case replace_conf(Name, fun(_) -> Conf end, OldConf) of
|
|
|
- not_found -> throw(not_found);
|
|
|
- NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)}
|
|
|
- end;
|
|
|
-pre_config_update(_, {delete, ToDelete}, OldConf) ->
|
|
|
- case do_delete(ToDelete, OldConf) of
|
|
|
- not_found -> throw(not_found);
|
|
|
- NewConf -> {ok, NewConf}
|
|
|
- end;
|
|
|
-pre_config_update(_, {move, Name, Position}, OldConf) ->
|
|
|
- case do_move(Name, Position, OldConf) of
|
|
|
- not_found -> throw(not_found);
|
|
|
- NewConf -> {ok, NewConf}
|
|
|
- end;
|
|
|
-pre_config_update(_, {enable, Name, Enable}, OldConf) ->
|
|
|
- case
|
|
|
- replace_conf(
|
|
|
- Name,
|
|
|
- fun(Conf) -> Conf#{<<"enable">> => Enable} end,
|
|
|
- OldConf
|
|
|
- )
|
|
|
- of
|
|
|
- not_found -> throw(not_found);
|
|
|
- NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)}
|
|
|
- end.
|
|
|
+pre_config_update(?SERVERS, {update, Name, Conf}, OldConf) ->
|
|
|
+ NewConf = replace_conf(Name, fun(_) -> Conf end, OldConf),
|
|
|
+ {ok, lists:map(fun maybe_write_certs/1, NewConf)};
|
|
|
+pre_config_update(?SERVERS, {delete, ToDelete}, OldConf) ->
|
|
|
+ {ok, do_delete(ToDelete, OldConf)};
|
|
|
+pre_config_update(?SERVERS, {move, Name, Position}, OldConf) ->
|
|
|
+ {ok, do_move(Name, Position, OldConf)};
|
|
|
+pre_config_update(?SERVERS, {enable, Name, Enable}, OldConf) ->
|
|
|
+ ReplaceFun = fun(Conf) -> Conf#{<<"enable">> => Enable} end,
|
|
|
+ NewConf = replace_conf(Name, ReplaceFun, OldConf),
|
|
|
+ {ok, lists:map(fun maybe_write_certs/1, NewConf)};
|
|
|
+pre_config_update(?EXHOOK, NewConf, _OldConf) when NewConf =:= #{} ->
|
|
|
+ {ok, NewConf#{<<"servers">> => []}};
|
|
|
+pre_config_update(?EXHOOK, NewConf = #{<<"servers">> := Servers}, _OldConf) ->
|
|
|
+ {ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}.
|
|
|
|
|
|
post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
|
|
|
- Result = call({update_config, UpdateReq, NewConf}),
|
|
|
+ Result = call({update_config, UpdateReq, NewConf, OldConf}),
|
|
|
try_clear_ssl_files(UpdateReq, NewConf, OldConf),
|
|
|
{ok, Result}.
|
|
|
|
|
|
-%%=====================================================================
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
- emqx_conf:add_handler([exhook, servers], ?MODULE),
|
|
|
- ServerL = emqx:get_config([exhook, servers]),
|
|
|
+ emqx_conf:add_handler(?EXHOOK, ?MODULE),
|
|
|
+ emqx_conf:add_handler(?SERVERS, ?MODULE),
|
|
|
+ ServerL = emqx:get_config(?SERVERS),
|
|
|
Servers = load_all_servers(ServerL),
|
|
|
Servers2 = reorder(ServerL, Servers),
|
|
|
refresh_tick(),
|
|
|
@@ -222,22 +213,16 @@ handle_call(
|
|
|
OrderServers = sort_name_by_order(Infos, Servers),
|
|
|
{reply, OrderServers, State};
|
|
|
handle_call(
|
|
|
- {update_config, {move, _Name, _Position}, NewConfL},
|
|
|
+ {update_config, {move, _Name, _Position}, NewConfL, _},
|
|
|
_From,
|
|
|
#{servers := Servers} = State
|
|
|
) ->
|
|
|
Servers2 = reorder(NewConfL, Servers),
|
|
|
{reply, ok, State#{servers := Servers2}};
|
|
|
-handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
|
|
|
- emqx_exhook_metrics:on_server_deleted(ToDelete),
|
|
|
-
|
|
|
- #{servers := Servers} = State2 = do_unload_server(ToDelete, State),
|
|
|
-
|
|
|
- Servers2 = maps:remove(ToDelete, Servers),
|
|
|
-
|
|
|
- {reply, ok, update_servers(Servers2, State2)};
|
|
|
+handle_call({update_config, {delete, ToDelete}, _, _}, _From, State) ->
|
|
|
+ {reply, ok, remove_server(ToDelete, State)};
|
|
|
handle_call(
|
|
|
- {update_config, {add, RawConf}, NewConfL},
|
|
|
+ {update_config, {add, RawConf}, NewConfL, _},
|
|
|
_From,
|
|
|
#{servers := Servers} = State
|
|
|
) ->
|
|
|
@@ -246,14 +231,30 @@ handle_call(
|
|
|
Servers2 = Servers#{Name => Server},
|
|
|
Servers3 = reorder(NewConfL, Servers2),
|
|
|
{reply, Result, State#{servers := Servers3}};
|
|
|
-handle_call({lookup, Name}, _From, State) ->
|
|
|
- {reply, where_is_server(Name, State), State};
|
|
|
-handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
|
|
|
+handle_call({update_config, {update, Name, _Conf}, NewConfL, _}, _From, State) ->
|
|
|
{Result, State2} = restart_server(Name, NewConfL, State),
|
|
|
{reply, Result, State2};
|
|
|
-handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
|
|
|
+handle_call({update_config, {enable, Name, _Enable}, NewConfL, _}, _From, State) ->
|
|
|
{Result, State2} = restart_server(Name, NewConfL, State),
|
|
|
{reply, Result, State2};
|
|
|
+handle_call({update_config, _, ConfL, ConfL}, _From, State) ->
|
|
|
+ {reply, ok, State};
|
|
|
+handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, _From, State) ->
|
|
|
+ #{
|
|
|
+ removed := Removed,
|
|
|
+ added := Added,
|
|
|
+ changed := Updated
|
|
|
+ } = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end),
|
|
|
+ State2 = remove_servers(Removed, State),
|
|
|
+ {UpdateRes, State3} = restart_servers(Updated, NewConfL, State2),
|
|
|
+ {AddRes, State4 = #{servers := Servers4}} = add_servers(Added, State3),
|
|
|
+ State5 = State4#{servers => reorder(NewConfL, Servers4)},
|
|
|
+ case UpdateRes =:= [] andalso AddRes =:= [] of
|
|
|
+ true -> {reply, ok, State5};
|
|
|
+ false -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5}
|
|
|
+ end;
|
|
|
+handle_call({lookup, Name}, _From, State) ->
|
|
|
+ {reply, where_is_server(Name, State), State};
|
|
|
handle_call({server_info, Name}, _From, State) ->
|
|
|
case where_is_server(Name, State) of
|
|
|
not_found ->
|
|
|
@@ -287,6 +288,22 @@ handle_call(_Request, _From, State) ->
|
|
|
Reply = ok,
|
|
|
{reply, Reply, State}.
|
|
|
|
|
|
+remove_servers(Removes, State) ->
|
|
|
+ lists:foldl(
|
|
|
+ fun(Conf, Acc) ->
|
|
|
+ ToDelete = maps:get(name, Conf),
|
|
|
+ remove_server(ToDelete, Acc)
|
|
|
+ end,
|
|
|
+ State,
|
|
|
+ Removes
|
|
|
+ ).
|
|
|
+
|
|
|
+remove_server(ToDelete, State) ->
|
|
|
+ emqx_exhook_metrics:on_server_deleted(ToDelete),
|
|
|
+ #{servers := Servers} = State2 = do_unload_server(ToDelete, State),
|
|
|
+ Servers2 = maps:remove(ToDelete, Servers),
|
|
|
+ update_servers(Servers2, State2).
|
|
|
+
|
|
|
handle_cast(_Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
@@ -310,6 +327,8 @@ terminate(Reason, State = #{servers := Servers}) ->
|
|
|
Servers
|
|
|
),
|
|
|
?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}),
|
|
|
+ emqx_conf:remove_handler(?SERVERS),
|
|
|
+ emqx_conf:remove_handler(?EXHOOK),
|
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
@@ -325,6 +344,22 @@ unload_exhooks() ->
|
|
|
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
|
|
|
].
|
|
|
|
|
|
+add_servers(Added, State) ->
|
|
|
+ lists:foldl(
|
|
|
+ fun(Conf = #{name := Name}, {ResAcc, StateAcc}) ->
|
|
|
+ case do_load_server(options_to_server(Conf)) of
|
|
|
+ {ok, Server} ->
|
|
|
+ #{servers := Servers} = StateAcc,
|
|
|
+ Servers2 = Servers#{Name => Server},
|
|
|
+ {ResAcc, update_servers(Servers2, StateAcc)};
|
|
|
+ {Err, StateAcc1} ->
|
|
|
+ {[Err | ResAcc], StateAcc1}
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ {[], State},
|
|
|
+ Added
|
|
|
+ ).
|
|
|
+
|
|
|
do_load_server(#{name := Name} = Server) ->
|
|
|
case emqx_exhook_server:load(Name, Server) of
|
|
|
{ok, ServerState} ->
|
|
|
@@ -401,8 +436,7 @@ clean_reload_timer(#{timer := Timer}) ->
|
|
|
_ = erlang:cancel_timer(Timer),
|
|
|
ok.
|
|
|
|
|
|
--spec do_move(binary(), position(), list(server_options())) ->
|
|
|
- not_found | list(server_options()).
|
|
|
+-spec do_move(binary(), position(), list(server_options())) -> list(server_options()).
|
|
|
do_move(Name, Position, ConfL) ->
|
|
|
move(ConfL, Name, Position, []).
|
|
|
|
|
|
@@ -411,7 +445,7 @@ move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) ->
|
|
|
move([Server | T], Name, Position, HeadL) ->
|
|
|
move(T, Name, Position, [Server | HeadL]);
|
|
|
move([], _Name, _Position, _HeadL) ->
|
|
|
- not_found.
|
|
|
+ throw(not_found).
|
|
|
|
|
|
move_to(?CMD_MOVE_FRONT, Server, ServerL) ->
|
|
|
[Server | ServerL];
|
|
|
@@ -429,8 +463,7 @@ move_to([H | T], Position, Server, HeadL) ->
|
|
|
move_to([], _Position, _Server, _HeadL) ->
|
|
|
not_found.
|
|
|
|
|
|
--spec do_delete(binary(), list(server_options())) ->
|
|
|
- not_found | list(server_options()).
|
|
|
+-spec do_delete(binary(), list(server_options())) -> list(server_options()).
|
|
|
do_delete(ToDelete, OldConf) ->
|
|
|
case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= ToDelete end, OldConf) of
|
|
|
true ->
|
|
|
@@ -439,7 +472,7 @@ do_delete(ToDelete, OldConf) ->
|
|
|
OldConf
|
|
|
);
|
|
|
false ->
|
|
|
- not_found
|
|
|
+ throw(not_found)
|
|
|
end.
|
|
|
|
|
|
-spec reorder(list(server_options()), servers()) -> servers().
|
|
|
@@ -471,9 +504,7 @@ where_is_server(Name, #{servers := Servers}) ->
|
|
|
|
|
|
-type replace_fun() :: fun((server_options()) -> server_options()).
|
|
|
|
|
|
--spec replace_conf(binary(), replace_fun(), list(server_options())) ->
|
|
|
- not_found
|
|
|
- | list(server_options()).
|
|
|
+-spec replace_conf(binary(), replace_fun(), list(server_options())) -> list(server_options()).
|
|
|
replace_conf(Name, ReplaceFun, ConfL) ->
|
|
|
replace_conf(ConfL, Name, ReplaceFun, []).
|
|
|
|
|
|
@@ -483,7 +514,20 @@ replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) ->
|
|
|
replace_conf([H | T], Name, ReplaceFun, HeadL) ->
|
|
|
replace_conf(T, Name, ReplaceFun, [H | HeadL]);
|
|
|
replace_conf([], _, _, _) ->
|
|
|
- not_found.
|
|
|
+ throw(not_found).
|
|
|
+
|
|
|
+restart_servers(Updated, NewConfL, State) ->
|
|
|
+ lists:foldl(
|
|
|
+ fun({_Old, Conf}, {ResAcc, StateAcc}) ->
|
|
|
+ Name = maps:get(name, Conf),
|
|
|
+ case restart_server(Name, NewConfL, StateAcc) of
|
|
|
+ {ok, StateAcc1} -> {ResAcc, StateAcc1};
|
|
|
+ {Err, StateAcc1} -> {[Err | ResAcc], StateAcc1}
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ {[], State},
|
|
|
+ Updated
|
|
|
+ ).
|
|
|
|
|
|
-spec restart_server(binary(), list(server_options()), state()) ->
|
|
|
{ok, state()}
|
|
|
@@ -612,6 +656,16 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when
|
|
|
NewSSL = find_server_ssl_cfg(Name, NewConfs),
|
|
|
OldSSL = find_server_ssl_cfg(Name, OldConfs),
|
|
|
emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL);
|
|
|
+%% replace the whole config from the cli
|
|
|
+try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(#{name := Name} = Conf) ->
|
|
|
+ NewSSL = find_server_ssl_cfg(Name, NewServers),
|
|
|
+ OldSSL = maps:get(ssl, Conf, undefined),
|
|
|
+ emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL)
|
|
|
+ end,
|
|
|
+ OldServers
|
|
|
+ );
|
|
|
try_clear_ssl_files(_Req, _NewConf, _OldConf) ->
|
|
|
ok.
|
|
|
|