Просмотр исходного кода

Merge pull request #13916 from thalesmg/20241002-m-rule-runtime-failed-metrics

fix(rule engine metrics): bump `failed` metric when its sub-counters are bumped
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
6b3f0ced2e

+ 16 - 8
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -88,7 +88,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
     catch
     catch
         %% ignore the errors if select or match failed
         %% ignore the errors if select or match failed
         _:Reason = {select_and_transform_error, Error} ->
         _:Reason = {select_and_transform_error, Error} ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
+            ok = metrics_inc_exception(RuleID),
             trace_rule_sql(
             trace_rule_sql(
                 "SELECT_clause_exception",
                 "SELECT_clause_exception",
                 #{
                 #{
@@ -98,7 +98,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
             ),
             ),
             {error, Reason};
             {error, Reason};
         _:Reason = {match_conditions_error, Error} ->
         _:Reason = {match_conditions_error, Error} ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
+            ok = metrics_inc_exception(RuleID),
             trace_rule_sql(
             trace_rule_sql(
                 "WHERE_clause_exception",
                 "WHERE_clause_exception",
                 #{
                 #{
@@ -108,7 +108,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
             ),
             ),
             {error, Reason};
             {error, Reason};
         _:Reason = {select_and_collect_error, Error} ->
         _:Reason = {select_and_collect_error, Error} ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
+            ok = metrics_inc_exception(RuleID),
             trace_rule_sql(
             trace_rule_sql(
                 "FOREACH_clause_exception",
                 "FOREACH_clause_exception",
                 #{
                 #{
@@ -118,7 +118,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
             ),
             ),
             {error, Reason};
             {error, Reason};
         _:Reason = {match_incase_error, Error} ->
         _:Reason = {match_incase_error, Error} ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
+            ok = metrics_inc_exception(RuleID),
             trace_rule_sql(
             trace_rule_sql(
                 "INCASE_clause_exception",
                 "INCASE_clause_exception",
                 #{
                 #{
@@ -128,7 +128,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
             ),
             ),
             {error, Reason};
             {error, Reason};
         Class:Error:StkTrace ->
         Class:Error:StkTrace ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
+            ok = metrics_inc_exception(RuleID),
             trace_rule_sql(
             trace_rule_sql(
                 "apply_rule_failed",
                 "apply_rule_failed",
                 #{
                 #{
@@ -186,7 +186,7 @@ do_apply_rule(
             case FinalCollection of
             case FinalCollection of
                 [] ->
                 [] ->
                     trace_rule_sql("SQL_yielded_no_result"),
                     trace_rule_sql("SQL_yielded_no_result"),
-                    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
+                    ok = metrics_inc_no_result(RuleId);
                 _ ->
                 _ ->
                     trace_rule_sql(
                     trace_rule_sql(
                         "SQL_yielded_result", #{result => FinalCollection}, debug
                         "SQL_yielded_result", #{result => FinalCollection}, debug
@@ -197,7 +197,7 @@ do_apply_rule(
             {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
             {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
         false ->
         false ->
             trace_rule_sql("SQL_yielded_no_result"),
             trace_rule_sql("SQL_yielded_no_result"),
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
+            ok = metrics_inc_no_result(RuleId),
             {error, nomatch}
             {error, nomatch}
     end;
     end;
 do_apply_rule(
 do_apply_rule(
@@ -218,7 +218,7 @@ do_apply_rule(
             {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
             {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
         false ->
         false ->
             trace_rule_sql("SQL_yielded_no_result"),
             trace_rule_sql("SQL_yielded_no_result"),
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
+            ok = metrics_inc_no_result(RuleId),
             {error, nomatch}
             {error, nomatch}
     end.
     end.
 
 
@@ -924,3 +924,11 @@ trace_rule_sql(Message, Extra, Level) ->
         Message,
         Message,
         Extra
         Extra
     ).
     ).
+
+metrics_inc_no_result(RuleId) ->
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed').
+
+metrics_inc_exception(RuleId) ->
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.exception'),
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed').

+ 46 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -142,7 +142,8 @@ groups() ->
         ]},
         ]},
         {metrics_fail, [], [
         {metrics_fail, [], [
             t_rule_metrics_sync_fail,
             t_rule_metrics_sync_fail,
-            t_rule_metrics_async_fail
+            t_rule_metrics_async_fail,
+            t_failed_rule_metrics
         ]},
         ]},
         {metrics_fail_simple, [], [
         {metrics_fail_simple, [], [
             t_rule_metrics_sync_fail,
             t_rule_metrics_sync_fail,
@@ -3982,6 +3983,47 @@ t_sqlselect_client_attr(_) ->
     emqx_rule_engine:delete_rule(?TMP_RULEID),
     emqx_rule_engine:delete_rule(?TMP_RULEID),
     emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
     emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
 
 
+%% Checks that we bump both `failed' and one of its sub-counters when failures occur while
+%% evaluating rule SQL.
+t_failed_rule_metrics(_Config) ->
+    {ok, C} = emqtt:start_link(emqtt_client_config()),
+    emqtt:connect(C),
+    RuleId = atom_to_binary(?FUNCTION_NAME),
+    %% Never matches
+    create_rule(
+        RuleId,
+        <<"select 1 from \"t\" where false">>
+    ),
+    {ok, _} = emqtt:publish(C, <<"t">>, <<"hi">>, [{qos, 2}]),
+    ?assertMatch(
+        #{
+            'matched' := 1,
+            'failed' := 1,
+            'failed.no_result' := 1,
+            'failed.exception' := 0
+        },
+        get_counters(RuleId)
+    ),
+    %% Exception in select clause
+    delete_rule(RuleId),
+    create_rule(
+        RuleId,
+        <<"select map_get(1, 1) from \"t\"">>
+    ),
+    {ok, _} = emqtt:publish(C, <<"t">>, <<"hi">>, [{qos, 2}]),
+    ?assertMatch(
+        #{
+            'matched' := 1,
+            'failed' := 1,
+            'failed.no_result' := 0,
+            'failed.exception' := 1
+        },
+        get_counters(RuleId)
+    ),
+    delete_rule(RuleId),
+    emqtt:stop(C),
+    ok.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %% Internal helpers
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -3989,6 +4031,9 @@ t_sqlselect_client_attr(_) ->
 is_ee() ->
 is_ee() ->
     emqx_release:edition() == ee.
     emqx_release:edition() == ee.
 
 
+get_counters(RuleId) ->
+    emqx_metrics_worker:get_counters(rule_metrics, RuleId).
+
 republish_action(Topic) ->
 republish_action(Topic) ->
     republish_action(Topic, <<"${payload}">>).
     republish_action(Topic, <<"${payload}">>).
 
 

+ 1 - 0
changes/ce/fix-13916.en.md

@@ -0,0 +1 @@
+Previously, when a rule's `failed.no_result` or `failed.exception` metrics were bumped, their corresponding parent metric `failed` was not also bumped.