ソースを参照

Merge pull request #7528 from EMQ-YangM/reset_metrics

feat: impl reset_metrics api
Yang Miao 3 年 前
コミット
45ea26bc17

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -1,4 +1,5 @@
 {emqx,1}.
+{emqx_rule_engine,1}.
 {emqx_bridge,1}.
 {emqx_authn,1}.
 {emqx_authz,1}.

+ 4 - 0
apps/emqx_bridge/src/emqx_bridge.erl

@@ -52,6 +52,7 @@
         , update/3
         , stop/2
         , restart/2
+        , reset_metrics/1
         ]).
 
 -export([ send_message/2
@@ -210,6 +211,9 @@ lookup(Type, Name, RawConf) ->
                    raw_config => RawConf}}
     end.
 
+reset_metrics(ResourceId) ->
+    emqx_resource:reset_metrics(ResourceId).
+
 stop(Type, Name) ->
     emqx_resource:stop(resource_id(Type, Name)).
 

+ 23 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -34,6 +34,7 @@
         , '/bridges/:id'/2
         , '/bridges/:id/operation/:operation'/2
         , '/nodes/:node/bridges/:id/operation/:operation'/2
+        , '/bridges/:id/reset_metrics'/2
         ]).
 
 -export([ lookup_from_local_node/2
@@ -76,7 +77,8 @@ api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
 
 paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation",
-            "/nodes/:node/bridges/:id/operation/:operation"].
+            "/nodes/:node/bridges/:id/operation/:operation",
+            "/bridges/:id/reset_metrics"].
 
 error_schema(Code, Message) when is_atom(Code) ->
     error_schema([Code], Message);
@@ -282,6 +284,20 @@ schema("/bridges/:id") ->
         }
     };
 
+schema("/bridges/:id/reset_metrics") ->
+    #{
+        'operationId' => '/bridges/:id/reset_metrics',
+        put => #{
+            tags => [<<"bridges">>],
+            summary => <<"Reset Bridge Metrics">>,
+            description => <<"Reset a bridge metrics by Id">>,
+            parameters => [param_path_id()],
+            responses => #{
+                200 => <<"Reset success">>,
+                400 => error_schema(['BAD_REQUEST'], "RPC Call Failed")
+            }
+        }
+    };
 schema("/bridges/:id/operation/:operation") ->
     #{
         'operationId' => '/bridges/:id/operation/:operation',
@@ -363,6 +379,12 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
                 {500, error_msg('INTERNAL_ERROR', Reason)}
         end).
 
+'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) ->
+    case emqx_bridge:reset_metrics(Id) of
+        ok -> {200, <<"Reset success">>};
+        Reason -> {400, error_msg('BAD_REQUEST', Reason)}
+    end.
+
 lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
     Nodes = mria_mnesia:running_nodes(),
     case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of

+ 24 - 0
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -333,6 +333,30 @@ t_enable_disable_bridges(_) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
+t_reset_bridges(_) ->
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    Port = start_http_server(fun handle_fun_200_ok/2),
+    URL1 = ?URL(Port, "abc"),
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
+    %ct:pal("the bridge ==== ~p", [Bridge]),
+    #{ <<"type">> := ?BRIDGE_TYPE
+     , <<"name">> := ?BRIDGE_NAME
+     , <<"status">> := <<"connected">>
+     , <<"node_status">> := [_|_]
+     , <<"metrics">> := _
+     , <<"node_metrics">> := [_|_]
+     , <<"url">> := URL1
+     } = jsx:decode(Bridge),
+    BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
+
 request(Method, Url, Body) ->
     request(<<"bridge_admin">>, Method, Url, Body).
 

+ 23 - 0
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -32,6 +32,7 @@
         , create_metrics/3
         , create_metrics/4
         , clear_metrics/2
+        , reset_metrics/2
         , has_metrics/2
         ]).
 
@@ -116,6 +117,10 @@ create_metrics(Name, Id, Metrics, RateMetrics) ->
 clear_metrics(Name, Id) ->
     gen_server:call(Name, {delete_metrics, Id}).
 
+-spec(reset_metrics(handler_name(), metric_id()) -> ok).
+reset_metrics(Name, Id) ->
+    gen_server:call(Name, {reset_metrics, Id}).
+
 -spec(has_metrics(handler_name(), metric_id()) -> boolean()).
 has_metrics(Name, Id) ->
     case get_ref(Name, Id) of
@@ -143,6 +148,13 @@ get_counters(Name, Id) ->
             get(Name, Id, Index)
         end, get_indexes(Name, Id)).
 
+-spec reset_counters(handler_name(), metric_id()) -> ok.
+reset_counters(Name, Id) ->
+    Indexes = maps:values(get_indexes(Name, Id)),
+    Ref = get_ref(Name, Id),
+    [counters:put(Ref, Idx, 0) || Idx <- Indexes ],
+    ok.
+
 -spec(get_metrics(handler_name(), metric_id()) -> metrics()).
 get_metrics(Name, Id) ->
     #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
@@ -198,6 +210,17 @@ handle_call({delete_metrics, Id}, _From,
                         _ -> maps:remove(Id, Rates)
                     end}};
 
+handle_call({reset_metrics, Id}, _From,
+            State = #state{rates = Rates}) ->
+    {reply, reset_counters(get_self_name(), Id),
+     State#state{rates = case Rates of
+                             undefined -> undefined;
+                             _ -> ResetRate =
+                                      maps:map(fun(_Key, _Value) -> #rate{} end,
+                                               maps:get(Id, Rates, #{})),
+                                  maps:put(Id, ResetRate, Rates)
+                         end}};
+
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 

+ 38 - 0
apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl

@@ -85,6 +85,44 @@ t_get_metrics(_) ->
      ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}),
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
 
+t_reset_metrics(_) ->
+    Metrics = [a, b, c],
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
+    %% all the metrics are set to zero at start
+    ?assertMatch(#{
+        rate := #{
+            a := #{current := 0.0, max := 0.0, last5m := 0.0},
+            b := #{current := 0.0, max := 0.0, last5m := 0.0},
+            c := #{current := 0.0, max := 0.0, last5m := 0.0}
+        },
+        counters := #{
+            a := 0,
+            b := 0,
+            c := 0
+        }
+    }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ct:sleep(1500),
+    ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>),
+    ?LET(#{
+        rate := #{
+            a := #{current := CurrA, max := MaxA, last5m := _},
+            b := #{current := CurrB, max := MaxB, last5m := _},
+            c := #{current := CurrC, max := MaxC, last5m := _}
+        },
+        counters := #{
+            a := 0,
+            b := 0,
+            c := 0
+        }
+    }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
+    {?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0),
+     ?assert(MaxA == 0), ?assert(MaxB == 0), ?assert(MaxC == 0)}),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
+
 t_get_metrics_2(_) ->
     Metrics = [a, b, c],
     ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics,

+ 10 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -53,6 +53,8 @@
         , recreate_local/4
         , remove/1 %% remove the config and stop the instance
         , remove_local/1
+        , reset_metrics/1
+        , reset_metrics_local/1
         ]).
 
 %% Calls to the callback module with current resource state
@@ -184,6 +186,14 @@ remove(InstId) ->
 remove_local(InstId) ->
     call_instance(InstId, {remove, InstId}).
 
+-spec reset_metrics_local(instance_id()) -> ok.
+reset_metrics_local(InstId) ->
+    call_instance(InstId, {reset_metrics, InstId}).
+
+-spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
+reset_metrics(InstId) ->
+    wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)).
+
 %% =================================================================================
 -spec query(instance_id(), Request :: term()) -> Result :: term().
 query(InstId, Request) ->

+ 10 - 0
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -25,6 +25,7 @@
 %% load resource instances from *.conf files
 -export([ lookup/1
         , get_metrics/1
+        , reset_metrics/1
         , list_all/0
         , list_group/1
         ]).
@@ -77,6 +78,9 @@ make_test_id() ->
 get_metrics(InstId) ->
     emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
 
+reset_metrics(InstId) ->
+    emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId).
+
 force_lookup(InstId) ->
     {ok, _Group, Data} = lookup(InstId),
     Data.
@@ -114,6 +118,9 @@ handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
 handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) ->
     {reply, do_recreate(InstId, ResourceType, Config, Opts), State};
 
+handle_call({reset_metrics, InstId}, _From, State) ->
+    {reply, do_reset_metrics(InstId), State};
+
 handle_call({remove, InstId}, _From, State) ->
     {reply, do_remove(InstId), State};
 
@@ -222,6 +229,9 @@ do_create_dry_run(ResourceType, Config) ->
             {error, Reason}
     end.
 
+do_reset_metrics(Instance) ->
+    reset_metrics(Instance).
+
 do_remove(Instance) ->
     do_remove(Instance, true).
 

+ 6 - 0
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -24,6 +24,7 @@
         , create_dry_run/2
         , recreate/4
         , remove/1
+        , reset_metrics/1
         ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -61,3 +62,8 @@ recreate(InstId, ResourceType, Config, Opts) ->
           emqx_cluster_rpc:multicall_return(ok).
 remove(InstId) ->
     emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).
+
+-spec reset_metrics(emqx_resource:instance_id()) ->
+          emqx_cluster_rpc:multicall_return(ok).
+reset_metrics(InstId) ->
+    emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]).

+ 14 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -294,7 +294,7 @@ t_create_dry_run_local(_) ->
 
     ?assertEqual(undefined, whereis(test_resource)).
 
-t_create_dry_run_local_failed(_) -> 
+t_create_dry_run_local_failed(_) ->
     {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
                        #{cteate_error => true}),
     ?assertEqual(error, Res),
@@ -313,6 +313,19 @@ t_test_func(_) ->
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a,b,c,d]])),
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])).
 
+t_reset_metrics(_) ->
+    {ok, _} = emqx_resource:create(
+                ?ID,
+                ?DEFAULT_RESOURCE_GROUP,
+                ?TEST_RESOURCE,
+                #{name => test_resource}),
+
+    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    emqx_resource:reset_metrics(?ID),
+    ?assert(is_process_alive(Pid)),
+    ok = emqx_resource:remove(?ID),
+    ?assertNot(is_process_alive(Pid)).
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 5 - 0
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -56,6 +56,7 @@
         , unload_hooks_for_rule/1
         , maybe_add_metrics_for_rule/1
         , clear_metrics_for_rule/1
+        , reset_metrics_for_rule/1
         ]).
 
 %% exported for `emqx_telemetry'
@@ -195,6 +196,10 @@ maybe_add_metrics_for_rule(Id) ->
 clear_metrics_for_rule(Id) ->
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
 
+-spec(reset_metrics_for_rule(rule_id()) -> ok).
+reset_metrics_for_rule(Id) ->
+    emqx_plugin_libs_metrics:reset_metrics(rule_metrics, Id).
+
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
     lists:foreach(fun(Topic) ->
         case get_rules_with_same_event(Topic) of

+ 23 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -28,7 +28,7 @@
 -export([api_spec/0, paths/0, schema/1, namespace/0]).
 
 %% API callbacks
--export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]).
+-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]).
 
 -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
 -define(ERR_BADARGS(REASON),
@@ -166,6 +166,21 @@ schema("/rules/:id") ->
         }
     };
 
+schema("/rules/:id/reset_metrics") ->
+    #{
+        operationId => '/rules/:id/reset_metrics',
+        put => #{
+            tags => [<<"rules">>],
+            description => <<"Reset a rule metrics">>,
+            summary => <<"Reset a Rule Metrics">>,
+            parameters => param_path_id(),
+            responses => #{
+                400 => error_schema('BAD_REQUEST', "RPC Call Failed"),
+                200 => <<"Reset Success">>
+            }
+        }
+    };
+
 schema("/rule_test") ->
     #{
         operationId => '/rule_test',
@@ -262,10 +277,17 @@ replace_sql_clrf(#{ <<"sql">> := SQL } = Params) ->
                            id => Id, reason => Reason}),
             {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
     end.
+'/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) ->
+    case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of
+        {ok, _TxnId, _Result} -> {200, <<"Reset Success">>};
+        Failed -> {400, #{code => 'BAD_REQUEST',
+                          message => err_msg(Failed)}}
+    end.
 
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
+
 err_msg(Msg) ->
     list_to_binary(io_lib:format("~0p", [Msg])).
 

+ 35 - 0
apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , reset_metrics/1
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx_rule_engine/include/rule_engine.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec reset_metrics(rule_id()) ->
+          emqx_cluster_rpc:multicall_return(ok).
+reset_metrics(RuleId) ->
+    emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]).

+ 3 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl

@@ -46,6 +46,9 @@ t_crud_rule_api(_Config) ->
     ct:pal("RList : ~p", [Rules]),
     ?assert(length(Rules) > 0),
 
+    {200, Rule0} = emqx_rule_engine_api:'/rules/:id/reset_metrics'(put, #{bindings => #{id => RuleID}}),
+    ?assertEqual(<<"Reset Success">>, Rule0),
+
     {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
     ct:pal("RShow : ~p", [Rule1]),
     ?assertEqual(Rule, Rule1),