Просмотр исходного кода

Merge pull request #13277 from zmstone/0617-handle-kafka-message_too_large-return

0617 handle kafka message too large return
zmstone 1 год назад
Родитель
Сommit
ae22a64157

+ 3 - 0
.ci/docker-compose-file/kafka/kafka-entrypoint.sh

@@ -49,6 +49,9 @@ echo "+++++++ Creating Kafka Topics ++++++++"
 # there seem to be a race condition when creating the topics (too early)
 env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh
 
+# create a topic with max.message.bytes=100
+/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100
+
 echo "+++++++ Wait until Kafka ports are down ++++++++"
 
 bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 9 - 4
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -487,12 +487,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
 %% Wolff producer never gives up retrying
 %% so there can only be 'ok' results.
 on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
-    %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2
+    %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2
     apply(ReplyFn, Args ++ [ok]);
 on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
-    %% wolff should bump the dropped_queue_full counter
-    %% do not apply the callback (which is basically to bump success or fail counter)
-    ok.
+    %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4
+    %% so there is no need to apply the callback here
+    ok;
+on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) ->
+    %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4.
+    %% however 'dropped' is not mapped to EMQX metrics name
+    %% so we reply error here
+    apply(ReplyFn, Args ++ [{error, message_too_large}]).
 
 %% Note: since wolff client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,

+ 27 - 3
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) ->
     emqx_utils_maps:deep_merge(Cfg1, Overrides).
 
 bridge_v2_config(ConnectorName) ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    bridge_v2_config(ConnectorName, KafkaTopic).
+
+bridge_v2_config(ConnectorName, KafkaTopic) ->
     #{
         <<"connector">> => ConnectorName,
         <<"enable">> => true,
@@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) ->
             <<"query_mode">> => <<"sync">>,
             <<"required_acks">> => <<"all_isr">>,
             <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => list_to_binary(
-                emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
-            )
+            <<"topic">> => list_to_binary(KafkaTopic)
         },
         <<"local_topic">> => <<"kafka_t/#">>,
         <<"resource_opts">> => #{
@@ -378,6 +380,28 @@ t_local_topic(_) ->
     ok = emqx_connector:remove(?TYPE, test_connector),
     ok.
 
+t_message_too_large(_) ->
+    BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"),
+    ConnectorConfig = connector_config(),
+    {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig),
+    BridgeName = test_bridge4,
+    {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config),
+    BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName),
+    TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)),
+    ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)),
+    emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)),
+    ?retry(
+        _Sleep0 = 50,
+        _Attempts0 = 100,
+        begin
+            ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)),
+            ok
+        end
+    ),
+    ok = emqx_bridge_v2:remove(?TYPE, BridgeName),
+    ok = emqx_connector:remove(?TYPE, test_connector4),
+    ok.
+
 t_unknown_topic(_Config) ->
     ConnectorName = <<"test_connector">>,
     BridgeName = <<"test_bridge">>,

changes/ee/fix-13079.en.md → changes/ee/fix-13277.en.md


+ 1 - 1
mix.exs

@@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},