Jelajahi Sumber

feat(listeners): Add identifier to listeners

Listeners are internally identifiered by the listen-on tuple
which is not UI friendly when we have to find a listener by this
'signature'.

The listeners are actually named in configs, but the names are
discarded in the parsing functions.

This commit is to keep the name and provide an API to find
listener by name (identifier).
Zaiming Shi 5 tahun lalu
induk
melakukan
153977609e

+ 1 - 0
lib-opensource/emqx_management/src/emqx_mgmt.erl

@@ -535,6 +535,7 @@ list_listeners(Node) when Node =:= node() ->
     Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
     Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
         #{protocol        => Protocol,
         #{protocol        => Protocol,
           listen_on       => ListenOn,
           listen_on       => ListenOn,
+          identifier      => emqx_listeners:find_id_by_listen_on(ListenOn),
           acceptors       => esockd:get_acceptors({Protocol, ListenOn}),
           acceptors       => esockd:get_acceptors({Protocol, ListenOn}),
           max_conns       => esockd:get_max_connections({Protocol, ListenOn}),
           max_conns       => esockd:get_max_connections({Protocol, ListenOn}),
           current_conns   => esockd:get_current_connections({Protocol, ListenOn}),
           current_conns   => esockd:get_current_connections({Protocol, ListenOn}),

+ 28 - 15
lib-opensource/emqx_management/src/emqx_mgmt_cli.erl

@@ -510,14 +510,14 @@ trace_off(Who, Name) ->
 
 
 listeners([]) ->
 listeners([]) ->
     foreach(fun({{Protocol, ListenOn}, _Pid}) ->
     foreach(fun({{Protocol, ListenOn}, _Pid}) ->
-                Info = [{acceptors,      esockd:get_acceptors({Protocol, ListenOn})},
+                Info = [{identifier,     {string, emqx_listeners:find_id_by_listen_on(ListenOn)}},
+                        {acceptors,      esockd:get_acceptors({Protocol, ListenOn})},
                         {max_conns,      esockd:get_max_connections({Protocol, ListenOn})},
                         {max_conns,      esockd:get_max_connections({Protocol, ListenOn})},
                         {current_conn,   esockd:get_current_connections({Protocol, ListenOn})},
                         {current_conn,   esockd:get_current_connections({Protocol, ListenOn})},
-                        {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}],
+                        {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
+                       ],
                     emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
                     emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
-                foreach(fun({Key, Val}) ->
-                            emqx_ctl:print("  ~-16s: ~w~n", [Key, Val])
-                        end, Info)
+                foreach(fun indent_print/1, Info)
             end, esockd:listeners()),
             end, esockd:listeners()),
     foreach(fun({Protocol, Opts}) ->
     foreach(fun({Protocol, Opts}) ->
                 Info = [{acceptors,      maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
                 Info = [{acceptors,      maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
@@ -525,9 +525,7 @@ listeners([]) ->
                         {current_conn,   proplists:get_value(all_connections, Opts)},
                         {current_conn,   proplists:get_value(all_connections, Opts)},
                         {shutdown_count, []}],
                         {shutdown_count, []}],
                     emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]),
                     emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]),
-                foreach(fun({Key, Val}) ->
-                            emqx_ctl:print("  ~-16s: ~w~n", [Key, Val])
-                        end, Info)
+                foreach(fun indent_print/1, Info)
             end, ranch:info());
             end, ranch:info());
 
 
 listeners(["stop",  Name = "http" ++ _N, ListenOn]) ->
 listeners(["stop",  Name = "http" ++ _N, ListenOn]) ->
@@ -538,22 +536,32 @@ listeners(["stop",  Name = "http" ++ _N, ListenOn]) ->
             emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error])
             emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error])
     end;
     end;
 
 
-listeners(["stop", Proto, ListenOn]) ->
+listeners(["stop", "mqtt:" ++ _ = Identifier]) ->
+    stop_listener(emqx_listeners:find_by_id(Identifier), Identifier);
+
+listeners(["stop", _Proto, ListenOn]) ->
+    %% this clause is kept to be backward compatible
     ListenOn1 = case string:tokens(ListenOn, ":") of
     ListenOn1 = case string:tokens(ListenOn, ":") of
         [Port]     -> list_to_integer(Port);
         [Port]     -> list_to_integer(Port);
         [IP, Port] -> {IP, list_to_integer(Port)}
         [IP, Port] -> {IP, list_to_integer(Port)}
     end,
     end,
-    case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
-        ok ->
-            emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
-        {error, Error} ->
-            emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
-    end;
+    stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);
 
 
 listeners(_) ->
 listeners(_) ->
     emqx_ctl:usage([{"listeners",                        "List listeners"},
     emqx_ctl:usage([{"listeners",                        "List listeners"},
                     {"listeners stop    <Proto> <Port>", "Stop a listener"}]).
                     {"listeners stop    <Proto> <Port>", "Stop a listener"}]).
 
 
+stop_listener(false, Input) ->
+    emqx_ctl:print("No such listener ~p~n", [Input]);
+stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
+    ID = emqx_listeners:identifier(Listener),
+    case emqx_listeners:stop_listener(Listener) of
+        ok ->
+            emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOn]);
+        {error, Error} ->
+            emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [ID, ListenOn, Error])
+    end.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% @doc data Command
 %% @doc data Command
 
 
@@ -707,3 +715,8 @@ format(_, Val) ->
     Val.
     Val.
 
 
 bin(S) -> iolist_to_binary(S).
 bin(S) -> iolist_to_binary(S).
+
+indent_print({Key, {string, Val}}) ->
+    emqx_ctl:print("  ~-16s: ~s~n", [Key, Val]);
+indent_print({Key, Val}) ->
+    emqx_ctl:print("  ~-16s: ~w~n", [Key, Val]).

+ 19 - 5
priv/emqx.schema

@@ -1996,8 +1996,15 @@ end}.
                                   Other -> Other
                                   Other -> Other
                               end
                               end
                       end,
                       end,
-                      [{Atom(Type), ListenOnN, [{deflate_options, DeflateOpts(Prefix)},
-                                              {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
+                      [#{ proto => Atom(Type)
+                        , name => Name
+                        , listen_on => ListenOnN
+                        , opts => [ {deflate_options, DeflateOpts(Prefix)}
+                                  , {tcp_options, TcpOpts(Prefix)}
+                                  | LisOpts(Prefix)
+                                  ]
+                        }
+                      ]
                    end,
                    end,
     SslListeners = fun(Type, Name) ->
     SslListeners = fun(Type, Name) ->
                        Prefix = string:join(["listener", Type, Name], "."),
                        Prefix = string:join(["listener", Type, Name], "."),
@@ -2005,9 +2012,16 @@ end}.
                            undefined ->
                            undefined ->
                                [];
                                [];
                            ListenOn ->
                            ListenOn ->
-                               [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
-                                                        {tcp_options, TcpOpts(Prefix)},
-                                                        {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
+                               [#{ proto => Atom(Type)
+                                 , name => Name
+                                 , listen_on => ListenOn
+                                 , opts => [ {deflate_options, DeflateOpts(Prefix)}
+                                           , {tcp_options, TcpOpts(Prefix)}
+                                           , {ssl_options, SslOpts(Prefix)}
+                                           | LisOpts(Prefix)
+                                           ]
+                                 }
+                               ]
                        end
                        end
                    end,
                    end,
 
 

+ 63 - 14
src/emqx_listeners.erl

@@ -33,11 +33,43 @@
         , restart_listener/3
         , restart_listener/3
         ]).
         ]).
 
 
--type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).
+-export([ find_id_by_listen_on/1
+        , find_by_listen_on/1
+        , find_by_id/1
+        , identifier/1
+        ]).
 
 
-%%--------------------------------------------------------------------
-%% APIs
-%%--------------------------------------------------------------------
+-type(listener() :: #{ name := binary()
+                     , proto := esockd:proto()
+                     , listen_on := esockd:listen_on()
+                     , opts := [esockd:option()]
+                     }).
+
+%% @doc Find listener identifier by listen-on.
+%% Return empty string (binary) if listener is not found in config.
+-spec(find_id_by_listen_on(esockd:listen_on()) -> binary()).
+find_id_by_listen_on(ListenOn) ->
+    case find_by_listen_on(ListenOn) of
+        false -> <<>>;
+        L -> identifier(L)
+    end.
+
+%% @doc Find listener by listen-on.
+%% Return 'false' if not found.
+-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false).
+find_by_listen_on(ListenOn) ->
+    find_by_listen_on(ListenOn, emqx:get_env(listeners, [])).
+
+%% @doc Find listener by identifier.
+%% Return 'false' if not found.
+-spec(find_by_id(string() | binary()) -> listener() | false).
+find_by_id(Id) ->
+    find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])).
+
+%% @doc Return the ID of the given listener.
+-spec identifier(listener()) -> binary().
+identifier(#{proto := Proto, name := Name}) ->
+    identifier(Proto, Name).
 
 
 %% @doc Start all listeners.
 %% @doc Start all listeners.
 -spec(start() -> ok).
 -spec(start() -> ok).
@@ -45,13 +77,14 @@ start() ->
     lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
     lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
 
 
 -spec(start_listener(listener()) -> ok).
 -spec(start_listener(listener()) -> ok).
-start_listener({Proto, ListenOn, Options}) ->
+start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) ->
+    ID = identifier(Proto, Name),
     case start_listener(Proto, ListenOn, Options) of
     case start_listener(Proto, ListenOn, Options) of
-        {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
-                             [Proto, format(ListenOn)]);
+        {ok, _} -> io:format("Start ~s listener on ~s successfully.~n",
+                             [ID, format(ListenOn)]);
         {error, Reason} ->
         {error, Reason} ->
-            io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!",
-                      [Proto, format(ListenOn), Reason]),
+            io:format(standard_error, "Failed to start mqtt listener ~s on ~s - ~0p~n!",
+                      [ID, format(ListenOn), Reason]),
             error(Reason)
             error(Reason)
     end.
     end.
 
 
@@ -115,7 +148,7 @@ restart() ->
     lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
     lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
 
 
 -spec(restart_listener(listener()) -> any()).
 -spec(restart_listener(listener()) -> any()).
-restart_listener({Proto, ListenOn, Options}) ->
+restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) ->
     restart_listener(Proto, ListenOn, Options).
     restart_listener(Proto, ListenOn, Options).
 
 
 -spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()).
 -spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()).
@@ -138,14 +171,14 @@ stop() ->
     lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])).
     lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])).
 
 
 -spec(stop_listener(listener()) -> ok | {error, term()}).
 -spec(stop_listener(listener()) -> ok | {error, term()}).
-stop_listener({Proto, ListenOn, Opts}) ->
+stop_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Opts}) ->
+    ID = identifier(Proto, Name),
     StopRet = stop_listener(Proto, ListenOn, Opts),
     StopRet = stop_listener(Proto, ListenOn, Opts),
     case StopRet of
     case StopRet of
-        ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n",
-                        [Proto, format(ListenOn)]);
+        ok -> io:format("Stop ~s listener on ~s successfully.~n", [ID, format(ListenOn)]);
         {error, Reason} ->
         {error, Reason} ->
             io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
             io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
-                      [Proto, format(ListenOn), Reason])
+                      [ID, format(ListenOn), Reason])
     end,
     end,
     StopRet.
     StopRet.
 
 
@@ -181,3 +214,19 @@ ws_name(Name, {_Addr, Port}) ->
     ws_name(Name, Port);
     ws_name(Name, Port);
 ws_name(Name, Port) ->
 ws_name(Name, Port) ->
     list_to_atom(lists:concat([Name, ":", Port])).
     list_to_atom(lists:concat([Name, ":", Port])).
+
+identifier(Proto, Name) when is_atom(Proto) ->
+    identifier(atom_to_list(Proto), Name);
+identifier(Proto, Name) ->
+    iolist_to_binary(["mqtt", ":", Proto, ":", Name]).
+
+find_by_listen_on(ListenOn, []) -> error({unknown_listener, ListenOn});
+find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L;
+find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest).
+
+find_by_id(_Id, []) -> false;
+find_by_id(Id, [L | Rest]) ->
+    case identifier(L) =:= Id of
+        true -> L;
+        false -> find_by_id(Id, Rest)
+    end.