Browse Source

feat(kafka consumer): allow specifying `max_wait_time`

Thales Macedo Garitezi 1 year atrás
parent
commit
def24c2ab2

+ 8 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -92,6 +92,14 @@ fields(source_parameters) ->
                     required => false,
                     desc => ?DESC(group_id)
                 }
+            )},
+        {max_wait_time,
+            mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"1s">>,
+                    desc => ?DESC("max_wait_time")
+                }
             )}
         | Fields
     ];

+ 2 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -414,6 +414,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
         parameters := #{
             key_encoding_mode := KeyEncodingMode,
             max_batch_bytes := MaxBatchBytes,
+            max_wait_time := MaxWaitTime,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
             offset_reset_policy := OffsetResetPolicy0,
@@ -445,6 +446,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
     ConsumerConfig = [
         {begin_offset, BeginOffset},
         {max_bytes, MaxBatchBytes},
+        {max_wait_time, MaxWaitTime},
         {offset_reset_policy, OffsetResetPolicy}
     ],
     GroupConfig = [

+ 1 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -207,6 +207,7 @@ source_config(Overrides0) ->
                 #{
                     <<"key_encoding_mode">> => <<"none">>,
                     <<"max_batch_bytes">> => <<"896KB">>,
+                    <<"max_wait_time">> => <<"500ms">>,
                     <<"max_rejoin_attempts">> => <<"5">>,
                     <<"offset_reset_policy">> => <<"latest">>,
                     <<"topic">> => <<"please override">>,

+ 1 - 0
changes/ee/feat-14079.en.md

@@ -0,0 +1 @@
+Added the option of setting `max_wait_time` for Kafka Consumer sources.

+ 5 - 0
rel/i18n/emqx_bridge_kafka_consumer_schema.hocon

@@ -20,4 +20,9 @@ emqx_bridge_kafka_consumer_schema {
   group_id.label:
   """Custom Consumer Group Id"""
 
+  max_wait_time.desc:
+  """Maximum amount of time that is waited for the Kafka broker to send a fetch response."""
+  max_wait_time.label:
+  """Max Wait Time"""
+
 }