فهرست منبع

Merge pull request #6762 from JimMoen/refactor-topic-metrics-api

refactor(api): topic_metrics api swagger spec
JimMoen 4 سال پیش
والد
کامیت
e97e7f6fdd

+ 1 - 1
.github/workflows/run_api_tests.yaml

@@ -86,7 +86,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: 1.0.3-dev2
+        ref: 1.0.4-dev1
         path: .
     - uses: actions/setup-java@v1
       with:

+ 1 - 0
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -1799,6 +1799,7 @@ t_clients_api(_) ->
     %% kickout
     {204, _} =
         request(delete, "/gateway/mqttsn/clients/client_id_test1"),
+    timer:sleep(100),
     {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
 
     send_disconnect_msg(Socket, undefined),

+ 2 - 2
apps/emqx_modules/src/emqx_telemetry_api.erl

@@ -71,13 +71,13 @@ schema("/telemetry/data") ->
      }.
 
 status_schema(Desc) ->
-    mk(ref(?MODULE, status), #{desc => Desc}).
+    mk(ref(?MODULE, status), #{in => body, desc => Desc}).
 
 fields(status) ->
     [ { enable
       , mk( boolean()
           , #{ desc => <<"Telemetry status">>
-             , default => false
+             , default => true
              , example => false
              })
       }

+ 224 - 117
apps/emqx_modules/src/emqx_topic_metrics_api.erl

@@ -18,163 +18,259 @@
 
 -behaviour(minirest_api).
 
--import(emqx_mgmt_util, [ properties/1
-                        , schema/1
-                        , object_schema/1
-                        , object_schema/2
-                        , object_array_schema/2
-                        , error_schema/2
-                        ]).
+-include_lib("typerefl/include/types.hrl").
 
--export([api_spec/0]).
+-import( hoconsc
+       , [ mk/2
+         , ref/1
+         , ref/2
+         , array/1
+         , map/2]).
 
 -export([ topic_metrics/2
         , operate_topic_metrics/2
         ]).
 
--define(ERROR_TOPIC, 'ERROR_TOPIC').
+-export([ cluster_accumulation_metrics/0
+        , cluster_accumulation_metrics/1]).
 
--define(EXCEED_LIMIT, 'EXCEED_LIMIT').
+-export([ api_spec/0
+        , paths/0
+        , schema/1
+        , fields/1
+        ]).
 
+-define(ERROR_TOPIC, 'ERROR_TOPIC').
+-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
 -define(BAD_TOPIC, 'BAD_TOPIC').
-
+-define(BAD_RPC, 'BAD_RPC').
 -define(BAD_REQUEST, 'BAD_REQUEST').
+-define(API_TAG_MQTT, [<<"mqtt">>]).
+
 
 api_spec() ->
-    {[
-        topic_metrics_api(),
-        operation_topic_metrics_api()
-    ],[]}.
-
-properties() ->
-    properties([
-        {topic, string},
-        {create_time, string, <<"Date time, rfc3339">>},
-        {reset_time, string, <<"Nullable. Date time, rfc3339.">>},
-        {metrics, object, [{'messages.dropped.count', integer},
-                           {'messages.dropped.rate', number},
-                           {'messages.in.count', integer},
-                           {'messages.in.rate', number},
-                           {'messages.out.count', integer},
-                           {'messages.out.rate', number},
-                           {'messages.qos0.in.count', integer},
-                           {'messages.qos0.in.rate', number},
-                           {'messages.qos0.out.count', integer},
-                           {'messages.qos0.out.rate', number},
-                           {'messages.qos1.in.count', integer},
-                           {'messages.qos1.in.rate', number},
-                           {'messages.qos1.out.count', integer},
-                           {'messages.qos1.out.rate', number},
-                           {'messages.qos2.in.count', integer},
-                           {'messages.qos2.in.rate', number},
-                           {'messages.qos2.out.count', integer},
-                           {'messages.qos2.out.rate', number}]}
-    ]).
-
-topic_metrics_api() ->
-    MetaData = #{
-        %% Get all nodes metrics and accumulate all of these
-        get => #{
-            description => <<"List topic metrics">>,
-            responses => #{
-                <<"200">> => object_array_schema(properties(), <<"List topic metrics">>)
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [ "/mqtt/topic_metrics"
+    , "/mqtt/topic_metrics/:topic"
+    ].
+
+
+schema("/mqtt/topic_metrics") ->
+    #{ 'operationId' => topic_metrics
+     , get =>
+           #{ description => <<"List topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , responses  =>
+                  #{200  => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})}
+            }
+     , put =>
+           #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>
+            , tags => ?API_TAG_MQTT
+            , 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                                 ref(reset),
+                                 reset_examples())
+            , responses =>
+                  #{ 204 => <<"Reset topic metrics successfully">>
+                   , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
             }
-        },
-        put => #{
-            description => <<"Reset topic metrics by topic name, or all">>,
-            'requestBody' => object_schema(properties([
-                {topic, string, <<"no topic will reset all">>},
-                {action, string, <<"Action, default reset">>, [reset]}
-            ])),
-            responses => #{
-                <<"200">> => schema(<<"Reset topic metrics success">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
+     , post =>
+           #{ description => <<"Create topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , 'requestBody' => [topic(body)]
+            , responses =>
+                  #{ 204 => <<"Create topic metrics success">>
+                   , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>)
+                   , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>)
+                   }
             }
-        },
-        post => #{
-            description => <<"Create topic metrics">>,
-            'requestBody' => object_schema(properties([{topic, string}])),
-            responses => #{
-                <<"200">> => schema(<<"Create topic metrics success">>),
-                <<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
-                <<"400">> => error_schema( <<"Topic metrics already exist or bad topic">>
-                                         , [?BAD_REQUEST, ?BAD_TOPIC])
+     };
+schema("/mqtt/topic_metrics/:topic") ->
+    #{ 'operationId' =>  operate_topic_metrics
+     , get =>
+           #{ description => <<"Get topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , parameters => [topic(path)]
+            , responses =>
+                  #{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> })
+                   , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
             }
-        }
-    },
-    {"/mqtt/topic_metrics", MetaData, topic_metrics}.
-
-operation_topic_metrics_api() ->
-    MetaData = #{
-        get => #{
-            description => <<"Get topic metrics">>,
-            parameters => [topic_param()],
-            responses => #{
-                <<"200">> => object_schema(properties(), <<"Topic metrics">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
-            }},
-        delete => #{
-            description => <<"Deregister topic metrics">>,
-            parameters => [topic_param()],
-            responses => #{
-                <<"204">> => schema(<<"Deregister topic metrics">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
+     , delete =>
+           #{ description => <<"Remove the topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , parameters => [topic(path)]
+            , responses =>
+                  #{ 204 => <<"Removed topic metrics successfully">>,
+                     404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
             }
-        }
-    },
-    {"/mqtt/topic_metrics/:topic", MetaData, operate_topic_metrics}.
-
-topic_param() ->
-    #{
-        name => topic,
-        in => path,
-        required => true,
-        description => <<"Notice: Topic string url must encode">>,
-        schema => #{type => string}
+     }.
+
+fields(reset) ->
+    [ {topic
+      , mk( binary()
+          , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">>
+             , example => <<"testtopic/1">>
+             , nullable => true})}
+    , {action
+      , mk( string()
+          , #{ desc => <<"Action Name. Only as a \"reset\"">>
+             , enum => [reset]
+             , nullable => false
+             , example => <<"reset">>})}
+    ];
+
+fields(topic_metrics) ->
+    [ { topic
+      , mk( binary()
+          , #{ desc => <<"Topic Name">>
+             , example => <<"testtopic/1">>
+             , nullable => false})},
+      { create_time
+      , mk( emqx_schema:rfc3339_system_time()
+          , #{ desc => <<"Topic Metrics created date time, in rfc3339">>
+             , nullable => false
+             , example => <<"2022-01-14T21:48:47+08:00">>})},
+      { reset_time
+      , mk( emqx_schema:rfc3339_system_time()
+          , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">>
+             , nullable => true
+             , example => <<"2022-01-14T21:48:47+08:00">>})},
+      { metrics
+      , mk( ref(metrics)
+          , #{ desc => <<"Topic Metrics fields">>
+             , nullable => false})
+      }
+    ];
+
+fields(metrics) ->
+    [ { 'messages.dropped.count'
+      , mk( integer(), #{ desc => <<"Message dropped count">>
+                        , example => 0})},
+      { 'messages.dropped.rate'
+      , mk( number(),  #{ desc => <<"Message dropped rate in 5s">>
+                        , example => 0})},
+      { 'messages.in.count'
+      , mk( integer(), #{ desc => <<"Message received count">>
+                        , example => 0})},
+      { 'messages.in.rate'
+      , mk( number(),  #{ desc => <<"Message received rate in 5s">>
+                        , example => 0})},
+      { 'messages.out.count'
+      , mk( integer(), #{ desc => <<"Message sent count">>
+                        , example => 0})},
+      { 'messages.out.rate'
+      , mk( number(),  #{ desc => <<"Message sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos0.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 0 received count">>
+                        , example => 0})},
+      { 'messages.qos0.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 0 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos0.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">>
+                        , example => 0})},
+      { 'messages.qos0.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 0 sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos1.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 1 received count">>
+                        , example => 0})},
+      { 'messages.qos1.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 1 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos1.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">>
+                        , example => 0})},
+      { 'messages.qos1.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 1 sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos2.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
+                        , example => 0})},
+      { 'messages.qos2.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 2 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos2.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
+                        , example => 0})},
+      { 'messages.qos2.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 2 sent rate in 5s">>
+                        , example => 0})}
+    ].
+
+topic(In) ->
+    case In of
+        body ->
+            Desc = <<"Raw topic string">>,
+            Example = "testtopic/1";
+        path ->
+            Desc = <<"Notice: Topic string in url path must be encoded">>,
+            Example = "testtopic%2F1"
+    end,
+    { topic
+    , mk( binary(),
+          #{ desc => Desc
+           , required => true
+           , in => In
+           , example => Example
+           })
     }.
 
+reset_examples() ->
+    #{ reset_specific_one_topic_metrics =>
+           #{ summary => <<"reset_specific_one_topic_metrics">>
+            , value =>
+                  #{ topic  => "testtopic/1"
+                   , action => "reset"
+                   }
+            }
+     , reset_all_topic_metrics =>
+           #{ summary => <<"reset_all_topic_metrics">>
+            , value =>
+                  #{ action => "reset"
+                   }
+            }
+     }.
+
 %%--------------------------------------------------------------------
 %% HTTP Callbacks
 %%--------------------------------------------------------------------
 
 topic_metrics(get, _) ->
-    case cluster_accumulation_metrics() of
-        {error, Reason} ->
-            {500, Reason};
-        {ok, Metrics} ->
-            {200, Metrics}
-    end;
+    get_cluster_response([]);
 
 topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
     case reset(Topic) of
-        ok -> {200};
-        {error, Reason} -> reason2httpresp(Reason)
+        ok ->
+            get_cluster_response([Topic]);
+        {error, Reason} ->
+            reason2httpresp(Reason)
     end;
 topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
     reset(),
-    {200};
+    get_cluster_response([]);
 
 topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
     {400, 'BAD_REQUEST', <<"Topic can not be empty">>};
 topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
     case emqx_modules_conf:add_topic_metrics(Topic) of
         {ok, Topic} ->
-            {200};
+            get_cluster_response([Topic]);
         {error, Reason} ->
             reason2httpresp(Reason)
     end.
 
 operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) ->
-    case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of
-        {ok, Metrics} ->
-            {200, Metrics};
-        {error, Reason} ->
-            reason2httpresp(Reason)
-    end;
+    get_cluster_response([emqx_http_lib:uri_decode(Topic0)]);
 
 operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
     case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of
-        ok -> {200};
+        ok -> {204};
         {error, Reason} -> reason2httpresp(Reason)
     end.
 
@@ -197,7 +293,8 @@ cluster_accumulation_metrics(Topic) ->
         {SuccResList, []} ->
             case lists:filter(fun({error, _}) -> false; (_) -> true
                               end, SuccResList) of
-                [] -> {error, topic_not_found};
+                [] ->
+                    {error, topic_not_found};
                 TopicMetrics ->
                     NTopicMetrics = [ [T] || T <- TopicMetrics],
                     [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
@@ -277,8 +374,8 @@ reason2httpresp(bad_topic) ->
 reason2httpresp({quota_exceeded, bad_topic}) ->
     Msg = list_to_binary(
             io_lib:format(
-                "Max topic metrics count is ~p, and topic cannot have wildcard",
-                [emqx_topic_metrics:max_limit()])),
+              "Max topic metrics count is ~p, and topic cannot have wildcard",
+              [emqx_topic_metrics:max_limit()])),
     {400, #{code => ?BAD_REQUEST, message => Msg}};
 reason2httpresp(already_existed) ->
     Msg = <<"Topic already registered">>,
@@ -289,3 +386,13 @@ reason2httpresp(topic_not_found) ->
 reason2httpresp(not_found) ->
     Msg = <<"Topic not found">>,
     {404, #{code => ?ERROR_TOPIC, message => Msg}}.
+
+get_cluster_response(Args) ->
+    case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of
+        {error, {badrpc, RPCReason}} ->
+            {500, RPCReason};
+        {error, Reason} when is_atom(Reason) ->
+            reason2httpresp(Reason);
+        {ok, Metrics} ->
+            {200, Metrics}
+    end.