فهرست منبع

feat: topic metrics api (#5520)

DDDHuang 4 سال پیش
والد
کامیت
8125ec7d08

+ 1 - 1
apps/emqx_modules/etc/emqx_modules.conf

@@ -24,7 +24,7 @@ event_message {
 }
 
 topic_metrics {
-    topics = ["topic/#"]
+    topics = []
 }
 
 rewrite {

+ 148 - 40
apps/emqx_modules/src/emqx_topic_metrics.erl

@@ -37,12 +37,17 @@
         , disable/0
         ]).
 
--export([ metrics/1
+-export([ max_limit/0]).
+
+-export([ metrics/0
+        , metrics/1
         , register/1
-        , unregister/1
-        , unregister_all/0
+        , deregister/1
+        , deregister_all/0
         , is_registered/1
         , all_registered_topics/0
+        , reset/0
+        , reset/1
         ]).
 
 %% gen_server callbacks
@@ -92,6 +97,9 @@
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
+max_limit() ->
+    ?MAX_TOPICS.
+
 enable() ->
     emqx_hooks:put('message.publish',   {?MODULE, on_message_publish, []}),
     emqx_hooks:put('message.dropped',   {?MODULE, on_message_dropped, []}),
@@ -100,7 +108,8 @@ enable() ->
 disable() ->
     emqx_hooks:del('message.publish',   {?MODULE, on_message_publish}),
     emqx_hooks:del('message.dropped',   {?MODULE, on_message_dropped}),
-    emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}).
+    emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}),
+    deregister_all().
 
 on_message_publish(#message{topic = Topic, qos = QoS}) ->
     case is_registered(Topic) of
@@ -143,77 +152,101 @@ start_link() ->
 stop() ->
     gen_server:stop(?MODULE).
 
+metrics() ->
+    [format(TopicMetrics) || TopicMetrics <- ets:tab2list(?TAB)].
+
 metrics(Topic) ->
     case ets:lookup(?TAB, Topic) of
         [] ->
             {error, topic_not_found};
-        [{Topic, CRef}] ->
-            lists:foldl(fun(Metric, Acc) ->
-                            [{to_count(Metric), counters:get(CRef, metric_idx(Metric))},
-                             {to_rate(Metric), rate(Topic, Metric)} | Acc]
-                        end, [], ?TOPIC_METRICS)
+        [TopicMetrics] ->
+            format(TopicMetrics)
     end.
 
 register(Topic) when is_binary(Topic) ->
     gen_server:call(?MODULE, {register, Topic}).
 
-unregister(Topic) when is_binary(Topic) ->
-    gen_server:call(?MODULE, {unregister, Topic}).
+deregister(Topic) when is_binary(Topic) ->
+    gen_server:call(?MODULE, {deregister, Topic}).
 
-unregister_all() ->
-    gen_server:call(?MODULE, {unregister, all}).
+deregister_all() ->
+    gen_server:call(?MODULE, {deregister, all}).
 
 is_registered(Topic) ->
     ets:member(?TAB, Topic).
 
 all_registered_topics() ->
-    [Topic || {Topic, _CRef} <- ets:tab2list(?TAB)].
+    [Topic || {Topic, _} <- ets:tab2list(?TAB)].
+
+reset(Topic) ->
+    case is_registered(Topic) of
+        true ->
+            gen_server:call(?MODULE, {reset, Topic});
+        false ->
+            {error, topic_not_found}
+    end.
+
+reset() ->
+    gen_server:call(?MODULE, {reset, all}).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
-init([_Opts]) ->
+init([Opts]) ->
     erlang:process_flag(trap_exit, true),
     ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
     erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
-    {ok, #state{speeds = #{}}, hibernate}.
+    Fun =
+        fun(Topic, CurrentSpeeds) ->
+            case do_register(Topic, CurrentSpeeds) of
+                {ok, NSpeeds} ->
+                    NSpeeds;
+                {error, already_existed} ->
+                    CurrentSpeeds;
+                {error, quota_exceeded} ->
+                    error("max topic metrics quota exceeded")
+            end
+        end,
+    {ok, #state{speeds = lists:foldl(Fun, #{}, maps:get(topics, Opts, []))}, hibernate}.
 
 handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
-    case is_registered(Topic) of
-        true ->
-            {reply, {error, already_existed}, State};
-        false ->
-            case number_of_registered_topics() < ?MAX_TOPICS of
-                true ->
-                    CRef = counters:new(counters_size(), [write_concurrency]),
-                    true = ets:insert(?TAB, {Topic, CRef}),
-                    [counters:put(CRef, Idx, 0) || Idx <- lists:seq(1, counters_size())],
-                    NSpeeds = lists:foldl(fun(Metric, Acc) ->
-                                              maps:put({Topic, Metric}, #speed{}, Acc)
-                                          end, Speeds, ?TOPIC_METRICS),
-                    {reply, ok, State#state{speeds = NSpeeds}};
-                false ->
-                    {reply, {error, quota_exceeded}, State}
-            end
+    case do_register(Topic, Speeds) of
+        {ok, NSpeeds} ->
+            {reply, ok, State#state{speeds = NSpeeds}};
+        Error ->
+            {reply, Error, State}
     end;
 
-handle_call({unregister, all}, _From, State) ->
-    [delete_counters(Topic) || {Topic, _CRef} <- ets:tab2list(?TAB)],
+handle_call({deregister, all}, _From, State) ->
+    true = ets:delete_all_objects(?TAB),
+    update_config([]),
     {reply, ok, State#state{speeds = #{}}};
 
-handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) ->
+handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
     case is_registered(Topic) of
         false ->
             {reply, ok, State};
         true ->
-            ok = delete_counters(Topic),
+            true = ets:delete(?TAB, Topic),
             NSpeeds = lists:foldl(fun(Metric, Acc) ->
                                       maps:remove({Topic, Metric}, Acc)
                                   end, Speeds, ?TOPIC_METRICS),
+            remove_topic_config(Topic),
             {reply, ok, State#state{speeds = NSpeeds}}
     end;
 
+handle_call({reset, all}, _From, State = #state{speeds = Speeds}) ->
+    Fun =
+        fun(T, NSpeeds) ->
+            reset_topic(T, NSpeeds)
+        end,
+    {reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}};
+
+handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) ->
+    NSpeeds = reset_topic(Topic, Speeds),
+    {reply, ok, State#state{speeds = NSpeeds}};
+
 handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
     case is_registered(Topic) of
         false ->
@@ -249,9 +282,83 @@ handle_info(Info, State) ->
 terminate(_Reason, _State) ->
     ok.
 
+reset_topic({Topic, Data}, Speeds) ->
+    CRef = maps:get(counter_ref, Data),
+    ok = reset_counter(CRef),
+    ResetTime = emqx_rule_funcs:now_rfc3339(),
+    true = ets:insert(?TAB, {Topic, Data#{reset_time => ResetTime}}),
+    Fun =
+        fun(Metric, CurrentSpeeds) ->
+            maps:put({Topic, Metric}, #speed{}, CurrentSpeeds)
+        end,
+    lists:foldl(Fun, Speeds, ?TOPIC_METRICS);
+reset_topic(Topic, Speeds) ->
+    T = hd(ets:lookup(?TAB, Topic)),
+    reset_topic(T, Speeds).
+
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
+do_register(Topic, Speeds) ->
+    case is_registered(Topic) of
+        true ->
+            {error, already_existed};
+        false ->
+            case number_of_registered_topics() < ?MAX_TOPICS of
+                true ->
+                    CreateTime = emqx_rule_funcs:now_rfc3339(),
+                    CRef = counters:new(counters_size(), [write_concurrency]),
+                    ok = reset_counter(CRef),
+                    Data = #{
+                        counter_ref => CRef,
+                        create_time => CreateTime},
+                    true = ets:insert(?TAB, {Topic, Data}),
+                    NSpeeds = lists:foldl(fun(Metric, Acc) ->
+                                              maps:put({Topic, Metric}, #speed{}, Acc)
+                                          end, Speeds, ?TOPIC_METRICS),
+                    add_topic_config(Topic),
+                    {ok, NSpeeds};
+                false ->
+                    {error, quota_exceeded}
+            end
+    end.
+
+format({Topic, Data}) ->
+    CRef = maps:get(counter_ref, Data),
+    Fun =
+        fun(Key, Metrics) ->
+            CounterKey = to_count(Key),
+            Counter    = counters:get(CRef, metric_idx(Key)),
+            RateKey    = to_rate(Key),
+            Rate       = emqx_rule_funcs:float(rate(Topic, Key), 4),
+            maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics))
+        end,
+    Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS),
+    CreateTime = maps:get(create_time, Data),
+    TopicMetrics = #{
+        topic => Topic,
+        metrics => Metrics,
+        create_time => CreateTime
+    },
+    case maps:get(reset_time, Data, undefined) of
+        undefined ->
+            TopicMetrics;
+        ResetTime ->
+            TopicMetrics#{reset_time => ResetTime}
+    end.
+
+remove_topic_config(Topic) when is_binary(Topic) ->
+    Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) -- [Topic],
+    update_config(Topics).
+
+add_topic_config(Topic) when is_binary(Topic) ->
+    Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) ++ [Topic],
+    update_config(Topics).
+
+update_config(Topics) when is_list(Topics) ->
+    Opts = emqx_config:get_raw([<<"topic_metrics">>], #{}),
+    {ok, _} = emqx:update_config([topic_metrics], maps:put(<<"topics">>, Topics, Opts)),
+    ok.
 
 try_inc(Topic, Metric) ->
     _ = inc(Topic, Metric),
@@ -277,7 +384,8 @@ val(Topic, Metric) ->
     case ets:lookup(?TAB, Topic) of
         [] ->
             {error, topic_not_found};
-        [{Topic, CRef}] ->
+        [{Topic, Data}] ->
+            CRef = maps:get(counter_ref, Data),
             case metric_idx(Metric) of
                 {error, invalid_metric} ->
                     {error, invalid_metric};
@@ -344,14 +452,14 @@ to_rate('messages.qos2.out') ->
 to_rate('messages.dropped') ->
     'messages.dropped.rate'.
 
-delete_counters(Topic) ->
-    true = ets:delete(?TAB, Topic),
+reset_counter(CRef) ->
+    [counters:put(CRef, Idx, 0) || Idx <- lists:seq(1, counters_size())],
     ok.
 
 get_counters(Topic) ->
     case ets:lookup(?TAB, Topic) of
         [] -> {error, topic_not_found};
-        [{Topic, CRef}] -> CRef
+        [{Topic, Data}] -> maps:get(counter_ref, Data)
     end.
 
 counters_size() ->

+ 236 - 190
apps/emqx_modules/src/emqx_topic_metrics_api.erl

@@ -13,195 +13,241 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
-
+%% TODO: refactor uri path
 -module(emqx_topic_metrics_api).
 
-% -rest_api(#{name   => list_all_topic_metrics,
-%             method => 'GET',
-%             path   => "/topic-metrics",
-%             func   => list,
-%             descr  => "A list of all topic metrics of all nodes in the cluster"}).
-
-% -rest_api(#{name   => list_topic_metrics,
-%             method => 'GET',
-%             path   => "/topic-metrics/:bin:topic",
-%             func   => list,
-%             descr  => "A list of specfied topic metrics of all nodes in the cluster"}).
-
-% -rest_api(#{name   => register_topic_metrics,
-%             method => 'POST',
-%             path   => "/topic-metrics",
-%             func   => register,
-%             descr  => "Register topic metrics"}).
-
-% -rest_api(#{name   => unregister_all_topic_metrics,
-%             method => 'DELETE',
-%             path   => "/topic-metrics",
-%             func   => unregister,
-%             descr  => "Unregister all topic metrics"}).
-
-% -rest_api(#{name   => unregister_topic_metrics,
-%             method => 'DELETE',
-%             path   => "/topic-metrics/:bin:topic",
-%             func   => unregister,
-%             descr  => "Unregister topic metrics"}).
-
-% -export([ list/2
-%         , register/2
-%         , unregister/2
-%         ]).
-
-% -export([ get_topic_metrics/2
-%         , register_topic_metrics/2
-%         , unregister_topic_metrics/2
-%         , unregister_all_topic_metrics/1
-%         ]).
-
-% list(#{topic := Topic0}, _Params) ->
-%     execute_when_enabled(fun() ->
-%         Topic = emqx_mgmt_util:urldecode(Topic0),
-%         case safe_validate(Topic) of
-%             true ->
-%                 case get_topic_metrics(Topic) of
-%                     {error, Reason} -> return({error, Reason});
-%                     Metrics         -> return({ok, maps:from_list(Metrics)})
-%                 end;
-%             false ->
-%                 return({error, invalid_topic_name})
-%         end
-%     end);
-
-% list(_Bindings, _Params) ->
-%     execute_when_enabled(fun() ->
-%         case get_all_topic_metrics() of
-%             {error, Reason} -> return({error, Reason});
-%             Metrics         -> return({ok, Metrics})
-%         end
-%     end).
-
-% register(_Bindings, Params) ->
-%     execute_when_enabled(fun() ->
-%         case proplists:get_value(<<"topic">>, Params) of
-%             undefined ->
-%                 return({error, missing_required_params});
-%             Topic ->
-%                 case safe_validate(Topic) of
-%                     true ->
-%                         register_topic_metrics(Topic),
-%                         return(ok);
-%                     false ->
-%                         return({error, invalid_topic_name})
-%                 end
-%         end
-%     end).
-
-% unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
-%     execute_when_enabled(fun() ->
-%         unregister_all_topic_metrics(),
-%         return(ok)
-%     end);
-
-% unregister(#{topic := Topic0}, _Params) ->
-%     execute_when_enabled(fun() ->
-%         Topic = emqx_mgmt_util:urldecode(Topic0),
-%         case safe_validate(Topic) of
-%             true ->
-%                 unregister_topic_metrics(Topic),
-%                 return(ok);
-%             false ->
-%                 return({error, invalid_topic_name})
-%         end
-%     end).
-
-% execute_when_enabled(Fun) ->
-%     case emqx_modules:find_module(topic_metrics) of
-%         true ->
-%             Fun();
-%         false ->
-%             return({error, module_not_loaded})
-%     end.
-
-% safe_validate(Topic) ->
-%     try emqx_topic:validate(name, Topic) of
-%         true -> true
-%     catch
-%         error:_Error ->
-%             false
-%     end.
-
-% get_all_topic_metrics() ->
-%     lists:foldl(fun(Topic, Acc) ->
-%                     case get_topic_metrics(Topic) of
-%                         {error, _Reason} ->
-%                             Acc;
-%                         Metrics ->
-%                             [#{topic => Topic, metrics => Metrics} | Acc]
-%                     end
-%                 end, [], emqx_mod_topic_metrics:all_registered_topics()).
-
-% get_topic_metrics(Topic) ->
-%     lists:foldl(fun(Node, Acc) ->
-%                     case get_topic_metrics(Node, Topic) of
-%                         {error, _Reason} ->
-%                             Acc;
-%                         Metrics ->
-%                             case Acc of
-%                                 [] -> Metrics;
-%                                 _ ->
-%                                     lists:foldl(fun({K, V}, Acc0) ->
-%                                                     [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
-%                                                 end, [], Acc)
-%                             end
-%                     end
-%                 end, [], ekka_mnesia:running_nodes()).
-
-% get_topic_metrics(Node, Topic) when Node =:= node() ->
-%     emqx_mod_topic_metrics:metrics(Topic);
-% get_topic_metrics(Node, Topic) ->
-%     rpc_call(Node, get_topic_metrics, [Node, Topic]).
-
-% register_topic_metrics(Topic) ->
-%     Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
-%     case lists:any(fun(Item) -> Item =:= ok end, Results) of
-%         true  -> ok;
-%         false -> lists:last(Results)
-%     end.
-
-% register_topic_metrics(Node, Topic) when Node =:= node() ->
-%     emqx_mod_topic_metrics:register(Topic);
-% register_topic_metrics(Node, Topic) ->
-%     rpc_call(Node, register_topic_metrics, [Node, Topic]).
-
-% unregister_topic_metrics(Topic) ->
-%     Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
-%     case lists:any(fun(Item) -> Item =:= ok end, Results) of
-%         true  -> ok;
-%         false -> lists:last(Results)
-%     end.
-
-% unregister_topic_metrics(Node, Topic) when Node =:= node() ->
-%     emqx_mod_topic_metrics:unregister(Topic);
-% unregister_topic_metrics(Node, Topic) ->
-%     rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
-
-% unregister_all_topic_metrics() ->
-%     Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
-%     case lists:any(fun(Item) -> Item =:= ok end, Results) of
-%         true  -> ok;
-%         false -> lists:last(Results)
-%     end.
-
-% unregister_all_topic_metrics(Node) when Node =:= node() ->
-%     emqx_mod_topic_metrics:unregister_all();
-% unregister_all_topic_metrics(Node) ->
-%     rpc_call(Node, unregister_topic_metrics, [Node]).
-
-% rpc_call(Node, Fun, Args) ->
-%     case rpc:call(Node, ?MODULE, Fun, Args) of
-%         {badrpc, Reason} -> {error, Reason};
-%         Res -> Res
-%     end.
-
-% return(_) ->
-% %%    TODO: V5 API
-%     ok.
+-behavior(minirest_api).
+
+-import(emqx_mgmt_util, [ request_body_schema/1
+                        , response_schema/1
+                        , response_schema/2
+                        , response_array_schema/2
+                        , response_error_schema/2
+                        ]).
+
+-export([api_spec/0]).
+
+-export([ list_topic/2
+        , list_topic_metrics/2
+        , operate_topic_metrics/2
+        , reset_all_topic_metrics/2
+        , reset_topic_metrics/2
+        ]).
+
+-define(ERROR_TOPIC, 'ERROR_TOPIC').
+
+-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
+
+-define(BAD_REQUEST, 'BAD_REQUEST').
+
+api_spec() ->
+    {
+        [
+            list_topic_api(),
+            list_topic_metrics_api(),
+            get_topic_metrics_api(),
+            reset_all_topic_metrics_api(),
+            reset_topic_metrics_api()
+        ],
+        [
+            topic_metrics_schema()
+        ]
+    }.
+
+topic_metrics_schema() ->
+    #{
+        topic_metrics => #{
+            type => object,
+            properties => #{
+                topic => #{type => string},
+                create_time => #{
+                    type => string,
+                    description => <<"Date time, rfc3339">>
+                },
+                reset_time => #{
+                    type => string,
+                    description => <<"Nullable. Date time, rfc3339.">>
+                },
+                metrics => #{
+                    type => object,
+                    properties => #{
+                        'messages.dropped.count'  => #{type => integer},
+                        'messages.dropped.rate'   => #{type => number},
+                        'messages.in.count'       => #{type => integer},
+                        'messages.in.rate'        => #{type => number},
+                        'messages.out.count'      => #{type => integer},
+                        'messages.out.rate'       => #{type => number},
+                        'messages.qos0.in.count'  => #{type => integer},
+                        'messages.qos0.in.rate'   => #{type => number},
+                        'messages.qos0.out.count' => #{type => integer},
+                        'messages.qos0.out.rate'  => #{type => number},
+                        'messages.qos1.in.count'  => #{type => integer},
+                        'messages.qos1.in.rate'   => #{type => number},
+                        'messages.qos1.out.count' => #{type => integer},
+                        'messages.qos1.out.rate'  => #{type => number},
+                        'messages.qos2.in.count'  => #{type => integer},
+                        'messages.qos2.in.rate'   => #{type => number},
+                        'messages.qos2.out.count' => #{type => integer},
+                        'messages.qos2.out.rate'  => #{type => number}
+                    }
+                }
+            }
+        }
+    }.
+
+list_topic_api() ->
+    Path = "/mqtt/topic_metrics",
+    TopicSchema = #{
+        type => object,
+        properties => #{
+            topic => #{
+                type => string}}},
+    MetaData = #{
+        get => #{
+            description => <<"List topic">>,
+            responses => #{
+                <<"200">> =>
+                    response_array_schema(<<"List topic">>, TopicSchema)}}},
+    {Path, MetaData, list_topic}.
+
+list_topic_metrics_api() ->
+    Path = "/mqtt/topic_metrics/metrics",
+    MetaData = #{
+        get => #{
+            description => <<"List topic metrics">>,
+            responses => #{
+                <<"200">> =>
+                    response_array_schema(<<"List topic metrics">>, topic_metrics)}}},
+    {Path, MetaData, list_topic_metrics}.
+
+get_topic_metrics_api() ->
+    Path = "/mqtt/topic_metrics/metrics/:topic",
+    MetaData = #{
+        get => #{
+            description => <<"List topic metrics">>,
+            parameters => [topic_param()],
+            responses => #{
+                <<"200">> =>
+                    response_schema(<<"List topic metrics">>, topic_metrics)}},
+        put => #{
+            description => <<"Register topic metrics">>,
+            parameters => [topic_param()],
+            responses => #{
+                <<"200">> =>
+                    response_schema(<<"Register topic metrics">>),
+                <<"409">> =>
+                    response_error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
+                <<"400">> =>
+                    response_error_schema(<<"Topic metrics already exist">>, [?BAD_REQUEST])}},
+        delete => #{
+            description => <<"Deregister topic metrics">>,
+            parameters => [topic_param()],
+            responses => #{
+                <<"200">> =>
+                    response_schema(<<"Deregister topic metrics">>)}}},
+    {Path, MetaData, operate_topic_metrics}.
+
+reset_all_topic_metrics_api() ->
+    Path = "/mqtt/topic_metrics/reset",
+    MetaData = #{
+        put => #{
+            description => <<"Reset all topic metrics">>,
+            responses => #{
+                <<"200">> =>
+                    response_schema(<<"Reset all topic metrics">>)}}},
+    {Path, MetaData, reset_all_topic_metrics}.
+
+reset_topic_metrics_api() ->
+    Path = "/mqtt/topic_metrics/reset/:topic",
+    MetaData = #{
+        put => #{
+            description => <<"Reset topic metrics">>,
+            parameters => [topic_param()],
+            responses => #{
+                <<"200">> =>
+                    response_schema(<<"Reset topic metrics">>)}}},
+    {Path, MetaData, reset_topic_metrics}.
+
+topic_param() ->
+    #{
+        name => topic,
+        in => path,
+        required => true,
+        schema => #{type => string}
+    }.
+
+topic_param(Request) ->
+    cowboy_req:binding(topic, Request).
+
+%%--------------------------------------------------------------------
+%% api callback
+list_topic(get, _) ->
+    list_topics().
+
+list_topic_metrics(get, _) ->
+    list_metrics().
+
+operate_topic_metrics(Method, Request) ->
+    Topic = topic_param(Request),
+    case Method of
+        get ->
+            get_metrics(Topic);
+        put ->
+            register(Topic);
+        delete ->
+            deregister(Topic)
+    end.
+
+reset_all_topic_metrics(put, _) ->
+    reset().
+
+reset_topic_metrics(put, Request) ->
+    Topic = topic_param(Request),
+    reset(Topic).
+
+%%--------------------------------------------------------------------
+%% api apply
+list_topics() ->
+    {200, emqx_topic_metrics:all_registered_topics()}.
+
+list_metrics() ->
+    {200, emqx_topic_metrics:metrics()}.
+
+register(Topic) ->
+    case emqx_topic_metrics:register(Topic) of
+        {error, quota_exceeded} ->
+            Message = list_to_binary(io_lib:format("Max topic metrics count is  ~p",
+                                        [emqx_topic_metrics:max_limit()])),
+            {409, #{code => ?EXCEED_LIMIT, message => Message}};
+        {error, already_existed} ->
+            Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])),
+            {400, #{code => ?BAD_REQUEST, message => Message}};
+        ok ->
+            {200}
+    end.
+
+deregister(Topic) ->
+    _ = emqx_topic_metrics:deregister(Topic),
+    {200}.
+
+get_metrics(Topic) ->
+    case emqx_topic_metrics:metrics(Topic) of
+        {error, topic_not_found} ->
+            Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])),
+            {404, #{code => ?ERROR_TOPIC, message => Message}};
+        Metrics ->
+            {200, Metrics}
+    end.
+
+reset() ->
+    ok = emqx_topic_metrics:reset(),
+    {200}.
+
+reset(Topic) ->
+    case emqx_topic_metrics:reset(Topic) of
+        {error, topic_not_found} ->
+            Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])),
+            {404, #{code => ?ERROR_TOPIC, message => Message}};
+        ok ->
+            {200}
+    end.

+ 9 - 3
apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl

@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+
+-define(TOPIC, <<"""
+topic_metrics: {
+  topics : []}""">>).
+
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_ct:all(?MODULE).
@@ -26,6 +31,7 @@ all() -> emqx_ct:all(?MODULE).
 init_per_suite(Config) ->
     emqx_ct_helpers:boot_modules(all),
     emqx_ct_helpers:start_apps([emqx_modules]),
+    ok = emqx_config:init_load(emqx_modules_schema, ?TOPIC),
     Config.
 
 end_per_suite(_Config) ->
@@ -43,7 +49,7 @@ t_nonexistent_topic_metrics(_) ->
     ?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
     ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
     % ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
-    emqx_topic_metrics:unregister(<<"a/b/c">>),
+    emqx_topic_metrics:deregister(<<"a/b/c">>),
     emqx_topic_metrics:disable().
 
 t_topic_metrics(_) ->
@@ -60,7 +66,7 @@ t_topic_metrics(_) ->
     ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
     ?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
     % ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
-    emqx_topic_metrics:unregister(<<"a/b/c">>),
+    emqx_topic_metrics:deregister(<<"a/b/c">>),
     emqx_topic_metrics:disable().
 
 t_hook(_) ->
@@ -91,5 +97,5 @@ t_hook(_) ->
     ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
     ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
     ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
-    emqx_topic_metrics:unregister(<<"a/b/c">>),
+    emqx_topic_metrics:deregister(<<"a/b/c">>),
     emqx_topic_metrics:disable().

+ 5 - 0
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -93,6 +93,7 @@
         , bool/1
         , int/1
         , float/1
+        , float/2
         , map/1
         , bin2hexstr/1
         , hexstr2bin/1
@@ -516,6 +517,10 @@ int(Data) ->
 float(Data) ->
     emqx_rule_utils:float(Data).
 
+float(Data, Decimals) when Decimals > 0 ->
+    Data1 = ?MODULE:float(Data),
+    list_to_float(float_to_list(Data1, [{decimals, Decimals}])).
+
 map(Data) ->
     emqx_rule_utils:map(Data).