Sfoglia il codice sorgente

fix(gateway): add node_status into the result of /gateway/{name}/listeners

firest 3 anni fa
parent
commit
db9cb6c4a0

+ 3 - 0
apps/emqx/priv/bpapi.versions

@@ -24,3 +24,6 @@
 {emqx_delayed,1}.
 {emqx_mgmt_cluster,1}.
 {emqx_retainer,1}.
+{emqx_gateway_http_proto_v1,1}.
+{emqx_gateway_api_listeners_proto_v1,1}.
+

+ 22 - 0
apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf

@@ -111,4 +111,26 @@ emqx_gateway_api_listeners {
             zh: """监听器 ID"""
         }
     }
+
+    listener_node_status {
+        desc {
+            en: """listener status of each node in the cluster"""
+            zh: """监听器在集群中每个节点上的状态"""
+        }
+    }
+
+    node {
+        desc {
+            en: """Node Name"""
+            zh: """节点名称"""
+        }
+    }
+
+    running {
+        desc {
+            en: """Is In Listening"""
+            zh: """是否正在监听"""
+        }
+    }
+
 }

+ 118 - 2
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -58,6 +58,9 @@
     import_users/2
 ]).
 
+%% RPC
+-export([do_listeners_cluster_status/1]).
+
 %%--------------------------------------------------------------------
 %% minirest behaviour callbacks
 %%--------------------------------------------------------------------
@@ -80,7 +83,38 @@ paths() ->
 
 listeners(get, #{bindings := #{name := Name0}}) ->
     with_gateway(Name0, fun(GwName, _) ->
-        {200, emqx_gateway_conf:listeners(GwName)}
+        Listeners = emqx_gateway_conf:listeners(GwName),
+        ListenOns = lists:map(
+            fun(#{id := Id, <<"bind">> := ListenOn}) ->
+                {Id, ListenOn}
+            end,
+            Listeners
+        ),
+
+        ClusterStatus = listeners_cluster_status(ListenOns),
+
+        Result = lists:map(
+            fun(#{id := Id} = Listener) ->
+                Listener#{
+                    node_status =>
+                        lists:foldl(
+                            fun(Info, Acc) ->
+                                case maps:get(Id, Info, undefined) of
+                                    undefined ->
+                                        Acc;
+                                    Status ->
+                                        [Status | Acc]
+                                end
+                            end,
+                            [],
+                            ClusterStatus
+                        )
+                }
+            end,
+            Listeners
+        ),
+
+        {200, Result}
     end);
 listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
     with_gateway(Name0, fun(GwName, Gateway) ->
@@ -280,7 +314,7 @@ schema("/gateway/:name/listeners") ->
                     ?STANDARD_RESP(
                         #{
                             200 => emqx_dashboard_swagger:schema_with_example(
-                                hoconsc:array(emqx_gateway_api:listener_schema()),
+                                hoconsc:array(listener_node_status_schema()),
                                 examples_listener_list()
                             )
                         }
@@ -553,9 +587,32 @@ params_paging_in_qs() ->
 roots() ->
     [listener].
 
+fields(listener_node_status) ->
+    [{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}];
+fields(node_status) ->
+    [
+        {node, mk(node, #{desc => ?DESC(node)})},
+        {running, mk(boolean(), #{desc => ?DESC(running)})}
+    ];
+fields(tcp_listener) ->
+    emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
+fields(ssl_listener) ->
+    emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
+fields(udp_listener) ->
+    emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
+fields(dtls_listener) ->
+    emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
 fields(_) ->
     [].
 
+listener_node_status_schema() ->
+    hoconsc:union([
+        ref(tcp_listener),
+        ref(ssl_listener),
+        ref(udp_listener),
+        ref(dtls_listener)
+    ]).
+
 %%--------------------------------------------------------------------
 %% examples
 
@@ -587,7 +644,13 @@ examples_listener() ->
                                 high_watermark => <<"1MB">>,
                                 nodelay => false,
                                 reuseaddr => true
+                            },
+                        node_status => [
+                            #{
+                                node => <<"node@127.0.0.1">>,
+                                running => true
                             }
+                        ]
                     }
             },
         ssl_listener =>
@@ -620,7 +683,13 @@ examples_listener() ->
                             #{
                                 active_n => 100,
                                 backlog => 1024
+                            },
+                        node_status => [
+                            #{
+                                node => <<"node@127.0.0.1">>,
+                                running => true
                             }
+                        ]
                     }
             },
         udp_listener =>
@@ -639,7 +708,13 @@ examples_listener() ->
                                 buffer => <<"10KB">>,
                                 reuseaddr => true
                             }
+                    },
+                node_status => [
+                    #{
+                        node => <<"node@127.0.0.1">>,
+                        running => true
                     }
+                ]
             },
         dtls_listener =>
             #{
@@ -666,7 +741,13 @@ examples_listener() ->
                             #{
                                 active_n => 100,
                                 backlog => 1024
+                            },
+                        node_status => [
+                            #{
+                                node => <<"node@127.0.0.1">>,
+                                running => true
                             }
+                        ]
                     }
             },
         dtls_listener_with_psk_ciphers =>
@@ -694,7 +775,13 @@ examples_listener() ->
                                         "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA"
                                     >>,
                                 fail_if_no_peer_cert => false
+                            },
+                        node_status => [
+                            #{
+                                node => <<"node@127.0.0.1">>,
+                                running => true
                             }
+                        ]
                     }
             },
         lisetner_with_authn =>
@@ -715,7 +802,36 @@ examples_listener() ->
                                 password_hash_algorithm =>
                                     #{name => <<"sha256">>},
                                 user_id_type => <<"username">>
+                            },
+                        node_status => [
+                            #{
+                                node => <<"node@127.0.0.1">>,
+                                running => true
                             }
+                        ]
                     }
             }
     }.
+
+listeners_cluster_status(Listeners) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of
+        {Results, []} ->
+            Results;
+        {_, _BadNodes} ->
+            error(badrpc)
+    end.
+
+do_listeners_cluster_status(Listeners) ->
+    Node = node(),
+    maps:from_list(
+        lists:map(
+            fun({Id, ListenOn}) ->
+                {Id, #{
+                    node => Node,
+                    running => emqx_gateway_utils:is_running(Id, ListenOn)
+                }}
+            end,
+            Listeners
+        )
+    ).

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -146,7 +146,7 @@ gateway_status(GwName) ->
 
 cluster_gateway_status(GwName) ->
     Nodes = mria_mnesia:running_nodes(),
-    case emqx_gateway_http_proto_v1:get_node_status(Nodes, GwName) of
+    case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of
         {Results, []} ->
             Results;
         {_, _BadNodes} ->

+ 3 - 1
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -326,7 +326,9 @@ parse_listener_id(Id) ->
         _:_ -> error({invalid_listener_id, Id})
     end.
 
-is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
+is_running(ListenerId, #{<<"bind">> := ListenOn}) ->
+    is_running(ListenerId, ListenOn);
+is_running(ListenerId, ListenOn0) ->
     ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0),
     try esockd:listener({ListenerId, ListenOn}) of
         Pid when is_pid(Pid) ->

+ 34 - 0
apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_api_listeners_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    listeners_cluster_status/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec listeners_cluster_status([node()], list()) ->
+    emqx_rpc:multicall_result([map()]).
+listeners_cluster_status(Nodes, Listeners) ->
+    rpc:multicall(Nodes, emqx_gateway_api_listeners, do_listeners_cluster_status, [Listeners]).

+ 0 - 1
apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl

@@ -20,7 +20,6 @@
 
 -export([
     introduced_in/0,
-
     get_cluster_status/2
 ]).