Просмотр исходного кода

refactor: mgmt rm `generate_response/1`

JimMoen 3 лет назад
Родитель
Сommit
9998b613c8

+ 18 - 13
apps/emqx/src/emqx_listeners.erl

@@ -87,13 +87,21 @@ do_list_raw() ->
     Listeners = maps:to_list(RawWithDefault),
     lists:flatmap(fun format_raw_listeners/1, Listeners).
 
-format_raw_listeners({Type, Conf}) ->
+format_raw_listeners({Type0, Conf}) ->
+    Type = binary_to_atom(Type0),
     lists:map(
         fun({LName, LConf0}) when is_map(LConf0) ->
-            Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0),
+            Bind = parse_bind(LConf0),
+            Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
             LConf1 = maps:remove(<<"authentication">>, LConf0),
             LConf2 = maps:put(<<"running">>, Running, LConf1),
-            {Type, LName, LConf2}
+            CurrConn =
+                case Running of
+                    true -> current_conns(Type, LName, Bind);
+                    false -> 0
+                end,
+            LConf3 = maps:put(<<"current_connections">>, CurrConn, LConf2),
+            {Type0, LName, LConf3}
         end,
         maps:to_list(Conf)
     ).
@@ -112,16 +120,7 @@ is_running(ListenerId) ->
     end.
 
 is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
-    ListenOn =
-        case Conf of
-            #{bind := Bind} ->
-                Bind;
-            #{<<"bind">> := Bind} ->
-                case emqx_schema:to_ip_port(binary_to_list(Bind)) of
-                    {ok, L} -> L;
-                    {error, _} -> binary_to_integer(Bind)
-                end
-        end,
+    #{bind := ListenOn} = Conf,
     try esockd:listener({ListenerId, ListenOn}) of
         Pid when is_pid(Pid) ->
             true
@@ -545,3 +544,9 @@ str(B) when is_binary(B) ->
     binary_to_list(B);
 str(S) when is_list(S) ->
     S.
+
+parse_bind(#{<<"bind">> := Bind}) ->
+    case emqx_schema:to_ip_port(binary_to_list(Bind)) of
+        {ok, L} -> L;
+        {error, _} -> binary_to_integer(Bind)
+    end.

+ 11 - 2
apps/emqx_authn/src/emqx_authn_api.erl

@@ -1162,8 +1162,17 @@ delete_user(ChainName, AuthenticatorID, UserID) ->
     end.
 
 list_users(ChainName, AuthenticatorID, QueryString) ->
-    Response = emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString),
-    emqx_mgmt_util:generate_response(Response).
+    case emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString) of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Reason} ->
+            {400, #{
+                code => <<"INVALID_PARAMETER">>,
+                message => list_to_binary(io_lib:format("Reason ~p", [Reason]))
+            }};
+        Result ->
+            {200, Result}
+    end.
 
 update_config(Path, ConfigRequest) ->
     emqx_conf:update(Path, ConfigRequest, #{

+ 34 - 16
apps/emqx_authz/src/emqx_authz_api_mnesia.erl

@@ -405,14 +405,23 @@ fields(meta) ->
 %%--------------------------------------------------------------------
 
 users(get, #{query_string := QueryString}) ->
-    Response = emqx_mgmt_api:node_query(
-        node(),
-        QueryString,
-        ?ACL_TABLE,
-        ?ACL_USERNAME_QSCHEMA,
-        ?QUERY_USERNAME_FUN
-    ),
-    emqx_mgmt_util:generate_response(Response);
+    case
+        emqx_mgmt_api:node_query(
+            node(),
+            QueryString,
+            ?ACL_TABLE,
+            ?ACL_USERNAME_QSCHEMA,
+            ?QUERY_USERNAME_FUN
+        )
+    of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Result ->
+            {200, Result}
+    end;
 users(post, #{body := Body}) when is_list(Body) ->
     lists:foreach(
         fun(#{<<"username">> := Username, <<"rules">> := Rules}) ->
@@ -423,14 +432,23 @@ users(post, #{body := Body}) when is_list(Body) ->
     {204}.
 
 clients(get, #{query_string := QueryString}) ->
-    Response = emqx_mgmt_api:node_query(
-        node(),
-        QueryString,
-        ?ACL_TABLE,
-        ?ACL_CLIENTID_QSCHEMA,
-        ?QUERY_CLIENTID_FUN
-    ),
-    emqx_mgmt_util:generate_response(Response);
+    case
+        emqx_mgmt_api:node_query(
+            node(),
+            QueryString,
+            ?ACL_TABLE,
+            ?ACL_CLIENTID_QSCHEMA,
+            ?QUERY_CLIENTID_FUN
+        )
+    of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Result ->
+            {200, Result}
+    end;
 clients(post, #{body := Body}) when is_list(Body) ->
     lists:foreach(
         fun(#{<<"clientid">> := ClientID, <<"rules">> := Rules}) ->

+ 31 - 22
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -101,30 +101,39 @@ clients(get, #{
     bindings := #{name := Name0},
     query_string := QString
 }) ->
-    with_gateway(Name0, fun(GwName, _) ->
+    Fun = fun(GwName, _) ->
         TabName = emqx_gateway_cm:tabname(info, GwName),
-        case maps:get(<<"node">>, QString, undefined) of
-            undefined ->
-                Response = emqx_mgmt_api:cluster_query(
-                    QString,
-                    TabName,
-                    ?CLIENT_QSCHEMA,
-                    ?QUERY_FUN
-                ),
-                emqx_mgmt_util:generate_response(Response);
-            Node1 ->
-                Node = binary_to_atom(Node1, utf8),
-                QStringWithoutNode = maps:without([<<"node">>], QString),
-                Response = emqx_mgmt_api:node_query(
-                    Node,
-                    QStringWithoutNode,
-                    TabName,
-                    ?CLIENT_QSCHEMA,
-                    ?QUERY_FUN
-                ),
-                emqx_mgmt_util:generate_response(Response)
+        Result =
+            case maps:get(<<"node">>, QString, undefined) of
+                undefined ->
+                    emqx_mgmt_api:cluster_query(
+                        QString,
+                        TabName,
+                        ?CLIENT_QSCHEMA,
+                        ?QUERY_FUN
+                    );
+                Node0 ->
+                    Node1 = binary_to_atom(Node0, utf8),
+                    QStringWithoutNode = maps:without([<<"node">>], QString),
+                    emqx_mgmt_api:node_query(
+                        Node1,
+                        QStringWithoutNode,
+                        TabName,
+                        ?CLIENT_QSCHEMA,
+                        ?QUERY_FUN
+                    )
+            end,
+        case Result of
+            {error, page_limit_invalid} ->
+                {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+            {error, Node, {badrpc, R}} ->
+                Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+                {500, #{code => <<"NODE_DOWN">>, message => Message}};
+            Response ->
+                {200, Response}
         end
-    end).
+    end,
+    with_gateway(Name0, Fun).
 
 clients_insta(get, #{
     bindings := #{

+ 9 - 2
apps/emqx_management/src/emqx_mgmt_api_alarms.erl

@@ -91,8 +91,15 @@ alarms(get, #{query_string := QString}) ->
             true -> ?ACTIVATED_ALARM;
             false -> ?DEACTIVATED_ALARM
         end,
-    Response = emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}),
-    emqx_mgmt_util:generate_response(Response);
+    case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Response ->
+            {200, Response}
+    end;
 
 alarms(delete, _Params) ->
     _ = emqx_mgmt:delete_all_deactivated_alarms(),

+ 18 - 11
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -454,17 +454,24 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
 %% api apply
 
 list_clients(QString) ->
-    case maps:get(<<"node">>, QString, undefined) of
-        undefined ->
-            Response = emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB,
-                                                   ?CLIENT_QSCHEMA, ?QUERY_FUN),
-            emqx_mgmt_util:generate_response(Response);
-        Node1 ->
-            Node = binary_to_atom(Node1, utf8),
-            QStringWithoutNode = maps:without([<<"node">>], QString),
-            Response = emqx_mgmt_api:node_query(Node, QStringWithoutNode,
-                                                ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN),
-            emqx_mgmt_util:generate_response(Response)
+    Result = case maps:get(<<"node">>, QString, undefined) of
+                 undefined ->
+                     emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB,
+                                                 ?CLIENT_QSCHEMA, ?QUERY_FUN);
+                 Node0 ->
+                     Node1 = binary_to_atom(Node0, utf8),
+                     QStringWithoutNode = maps:without([<<"node">>], QString),
+                     emqx_mgmt_api:node_query(Node1, QStringWithoutNode,
+                                              ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN)
+             end,
+    case Result of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Response ->
+            {200, Response}
     end.
 
 lookup(#{clientid := ClientID}) ->

+ 88 - 10
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -20,8 +20,11 @@
 
 -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
 -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
+-define(LISTENER_TYPE, [quic, wss, ws, ssl, tcp]).
+-define(LISTENER_STATUS, [enable, disable]).
 
 -export([
+    listener_status/2,
     list_listeners/2,
     crud_listeners_by_id/2,
     list_listeners_on_node/2,
@@ -55,6 +58,7 @@ api_spec() ->
 
 paths() ->
     [
+        "/listener/status",
         "/listeners",
         "/listeners/:id",
         "/listeners/:id/:action",
@@ -63,13 +67,29 @@ paths() ->
         "/nodes/:node/listeners/:id/:action"
     ].
 
+
+schema("/listener/status") ->
+    #{
+        'operationId' => listener_status,
+        get => #{
+            tags => [<<"listeners">>],
+            desc => <<"List all running node's listeners live status.">>,
+            %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
+            %% Current we only support all node's listeners is the same,
+            %% so we don't return the node information right now.
+            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_status)))}
+        }
+    };
 schema("/listeners") ->
     #{
         'operationId' => list_listeners,
         get => #{
             tags => [<<"listeners">>],
             desc => <<"List all running node's listeners.">>,
-            responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
+            %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
+            %% Current we only support all node's listeners is the same,
+            %% so we don't return the node information right now.
+            responses => #{200 => ?HOCON(?ARRAY(listener_schema()))}
         }
     };
 schema("/listeners/:id") ->
@@ -80,17 +100,29 @@ schema("/listeners/:id") ->
             desc => <<"List all running node's listeners for the specified id.">>,
             parameters => [?R_REF(listener_id)],
             responses => #{
-                200 => ?HOCON(?ARRAY(?R_REF(listeners)))
+                %% 200 => ?HOCON(?ARRAY(?R_REF(listeners)))
+                200 => ?HOCON(listener_schema())
             }
         },
         put => #{
             tags => [<<"listeners">>],
-            desc => <<"Create or update the specified listener on all nodes.">>,
+            desc => <<"Update the specified listener on all nodes.">>,
+            parameters => [?R_REF(listener_id)],
+            'requestBody' => ?HOCON(listener_schema(), #{}),
+            responses => #{
+                200 => ?HOCON(listener_schema(), #{}),
+                400 => error_codes(['BAD_REQUEST']),
+                404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
+            }
+        },
+        post => #{
+            tags => [<<"listeners">>],
+            desc => <<"Create the specified listener on all nodes.">>,
             parameters => [?R_REF(listener_id)],
             'requestBody' => ?HOCON(listener_schema(), #{}),
             responses => #{
                 200 => ?HOCON(listener_schema(), #{}),
-                400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
+                400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
             }
         },
         delete => #{
@@ -230,6 +262,23 @@ fields(node) ->
                 in => path
             })}
     ];
+fields(listener_status) ->
+    [
+        {type, ?HOCON(?ENUM(?LISTENER_TYPE), #{desc => "Listener type", required => true})},
+        {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
+        {number, ?HOCON(non_neg_integer(), #{desc => "Listener number", required => true})},
+        {status, ?HOCON(?R_REF(status))},
+        {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
+    ];
+fields(status) ->
+    [
+        {max_connections,
+            ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
+        {current_connections,
+            ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
+    ];
+fields(node_status) ->
+    fields(node) ++ fields(status);
 fields(Type) ->
     Listeners = listeners_info(),
     [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
@@ -244,6 +293,7 @@ listeners_info() ->
         fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
             Fields0 = hocon_schema:fields(Mod, Field),
             Fields1 = lists:keydelete("authentication", 1, Fields0),
+            Fields2 = lists:keydelete("limiter", 1, Fields1),
             TypeAtom = list_to_existing_atom(Type),
             #{
                 ref => ?R_REF(TypeAtom),
@@ -256,7 +306,7 @@ listeners_info() ->
                             required => true,
                             validator => fun validate_id/1
                         })}
-                    | Fields1
+                    | Fields2
                 ]
             }
         end,
@@ -270,6 +320,10 @@ validate_id(Id) ->
     end.
 
 %% api
+listener_status(get, _Request) ->
+
+    {200, []}.
+
 list_listeners(get, _Request) ->
     {200, list_listeners()}.
 
@@ -278,11 +332,35 @@ crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
 crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
-            case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
-                {ok, #{raw_config := _RawConf}} ->
-                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
-                {error, Reason} ->
-                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+            Key = [listeners, Type, Name],
+            case emqx_conf:get(Key, undefined) of
+                undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
+                _PrevConf ->
+                    case emqx_conf:update(Key, Conf, ?OPTS(cluster)) of
+                        {ok, #{raw_config := _RawConf}} ->
+                            crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                        {error, Reason} ->
+                            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+                    end
+            end;
+        {error, Reason} ->
+            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
+        _ ->
+            {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
+    end;
+crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
+    case parse_listener_conf(Body0) of
+        {Id, Type, Name, Conf} ->
+            Key = [listeners, Type, Name],
+            case emqx_conf:get(Key, undefined) of
+                undefined ->
+                    case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
+                        {ok, #{raw_config := _RawConf}} ->
+                            crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                        {error, Reason} ->
+                            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+                    end;
+                _ -> {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}
             end;
         {error, Reason} ->
             {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};

+ 17 - 9
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -113,15 +113,23 @@ parameters() ->
     ].
 
 subscriptions(get, #{query_string := QString}) ->
-    case maps:get(<<"node">>, QString, undefined) of
-        undefined ->
-            Response = emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE,
-                                                   ?SUBS_QSCHEMA, ?QUERY_FUN),
-            emqx_mgmt_util:generate_response(Response);
-        Node ->
-            Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), QString,
-                                                ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN),
-            emqx_mgmt_util:generate_response(Response)
+    Response =
+        case maps:get(<<"node">>, QString, undefined) of
+            undefined ->
+                emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE,
+                                            ?SUBS_QSCHEMA, ?QUERY_FUN);
+            Node0 ->
+                emqx_mgmt_api:node_query(binary_to_atom(Node0, utf8), QString,
+                                         ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN)
+        end,
+    case Response of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Result ->
+            {200, Result}
     end.
 
 format(Items) when is_list(Items) ->

+ 10 - 3
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -103,9 +103,16 @@ topic(get, #{bindings := Bindings}) ->
 %%%==============================================================================================
 %% api apply
 do_list(Params) ->
-    Response = emqx_mgmt_api:node_query(
-        node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}),
-    emqx_mgmt_util:generate_response(Response).
+    case emqx_mgmt_api:node_query(
+        node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}) of
+        {error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
+        {error, Node, {badrpc, R}} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Response ->
+            {200, Response}
+    end.
 
 lookup(#{topic := Topic}) ->
     case emqx_router:lookup_routes(Topic) of

+ 0 - 17
apps/emqx_management/src/emqx_mgmt_util.erl

@@ -43,9 +43,6 @@
         , batch_schema/1
         ]).
 
--export([generate_response/1]).
-
-
 -export([urldecode/1]).
 
 -define(KB, 1024).
@@ -262,17 +259,3 @@ bad_request() ->
     bad_request(<<"Bad Request">>).
 bad_request(Desc) ->
     object_schema(properties([{message, string}, {code, string}]), Desc).
-
-%%%==============================================================================================
-%% Response util
-
-generate_response(QueryResult) ->
-    case QueryResult of
-        {error, page_limit_invalid} ->
-            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, {badrpc, R}} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
-            {500, #{code => <<"NODE_DOWN">>, message => Message}};
-        Response ->
-            {200, Response}
-    end.