Sfoglia il codice sorgente

chore: improve protobuf decoding error messages

Fixes https://emqx.atlassian.net/browse/EMQX-12677
Thales Macedo Garitezi 1 anno fa
parent
commit
96c9020727

+ 11 - 0
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -467,6 +467,17 @@ decode(
                 }
                 }
             },
             },
             {error, TraceFailureContext};
             {error, TraceFailureContext};
+        throw:{schema_decode_error, ExtraContext} ->
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_error",
+                context = ExtraContext#{
+                    decoder => protobuf,
+                    schema_name => SerdeName,
+                    message_type => MessageType
+                }
+            },
+            {error, TraceFailureContext};
         Class:Error:Stacktrace ->
         Class:Error:Stacktrace ->
             TraceFailureContext = #trace_failure_context{
             TraceFailureContext = #trace_failure_context{
                 transformation = Transformation,
                 transformation = Transformation,

+ 88 - 0
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1394,6 +1394,94 @@ t_protobuf(_Config) ->
 
 
     ok.
     ok.
 
 
+%% Checks what happens if a wrong transformation chain is used.  In this case, the second
+%% transformation attempts to protobuf-decode a message that was already decoded but not
+%% re-encoded by the first transformation.
+t_protobuf_bad_chain(_Config) ->
+    ?check_trace(
+        begin
+            SerdeName = <<"myserde">>,
+            MessageType = <<"Person">>,
+            protobuf_create_serde(SerdeName),
+
+            Name1 = <<"foo">>,
+            PayloadSerde = #{
+                <<"type">> => <<"protobuf">>,
+                <<"schema">> => SerdeName,
+                <<"message_type">> => MessageType
+            },
+            NoneSerde = #{<<"type">> => <<"none">>},
+            JSONSerde = #{<<"type">> => <<"json">>},
+
+            Transformation1 = transformation(Name1, _Ops1 = [], #{
+                <<"payload_decoder">> => PayloadSerde,
+                <<"payload_encoder">> => NoneSerde
+            }),
+            {201, _} = insert(Transformation1),
+
+            %% WRONG: after the first transformation, payload is already decoded, so we
+            %% shouldn't use protobuf again.
+            Name2 = <<"bar">>,
+            Transformation2A = transformation(Name2, [], #{
+                <<"payload_decoder">> => PayloadSerde,
+                <<"payload_encoder">> => JSONSerde
+            }),
+            {201, _} = insert(Transformation2A),
+
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+            [Payload | _] = protobuf_valid_payloads(SerdeName, MessageType),
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertNotReceive({publish, _}),
+
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            ?assertMatch(
+                [],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        message := "payload_decode_schema_failure",
+                        reason := function_clause
+                    } = E <- Trace
+                ]
+            ),
+            %% No unexpected crashes
+            ?assertMatch(
+                [],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        stacktrace := _
+                    } = E <- Trace
+                ]
+            ),
+            ?assertMatch(
+                [
+                    #{
+                        explain :=
+                            <<"Attempted to schema decode an already decoded message.", _/binary>>
+                    }
+                    | _
+                ],
+                [
+                    E
+                 || #{
+                        ?snk_kind := message_transformation_failed,
+                        message := "payload_decode_error"
+                    } = E <- Trace
+                ]
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %% Tests that restoring a backup config works.
 %% Tests that restoring a backup config works.
 %%   * Existing transformations (identified by `name') are left untouched.
 %%   * Existing transformations (identified by `name') are left untouched.
 %%   * No transformations are removed.
 %%   * No transformations are removed.

+ 45 - 20
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -90,21 +90,7 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
     );
 handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
 handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    try
-        decode(SchemaId, Data, MoreArgs)
-    catch
-        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
-            throw(
-                {schema_decode_error, #{
-                    error_type => decoding_failure,
-                    schema_id => SchemaId,
-                    data => Data,
-                    more_args => MoreArgs,
-                    explain =>
-                        <<"The given data could not be decoded. Please check the input data and the schema.">>
-                }}
-            )
-    end;
+    decode(SchemaId, Data, MoreArgs);
 handle_rule_function(schema_decode, Args) ->
 handle_rule_function(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
     error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
@@ -162,7 +148,17 @@ encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
 with_serde(Name, F) ->
 with_serde(Name, F) ->
     case emqx_schema_registry:get_serde(Name) of
     case emqx_schema_registry:get_serde(Name) of
         {ok, Serde} ->
         {ok, Serde} ->
-            F(Serde);
+            Meta =
+                case logger:get_process_metadata() of
+                    undefined -> #{};
+                    Meta0 -> Meta0
+                end,
+            logger:update_process_metadata(#{schema_name => Name}),
+            try
+                F(Serde)
+            after
+                logger:set_process_metadata(Meta)
+            end;
         {error, not_found} ->
         {error, not_found} ->
             error({serde_not_found, Name})
             error({serde_not_found, Name})
     end.
     end.
@@ -199,10 +195,39 @@ make_serde(json, Name, Source) ->
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
 eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
     Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
     avro_binary_decoder:decode(Data, Name, Store, Opts);
     avro_binary_decoder:decode(Data, Name, Store, Opts);
-eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
-    MessageName = binary_to_existing_atom(MessageName0, utf8),
-    Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
-    emqx_utils_maps:binary_key_map(Decoded);
+eval_decode(#serde{type = protobuf}, [#{} = DecodedData, MessageType]) ->
+    %% Already decoded, so it's an user error.
+    throw(
+        {schema_decode_error, #{
+            error_type => decoding_failure,
+            data => DecodedData,
+            message_type => MessageType,
+            explain =>
+                <<
+                    "Attempted to schema decode an already decoded message."
+                    " Check your rules or transformation pipeline."
+                >>
+        }}
+    );
+eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) ->
+    MessageType = binary_to_existing_atom(MessageType0, utf8),
+    try
+        Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]),
+        emqx_utils_maps:binary_key_map(Decoded)
+    catch
+        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
+            #{schema_name := SchemaName} = logger:get_process_metadata(),
+            throw(
+                {schema_decode_error, #{
+                    error_type => decoding_failure,
+                    data => EncodedData,
+                    message_type => MessageType,
+                    schema_name => SchemaName,
+                    explain =>
+                        <<"The given data could not be decoded. Please check the input data and the schema.">>
+                }}
+            )
+    end;
 eval_decode(#serde{type = json, name = Name}, [Data]) ->
 eval_decode(#serde{type = json, name = Name}, [Data]) ->
     true = is_binary(Data),
     true = is_binary(Data),
     Term = json_decode(Data),
     Term = json_decode(Data),

+ 2 - 2
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -555,8 +555,8 @@ t_decode_fail(_Config) ->
             data := <<"ss">>,
             data := <<"ss">>,
             error_type := decoding_failure,
             error_type := decoding_failure,
             explain := _,
             explain := _,
-            more_args := [<<"Person">>],
-            schema_id := <<"my_serde">>
+            message_type := 'Person',
+            schema_name := <<"my_serde">>
         }},
         }},
         emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
         emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
     ),
     ),