Просмотр исходного кода

fix(message transformation): increment failures when final payload is not a binary

Fixes https://emqx.atlassian.net/browse/EMQX-12849
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
eaa3e9c6d1

+ 13 - 2
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -280,12 +280,22 @@ run_transformations(Transformations, Message = #message{headers = Headers}) ->
     end.
 
 do_run_transformations(Transformations, Message) ->
+    LastTransformation = #{name := LastTransformationName} = lists:last(Transformations),
     Fun = fun(Transformation, MessageAcc) ->
         #{name := Name} = Transformation,
         emqx_message_transformation_registry:inc_matched(Name),
         case run_transformation(Transformation, MessageAcc) of
             {ok, #message{} = NewAcc} ->
-                emqx_message_transformation_registry:inc_succeeded(Name),
+                %% If this is the last transformation, we can't bump its success counter
+                %% yet.  We perform a check to see if the final payload is encoded as a
+                %% binary after all transformations have run, and it's the last
+                %% transformation's responsibility to properly encode it.
+                case Name =:= LastTransformationName of
+                    true ->
+                        ok;
+                    false ->
+                        emqx_message_transformation_registry:inc_succeeded(Name)
+                end,
                 {cont, NewAcc};
             {ignore, TraceFailureContext} ->
                 trace_failure_from_context(TraceFailureContext),
@@ -307,11 +317,12 @@ do_run_transformations(Transformations, Message) ->
         #message{} = FinalMessage ->
             case is_payload_properly_encoded(FinalMessage) of
                 true ->
+                    emqx_message_transformation_registry:inc_succeeded(LastTransformationName),
                     FinalMessage;
                 false ->
                     %% Take the last validation's failure action, as it's the one
                     %% responsible for getting the right encoding.
-                    LastTransformation = lists:last(Transformations),
+                    emqx_message_transformation_registry:inc_failed(LastTransformationName),
                     #{failure_action := FailureAction} = LastTransformation,
                     trace_failure(LastTransformation, "transformation_bad_encoding", #{
                         action => FailureAction,

+ 95 - 4
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1635,23 +1635,114 @@ t_load_config(_Config) ->
 t_final_payload_must_be_binary(_Config) ->
     ?check_trace(
         begin
-            Name = <<"foo">>,
+            Name1 = <<"foo">>,
             Operations = [operation(<<"payload.hello">>, <<"concat(['world'])">>)],
-            Transformation = transformation(Name, Operations, #{
+            Transformation1 = transformation(Name1, Operations, #{
                 <<"payload_decoder">> => #{<<"type">> => <<"json">>},
                 <<"payload_encoder">> => #{<<"type">> => <<"none">>}
             }),
-            {201, _} = insert(Transformation),
+            {201, _} = insert(Transformation1),
 
             C = connect(<<"c1">>),
             {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
             ok = publish(C, <<"t/1">>, #{x => 1, y => true}),
             ?assertNotReceive({publish, _}),
+
+            ?retry(
+                100,
+                10,
+                ?assertMatch(
+                    {200, #{
+                        <<"metrics">> :=
+                            #{
+                                <<"matched">> := 1,
+                                <<"succeeded">> := 0,
+                                <<"failed">> := 1
+                            },
+                        <<"node_metrics">> :=
+                            [
+                                #{
+                                    <<"node">> := _,
+                                    <<"metrics">> := #{
+                                        <<"matched">> := 1,
+                                        <<"succeeded">> := 0,
+                                        <<"failed">> := 1
+                                    }
+                                }
+                            ]
+                    }},
+                    get_metrics(Name1)
+                )
+            ),
+
+            %% When there are multiple transformations for a topic, the last one is
+            %% responsible for properly encoding the payload to a binary.
+            Name2 = <<"bar">>,
+            Transformation2 = transformation(Name2, _Operations = [], #{
+                <<"payload_decoder">> => #{<<"type">> => <<"none">>},
+                <<"payload_encoder">> => #{<<"type">> => <<"none">>}
+            }),
+            {201, _} = insert(Transformation2),
+
+            ok = publish(C, <<"t/1">>, #{x => 1, y => true}),
+            ?assertNotReceive({publish, _}),
+
+            %% The old, first transformation succeeds.
+            ?assertMatch(
+                {200, #{
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := 2,
+                            <<"succeeded">> := 1,
+                            <<"failed">> := 1
+                        },
+                    <<"node_metrics">> :=
+                        [
+                            #{
+                                <<"node">> := _,
+                                <<"metrics">> := #{
+                                    <<"matched">> := 2,
+                                    <<"succeeded">> := 1,
+                                    <<"failed">> := 1
+                                }
+                            }
+                        ]
+                }},
+                get_metrics(Name1)
+            ),
+
+            %% The last transformation gets the failure metric bump.
+            ?assertMatch(
+                {200, #{
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := 1,
+                            <<"succeeded">> := 0,
+                            <<"failed">> := 1
+                        },
+                    <<"node_metrics">> :=
+                        [
+                            #{
+                                <<"node">> := _,
+                                <<"metrics">> := #{
+                                    <<"matched">> := 1,
+                                    <<"succeeded">> := 0,
+                                    <<"failed">> := 1
+                                }
+                            }
+                        ]
+                }},
+                get_metrics(Name2)
+            ),
+
             ok
         end,
         fun(Trace) ->
             ?assertMatch(
-                [#{message := "transformation_bad_encoding"}],
+                [
+                    #{message := "transformation_bad_encoding"},
+                    #{message := "transformation_bad_encoding"}
+                ],
                 ?of_kind(message_transformation_failed, Trace)
             ),
             ok