|
|
@@ -446,7 +446,7 @@ take_from_context(Context, Message) ->
|
|
|
|
|
|
decode(Payload, #{type := none}, _Transformation) ->
|
|
|
{ok, Payload};
|
|
|
-decode(Payload, #{type := json}, Transformation) ->
|
|
|
+decode(Payload, #{type := json}, Transformation) when is_binary(Payload) ->
|
|
|
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
|
|
{ok, JSON} ->
|
|
|
{ok, JSON};
|
|
|
@@ -461,7 +461,7 @@ decode(Payload, #{type := json}, Transformation) ->
|
|
|
},
|
|
|
{error, TraceFailureContext}
|
|
|
end;
|
|
|
-decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
|
|
|
+decode(Payload, #{type := avro, schema := SerdeName}, Transformation) when is_binary(Payload) ->
|
|
|
try
|
|
|
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
|
|
|
catch
|
|
|
@@ -531,7 +531,26 @@ decode(
|
|
|
}
|
|
|
},
|
|
|
{error, TraceFailureContext}
|
|
|
- end.
|
|
|
+ end;
|
|
|
+decode(NotABinary, #{} = Decoder, Transformation) ->
|
|
|
+ DecoderContext0 = maps:with([type, name, message_type], Decoder),
|
|
|
+ DecoderContext1 = emqx_utils_maps:rename(name, schema_name, DecoderContext0),
|
|
|
+ DecoderContext = emqx_utils_maps:rename(type, decoder, DecoderContext1),
|
|
|
+ Context =
|
|
|
+ maps:merge(
|
|
|
+ DecoderContext,
|
|
|
+ #{
|
|
|
+ reason => <<"payload must be a binary">>,
|
|
|
+ hint => <<"check the transformation(s) before this one for inconsistencies">>,
|
|
|
+ bad_payload => NotABinary
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ TraceFailureContext = #trace_failure_context{
|
|
|
+ transformation = Transformation,
|
|
|
+ tag = "payload_decode_failed",
|
|
|
+ context = Context
|
|
|
+ },
|
|
|
+ {error, TraceFailureContext}.
|
|
|
|
|
|
encode(Payload, #{type := none}, _Transformation) ->
|
|
|
{ok, Payload};
|