Pārlūkot izejas kodu

fix(message transformation): handle wrongly decoded values and other crashes

Fixes https://emqx.atlassian.net/browse/EMQX-12853
Thales Macedo Garitezi 1 gadu atpakaļ
vecāks
revīzija
5cc51be405

+ 38 - 10
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -29,7 +29,7 @@
 ]).
 
 %% Internal exports
--export([run_transformation/2, trace_failure_context_to_map/1]).
+-export([run_transformation/2, trace_failure_context_to_map/1, prettify_operation/1]).
 
 %%------------------------------------------------------------------------------
 %% Type declarations
@@ -173,6 +173,19 @@ run_transformation(Transformation, MessageIn) ->
             {FailureAction, TraceFailureContext}
     end.
 
+prettify_operation(Operation0) ->
+    %% TODO: remove injected bif module
+    Operation = maps:update_with(
+        value,
+        fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
+        Operation0
+    ),
+    maps:update_with(
+        key,
+        fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
+        Operation
+    ).
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
@@ -181,17 +194,32 @@ run_transformation(Transformation, MessageIn) ->
     {ok, eval_context()} | {error, trace_failure_context()}.
 eval_operation(Operation, Transformation, Context) ->
     #{key := K, value := V} = Operation,
-    case eval_variform(K, V, Context) of
-        {error, Reason} ->
-            FailureContext = #trace_failure_context{
+    try
+        case eval_variform(K, V, Context) of
+            {error, Reason} ->
+                FailureContext = #trace_failure_context{
+                    transformation = Transformation,
+                    tag = "transformation_eval_operation_failure",
+                    context = #{reason => Reason}
+                },
+                {error, FailureContext};
+            {ok, Rendered} ->
+                NewContext = put_value(K, Rendered, Context),
+                {ok, NewContext}
+        end
+    catch
+        Class:Error:Stacktrace ->
+            FailureContext1 = #trace_failure_context{
                 transformation = Transformation,
-                tag = "transformation_eval_operation_failure",
-                context = #{reason => Reason}
+                tag = "transformation_eval_operation_exception",
+                context = #{
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace,
+                    operation => prettify_operation(Operation)
+                }
             },
-            {error, FailureContext};
-        {ok, Rendered} ->
-            NewContext = put_value(K, Rendered, Context),
-            {ok, NewContext}
+            {error, FailureContext1}
     end.
 
 -spec eval_variform([binary(), ...], _, eval_context()) ->

+ 1 - 11
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -715,17 +715,7 @@ transformation_out(Transformation) ->
     ).
 
 operation_out(Operation0) ->
-    %% TODO: remove injected bif module
-    Operation = maps:update_with(
-        value,
-        fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
-        Operation0
-    ),
-    maps:update_with(
-        key,
-        fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
-        Operation
-    ).
+    emqx_message_transformation:prettify_operation(Operation0).
 
 dryrun_input_message_in(Params) ->
     %% We already check the params against the schema at the API boundary, so we can

+ 49 - 0
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1750,6 +1750,55 @@ t_final_payload_must_be_binary(_Config) ->
     ),
     ok.
 
+%% Checks that an input value that does not respect the declared encoding bumps the
+%% failure metric as expected.  Also, such a crash does not lead to the message continuing
+%% the publication process.
+t_bad_decoded_value_failure_metric(_Config) ->
+    ?check_trace(
+        begin
+            Name = <<"bar">>,
+            Operations = [operation(<<"payload.msg">>, <<"payload">>)],
+            Transformation = transformation(Name, Operations, #{
+                <<"payload_decoder">> => #{<<"type">> => <<"none">>},
+                <<"payload_encoder">> => #{<<"type">> => <<"json">>}
+            }),
+            {201, _} = insert(Transformation),
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+            ok = publish(C, <<"t/1">>, {raw, <<"aaa">>}),
+            ?assertNotReceive({publish, _}),
+            ?retry(
+                100,
+                10,
+                ?assertMatch(
+                    {200, #{
+                        <<"metrics">> :=
+                            #{
+                                <<"matched">> := 1,
+                                <<"succeeded">> := 0,
+                                <<"failed">> := 1
+                            },
+                        <<"node_metrics">> :=
+                            [
+                                #{
+                                    <<"node">> := _,
+                                    <<"metrics">> := #{
+                                        <<"matched">> := 1,
+                                        <<"succeeded">> := 0,
+                                        <<"failed">> := 1
+                                    }
+                                }
+                            ]
+                    }},
+                    get_metrics(Name)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% Smoke test for the `json_encode' and `json_decode' BIFs.
 t_json_encode_decode_smoke_test(_Config) ->
     ?check_trace(