Pārlūkot izejas kodu

Merge pull request #14079 from zmstone/241025-upgrade-kafka-consumer-lib

chore: upgrade Kafka client libs
zmstone 1 gadu atpakaļ
vecāks
revīzija
e90c1c403b

+ 3 - 3
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -3,10 +3,10 @@
 {erl_opts, [debug_info]}.
 {deps, [
     {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.8"},
+    {kafka_protocol, "4.1.9"},
     {brod_gssapi, "0.1.3"},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
-    {snappyer, "1.2.9"},
+    {brod, "4.3.1"},
+    {snappyer, "1.2.10"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 3 - 3
apps/emqx_bridge_confluent/rebar.config

@@ -3,10 +3,10 @@
 {erl_opts, [debug_info]}.
 {deps, [
     {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.8"},
+    {kafka_protocol, "4.1.9"},
     {brod_gssapi, "0.1.3"},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
-    {snappyer, "1.2.9"},
+    {brod, "4.3.1"},
+    {snappyer, "1.2.10"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 3 - 3
apps/emqx_bridge_kafka/rebar.config

@@ -3,10 +3,10 @@
 {erl_opts, [debug_info]}.
 {deps, [
     {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.8"},
+    {kafka_protocol, "4.1.9"},
     {brod_gssapi, "0.1.3"},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
-    {snappyer, "1.2.9"},
+    {brod, "4.3.1"},
+    {snappyer, "1.2.10"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 9 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl

@@ -36,8 +36,7 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
     {Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
     TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
     TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
-    %% `topic' is v2-only
-    Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
+    Params = maps:with(v1_source_parameters(), Params1),
     V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
     V1Config = emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
@@ -64,6 +63,14 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
 %% Internal helper functions
 %%------------------------------------------------------------------------------------------
 
+v1_source_parameters() ->
+    [
+        <<"max_batch_bytes">>,
+        <<"max_rejoin_attempts">>,
+        <<"offset_commit_interval_seconds">>,
+        <<"offset_reset_policy">>
+    ].
+
 %% The new schema has a single kafka topic, so we take it from topic mapping when
 %% converting from v1.
 maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->

+ 8 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -92,6 +92,14 @@ fields(source_parameters) ->
                     required => false,
                     desc => ?DESC(group_id)
                 }
+            )},
+        {max_wait_time,
+            mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"1s">>,
+                    desc => ?DESC("max_wait_time")
+                }
             )}
         | Fields
     ];

+ 3 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -54,6 +54,7 @@
     group_id => binary(),
     key_encoding_mode := encoding_mode(),
     max_batch_bytes := emqx_schema:bytesize(),
+    max_wait_time := non_neg_integer(),
     max_rejoin_attempts := non_neg_integer(),
     offset_commit_interval_seconds := pos_integer(),
     offset_reset_policy := offset_reset_policy(),
@@ -414,6 +415,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
         parameters := #{
             key_encoding_mode := KeyEncodingMode,
             max_batch_bytes := MaxBatchBytes,
+            max_wait_time := MaxWaitTime,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
             offset_reset_policy := OffsetResetPolicy0,
@@ -445,6 +447,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
     ConsumerConfig = [
         {begin_offset, BeginOffset},
         {max_bytes, MaxBatchBytes},
+        {max_wait_time, MaxWaitTime},
         {offset_reset_policy, OffsetResetPolicy}
     ],
     GroupConfig = [

+ 0 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -17,7 +17,6 @@
 -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
 -define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
 -define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
--define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate

+ 1 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -207,6 +207,7 @@ source_config(Overrides0) ->
                 #{
                     <<"key_encoding_mode">> => <<"none">>,
                     <<"max_batch_bytes">> => <<"896KB">>,
+                    <<"max_wait_time">> => <<"500ms">>,
                     <<"max_rejoin_attempts">> => <<"5">>,
                     <<"offset_reset_policy">> => <<"latest">>,
                     <<"topic">> => <<"please override">>,

+ 1 - 0
changes/ee/feat-14079.en.md

@@ -0,0 +1 @@
+Added the option to set `max_wait_time` for Kafka Consumer sources.

+ 5 - 0
changes/ee/fix-14079.en.md

@@ -0,0 +1,5 @@
+Fix a Kafka consumer latency issue that occurred when multiple partitions shared the same partition leader in Kafka.
+
+When fetching from a Kafka partition leader, a request is blocked until the previously sent fetch requests have returned.
+This is because Kafka serves only one in-flight fetch request at a time per connection, which causes head-of-line blocking when more than one partition happens to share the same partition leader broker.
+The fix in this change is to make sure each partition consumer creates its own TCP connection to the partition leader.

+ 6 - 4
mix.exs

@@ -160,6 +160,8 @@ defmodule EMQXUmbrella.MixProject do
       common_dep(:sasl_auth),
       # avlizer currently uses older :erlavro version
       common_dep(:erlavro),
+      # in conflict by erlavro
+      common_dep(:snappyer),
       common_dep(:crc32cer)
     ]
   end
@@ -278,13 +280,13 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}
 
   def common_dep(:kafka_protocol),
-    do: {:kafka_protocol, "4.1.8", override: true}
+    do: {:kafka_protocol, "4.1.9", override: true}
 
-  def common_dep(:brod), do: {:brod, github: "kafka4beam/brod", tag: "3.18.0"}
+  def common_dep(:brod), do: {:brod, "4.3.1"}
   ## TODO: remove `mix.exs` from `wolff` and remove this override
   ## TODO: remove `mix.exs` from `pulsar` and remove this override
-  def common_dep(:snappyer), do: {:snappyer, "1.2.9", override: true}
-  def common_dep(:crc32cer), do: {:crc32cer, "0.1.8", override: true}
+  def common_dep(:snappyer), do: {:snappyer, "1.2.10", override: true}
+  def common_dep(:crc32cer), do: {:crc32cer, "0.1.11", override: true}
   def common_dep(:jesse), do: {:jesse, github: "emqx/jesse", tag: "1.8.1.1"}
   def common_dep(:erlavro), do: {:erlavro, github: "emqx/erlavro", tag: "2.10.0", override: true}
 

+ 4 - 1
rel/i18n/emqx_bridge_kafka.hocon

@@ -63,7 +63,10 @@ consumer_offset_commit_interval_seconds.label:
 """Offset Commit Interval"""
 
 consumer_max_batch_bytes.desc:
-"""Set how many bytes to pull from Kafka in each fetch request. Please note that if the configured value is smaller than the message size in Kafka, it may negatively impact the fetch performance."""
+"""Set how many bytes to pull from Kafka in each fetch request.
+Messages are fetched in batches by the consumer, and if the first record batch in the first non-empty
+partition of the fetch is larger than this value, the record batch will still be returned to ensure
+that the consumer can make progress. As such, this is not an absolute maximum. Set `1` for minimal latency."""
 
 consumer_max_batch_bytes.label:
 """Fetch Bytes"""

+ 5 - 0
rel/i18n/emqx_bridge_kafka_consumer_schema.hocon

@@ -20,4 +20,9 @@ emqx_bridge_kafka_consumer_schema {
   group_id.label:
   """Custom Consumer Group Id"""
 
+  max_wait_time.desc:
+  """Maximum amount of time that is waited for the Kafka broker to send a fetch response."""
+  max_wait_time.label:
+  """Max Wait Time"""
+
 }