Bladeren bron

Merge pull request #13621 from thalesmg/20240814-r58-mt-failure-metric

fix(message transformation): increment failures when final payload is not a binary, and handle variform eval crashes
Thales Macedo Garitezi 1 jaar geleden
bovenliggende
commit
7f045289e7

+ 51 - 12
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -29,7 +29,7 @@
 ]).
 
 %% Internal exports
--export([run_transformation/2, trace_failure_context_to_map/1]).
+-export([run_transformation/2, trace_failure_context_to_map/1, prettify_operation/1]).
 
 %%------------------------------------------------------------------------------
 %% Type declarations
@@ -173,6 +173,19 @@ run_transformation(Transformation, MessageIn) ->
             {FailureAction, TraceFailureContext}
     end.
 
+prettify_operation(Operation0) ->
+    %% TODO: remove injected bif module
+    Operation = maps:update_with(
+        value,
+        fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
+        Operation0
+    ),
+    maps:update_with(
+        key,
+        fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
+        Operation
+    ).
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
@@ -181,17 +194,32 @@ run_transformation(Transformation, MessageIn) ->
     {ok, eval_context()} | {error, trace_failure_context()}.
 eval_operation(Operation, Transformation, Context) ->
     #{key := K, value := V} = Operation,
-    case eval_variform(K, V, Context) of
-        {error, Reason} ->
-            FailureContext = #trace_failure_context{
+    try
+        case eval_variform(K, V, Context) of
+            {error, Reason} ->
+                FailureContext = #trace_failure_context{
+                    transformation = Transformation,
+                    tag = "transformation_eval_operation_failure",
+                    context = #{reason => Reason}
+                },
+                {error, FailureContext};
+            {ok, Rendered} ->
+                NewContext = put_value(K, Rendered, Context),
+                {ok, NewContext}
+        end
+    catch
+        Class:Error:Stacktrace ->
+            FailureContext1 = #trace_failure_context{
                 transformation = Transformation,
-                tag = "transformation_eval_operation_failure",
-                context = #{reason => Reason}
+                tag = "transformation_eval_operation_exception",
+                context = #{
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace,
+                    operation => prettify_operation(Operation)
+                }
             },
-            {error, FailureContext};
-        {ok, Rendered} ->
-            NewContext = put_value(K, Rendered, Context),
-            {ok, NewContext}
+            {error, FailureContext1}
     end.
 
 -spec eval_variform([binary(), ...], _, eval_context()) ->
@@ -280,12 +308,22 @@ run_transformations(Transformations, Message = #message{headers = Headers}) ->
     end.
 
 do_run_transformations(Transformations, Message) ->
+    LastTransformation = #{name := LastTransformationName} = lists:last(Transformations),
     Fun = fun(Transformation, MessageAcc) ->
         #{name := Name} = Transformation,
         emqx_message_transformation_registry:inc_matched(Name),
         case run_transformation(Transformation, MessageAcc) of
             {ok, #message{} = NewAcc} ->
-                emqx_message_transformation_registry:inc_succeeded(Name),
+                %% If this is the last transformation, we can't bump its success counter
+                %% yet.  We perform a check to see if the final payload is encoded as a
+                %% binary after all transformations have run, and it's the last
+                %% transformation's responsibility to properly encode it.
+                case Name =:= LastTransformationName of
+                    true ->
+                        ok;
+                    false ->
+                        emqx_message_transformation_registry:inc_succeeded(Name)
+                end,
                 {cont, NewAcc};
             {ignore, TraceFailureContext} ->
                 trace_failure_from_context(TraceFailureContext),
@@ -307,11 +345,12 @@ do_run_transformations(Transformations, Message) ->
         #message{} = FinalMessage ->
             case is_payload_properly_encoded(FinalMessage) of
                 true ->
+                    emqx_message_transformation_registry:inc_succeeded(LastTransformationName),
                     FinalMessage;
                 false ->
                     %% Take the last transformation's failure action, as it's the one
                     %% responsible for getting the right encoding.
-                    LastTransformation = lists:last(Transformations),
+                    emqx_message_transformation_registry:inc_failed(LastTransformationName),
                     #{failure_action := FailureAction} = LastTransformation,
                     trace_failure(LastTransformation, "transformation_bad_encoding", #{
                         action => FailureAction,

+ 1 - 11
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -715,17 +715,7 @@ transformation_out(Transformation) ->
     ).
 
 operation_out(Operation0) ->
-    %% TODO: remove injected bif module
-    Operation = maps:update_with(
-        value,
-        fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
-        Operation0
-    ),
-    maps:update_with(
-        key,
-        fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
-        Operation
-    ).
+    emqx_message_transformation:prettify_operation(Operation0).
 
 dryrun_input_message_in(Params) ->
     %% We already check the params against the schema at the API boundary, so we can

+ 144 - 4
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1635,23 +1635,114 @@ t_load_config(_Config) ->
 t_final_payload_must_be_binary(_Config) ->
     ?check_trace(
         begin
-            Name = <<"foo">>,
+            Name1 = <<"foo">>,
             Operations = [operation(<<"payload.hello">>, <<"concat(['world'])">>)],
-            Transformation = transformation(Name, Operations, #{
+            Transformation1 = transformation(Name1, Operations, #{
                 <<"payload_decoder">> => #{<<"type">> => <<"json">>},
                 <<"payload_encoder">> => #{<<"type">> => <<"none">>}
             }),
-            {201, _} = insert(Transformation),
+            {201, _} = insert(Transformation1),
 
             C = connect(<<"c1">>),
             {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
             ok = publish(C, <<"t/1">>, #{x => 1, y => true}),
             ?assertNotReceive({publish, _}),
+
+            ?retry(
+                100,
+                10,
+                ?assertMatch(
+                    {200, #{
+                        <<"metrics">> :=
+                            #{
+                                <<"matched">> := 1,
+                                <<"succeeded">> := 0,
+                                <<"failed">> := 1
+                            },
+                        <<"node_metrics">> :=
+                            [
+                                #{
+                                    <<"node">> := _,
+                                    <<"metrics">> := #{
+                                        <<"matched">> := 1,
+                                        <<"succeeded">> := 0,
+                                        <<"failed">> := 1
+                                    }
+                                }
+                            ]
+                    }},
+                    get_metrics(Name1)
+                )
+            ),
+
+            %% When there are multiple transformations for a topic, the last one is
+            %% responsible for properly encoding the payload to a binary.
+            Name2 = <<"bar">>,
+            Transformation2 = transformation(Name2, _Operations = [], #{
+                <<"payload_decoder">> => #{<<"type">> => <<"none">>},
+                <<"payload_encoder">> => #{<<"type">> => <<"none">>}
+            }),
+            {201, _} = insert(Transformation2),
+
+            ok = publish(C, <<"t/1">>, #{x => 1, y => true}),
+            ?assertNotReceive({publish, _}),
+
+            %% The old, first transformation succeeds.
+            ?assertMatch(
+                {200, #{
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := 2,
+                            <<"succeeded">> := 1,
+                            <<"failed">> := 1
+                        },
+                    <<"node_metrics">> :=
+                        [
+                            #{
+                                <<"node">> := _,
+                                <<"metrics">> := #{
+                                    <<"matched">> := 2,
+                                    <<"succeeded">> := 1,
+                                    <<"failed">> := 1
+                                }
+                            }
+                        ]
+                }},
+                get_metrics(Name1)
+            ),
+
+            %% The last transformation gets the failure metric bump.
+            ?assertMatch(
+                {200, #{
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := 1,
+                            <<"succeeded">> := 0,
+                            <<"failed">> := 1
+                        },
+                    <<"node_metrics">> :=
+                        [
+                            #{
+                                <<"node">> := _,
+                                <<"metrics">> := #{
+                                    <<"matched">> := 1,
+                                    <<"succeeded">> := 0,
+                                    <<"failed">> := 1
+                                }
+                            }
+                        ]
+                }},
+                get_metrics(Name2)
+            ),
+
             ok
         end,
         fun(Trace) ->
             ?assertMatch(
-                [#{message := "transformation_bad_encoding"}],
+                [
+                    #{message := "transformation_bad_encoding"},
+                    #{message := "transformation_bad_encoding"}
+                ],
                 ?of_kind(message_transformation_failed, Trace)
             ),
             ok
@@ -1659,6 +1750,55 @@ t_final_payload_must_be_binary(_Config) ->
     ),
     ok.
 
+%% Checks that an input value that does not respect the declared encoding bumps the
+%% failure metric as expected.  Also, such a crash does not lead to the message continuing
+%% the publication process.
+t_bad_decoded_value_failure_metric(_Config) ->
+    ?check_trace(
+        begin
+            Name = <<"bar">>,
+            Operations = [operation(<<"payload.msg">>, <<"payload">>)],
+            Transformation = transformation(Name, Operations, #{
+                <<"payload_decoder">> => #{<<"type">> => <<"none">>},
+                <<"payload_encoder">> => #{<<"type">> => <<"json">>}
+            }),
+            {201, _} = insert(Transformation),
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+            ok = publish(C, <<"t/1">>, {raw, <<"aaa">>}),
+            ?assertNotReceive({publish, _}),
+            ?retry(
+                100,
+                10,
+                ?assertMatch(
+                    {200, #{
+                        <<"metrics">> :=
+                            #{
+                                <<"matched">> := 1,
+                                <<"succeeded">> := 0,
+                                <<"failed">> := 1
+                            },
+                        <<"node_metrics">> :=
+                            [
+                                #{
+                                    <<"node">> := _,
+                                    <<"metrics">> := #{
+                                        <<"matched">> := 1,
+                                        <<"succeeded">> := 0,
+                                        <<"failed">> := 1
+                                    }
+                                }
+                            ]
+                    }},
+                    get_metrics(Name)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% Smoke test for the `json_encode' and `json_decode' BIFs.
 t_json_encode_decode_smoke_test(_Config) ->
     ?check_trace(