Просмотр исходного кода

Merge pull request #7675 from zhongwencool/api-listener

refatcor: simplify updating the listener
zhongwencool 3 лет назад
Родитель
Сommit
051ed84fc8

+ 1 - 1
apps/emqx/etc/emqx.conf

@@ -43,7 +43,7 @@ listeners.tcp.default {
   ##
   ##
   ## @doc listeners.tcp.<name>.access_rules
   ## @doc listeners.tcp.<name>.access_rules
   ## ValueType: Array<AccessRules>
   ## ValueType: Array<AccessRules>
-  ## Default: []
+  ## Default: ["allow all"]
   ## Examples:
   ## Examples:
   ##   access_rules: [
   ##   access_rules: [
   ##     "deny 192.168.0.0/24",
   ##     "deny 192.168.0.0/24",

+ 64 - 52
apps/emqx/src/emqx_listeners.erl

@@ -50,11 +50,11 @@
     parse_listener_id/1
     parse_listener_id/1
 ]).
 ]).
 
 
--export([post_config_update/5]).
+-export([pre_config_update/3, post_config_update/5]).
 
 
 -export([format_addr/1]).
 -export([format_addr/1]).
 
 
--define(CONF_KEY_PATH, [listeners]).
+-define(CONF_KEY_PATH, [listeners, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
 
 
 -spec id_example() -> atom().
 -spec id_example() -> atom().
@@ -202,7 +202,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
                 "Failed to start listener ~ts on ~ts: ~0p~n",
                 "Failed to start listener ~ts on ~ts: ~0p~n",
                 [ListenerId, BindStr, Reason]
                 [ListenerId, BindStr, Reason]
             ),
             ),
-            error({failed_to_start, ListenerId, BindStr, Reason})
+            Msg = lists:flatten(
+                io_lib:format(
+                    "~ts(~ts) : ~p",
+                    [ListenerId, BindStr, element(1, Reason)]
+                )
+            ),
+            {error, {failed_to_start, Msg}}
     end.
     end.
 
 
 %% @doc Restart all listeners
 %% @doc Restart all listeners
@@ -214,7 +220,7 @@ restart() ->
 restart_listener(ListenerId) ->
 restart_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun restart_listener/3).
     apply_on_listener(ListenerId, fun restart_listener/3).
 
 
--spec restart_listener(atom(), atom(), map()) -> ok | {error, term()}.
+-spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}.
 restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
 restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
     restart_listener(Type, ListenerName, OldConf, NewConf);
     restart_listener(Type, ListenerName, OldConf, NewConf);
 restart_listener(Type, ListenerName, Conf) ->
 restart_listener(Type, ListenerName, Conf) ->
@@ -334,54 +340,38 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
             {ok, {skipped, quic_app_missing}}
             {ok, {skipped, quic_app_missing}}
     end.
     end.
 
 
-delete_authentication(Type, ListenerName, _Conf) ->
-    emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
-
 %% Update the listeners at runtime
 %% Update the listeners at runtime
-post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
-    #{added := Added, removed := Removed, changed := Updated} =
-        diff_listeners(NewListeners, OldListeners),
-    try
-        perform_listener_changes(fun stop_listener/3, Removed),
-        perform_listener_changes(fun delete_authentication/3, Removed),
-        perform_listener_changes(fun start_listener/3, Added),
-        perform_listener_changes(fun restart_listener/3, Updated)
-    catch
-        error:{failed_to_start, ListenerId, Bind, Reason} ->
-            Error = lists:flatten(
-                io_lib:format(
-                    "~ts(~ts) failed with ~ts",
-                    [ListenerId, Bind, element(1, Reason)]
-                )
-            ),
-            {error, Error}
-    end.
-
-perform_listener_changes(Action, MapConfs) ->
-    lists:foreach(
-        fun({Id, Conf}) ->
-            {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
-            Action(Type, Name, Conf)
-        end,
-        maps:to_list(MapConfs)
-    ).
-
-diff_listeners(NewListeners, OldListeners) ->
-    emqx_map_lib:diff_maps(flatten_listeners(NewListeners), flatten_listeners(OldListeners)).
-
-flatten_listeners(Conf0) ->
-    maps:from_list(
-        lists:append([
-            do_flatten_listeners(Type, Conf)
-         || {Type, Conf} <- maps:to_list(Conf0)
-        ])
-    ).
-
-do_flatten_listeners(Type, Conf0) ->
-    [
-        {listener_id(Type, Name), maps:remove(authentication, Conf)}
-     || {Name, Conf} <- maps:to_list(Conf0)
-    ].
+pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) ->
+    CertsDir = certs_dir(Type, Name),
+    {ok, convert_certs(CertsDir, NewConf)};
+pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
+    {error, already_exist};
+pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) ->
+    {error, not_found};
+pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
+    NewConf = emqx_map_lib:deep_merge(RawConf, Request),
+    CertsDir = certs_dir(Type, Name),
+    {ok, convert_certs(CertsDir, NewConf)};
+pre_config_update(_Path, _Request, RawConf) ->
+    {ok, RawConf}.
+
+post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
+    start_listener(Type, Name, NewConf);
+post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
+    restart_listener(Type, Name, {OldConf, NewConf});
+post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) ->
+    {error, not_found};
+post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) ->
+    case stop_listener(Type, Name, OldConf) of
+        ok ->
+            _ = emqx_authentication:delete_chain(listener_id(Type, Name)),
+            CertsDir = certs_dir(Type, Name),
+            clear_certs(CertsDir, OldConf);
+        Err ->
+            Err
+    end;
+post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
+    ok.
 
 
 esockd_opts(Type, Opts0) ->
 esockd_opts(Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
@@ -518,7 +508,10 @@ foreach_listeners(Do) ->
     lists:foreach(
     lists:foreach(
         fun({Id, LConf}) ->
         fun({Id, LConf}) ->
             {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
             {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
-            Do(Type, Name, LConf)
+            case Do(Type, Name, LConf) of
+                {error, {failed_to_start, _} = Reason} -> error(Reason);
+                _ -> ok
+            end
         end,
         end,
         list()
         list()
     ).
     ).
@@ -552,3 +545,22 @@ parse_bind(#{<<"bind">> := Bind}) ->
         {ok, L} -> L;
         {ok, L} -> L;
         {error, _} -> binary_to_integer(Bind)
         {error, _} -> binary_to_integer(Bind)
     end.
     end.
+
+%% The relative dir for ssl files.
+certs_dir(Type, Name) ->
+    iolist_to_binary(filename:join(["listeners", Type, Name])).
+
+convert_certs(CertsDir, Conf) ->
+    case emqx_tls_lib:ensure_ssl_files(CertsDir, maps:get(<<"ssl">>, Conf, undefined)) of
+        {ok, undefined} ->
+            Conf;
+        {ok, SSL} ->
+            Conf#{<<"ssl">> => SSL};
+        {error, Reason} ->
+            ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
+            throw({bad_ssl_config, Reason})
+    end.
+
+clear_certs(CertsDir, Conf) ->
+    OldSSL = maps:get(<<"ssl">>, Conf, undefined),
+    emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).

+ 4 - 2
apps/emqx/src/emqx_schema.erl

@@ -1680,7 +1680,8 @@ mqtt_listener() ->
                     #{
                     #{
                         desc =>
                         desc =>
                             "The access control rules for this listener.<br/>"
                             "The access control rules for this listener.<br/>"
-                            "See: https://github.com/emqtt/esockd#allowdeny"
+                            "See: https://github.com/emqtt/esockd#allowdeny",
+                        default => [<<"allow all">>]
                     }
                     }
                 )},
                 )},
             {"proxy_protocol",
             {"proxy_protocol",
@@ -1700,7 +1701,8 @@ mqtt_listener() ->
                     #{
                     #{
                         desc =>
                         desc =>
                             "Timeout for proxy protocol. EMQX will close the TCP connection "
                             "Timeout for proxy protocol. EMQX will close the TCP connection "
-                            "if proxy protocol packet is not received within the timeout."
+                            "if proxy protocol packet is not received within the timeout.",
+                        default => "3s"
                     }
                     }
                 )},
                 )},
             {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME,
             {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME,

+ 18 - 18
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -398,36 +398,36 @@ trans_desc(Init, Hocon, Func, Name) ->
     end.
     end.
 
 
 trans_description(Spec, Hocon) ->
 trans_description(Spec, Hocon) ->
-    case trans_desc(<<"desc">>, Hocon, undefined) of
+    Desc =
+        case desc_struct(Hocon) of
+            undefined -> undefined;
+            ?DESC(_, _) = Struct -> get_i18n(<<"desc">>, Struct, undefined);
+            Struct -> to_bin(Struct)
+        end,
+    case Desc of
         undefined -> Spec;
         undefined -> Spec;
-        Value -> Spec#{description => Value}
+        Desc -> Spec#{description => Desc}
     end.
     end.
 
 
+get_i18n(Key, Struct, Default) ->
+    {ok, #{cache := Cache, lang := Lang}} = emqx_dashboard:get_i18n(),
+    Desc = hocon_schema:resolve_schema(Struct, Cache),
+    emqx_map_lib:deep_get([Key, Lang], Desc, Default).
+
 trans_label(Spec, Hocon, Default) ->
 trans_label(Spec, Hocon, Default) ->
-    Label = trans_desc(<<"label">>, Hocon, Default),
+    Label =
+        case desc_struct(Hocon) of
+            ?DESC(_, _) = Struct -> get_i18n(<<"label">>, Struct, Default);
+            _ -> Default
+        end,
     Spec#{label => Label}.
     Spec#{label => Label}.
 
 
-trans_desc(Key, Hocon, Default) ->
-    case resolve_desc(Key, desc_struct(Hocon)) of
-        undefined -> Default;
-        Value -> to_bin(Value)
-    end.
-
 desc_struct(Hocon) ->
 desc_struct(Hocon) ->
     case hocon_schema:field_schema(Hocon, desc) of
     case hocon_schema:field_schema(Hocon, desc) of
         undefined -> hocon_schema:field_schema(Hocon, description);
         undefined -> hocon_schema:field_schema(Hocon, description);
         Struct -> Struct
         Struct -> Struct
     end.
     end.
 
 
-resolve_desc(_Key, Bin) when is_binary(Bin) -> Bin;
-resolve_desc(Key, Struct) ->
-    {ok, #{cache := Cache, lang := Lang}} = emqx_dashboard:get_i18n(),
-    Desc = hocon_schema:resolve_schema(Struct, Cache),
-    case is_map(Desc) of
-        true -> emqx_map_lib:deep_get([Key, Lang], Desc, undefined);
-        false -> Desc
-    end.
-
 request_body(#{content := _} = Content, _Module, _Options) ->
 request_body(#{content := _} = Content, _Module, _Options) ->
     {Content, []};
     {Content, []};
 request_body([], _Module, _Options) ->
 request_body([], _Module, _Options) ->

+ 191 - 51
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -30,9 +30,7 @@
 
 
 %% for rpc call
 %% for rpc call
 -export([
 -export([
-    do_list_listeners/0,
-    do_update_listener/2,
-    do_remove_listener/1
+    do_list_listeners/0
 ]).
 ]).
 
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx.hrl").
@@ -43,7 +41,6 @@
 -define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
 -define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
 -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
 -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
 -define(ADDR_PORT_INUSE, <<"Addr port in use">>).
 -define(ADDR_PORT_INUSE, <<"Addr port in use">>).
-
 -define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
 -define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
 
 
 namespace() -> "listeners".
 namespace() -> "listeners".
@@ -65,7 +62,13 @@ schema("/listeners_status") ->
         get => #{
         get => #{
             tags => [<<"listeners">>],
             tags => [<<"listeners">>],
             desc => <<"List all running node's listeners live status. group by listener type">>,
             desc => <<"List all running node's listeners live status. group by listener type">>,
-            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))}
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(
+                        ?ARRAY(?R_REF(listener_type_status)),
+                        listener_type_status_example()
+                    )
+            }
         }
         }
     };
     };
 schema("/listeners") ->
 schema("/listeners") ->
@@ -78,10 +81,16 @@ schema("/listeners") ->
                 {type,
                 {type,
                     ?HOCON(
                     ?HOCON(
                         ?ENUM(listeners_type()),
                         ?ENUM(listeners_type()),
-                        #{desc => "Listener type", in => query, required => false}
+                        #{desc => "Listener type", in => query, required => false, example => tcp}
                     )}
                     )}
             ],
             ],
-            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))}
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(
+                        ?ARRAY(?R_REF(listener_id_status)),
+                        listener_id_status_example()
+                    )
+            }
         }
         }
     };
     };
 schema("/listeners/:id") ->
 schema("/listeners/:id") ->
@@ -92,7 +101,7 @@ schema("/listeners/:id") ->
             desc => <<"List all running node's listeners for the specified id.">>,
             desc => <<"List all running node's listeners for the specified id.">>,
             parameters => [?R_REF(listener_id)],
             parameters => [?R_REF(listener_id)],
             responses => #{
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true})),
+                200 => listener_schema(#{bind => true}),
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
             }
             }
         },
         },
@@ -100,9 +109,9 @@ schema("/listeners/:id") ->
             tags => [<<"listeners">>],
             tags => [<<"listeners">>],
             desc => <<"Update the specified listener on all nodes.">>,
             desc => <<"Update the specified listener on all nodes.">>,
             parameters => [?R_REF(listener_id)],
             parameters => [?R_REF(listener_id)],
-            'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}),
+            'requestBody' => listener_schema(#{bind => false}),
             responses => #{
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true}), #{}),
+                200 => listener_schema(#{bind => true}),
                 400 => error_codes(['BAD_REQUEST']),
                 400 => error_codes(['BAD_REQUEST']),
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
             }
             }
@@ -111,9 +120,9 @@ schema("/listeners/:id") ->
             tags => [<<"listeners">>],
             tags => [<<"listeners">>],
             desc => <<"Create the specified listener on all nodes.">>,
             desc => <<"Create the specified listener on all nodes.">>,
             parameters => [?R_REF(listener_id)],
             parameters => [?R_REF(listener_id)],
-            'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}),
+            'requestBody' => listener_schema(#{bind => true}),
             responses => #{
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true}), #{}),
+                200 => listener_schema(#{bind => true}),
                 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
                 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
             }
             }
         },
         },
@@ -149,8 +158,9 @@ fields(listener_id) ->
         {id,
         {id,
             ?HOCON(atom(), #{
             ?HOCON(atom(), #{
                 desc => "Listener id",
                 desc => "Listener id",
-                example => 'tcp:default',
+                example => 'tcp:demo',
                 validator => fun validate_id/1,
                 validator => fun validate_id/1,
+                required => true,
                 in => path
                 in => path
             })}
             })}
     ];
     ];
@@ -184,7 +194,13 @@ fields(listener_id_status) ->
     fields(listener_id) ++
     fields(listener_id) ++
         [
         [
             {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
             {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
-            {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})},
+            {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
+            {bind,
+                ?HOCON(
+                    hoconsc:union([emqx_schema:ip_port(), integer()]),
+                    #{desc => "Listener bind addr", required => true}
+                )},
+            {acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
             {status, ?HOCON(?R_REF(status))},
             {status, ?HOCON(?R_REF(status))},
             {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
             {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
         ];
         ];
@@ -202,7 +218,10 @@ fields(Type) ->
     Schema.
     Schema.
 
 
 listener_schema(Opts) ->
 listener_schema(Opts) ->
-    ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))).
+    emqx_dashboard_swagger:schema_with_example(
+        ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))),
+        tcp_schema_example()
+    ).
 
 
 listeners_type() ->
 listeners_type() ->
     lists:map(
     lists:map(
@@ -266,7 +285,13 @@ validate_id(Id) ->
 %% api
 %% api
 listener_type_status(get, _Request) ->
 listener_type_status(get, _Request) ->
     Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
     Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
-    List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners),
+    List = lists:map(
+        fun({Type, L}) ->
+            L1 = maps:without([bind, acceptors], L),
+            L1#{type => Type}
+        end,
+        Listeners
+    ),
     {200, List}.
     {200, List}.
 
 
 list_listeners(get, #{query_string := Query}) ->
 list_listeners(get, #{query_string := Query}) ->
@@ -291,15 +316,17 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
 crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
 crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
         {Id, Type, Name, Conf} ->
-            Key = [listeners, Type, Name],
-            case emqx_conf:get_raw(Key, undefined) of
+            Path = [listeners, Type, Name],
+            case emqx_conf:get_raw(Path, undefined) of
                 undefined ->
                 undefined ->
                     {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                     {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                 PrevConf ->
                 PrevConf ->
                     MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf),
                     MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf),
-                    case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of
+                    case update(Path, MergeConf) of
                         {ok, #{raw_config := _RawConf}} ->
                         {ok, #{raw_config := _RawConf}} ->
                             crud_listeners_by_id(get, #{bindings => #{id => Id}});
                             crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                        {error, not_found} ->
+                            {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                         {error, Reason} ->
                         {error, Reason} ->
                             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
                             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
                     end
                     end
@@ -312,17 +339,14 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
 crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
 crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
         {Id, Type, Name, Conf} ->
-            Key = [listeners, Type, Name],
-            case emqx_conf:get(Key, undefined) of
-                undefined ->
-                    case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
-                        {ok, #{raw_config := _RawConf}} ->
-                            crud_listeners_by_id(get, #{bindings => #{id => Id}});
-                        {error, Reason} ->
-                            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-                    end;
-                _ ->
-                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}
+            Path = [listeners, Type, Name],
+            case create(Path, Conf) of
+                {ok, #{raw_config := _RawConf}} ->
+                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                {error, already_exist} ->
+                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
             end;
             end;
         {error, Reason} ->
         {error, Reason} ->
             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
@@ -331,8 +355,9 @@ crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
     end;
     end;
 crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
 crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of
+    case remove([listeners, Type, Name]) of
         {ok, _} -> {204};
         {ok, _} -> {204};
+        {error, not_found} -> {204};
         {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
         {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
     end.
     end.
 
 
@@ -465,23 +490,6 @@ do_list_listeners() ->
         <<"listeners">> => Listeners
         <<"listeners">> => Listeners
     }.
     }.
 
 
--spec do_update_listener(string(), emqx_config:update_request()) ->
-    {ok, map()} | {error, _}.
-do_update_listener(Id, Config) ->
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of
-        {ok, #{raw_config := RawConf}} -> {ok, RawConf};
-        {error, Reason} -> {error, Reason}
-    end.
-
--spec do_remove_listener(string()) -> ok.
-do_remove_listener(Id) ->
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of
-        {ok, _} -> ok;
-        {error, Reason} -> error(Reason)
-    end.
-
 wrap_rpc({badrpc, Reason}) ->
 wrap_rpc({badrpc, Reason}) ->
     {error, Reason};
     {error, Reason};
 wrap_rpc(Res) ->
 wrap_rpc(Res) ->
@@ -492,7 +500,9 @@ format_status(Key, Node, Listener, Acc) ->
         <<"id">> := Id,
         <<"id">> := Id,
         <<"running">> := Running,
         <<"running">> := Running,
         <<"max_connections">> := MaxConnections,
         <<"max_connections">> := MaxConnections,
-        <<"current_connections">> := CurrentConnections
+        <<"current_connections">> := CurrentConnections,
+        <<"acceptors">> := Acceptors,
+        <<"bind">> := Bind
     } = Listener,
     } = Listener,
     GroupKey = maps:get(Key, Listener),
     GroupKey = maps:get(Key, Listener),
     case maps:find(GroupKey, Acc) of
     case maps:find(GroupKey, Acc) of
@@ -501,6 +511,8 @@ format_status(Key, Node, Listener, Acc) ->
                 GroupKey => #{
                 GroupKey => #{
                     enable => Running,
                     enable => Running,
                     ids => [Id],
                     ids => [Id],
+                    acceptors => Acceptors,
+                    bind => Bind,
                     status => #{
                     status => #{
                         max_connections => MaxConnections,
                         max_connections => MaxConnections,
                         current_connections => CurrentConnections
                         current_connections => CurrentConnections
@@ -555,6 +567,134 @@ format_status(Key, Node, Listener, Acc) ->
             }
             }
     end.
     end.
 
 
-max_conn(_Int1, infinity) -> infinity;
-max_conn(infinity, _Int) -> infinity;
+max_conn(_Int1, <<"infinity">>) -> <<"infinity">>;
+max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
 max_conn(Int1, Int2) -> Int1 + Int2.
 max_conn(Int1, Int2) -> Int1 + Int2.
+
+update(Path, Conf) ->
+    wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
+
+create(Path, Conf) ->
+    wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
+
+remove(Path) ->
+    wrap(emqx_conf:remove(Path, ?OPTS(cluster))).
+
+wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
+wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};
+wrap({error, Reason}) -> {error, Reason};
+wrap(Ok) -> Ok.
+
+listener_type_status_example() ->
+    [
+        #{
+            enable => false,
+            ids => ["tcp:demo"],
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 11,
+                    max_connections => 1024000
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 10,
+                    max_connections => 1024000
+                }
+            },
+            status => #{
+                current_connections => 21,
+                max_connections => 2048000
+            },
+            type => tcp
+        },
+        #{
+            enable => false,
+            ids => ["ssl:default"],
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 31,
+                    max_connections => infinity
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 40,
+                    max_connections => infinity
+                }
+            },
+            status => #{
+                current_connections => 71,
+                max_connections => infinity
+            },
+            type => ssl
+        }
+    ].
+
+listener_id_status_example() ->
+    [
+        #{
+            acceptors => 16,
+            bind => <<"0.0.0.0:1884">>,
+            enable => true,
+            id => <<"tcp:demo">>,
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 100,
+                    max_connections => 1024000
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 101,
+                    max_connections => 1024000
+                }
+            },
+            number => 2,
+            status => #{
+                current_connections => 201,
+                max_connections => 2048000
+            }
+        },
+        #{
+            acceptors => 32,
+            bind => <<"0.0.0.0:1883">>,
+            enable => true,
+            id => <<"tcp:default">>,
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 300,
+                    max_connections => infinity
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 201,
+                    max_connections => infinity
+                }
+            },
+            number => 2,
+            status => #{
+                current_connections => 501,
+                max_connections => infinity
+            }
+        }
+    ].
+
+tcp_schema_example() ->
+    #{
+        acceptors => 16,
+        access_rules => ["allow all"],
+        bind => <<"0.0.0.0:1884">>,
+        current_connections => 10240,
+        id => <<"tcp:demo">>,
+        max_connections => 204800,
+        mountpoint => <<"/">>,
+        proxy_protocol => false,
+        proxy_protocol_timeout => <<"3s">>,
+        running => true,
+        tcp => #{
+            active_n => 100,
+            backlog => 1024,
+            buffer => <<"4KB">>,
+            high_watermark => <<"1MB">>,
+            nodelay => false,
+            reuseaddr => true,
+            send_timeout => <<"15s">>,
+            send_timeout_close => true
+        },
+        type => tcp,
+        zone => default
+    }.

+ 25 - 21
apps/emqx_management/src/emqx_mgmt_api_sys.erl

@@ -22,14 +22,14 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
 
 
 %% API
 %% API
--export([ api_spec/0
-        , paths/0
-        , schema/1
-        , namespace/0
-        ]).
+-export([
+    api_spec/0,
+    paths/0,
+    schema/1,
+    namespace/0
+]).
 
 
--export([ sys/2
-        ]).
+-export([sys/2]).
 
 
 -define(TAGS, [<<"sys">>]).
 -define(TAGS, [<<"sys">>]).
 
 
@@ -61,8 +61,8 @@ schema("/mqtt/sys_topics") ->
                 responses =>
                 responses =>
                     #{
                     #{
                         200 => schema_sys_topics()
                         200 => schema_sys_topics()
-                     }
-             },
+                    }
+            },
         put =>
         put =>
             #{
             #{
                 tags => ?TAGS,
                 tags => ?TAGS,
@@ -71,20 +71,24 @@ schema("/mqtt/sys_topics") ->
                 responses =>
                 responses =>
                     #{
                     #{
                         200 => schema_sys_topics()
                         200 => schema_sys_topics()
-                     }
-             }
-     }.
+                    }
+            }
+    }.
 
 
 schema_sys_topics() ->
 schema_sys_topics() ->
     emqx_dashboard_swagger:schema_with_example(
     emqx_dashboard_swagger:schema_with_example(
-      hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()).
+        hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()
+    ).
 
 
 example_sys_topics() ->
 example_sys_topics() ->
-    #{<<"sys_event_messages">> =>
-      #{<<"client_connected">> => true,
-        <<"client_disconnected">> => true,
-        <<"client_subscribed">> => false,
-        <<"client_unsubscribed">> => false},
-      <<"sys_heartbeat_interval">> => <<"30s">>,
-      <<"sys_msg_interval">> => <<"1m">>
-     }.
+    #{
+        <<"sys_event_messages">> =>
+            #{
+                <<"client_connected">> => true,
+                <<"client_disconnected">> => true,
+                <<"client_subscribed">> => false,
+                <<"client_unsubscribed">> => false
+            },
+        <<"sys_heartbeat_interval">> => <<"30s">>,
+        <<"sys_msg_interval">> => <<"1m">>
+    }.

+ 0 - 12
apps/emqx_management/src/proto/emqx_management_proto_v1.erl

@@ -26,9 +26,6 @@
     list_subscriptions/1,
     list_subscriptions/1,
 
 
     list_listeners/1,
     list_listeners/1,
-    remove_listener/2,
-
-    update_listener/3,
     subscribe/3,
     subscribe/3,
     unsubscribe/3,
     unsubscribe/3,
 
 
@@ -58,15 +55,6 @@ list_subscriptions(Node) ->
 list_listeners(Node) ->
 list_listeners(Node) ->
     rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
     rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
 
 
--spec remove_listener(node(), string()) -> ok | {badrpc, _}.
-remove_listener(Node, Id) ->
-    rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]).
-
--spec update_listener(node(), atom(), emqx_config:update_request()) ->
-    {ok, map()} | {error, _} | {badrpc, _}.
-update_listener(Node, Id, Config) ->
-    rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]).
-
 -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
 -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
     {subscribe, _} | {error, atom()} | {badrpc, _}.
     {subscribe, _} | {error, atom()} | {badrpc, _}.
 subscribe(Node, ClientId, TopicTables) ->
 subscribe(Node, ClientId, TopicTables) ->

+ 50 - 5
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -39,14 +39,44 @@ t_list_listeners(_) ->
     ?assertEqual(length(Expect), length(Res)),
     ?assertEqual(length(Expect), length(Res)),
     ok.
     ok.
 
 
-t_crud_listeners_by_id(_) ->
-    TcpListenerId = <<"tcp:default">>,
+t_tcp_crud_listeners_by_id(_) ->
+    ListenerId = <<"tcp:default">>,
     NewListenerId = <<"tcp:new">>,
     NewListenerId = <<"tcp:new">>,
-    TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]),
+    MinListenerId = <<"tcp:min">>,
+    BadId = <<"tcp:bad">>,
+    Type = <<"tcp">>,
+    crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type).
+
+t_ssl_crud_listeners_by_id(_) ->
+    ListenerId = <<"ssl:default">>,
+    NewListenerId = <<"ssl:new">>,
+    MinListenerId = <<"ssl:min">>,
+    BadId = <<"ssl:bad">>,
+    Type = <<"ssl">>,
+    crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type).
+
+t_ws_crud_listeners_by_id(_) ->
+    ListenerId = <<"ws:default">>,
+    NewListenerId = <<"ws:new">>,
+    MinListenerId = <<"ws:min">>,
+    BadId = <<"ws:bad">>,
+    Type = <<"ws">>,
+    crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type).
+
+t_wss_crud_listeners_by_id(_) ->
+    ListenerId = <<"wss:default">>,
+    NewListenerId = <<"wss:new">>,
+    MinListenerId = <<"wss:min">>,
+    BadId = <<"wss:bad">>,
+    Type = <<"wss">>,
+    crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type).
+
+crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) ->
+    TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
     NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
     NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
     TcpListener = request(get, TcpPath, [], []),
     TcpListener = request(get, TcpPath, [], []),
 
 
-    %% create
+    %% create with full options
     ?assertEqual({error, not_found}, is_running(NewListenerId)),
     ?assertEqual({error, not_found}, is_running(NewListenerId)),
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
     NewConf = TcpListener#{
     NewConf = TcpListener#{
@@ -59,8 +89,22 @@ t_crud_listeners_by_id(_) ->
     ?assertMatch(Create, Get1),
     ?assertMatch(Create, Get1),
     ?assert(is_running(NewListenerId)),
     ?assert(is_running(NewListenerId)),
 
 
+    %% create with required options
+    MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]),
+    ?assertEqual({error, not_found}, is_running(MinListenerId)),
+    ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, MinPath, [], [])),
+    MinConf = #{
+        <<"id">> => MinListenerId,
+        <<"bind">> => <<"0.0.0.0:3883">>,
+        <<"type">> => Type
+    },
+    MinCreate = request(post, MinPath, [], MinConf),
+    ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(MinCreate))),
+    MinGet = request(get, MinPath, [], []),
+    ?assertMatch(MinCreate, MinGet),
+    ?assert(is_running(MinListenerId)),
+
     %% bad create(same port)
     %% bad create(same port)
-    BadId = <<"tcp:bad">>,
     BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]),
     BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]),
     BadConf = TcpListener#{
     BadConf = TcpListener#{
         <<"id">> => BadId,
         <<"id">> => BadId,
@@ -79,6 +123,7 @@ t_crud_listeners_by_id(_) ->
 
 
     %% delete
     %% delete
     ?assertEqual([], delete(NewPath)),
     ?assertEqual([], delete(NewPath)),
+    ?assertEqual([], delete(MinPath)),
     ?assertEqual({error, not_found}, is_running(NewListenerId)),
     ?assertEqual({error, not_found}, is_running(NewListenerId)),
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
     ?assertEqual([], delete(NewPath)),
     ?assertEqual([], delete(NewPath)),

+ 20 - 15
apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl

@@ -33,22 +33,27 @@ end_per_suite(_) ->
 t_get_put(_) ->
 t_get_put(_) ->
     {ok, Default} = get_sys_topics_config(),
     {ok, Default} = get_sys_topics_config(),
     ?assertEqual(
     ?assertEqual(
-       #{<<"sys_event_messages">> =>
-         #{<<"client_connected">> => true,
-           <<"client_disconnected">> => true,
-           <<"client_subscribed">> => false,
-           <<"client_unsubscribed">> => false
-          },
-         <<"sys_heartbeat_interval">> => <<"30s">>,
-         <<"sys_msg_interval">> => <<"1m">>}, Default),
+        #{
+            <<"sys_event_messages">> =>
+                #{
+                    <<"client_connected">> => true,
+                    <<"client_disconnected">> => true,
+                    <<"client_subscribed">> => false,
+                    <<"client_unsubscribed">> => false
+                },
+            <<"sys_heartbeat_interval">> => <<"30s">>,
+            <<"sys_msg_interval">> => <<"1m">>
+        },
+        Default
+    ),
 
 
-   NConfig = Default#{
-               <<"sys_msg_interval">> => <<"4m">>,
-               <<"sys_event_messages">> => #{<<"client_subscribed">> => false}
-              },
-   {ok, ConfigResp} = put_sys_topics_config(NConfig),
-   ?assertEqual(NConfig, ConfigResp),
-   {ok, Default} = put_sys_topics_config(Default).
+    NConfig = Default#{
+        <<"sys_msg_interval">> => <<"4m">>,
+        <<"sys_event_messages">> => #{<<"client_subscribed">> => false}
+    },
+    {ok, ConfigResp} = put_sys_topics_config(NConfig),
+    ?assertEqual(NConfig, ConfigResp),
+    {ok, Default} = put_sys_topics_config(Default).
 
 
 get_sys_topics_config() ->
 get_sys_topics_config() ->
     Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),
     Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),

+ 1 - 1
mix.exs

@@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do
       {:getopt, "1.0.2", override: true},
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "0.18.0", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "0.18.0", override: true},
       {:hocon, github: "emqx/hocon", tag: "0.26.7", override: true},
       {:hocon, github: "emqx/hocon", tag: "0.26.7", override: true},
-      {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.4.1", override: true},
+      {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
       # in conflict by ehttpc and emqtt
       # in conflict by ehttpc and emqtt

+ 1 - 1
rebar.config

@@ -67,7 +67,7 @@
     , {getopt, "1.0.2"}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.18.0"}}}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.18.0"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.7"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.7"}}}
-    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
+    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
     ]}.
     ]}.

+ 1 - 1
rebar.config.erl

@@ -95,7 +95,7 @@ project_app_dirs(Edition) ->
 
 
 plugins() ->
 plugins() ->
     [ {relup_helper,{git,"https://github.com/emqx/relup_helper", {tag, "2.0.0"}}}
     [ {relup_helper,{git,"https://github.com/emqx/relup_helper", {tag, "2.0.0"}}}
-    , {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.4"}}}
+    , {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}}
       %% emqx main project does not require port-compiler
       %% emqx main project does not require port-compiler
       %% pin at root level for deterministic
       %% pin at root level for deterministic
     , {pc, {git, "https://github.com/emqx/port_compiler.git", {tag, "v1.11.1"}}}
     , {pc, {git, "https://github.com/emqx/port_compiler.git", {tag, "v1.11.1"}}}