Sfoglia il codice sorgente

refactor: listeners api; add: listeners list function; fix: listener
already start error

DDDHuang 4 anni fa
parent
commit
9642bcce88

+ 56 - 1
apps/emqx/src/emqx_listeners.erl

@@ -20,9 +20,11 @@
 -include("emqx_mqtt.hrl").
 
 %% APIs
--export([ start/0
+-export([ list/0
+        , start/0
         , restart/0
         , stop/0
+        , is_running/1
         ]).
 
 -export([ start_listener/1
@@ -33,6 +35,57 @@
         , restart_listener/3
         ]).
 
+-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
+list() ->
+    Zones = maps:to_list(emqx_config:get([zones], #{})),
+    lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
+
+list(ZoneName, ZoneConf) ->
+    Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})),
+    [
+        begin
+            ListenerId = listener_id(ZoneName, LName),
+            Running = is_running(ListenerId),
+            Conf = merge_zone_and_listener_confs(ZoneConf, LConf),
+            {ListenerId, maps:put(running, Running, Conf)}
+        end
+        || {LName, LConf} <- Listeners].
+
+-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
+is_running(ListenerId) ->
+    Zones = maps:to_list(emqx_config:get([zones], #{})),
+    Listeners = lists:append(
+        [
+            [{listener_id(ZoneName, LName),merge_zone_and_listener_confs(ZoneConf, LConf)}
+                || {LName, LConf} <- maps:to_list(maps:get(listeners, ZoneConf, #{}))]
+        || {ZoneName, ZoneConf} <- Zones]),
+    case proplists:get_value(ListenerId, Listeners, undefined) of
+        undefined ->
+            {error, no_found};
+        Conf ->
+            is_running(ListenerId, Conf)
+    end.
+
+is_running(ListenerId, #{type := tcp, bind := ListenOn})->
+    try esockd:listener({ListenerId, ListenOn}) of
+        Pid when is_pid(Pid)->
+            true
+    catch _:_ ->
+        false
+    end;
+
+is_running(ListenerId, #{type := ws})->
+    try
+        Info = ranch:info(ListenerId),
+        proplists:get_value(status, Info) =:= running
+    catch _:_ ->
+        false
+    end;
+
+is_running(_ListenerId, #{type := quic})->
+%%    TODO: quic support
+    {error, no_found}.
+
 %% @doc Start all listeners.
 -spec(start() -> ok).
 start() ->
@@ -52,6 +105,8 @@ start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
         {ok, _} ->
             console_print("Start ~s listener ~s on ~s successfully.~n",
                 [Type, listener_id(ZoneName, ListenerName), format(Bind)]);
+        {error, {already_started, Pid}} ->
+            {error, {already_started, Pid}};
         {error, Reason} ->
             io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n",
                       [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]),

+ 31 - 26
apps/emqx_management/src/emqx_mgmt.erl

@@ -85,7 +85,10 @@
 %% Listeners
 -export([ list_listeners/0
         , list_listeners/1
-        , restart_listener/2
+        , list_listeners/2
+        , list_listeners_by_id/1
+        , get_listener/2
+        , manage_listener/2
         ]).
 
 %% Alarms
@@ -451,37 +454,39 @@ reload_plugin(Node, Plugin) ->
 %%--------------------------------------------------------------------
 
 list_listeners() ->
-    [{Node, list_listeners(Node)} || Node <- ekka_mnesia:running_nodes()].
+    lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
+
+list_listeners(Node, Identifier) ->
+    listener_id_filter(Identifier, list_listeners(Node)).
 
 list_listeners(Node) when Node =:= node() ->
-    Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
-        #{protocol        => Protocol,
-          listen_on       => ListenOn,
-          identifier      => Protocol,
-          acceptors       => esockd:get_acceptors({Protocol, ListenOn}),
-          max_conns       => esockd:get_max_connections({Protocol, ListenOn}),
-          current_conns   => esockd:get_current_connections({Protocol, ListenOn}),
-          shutdown_count  => esockd:get_shutdown_count({Protocol, ListenOn})}
-    end, esockd:listeners()),
-    Http = lists:map(fun({Protocol, Opts}) ->
-        #{protocol        => Protocol,
-          listen_on       => proplists:get_value(port, Opts),
-          acceptors       => maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0),
-          max_conns       => proplists:get_value(max_connections, Opts),
-          current_conns   => proplists:get_value(all_connections, Opts),
-          shutdown_count  => []}
-    end, ranch:info()),
-    Tcp ++ Http;
+    [{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()];
 
 list_listeners(Node) ->
     rpc_call(Node, list_listeners, [Node]).
 
--spec restart_listener(node(), atom()) -> ok | {error, term()}.
-restart_listener(Node, Identifier) when Node =:= node() ->
-    emqx_listeners:restart_listener(Identifier);
+list_listeners_by_id(Identifier) ->
+    listener_id_filter(Identifier, list_listeners()).
+
+get_listener(Node, Identifier) ->
+    case listener_id_filter(Identifier, list_listeners(Node)) of
+        [] ->
+            {error, not_found};
+        [Listener] ->
+            Listener
+    end.
+
+listener_id_filter(Identifier, Listeners) ->
+    Filter =
+        fun({Id, _}) -> Id =:= Identifier end,
+    lists:filter(Filter, Listeners).
 
-restart_listener(Node, Identifier) ->
-    rpc_call(Node, restart_listener, [Node, Identifier]).
+-spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) ->
+    ok | {error, Reason :: term()}.
+manage_listener(Operation, #{identifier := Identifier, node := Node}) when Node =:= node()->
+    erlang:apply(emqx_listeners, Operation, [Identifier]);
+manage_listener(Operation, Param = #{node := Node}) ->
+    rpc_call(Node, restart_listener, [Operation, Param]).
 
 %%--------------------------------------------------------------------
 %% Get Alarms
@@ -542,7 +547,7 @@ item(route, {Topic, Node}) ->
     #{topic => Topic, node => Node}.
 
 %%--------------------------------------------------------------------
-%% Internel Functions.
+%% Internal Functions.
 %%--------------------------------------------------------------------
 
 rpc_call(Node, Fun, Args) ->

+ 309 - 48
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -16,58 +16,319 @@
 
 -module(emqx_mgmt_api_listeners).
 
--rest_api(#{name   => list_listeners,
-            method => 'GET',
-            path   => "/listeners/",
-            func   => list,
-            descr  => "A list of listeners in the cluster"}).
-
--rest_api(#{name   => list_node_listeners,
-            method => 'GET',
-            path   => "/nodes/:atom:node/listeners",
-            func   => list,
-            descr  => "A list of listeners on the node"}).
-
--rest_api(#{name   => restart_listener,
-            method => 'PUT',
-            path   => "/listeners/:atom:identifier/restart",
-            func   => restart,
-            descr  => "Restart a listener in the cluster"}).
-
--rest_api(#{name   => restart_node_listener,
-            method => 'PUT',
-            path   => "/nodes/:atom:node/listeners/:atom:identifier/restart",
-            func   => restart,
-            descr  => "Restart a listener on a node"}).
-
--export([list/2, restart/2]).
-
-%% List listeners on a node.
-list(#{node := Node}, _Params) ->
-    emqx_mgmt:return({ok, format(emqx_mgmt:list_listeners(Node))});
+-behavior(minirest_api).
+
+-export([api_spec/0]).
+
+-export([ listeners/2
+        , listener/2
+        , node_listener/2
+        , node_listeners/2
+        , manage_listeners/2
+        , manage_nodes_listeners/2]).
+
+-export([format/1]).
+
+-include_lib("emqx/include/emqx.hrl").
+
+api_spec() ->
+    {
+        [
+            listeners_api(),
+            restart_listeners_api(),
+            nodes_listeners_api(),
+            nodes_listener_api(),
+            manage_listeners_api(),
+            manage_nodes_listeners_api()
+        ],
+        [listener_schema()]
+    }.
+
+listener_schema() ->
+    #{
+        listener => #{
+        type => object,
+        properties => #{
+            node => #{
+                type => string,
+                description => <<"Node">>,
+                example => node()},
+            identifier => #{
+                type => string,
+                description => <<"Identifier">>},
+            acceptors => #{
+                type => integer,
+                description => <<"Number of Acceptor proce">>},
+            max_conn => #{
+                type => integer,
+                description => <<"Maximum number of allowed connection">>},
+            type => #{
+                type => string,
+                description => <<"Plugin decription">>},
+            listen_on => #{
+                type => string,
+                description => <<"Litening port">>},
+            running => #{
+                type => boolean,
+                description => <<"Open or close">>},
+            auth => #{
+                type => boolean,
+                description => <<"Has auth">>}}}}.
+
+listeners_api() ->
+    Metadata = #{
+        get => #{
+            description => "List listeners in cluster",
+            responses => #{
+                <<"200">> =>
+                    emqx_mgmt_util:response_array_schema(<<"List all listeners">>, <<"listener">>)}}},
+    {"/listeners", Metadata, listeners}.
+
+restart_listeners_api() ->
+    Metadata = #{
+        get => #{
+            description => "List listeners by listener ID",
+            parameters => [param_path_identifier()],
+            responses => #{
+                <<"404">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
+                <<"200">> =>
+                    emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, <<"listener">>)}}},
+    {"/listeners/:identifier", Metadata, listener}.
+
+manage_listeners_api() ->
+    Metadata = #{
+        get => #{
+            description => "Restart listeners in cluster",
+            parameters => [
+                param_path_identifier(),
+                param_path_operation()],
+            responses => #{
+                <<"500">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Operation  Failed">>, ['INTERNAL_ERROR']),
+                <<"404">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
+                        ['BAD_LISTENER_ID']),
+                <<"400">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
+                        ['BAD_REQUEST']),
+                <<"200">> =>
+                    emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
+    {"/listeners/:identifier/:operation", Metadata, manage_listeners}.
+
+manage_nodes_listeners_api() ->
+    Metadata = #{
+        get => #{
+            description => "Restart listeners in cluster",
+            parameters => [
+                param_path_node(),
+                param_path_identifier(),
+                param_path_operation()],
+            responses => #{
+                <<"500">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
+                <<"404">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Bad node or Listener id not found">>,
+                        ['BAD_NODE_NAME','BAD_LISTENER_ID']),
+                <<"400">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
+                        ['BAD_REQUEST']),
+                <<"200">> =>
+                    emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
+    {"/node/:node/listeners/:identifier/:operation", Metadata, manage_nodes_listeners}.
+
+nodes_listeners_api() ->
+    Metadata = #{
+        get => #{
+            description => "Get listener info in one node",
+            parameters => [param_path_node(), param_path_identifier()],
+            responses => #{
+                <<"404">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>,
+                        ['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
+                <<"200">> =>
+                    emqx_mgmt_util:response_schema(<<"Get listener info ok">>, <<"listener">>)}}},
+    {"/nodes/:node/listeners/:identifier", Metadata, node_listener}.
+
+nodes_listener_api() ->
+    Metadata = #{
+        get => #{
+            description => "List listeners in one node",
+            parameters => [param_path_node()],
+            responses => #{
+                <<"404">> =>
+                    emqx_mgmt_util:response_error_schema(<<"Listener id not found">>),
+                <<"200">> =>
+                    emqx_mgmt_util:response_schema(<<"Get listener info ok">>, <<"listener">>)}}},
+    {"/nodes/:node/listeners", Metadata, node_listeners}.
+%%%==============================================================================================
+%% parameters
+param_path_node() ->
+    #{
+        name => node,
+        in => path,
+        schema => #{type => string},
+        required => true,
+        example => node()
+    }.
+
+param_path_identifier() ->
+    {Example,_} = hd(emqx_mgmt:list_listeners(node())),
+    #{
+        name => identifier,
+        in => path,
+        schema => #{type => string},
+        required => true,
+        example => Example
+    }.
+
+param_path_operation()->
+    #{
+        name => operation,
+        in => path,
+        required => true,
+        schema => #{
+            type => string,
+            enum => [start, stop, restart]},
+        example => restart
+    }.
+
+%%%==============================================================================================
+%% api
+listeners(get, _Request) ->
+    list().
+
+listener(get, Request) ->
+    ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)),
+    get_listeners(#{identifier => ListenerID}).
+
+node_listeners(get, Request) ->
+    Node = binary_to_atom(cowboy_req:binding(node, Request)),
+    get_listeners(#{node => Node}).
+
+node_listener(get, Request) ->
+    Node = binary_to_atom(cowboy_req:binding(node, Request)),
+    ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)),
+    get_listeners(#{node => Node, identifier => ListenerID}).
+
+manage_listeners(_, Request) ->
+    Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)),
+    Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
+    manage(Operation, #{identifier => Identifier}).
+
+manage_nodes_listeners(_, Request) ->
+    Node = binary_to_atom(cowboy_req:binding(node, Request)),
+    Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)),
+    Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
+    manage(Operation, #{identifier => Identifier, node => Node}).
+
+%%%==============================================================================================
 
 %% List listeners in the cluster.
-list(_Binding, _Params) ->
-    emqx_mgmt:return({ok, [#{node => Node, listeners => format(Listeners)}
-                              || {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
-
-%% Restart listeners on a node.
-restart(#{node := Node, identifier := Identifier}, _Params) ->
-    case emqx_mgmt:restart_listener(Node, Identifier) of
-        ok -> emqx_mgmt:return({ok, "Listener restarted."});
-        {error, Error} -> emqx_mgmt:return({error, Error})
-    end;
-%% Restart listeners on all nodes in the cluster.
-restart(#{identifier := Identifier}, _Params) ->
-    Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
-    case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
-        [] -> emqx_mgmt:return(ok);
-        Errors -> emqx_mgmt:return({error, {restart, Errors}})
+list() ->
+    {200, format(emqx_mgmt:list_listeners())}.
+
+get_listeners(Param) ->
+    case list_listener(Param) of
+        {error, not_found} ->
+            Identifier = maps:get(identifier, Param),
+            Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
+            {404, #{code => 'BAD_LISTENER_ID', message => Reason}};
+        {error, nodedown} ->
+            Node = maps:get(node, Param),
+            Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
+            Response = #{code => 'BAD_NODE_NAME', message => Reason},
+            {404, Response};
+        [] ->
+            Identifier = maps:get(identifier, Param),
+            Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
+            {404, #{code => 'BAD_LISTENER_ID', message => Reason}};
+        Data ->
+            {200, Data}
     end.
 
+manage(Operation0, Param) ->
+    OperationMap = #{start => start_listener, stop => stop_listener, restart => restart_listener},
+    Operation = maps:get(Operation0, OperationMap),
+    case list_listener(Param) of
+        {error, not_found} ->
+            Identifier = maps:get(identifier, Param),
+            Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
+            {404, #{code => 'BAD_LISTENER_ID', message => Reason}};
+        {error, nodedown} ->
+            Node = maps:get(node, Param),
+            Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
+            Response = #{code => 'BAD_NODE_NAME', message => Reason},
+            {404, Response};
+        [] ->
+            Identifier = maps:get(identifier, Param),
+            Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
+            {404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}};
+        ListenersOrSingleListener ->
+            manage_(Operation, ListenersOrSingleListener)
+    end.
+
+manage_(Operation, Listener) when is_map(Listener) ->
+    manage_(Operation, [Listener]);
+manage_(Operation, Listeners) when is_list(Listeners) ->
+    Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners],
+    case lists:filter(fun(Result) -> Result =/= ok end, Results) of
+        [] ->
+            {200};
+        Errors ->
+            case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of
+                [] ->
+                    Identifier = maps:get(identifier, hd(Listeners)),
+                    Message = list_to_binary(io_lib:format("Already Started: ~s", [Identifier])),
+                    {400, #{code => 'BAD_REQUEST', message => Message}};
+                _ ->
+                    case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of
+                        [] ->
+                            Identifier = maps:get(identifier, hd(Listeners)),
+                            Message = list_to_binary(io_lib:format("Already Stoped: ~s", [Identifier])),
+                            {400, #{code => 'BAD_REQUEST', message => Message}};
+                        _ ->
+                            Reason = list_to_binary(io_lib:format("~p", [Errors])),
+                            {500, #{code => 'UNKNOW_ERROR', message => Reason}}
+                    end
+            end
+    end.
+
+%%%==============================================================================================
+%% util function
+list_listener(Params) ->
+    format(list_listener_(Params)).
+
+list_listener_(#{node := Node, identifier := Identifier}) ->
+    emqx_mgmt:get_listener(Node, Identifier);
+list_listener_(#{identifier := Identifier}) ->
+    emqx_mgmt:list_listeners_by_id(Identifier);
+list_listener_(#{node := Node}) ->
+    emqx_mgmt:list_listeners(Node);
+list_listener_(#{}) ->
+    emqx_mgmt:list_listeners().
+
 format(Listeners) when is_list(Listeners) ->
-    [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))}
-     || Info = #{listen_on := ListenOn} <- Listeners ];
+    [format(Listener) || Listener <- Listeners];
 
-format({error, Reason}) -> [{error, Reason}].
+format({error, Reason}) ->
+    {error, Reason};
 
+format({Identifier, Conf}) ->
+    #{
+        identifier      => Identifier,
+        node            => maps:get(node, Conf),
+        acceptors       => maps:get(acceptors, Conf),
+        max_conn        => maps:get(max_connections, Conf),
+        type            => maps:get(type, Conf),
+        listen_on       => list_to_binary(esockd:to_string(maps:get(bind, Conf))),
+        running         => trans_running(Conf),
+        auth            => maps:get(enable, maps:get(auth, Conf))
+    }.
+trans_running(Conf) ->
+    case maps:get(running, Conf) of
+        {error, _} ->
+            false;
+        Running ->
+            Running
+    end.

+ 120 - 0
apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl

@@ -0,0 +1,120 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_listeners_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    ekka_mnesia:start(),
+    emqx_mgmt_auth:mnesia(boot),
+    emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
+    Config.
+
+end_per_suite(_) ->
+    emqx_ct_helpers:stop_apps([emqx_management]).
+
+set_special_configs(emqx_management) ->
+    emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
+        applications =>[#{id => "admin", secret => "public"}]}),
+    ok;
+set_special_configs(_App) ->
+    ok.
+
+t_list_listeners(_) ->
+    Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
+    get_api(Path).
+
+t_list_node_listeners(_) ->
+    Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]),
+    get_api(Path).
+
+t_get_listeners(_) ->
+    LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
+    Identifier = maps:get(identifier, LocalListener),
+    Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]),
+    get_api(Path).
+
+t_get_node_listeners(_) ->
+    LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
+    Identifier = maps:get(identifier, LocalListener),
+    Path = emqx_mgmt_api_test_util:api_path(
+        ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(Identifier)]),
+    get_api(Path).
+
+t_stop_listener(_) ->
+    LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
+    Identifier = maps:get(identifier, LocalListener),
+    Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier), "stop"]),
+    {ok, _} = emqx_mgmt_api_test_util:request_api(get, Path),
+    GetPath = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]),
+    {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath),
+    Listeners = emqx_json:decode(ListenersResponse, [return_maps]),
+    [listener_stats(Listener, false) || Listener <- Listeners].
+
+get_api(Path) ->
+    {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path),
+    LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()),
+    case emqx_json:decode(ListenersData, [return_maps]) of
+        [Listener] ->
+            Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8),
+            Filter =
+                fun(Local) ->
+                    maps:get(identifier, Local) =:= Identifier
+                end,
+            LocalListener = hd(lists:filter(Filter, LocalListeners)),
+            comparison_listener(LocalListener, Listener);
+        Listeners when is_list(Listeners) ->
+            ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)),
+            Fun =
+                fun(LocalListener) ->
+                    Identifier = maps:get(identifier, LocalListener),
+                    IdentifierBinary = atom_to_binary(Identifier, utf8),
+                    Filter =
+                        fun(Listener) ->
+                            maps:get(<<"identifier">>, Listener) =:= IdentifierBinary
+                        end,
+                    Listener = hd(lists:filter(Filter, Listeners)),
+                    comparison_listener(LocalListener, Listener)
+                end,
+            lists:foreach(Fun, LocalListeners);
+        Listener when is_map(Listener) ->
+            Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8),
+            Filter =
+                fun(Local) ->
+                    maps:get(identifier, Local) =:= Identifier
+                end,
+            LocalListener = hd(lists:filter(Filter, LocalListeners)),
+            comparison_listener(LocalListener, Listener)
+    end.
+
+comparison_listener(Local, Response) ->
+    ?assertEqual(maps:get(identifier, Local), binary_to_atom(maps:get(<<"identifier">>, Response))),
+    ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))),
+    ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)),
+    ?assertEqual(maps:get(max_conn, Local), maps:get(<<"max_conn">>, Response)),
+    ?assertEqual(maps:get(listen_on, Local), maps:get(<<"listen_on">>, Response)),
+    ?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)),
+    ?assertEqual(maps:get(auth, Local), maps:get(<<"auth">>, Response)).
+
+
+listener_stats(Listener, Stats) ->
+    ?assertEqual(maps:get(<<"running">>, Listener), Stats).