Просмотр исходного кода

feat: update listeners from cli

某文 2 лет назад
Родитель
Сommit
d0d6992a14
2 измененных файлов с 289 добавлено и 50 удалено
  1. 135 50
      apps/emqx/src/emqx_listeners.erl
  2. 154 0
      apps/emqx/test/emqx_listeners_update_SUITE.erl

+ 135 - 50
apps/emqx/src/emqx_listeners.erl

@@ -57,6 +57,7 @@
 ]).
 
 -export([pre_config_update/3, post_config_update/5]).
+-export([create_listener/3, remove_listener/3, update_listener/3]).
 
 -export([format_bind/1]).
 
@@ -65,8 +66,8 @@
 -endif.
 
 -type listener_id() :: atom() | binary().
-
--define(CONF_KEY_PATH, [listeners, '?', '?']).
+-define(ROOT_KEY, listeners).
+-define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
 -define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ).
 
@@ -212,7 +213,10 @@ shutdown_count(_, _, _) ->
 start() ->
     %% The ?MODULE:start/0 will be called by emqx_app when emqx get started,
     %% so we install the config handler here.
+    %% callback when http api request
     ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
+    %% callback when reload from config file
+    ok = emqx_config_handler:add_handler([?ROOT_KEY], ?MODULE),
     foreach_listeners(fun start_listener/3).
 
 -spec start_listener(listener_id()) -> ok | {error, term()}.
@@ -287,7 +291,8 @@ restart_listener(Type, ListenerName, OldConf, NewConf) ->
 stop() ->
     %% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown,
     %% so we uninstall the config handler here.
-    _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
+    ok = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
+    ok = emqx_config_handler:remove_handler([?ROOT_KEY]),
     foreach_listeners(fun stop_listener/3).
 
 -spec stop_listener(listener_id()) -> ok | {error, term()}.
@@ -463,50 +468,34 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
     end.
 
 %% Update the listeners at runtime
-pre_config_update([listeners, Type, Name], {create, NewConf}, V) when
+pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when
     V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
 ->
-    CertsDir = certs_dir(Type, Name),
-    {ok, convert_certs(CertsDir, NewConf)};
-pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
+    {ok, convert_certs(Type, Name, NewConf)};
+pre_config_update([?ROOT_KEY, _Type, _Name], {create, _NewConf}, _RawConf) ->
     {error, already_exist};
-pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) ->
+pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) ->
     {error, not_found};
-pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
-    NewConfT = emqx_utils_maps:deep_merge(RawConf, Request),
-    NewConf = ensure_override_limiter_conf(NewConfT, Request),
-    CertsDir = certs_dir(Type, Name),
-    {ok, convert_certs(CertsDir, NewConf)};
-pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
-    NewConf = emqx_utils_maps:deep_merge(RawConf, Updated),
-    {ok, NewConf};
-pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) ->
+pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) ->
+    RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request),
+    RawConf2 = ensure_override_limiter_conf(RawConf1, Request),
+    {ok, convert_certs(Type, Name, RawConf2)};
+pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) ->
+    {ok, emqx_utils_maps:deep_merge(RawConf, Updated)};
+pre_config_update([?ROOT_KEY, _Type, _Name], ?MARK_DEL, _RawConf) ->
     {ok, ?TOMBSTONE_VALUE};
-pre_config_update(_Path, _Request, RawConf) ->
-    {ok, RawConf}.
-
-post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
-    start_listener(Type, Name, NewConf);
-post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
-    try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
-    ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-    case NewConf of
-        #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf});
-        _ -> ok
-    end;
-post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when
-    Op =:= ?MARK_DEL andalso is_map(OldConf)
-->
-    ok = unregister_ocsp_stapling_refresh(Type, Name),
-    case stop_listener(Type, Name, OldConf) of
-        ok ->
-            _ = emqx_authentication:delete_chain(listener_id(Type, Name)),
-            CertsDir = certs_dir(Type, Name),
-            clear_certs(CertsDir, OldConf);
-        Err ->
-            Err
-    end;
-post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
+pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
+    {ok, RawConf};
+pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
+    {ok, convert_certs(NewConf)}.
+
+post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
+    create_listener(Type, Name, NewConf);
+post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
+    update_listener(Type, Name, {OldConf, NewConf});
+post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
+    remove_listener(Type, Name, OldConf);
+post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
     #{enabled := NewEnabled} = NewConf,
     #{enabled := OldEnabled} = OldConf,
     case {NewEnabled, OldEnabled} of
@@ -523,9 +512,65 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo
             ok = unregister_ocsp_stapling_refresh(Type, Name),
             stop_listener(Type, Name, OldConf)
     end;
+post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) ->
+    ok;
+post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) ->
+    #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf),
+    Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed),
+    perform_listener_changes([
+        {fun ?MODULE:remove_listener/3, Removed},
+        {fun ?MODULE:update_listener/3, Updated},
+        {fun ?MODULE:create_listener/3, Added}
+    ]);
 post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 
+create_listener(Type, Name, NewConf) ->
+    Res = start_listener(Type, Name, NewConf),
+    recreate_authenticator(Res, Type, Name, NewConf).
+
+recreate_authenticator(ok, Type, Name, Conf) ->
+    Chain = listener_id(Type, Name),
+    _ = emqx_authentication:delete_chain(Chain),
+    case maps:get(authentication, Conf, []) of
+        [] -> ok;
+        AuthN -> emqx_authentication:create_authenticator(Chain, AuthN)
+    end;
+recreate_authenticator(Error, _Type, _Name, _NewConf) ->
+    Error.
+
+remove_listener(Type, Name, OldConf) ->
+    ok = unregister_ocsp_stapling_refresh(Type, Name),
+    case stop_listener(Type, Name, OldConf) of
+        ok ->
+            _ = emqx_authentication:delete_chain(listener_id(Type, Name)),
+            clear_certs(certs_dir(Type, Name), OldConf);
+        Err ->
+            Err
+    end.
+
+update_listener(Type, Name, {OldConf, NewConf}) ->
+    try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
+    ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
+    Res = restart_listener(Type, Name, {OldConf, NewConf}),
+    recreate_authenticator(Res, Type, Name, NewConf).
+
+perform_listener_changes([]) ->
+    ok;
+perform_listener_changes([{Action, ConfL} | Tasks]) ->
+    case perform_listener_changes(Action, ConfL) of
+        ok -> perform_listener_changes(Tasks);
+        {error, Reason} -> {error, Reason}
+    end.
+
+perform_listener_changes(_Action, []) ->
+    ok;
+perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) ->
+    case Action(Type, Name, Diff) of
+        ok -> perform_listener_changes(Action, MapConf);
+        {error, Reason} -> {error, Reason}
+    end.
+
 esockd_opts(ListenerId, Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
@@ -701,6 +746,29 @@ del_limiter_bucket(Id, Conf) ->
             )
     end.
 
+diff_confs(NewConfs, OldConfs) ->
+    emqx_utils:diff_lists(
+        flatten_confs(NewConfs),
+        flatten_confs(OldConfs),
+        fun({Key, _}) -> Key end
+    ).
+
+flatten_confs(Conf0) ->
+    lists:flatmap(
+        fun({Type, Conf}) ->
+            do_flatten_confs(Type, Conf)
+        end,
+        maps:to_list(Conf0)
+    ).
+
+do_flatten_confs(Type, Conf0) ->
+    FilterFun =
+        fun
+            ({_Name, ?TOMBSTONE_TYPE}) -> false;
+            ({Name, Conf}) -> {true, {{Type, Name}, Conf}}
+        end,
+    lists:filtermap(FilterFun, maps:to_list(Conf0)).
+
 enable_authn(Opts) ->
     maps:get(enable_authn, Opts, true).
 
@@ -762,14 +830,32 @@ parse_bind(#{<<"bind">> := Bind}) ->
 certs_dir(Type, Name) ->
     iolist_to_binary(filename:join(["listeners", Type, Name])).
 
-convert_certs(CertsDir, Conf) ->
+convert_certs(ListenerConf) ->
+    maps:fold(
+        fun(Type, Listeners0, Acc) ->
+            Listeners1 =
+                maps:fold(
+                    fun(Name, Conf, Acc1) ->
+                        Acc1#{Name => convert_certs(Type, Name, Conf)}
+                    end,
+                    #{},
+                    Listeners0
+                ),
+            Acc#{Type => Listeners1}
+        end,
+        #{},
+        ListenerConf
+    ).
+
+convert_certs(Type, Name, Conf) ->
+    CertsDir = certs_dir(Type, Name),
     case emqx_tls_lib:ensure_ssl_files(CertsDir, get_ssl_options(Conf)) of
         {ok, undefined} ->
             Conf;
         {ok, SSL} ->
             Conf#{<<"ssl_options">> => SSL};
         {error, Reason} ->
-            ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
+            ?SLOG(error, Reason#{msg => "bad_ssl_config", type => Type, name => Name}),
             throw({bad_ssl_config, Reason})
     end.
 
@@ -791,13 +877,15 @@ try_clear_ssl_files(CertsDir, NewConf, OldConf) ->
     OldSSL = get_ssl_options(OldConf),
     emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL, OldSSL).
 
-get_ssl_options(Conf) ->
+get_ssl_options(Conf = #{}) ->
     case maps:find(ssl_options, Conf) of
         {ok, SSL} ->
             SSL;
         error ->
             maps:get(<<"ssl_options">>, Conf, undefined)
-    end.
+    end;
+get_ssl_options(_) ->
+    undefined.
 
 %% @doc Get QUIC optional settings for low level tunings.
 %% @see quicer:quic_settings()
@@ -889,8 +977,5 @@ unregister_ocsp_stapling_refresh(Type, Name) ->
     emqx_ocsp_cache:unregister_listener(ListenerId),
     ok.
 
-%% There is currently an issue with frontend
-%% infinity is not a good value for it, so we use 5m for now
 default_max_conn() ->
-    %% TODO: <<"infinity">>
-    5_000_000.
+    <<"infinity">>.

+ 154 - 0
apps/emqx/test/emqx_listeners_update_SUITE.erl

@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_listeners_update_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_schema.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-import(emqx_listeners, [current_conns/2, is_running/1]).
+
+-define(LISTENERS, [listeners]).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:boot_modules(all),
+    emqx_common_test_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Config) ->
+    Init = emqx:get_raw_config(?LISTENERS),
+    [{init_conf, Init} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+    Conf = ?config(init_conf, Config),
+    {ok, _} = emqx:update_config(?LISTENERS, Conf),
+    ok.
+
+t_default_conf(_Config) ->
+    ?assertMatch(
+        #{
+            <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:1883">>}},
+            <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8883">>}},
+            <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}},
+            <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8084">>}}
+        },
+        emqx:get_raw_config(?LISTENERS)
+    ),
+    ?assertMatch(
+        #{
+            tcp := #{default := #{bind := {{0, 0, 0, 0}, 1883}}},
+            ssl := #{default := #{bind := {{0, 0, 0, 0}, 8883}}},
+            ws := #{default := #{bind := {{0, 0, 0, 0}, 8083}}},
+            wss := #{default := #{bind := {{0, 0, 0, 0}, 8084}}}
+        },
+        emqx:get_config(?LISTENERS)
+    ),
+    ok.
+
+t_update_conf(_Conf) ->
+    Raw = emqx:get_raw_config(?LISTENERS),
+    Raw1 = emqx_utils_maps:deep_put(
+        [<<"tcp">>, <<"default">>, <<"bind">>], Raw, <<"127.0.0.1:1883">>
+    ),
+    Raw2 = emqx_utils_maps:deep_put(
+        [<<"ssl">>, <<"default">>, <<"bind">>], Raw1, <<"127.0.0.1:8883">>
+    ),
+    Raw3 = emqx_utils_maps:deep_put(
+        [<<"ws">>, <<"default">>, <<"bind">>], Raw2, <<"0.0.0.0:8083">>
+    ),
+    Raw4 = emqx_utils_maps:deep_put(
+        [<<"wss">>, <<"default">>, <<"bind">>], Raw3, <<"127.0.0.1:8084">>
+    ),
+    ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)),
+    ?assertMatch(
+        #{
+            <<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:1883">>}},
+            <<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8883">>}},
+            <<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}},
+            <<"wss">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8084">>}}
+        },
+        emqx:get_raw_config(?LISTENERS)
+    ),
+    BindTcp = {{127, 0, 0, 1}, 1883},
+    BindSsl = {{127, 0, 0, 1}, 8883},
+    BindWs = {{0, 0, 0, 0}, 8083},
+    BindWss = {{127, 0, 0, 1}, 8084},
+    ?assertMatch(
+        #{
+            tcp := #{default := #{bind := BindTcp}},
+            ssl := #{default := #{bind := BindSsl}},
+            ws := #{default := #{bind := BindWs}},
+            wss := #{default := #{bind := BindWss}}
+        },
+        emqx:get_config(?LISTENERS)
+    ),
+    ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
+    ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
+
+    ?assertEqual(0, current_conns(<<"tcp:default">>, BindTcp)),
+    ?assertEqual(0, current_conns(<<"ssl:default">>, BindSsl)),
+
+    ?assertEqual({0, 0, 0, 0}, proplists:get_value(ip, ranch:info('ws:default'))),
+    ?assertEqual({127, 0, 0, 1}, proplists:get_value(ip, ranch:info('wss:default'))),
+    ?assert(is_running('ws:default')),
+    ?assert(is_running('wss:default')),
+    ok.
+
+t_add_delete_conf(_Conf) ->
+    Raw = emqx:get_raw_config(?LISTENERS),
+    %% add
+    #{<<"tcp">> := #{<<"default">> := Tcp}} = Raw,
+    NewBind = <<"127.0.0.1:1987">>,
+    Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"new">>], Raw, Tcp#{<<"bind">> => NewBind}),
+    Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE),
+    ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw2)),
+    ?assertEqual(0, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})),
+    ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
+    %% deleted
+    ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)),
+    ?assertError(not_found, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})),
+    ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
+    ok.
+
+t_delete_default_conf(_Conf) ->
+    Raw = emqx:get_raw_config(?LISTENERS),
+    %% delete default listeners
+    Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"default">>], Raw, ?TOMBSTONE_VALUE),
+    Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE),
+    Raw3 = emqx_utils_maps:deep_put([<<"ws">>, <<"default">>], Raw2, ?TOMBSTONE_VALUE),
+    Raw4 = emqx_utils_maps:deep_put([<<"wss">>, <<"default">>], Raw3, ?TOMBSTONE_VALUE),
+    ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)),
+    ?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
+    ?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
+    ?assertMatch({error, not_found}, is_running('ws:default')),
+    ?assertMatch({error, not_found}, is_running('wss:default')),
+
+    %% reset
+    ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)),
+    ?assertEqual(0, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
+    ?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
+    ?assert(is_running('ws:default')),
+    ?assert(is_running('wss:default')),
+    ok.