Explorar o código

Merge pull request #13079 from zmstone/0521-handle-kafka-message_too_large-error

chore: upgrade kafka producer lib wolff to 1.10.4 handle `message_too_large`
zmstone hai 1 ano
pai
achega
e381696c55

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 3 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -211,7 +211,7 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, _Pid} ->
             ok;
-        {error, no_such_client} ->
+        {error, #{reason := no_such_client}} ->
             case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
                 {ok, _} ->
                     ?SLOG(info, #{
@@ -543,13 +543,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
             ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
-        {error, no_such_client} ->
+        {error, #{reason := no_such_client}} ->
             throw(#{
                 reason => cannot_find_kafka_client,
                 kafka_client => ClientId,
                 kafka_topic => KafkaTopic
             });
-        {error, client_supervisor_not_initialized} ->
+        {error, #{reason := client_supervisor_not_initialized}} ->
             throw(#{
                 reason => restarting,
                 kafka_client => ClientId,

+ 6 - 0
changes/ee/fix-13079.en.md

@@ -0,0 +1,6 @@
+Improve Kafka producer error handling for `message_too_large`.
+
+Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`).
+
+Now, oversized messages are automatically split into single-message batches for retry.
+If a message still exceeds size limits, it will be dropped to maintain data flow.

+ 1 - 1
mix.exs

@@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},