Parcourir la source

fix(kafka): fix template processing for header

Fixes https://emqx.atlassian.net/browse/EMQX-10846
Paulo Zulato il y a 2 ans
Parent
commit
0b86f04bae

+ 4 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -664,6 +664,9 @@ kvlist_headers([#{<<"key">> := K, <<"value">> := V} | Headers], Acc) ->
     kvlist_headers(Headers, [{K, V} | Acc]);
 kvlist_headers([{K, V} | Headers], Acc) ->
     kvlist_headers(Headers, [{K, V} | Acc]);
+kvlist_headers([KVList | Headers], Acc) when is_list(KVList) ->
+    %% for instance, when user sets a json list as headers like '[{"foo":"bar"}, {"foo2":"bar2"}]'.
+    kvlist_headers(KVList ++ Headers, Acc);
 kvlist_headers([BadHeader | _], _) ->
     throw({bad_kafka_header, BadHeader}).
 
@@ -694,7 +697,7 @@ merge_kafka_headers(HeadersTks, ExtHeaders, Msg) ->
         [undefined] ->
             ExtHeaders;
         [MaybeJson] when is_binary(MaybeJson) ->
-            case emqx_utils_json:safe_decode(MaybeJson) of
+            case emqx_utils_json:safe_decode(MaybeJson, [return_maps]) of
                 {ok, JsonTerm} when is_map(JsonTerm) ->
                     maps:to_list(JsonTerm) ++ ExtHeaders;
                 {ok, JsonTerm} when is_list(JsonTerm) ->

+ 83 - 38
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -560,7 +560,7 @@ t_send_message_with_headers(Config) ->
         "kafka_hosts_string" => HostsString,
         "kafka_topic" => KafkaTopic,
         "instance_id" => ResourceId,
-        "kafka_headers" => <<"${pub_props}">>,
+        "kafka_headers" => <<"${payload.header}">>,
         "kafka_ext_headers" => emqx_utils_json:encode(
             [
                 #{
@@ -568,8 +568,8 @@ t_send_message_with_headers(Config) ->
                     <<"kafka_ext_header_value">> => <<"${clientid}">>
                 },
                 #{
-                    <<"kafka_ext_header_key">> => <<"payload">>,
-                    <<"kafka_ext_header_value">> => <<"${payload}">>
+                    <<"kafka_ext_header_key">> => <<"ext_header_val">>,
+                    <<"kafka_ext_header_value">> => <<"${payload.ext_header_val}">>
                 }
             ]
         ),
@@ -587,12 +587,42 @@ t_send_message_with_headers(Config) ->
     ),
     ConfigAtom = ConfigAtom1#{bridge_name => Name},
     {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
-    Time = erlang:unique_integer(),
-    BinTime = integer_to_binary(Time),
-    Msg = #{
-        clientid => BinTime,
-        payload => <<"payload">>,
-        timestamp => Time
+    Time1 = erlang:unique_integer(),
+    BinTime1 = integer_to_binary(Time1),
+    Payload1 = emqx_utils_json:encode(
+        #{
+            <<"header">> => #{
+                <<"foo">> => <<"bar">>
+            },
+            <<"ext_header_val">> => <<"ext header ok">>
+        }
+    ),
+    Msg1 = #{
+        clientid => BinTime1,
+        payload => Payload1,
+        timestamp => Time1
+    },
+    Time2 = erlang:unique_integer(),
+    BinTime2 = integer_to_binary(Time2),
+    Payload2 = emqx_utils_json:encode(
+        #{
+            <<"header">> => [
+                #{
+                    <<"key">> => <<"foo1">>,
+                    <<"value">> => <<"bar1">>
+                },
+                #{
+                    <<"key">> => <<"foo2">>,
+                    <<"value">> => <<"bar2">>
+                }
+            ],
+            <<"ext_header_val">> => <<"ext header ok">>
+        }
+    ),
+    Msg2 = #{
+        clientid => BinTime2,
+        payload => Payload2,
+        timestamp => Time2
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
@@ -603,7 +633,8 @@ t_send_message_with_headers(Config) ->
         end,
     ?check_trace(
         begin
-            ok = send(Config, ResourceId, Msg, State)
+            ok = send(Config, ResourceId, Msg1, State),
+            ok = send(Config, ResourceId, Msg2, State)
         end,
         fun(Trace) ->
             ?assertMatch(
@@ -616,11 +647,27 @@ t_send_message_with_headers(Config) ->
                                     [{var, [<<"clientid">>]}]
                                 },
                                 {
-                                    [{str, <<"payload">>}],
-                                    [{var, [<<"payload">>]}]
+                                    [{str, <<"ext_header_val">>}],
+                                    [{var, [<<"payload">>, <<"ext_header_val">>]}]
+                                }
+                            ],
+                            headers_tokens := [{var, [<<"payload">>, <<"header">>]}],
+                            headers_val_encode_mode := json
+                        }
+                    },
+                    #{
+                        headers_config := #{
+                            ext_headers_tokens := [
+                                {
+                                    [{str, <<"clientid">>}],
+                                    [{var, [<<"clientid">>]}]
+                                },
+                                {
+                                    [{str, <<"ext_header_val">>}],
+                                    [{var, [<<"payload">>, <<"ext_header_val">>]}]
                                 }
                             ],
-                            headers_tokens := [{var, [<<"pub_props">>]}],
+                            headers_tokens := [{var, [<<"payload">>, <<"header">>]}],
                             headers_val_encode_mode := json
                         }
                     }
@@ -629,16 +676,28 @@ t_send_message_with_headers(Config) ->
             )
         end
     ),
-    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    {ok, {_, KafkaMsgs}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(
-        #kafka_message{
-            headers = [
-                {<<"clientid">>, _},
-                {<<"payload">>, <<"\"payload\"">>}
-            ],
-            key = BinTime
-        },
-        KafkaMsg
+        [
+            #kafka_message{
+                headers = [
+                    {<<"foo">>, <<"\"bar\"">>},
+                    {<<"clientid">>, _},
+                    {<<"ext_header_val">>, <<"\"ext header ok\"">>}
+                ],
+                key = BinTime1
+            },
+            #kafka_message{
+                headers = [
+                    {<<"foo1">>, <<"\"bar1\"">>},
+                    {<<"foo2">>, <<"\"bar2\"">>},
+                    {<<"clientid">>, _},
+                    {<<"ext_header_val">>, <<"\"ext header ok\"">>}
+                ],
+                key = BinTime2
+            }
+        ],
+        KafkaMsgs
     ),
     %% TODO: refactor those into init/end per testcase
     ok = ?PRODUCER:on_stop(ResourceId, State),
@@ -769,7 +828,7 @@ t_wrong_headers_from_message(Config) ->
         timestamp => Time2
     },
     ?assertError(
-        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}},
+        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"foo">> := <<"bar">>}}}}},
         send(Config, ResourceId, Msg2, State)
     ),
     Time3 = erlang:unique_integer(),
@@ -780,23 +839,9 @@ t_wrong_headers_from_message(Config) ->
         timestamp => Time3
     },
     ?assertError(
-        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}},
+        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"key">> := <<"foo">>}}}}},
         send(Config, ResourceId, Msg3, State)
     ),
-    Time4 = erlang:unique_integer(),
-    Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
-    Msg4 = #{
-        clientid => integer_to_binary(Time4),
-        payload => Payload4,
-        timestamp => Time4
-    },
-    ?assertError(
-        {badmatch,
-            {error,
-                {unrecoverable_error,
-                    {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
-        send(Config, ResourceId, Msg4, State)
-    ),
     %% TODO: refactor those into init/end per testcase
     ok = ?PRODUCER:on_stop(ResourceId, State),
     ?assertEqual([], supervisor:which_children(wolff_client_sup)),

+ 1 - 0
changes/ee/fix-11527.en.md

@@ -0,0 +1 @@
+Fixed an issue with Kafka header handling when placeholders resolve to an array of key-value pairs (e.g.: `[{"key": "foo", "value": "bar"}]`).