Ver código fonte

Merge pull request #10795 from thalesmg/fix-schema-registry-rule-iolist-r50

fix(schema_registry): ensure `schema_encode` output in rule engine is a binary (r5.0)
Thales Macedo Garitezi 2 anos atrás
pai
commit
a172a6bc33

+ 5 - 1
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
 '$handle_undefined_function'(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
 '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
-    emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
+    %% encode outputs iolists, but when the rule actions process those
+    %% it might wrongly encode them as JSON lists, so we force them to
+    %% binaries here.
+    IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
+    iolist_to_binary(IOList);
 '$handle_undefined_function'(schema_encode, Args) ->
     error({args_count_error, {schema_encode, Args}});
 '$handle_undefined_function'(sprintf, [Format | Args]) ->

+ 13 - 4
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -82,8 +82,12 @@ make_trace_fn_action() ->
     #{function => Fn, args => #{}}.
 
 create_rule_http(RuleParams) ->
+    create_rule_http(RuleParams, _Overrides = #{}).
+
+create_rule_http(RuleParams, Overrides) ->
     RepublishTopic = <<"republish/schema_registry">>,
     emqx:subscribe(RepublishTopic),
+    PayloadTemplate = maps:get(payload_template, Overrides, <<>>),
     DefaultParams = #{
         enable => true,
         actions => [
@@ -93,7 +97,7 @@ create_rule_http(RuleParams) ->
                 <<"args">> =>
                     #{
                         <<"topic">> => RepublishTopic,
-                        <<"payload">> => <<>>,
+                        <<"payload">> => PayloadTemplate,
                         <<"qos">> => 0,
                         <<"retain">> => false,
                         <<"user_properties">> => <<>>
@@ -177,10 +181,12 @@ test_params_for(avro, encode1) ->
             "from t\n"
         >>,
     Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
+    PayloadTemplate = <<"${.encoded}">>,
     ExtraArgs = [],
     #{
         sql => SQL,
         payload => Payload,
+        payload_template => PayloadTemplate,
         extra_args => ExtraArgs
     };
 test_params_for(avro, decode1) ->
@@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) ->
             "from t\n"
         >>,
     Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+    PayloadTemplate = <<"${.encoded}">>,
     ExtraArgs = [<<"Person">>],
     #{
         sql => SQL,
         payload => Payload,
+        payload_template => PayloadTemplate,
         extra_args => ExtraArgs
     };
 test_params_for(protobuf, union1) ->
@@ -487,17 +495,18 @@ t_encode(Config) ->
     #{
         sql := SQL,
         payload := Payload,
+        payload_template := PayloadTemplate,
         extra_args := ExtraArgs
     } = test_params_for(SerdeType, encode1),
-    {ok, _} = create_rule_http(#{sql => SQL}),
+    {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}),
     PayloadBin = emqx_utils_json:encode(Payload),
     emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
     Published = receive_published(?LINE),
     ?assertMatch(
-        #{payload := #{<<"encoded">> := _}},
+        #{payload := P} when is_binary(P),
         Published
     ),
-    #{payload := #{<<"encoded">> := Encoded}} = Published,
+    #{payload := Encoded} = Published,
     {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
     ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
     ok.