Explorar o código

refactor(listener-schema): use a tombstone for deleted listeners

Zaiming (Stone) Shi %!s(int64=2) %!d(string=hai) anos
pai
achega
2e9dca280c

+ 11 - 5
apps/emqx/src/emqx_config_handler.erl

@@ -447,11 +447,17 @@ merge_to_override_config(RawConf, Opts) ->
 up_req({remove, _Opts}) -> '$remove';
 up_req({{update, Req}, _Opts}) -> Req.
 
-return_change_result(ConfKeyPath, {{update, _Req}, Opts}) ->
-    #{
-        config => emqx_config:get(ConfKeyPath),
-        raw_config => return_rawconf(ConfKeyPath, Opts)
-    };
+return_change_result(ConfKeyPath, {{update, Req}, Opts}) ->
+    case Req =/= emqx_schema:tombstone() of
+        true ->
+            #{
+                config => emqx_config:get(ConfKeyPath),
+                raw_config => return_rawconf(ConfKeyPath, Opts)
+            };
+        false ->
+            %% like remove, nothing to return
+            #{}
+    end;
 return_change_result(_ConfKeyPath, {remove, _Opts}) ->
     #{}.
 

+ 49 - 22
apps/emqx/src/emqx_listeners.erl

@@ -22,7 +22,9 @@
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
 %% APIs
 -export([
     list_raw/0,
@@ -33,7 +35,8 @@
     is_running/1,
     current_conns/2,
     max_conns/2,
-    id_example/0
+    id_example/0,
+    default_max_conn/0
 ]).
 
 -export([
@@ -61,8 +64,12 @@
 -export([certs_dir/2]).
 -endif.
 
+-type listener_id() :: atom() | binary().
+
 -define(CONF_KEY_PATH, [listeners, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
+-define(MARK_DEL, marked_for_deletion).
+-define(MARK_DEL_BIN, <<"marked_for_deletion">>).
 
 -spec id_example() -> atom().
 id_example() -> 'tcp:default'.
@@ -105,19 +112,22 @@ do_list_raw() ->
 
 format_raw_listeners({Type0, Conf}) ->
     Type = binary_to_atom(Type0),
-    lists:map(
-        fun({LName, LConf0}) when is_map(LConf0) ->
-            Bind = parse_bind(LConf0),
-            Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
-            LConf1 = maps:remove(<<"authentication">>, LConf0),
-            LConf3 = maps:put(<<"running">>, Running, LConf1),
-            CurrConn =
-                case Running of
-                    true -> current_conns(Type, LName, Bind);
-                    false -> 0
-                end,
-            LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3),
-            {Type0, LName, LConf4}
+    lists:filtermap(
+        fun
+            ({LName, LConf0}) when is_map(LConf0) ->
+                Bind = parse_bind(LConf0),
+                Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
+                LConf1 = maps:remove(<<"authentication">>, LConf0),
+                LConf3 = maps:put(<<"running">>, Running, LConf1),
+                CurrConn =
+                    case Running of
+                        true -> current_conns(Type, LName, Bind);
+                        false -> 0
+                    end,
+                LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3),
+                {true, {Type0, LName, LConf4}};
+            ({_LName, _MarkDel}) ->
+                false
         end,
         maps:to_list(Conf)
     ).
@@ -195,7 +205,7 @@ start() ->
     ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
     foreach_listeners(fun start_listener/3).
 
--spec start_listener(atom()) -> ok | {error, term()}.
+-spec start_listener(listener_id()) -> ok | {error, term()}.
 start_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun start_listener/3).
 
@@ -246,7 +256,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
 restart() ->
     foreach_listeners(fun restart_listener/3).
 
--spec restart_listener(atom()) -> ok | {error, term()}.
+-spec restart_listener(listener_id()) -> ok | {error, term()}.
 restart_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun restart_listener/3).
 
@@ -271,7 +281,7 @@ stop() ->
     _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
     foreach_listeners(fun stop_listener/3).
 
--spec stop_listener(atom()) -> ok | {error, term()}.
+-spec stop_listener(listener_id()) -> ok | {error, term()}.
 stop_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun stop_listener/3).
 
@@ -419,7 +429,9 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
     end.
 
 %% Update the listeners at runtime
-pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) ->
+pre_config_update([listeners, Type, Name], {create, NewConf}, V) when
+    V =:= undefined orelse V =:= ?MARK_DEL_BIN
+->
     CertsDir = certs_dir(Type, Name),
     {ok, convert_certs(CertsDir, NewConf)};
 pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
@@ -434,6 +446,8 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
 pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
     NewConf = emqx_utils_maps:deep_merge(RawConf, Updated),
     {ok, NewConf};
+pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) ->
+    {ok, ?MARK_DEL};
 pre_config_update(_Path, _Request, RawConf) ->
     {ok, RawConf}.
 
@@ -446,9 +460,9 @@ post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf
         #{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf});
         _ -> ok
     end;
-post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) ->
-    ok;
-post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) ->
+post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when
+    Op =:= ?MARK_DEL andalso is_map(OldConf)
+->
     ok = unregister_ocsp_stapling_refresh(Type, Name),
     case stop_listener(Type, Name, OldConf) of
         ok ->
@@ -611,6 +625,7 @@ format_bind(Bin) when is_binary(Bin) ->
 listener_id(Type, ListenerName) ->
     list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
 
+-spec parse_listener_id(listener_id()) -> {ok, #{type => atom(), name => atom()}} | {error, term()}.
 parse_listener_id(Id) ->
     case string:split(str(Id), ":", leading) of
         [Type, Name] ->
@@ -836,3 +851,15 @@ unregister_ocsp_stapling_refresh(Type, Name) ->
     ListenerId = listener_id(Type, Name),
     emqx_ocsp_cache:unregister_listener(ListenerId),
     ok.
+
+%% There is currently an issue with frontend
+%% infinity is not a good value for it, so we use 5m for now
+default_max_conn() ->
+    %% TODO: <<"infinity">>
+    5_000_000.
+
+-ifdef(TEST).
+%% since it's a copy-paste. we need to ensure it's the same atom.
+ensure_same_atom_test() ->
+    ?assertEqual(?MARK_DEL, emqx_schema:tombstone()).
+-endif.

+ 72 - 34
apps/emqx/src/emqx_schema.erl

@@ -100,6 +100,13 @@
     convert_servers/2
 ]).
 
+%% tombstone types
+-export([
+    tombstone/0,
+    tombstone_map/2,
+    get_tombstone_map_value_type/1
+]).
+
 -behaviour(hocon_schema).
 
 -reflect_type([
@@ -777,45 +784,48 @@ fields("listeners") ->
     [
         {"tcp",
             sc(
-                map(name, ref("mqtt_tcp_listener")),
+                tombstone_map(name, ref("mqtt_tcp_listener")),
                 #{
                     desc => ?DESC(fields_listeners_tcp),
-                    default => default_listener(tcp),
+                    converter => fun(X, _) ->
+                        ensure_default_listener(X, tcp)
+                    end,
                     required => {false, recursively}
                 }
             )},
         {"ssl",
             sc(
-                map(name, ref("mqtt_ssl_listener")),
+                tombstone_map(name, ref("mqtt_ssl_listener")),
                 #{
                     desc => ?DESC(fields_listeners_ssl),
-                    default => default_listener(ssl),
+                    converter => fun(X, _) -> ensure_default_listener(X, ssl) end,
                     required => {false, recursively}
                 }
             )},
         {"ws",
             sc(
-                map(name, ref("mqtt_ws_listener")),
+                tombstone_map(name, ref("mqtt_ws_listener")),
                 #{
                     desc => ?DESC(fields_listeners_ws),
-                    default => default_listener(ws),
+                    converter => fun(X, _) -> ensure_default_listener(X, ws) end,
                     required => {false, recursively}
                 }
             )},
         {"wss",
             sc(
-                map(name, ref("mqtt_wss_listener")),
+                tombstone_map(name, ref("mqtt_wss_listener")),
                 #{
                     desc => ?DESC(fields_listeners_wss),
-                    default => default_listener(wss),
+                    converter => fun(X, _) -> ensure_default_listener(X, wss) end,
                     required => {false, recursively}
                 }
             )},
         {"quic",
             sc(
-                map(name, ref("mqtt_quic_listener")),
+                tombstone_map(name, ref("mqtt_quic_listener")),
                 #{
                     desc => ?DESC(fields_listeners_quic),
+                    converter => fun keep_default_tombstone/2,
                     required => {false, recursively}
                 }
             )}
@@ -1943,7 +1953,7 @@ base_listener(Bind) ->
             sc(
                 hoconsc:union([infinity, pos_integer()]),
                 #{
-                    default => <<"infinity">>,
+                    default => emqx_listeners:default_max_conn(),
                     desc => ?DESC(base_listener_max_connections)
                 }
             )},
@@ -3092,20 +3102,12 @@ assert_required_field(Conf, Key, ErrorMessage) ->
 
 default_listener(tcp) ->
     #{
-        <<"default">> =>
-            #{
-                <<"bind">> => <<"0.0.0.0:1883">>,
-                <<"max_connections">> => 1024000
-            }
+        <<"bind">> => <<"0.0.0.0:1883">>
     };
 default_listener(ws) ->
     #{
-        <<"default">> =>
-            #{
-                <<"bind">> => <<"0.0.0.0:8083">>,
-                <<"max_connections">> => 1024000,
-                <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
-            }
+        <<"bind">> => <<"0.0.0.0:8083">>,
+        <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
     };
 default_listener(SSLListener) ->
     %% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate
@@ -3120,22 +3122,14 @@ default_listener(SSLListener) ->
     case SSLListener of
         ssl ->
             #{
-                <<"default">> =>
-                    #{
-                        <<"bind">> => <<"0.0.0.0:8883">>,
-                        <<"max_connections">> => 512000,
-                        <<"ssl_options">> => SslOptions
-                    }
+                <<"bind">> => <<"0.0.0.0:8883">>,
+                <<"ssl_options">> => SslOptions
             };
         wss ->
             #{
-                <<"default">> =>
-                    #{
-                        <<"bind">> => <<"0.0.0.0:8084">>,
-                        <<"max_connections">> => 512000,
-                        <<"ssl_options">> => SslOptions,
-                        <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
-                    }
+                <<"bind">> => <<"0.0.0.0:8084">>,
+                <<"ssl_options">> => SslOptions,
+                <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
             }
     end.
 
@@ -3196,3 +3190,47 @@ special_env(_Name) ->
 -else.
 special_env(_Name) -> error.
 -endif.
+
+%% The tombstone atom.
+tombstone() ->
+    marked_for_deletion.
+
+%% Make a map type, the value of which is allowed to be 'marked_for_deletion'
+%% 'marked_for_delition' is a special value which means the key is deleted.
+%% This is used to support the 'delete' operation in configs,
+%% since deleting the key would result in default value being used.
+tombstone_map(Name, Type) ->
+    %% marked_for_deletion must be the last member of the union
+    %% because we need to first union member to populate the default values
+    map(Name, ?UNION([Type, tombstone()])).
+
+%% inverse of mark_del_map
+get_tombstone_map_value_type(Schema) ->
+    %% TODO: violation of abstraction, expose an API in hoconsc
+    %% hoconsc:map_value_type(Schema)
+    ?MAP(_Name, Union) = hocon_schema:field_schema(Schema, type),
+    %% TODO: violation of abstraction, fix hoconsc:union_members/1
+    ?UNION(Members) = Union,
+    Tombstone = tombstone(),
+    [Type, Tombstone] = hoconsc:union_members(Members),
+    Type.
+
+%% Keep the 'default' tombstone, but delete others.
+keep_default_tombstone(Map, _Opts) when is_map(Map) ->
+    maps:filter(
+        fun(Key, Value) ->
+            Key =:= <<"default">> orelse Value =/= atom_to_binary(tombstone())
+        end,
+        Map
+    );
+keep_default_tombstone(Value, _Opts) ->
+    Value.
+
+ensure_default_listener(undefined, ListenerType) ->
+    %% let the schema's default value do its job
+    #{<<"default">> => default_listener(ListenerType)};
+ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) ->
+    keep_default_tombstone(Map, #{});
+ensure_default_listener(Map, ListenerType) ->
+    NewMap = Map#{<<"default">> => default_listener(ListenerType)},
+    keep_default_tombstone(NewMap, #{}).

+ 5 - 0
apps/emqx_conf/src/emqx_conf.erl

@@ -24,6 +24,7 @@
 -export([get_by_node/2, get_by_node/3]).
 -export([update/3, update/4]).
 -export([remove/2, remove/3]).
+-export([tombstone/2]).
 -export([reset/2, reset/3]).
 -export([dump_schema/1, dump_schema/3]).
 -export([schema_module/0]).
@@ -107,6 +108,10 @@ update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
 update(Node, KeyPath, UpdateReq, Opts) ->
     emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
 
+%% @doc Mark the specified key path as tombstone
+tombstone(KeyPath, Opts) ->
+    update(KeyPath, emqx_schema:tombstone(), Opts).
+
 %% @doc remove all value of key path in cluster-override.conf or local-override.conf.
 -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.

+ 3 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -237,8 +237,9 @@ parse_spec_ref(Module, Path, Options) ->
             erlang:apply(Module, schema, [Path])
             %% better error message
         catch
-            error:Reason ->
-                throw({error, #{mfa => {Module, schema, [Path]}, reason => Reason}})
+            error:Reason:Stacktrace ->
+                MoreInfo = #{module => Module, path => Path, reason => Reason},
+                erlang:raise(error, MoreInfo, Stacktrace)
         end,
     {Specs, Refs} = maps:fold(
         fun(Method, Meta, {Acc, RefsAcc}) ->

+ 7 - 5
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -293,12 +293,14 @@ listeners_type() ->
 listeners_info(Opts) ->
     Listeners = hocon_schema:fields(emqx_schema, "listeners"),
     lists:map(
-        fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
-            Fields0 = hocon_schema:fields(Mod, Field),
+        fun({ListenerType, Schema}) ->
+            Type = emqx_schema:get_tombstone_map_value_type(Schema),
+            ?R_REF(Mod, StructName) = Type,
+            Fields0 = hocon_schema:fields(Mod, StructName),
             Fields1 = lists:keydelete("authentication", 1, Fields0),
             Fields3 = required_bind(Fields1, Opts),
-            Ref = listeners_ref(Type, Opts),
-            TypeAtom = list_to_existing_atom(Type),
+            Ref = listeners_ref(ListenerType, Opts),
+            TypeAtom = list_to_existing_atom(ListenerType),
             #{
                 ref => ?R_REF(Ref),
                 schema => [
@@ -642,7 +644,7 @@ create(Path, Conf) ->
     wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
 
 ensure_remove(Path) ->
-    wrap(emqx_conf:remove(Path, ?OPTS(cluster))).
+    wrap(emqx_conf:update(Path, emqx_schema:tombstone(), ?OPTS(cluster))).
 
 wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
 wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};

+ 11 - 13
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -51,13 +51,13 @@ init_per_group(with_defaults_in_file, Config) ->
     %% if there is no config file, the such deletion would result in a deletion
     %% of the default listener.
     Name = atom_to_list(?MODULE) ++ "-default-listeners",
-    TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hcon_text()),
+    TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hocon_text()),
     emqx_mgmt_api_test_util:init_suite([emqx_conf]),
     [{injected_conf_file, TmpConfFullPath} | Config].
 
 end_per_group(Group, Config) ->
-    emqx_conf:remove([listeners, tcp, new], #{override_to => cluster}),
-    emqx_conf:remove([listeners, tcp, new1], #{override_to => local}),
+    emqx_conf:tombstone([listeners, tcp, new], #{override_to => cluster}),
+    emqx_conf:tombstone([listeners, tcp, new1], #{override_to => local}),
     case Group =:= with_defaults_in_file of
         true ->
             {_, File} = lists:keyfind(injected_conf_file, 1, Config),
@@ -94,16 +94,16 @@ t_max_connection_default({init, Config}) ->
 t_max_connection_default({'end', Config}) ->
     ok = file:delete(proplists:get_value(tmp_config_file, Config));
 t_max_connection_default(Config) when is_list(Config) ->
-    %% Check infinity is binary not atom.
     #{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(),
     Target = lists:filter(
         fun(#{<<"id">> := Id}) -> Id =:= 'tcp:max_connection_test' end,
         Listeners
     ),
-    ?assertMatch([#{<<"max_connections">> := <<"infinity">>}], Target),
+    DefaultMaxConn = emqx_listeners:default_max_conn(),
+    ?assertMatch([#{<<"max_connections">> := DefaultMaxConn}], Target),
     NewPath = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:max_connection_test"]),
-    ?assertMatch(#{<<"max_connections">> := <<"infinity">>}, request(get, NewPath, [], [])),
-    emqx_conf:remove([listeners, tcp, max_connection_test], #{override_to => cluster}),
+    ?assertMatch(#{<<"max_connections">> := DefaultMaxConn}, request(get, NewPath, [], [])),
+    emqx_conf:tombstone([listeners, tcp, max_connection_test], #{override_to => cluster}),
     ok.
 
 t_list_listeners(Config) when is_list(Config) ->
@@ -114,7 +114,7 @@ t_list_listeners(Config) when is_list(Config) ->
 
     %% POST /listeners
     ListenerId = <<"tcp:default">>,
-    NewListenerId = <<"tcp:new">>,
+    NewListenerId = <<"tcp:new11">>,
 
     OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
     NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
@@ -128,7 +128,7 @@ t_list_listeners(Config) when is_list(Config) ->
     OriginListener2 = maps:remove(<<"id">>, OriginListener),
     Port = integer_to_binary(?PORT),
     NewConf = OriginListener2#{
-        <<"name">> => <<"new">>,
+        <<"name">> => <<"new11">>,
         <<"bind">> => <<"0.0.0.0:", Port/binary>>,
         <<"max_connections">> := <<"infinity">>
     },
@@ -298,8 +298,6 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port
     OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
     NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
     OriginListener = request(get, OriginPath, [], []),
-    ct:pal("raw conf: ~p~n", [emqx_config:get_raw([listeners])]),
-    ct:pal("OriginListener:~p", [OriginListener]),
 
     %% create with full options
     ?assertEqual({error, not_found}, is_running(NewListenerId)),
@@ -314,7 +312,7 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port
     ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))),
     Get1 = request(get, NewPath, [], []),
     ?assertMatch(Create, Get1),
-    ?assert(is_running(NewListenerId)),
+    ?assertEqual({true, NewListenerId}, {is_running(NewListenerId), NewListenerId}),
 
     %% create with required options
     MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]),
@@ -448,7 +446,7 @@ data_file(Name) ->
 cert_file(Name) ->
     data_file(filename:join(["certs", Name])).
 
-default_listeners_hcon_text() ->
+default_listeners_hocon_text() ->
     Sc = #{roots => emqx_schema:fields("listeners")},
     Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}),
     Config = #{<<"listeners">> => Listeners},