Просмотр исходного кода

refactor: simplify updating the listener

Zhongwen Deng 3 лет назад
Родитель
Сommit
2b73a80dba

+ 65 - 48
apps/emqx/src/emqx_listeners.erl

@@ -50,12 +50,14 @@
     parse_listener_id/1
 ]).
 
--export([post_config_update/5]).
+-export([create/2, update/2, remove/1]).
+-export([pre_config_update/3, post_config_update/5]).
 
 -export([format_addr/1]).
 
--define(CONF_KEY_PATH, [listeners]).
+-define(CONF_KEY_PATH, [listeners, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
+-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
 
 -spec id_example() -> atom().
 id_example() -> 'tcp:default'.
@@ -202,7 +204,9 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
                 "Failed to start listener ~ts on ~ts: ~0p~n",
                 [ListenerId, BindStr, Reason]
             ),
-            error({failed_to_start, ListenerId, BindStr, Reason})
+            Msg = lists:flatten(io_lib:format("~ts(~ts) : ~p",
+                [ListenerId, BindStr, element(1, Reason)])),
+            {error, {failed_to_start, Msg}}
     end.
 
 %% @doc Restart all listeners
@@ -334,54 +338,47 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
             {ok, {skipped, quic_app_missing}}
     end.
 
-delete_authentication(Type, ListenerName, _Conf) ->
-    emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
+update(Path, Conf) ->
+    wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
 
-%% Update the listeners at runtime
-post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
-    #{added := Added, removed := Removed, changed := Updated} =
-        diff_listeners(NewListeners, OldListeners),
-    try
-        perform_listener_changes(fun stop_listener/3, Removed),
-        perform_listener_changes(fun delete_authentication/3, Removed),
-        perform_listener_changes(fun start_listener/3, Added),
-        perform_listener_changes(fun restart_listener/3, Updated)
-    catch
-        error:{failed_to_start, ListenerId, Bind, Reason} ->
-            Error = lists:flatten(
-                io_lib:format(
-                    "~ts(~ts) failed with ~ts",
-                    [ListenerId, Bind, element(1, Reason)]
-                )
-            ),
-            {error, Error}
-    end.
+create(Path, Conf) ->
+    wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
 
-perform_listener_changes(Action, MapConfs) ->
-    lists:foreach(
-        fun({Id, Conf}) ->
-            {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
-            Action(Type, Name, Conf)
-        end,
-        maps:to_list(MapConfs)
-    ).
+remove(Path) ->
+    wrap(emqx_conf:remove(Path, ?OPTS(cluster))).
 
-diff_listeners(NewListeners, OldListeners) ->
-    emqx_map_lib:diff_maps(flatten_listeners(NewListeners), flatten_listeners(OldListeners)).
-
-flatten_listeners(Conf0) ->
-    maps:from_list(
-        lists:append([
-            do_flatten_listeners(Type, Conf)
-         || {Type, Conf} <- maps:to_list(Conf0)
-        ])
-    ).
+wrap({error, {post_config_update,?MODULE, Reason}}) -> {error, Reason};
+wrap({error, {pre_config_update,?MODULE, Reason}}) -> {error, Reason};
+wrap({error, Reason}) -> {error, Reason};
+wrap(Ok) -> Ok.
 
-do_flatten_listeners(Type, Conf0) ->
-    [
-        {listener_id(Type, Name), maps:remove(authentication, Conf)}
-     || {Name, Conf} <- maps:to_list(Conf0)
-    ].
+%% Update the listeners at runtime
+pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) ->
+    CertsDir = certs_dir(Type, Name),
+    {ok, convert_certs(CertsDir, NewConf)};
+pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
+    {error, already_exist};
+pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) ->
+    {error, not_found};
+pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
+    NewConf = emqx_map_lib:deep_merge(RawConf, Request),
+    CertsDir = certs_dir(Type, Name),
+    {ok, convert_certs(CertsDir, NewConf)}.
+
+post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
+    start_listener(Type, Name, NewConf);
+post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
+    restart_listener(Type, Name, {OldConf, NewConf});
+post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) ->
+    {error, not_found};
+post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) ->
+    case stop_listener(Type, Name, OldConf) of
+        ok ->
+            emqx_authentication:delete_chain(listener_id(Type, Name)),
+            CertsDir = certs_dir(Type, Name),
+            clear_certs(CertsDir, OldConf);
+        Err -> Err
+    end.
 
 esockd_opts(Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
@@ -518,7 +515,10 @@ foreach_listeners(Do) ->
     lists:foreach(
         fun({Id, LConf}) ->
             {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
-            Do(Type, Name, LConf)
+            case Do(Type, Name, LConf) of
+                {error, {failed_to_start, _} = Reason} -> error(Reason);
+                _ -> ok
+            end
         end,
         list()
     ).
@@ -552,3 +552,20 @@ parse_bind(#{<<"bind">> := Bind}) ->
         {ok, L} -> L;
         {error, _} -> binary_to_integer(Bind)
     end.
+
+%% The relative dir for ssl files.
+certs_dir(Type, Name) ->
+    iolist_to_binary(filename:join(["listeners", Type, Name])).
+
+convert_certs(CertsDir, Conf) ->
+    case emqx_tls_lib:ensure_ssl_files(CertsDir, maps:get(<<"ssl">>, Conf, undefined)) of
+        {ok, undefined} -> Conf;
+        {ok, SSL} -> Conf#{<<"ssl">> => SSL};
+        {error, Reason} ->
+            ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
+            throw({bad_ssl_config, Reason})
+    end.
+
+clear_certs(CertsDir, Conf) ->
+    OldSSL = maps:get(<<"ssl">>, Conf, undefined),
+    emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).

+ 161 - 52
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -30,9 +30,7 @@
 
 %% for rpc call
 -export([
-    do_list_listeners/0,
-    do_update_listener/2,
-    do_remove_listener/1
+    do_list_listeners/0
 ]).
 
 -include_lib("emqx/include/emqx.hrl").
@@ -44,8 +42,6 @@
 -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
 -define(ADDR_PORT_INUSE, <<"Addr port in use">>).
 
--define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
-
 namespace() -> "listeners".
 
 api_spec() ->
@@ -65,7 +61,10 @@ schema("/listeners_status") ->
         get => #{
             tags => [<<"listeners">>],
             desc => <<"List all running node's listeners live status. group by listener type">>,
-            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))}
+            responses => #{200 =>
+            emqx_dashboard_swagger:schema_with_example(
+                ?ARRAY(?R_REF(listener_type_status)),
+                listener_type_status_example())}
         }
     };
 schema("/listeners") ->
@@ -78,10 +77,13 @@ schema("/listeners") ->
                 {type,
                     ?HOCON(
                         ?ENUM(listeners_type()),
-                        #{desc => "Listener type", in => query, required => false}
+                        #{desc => "Listener type", in => query, required => false, example => tcp}
                     )}
             ],
-            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))}
+            responses => #{200 =>
+            emqx_dashboard_swagger:schema_with_example(
+                ?ARRAY(?R_REF(listener_id_status)),
+                listener_id_status_example())}
         }
     };
 schema("/listeners/:id") ->
@@ -92,7 +94,7 @@ schema("/listeners/:id") ->
             desc => <<"List all running node's listeners for the specified id.">>,
             parameters => [?R_REF(listener_id)],
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true})),
+                200 => listener_schema(#{bind => true}),
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
             }
         },
@@ -100,9 +102,9 @@ schema("/listeners/:id") ->
             tags => [<<"listeners">>],
             desc => <<"Update the specified listener on all nodes.">>,
             parameters => [?R_REF(listener_id)],
-            'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}),
+            'requestBody' => listener_schema(#{bind => false}),
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true}), #{}),
+                200 => listener_schema(#{bind => true}),
                 400 => error_codes(['BAD_REQUEST']),
                 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
             }
@@ -111,9 +113,9 @@ schema("/listeners/:id") ->
             tags => [<<"listeners">>],
             desc => <<"Create the specified listener on all nodes.">>,
             parameters => [?R_REF(listener_id)],
-            'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}),
+            'requestBody' => listener_schema(#{bind => true}),
             responses => #{
-                200 => ?HOCON(listener_schema(#{bind => true}), #{}),
+                200 => listener_schema(#{bind => true}),
                 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
             }
         },
@@ -149,8 +151,9 @@ fields(listener_id) ->
         {id,
             ?HOCON(atom(), #{
                 desc => "Listener id",
-                example => 'tcp:default',
+                example => 'tcp:demo',
                 validator => fun validate_id/1,
+                required => true,
                 in => path
             })}
     ];
@@ -184,7 +187,10 @@ fields(listener_id_status) ->
     fields(listener_id) ++
         [
             {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
-            {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})},
+            {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
+            {bind, ?HOCON(hoconsc:union([emqx_schema:ip_port(), integer()]),
+                #{desc => "Listener bind addr", required => true})},
+            {acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
             {status, ?HOCON(?R_REF(status))},
             {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
         ];
@@ -202,7 +208,9 @@ fields(Type) ->
     Schema.
 
 listener_schema(Opts) ->
-    ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))).
+    emqx_dashboard_swagger:schema_with_example(
+        ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))),
+        tcp_schema_example()).
 
 listeners_type() ->
     lists:map(
@@ -266,7 +274,11 @@ validate_id(Id) ->
 %% api
 listener_type_status(get, _Request) ->
     Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
-    List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners),
+    List = lists:map(fun({Type, L}) ->
+        L1 = maps:without([bind, acceptors], L),
+        L1#{type => Type}
+                     end,
+        Listeners),
     {200, List}.
 
 list_listeners(get, #{query_string := Query}) ->
@@ -291,15 +303,17 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
 crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
-            Key = [listeners, Type, Name],
-            case emqx_conf:get_raw(Key, undefined) of
+            Path = [listeners, Type, Name],
+            case emqx_conf:get_raw(Path, undefined) of
                 undefined ->
                     {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                 PrevConf ->
                     MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf),
-                    case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of
+                    case emqx_listeners:update(Path, MergeConf) of
                         {ok, #{raw_config := _RawConf}} ->
                             crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                        {error, not_found} ->
+                            {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                         {error, Reason} ->
                             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
                     end
@@ -312,17 +326,14 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
 crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
-            Key = [listeners, Type, Name],
-            case emqx_conf:get(Key, undefined) of
-                undefined ->
-                    case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
-                        {ok, #{raw_config := _RawConf}} ->
-                            crud_listeners_by_id(get, #{bindings => #{id => Id}});
-                        {error, Reason} ->
-                            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-                    end;
-                _ ->
-                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}
+            Path = [listeners, Type, Name],
+            case emqx_listeners:create(Path, Conf) of
+                {ok, #{raw_config := _RawConf}} ->
+                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                {error, already_exist} ->
+                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
             end;
         {error, Reason} ->
             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
@@ -331,8 +342,9 @@ crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
     end;
 crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of
+    case emqx_listeners:remove([listeners, Type, Name]) of
         {ok, _} -> {204};
+        {error, not_found} -> {204};
         {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
     end.
 
@@ -465,23 +477,6 @@ do_list_listeners() ->
         <<"listeners">> => Listeners
     }.
 
--spec do_update_listener(string(), emqx_config:update_request()) ->
-    {ok, map()} | {error, _}.
-do_update_listener(Id, Config) ->
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of
-        {ok, #{raw_config := RawConf}} -> {ok, RawConf};
-        {error, Reason} -> {error, Reason}
-    end.
-
--spec do_remove_listener(string()) -> ok.
-do_remove_listener(Id) ->
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of
-        {ok, _} -> ok;
-        {error, Reason} -> error(Reason)
-    end.
-
 wrap_rpc({badrpc, Reason}) ->
     {error, Reason};
 wrap_rpc(Res) ->
@@ -492,7 +487,9 @@ format_status(Key, Node, Listener, Acc) ->
         <<"id">> := Id,
         <<"running">> := Running,
         <<"max_connections">> := MaxConnections,
-        <<"current_connections">> := CurrentConnections
+        <<"current_connections">> := CurrentConnections,
+        <<"acceptors">> := Acceptors,
+        <<"bind">> := Bind
     } = Listener,
     GroupKey = maps:get(Key, Listener),
     case maps:find(GroupKey, Acc) of
@@ -501,6 +498,8 @@ format_status(Key, Node, Listener, Acc) ->
                 GroupKey => #{
                     enable => Running,
                     ids => [Id],
+                    acceptors => Acceptors,
+                    bind => Bind,
                     status => #{
                         max_connections => MaxConnections,
                         current_connections => CurrentConnections
@@ -555,6 +554,116 @@ format_status(Key, Node, Listener, Acc) ->
             }
     end.
 
-max_conn(_Int1, infinity) -> infinity;
-max_conn(infinity, _Int) -> infinity;
+max_conn(_Int1, <<"infinity">>) -> <<"infinity">>;
+max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
 max_conn(Int1, Int2) -> Int1 + Int2.
+
+listener_type_status_example() ->
+    [
+        #{
+            enable => false,
+            ids => ["tcp:demo"],
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 11,
+                    max_connections => 1024000},
+                'emqx@127.0.0.2' => #{
+                    current_connections => 10,
+                    max_connections => 1024000}
+            },
+            status => #{
+                current_connections  => 21,
+                max_connections => 2048000
+            },
+            type => tcp
+        },
+        #{
+            enable => false,
+            ids => ["ssl:default"],
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 31,
+                    max_connections => infinity},
+                'emqx@127.0.0.2' => #{
+                    current_connections => 40,
+                    max_connections => infinity}
+            },
+            status => #{
+                current_connections  => 71,
+                max_connections => infinity
+            },
+            type => ssl
+        }
+    ].
+
+listener_id_status_example() ->
+    [
+        #{
+            acceptors => 16,
+            bind =>  <<"0.0.0.0:1884">>,
+            enable =>  true,
+            id => <<"tcp:demo">>,
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 100,
+                    max_connections => 1024000
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 101,
+                    max_connections => 1024000
+                }
+            },
+            number => 2,
+            status => #{
+                current_connections => 201,
+                max_connections => 2048000
+            }
+        },
+        #{
+            acceptors => 32,
+            bind =>  <<"0.0.0.0:1883">>,
+            enable =>  true,
+            id => <<"tcp:default">>,
+            node_status => #{
+                'emqx@127.0.0.1' => #{
+                    current_connections => 300,
+                    max_connections => infinity
+                },
+                'emqx@127.0.0.2' => #{
+                    current_connections => 201,
+                    max_connections => infinity
+                }
+            },
+            number => 2,
+            status => #{
+                current_connections => 501,
+                max_connections => infinity
+            }
+        }
+    ].
+
+tcp_schema_example() ->
+    #{
+        acceptors => 16,
+        access_rules => ["allow all"],
+        bind => <<"0.0.0.0:1884">>,
+        current_connections => 10240,
+        id => <<"tcp:demo">>,
+        max_connections => 204800,
+        mountpoint => <<"/">>,
+        proxy_protocol => false,
+        proxy_protocol_timeout =>  <<"3s">>,
+        running =>  true,
+        tcp => #{
+            active_n => 100,
+            backlog => 1024,
+            buffer => <<"4KB">>,
+            high_watermark => <<"1MB">>,
+            nodelay => false,
+            reuseaddr => true,
+            send_timeout => <<"15s">>,
+            send_timeout_close => true
+        },
+        type => tcp,
+        zone => default
+    }.

+ 0 - 12
apps/emqx_management/src/proto/emqx_management_proto_v1.erl

@@ -26,9 +26,6 @@
     list_subscriptions/1,
 
     list_listeners/1,
-    remove_listener/2,
-
-    update_listener/3,
     subscribe/3,
     unsubscribe/3,
 
@@ -58,15 +55,6 @@ list_subscriptions(Node) ->
 list_listeners(Node) ->
     rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
 
--spec remove_listener(node(), string()) -> ok | {badrpc, _}.
-remove_listener(Node, Id) ->
-    rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]).
-
--spec update_listener(node(), atom(), emqx_config:update_request()) ->
-    {ok, map()} | {error, _} | {badrpc, _}.
-update_listener(Node, Id, Config) ->
-    rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]).
-
 -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
     {subscribe, _} | {error, atom()} | {badrpc, _}.
 subscribe(Node, ClientId, TopicTables) ->