Просмотр исходного кода

Merge pull request #6256 from terry-xiaoyu/resource_metrics2

Add metrics to emqx_resource and emqx_bridges
Shawn 4 лет назад
Родитель
Сommit
af5002733b

+ 8 - 9
apps/emqx_bridge/src/emqx_bridge.erl

@@ -21,7 +21,6 @@
 -export([post_config_update/5]).
 
 -export([ load_hook/0
-        , reload_hook/0
         , unload_hook/0
         ]).
 
@@ -55,22 +54,21 @@
 -export([ config_key_path/0
         ]).
 
-reload_hook() ->
-    unload_hook(),
-    load_hook().
-
 load_hook() ->
     Bridges = emqx:get_config([bridges], #{}),
+    load_hook(Bridges).
+
+load_hook(Bridges) ->
     lists:foreach(fun({_Type, Bridge}) ->
             lists:foreach(fun({_Name, BridgeConf}) ->
-                    load_hook(BridgeConf)
+                    do_load_hook(BridgeConf)
                 end, maps:to_list(Bridge))
         end, maps:to_list(Bridges)).
 
-load_hook(#{from_local_topic := _}) ->
+do_load_hook(#{from_local_topic := _}) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
     ok;
-load_hook(_Conf) -> ok.
+do_load_hook(_Conf) -> ok.
 
 unload_hook() ->
     ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
@@ -109,7 +107,8 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
         {fun create/3, Added},
         {fun update/3, Updated}
     ]),
-    ok = reload_hook(),
+    ok = unload_hook(),
+    ok = load_hook(NewConf),
     Result.
 
 perform_bridge_changes(Tasks) ->

+ 21 - 19
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -36,21 +36,21 @@
                 ". Bridge Ids must be of format <bridge_type>:<name>">>}}
     end).
 
--define(METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
-    #{
+-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
+    #{  matched => MATCH,
         success => SUCC,
         failed => FAILED,
-        rate => RATE,
-        rate_last5m => RATE_5,
-        rate_max => RATE_MAX
+        speed => RATE,
+        speed_last5m => RATE_5,
+        speed_max => RATE_MAX
     }).
--define(MATCH_METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
-    #{
+-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
+    #{  matched := MATCH,
         success := SUCC,
         failed := FAILED,
-        rate := RATE,
-        rate_last5m := RATE_5,
-        rate_max := RATE_MAX
+        speed := RATE,
+        speed_last5m := RATE_5,
+        speed_max := RATE_MAX
     }).
 
 req_schema() ->
@@ -73,11 +73,12 @@ status_schema() ->
 metrics_schema() ->
     #{ type => object
      , properties => #{
+           matched => #{type => integer, example => "0"},
            success => #{type => integer, example => "0"},
            failed => #{type => integer, example => "0"},
-           rate => #{type => number, format => float, example => "0.0"},
-           rate_last5m => #{type => number, format => float, example => "0.0"},
-           rate_max => #{type => number, format => float, example => "0.0"}
+           speed => #{type => number, format => float, example => "0.0"},
+           speed_last5m => #{type => number, format => float, example => "0.0"},
+           speed_max => #{type => number, format => float, example => "0.0"}
        }
     }.
 
@@ -337,21 +338,22 @@ collect_metrics(Bridges) ->
     [maps:with([node, metrics], B) || B <- Bridges].
 
 aggregate_metrics(AllMetrics) ->
-    InitMetrics = ?METRICS(0,0,0,0,0),
-    lists:foldl(fun(#{metrics := ?MATCH_METRICS(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
-                    ?MATCH_METRICS(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
-            ?METRICS(Succ1 + Succ0, Failed1 + Failed0,
+    InitMetrics = ?METRICS(0,0,0,0,0,0),
+    lists:foldl(fun(#{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
+                    ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
+            ?METRICS(Match1 + Match0, Succ1 + Succ0, Failed1 + Failed0,
                      Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
         end, InitMetrics, AllMetrics).
 
-format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
+format_resp(#{id := Id, raw_config := RawConf,
+              resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) ->
     IsConnected = fun(started) -> connected; (_) -> disconnected end,
     RawConf#{
         id => Id,
         node => node(),
         bridge_type => emqx_bridge:bridge_type(Mod),
         status => IsConnected(Status),
-        metrics => ?METRICS(0,0,0,0,0)
+        metrics => Metrics
     }.
 
 rpc_multicall(Func, Args) ->

+ 3 - 2
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -127,10 +127,11 @@ on_stop(_InstId, #{name := InstanceId}) ->
                 connector => InstanceId, reason => Reason})
     end.
 
-on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) ->
+on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
     ?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
         connector => InstanceId}),
-    emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
+    emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
+    emqx_resource:query_success(AfterQuery).
 
 on_health_check(_InstId, #{name := InstanceId} = State) ->
     case emqx_connector_mqtt_worker:ping(InstanceId) of

+ 96 - 9
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -25,7 +25,8 @@
 -define(CONF_DEFAULT, <<"connectors: {}">>).
 -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
 -define(CONNECTR_ID, <<"mqtt:test_connector">>).
--define(BRIDGE_ID, <<"mqtt:test_bridge">>).
+-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
+-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
 -define(MQTT_CONNECOTR(Username),
 #{
     <<"server">> => <<"127.0.0.1:1883">>,
@@ -37,7 +38,7 @@
 -define(MQTT_CONNECOTR2(Server),
     ?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
 
--define(MQTT_BRIDGE(ID),
+-define(MQTT_BRIDGE_INGRESS(ID),
 #{
     <<"connector">> => ID,
     <<"direction">> => <<"ingress">>,
@@ -49,6 +50,22 @@
     <<"retain">> => <<"${retain}">>
 }).
 
+-define(MQTT_BRIDGE_EGRESS(ID),
+#{
+    <<"connector">> => ID,
+    <<"direction">> => <<"egress">>,
+    <<"from_local_topic">> => <<"local_topic/#">>,
+    <<"to_remote_topic">> => <<"remote_topic/${topic}">>,
+    <<"payload">> => <<"${payload}">>,
+    <<"qos">> => <<"${qos}">>,
+    <<"retain">> => <<"${retain}">>
+}).
+
+-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX),
+    #{<<"matched">> := MATCH, <<"success">> := SUCC,
+      <<"failed">> := FAILED, <<"speed">> := SPEED,
+      <<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}).
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -162,7 +179,7 @@ t_mqtt_crud_apis(_) ->
          }, jsx:decode(ErrMsg2)),
     ok.
 
-t_mqtt_conn_bridge(_) ->
+t_mqtt_conn_bridge_ingress(_) ->
     %% assert we there's no connectors and no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -184,10 +201,10 @@ t_mqtt_conn_bridge(_) ->
     %% ... and a MQTT bridge, using POST
     %% we bind this bridge to the connector created just now
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
-                                ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
+                                ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}),
 
     %ct:pal("---bridge: ~p", [Bridge]),
-    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
                   , <<"bridge_type">> := <<"mqtt">>
                   , <<"status">> := <<"connected">>
                   , <<"connector">> := ?CONNECTR_ID
@@ -217,7 +234,77 @@ t_mqtt_conn_bridge(_) ->
         end),
 
     %% delete the bridge
-    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% delete the connector
+    {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    ok.
+
+t_mqtt_conn_bridge_egress(_) ->
+    %% assert we there's no connectors and no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a mqtt connector, using POST
+    User1 = <<"user1">>,
+    {ok, 201, Connector} = request(post, uri(["connectors"]),
+                                ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+
+    %ct:pal("---connector: ~p", [Connector]),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  , <<"username">> := User1
+                  , <<"password">> := <<"">>
+                  , <<"proto_ver">> := <<"v4">>
+                  , <<"ssl">> := #{<<"enable">> := false}
+                  }, jsx:decode(Connector)),
+
+    %% ... and a MQTT bridge, using POST
+    %% we bind this bridge to the connector created just now
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+                                ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
+
+    %ct:pal("---bridge: ~p", [Bridge]),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+                  , <<"bridge_type">> := <<"mqtt">>
+                  , <<"status">> := <<"connected">>
+                  , <<"connector">> := ?CONNECTR_ID
+                  }, jsx:decode(Bridge)),
+
+    %% we now test if the bridge works as expected
+    LocalTopic = <<"local_topic/1">>,
+    RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
+    Payload = <<"hello">>,
+    emqx:subscribe(RemoteTopic),
+    %% PUBLISH a message to the 'local' broker, as we have only one broker,
+    %% the remote broker is also the local one.
+    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+
+    %% we should receive a message on the "remote" broker, with specified topic
+    ?assert(
+        receive
+            {deliver, RemoteTopic, #message{payload = Payload}} ->
+                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
+                true;
+            Msg ->
+                ct:pal("Msg: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end),
+
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+                  , <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
+                  , <<"node_metrics">> :=
+                      [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
+                  }, jsx:decode(BridgeStr)),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     %% delete the connector
@@ -245,8 +332,8 @@ t_mqtt_conn_update(_) ->
     %% ... and a MQTT bridge, using POST
     %% we bind this bridge to the connector created just now
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
-                                ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
-    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+                                ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
                   , <<"bridge_type">> := <<"mqtt">>
                   , <<"status">> := <<"connected">>
                   , <<"connector">> := ?CONNECTR_ID
@@ -260,7 +347,7 @@ t_mqtt_conn_update(_) ->
     {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
                                  ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
     %% delete the bridge
-    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     %% delete the connector

+ 315 - 0
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -0,0 +1,315 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugin_libs_metrics).
+
+-behaviour(gen_server).
+
+%% API functions
+-export([ start_link/1
+        , stop/1
+        , child_spec/1
+        ]).
+
+-export([ inc/3
+        , inc/4
+        , get/3
+        , get_speed/2
+        , create_metrics/2
+        , clear_metrics/2
+        ]).
+
+-export([ get_metrics/2
+        , get_matched/2
+        , get_success/2
+        , get_failed/2
+        , inc_matched/2
+        , inc_success/2
+        , inc_failed/2
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_info/2
+        , handle_cast/2
+        , code_change/3
+        , terminate/2
+        ]).
+
+-ifndef(TEST).
+-define(SECS_5M, 300).
+-define(SAMPLING, 10).
+-else.
+%% Use 5 secs average speed instead of 5 mins in case of testing
+-define(SECS_5M, 5).
+-define(SAMPLING, 1).
+-endif.
+
+-export_type([metrics/0]).
+
+-type metrics() :: #{
+    matched => integer(),
+    success => integer(),
+    failed => integer(),
+    speed => float(),
+    speed_max => float(),
+    speed_last5m => float()
+}.
+-type handler_name() :: atom().
+-type metric_id() :: binary().
+
+-define(CntrRef(Name), {?MODULE, Name}).
+-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
+
+%% the speed of 'matched'
+-record(speed, {
+            max = 0 :: number(),
+            current = 0 :: number(),
+            last5m = 0 :: number(),
+            %% metadata for calculating the avg speed
+            tick = 1 :: number(),
+            last_v = 0 :: number(),
+            %% metadata for calculating the 5min avg speed
+            last5m_acc = 0 :: number(),
+            last5m_smpl = [] :: list()
+        }).
+
+-record(state, {
+            metric_ids = sets:new(),
+            speeds :: undefined | #{metric_id() => #speed{}}
+        }).
+
+%%------------------------------------------------------------------------------
+%% APIs
+%%------------------------------------------------------------------------------
+
+-spec(child_spec(handler_name()) -> supervisor:child_spec()).
+child_spec(Name) ->
+    #{ id => emqx_plugin_libs_metrics
+     , start => {emqx_plugin_libs_metrics, start_link, [Name]}
+     , restart => permanent
+     , shutdown => 5000
+     , type => worker
+     , modules => [emqx_plugin_libs_metrics]
+     }.
+
+-spec(create_metrics(handler_name(), metric_id()) -> ok).
+create_metrics(Name, Id) ->
+    gen_server:call(Name, {create_metrics, Id}).
+
+-spec(clear_metrics(handler_name(), metric_id()) -> ok).
+clear_metrics(Name, Id) ->
+    gen_server:call(Name, {delete_metrics, Id}).
+
+-spec(get(handler_name(), metric_id(), atom()) -> number()).
+get(Name, Id, Metric) ->
+    case get_couters_ref(Name, Id) of
+        not_found -> 0;
+        Ref -> counters:get(Ref, metrics_idx(Metric))
+    end.
+
+-spec(get_speed(handler_name(), metric_id()) -> map()).
+get_speed(Name, Id) ->
+    gen_server:call(Name, {get_speed, Id}).
+
+-spec(get_metrics(handler_name(), metric_id()) -> metrics()).
+get_metrics(Name, Id) ->
+    #{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id),
+    #{matched => get_matched(Name, Id),
+      success => get_success(Name, Id),
+      failed => get_failed(Name, Id),
+      speed => Current,
+      speed_max => Max,
+      speed_last5m => Last5M
+    }.
+
+-spec inc(handler_name(), metric_id(), atom()) -> ok.
+inc(Name, Id, Metric) ->
+    inc(Name, Id, Metric, 1).
+
+-spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
+inc(Name, Id, Metric, Val) ->
+    case get_couters_ref(Name, Id) of
+        not_found ->
+            %% this may occur when increasing a counter for
+            %% a rule that was created from a remove node.
+            create_metrics(Name, Id),
+            counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val);
+        Ref ->
+            counters:add(Ref, metrics_idx(Metric), Val)
+    end.
+
+inc_matched(Name, Id) ->
+    inc(Name, Id, 'matched', 1).
+
+inc_success(Name, Id) ->
+    inc(Name, Id, 'success', 1).
+
+inc_failed(Name, Id) ->
+    inc(Name, Id, 'failed', 1).
+
+get_matched(Name, Id) ->
+    get(Name, Id, 'matched').
+
+get_success(Name, Id) ->
+    get(Name, Id, 'success').
+
+get_failed(Name, Id) ->
+    get(Name, Id, 'failed').
+
+start_link(Name) ->
+    gen_server:start_link({local, Name}, ?MODULE, Name, []).
+
+init(Name) ->
+    erlang:process_flag(trap_exit, true),
+    %% the speed metrics
+    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
+    persistent_term:put(?CntrRef(Name), #{}),
+    {ok, #state{}}.
+
+handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) ->
+    {reply, format_speed(#speed{}), State};
+handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) ->
+    {reply, case maps:get(Id, Speeds, undefined) of
+                undefined -> format_speed(#speed{});
+                Speed -> format_speed(Speed)
+            end, State};
+
+handle_call({create_metrics, Id}, _From,
+            State = #state{metric_ids = MIDs, speeds = Speeds}) ->
+    {reply, create_counters(get_self_name(), Id),
+     State#state{metric_ids = sets:add_element(Id, MIDs),
+                 speeds =  case Speeds of
+                                    undefined -> #{Id => #speed{}};
+                                    _ -> Speeds#{Id => #speed{}}
+                                end}};
+
+handle_call({delete_metrics, Id}, _From,
+            State = #state{metric_ids = MIDs, speeds = Speeds}) ->
+    {reply, delete_counters(get_self_name(), Id),
+     State#state{metric_ids = sets:del_element(Id, MIDs),
+                 speeds =  case Speeds of
+                                    undefined -> undefined;
+                                    _ -> maps:remove(Id, Speeds)
+                                end}};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(ticking, State = #state{speeds = undefined}) ->
+    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
+    {noreply, State};
+
+handle_info(ticking, State = #state{speeds = Speeds0}) ->
+    Speeds = maps:map(
+                    fun(Id, Speed) ->
+                        calculate_speed(get_matched(get_self_name(), Id), Speed)
+                    end, Speeds0),
+    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
+    {noreply, State#state{speeds = Speeds}};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Reason, #state{metric_ids = MIDs}) ->
+    Name = get_self_name(),
+    [delete_counters(Name, Id) || Id <- sets:to_list(MIDs)],
+    persistent_term:erase(?CntrRef(Name)).
+
+stop(Name) ->
+    gen_server:stop(Name).
+
+%%------------------------------------------------------------------------------
+%% Internal Functions
+%%------------------------------------------------------------------------------
+
+create_counters(Name, Id) ->
+    case get_couters_ref(Name, Id) of
+        not_found ->
+            Counters = get_all_counters(Name),
+            CntrRef = counters:new(max_counters_size(), [write_concurrency]),
+            persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef});
+        _Ref -> ok
+    end.
+
+delete_counters(Name, Id) ->
+    persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))).
+
+get_couters_ref(Name, Id) ->
+    maps:get(Id, get_all_counters(Name), not_found).
+
+get_all_counters(Name) ->
+    persistent_term:get(?CntrRef(Name), #{}).
+
+calculate_speed(_CurrVal, undefined) ->
+    undefined;
+calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal,
+                                     tick = Tick, last5m_acc = AccSpeed5Min0,
+                                     last5m_smpl = Last5MinSamples0}) ->
+    %% calculate the current speed based on the last value of the counter
+    CurrSpeed = (CurrVal - LastVal) / ?SAMPLING,
+
+    %% calculate the max speed since the emqx startup
+    MaxSpeed =
+        if MaxSpeed0 >= CurrSpeed -> MaxSpeed0;
+           true -> CurrSpeed
+        end,
+
+    %% calculate the average speed in last 5 mins
+    {Last5MinSamples, Acc5Min, Last5Min} =
+        if Tick =< ?SAMPCOUNT_5M ->
+                Acc = AccSpeed5Min0 + CurrSpeed,
+                {lists:reverse([CurrSpeed | lists:reverse(Last5MinSamples0)]),
+                 Acc, Acc / Tick};
+           true ->
+                [FirstSpeed | Speeds] = Last5MinSamples0,
+                Acc =  AccSpeed5Min0 + CurrSpeed - FirstSpeed,
+                {lists:reverse([CurrSpeed | lists:reverse(Speeds)]),
+                 Acc, Acc / ?SAMPCOUNT_5M}
+        end,
+
+    #speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
+                last_v = CurrVal, last5m_acc = Acc5Min,
+                last5m_smpl = Last5MinSamples, tick = Tick + 1}.
+
+format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) ->
+    #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
+
+precision(Float, N) ->
+    Base = math:pow(10, N),
+    round(Float * Base) / Base.
+
+get_self_name() ->
+    {registered_name, Name} = process_info(self(), registered_name),
+    Name.
+
+%%------------------------------------------------------------------------------
+%% Metrics Definitions
+%%------------------------------------------------------------------------------
+
+max_counters_size() -> 32.
+metrics_idx('matched') -> 1;
+metrics_idx('success') -> 2;
+metrics_idx('failed') -> 3;
+metrics_idx(_) -> 32.
+

+ 96 - 0
apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl

@@ -0,0 +1,96 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugin_libs_metrics_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    [ {group, metrics}
+    , {group, speed} ].
+
+suite() ->
+    [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
+
+groups() ->
+    [{metrics, [sequence],
+        [ t_rule
+        , t_no_creation_1
+        ]},
+    {speed, [sequence],
+        [ rule_speed
+        ]}
+    ].
+
+-define(NAME, ?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:start_apps([emqx_conf]),
+    {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
+    Config.
+
+end_per_suite(_Config) ->
+    catch emqx_plugin_libs_metrics:stop(?NAME),
+    emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok.
+
+init_per_testcase(_, Config) ->
+    catch emqx_plugin_libs_metrics:stop(?NAME),
+    {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+t_no_creation_1(_) ->
+    ?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')).
+
+t_rule(_) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
+    ?assertEqual(1, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
+    ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule2">>, 'rules.matched')),
+    ?assertEqual(0, emqx_plugin_libs_metrics:get(?NAME, <<"rule3">>, 'rules.matched')),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
+
+rule_speed(_) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
+    ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
+    ct:sleep(1000),
+    ?LET(#{max := Max, current := Current},
+         emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
+         {?assert(Max =< 2),
+          ?assert(Current =< 2)}),
+    ct:sleep(2100),
+    ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
+         {?assert(Max =< 2),
+          ?assert(Current == 0),
+          ?assert(Last5Min =< 0.67)}),
+    ct:sleep(3000),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>).

+ 7 - 6
apps/emqx_resource/include/emqx_resource.hrl

@@ -21,14 +21,15 @@
 -type resource_spec() :: map().
 -type resource_state() :: term().
 -type resource_data() :: #{
-    id => instance_id(),
-    mod => module(),
-    config => resource_config(),
-    state => resource_state(),
-    status => started | stopped
+    id := instance_id(),
+    mod := module(),
+    config := resource_config(),
+    state := resource_state(),
+    status := started | stopped,
+    metrics := emqx_plugin_libs_metrics:metrics()
 }.
 -type resource_group() :: binary().
--type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} |
+-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
     undefined.
 
 %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback

+ 17 - 5
apps/emqx_resource/src/emqx_resource.erl

@@ -122,13 +122,18 @@ is_resource_mod(Module) ->
 
 -spec query_success(after_query()) -> ok.
 query_success(undefined) -> ok;
-query_success({{OnSucc, Args}, _}) ->
-    safe_apply(OnSucc, Args).
+query_success({OnSucc, _}) ->
+    apply_query_after_calls(OnSucc).
 
 -spec query_failed(after_query()) -> ok.
 query_failed(undefined) -> ok;
-query_failed({_, {OnFailed, Args}}) ->
-    safe_apply(OnFailed, Args).
+query_failed({_, OnFailed}) ->
+    apply_query_after_calls(OnFailed).
+
+apply_query_after_calls(Funcs) ->
+    lists:foreach(fun({Fun, Args}) ->
+            safe_apply(Fun, Args)
+        end, Funcs).
 
 %% =================================================================================
 %% APIs for resource instances
@@ -175,7 +180,7 @@ remove_local(InstId) ->
 %% =================================================================================
 -spec query(instance_id(), Request :: term()) -> Result :: term().
 query(InstId, Request) ->
-    query(InstId, Request, undefined).
+    query(InstId, Request, inc_metrics_funcs(InstId)).
 
 %% same to above, also defines what to do when the Module:on_query success or failed
 %% it is the duty of the Module to apply the `after_query()` functions.
@@ -321,6 +326,13 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
+inc_metrics_funcs(InstId) ->
+    OnFailed = [{fun emqx_plugin_libs_metrics:inc_failed/2, [resource_metrics, InstId]}],
+    OnSucc = [ {fun emqx_plugin_libs_metrics:inc_matched/2, [resource_metrics, InstId]}
+             , {fun emqx_plugin_libs_metrics:inc_success/2, [resource_metrics, InstId]}
+             ],
+    {OnSucc, OnFailed}.
+
 call_instance(InstId, Query) ->
     emqx_resource_instance:hash_call(InstId, Query).
 

+ 8 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -24,6 +24,7 @@
 
 %% load resource instances from *.conf files
 -export([ lookup/1
+        , get_metrics/1
         , list_all/0
         , create_local/3
         ]).
@@ -65,9 +66,13 @@ hash_call(InstId, Request, Timeout) ->
 lookup(InstId) ->
     case ets:lookup(emqx_resource_instance, InstId) of
         [] -> {error, not_found};
-        [{_, Data}] -> {ok, Data#{id => InstId}}
+        [{_, Data}] ->
+            {ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
     end.
 
+get_metrics(InstId) ->
+    emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
+
 force_lookup(InstId) ->
     {ok, Data} = lookup(InstId),
     Data.
@@ -174,6 +179,7 @@ do_create(InstId, ResourceType, Config) ->
                         #{mod => ResourceType, config => Config,
                           state => ResourceState, status => stopped}}),
                     _ = do_health_check(InstId),
+                    ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
                     {ok, force_lookup(InstId)};
                 {error, Reason} ->
                     logger:error("start ~ts resource ~ts failed: ~p",
@@ -207,6 +213,7 @@ do_remove(InstId) ->
 do_remove(Mod, InstId, ResourceState) ->
     _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
     ets:delete(emqx_resource_instance, InstId),
+    ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
     ok.
 
 do_restart(InstId) ->

+ 5 - 2
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -32,17 +32,20 @@ init([]) ->
     _ = ets:new(emqx_resource_instance, TabOpts),
 
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
+    Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics),
+
     Pool = ?RESOURCE_INST_MOD,
     Mod = ?RESOURCE_INST_MOD,
     ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
-    {ok, {SupFlags, [
+    ResourceInsts = [
         begin
             ensure_pool_worker(Pool, {Pool, Idx}, Idx),
             #{id => {Mod, Idx},
               start => {Mod, start_link, [Pool, Idx]},
               restart => transient,
               shutdown => 5000, type => worker, modules => [Mod]}
-        end || Idx <- lists:seq(1, ?POOL_SIZE)]}}.
+        end || Idx <- lists:seq(1, ?POOL_SIZE)],
+    {ok, {SupFlags, [Metrics | ResourceInsts]}}.
 
 %% internal functions
 ensure_pool(Pool, Type, Opts) ->

+ 1 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -88,7 +88,7 @@ t_query(_) ->
     Failure = fun() -> Pid ! failure end,
 
     #{pid := _} = emqx_resource:query(?ID, get_state),
-    #{pid := _} = emqx_resource:query(?ID, get_state, {{Success, []}, {Failure, []}}),
+    #{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}),
 
     receive
         Message -> ?assertEqual(success, Message)

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -163,10 +163,10 @@ load_hooks_for_rule(#{from := Topics}) ->
     lists:foreach(fun emqx_rule_events:load/1, Topics).
 
 add_metrics_for_rule(#{id := Id}) ->
-    ok = emqx_rule_metrics:create_rule_metrics(Id).
+    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id).
 
 clear_metrics_for_rule(#{id := Id}) ->
-    ok = emqx_rule_metrics:clear_rule_metrics(Id).
+    ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
 
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
     lists:foreach(fun(Topic) ->

+ 13 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -338,7 +338,19 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
     BridgeChannelId.
 
 get_rule_metrics(Id) ->
-    [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
+    Format = fun (Node, #{matched := Matched,
+                          speed := Current,
+                          speed_max := Max,
+                          speed_last5m := Last5M
+                        }) ->
+        #{ matched => Matched
+         , speed => Current
+         , speed_max => Max
+         , speed_last5m => Last5M
+         , node => Node
+         }
+    end,
+    [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
      || Node <- mria_mnesia:running_nodes()].
 
 get_one_rule(AllRules, Id) ->

+ 1 - 6
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -34,10 +34,5 @@ init([]) ->
                  shutdown => 5000,
                  type => worker,
                  modules => [emqx_rule_engine]},
-    Metrics = #{id => emqx_rule_metrics,
-                start => {emqx_rule_metrics, start_link, []},
-                restart => permanent,
-                shutdown => 5000,
-                type => worker,
-                modules => [emqx_rule_metrics]},
+    Metrics = emqx_plugin_libs_metrics:child_spec(rule_metrics),
     {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.

+ 0 - 265
apps/emqx_rule_engine/src/emqx_rule_metrics.erl

@@ -1,265 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_metrics).
-
--behaviour(gen_server).
-
--include("rule_engine.hrl").
-
-%% API functions
--export([ start_link/0
-        , stop/0
-        ]).
-
--export([ get_rules_matched/1
-        ]).
-
--export([ inc/2
-        , inc/3
-        , get/2
-        , get_rule_speed/1
-        , create_rule_metrics/1
-        , clear_rule_metrics/1
-        ]).
-
--export([ get_rule_metrics/1
-        ]).
-
-%% gen_server callbacks
--export([ init/1
-        , handle_call/3
-        , handle_info/2
-        , handle_cast/2
-        , code_change/3
-        , terminate/2
-        ]).
-
--ifndef(TEST).
--define(SECS_5M, 300).
--define(SAMPLING, 10).
--else.
-%% Use 5 secs average speed instead of 5 mins in case of testing
--define(SECS_5M, 5).
--define(SAMPLING, 1).
--endif.
-
--define(CntrRef, ?MODULE).
--define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
-
--record(rule_speed, {
-            max = 0 :: number(),
-            current = 0 :: number(),
-            last5m = 0 :: number(),
-            %% metadata for calculating the avg speed
-            tick = 1 :: number(),
-            last_v = 0 :: number(),
-            %% metadata for calculating the 5min avg speed
-            last5m_acc = 0 :: number(),
-            last5m_smpl = [] :: list()
-        }).
-
--record(state, {
-            metric_ids = sets:new(),
-            rule_speeds :: undefined | #{rule_id() => #rule_speed{}}
-        }).
-
-%%------------------------------------------------------------------------------
-%% APIs
-%%------------------------------------------------------------------------------
-
--spec(create_rule_metrics(rule_id()) -> ok).
-create_rule_metrics(Id) ->
-    gen_server:call(?MODULE, {create_rule_metrics, Id}).
-
--spec(clear_rule_metrics(rule_id()) -> ok).
-clear_rule_metrics(Id) ->
-    gen_server:call(?MODULE, {delete_rule_metrics, Id}).
-
--spec(get(rule_id(), atom()) -> number()).
-get(Id, Metric) ->
-    case get_couters_ref(Id) of
-        not_found -> 0;
-        Ref -> counters:get(Ref, metrics_idx(Metric))
-    end.
-
--spec(get_rule_speed(rule_id()) -> map()).
-get_rule_speed(Id) ->
-    gen_server:call(?MODULE, {get_rule_speed, Id}).
-
--spec(get_rule_metrics(rule_id()) -> map()).
-get_rule_metrics(Id) ->
-    #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
-    #{matched => get_rules_matched(Id),
-      speed => Current,
-      speed_max => Max,
-      speed_last5m => Last5M
-    }.
-
--spec inc(rule_id(), atom()) -> ok.
-inc(Id, Metric) ->
-    inc(Id, Metric, 1).
-
--spec inc(rule_id(), atom(), pos_integer()) -> ok.
-inc(Id, Metric, Val) ->
-    case get_couters_ref(Id) of
-        not_found ->
-            %% this may occur when increasing a counter for
-            %% a rule that was created from a remove node.
-            create_rule_metrics(Id),
-            counters:add(get_couters_ref(Id), metrics_idx(Metric), Val);
-        Ref ->
-            counters:add(Ref, metrics_idx(Metric), Val)
-    end.
-
-get_rules_matched(Id) ->
-    get(Id, 'rules.matched').
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init([]) ->
-    erlang:process_flag(trap_exit, true),
-    %% the speed metrics
-    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    persistent_term:put(?CntrRef, #{}),
-    {ok, #state{}}.
-
-handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) ->
-    {reply, format_rule_speed(#rule_speed{}), State};
-handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds}) ->
-    {reply, case maps:get(Id, RuleSpeeds, undefined) of
-                undefined -> format_rule_speed(#rule_speed{});
-                Speed -> format_rule_speed(Speed)
-            end, State};
-
-handle_call({create_rule_metrics, Id}, _From,
-            State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
-    {reply, create_counters(Id),
-     State#state{metric_ids = sets:add_element(Id, MIDs),
-                 rule_speeds =  case RuleSpeeds of
-                                    undefined -> #{Id => #rule_speed{}};
-                                    _ -> RuleSpeeds#{Id => #rule_speed{}}
-                                end}};
-
-handle_call({delete_rule_metrics, Id}, _From,
-            State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
-    {reply, delete_counters(Id),
-     State#state{metric_ids = sets:del_element(Id, MIDs),
-                 rule_speeds =  case RuleSpeeds of
-                                    undefined -> undefined;
-                                    _ -> maps:remove(Id, RuleSpeeds)
-                                end}};
-
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(ticking, State = #state{rule_speeds = undefined}) ->
-    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    {noreply, State};
-
-handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) ->
-    RuleSpeeds = maps:map(
-                    fun(Id, RuleSpeed) ->
-                        calculate_speed(get_rules_matched(Id), RuleSpeed)
-                    end, RuleSpeeds0),
-    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    {noreply, State#state{rule_speeds = RuleSpeeds}};
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-terminate(_Reason, #state{metric_ids = MIDs}) ->
-    [delete_counters(Id) || Id <- sets:to_list(MIDs)],
-    persistent_term:erase(?CntrRef).
-
-stop() ->
-    gen_server:stop(?MODULE).
-
-%%------------------------------------------------------------------------------
-%% Internal Functions
-%%------------------------------------------------------------------------------
-
-create_counters(Id) ->
-    case get_couters_ref(Id) of
-        not_found ->
-            Counters = get_all_counters(),
-            CntrRef = counters:new(max_counters_size(), [write_concurrency]),
-            persistent_term:put(?CntrRef, Counters#{Id => CntrRef});
-        _Ref -> ok
-    end.
-
-delete_counters(Id) ->
-    persistent_term:put(?CntrRef, maps:remove(Id, get_all_counters())).
-
-get_couters_ref(Id) ->
-    maps:get(Id, get_all_counters(), not_found).
-
-get_all_counters() ->
-    persistent_term:get(?CntrRef, #{}).
-
-calculate_speed(_CurrVal, undefined) ->
-    undefined;
-calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal,
-                                     tick = Tick, last5m_acc = AccSpeed5Min0,
-                                     last5m_smpl = Last5MinSamples0}) ->
-    %% calculate the current speed based on the last value of the counter
-    CurrSpeed = (CurrVal - LastVal) / ?SAMPLING,
-
-    %% calculate the max speed since the emqx startup
-    MaxSpeed =
-        if MaxSpeed0 >= CurrSpeed -> MaxSpeed0;
-           true -> CurrSpeed
-        end,
-
-    %% calculate the average speed in last 5 mins
-    {Last5MinSamples, Acc5Min, Last5Min} =
-        if Tick =< ?SAMPCOUNT_5M ->
-                Acc = AccSpeed5Min0 + CurrSpeed,
-                {lists:reverse([CurrSpeed | lists:reverse(Last5MinSamples0)]),
-                 Acc, Acc / Tick};
-           true ->
-                [FirstSpeed | Speeds] = Last5MinSamples0,
-                Acc =  AccSpeed5Min0 + CurrSpeed - FirstSpeed,
-                {lists:reverse([CurrSpeed | lists:reverse(Speeds)]),
-                 Acc, Acc / ?SAMPCOUNT_5M}
-        end,
-
-    #rule_speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
-                last_v = CurrVal, last5m_acc = Acc5Min,
-                last5m_smpl = Last5MinSamples, tick = Tick + 1}.
-
-format_rule_speed(#rule_speed{max = Max, current = Current, last5m = Last5Min}) ->
-    #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
-
-precision(Float, N) ->
-    Base = math:pow(10, N),
-    round(Float * Base) / Base.
-
-%%------------------------------------------------------------------------------
-%% Metrics Definitions
-%%------------------------------------------------------------------------------
-
-max_counters_size() -> 2.
-metrics_idx('rules.matched') ->       1;
-metrics_idx(_) ->                     2.
-

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -99,7 +99,7 @@ do_apply_rule(#{
     case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
-            ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
+            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
             {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
         false ->
@@ -117,7 +117,7 @@ do_apply_rule(#{id := RuleId,
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
-            ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
+            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
             {ok, handle_output_list(Outputs, Selected, Input)};
         false ->
             {error, nomatch}

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) ->
 
 test_rule(Sql, Select, Context, EventTopics) ->
     RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
-    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
+    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId),
     Rule = #{
         id => RuleId,
         sql => Sql,
@@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         {ok, Data} -> {ok, flatten(Data)};
         {error, nomatch} -> {error, nomatch}
     after
-        emqx_rule_metrics:clear_rule_metrics(RuleId)
+        ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId)
     end.
 
 get_selected_data(Selected, _Envs, _Args) ->

+ 0 - 94
apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl

@@ -1,94 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_metrics_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
-
-all() ->
-    [ {group, metrics}
-    , {group, speed} ].
-
-suite() ->
-    [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
-
-groups() ->
-    [{metrics, [sequence],
-        [ t_rule
-        , t_no_creation_1
-        ]},
-    {speed, [sequence],
-        [ rule_speed
-        ]}
-    ].
-
-init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_conf]),
-    {ok, _} = emqx_rule_metrics:start_link(),
-    Config.
-
-end_per_suite(_Config) ->
-    catch emqx_rule_metrics:stop(),
-    emqx_common_test_helpers:stop_apps([emqx_conf]),
-    ok.
-
-init_per_testcase(_, Config) ->
-    catch emqx_rule_metrics:stop(),
-    {ok, _} = emqx_rule_metrics:start_link(),
-    Config.
-
-end_per_testcase(_, _Config) ->
-    ok.
-
-t_no_creation_1(_) ->
-    ?assertEqual(ok, emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched')).
-
-t_rule(_) ->
-    ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>),
-    ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>),
-    ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
-    ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
-    ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
-    ct:pal("----couters: ---~p", [persistent_term:get(emqx_rule_metrics)]),
-    ?assertEqual(1, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')),
-    ?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')),
-    ?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')),
-    ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
-    ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>).
-
-rule_speed(_) ->
-    ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>),
-    ok = emqx_rule_metrics:create_rule_metrics(<<"rule:2">>),
-    ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
-    ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
-    ok = emqx_rule_metrics:inc(<<"rule:2">>, 'rules.matched'),
-    ?assertEqual(2, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')),
-    ct:sleep(1000),
-    ?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
-         {?assert(Max =< 2),
-          ?assert(Current =< 2)}),
-    ct:sleep(2100),
-    ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
-         {?assert(Max =< 2),
-          ?assert(Current == 0),
-          ?assert(Last5Min =< 0.67)}),
-    ct:sleep(3000),
-    ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
-    ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>).