Przeglądaj źródła

Merge pull request #6696 from terry-xiaoyu/rule_metrics2

Improve rule metrics
Shawn 4 lat temu
rodzic
commit
552ea7d2fc

+ 15 - 3
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -18,6 +18,7 @@
 -behaviour(minirest_api).
 -behaviour(minirest_api).
 
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 
 -import(hoconsc, [mk/2, array/1, enum/1]).
 -import(hoconsc, [mk/2, array/1, enum/1]).
 
 
@@ -371,8 +372,12 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
 
 
 pick_bridges_by_id(Id, BridgesAllNodes) ->
 pick_bridges_by_id(Id, BridgesAllNodes) ->
     lists:foldl(fun(BridgesOneNode, Acc) ->
     lists:foldl(fun(BridgesOneNode, Acc) ->
-            [BridgeInfo] = [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id],
-            [BridgeInfo | Acc]
+            case [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id] of
+                [BridgeInfo] -> [BridgeInfo | Acc];
+                [] ->
+                    ?SLOG(warning, #{msg => "bridge_inconsistent_in_cluster", bridge => Id}),
+                    Acc
+            end
         end, [], BridgesAllNodes).
         end, [], BridgesAllNodes).
 
 
 format_bridge_info([FirstBridge | _] = Bridges) ->
 format_bridge_info([FirstBridge | _] = Bridges) ->
@@ -418,9 +423,16 @@ format_resp(#{id := Id, raw_config := RawConf,
         name => maps:get(<<"name">>, RawConf, BridgeName),
         name => maps:get(<<"name">>, RawConf, BridgeName),
         node => node(),
         node => node(),
         status => IsConnected(Status),
         status => IsConnected(Status),
-        metrics => Metrics
+        metrics => format_metrics(Metrics)
     }.
     }.
 
 
+format_metrics(#{
+        counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ},
+        rate := #{
+            matched := #{current := Rate, last5m := Rate5m, max := RateMax}
+        } }) ->
+    ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
+
 rpc_multicall(Func, Args) ->
 rpc_multicall(Func, Args) ->
     Nodes = mria_mnesia:running_nodes(),
     Nodes = mria_mnesia:running_nodes(),
     ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
     ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),

+ 5 - 4
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -165,7 +165,7 @@ handle_publish(Msg, undefined) ->
     ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
     ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
                           "_'ingress'_is_not_configured",
                           "_'ingress'_is_not_configured",
                    message => Msg});
                    message => Msg});
-handle_publish(Msg0, Vars) ->
+handle_publish(#{properties := Props} = Msg0, Vars) ->
     Msg = format_msg_received(Msg0),
     Msg = format_msg_received(Msg0),
     ?SLOG(debug, #{msg => "publish_to_local_broker",
     ?SLOG(debug, #{msg => "publish_to_local_broker",
                    message => Msg, vars => Vars}),
                    message => Msg, vars => Vars}),
@@ -174,7 +174,7 @@ handle_publish(Msg0, Vars) ->
             _ = erlang:apply(Mod, Func, [Msg | Args]);
             _ = erlang:apply(Mod, Func, [Msg | Args]);
         _ -> ok
         _ -> ok
     end,
     end,
-    maybe_publish_to_local_broker(Msg0, Vars).
+    maybe_publish_to_local_broker(Msg, Vars, Props).
 
 
 handle_disconnected(Reason, Parent) ->
 handle_disconnected(Reason, Parent) ->
     Parent ! {disconnected, self(), Reason}.
     Parent ! {disconnected, self(), Reason}.
@@ -195,14 +195,15 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
 process_config(Config) ->
 process_config(Config) ->
     maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
     maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
 
 
-maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
+maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars,
+        Props) ->
     case maps:get(local_topic, Vars, undefined) of
     case maps:get(local_topic, Vars, undefined) of
         undefined ->
         undefined ->
             ok; %% local topic is not set, discard it
             ok; %% local topic is not set, discard it
         _ ->
         _ ->
             case emqx_topic:match(Topic, SubTopic) of
             case emqx_topic:match(Topic, SubTopic) of
                 true ->
                 true ->
-                    _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
+                    _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)),
                     ok;
                     ok;
                 false ->
                 false ->
                     ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
                     ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",

+ 3 - 3
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -20,7 +20,7 @@
         , from_binary/1
         , from_binary/1
         , make_pub_vars/2
         , make_pub_vars/2
         , to_remote_msg/2
         , to_remote_msg/2
-        , to_broker_msg/2
+        , to_broker_msg/3
         , estimate_size/1
         , estimate_size/1
         ]).
         ]).
 
 
@@ -78,9 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
     Msg#message{topic = topic(Mountpoint, Topic)}.
     Msg#message{topic = topic(Mountpoint, Topic)}.
 
 
 %% published from remote node over a MQTT connection
 %% published from remote node over a MQTT connection
-to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
+to_broker_msg(#{dup := Dup} = MapMsg,
             #{local_topic := TopicToken, payload := PayloadToken,
             #{local_topic := TopicToken, payload := PayloadToken,
-              local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
+              local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, Props) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
     Topic = replace_vars_in_str(TopicToken, MapMsg),
     Payload = process_payload(PayloadToken, MapMsg),
     Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),

+ 32 - 4
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -523,7 +523,21 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     %% and also the rule should be matched, with matched + 1:
     %% and also the rule should be matched, with matched + 1:
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     #{ <<"id">> := RuleId
     #{ <<"id">> := RuleId
-     , <<"metrics">> := #{<<"matched">> := 1}
+     , <<"metrics">> := #{
+         <<"sql.matched">> := 1,
+         <<"sql.passed">> := 1,
+         <<"sql.failed">> := 0,
+         <<"sql.failed.exception">> := 0,
+         <<"sql.failed.no_result">> := 0,
+         <<"sql.matched.rate">> := _,
+         <<"sql.matched.rate.max">> := _,
+         <<"sql.matched.rate.last5m">> := _,
+         <<"outputs.total">> := 1,
+         <<"outputs.success">> := 1,
+         <<"outputs.failed">> := 0,
+         <<"outputs.failed.out_of_service">> := 0,
+         <<"outputs.failed.unknown">> := 0
+       }
      } = jsx:decode(Rule1),
      } = jsx:decode(Rule1),
     %% we also check if the outputs of the rule is triggered
     %% we also check if the outputs of the rule is triggered
     ?assertMatch(#{inspect := #{
     ?assertMatch(#{inspect := #{
@@ -578,7 +592,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     ?assert(
     ?assert(
         receive
         receive
             {deliver, RemoteTopic, #message{payload = Payload}} ->
             {deliver, RemoteTopic, #message{payload = Payload}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
+                ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
                 true;
                 true;
             Msg ->
             Msg ->
                 ct:pal("Msg: ~p", [Msg]),
                 ct:pal("Msg: ~p", [Msg]),
@@ -598,13 +612,27 @@ t_egress_mqtt_bridge_with_rules(_) ->
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     #{ <<"id">> := RuleId
     #{ <<"id">> := RuleId
-     , <<"metrics">> := #{<<"matched">> := 1}
+     , <<"metrics">> := #{
+         <<"sql.matched">> := 1,
+         <<"sql.passed">> := 1,
+         <<"sql.failed">> := 0,
+         <<"sql.failed.exception">> := 0,
+         <<"sql.failed.no_result">> := 0,
+         <<"sql.matched.rate">> := _,
+         <<"sql.matched.rate.max">> := _,
+         <<"sql.matched.rate.last5m">> := _,
+         <<"outputs.total">> := 1,
+         <<"outputs.success">> := 1,
+         <<"outputs.failed">> := 0,
+         <<"outputs.failed.out_of_service">> := 0,
+         <<"outputs.failed.unknown">> := 0
+       }
      } = jsx:decode(Rule1),
      } = jsx:decode(Rule1),
     %% we should receive a message on the "remote" broker, with specified topic
     %% we should receive a message on the "remote" broker, with specified topic
     ?assert(
     ?assert(
         receive
         receive
             {deliver, RemoteTopic2, #message{payload = Payload2}} ->
             {deliver, RemoteTopic2, #message{payload = Payload2}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
+                ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
                 true;
                 true;
             Msg ->
             Msg ->
                 ct:pal("Msg: ~p", [Msg]),
                 ct:pal("Msg: ~p", [Msg]),

+ 97 - 100
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -28,17 +28,13 @@
         , inc/4
         , inc/4
         , get/3
         , get/3
         , get_rate/2
         , get_rate/2
-        , create_metrics/2
+        , get_counters/2
+        , create_metrics/3
+        , create_metrics/4
         , clear_metrics/2
         , clear_metrics/2
         ]).
         ]).
 
 
 -export([ get_metrics/2
 -export([ get_metrics/2
-        , get_matched/2
-        , get_success/2
-        , get_failed/2
-        , inc_matched/2
-        , inc_success/2
-        , inc_failed/2
         ]).
         ]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
@@ -61,13 +57,14 @@
 
 
 -export_type([metrics/0]).
 -export_type([metrics/0]).
 
 
+-type rate() :: #{
+    current => float(),
+    max => float(),
+    last5m => float()
+}.
 -type metrics() :: #{
 -type metrics() :: #{
-    matched => integer(),
-    success => integer(),
-    failed => integer(),
-    rate => float(),
-    rate_max => float(),
-    rate_last5m => float()
+    counters => #{atom() => integer()},
+    rate => #{atom() => rate()}
 }.
 }.
 -type handler_name() :: atom().
 -type handler_name() :: atom().
 -type metric_id() :: binary().
 -type metric_id() :: binary().
@@ -75,7 +72,6 @@
 -define(CntrRef(Name), {?MODULE, Name}).
 -define(CntrRef(Name), {?MODULE, Name}).
 -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
 -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
 
 
-%% the rate of 'matched'
 -record(rate, {
 -record(rate, {
             max = 0 :: number(),
             max = 0 :: number(),
             current = 0 :: number(),
             current = 0 :: number(),
@@ -107,35 +103,41 @@ child_spec(Name) ->
      , modules => [emqx_plugin_libs_metrics]
      , modules => [emqx_plugin_libs_metrics]
      }.
      }.
 
 
--spec(create_metrics(handler_name(), metric_id()) -> ok).
-create_metrics(Name, Id) ->
-    gen_server:call(Name, {create_metrics, Id}).
+-spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}).
+create_metrics(Name, Id, Metrics) ->
+    create_metrics(Name, Id, Metrics, Metrics).
+
+-spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}).
+create_metrics(Name, Id, Metrics, RateMetrics) ->
+    gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}).
 
 
 -spec(clear_metrics(handler_name(), metric_id()) -> ok).
 -spec(clear_metrics(handler_name(), metric_id()) -> ok).
 clear_metrics(Name, Id) ->
 clear_metrics(Name, Id) ->
     gen_server:call(Name, {delete_metrics, Id}).
     gen_server:call(Name, {delete_metrics, Id}).
 
 
--spec(get(handler_name(), metric_id(), atom()) -> number()).
+-spec(get(handler_name(), metric_id(), atom() | integer()) -> number()).
 get(Name, Id, Metric) ->
 get(Name, Id, Metric) ->
-    case get_couters_ref(Name, Id) of
+    case get_ref(Name, Id) of
         not_found -> 0;
         not_found -> 0;
-        Ref -> counters:get(Ref, metrics_idx(Metric))
+        Ref when is_atom(Metric) ->
+            counters:get(Ref, idx_metric(Name, Id, Metric));
+        Ref when is_integer(Metric) ->
+            counters:get(Ref, Metric)
     end.
     end.
 
 
 -spec(get_rate(handler_name(), metric_id()) -> map()).
 -spec(get_rate(handler_name(), metric_id()) -> map()).
 get_rate(Name, Id) ->
 get_rate(Name, Id) ->
     gen_server:call(Name, {get_rate, Id}).
     gen_server:call(Name, {get_rate, Id}).
 
 
+-spec(get_counters(handler_name(), metric_id()) -> map()).
+get_counters(Name, Id) ->
+    maps:map(fun(_Metric, Index) ->
+            get(Name, Id, Index)
+        end, get_indexes(Name, Id)).
+
 -spec(get_metrics(handler_name(), metric_id()) -> metrics()).
 -spec(get_metrics(handler_name(), metric_id()) -> metrics()).
 get_metrics(Name, Id) ->
 get_metrics(Name, Id) ->
-    #{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id),
-    #{matched => get_matched(Name, Id),
-      success => get_success(Name, Id),
-      failed => get_failed(Name, Id),
-      rate => Current,
-      rate_max => Max,
-      rate_last5m => Last5M
-    }.
+    #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
 
 
 -spec inc(handler_name(), metric_id(), atom()) -> ok.
 -spec inc(handler_name(), metric_id(), atom()) -> ok.
 inc(Name, Id, Metric) ->
 inc(Name, Id, Metric) ->
@@ -143,33 +145,7 @@ inc(Name, Id, Metric) ->
 
 
 -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
 -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
 inc(Name, Id, Metric, Val) ->
-    case get_couters_ref(Name, Id) of
-        not_found ->
-            %% this may occur when increasing a counter for
-            %% a rule that was created from a remove node.
-            create_metrics(Name, Id),
-            counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val);
-        Ref ->
-            counters:add(Ref, metrics_idx(Metric), Val)
-    end.
-
-inc_matched(Name, Id) ->
-    inc(Name, Id, 'matched', 1).
-
-inc_success(Name, Id) ->
-    inc(Name, Id, 'success', 1).
-
-inc_failed(Name, Id) ->
-    inc(Name, Id, 'failed', 1).
-
-get_matched(Name, Id) ->
-    get(Name, Id, 'matched').
-
-get_success(Name, Id) ->
-    get(Name, Id, 'success').
-
-get_failed(Name, Id) ->
-    get(Name, Id, 'failed').
+    counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
 
 
 start_link(Name) ->
 start_link(Name) ->
     gen_server:start_link({local, Name}, ?MODULE, Name, []).
     gen_server:start_link({local, Name}, ?MODULE, Name, []).
@@ -181,31 +157,36 @@ init(Name) ->
     persistent_term:put(?CntrRef(Name), #{}),
     persistent_term:put(?CntrRef(Name), #{}),
     {ok, #state{}}.
     {ok, #state{}}.
 
 
-handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
-    {reply, format_rate(#rate{}), State};
 handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
 handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
     {reply, case maps:get(Id, Rates, undefined) of
     {reply, case maps:get(Id, Rates, undefined) of
-                undefined -> format_rate(#rate{});
-                Rate -> format_rate(Rate)
+                undefined -> #{};
+                RatesPerId -> format_rates_of_id(RatesPerId)
             end, State};
             end, State};
 
 
-handle_call({create_metrics, Id}, _From,
+handle_call({create_metrics, Id, Metrics, RateMetrics}, _From,
             State = #state{metric_ids = MIDs, rates = Rates}) ->
             State = #state{metric_ids = MIDs, rates = Rates}) ->
-    {reply, create_counters(get_self_name(), Id),
-     State#state{metric_ids = sets:add_element(Id, MIDs),
-                 rates =  case Rates of
-                                    undefined -> #{Id => #rate{}};
-                                    _ -> Rates#{Id => #rate{}}
-                                end}};
+    case RateMetrics -- Metrics of
+        [] ->
+            RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
+            Rate1 = case Rates of
+                undefined -> #{Id => RatePerId};
+                _ -> Rates#{Id => RatePerId}
+            end,
+            {reply, create_counters(get_self_name(), Id, Metrics),
+                State#state{metric_ids = sets:add_element(Id, MIDs),
+                            rates = Rate1}};
+        _ ->
+            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
+    end;
 
 
 handle_call({delete_metrics, Id}, _From,
 handle_call({delete_metrics, Id}, _From,
             State = #state{metric_ids = MIDs, rates = Rates}) ->
             State = #state{metric_ids = MIDs, rates = Rates}) ->
     {reply, delete_counters(get_self_name(), Id),
     {reply, delete_counters(get_self_name(), Id),
      State#state{metric_ids = sets:del_element(Id, MIDs),
      State#state{metric_ids = sets:del_element(Id, MIDs),
-                 rates =  case Rates of
-                                    undefined -> undefined;
-                                    _ -> maps:remove(Id, Rates)
-                                end}};
+                 rates = case Rates of
+                        undefined -> undefined;
+                        _ -> maps:remove(Id, Rates)
+                    end}};
 
 
 handle_call(_Request, _From, State) ->
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
     {reply, ok, State}.
@@ -218,10 +199,12 @@ handle_info(ticking, State = #state{rates = undefined}) ->
     {noreply, State};
     {noreply, State};
 
 
 handle_info(ticking, State = #state{rates = Rates0}) ->
 handle_info(ticking, State = #state{rates = Rates0}) ->
-    Rates = maps:map(
-                    fun(Id, Rate) ->
-                        calculate_rate(get_matched(get_self_name(), Id), Rate)
-                    end, Rates0),
+    Rates =
+        maps:map(fun(Id, RatesPerID) ->
+            maps:map(fun(Metric, Rate) ->
+                calculate_rate(get(get_self_name(), Id, Metric), Rate)
+            end, RatesPerID)
+        end, Rates0),
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
     {noreply, State#state{rates = Rates}};
     {noreply, State#state{rates = Rates}};
 
 
@@ -243,29 +226,49 @@ stop(Name) ->
 %% Internal Functions
 %% Internal Functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-create_counters(Name, Id) ->
-    case get_couters_ref(Name, Id) of
-        not_found ->
-            Counters = get_all_counters(Name),
-            CntrRef = counters:new(max_counters_size(), [write_concurrency]),
-            persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef});
-        _Ref -> ok
-    end.
+create_counters(_Name, _Id, []) ->
+    error({create_counter_error, must_provide_a_list_of_metrics});
+create_counters(Name, Id, Metrics) ->
+    %% backup the old counters
+    OlderCounters = maps:with(Metrics, get_counters(Name, Id)),
+    %% create the new counter
+    Size = length(Metrics),
+    Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))),
+    Counters = get_pterm(Name),
+    CntrRef = counters:new(Size, [write_concurrency]),
+    persistent_term:put(?CntrRef(Name),
+        Counters#{Id => #{ref => CntrRef, indexes => Indexes}}),
+    %% restore the old counters
+    lists:foreach(fun({Metric, N}) ->
+            inc(Name, Id, Metric, N)
+        end, maps:to_list(OlderCounters)).
 
 
 delete_counters(Name, Id) ->
 delete_counters(Name, Id) ->
-    persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))).
+    persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))).
 
 
-get_couters_ref(Name, Id) ->
-    maps:get(Id, get_all_counters(Name), not_found).
+get_ref(Name, Id) ->
+    case maps:find(Id, get_pterm(Name)) of
+        {ok, #{ref := Ref}} -> Ref;
+        error -> not_found
+    end.
+
+idx_metric(Name, Id, Metric) ->
+    maps:get(Metric, get_indexes(Name, Id)).
 
 
-get_all_counters(Name) ->
+get_indexes(Name, Id) ->
+    case maps:find(Id, get_pterm(Name)) of
+        {ok, #{indexes := Indexes}} -> Indexes;
+        error -> #{}
+    end.
+
+get_pterm(Name) ->
     persistent_term:get(?CntrRef(Name), #{}).
     persistent_term:get(?CntrRef(Name), #{}).
 
 
 calculate_rate(_CurrVal, undefined) ->
 calculate_rate(_CurrVal, undefined) ->
     undefined;
     undefined;
 calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
 calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
-                                     tick = Tick, last5m_acc = AccRate5Min0,
-                                     last5m_smpl = Last5MinSamples0}) ->
+                    tick = Tick, last5m_acc = AccRate5Min0,
+                    last5m_smpl = Last5MinSamples0}) ->
     %% calculate the current rate based on the last value of the counter
     %% calculate the current rate based on the last value of the counter
     CurrRate = (CurrVal - LastVal) / ?SAMPLING,
     CurrRate = (CurrVal - LastVal) / ?SAMPLING,
 
 
@@ -292,8 +295,13 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
                 last_v = CurrVal, last5m_acc = Acc5Min,
                 last_v = CurrVal, last5m_acc = Acc5Min,
                 last5m_smpl = Last5MinSamples, tick = Tick + 1}.
                 last5m_smpl = Last5MinSamples, tick = Tick + 1}.
 
 
+format_rates_of_id(RatesPerId) ->
+    maps:map(fun(_Metric, Rates) ->
+            format_rate(Rates)
+        end, RatesPerId).
+
 format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
 format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
-    #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
+    #{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
 
 
 precision(Float, N) ->
 precision(Float, N) ->
     Base = math:pow(10, N),
     Base = math:pow(10, N),
@@ -302,14 +310,3 @@ precision(Float, N) ->
 get_self_name() ->
 get_self_name() ->
     {registered_name, Name} = process_info(self(), registered_name),
     {registered_name, Name} = process_info(self(), registered_name),
     Name.
     Name.
-
-%%------------------------------------------------------------------------------
-%% Metrics Definitions
-%%------------------------------------------------------------------------------
-
-max_counters_size() -> 32.
-metrics_idx('matched') -> 1;
-metrics_idx('success') -> 2;
-metrics_idx('failed') -> 3;
-metrics_idx(_) -> 32.
-

+ 94 - 22
apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl

@@ -23,22 +23,11 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 
 
 all() ->
 all() ->
-    [ {group, metrics}
-    , {group, rate} ].
+    emqx_common_test_helpers:all(?MODULE).
 
 
 suite() ->
 suite() ->
     [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
     [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
 
 
-groups() ->
-    [{metrics, [sequence],
-        [ t_rule
-        , t_no_creation_1
-        ]},
-    {rate, [sequence],
-        [ rule_rate
-        ]}
-    ].
-
 -define(NAME, ?MODULE).
 -define(NAME, ?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
@@ -59,12 +48,95 @@ init_per_testcase(_, Config) ->
 end_per_testcase(_, _Config) ->
 end_per_testcase(_, _Config) ->
     ok.
     ok.
 
 
-t_no_creation_1(_) ->
-    ?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')).
+t_get_metrics(_) ->
+    Metrics = [a, b, c],
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
+    %% all the metrics are set to zero at start
+    ?assertMatch(#{
+        rate := #{
+            a := #{current := 0.0, max := 0.0, last5m := 0.0},
+            b := #{current := 0.0, max := 0.0, last5m := 0.0},
+            c := #{current := 0.0, max := 0.0, last5m := 0.0}
+        },
+        counters := #{
+            a := 0,
+            b := 0,
+            c := 0
+        }
+    }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ct:sleep(1500),
+    ?LET(#{
+        rate := #{
+            a := #{current := CurrA, max := MaxA, last5m := _},
+            b := #{current := CurrB, max := MaxB, last5m := _},
+            c := #{current := CurrC, max := MaxC, last5m := _}
+        },
+        counters := #{
+            a := 1,
+            b := 1,
+            c := 2
+        }
+    }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
+    {?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0),
+     ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
+
+t_get_metrics_2(_) ->
+    Metrics = [a, b, c],
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics,
+        [a]),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ?assertMatch(#{
+        rate := Rate = #{
+            a := #{current := _, max := _, last5m := _}
+        },
+        counters := #{
+            a := 1,
+            b := 1,
+            c := 1
+        }
+    } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
+
+t_recreate_metrics(_) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
+    ?assertMatch(#{
+            rate := R = #{
+                a := #{current := _, max := _, last5m := _}
+            },
+            counters := C = #{
+                a := 1
+            }
+        } when map_size(R) == 1 andalso map_size(C) == 1,
+        emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    %% we create the metrics again, to add some counters
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ?assertMatch(#{
+            rate := R = #{
+                a := #{current := _, max := _, last5m := _},
+                b := #{current := _, max := _, last5m := _},
+                c := #{current := _, max := _, last5m := _}
+            },
+            counters := C = #{
+                a := 1, b := 1, c := 1
+            }
+        } when map_size(R) == 3 andalso map_size(C) == 3,
+        emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
 
 
-t_rule(_) ->
-    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
-    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>),
+t_inc_matched(_) ->
+    Metrics = ['rules.matched'],
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics),
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
@@ -74,20 +146,20 @@ t_rule(_) ->
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
 
 
-rule_rate(_) ->
-    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
-    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
+t_rate(_) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']),
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
     ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
     ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
     ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
     ct:sleep(1000),
     ct:sleep(1000),
-    ?LET(#{max := Max, current := Current},
+    ?LET(#{'rules.matched' := #{max := Max, current := Current}},
          emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
          emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
          {?assert(Max =< 2),
          {?assert(Max =< 2),
           ?assert(Current =< 2)}),
           ?assert(Current =< 2)}),
     ct:sleep(2100),
     ct:sleep(2100),
-    ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
+    ?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
          {?assert(Max =< 2),
          {?assert(Max =< 2),
           ?assert(Current == 0),
           ?assert(Current == 0),
           ?assert(Last5Min =< 0.67)}),
           ?assert(Last5Min =< 0.67)}),

+ 8 - 4
apps/emqx_resource/src/emqx_resource.erl

@@ -205,7 +205,12 @@ query(InstId, Request, AfterQuery) ->
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
             %% the resource state is readonly to Module:on_query/4
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
             %% and the `after_query()` functions should be thread safe
-            Mod:on_query(InstId, Request, AfterQuery, ResourceState);
+            ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
+            try Mod:on_query(InstId, Request, AfterQuery, ResourceState)
+            catch Err:Reason:ST ->
+                emqx_plugin_libs_metrics:inc(resource_metrics, InstId, exception),
+                erlang:raise(Err, Reason, ST)
+            end;
         {error, not_found} ->
         {error, not_found} ->
             query_error(not_found, <<"the resource id not exists">>)
             query_error(not_found, <<"the resource id not exists">>)
     end.
     end.
@@ -346,9 +351,8 @@ filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
 
 inc_metrics_funcs(InstId) ->
 inc_metrics_funcs(InstId) ->
-    OnFailed = [{fun emqx_plugin_libs_metrics:inc_failed/2, [resource_metrics, InstId]}],
-    OnSucc = [ {fun emqx_plugin_libs_metrics:inc_matched/2, [resource_metrics, InstId]}
-             , {fun emqx_plugin_libs_metrics:inc_success/2, [resource_metrics, InstId]}
+    OnFailed = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, failed]}],
+    OnSucc = [ {fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, success]}
              ],
              ],
     {OnSucc, OnFailed}.
     {OnSucc, OnFailed}.
 
 

+ 2 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -176,7 +176,8 @@ do_create(InstId, ResourceType, Config, Opts) ->
         {error, not_found} ->
         {error, not_found} ->
             case do_start(InstId, ResourceType, Config, Opts) of
             case do_start(InstId, ResourceType, Config, Opts) of
                 ok ->
                 ok ->
-                    ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
+                    ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
+                            [matched, success, failed, exception], [matched]),
                     {ok, force_lookup(InstId)};
                     {ok, force_lookup(InstId)};
                 Error ->
                 Error ->
                     Error
                     Error

+ 34 - 4
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -81,11 +81,41 @@ fields("rule_test") ->
     ];
     ];
 
 
 fields("metrics") ->
 fields("metrics") ->
-    [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})}
-    , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})}
-    , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})}
-    , {"rate_last5m", sc(float(),
+    [ {"sql.matched", sc(integer(), #{
+            desc => "How much times the FROM clause of the SQL is matched."
+        })}
+    , {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})}
+    , {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})}
+    , {"sql.matched.rate.last5m", sc(float(),
         #{desc => "The average rate of matched in last 5 mins, times/second"})}
         #{desc => "The average rate of matched in last 5 mins, times/second"})}
+    , {"sql.passed", sc(integer(), #{desc => "How much times the SQL is passed"})}
+    , {"sql.failed", sc(integer(), #{desc => "How much times the SQL is failed"})}
+    , {"sql.failed.exception", sc(integer(), #{
+            desc => "How much times the SQL is failed due to exceptions. "
+                    "This may because of a crash when calling a SQL function, or "
+                    "trying to do arithmetic operation on undefined variables"
+        })}
+    , {"sql.failed.unknown", sc(integer(), #{
+            desc => "How much times the SQL is failed due to an unknown error."
+        })}
+    , {"outputs.total", sc(integer(), #{
+            desc => "How much times the outputs are called by the rule. "
+                    "This value may serveral times of 'sql.matched', depending on the "
+                    "number of the outputs of the rule."
+        })}
+    , {"outputs.success", sc(integer(), #{
+            desc => "How much times the rule success to call the outputs."
+        })}
+    , {"outputs.failed", sc(integer(), #{
+            desc => "How much times the rule failed to call the outputs."
+        })}
+    , {"outputs.failed.out_of_service", sc(integer(), #{
+            desc => "How much times the rule failed to call outputs due to the output is "
+                    "out of service. For example, a bridge is disabled or stopped."
+        })}
+    , {"outputs.failed.unknown", sc(integer(), #{
+            desc => "How much times the rule failed to call outputs due to to an unknown error."
+        })}
     ];
     ];
 
 
 fields("node_metrics") ->
 fields("node_metrics") ->

+ 22 - 6
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -71,6 +71,22 @@
 
 
 -define(T_CALL, infinity).
 -define(T_CALL, infinity).
 
 
+%% NOTE: This order cannot be changed! This is to make the metric working during relup.
+%% Append elements to this list to add new metrics.
+-define(METRICS, [ 'sql.matched'
+                 , 'sql.passed'
+                 , 'sql.failed'
+                 , 'sql.failed.exception'
+                 , 'sql.failed.no_result'
+                 , 'outputs.total'
+                 , 'outputs.success'
+                 , 'outputs.failed'
+                 , 'outputs.failed.out_of_service'
+                 , 'outputs.failed.unknown'
+                ]).
+
+-define(RATE_METRICS, ['sql.matched']).
+
 config_key_path() ->
 config_key_path() ->
     [rule_engine, rules].
     [rule_engine, rules].
 
 
@@ -103,7 +119,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
 load_rules() ->
 load_rules() ->
     maps_foreach(fun({Id, Rule}) ->
     maps_foreach(fun({Id, Rule}) ->
             {ok, _} = create_rule(Rule#{id => bin(Id)})
             {ok, _} = create_rule(Rule#{id => bin(Id)})
-        end, emqx_conf:get([rule_engine, rules], #{})).
+        end, emqx:get_config([rule_engine, rules], #{})).
 
 
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
 create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
@@ -162,10 +178,10 @@ get_rule(Id) ->
 load_hooks_for_rule(#{from := Topics}) ->
 load_hooks_for_rule(#{from := Topics}) ->
     lists:foreach(fun emqx_rule_events:load/1, Topics).
     lists:foreach(fun emqx_rule_events:load/1, Topics).
 
 
-add_metrics_for_rule(#{id := Id}) ->
-    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id).
+add_metrics_for_rule(Id) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS).
 
 
-clear_metrics_for_rule(#{id := Id}) ->
+clear_metrics_for_rule(Id) ->
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
 
 
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
@@ -243,7 +259,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
 
 
 do_insert_rule(#{id := Id} = Rule) ->
 do_insert_rule(#{id := Id} = Rule) ->
     ok = load_hooks_for_rule(Rule),
     ok = load_hooks_for_rule(Rule),
-    ok = add_metrics_for_rule(Rule),
+    ok = add_metrics_for_rule(Id),
     true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
     true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
     ok.
     ok.
 
 
@@ -251,7 +267,7 @@ do_delete_rule(RuleId) ->
     case get_rule(RuleId) of
     case get_rule(RuleId) of
         {ok, Rule} ->
         {ok, Rule} ->
             ok = unload_hooks_for_rule(Rule),
             ok = unload_hooks_for_rule(Rule),
-            ok = clear_metrics_for_rule(Rule),
+            ok = clear_metrics_for_rule(RuleId),
             true = ets:delete(?RULE_TAB, RuleId),
             true = ets:delete(?RULE_TAB, RuleId),
             ok;
             ok;
         not_found -> ok
         not_found -> ok

+ 66 - 18
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -43,6 +43,40 @@
         {error, REASON} ->
         {error, REASON} ->
             {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
             {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
     end).
     end).
+-define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
+        O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
+    #{
+        'sql.matched' => MATCH,
+        'sql.passed' => PASS,
+        'sql.failed' => FAIL,
+        'sql.failed.exception' => FAIL_EX,
+        'sql.failed.no_result' => FAIL_NORES,
+        'outputs.total' => O_TOTAL,
+        'outputs.failed' => O_FAIL,
+        'outputs.failed.out_of_service' => O_FAIL_OOS,
+        'outputs.failed.unknown' => O_FAIL_UNKNOWN,
+        'outputs.success' => O_SUCC,
+        'sql.matched.rate' => RATE,
+        'sql.matched.rate.max' => RATE_MAX,
+        'sql.matched.rate.last5m' => RATE_5
+    }).
+-define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
+        O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
+    #{
+        'sql.matched' := MATCH,
+        'sql.passed' := PASS,
+        'sql.failed' := FAIL,
+        'sql.failed.exception' := FAIL_EX,
+        'sql.failed.no_result' := FAIL_NORES,
+        'outputs.total' := O_TOTAL,
+        'outputs.failed' := O_FAIL,
+        'outputs.failed.out_of_service' := O_FAIL_OOS,
+        'outputs.failed.unknown' := O_FAIL_UNKNOWN,
+        'outputs.success' := O_SUCC,
+        'sql.matched.rate' := RATE,
+        'sql.matched.rate.max' := RATE_MAX,
+        'sql.matched.rate.last5m' := RATE_5
+    }).
 
 
 namespace() -> "rule".
 namespace() -> "rule".
 
 
@@ -212,7 +246,7 @@ param_path_id() ->
 
 
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
-    case emqx:remove_config(ConfPath, #{}) of
+    case emqx_conf:remove(ConfPath, #{}) of
         {ok, _} -> {204};
         {ok, _} -> {204};
         {error, Reason} ->
         {error, Reason} ->
             ?SLOG(error, #{msg => "delete_rule_failed",
             ?SLOG(error, #{msg => "delete_rule_failed",
@@ -268,17 +302,23 @@ printable_function_name(Mod, Func) ->
     list_to_binary(lists:concat([Mod,":",Func])).
     list_to_binary(lists:concat([Mod,":",Func])).
 
 
 get_rule_metrics(Id) ->
 get_rule_metrics(Id) ->
-    Format = fun (Node, #{matched := Matched,
-                          rate := Current,
-                          rate_max := Max,
-                          rate_last5m := Last5M
-                        }) ->
-        #{ metrics => #{
-            matched => Matched,
-            rate => Current,
-            rate_max => Max,
-            rate_last5m => Last5M
-           }
+    Format = fun (Node, #{
+            counters :=
+                #{'sql.matched' := Matched, 'sql.passed' := Passed, 'sql.failed' := Failed,
+                 'sql.failed.exception' := FailedEx,
+                 'sql.failed.no_result' := FailedNoRes,
+                 'outputs.total' := OTotal,
+                 'outputs.failed' := OFailed,
+                 'outputs.failed.out_of_service' := OFailedOOS,
+                 'outputs.failed.unknown' := OFailedUnknown,
+                 'outputs.success' := OFailedSucc
+                 },
+            rate :=
+                #{'sql.matched' :=
+                    #{current := Current, max := Max, last5m := Last5M}
+                 }}) ->
+        #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes,
+            OTotal, OFailed, OFailedOOS, OFailedUnknown, OFailedSucc, Current, Max, Last5M)
          , node => Node
          , node => Node
          }
          }
     end,
     end,
@@ -286,13 +326,21 @@ get_rule_metrics(Id) ->
      || Node <- mria_mnesia:running_nodes()].
      || Node <- mria_mnesia:running_nodes()].
 
 
 aggregate_metrics(AllMetrics) ->
 aggregate_metrics(AllMetrics) ->
-    InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
+    InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
     lists:foldl(fun
     lists:foldl(fun
-        (#{metrics := #{matched := Match1, rate := Rate1,
-                        rate_max := RateMax1, rate_last5m := Rate5m1}},
-         #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
-            #{matched => Match1 + Match0, rate => Rate1 + Rate0,
-              rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
+        (#{metrics := ?metrics(Match1, Passed1, Failed1, FailedEx1, FailedNoRes1,
+             OTotal1, OFailed1, OFailedOOS1, OFailedUnknown1, OFailedSucc1,
+             Rate1, RateMax1, Rate5m1)},
+          ?metrics(Match0, Passed0, Failed0, FailedEx0, FailedNoRes0,
+             OTotal0, OFailed0, OFailedOOS0, OFailedUnknown0, OFailedSucc0,
+             Rate0, RateMax0, Rate5m0)) ->
+        ?METRICS(Match1 + Match0, Passed1 + Passed0, Failed1 + Failed0,
+             FailedEx1 + FailedEx0, FailedNoRes1 + FailedNoRes0,
+             OTotal1 + OTotal0, OFailed1 + OFailed0,
+             OFailedOOS1 + OFailedOOS0,
+             OFailedUnknown1 + OFailedUnknown0,
+             OFailedSucc1 + OFailedSucc0,
+             Rate1 + Rate0, RateMax1 + RateMax0, Rate5m1 + Rate5m0)
         end, InitMetrics, AllMetrics).
         end, InitMetrics, AllMetrics).
 
 
 get_one_rule(AllRules, Id) ->
 get_one_rule(AllRules, Id) ->

+ 33 - 12
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -55,18 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) ->
     catch
     catch
         %% ignore the errors if select or match failed
         %% ignore the errors if select or match failed
         _:{select_and_transform_error, Error} ->
         _:{select_and_transform_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
             ?SLOG(warning, #{msg => "SELECT_clause_exception",
             ?SLOG(warning, #{msg => "SELECT_clause_exception",
                              rule_id => RuleID, reason => Error});
                              rule_id => RuleID, reason => Error});
         _:{match_conditions_error, Error} ->
         _:{match_conditions_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
             ?SLOG(warning, #{msg => "WHERE_clause_exception",
             ?SLOG(warning, #{msg => "WHERE_clause_exception",
                              rule_id => RuleID, reason => Error});
                              rule_id => RuleID, reason => Error});
         _:{select_and_collect_error, Error} ->
         _:{select_and_collect_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
             ?SLOG(warning, #{msg => "FOREACH_clause_exception",
             ?SLOG(warning, #{msg => "FOREACH_clause_exception",
                              rule_id => RuleID, reason => Error});
                              rule_id => RuleID, reason => Error});
         _:{match_incase_error, Error} ->
         _:{match_incase_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
             ?SLOG(warning, #{msg => "INCASE_clause_exception",
             ?SLOG(warning, #{msg => "INCASE_clause_exception",
                              rule_id => RuleID, reason => Error});
                              rule_id => RuleID, reason => Error});
         Class:Error:StkTrace ->
         Class:Error:StkTrace ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
             ?SLOG(error, #{msg => "apply_rule_failed",
             ?SLOG(error, #{msg => "apply_rule_failed",
                            rule_id => RuleID,
                            rule_id => RuleID,
                            exception => Class,
                            exception => Class,
@@ -81,6 +86,7 @@ apply_rule_discard_result(Rule, Input) ->
     ok.
     ok.
 
 
 apply_rule(Rule = #{id := RuleID}, Input) ->
 apply_rule(Rule = #{id := RuleID}, Input) ->
+    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
     clear_rule_payload(),
     clear_rule_payload(),
     do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
     do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
 
 
@@ -99,10 +105,16 @@ do_apply_rule(#{
     case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
     case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
         true ->
-            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
+            case Collection2 of
+                [] ->
+                    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result');
+                _ ->
+                    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed')
+            end,
             {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
             {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
         false ->
         false ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
             {error, nomatch}
             {error, nomatch}
     end;
     end;
 
 
@@ -117,9 +129,10 @@ do_apply_rule(#{id := RuleId,
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
         true ->
-            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'),
             {ok, handle_output_list(RuleId, Outputs, Selected, Input)};
             {ok, handle_output_list(RuleId, Outputs, Selected, Input)};
         false ->
         false ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
             {error, nomatch}
             {error, nomatch}
     end.
     end.
 
 
@@ -235,24 +248,32 @@ handle_output_list(RuleId, Outputs, Selected, Envs) ->
     [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
     [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
 
 
 handle_output(RuleId, OutId, Selected, Envs) ->
 handle_output(RuleId, OutId, Selected, Envs) ->
+    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.total'),
     try
     try
-        do_handle_output(OutId, Selected, Envs)
+        Result = do_handle_output(OutId, Selected, Envs),
+        ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'),
+        Result
     catch
     catch
+        throw:out_of_service ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'),
+            ?SLOG(warning, #{msg => "out_of_service", output => OutId});
         Err:Reason:ST ->
         Err:Reason:ST ->
-            ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
-            Level = case Err of throw -> debug; _ -> error end,
-            ?SLOG(Level, #{msg => "output_failed",
-                           output => OutId,
-                           exception => Err,
-                           reason => Reason,
-                           stacktrace => ST
-                          })
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
+            ?SLOG(error, #{msg => "output_failed", output => OutId, exception => Err,
+                           reason => Reason, stacktrace => ST})
     end.
     end.
 
 
 do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
 do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
     ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
     ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
-    emqx_bridge:send_message(BridgeId, Selected);
+    case emqx_bridge:send_message(BridgeId, Selected) of
+        {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
+            throw(out_of_service);
+        Result -> Result
+    end;
 do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
 do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
+    %% the function can also throw 'out_of_service'
     Mod:Func(Selected, Envs, Args).
     Mod:Func(Selected, Envs, Args).
 
 
 eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
 eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) ->
 
 
 test_rule(Sql, Select, Context, EventTopics) ->
 test_rule(Sql, Select, Context, EventTopics) ->
     RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
     RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
-    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId),
+    ok = emqx_rule_engine:add_metrics_for_rule(RuleId),
     Rule = #{
     Rule = #{
         id => RuleId,
         id => RuleId,
         sql => Sql,
         sql => Sql,
@@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         {ok, Data} -> {ok, flatten(Data)};
         {ok, Data} -> {ok, flatten(Data)};
         {error, nomatch} -> {error, nomatch}
         {error, nomatch} -> {error, nomatch}
     after
     after
-        ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId)
+        ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
     end.
     end.
 
 
 get_selected_data(Selected, _Envs, _Args) ->
 get_selected_data(Selected, _Envs, _Args) ->