Browse Source

refactor(rule): add more metrics for rule and bridges

Shawn 4 years ago
parent
commit
67a60e1153

+ 8 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -418,9 +418,16 @@ format_resp(#{id := Id, raw_config := RawConf,
         name => maps:get(<<"name">>, RawConf, BridgeName),
         node => node(),
         status => IsConnected(Status),
-        metrics => Metrics
+        metrics => format_metrics(Metrics)
     }.
 
+format_metrics(#{
+        counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ},
+        rate := #{
+            matched := #{current := Rate, last5m := Rate5m, max := RateMax}
+        } }) ->
+    ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
+
 rpc_multicall(Func, Args) ->
     Nodes = mria_mnesia:running_nodes(),
     ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),

+ 32 - 4
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -523,7 +523,21 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     %% and also the rule should be matched, with matched + 1:
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     #{ <<"id">> := RuleId
-     , <<"metrics">> := #{<<"matched">> := 1}
+     , <<"metrics">> := #{
+         <<"matched">> := 1,
+         <<"passed">> := 1,
+         <<"failed">> := 0,
+         <<"failed.exception">> := 0,
+         <<"failed.no_result">> := 0,
+         <<"rate">> := _,
+         <<"rate_max">> := _,
+         <<"rate_last5m">> := _,
+         <<"outputs.total">> := 1,
+         <<"outputs.success">> := 1,
+         <<"outputs.failed">> := 0,
+         <<"outputs.failed.out_of_service">> := 0,
+         <<"outputs.failed.unknown">> := 0
+       }
      } = jsx:decode(Rule1),
     %% we also check if the outputs of the rule is triggered
     ?assertMatch(#{inspect := #{
@@ -578,7 +592,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     ?assert(
         receive
             {deliver, RemoteTopic, #message{payload = Payload}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
+                ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
                 true;
             Msg ->
                 ct:pal("Msg: ~p", [Msg]),
@@ -598,13 +612,27 @@ t_egress_mqtt_bridge_with_rules(_) ->
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     #{ <<"id">> := RuleId
-     , <<"metrics">> := #{<<"matched">> := 1}
+     , <<"metrics">> := #{
+         <<"matched">> := 1,
+         <<"passed">> := 1,
+         <<"failed">> := 0,
+         <<"failed.exception">> := 0,
+         <<"failed.no_result">> := 0,
+         <<"rate">> := _,
+         <<"rate_max">> := _,
+         <<"rate_last5m">> := _,
+         <<"outputs.total">> := 1,
+         <<"outputs.success">> := 1,
+         <<"outputs.failed">> := 0,
+         <<"outputs.failed.out_of_service">> := 0,
+         <<"outputs.failed.unknown">> := 0
+       }
      } = jsx:decode(Rule1),
     %% we should receive a message on the "remote" broker, with specified topic
     ?assert(
         receive
             {deliver, RemoteTopic2, #message{payload = Payload2}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
+                ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
                 true;
             Msg ->
                 ct:pal("Msg: ~p", [Msg]),

+ 22 - 18
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -28,7 +28,7 @@
         , inc/4
         , get/3
         , get_rate/2
-        , get_all_counters/2
+        , get_counters/2
         , create_metrics/3
         , create_metrics/4
         , clear_metrics/2
@@ -129,15 +129,15 @@ get(Name, Id, Metric) ->
 get_rate(Name, Id) ->
     gen_server:call(Name, {get_rate, Id}).
 
--spec(get_all_counters(handler_name(), metric_id()) -> map()).
-get_all_counters(Name, Id) ->
+-spec(get_counters(handler_name(), metric_id()) -> map()).
+get_counters(Name, Id) ->
     maps:map(fun(_Metric, Index) ->
             get(Name, Id, Index)
         end, get_indexes(Name, Id)).
 
 -spec(get_metrics(handler_name(), metric_id()) -> metrics()).
 get_metrics(Name, Id) ->
-    #{rate => get_rate(Name, Id), counters => get_all_counters(Name, Id)}.
+    #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
 
 -spec inc(handler_name(), metric_id(), atom()) -> ok.
 inc(Name, Id, Metric) ->
@@ -145,7 +145,7 @@ inc(Name, Id, Metric) ->
 
 -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
-    counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val).
+    counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
 
 start_link(Name) ->
     gen_server:start_link({local, Name}, ?MODULE, Name, []).
@@ -176,8 +176,8 @@ handle_call({create_metrics, Id, Metrics, RateMetrics}, _From,
                 State#state{metric_ids = sets:add_element(Id, MIDs),
                             rates = Rate1}};
         _ ->
-            {reply, {error, metrics_to}, State}
-    end.
+            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
+    end;
 
 handle_call({delete_metrics, Id}, _From,
             State = #state{metric_ids = MIDs, rates = Rates}) ->
@@ -229,35 +229,39 @@ stop(Name) ->
 create_counters(_Name, _Id, []) ->
     error({create_counter_error, must_provide_a_list_of_metrics});
 create_counters(Name, Id, Metrics) ->
+    %% backup the old counters
+    OlderCounters = maps:with(Metrics, get_counters(Name, Id)),
+    %% create the new counter
     Size = length(Metrics),
     Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))),
-    Counters = get_counters(Name),
+    Counters = get_pterm(Name),
     CntrRef = counters:new(Size, [write_concurrency]),
     persistent_term:put(?CntrRef(Name),
-        Counters#{Id => #{ref => CntrRef, indexes => Indexes}}).
+        Counters#{Id => #{ref => CntrRef, indexes => Indexes}}),
+    %% restore the old counters
+    lists:foreach(fun({Metric, N}) ->
+            inc(Name, Id, Metric, N)
+        end, maps:to_list(OlderCounters)).
 
 delete_counters(Name, Id) ->
-    persistent_term:put(?CntrRef(Name), maps:remove(Id, get_counters(Name))).
+    persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))).
 
 get_ref(Name, Id) ->
-    case maps:find(Id, get_counters(Name)) of
+    case maps:find(Id, get_pterm(Name)) of
         {ok, #{ref := Ref}} -> Ref;
         error -> not_found
     end.
 
 idx_metric(Name, Id, Metric) ->
-    case get_indexes(Name, Id) of
-        not_found -> not_found;
-        Indexes -> maps:get(Metric, Indexes, not_found)
-    end.
+    maps:get(Metric, get_indexes(Name, Id)).
 
 get_indexes(Name, Id) ->
-    case maps:find(Id, get_counters(Name)) of
+    case maps:find(Id, get_pterm(Name)) of
         {ok, #{indexes := Indexes}} -> Indexes;
-        error -> not_found
+        error -> #{}
     end.
 
-get_counters(Name) ->
+get_pterm(Name) ->
     persistent_term:get(?CntrRef(Name), #{}).
 
 calculate_rate(_CurrVal, undefined) ->

+ 29 - 0
apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl

@@ -104,6 +104,35 @@ t_get_metrics_2(_) ->
     } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
     ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
 
+t_recreate_metrics(_) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
+    ?assertMatch(#{
+            rate := R = #{
+                a := #{current := _, max := _, last5m := _}
+            },
+            counters := C = #{
+                a := 1
+            }
+        } when map_size(R) == 1 andalso map_size(C) == 1,
+        emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    %% we create the metrics again, to add some counters
+    ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
+    ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
+    ?assertMatch(#{
+            rate := R = #{
+                a := #{current := _, max := _, last5m := _},
+                b := #{current := _, max := _, last5m := _},
+                c := #{current := _, max := _, last5m := _}
+            },
+            counters := C = #{
+                a := 1, b := 1, c := 1
+            }
+        } when map_size(R) == 3 andalso map_size(C) == 3,
+        emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
+    ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
+
 t_inc_matched(_) ->
     Metrics = ['rules.matched'],
     ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics),

+ 8 - 4
apps/emqx_resource/src/emqx_resource.erl

@@ -205,7 +205,12 @@ query(InstId, Request, AfterQuery) ->
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
-            Mod:on_query(InstId, Request, AfterQuery, ResourceState);
+            ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
+            try Mod:on_query(InstId, Request, AfterQuery, ResourceState)
+            catch Err:Reason:ST ->
+                emqx_plugin_libs_metrics:inc(resource_metrics, InstId, exception),
+                erlang:raise(Err, Reason, ST)
+            end;
         {error, not_found} ->
             query_error(not_found, <<"the resource id not exists">>)
     end.
@@ -346,9 +351,8 @@ filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
 inc_metrics_funcs(InstId) ->
-    OnFailed = [{fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, failed]}],
-    OnSucc = [ {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, InstId, matched]}
-             , {fun emqx_plugin_libs_metrics:inc/2, [resource_metrics, success]}
+    OnFailed = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, failed]}],
+    OnSucc = [ {fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, success]}
              ],
     {OnSucc, OnFailed}.
 

+ 1 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -177,7 +177,7 @@ do_create(InstId, ResourceType, Config, Opts) ->
             case do_start(InstId, ResourceType, Config, Opts) of
                 ok ->
                     ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
-                            [matched, success, failed], [matched]),
+                            [matched, success, failed, exception], [matched]),
                     {ok, force_lookup(InstId)};
                 Error ->
                     Error

+ 13 - 5
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -71,6 +71,14 @@
 
 -define(T_CALL, infinity).
 
+%% NOTE: This order cannot be changed! This is to make the metric working during relup.
+%% Append elements to this list to add new metrics.
+-define(METRICS, ['matched', 'passed', 'failed', 'failed.exception', 'failed.no_result',
+    'outputs.total', 'outputs.success', 'outputs.failed', 'outputs.failed.out_of_service',
+    'outputs.failed.unknown']).
+
+-define(RATE_METRICS, ['matched']).
+
 config_key_path() ->
     [rule_engine, rules].
 
@@ -162,10 +170,10 @@ get_rule(Id) ->
 load_hooks_for_rule(#{from := Topics}) ->
     lists:foreach(fun emqx_rule_events:load/1, Topics).
 
-add_metrics_for_rule(#{id := Id}) ->
-    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id).
+add_metrics_for_rule(Id) ->
+    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS).
 
-clear_metrics_for_rule(#{id := Id}) ->
+clear_metrics_for_rule(Id) ->
     ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
 
 unload_hooks_for_rule(#{id := Id, from := Topics}) ->
@@ -243,7 +251,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
 
 do_insert_rule(#{id := Id} = Rule) ->
     ok = load_hooks_for_rule(Rule),
-    ok = add_metrics_for_rule(Rule),
+    ok = add_metrics_for_rule(Id),
     true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
     ok.
 
@@ -251,7 +259,7 @@ do_delete_rule(RuleId) ->
     case get_rule(RuleId) of
         {ok, Rule} ->
             ok = unload_hooks_for_rule(Rule),
-            ok = clear_metrics_for_rule(Rule),
+            ok = clear_metrics_for_rule(RuleId),
             true = ets:delete(?RULE_TAB, RuleId),
             ok;
         not_found -> ok

+ 65 - 17
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -43,6 +43,40 @@
         {error, REASON} ->
             {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
     end).
+-define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
+        O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
+    #{
+        matched => MATCH,
+        passed => PASS,
+        failed => FAIL,
+        'failed.exception' => FAIL_EX,
+        'failed.no_result' => FAIL_NORES,
+        'outputs.total' => O_TOTAL,
+        'outputs.failed' => O_FAIL,
+        'outputs.failed.out_of_service' => O_FAIL_OOS,
+        'outputs.failed.unknown' => O_FAIL_UNKNOWN,
+        'outputs.success' => O_SUCC,
+        rate => RATE,
+        rate_max => RATE_MAX,
+        rate_last5m => RATE_5
+    }).
+-define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
+        O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
+    #{
+        matched := MATCH,
+        passed := PASS,
+        failed := FAIL,
+        'failed.exception' := FAIL_EX,
+        'failed.no_result' := FAIL_NORES,
+        'outputs.total' := O_TOTAL,
+        'outputs.failed' := O_FAIL,
+        'outputs.failed.out_of_service' := O_FAIL_OOS,
+        'outputs.failed.unknown' := O_FAIL_UNKNOWN,
+        'outputs.success' := O_SUCC,
+        rate := RATE,
+        rate_max := RATE_MAX,
+        rate_last5m := RATE_5
+    }).
 
 namespace() -> "rule".
 
@@ -268,17 +302,23 @@ printable_function_name(Mod, Func) ->
     list_to_binary(lists:concat([Mod,":",Func])).
 
 get_rule_metrics(Id) ->
-    Format = fun (Node, #{matched := Matched,
-                          rate := Current,
-                          rate_max := Max,
-                          rate_last5m := Last5M
-                        }) ->
-        #{ metrics => #{
-            matched => Matched,
-            rate => Current,
-            rate_max => Max,
-            rate_last5m => Last5M
-           }
+    Format = fun (Node, #{
+            counters :=
+                #{matched := Matched, passed := Passed, failed := Failed,
+                 'failed.exception' := FailedEx,
+                 'failed.no_result' := FailedNoRes,
+                 'outputs.total' := OTotal,
+                 'outputs.failed' := OFailed,
+                 'outputs.failed.out_of_service' := OFailedOOS,
+                 'outputs.failed.unknown' := OFailedUnknown,
+                 'outputs.success' := OFailedSucc
+                 },
+            rate :=
+                #{matched :=
+                    #{current := Current, max := Max, last5m := Last5M}
+                 }}) ->
+        #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes,
+            OTotal, OFailed, OFailedOOS, OFailedUnknown, OFailedSucc, Current, Max, Last5M)
          , node => Node
          }
     end,
@@ -286,13 +326,21 @@ get_rule_metrics(Id) ->
      || Node <- mria_mnesia:running_nodes()].
 
 aggregate_metrics(AllMetrics) ->
-    InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
+    InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
     lists:foldl(fun
-        (#{metrics := #{matched := Match1, rate := Rate1,
-                        rate_max := RateMax1, rate_last5m := Rate5m1}},
-         #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
-            #{matched => Match1 + Match0, rate => Rate1 + Rate0,
-              rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
+        (#{metrics := ?metrics(Match1, Passed1, Failed1, FailedEx1, FailedNoRes1,
+             OTotal1, OFailed1, OFailedOOS1, OFailedUnknown1, OFailedSucc1,
+             Rate1, RateMax1, Rate5m1)},
+          ?metrics(Match0, Passed0, Failed0, FailedEx0, FailedNoRes0,
+             OTotal0, OFailed0, OFailedOOS0, OFailedUnknown0, OFailedSucc0,
+             Rate0, RateMax0, Rate5m0)) ->
+        ?METRICS(Match1 + Match0, Passed1 + Passed0, Failed1 + Failed0,
+             FailedEx1 + FailedEx0, FailedNoRes1 + FailedNoRes0,
+             OTotal1 + OTotal0, OFailed1 + OFailed0,
+             OFailedOOS1 + OFailedOOS0,
+             OFailedUnknown1 + OFailedUnknown0,
+             OFailedSucc1 + OFailedSucc0,
+             Rate1 + Rate0, RateMax1 + RateMax0, Rate5m1 + Rate5m0)
         end, InitMetrics, AllMetrics).
 
 get_one_rule(AllRules, Id) ->

+ 32 - 12
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -55,18 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) ->
     catch
         %% ignore the errors if select or match failed
         _:{select_and_transform_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'),
             ?SLOG(warning, #{msg => "SELECT_clause_exception",
                              rule_id => RuleID, reason => Error});
         _:{match_conditions_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'),
             ?SLOG(warning, #{msg => "WHERE_clause_exception",
                              rule_id => RuleID, reason => Error});
         _:{select_and_collect_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'),
             ?SLOG(warning, #{msg => "FOREACH_clause_exception",
                              rule_id => RuleID, reason => Error});
         _:{match_incase_error, Error} ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'),
             ?SLOG(warning, #{msg => "INCASE_clause_exception",
                              rule_id => RuleID, reason => Error});
         Class:Error:StkTrace ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'),
             ?SLOG(error, #{msg => "apply_rule_failed",
                            rule_id => RuleID,
                            exception => Class,
@@ -81,6 +86,7 @@ apply_rule_discard_result(Rule, Input) ->
     ok.
 
 apply_rule(Rule = #{id := RuleID}, Input) ->
+    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, matched),
     clear_rule_payload(),
     do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
 
@@ -99,10 +105,16 @@ do_apply_rule(#{
     case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
-            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
+            case Collection2 of
+                [] ->
+                    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result');
+                _ ->
+                    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed)
+            end,
             {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
         false ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
     end;
 
@@ -117,9 +129,10 @@ do_apply_rule(#{id := RuleId,
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
-            ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed),
             {ok, handle_output_list(RuleId, Outputs, Selected, Input)};
         false ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
     end.
 
@@ -235,23 +248,30 @@ handle_output_list(RuleId, Outputs, Selected, Envs) ->
     [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
 
 handle_output(RuleId, OutId, Selected, Envs) ->
+    ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.total'),
     try
-        do_handle_output(OutId, Selected, Envs)
+        Result = do_handle_output(OutId, Selected, Envs),
+        ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'),
+        Result
     catch
+        throw:Reason ->
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'),
+            ?SLOG(error, #{msg => "output_failed", output => OutId, reason => Reason});
         Err:Reason:ST ->
-            ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
-            Level = case Err of throw -> debug; _ -> error end,
-            ?SLOG(Level, #{msg => "output_failed",
-                           output => OutId,
-                           exception => Err,
-                           reason => Reason,
-                           stacktrace => ST
-                          })
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
+            ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
+            ?SLOG(error, #{msg => "output_failed", output => OutId, exception => Err,
+                           reason => Reason, stacktrace => ST})
     end.
 
 do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
     ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
-    emqx_bridge:send_message(BridgeId, Selected);
+    case emqx_bridge:send_message(BridgeId, Selected) of
+        {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
+            throw(bridge_out_of_service);
+        Result -> Result
+    end;
 do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
     Mod:Func(Selected, Envs, Args).
 

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) ->
 
 test_rule(Sql, Select, Context, EventTopics) ->
     RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
-    ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId),
+    ok = emqx_rule_engine:add_metrics_for_rule(RuleId),
     Rule = #{
         id => RuleId,
         sql => Sql,
@@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         {ok, Data} -> {ok, flatten(Data)};
         {error, nomatch} -> {error, nomatch}
     after
-        ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId)
+        ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
     end.
 
 get_selected_data(Selected, _Envs, _Args) ->