Преглед изворни кода

Merge pull request #8218 from zhongwencool/enabled-listener

feat: add enabled for listeners
zhongwencool пре 3 година
родитељ
комит
14ba93d1df

+ 5 - 5
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -1895,16 +1895,16 @@ QUIC listeners
     }
 }
 
-fields_mqtt_quic_listener_enabled {
+fields_listener_enabled {
     desc {
         en: """
-Enable QUIC listener.
+Enable listener.
 """
-        zh: """启用 QUIC 监听器"""
+        zh: """启监听器"""
     }
     label: {
-        en: "Enable QUIC listener"
-        zh: "启用 QUIC 监听器"
+        en: "Enable listener"
+        zh: "启监听器"
     }
 }
 

+ 22 - 1
apps/emqx/src/emqx_listeners.erl

@@ -260,6 +260,12 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
                 [listener_id(Type, ListenerName), format_addr(Bind)]
             ),
             ok;
+        {error, not_found} ->
+            ?ELOG(
+                "Failed to stop listener ~ts on ~ts: ~0p~n",
+                [listener_id(Type, ListenerName), format_addr(Bind), already_stopped]
+            ),
+            ok;
         {error, Reason} ->
             ?ELOG(
                 "Failed to stop listener ~ts on ~ts: ~0p~n",
@@ -360,13 +366,19 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
     NewConf = ensure_override_limiter_conf(NewConfT, Request),
     CertsDir = certs_dir(Type, Name),
     {ok, convert_certs(CertsDir, NewConf)};
+pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
+    NewConf = emqx_map_lib:deep_merge(RawConf, Updated),
+    {ok, NewConf};
 pre_config_update(_Path, _Request, RawConf) ->
     {ok, RawConf}.
 
 post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
     start_listener(Type, Name, NewConf);
 post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
-    restart_listener(Type, Name, {OldConf, NewConf});
+    case NewConf of
+        #{<<"enabled">> := true} -> restart_listener(Type, Name, {OldConf, NewConf});
+        _ -> ok
+    end;
 post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) ->
     {error, not_found};
 post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) ->
@@ -378,6 +390,15 @@ post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppE
         Err ->
             Err
     end;
+post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
+    #{enabled := NewEnabled} = NewConf,
+    #{enabled := OldEnabled} = OldConf,
+    case {NewEnabled, OldEnabled} of
+        {true, true} -> restart_listener(Type, Name, {OldConf, NewConf});
+        {true, false} -> start_listener(Type, Name, NewConf);
+        {false, true} -> stop_listener(Type, Name, OldConf);
+        {false, false} -> stop_listener(Type, Name, OldConf)
+    end;
 post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 

+ 8 - 8
apps/emqx/src/emqx_schema.erl

@@ -845,14 +845,6 @@ fields("mqtt_wss_listener") ->
         ];
 fields("mqtt_quic_listener") ->
     [
-        {"enabled",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(fields_mqtt_quic_listener_enabled)
-                }
-            )},
         %% TODO: ensure cacertfile is configurable
         {"certfile",
             sc(
@@ -1568,6 +1560,14 @@ mqtt_listener(Bind) ->
 
 base_listener(Bind) ->
     [
+        {"enabled",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(fields_listener_enabled)
+                }
+            )},
         {"bind",
             sc(
                 hoconsc:union([ip_port(), integer()]),

+ 1 - 17
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -24,11 +24,7 @@
     forward/3,
     forward_async/3,
     list_client_subscriptions/2,
-    list_subscriptions_via_topic/2,
-
-    start_listener/2,
-    stop_listener/2,
-    restart_listener/2
+    list_subscriptions_via_topic/2
 ]).
 
 -include("bpapi.hrl").
@@ -56,15 +52,3 @@ list_client_subscriptions(Node, ClientId) ->
 -spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()].
 list_subscriptions_via_topic(Node, Topic) ->
     rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]).
-
--spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
-start_listener(Node, Id) ->
-    rpc:call(Node, emqx_listeners, start_listener, [Id]).
-
--spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
-stop_listener(Node, Id) ->
-    rpc:call(Node, emqx_listeners, stop_listener, [Id]).
-
--spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
-restart_listener(Node, Id) ->
-    rpc:call(Node, emqx_listeners, restart_listener, [Id]).

+ 25 - 50
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -148,7 +148,7 @@ schema("/listeners/:id/:action") ->
             ],
             responses => #{
                 200 => <<"Updated">>,
-                400 => error_codes(['BAD_REQUEST'])
+                400 => error_codes(['BAD_REQUEST', 'BAD_LISTENER_ID'])
             }
         }
     }.
@@ -362,66 +362,38 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
     end.
 
 parse_listener_conf(Conf0) ->
-    Conf1 = maps:remove(<<"running">>, Conf0),
-    Conf2 = maps:remove(<<"current_connections">>, Conf1),
-    {IdBin, Conf3} = maps:take(<<"id">>, Conf2),
-    {TypeBin, Conf4} = maps:take(<<"type">>, Conf3),
+    Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0),
+    {IdBin, Conf2} = maps:take(<<"id">>, Conf1),
+    {TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
     TypeAtom = binary_to_existing_atom(TypeBin),
     case Type =:= TypeAtom of
-        true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4};
+        true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
         false -> {error, listener_type_inconsistent}
     end.
 
 action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
-    Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
-    case
-        lists:filter(
-            fun
-                ({_, {200}}) -> false;
-                (_) -> true
-            end,
-            Results
-        )
-    of
-        [] -> {200};
-        Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}}
+    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
+    Path = [listeners, Type, Name],
+    case emqx_conf:get_raw(Path, undefined) of
+        undefined ->
+            {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
+        _PrevConf ->
+            case action(Path, Action, enabled(Action)) of
+                {ok, #{raw_config := _RawConf}} ->
+                    {200};
+                {error, not_found} ->
+                    {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+            end
     end.
 
 %%%==============================================================================================
 
-action_listeners(Node, Id, Action) ->
-    {Node, do_action_listeners(Action, Node, Id)}.
-
-do_action_listeners(start, Node, Id) ->
-    case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of
-        ok -> {200};
-        {error, {already_started, _}} -> {200};
-        {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-    end;
-do_action_listeners(stop, Node, Id) ->
-    case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of
-        ok -> {200};
-        {error, not_found} -> {200};
-        {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-    end;
-do_action_listeners(restart, Node, Id) ->
-    case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of
-        ok -> {200};
-        {error, not_found} -> do_action_listeners(start, Node, Id);
-        {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-    end.
-
-action_listeners_err(Errors) ->
-    list_to_binary(
-        lists:foldl(
-            fun({Node, Err}, Str) ->
-                err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
-            end,
-            "",
-            Errors
-        )
-    ).
+enabled(start) -> #{<<"enabled">> => true};
+enabled(stop) -> #{<<"enabled">> => false};
+enabled(restart) -> #{<<"enabled">> => true}.
 
 err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
 err_msg(Reason) -> list_to_binary(err_msg_str(Reason)).
@@ -574,6 +546,9 @@ max_conn(Int1, Int2) -> Int1 + Int2.
 update(Path, Conf) ->
     wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
 
+action(Path, Action, Conf) ->
+    wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))).
+
 create(Path, Conf) ->
     wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
 

+ 1 - 1
mix.exs

@@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
       {:esockd, github: "emqx/esockd", tag: "5.9.3", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.12.9", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
-      {:minirest, github: "emqx/minirest", tag: "1.3.3", override: true},
+      {:minirest, github: "emqx/minirest", tag: "1.3.4", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
       {:replayq, "0.3.4", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.3"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.9"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.3"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.4"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.4"}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}