فهرست منبع

feat: impl rule_engine reset_metrics api

EMQ-YangM 3 سال پیش
والد
کامیت
fa0c4d17ee

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -1,4 +1,5 @@
 {emqx,1}.
 {emqx,1}.
+{emqx_rule_engine,1}.
 {emqx_bridge,1}.
 {emqx_bridge,1}.
 {emqx_authn,1}.
 {emqx_authn,1}.
 {emqx_authz,1}.
 {emqx_authz,1}.

+ 4 - 0
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -56,6 +56,7 @@
         , unload_hooks_for_rule/1
         , unload_hooks_for_rule/1
         , maybe_add_metrics_for_rule/1
         , maybe_add_metrics_for_rule/1
         , clear_metrics_for_rule/1
         , clear_metrics_for_rule/1
+        , reset_metrics_for_rule/1
         ]).
         ]).
 
 
 %% exported for `emqx_telemetry'
 %% exported for `emqx_telemetry'
@@ -195,6 +196,9 @@ maybe_add_metrics_for_rule(Id) ->
 clear_metrics_for_rule(Id) ->
 clear_metrics_for_rule(Id) ->
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
 
 
+reset_metrics_for_rule(Id) ->
+    emqx_plugin_libs_metrics:reset_metrics(rule_metrics, Id).
+
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
     lists:foreach(fun(Topic) ->
     lists:foreach(fun(Topic) ->
         case get_rules_with_same_event(Topic) of
         case get_rules_with_same_event(Topic) of

+ 23 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -28,7 +28,7 @@
 -export([api_spec/0, paths/0, schema/1, namespace/0]).
 -export([api_spec/0, paths/0, schema/1, namespace/0]).
 
 
 %% API callbacks
 %% API callbacks
--export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]).
+-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]).
 
 
 -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
 -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
 -define(ERR_BADARGS(REASON),
 -define(ERR_BADARGS(REASON),
@@ -166,6 +166,21 @@ schema("/rules/:id") ->
         }
         }
     };
     };
 
 
+schema("/rules/:id/reset_metrics") ->
+    #{
+        operationId => '/rules/:id/reset_metrics',
+        put => #{
+            tags => [<<"rules">>],
+            description => <<"Reset a rule metrics">>,
+            summary => <<"Reset a Rule Metrics">>,
+            parameters => param_path_id(),
+            responses => #{
+                400 => error_schema('BAD_REQUEST', "RPC Call Failed"),
+                200 => <<"Reset Success">>
+            }
+        }
+    };
+
 schema("/rule_test") ->
 schema("/rule_test") ->
     #{
     #{
         operationId => '/rule_test',
         operationId => '/rule_test',
@@ -262,10 +277,17 @@ replace_sql_clrf(#{ <<"sql">> := SQL } = Params) ->
                            id => Id, reason => Reason}),
                            id => Id, reason => Reason}),
             {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
             {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
     end.
     end.
+'/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) ->
+    case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of
+        {ok, _TxnId, _Result} -> {200, <<"Reset Success">>};
+        Failed -> {400, #{code => 'BAD_REQUEST',
+                          message => err_msg(Failed)}}
+    end.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
+
 err_msg(Msg) ->
 err_msg(Msg) ->
     list_to_binary(io_lib:format("~0p", [Msg])).
     list_to_binary(io_lib:format("~0p", [Msg])).
 
 

+ 35 - 0
apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , reset_metrics/1
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include_lib("emqx_rule_engine/include/rule_engine.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec reset_metrics(emqx_rule_engine:rule_id()) ->
+          emqx_cluster_rpc:multicall_return(ok).
+reset_metrics(RuleId) ->
+    emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]).

+ 3 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl

@@ -46,6 +46,9 @@ t_crud_rule_api(_Config) ->
     ct:pal("RList : ~p", [Rules]),
     ct:pal("RList : ~p", [Rules]),
     ?assert(length(Rules) > 0),
     ?assert(length(Rules) > 0),
 
 
+    {200, Rule0} = emqx_rule_engine_api:'/rules/:id/reset_metrics'(put, #{bindings => #{id => RuleID}}),
+    ?assertEqual(<<"Reset Success">>, Rule0),
+
     {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
     {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
     ct:pal("RShow : ~p", [Rule1]),
     ct:pal("RShow : ~p", [Rule1]),
     ?assertEqual(Rule, Rule1),
     ?assertEqual(Rule, Rule1),