Parcourir la source

fix(rule actions): check `republish` action result for metrics

Fixes https://emqx.atlassian.net/browse/EMQX-12328
Thales Macedo Garitezi il y a 1 an
Parent
commit
39615e1cb6

+ 12 - 2
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -238,8 +238,18 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
         payload = Payload,
         timestamp = erlang:system_time(millisecond)
     },
-    _ = emqx_broker:safe_publish(Msg),
-    emqx_metrics:inc_msg(Msg).
+    case emqx_broker:safe_publish(Msg) of
+        [_ | _] ->
+            emqx_metrics:inc_msg(Msg),
+            ok;
+        disconnect ->
+            error;
+        [] ->
+            %% Have to check previous logs to distinguish between schema validation
+            %% failure, no subscribers, blocked by authz, or anything else in the
+            %% `message.publish' hook evaluation.
+            error
+    end.
 
 parse_simple_var(Data) when is_binary(Data) ->
     emqx_template:parse(Data);

+ 7 - 4
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -24,6 +24,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
+-define(REPUBLISH_TOPIC, <<"rule_apply_test_SUITE">>).
 
 all() ->
     [
@@ -129,7 +130,7 @@ republish_action() ->
                 <<"payload">> => <<"MY PL">>,
                 <<"qos">> => 0,
                 <<"retain">> => false,
-                <<"topic">> => <<"rule_apply_test_SUITE">>,
+                <<"topic">> => ?REPUBLISH_TOPIC,
                 <<"user_properties">> => <<>>
             },
         <<"function">> => <<"republish">>
@@ -139,6 +140,8 @@ console_print_action() ->
     #{<<"function">> => <<"console">>}.
 
 basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
+    %% Subscribe to republish action target topic so there's at least one subscriber.
+    _ = emqx:subscribe(?REPUBLISH_TOPIC),
     %% Create Rule
     RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
     SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
@@ -181,7 +184,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
         _NAttempts0 = 20,
         begin
             Bin = read_rule_trace_file(TraceName, TraceType, Now),
-            io:format("THELOG:~n~s", [Bin]),
+            ct:pal("THELOG:~n~s", [Bin]),
             case PayloadEncode of
                 hidden ->
                     ?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]));
@@ -207,7 +210,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
                 _NAttempts0 = 20,
                 begin
                     Bin = read_rule_trace_file(TraceName, TraceType, Now),
-                    io:format("THELOG2:~n~s", [Bin]),
+                    ct:pal("THELOG2:~n~s", [Bin]),
                     ?assertNotEqual(
                         nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>])
                     )
@@ -219,7 +222,7 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode)
                 _NAttempts0 = 20,
                 begin
                     Bin = read_rule_trace_file(TraceName, TraceType, Now),
-                    io:format("THELOG3:~n~s", [Bin]),
+                    ct:pal("THELOG3:~n~s", [Bin]),
                     ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])),
                     do_final_log_check(Action, Bin)
                 end

+ 86 - 1
apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl

@@ -392,9 +392,12 @@ create_failure_tracing_rule() ->
         sql => <<"select * from \"$events/schema_validation_failed\" ">>,
         actions => [make_trace_fn_action()]
     },
+    create_rule(Params).
+
+create_rule(Params) ->
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     Res = request(post, Path, Params),
-    ct:pal("create failure tracing rule result:\n  ~p", [Res]),
+    ct:pal("create rule result:\n  ~p", [Res]),
     case Res of
         {ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} ->
             on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
@@ -1384,3 +1387,85 @@ t_load_config(_Config) ->
     ?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>),
 
     ok.
+
+%% Checks that the republish action failure metric increases when schema validation fails
+%% for an outgoing message.  Though this is arguably more appropriate as an
+%% `emqx_rule_runtime' test case, it's simpler to setup the conditions here.
+t_republish_action_failure(_Config) ->
+    ?check_trace(
+        begin
+            Name1 = <<"1">>,
+            %% Always fails
+            Check1A = sql_check(<<"select 1 where false">>),
+            Validation1A = validation(Name1, [Check1A]),
+            {201, _} = insert(Validation1A),
+
+            RuleTopic = <<"some/topic">>,
+            Params = #{
+                enable => true,
+                sql => iolist_to_binary([
+                    <<"select * from \"">>,
+                    RuleTopic,
+                    <<"\"">>
+                ]),
+                actions => [
+                    #{
+                        function => <<"republish">>,
+                        args =>
+                            #{
+                                <<"mqtt_properties">> => #{},
+                                <<"payload">> => <<"aaa">>,
+                                <<"qos">> => 0,
+                                <<"retain">> => false,
+                                <<"topic">> => <<"t/republished">>,
+                                <<"user_properties">> => <<>>
+                            }
+                    }
+                ]
+            },
+            {201, #{<<"id">> := RuleId}} = create_rule(Params),
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+            ok = publish(C, RuleTopic, #{}),
+            ?assertNotReceive({publish, _}),
+
+            ?assertMatch(
+                #{
+                    matched := 1,
+                    failed := 0,
+                    passed := 1,
+                    'actions.total' := 1,
+                    'actions.success' := 0,
+                    'actions.failed' := 1,
+                    'actions.failed.unknown' := 1,
+                    'actions.failed.out_of_service' := 0
+                },
+                emqx_metrics_worker:get_counters(rule_metrics, RuleId)
+            ),
+
+            %% `publish' return type is different when failure action is `disconnect'
+            Validation1B = validation(Name1, [Check1A], #{<<"failure_action">> => <<"disconnect">>}),
+            {200, _} = update(Validation1B),
+
+            ok = publish(C, RuleTopic, #{}),
+            ?assertNotReceive({publish, _}),
+
+            ?assertMatch(
+                #{
+                    matched := 2,
+                    failed := 0,
+                    passed := 2,
+                    'actions.total' := 2,
+                    'actions.success' := 0,
+                    'actions.failed' := 2,
+                    'actions.failed.unknown' := 2,
+                    'actions.failed.out_of_service' := 0
+                },
+                emqx_metrics_worker:get_counters(rule_metrics, RuleId)
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ce/fix-13207.en.md

@@ -0,0 +1 @@
+Previously, if the `republish` rule engine action failed to publish the message, the action success metrics were always increased.  Now, if the action detects it doesn't reach at least one subscriber, action failure metrics are increased.