Pārlūkot izejas kodu

fix(kafka): fix result handling when sending message with invalid header

Fixes https://emqx.atlassian.net/browse/EMQX-10846
Paulo Zulato 2 gadi atpakaļ
vecāks
revīzija
60e6217496

+ 79 - 28
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -213,7 +213,7 @@ on_stop(InstanceId, _State) ->
     ok.
 
 on_query(
-    _InstId,
+    InstId,
     {send_message, Message},
     #{
         message_template := Template,
@@ -229,19 +229,34 @@ on_query(
         ext_headers_tokens => KafkaExtHeadersTokens,
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
-    KafkaMessage = render_message(Template, KafkaHeaders, Message),
-    ?tp(
-        emqx_bridge_kafka_impl_producer_sync_query,
-        #{headers_config => KafkaHeaders, instance_id => _InstId}
-    ),
     try
-        {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
-        ok
+        KafkaMessage = render_message(Template, KafkaHeaders, Message),
+        ?tp(
+            emqx_bridge_kafka_impl_producer_sync_query,
+            #{headers_config => KafkaHeaders, instance_id => InstId}
+        ),
+        do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
     catch
-        error:{producer_down, _} = Reason ->
-            {error, Reason};
-        error:timeout ->
-            {error, timeout}
+        throw:{bad_kafka_header, _} = Error ->
+            ?tp(
+                emqx_bridge_kafka_impl_producer_sync_query_failed,
+                #{
+                    headers_config => KafkaHeaders,
+                    instance_id => InstId,
+                    reason => Error
+                }
+            ),
+            {error, {unrecoverable_error, Error}};
+        throw:{bad_kafka_headers, _} = Error ->
+            ?tp(
+                emqx_bridge_kafka_impl_producer_sync_query_failed,
+                #{
+                    headers_config => KafkaHeaders,
+                    instance_id => InstId,
+                    reason => Error
+                }
+            ),
+            {error, {unrecoverable_error, Error}}
     end.
 
 %% @doc The callback API for rule-engine (or bridge without rules)
@@ -251,7 +266,7 @@ on_query(
 %% E.g. the output of rule-engine process chain
 %% or the direct mapping from an MQTT message.
 on_query_async(
-    _InstId,
+    InstId,
     {send_message, Message},
     AsyncReplyFn,
     #{
@@ -267,21 +282,35 @@ on_query_async(
         ext_headers_tokens => KafkaExtHeadersTokens,
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
-    KafkaMessage = render_message(Template, KafkaHeaders, Message),
-    ?tp(
-        emqx_bridge_kafka_impl_producer_async_query,
-        #{headers_config => KafkaHeaders, instance_id => _InstId}
-    ),
-    %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
-    %% * Must be a single element batch because wolff books calls, but not batch sizes
-    %%   for counters and gauges.
-    Batch = [KafkaMessage],
-    %% The retuned information is discarded here.
-    %% If the producer process is down when sending, this function would
-    %% raise an error exception which is to be caught by the caller of this callback
-    {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
-    %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
-    {ok, Pid}.
+    try
+        KafkaMessage = render_message(Template, KafkaHeaders, Message),
+        ?tp(
+            emqx_bridge_kafka_impl_producer_async_query,
+            #{headers_config => KafkaHeaders, instance_id => InstId}
+        ),
+        do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
+    catch
+        throw:{bad_kafka_header, _} = Error ->
+            ?tp(
+                emqx_bridge_kafka_impl_producer_async_query_failed,
+                #{
+                    headers_config => KafkaHeaders,
+                    instance_id => InstId,
+                    reason => Error
+                }
+            ),
+            {error, {unrecoverable_error, Error}};
+        throw:{bad_kafka_headers, _} = Error ->
+            ?tp(
+                emqx_bridge_kafka_impl_producer_async_query_failed,
+                #{
+                    headers_config => KafkaHeaders,
+                    instance_id => InstId,
+                    reason => Error
+                }
+            ),
+            {error, {unrecoverable_error, Error}}
+    end.
 
 compile_message_template(T) ->
     KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
@@ -337,6 +366,28 @@ render_timestamp(Template, Message) ->
             erlang:system_time(millisecond)
     end.
 
+do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
+    try
+        {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
+        ok
+    catch
+        error:{producer_down, _} = Reason ->
+            {error, Reason};
+        error:timeout ->
+            {error, timeout}
+    end;
+do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
+    %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
+    %% * Must be a single element batch because wolff books calls, but not batch sizes
+    %%   for counters and gauges.
+    Batch = [KafkaMessage],
+    %% The retuned information is discarded here.
+    %% If the producer process is down when sending, this function would
+    %% raise an error exception which is to be caught by the caller of this callback
+    {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+    %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
+    {ok, Pid}.
+
 %% Wolff producer never gives up retrying
 %% so there can only be 'ok' results.
 on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->

+ 84 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -677,6 +677,90 @@ t_wrong_headers(_Config) ->
     ),
     ok.
 
+t_wrong_headers_from_message(Config) ->
+    HostsString = kafka_hosts_string_sasl(),
+    AuthSettings = valid_sasl_plain_settings(),
+    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = ?BRIDGE_TYPE,
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    KafkaTopic = "test-topic-one-partition",
+    Conf = config_with_headers(#{
+        "authentication" => AuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "kafka_headers" => <<"${payload}">>,
+        "producer" => #{
+            "kafka" => #{
+                "buffer" => #{
+                    "memory_overload_protection" => false
+                }
+            }
+        },
+        "ssl" => #{}
+    }),
+    {ok, #{config := ConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), Conf
+    ),
+    ConfigAtom = ConfigAtom1#{bridge_name => Name},
+    {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
+    Time1 = erlang:unique_integer(),
+    Payload1 = <<"wrong_header">>,
+    Msg1 = #{
+        clientid => integer_to_binary(Time1),
+        payload => Payload1,
+        timestamp => Time1
+    },
+    ?assertError(
+        {badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}},
+        send(Config, ResourceId, Msg1, State)
+    ),
+    Time2 = erlang:unique_integer(),
+    Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>,
+    Msg2 = #{
+        clientid => integer_to_binary(Time2),
+        payload => Payload2,
+        timestamp => Time2
+    },
+    ?assertError(
+        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}},
+        send(Config, ResourceId, Msg2, State)
+    ),
+    Time3 = erlang:unique_integer(),
+    Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>,
+    Msg3 = #{
+        clientid => integer_to_binary(Time3),
+        payload => Payload3,
+        timestamp => Time3
+    },
+    ?assertError(
+        {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}},
+        send(Config, ResourceId, Msg3, State)
+    ),
+    Time4 = erlang:unique_integer(),
+    Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
+    Msg4 = #{
+        clientid => integer_to_binary(Time4),
+        payload => Payload4,
+        timestamp => Time4
+    },
+    ?assertError(
+        {badmatch,
+            {error,
+                {unrecoverable_error,
+                    {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
+        send(Config, ResourceId, Msg4, State)
+    ),
+    %% TODO: refactor those into init/end per testcase
+    ok = ?PRODUCER:on_stop(ResourceId, State),
+    ?assertEqual([], supervisor:which_children(wolff_client_sup)),
+    ?assertEqual([], supervisor:which_children(wolff_producers_sup)),
+    ok = emqx_bridge_resource:remove(BridgeId),
+    delete_all_bridges(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helper functions
 %%------------------------------------------------------------------------------

+ 1 - 0
changes/ee/fix-11508.en.md

@@ -0,0 +1 @@
+Fix message error handling on Kafka bridge when headers translate to an invalid value.