Просмотр исходного кода

feat: add max_linger_time and max_linger_bytes to Kafka producer

zmstone 1 год назад
Родитель
Сommit
d47cc0c367

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_event_hub, [
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
-    {vsn, "0.1.8"},
+    {vsn, "0.2.0"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 0
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -289,6 +289,8 @@ values(producer) ->
                 key => <<"${.clientid}">>,
                 value => <<"${.}">>
             },
+            max_linger_time => <<"5ms">>,
+            max_linger_bytes => <<"10MB">>,
             max_batch_bytes => <<"896KB">>,
             partition_strategy => <<"random">>,
             required_acks => <<"all_isr">>,

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_confluent, [
     {description, "EMQX Enterprise Confluent Connector and Action"},
-    {vsn, "0.1.3"},
+    {vsn, "0.2.0"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 0
apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl

@@ -251,6 +251,8 @@ values(action) ->
                 key => <<"${.clientid}">>,
                 value => <<"${.}">>
             },
+            max_linger_time => <<"5ms">>,
+            max_linger_bytes => <<"10MB">>,
             max_batch_bytes => <<"896KB">>,
             partition_strategy => <<"random">>,
             required_acks => <<"all_isr">>,

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.4.1"},
+    {vsn, "0.5.0"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 12 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -175,6 +175,8 @@ values(producer_values) ->
             value => <<"${.}">>,
             timestamp => <<"${.timestamp}">>
         },
+        max_linger_time => <<"5ms">>,
+        max_linger_bytes => <<"10MB">>,
         max_batch_bytes => <<"896KB">>,
         compression => <<"no_compression">>,
         partition_strategy => <<"random">>,
@@ -385,6 +387,16 @@ fields(producer_kafka_opts) ->
     [
         {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})},
         {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
+        {max_linger_time,
+            mk(emqx_schema:timeout_duration_ms(), #{
+                default => <<"0ms">>,
+                desc => ?DESC(max_linger_time)
+            })},
+        {max_linger_bytes,
+            mk(emqx_schema:bytesize(), #{
+                default => <<"10MB">>,
+                desc => ?DESC(max_linger_bytes)
+            })},
         {max_batch_bytes,
             mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
         {compression,

+ 4 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -741,6 +741,8 @@ ssl(_) ->
 
 producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
     #{
+        max_linger_time := MaxLingerTime,
+        max_linger_bytes := MaxLingerBytes,
         max_batch_bytes := MaxBatchBytes,
         compression := Compression,
         partition_strategy := PartitionStrategy,
@@ -778,6 +780,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
         replayq_seg_bytes => SegmentBytes,
         drop_if_highmem => MemOLP,
         required_acks => RequiredAcks,
+        max_linger_ms => MaxLingerTime,
+        max_linger_bytes => MaxLingerBytes,
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,

+ 2 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -229,6 +229,8 @@ bridge_v2_config(ConnectorName, KafkaTopic) ->
             },
             <<"compression">> => <<"no_compression">>,
             <<"kafka_header_value_encode_mode">> => <<"none">>,
+            <<"max_linger_time">> => <<"0ms">>,
+            <<"max_linger_bytes">> => <<"10MB">>,
             <<"max_batch_bytes">> => <<"896KB">>,
             <<"max_inflight">> => 10,
             <<"message">> => #{

+ 2 - 0
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -86,6 +86,8 @@
         ],
         <<"kafka_header_value_encode_mode">> => <<"none">>,
         <<"kafka_headers">> => <<"${pub_props}">>,
+        <<"max_linger_time">> => <<"1ms">>,
+        <<"max_linger_bytes">> => <<"1MB">>,
         <<"max_batch_bytes">> => <<"896KB">>,
         <<"max_inflight">> => 10,
         <<"message">> => #{

+ 11 - 0
rel/i18n/emqx_bridge_azure_event_hub.hocon

@@ -104,6 +104,17 @@ discovered partitions into account when dispatching messages per <code>partition
 partition_count_refresh_interval.label:
 """Partition Count Refresh Interval"""
 
+max_linger_time.desc:
+"""Maximum duration for a per-partition producer to wait for messages in order to collect a batch to buffer.
+The default value `0` means no wait. For non-memory buffer mode, it's advised to configure at least `5ms` for less IOPS."""
+
+max_linger_time.label: "Max Linger Time"
+
+max_linger_bytes.desc:
+"""Maximum number of bytes for a per-partition producer to wait for messages in order to collect a batch to buffer."""
+
+max_linger_bytes.label: "Max Linger Bytes"
+
 max_batch_bytes.desc:
 """Maximum bytes to collect in an Azure Event Hubs message batch."""
 

+ 11 - 0
rel/i18n/emqx_bridge_confluent_producer.hocon

@@ -104,6 +104,17 @@ discovered partitions into account when dispatching messages per <code>partition
 partition_count_refresh_interval.label:
 """Partition Count Refresh Interval"""
 
+max_linger_time.desc:
+"""Maximum duration for a per-partition producer to wait for messages in order to collect a batch to buffer.
+The default value `0` means no wait. For non-memory buffer mode, it's advised to configure at least `5ms` for less IOPS."""
+
+max_linger_time.label: "Max Linger Time"
+
+max_linger_bytes.desc:
+"""Maximum number of bytes for a per-partition producer to wait for messages in order to collect a batch to buffer."""
+
+max_linger_bytes.label: "Max Linger Bytes"
+
 max_batch_bytes.desc:
 """Maximum bytes to collect in a Confluent message batch. Most of the Kafka brokers default to a limit of 1 MB batch size. EMQX's default value is less than 1 MB in order to compensate Kafka message encoding overheads (especially when each individual message is very small). When a single message is over the limit, it is still sent (as a single element batch)."""
 

+ 11 - 0
rel/i18n/emqx_bridge_kafka.hocon

@@ -178,6 +178,17 @@ discovered partitions into account when dispatching messages per <code>partition
 partition_count_refresh_interval.label:
 """Partition Count Refresh Interval"""
 
+max_linger_time.desc:
+"""Maximum duration for a per-partition producer to wait for messages in order to collect a batch to buffer.
+The default value `0` means no wait. For non-memory buffer mode, it's advised to configure at least `5ms` for less IOPS."""
+
+max_linger_time.label: "Max Linger Time"
+
+max_linger_bytes.desc:
+"""Maximum number of bytes for a per-partition producer to wait for messages in order to collect a batch to buffer."""
+
+max_linger_bytes.label: "Max Linger Bytes"
+
 max_batch_bytes.desc:
 """Maximum bytes to collect in a Kafka message batch. Most of the Kafka brokers default to a limit of 1 MB batch size. EMQX's default value is less than 1 MB in order to compensate Kafka message encoding overheads (especially when each individual message is very small). When a single message is over the limit, it is still sent (as a single element batch)."""