فهرست منبع

chore: disallow disk buffer when dynamic topics are used

Fixes https://emqx.atlassian.net/browse/EMQX-13242

BREAKING CHANGE:

Kafka/Confluent/Azure Event Hub Producers with a dynamic topic (i.e., a topic that contains placeholders) no longer support disk buffering.  Only memory and hybrid modes are now supported.
Thales Macedo Garitezi 1 سال پیش
والد
کامیت
ee192e53d1

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_event_hub, [
 {application, emqx_bridge_azure_event_hub, [
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -419,7 +419,7 @@ bridge_v2_overrides() ->
         parameters =>
         parameters =>
             mk(ref(producer_kafka_opts), #{
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 required => true,
-                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+                validator => fun emqx_bridge_kafka:producer_parameters_validator/1
             }),
             }),
         ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
         ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
         type => mk(
         type => mk(

+ 13 - 0
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -410,3 +410,16 @@ t_dynamic_topics(Config) ->
             ]
             ]
         ),
         ),
     ok.
     ok.
+
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    ActionConfig = ?config(action_config, Config),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?BRIDGE_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_confluent, [
 {application, emqx_bridge_confluent, [
     {description, "EMQX Enterprise Confluent Connector and Action"},
     {description, "EMQX Enterprise Confluent Connector and Action"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl

@@ -342,7 +342,7 @@ bridge_v2_overrides() ->
         parameters =>
         parameters =>
             mk(ref(producer_kafka_opts), #{
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 required => true,
-                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+                validator => fun emqx_bridge_kafka:producer_parameters_validator/1
             }),
             }),
         ssl => mk(ref(ssl_client_opts), #{
         ssl => mk(ref(ssl_client_opts), #{
             default => #{
             default => #{

+ 13 - 0
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -419,3 +419,16 @@ t_dynamic_topics(Config) ->
             ]
             ]
         ),
         ),
     ok.
     ok.
+
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    ActionConfig = ?config(action_config, Config),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?ACTION_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.

+ 26 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -module(emqx_bridge_kafka).
 -module(emqx_bridge_kafka).
 
 
+-feature(maybe_expr, enable).
+
 -behaviour(emqx_connector_examples).
 -behaviour(emqx_connector_examples).
 
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("typerefl/include/types.hrl").
@@ -40,7 +42,9 @@
 -export([
 -export([
     kafka_connector_config_fields/0,
     kafka_connector_config_fields/0,
     kafka_producer_converter/2,
     kafka_producer_converter/2,
-    producer_strategy_key_validator/1
+    producer_strategy_key_validator/1,
+    producer_buffer_mode_validator/1,
+    producer_parameters_validator/1
 ]).
 ]).
 
 
 -define(CONNECTOR_TYPE, kafka_producer).
 -define(CONNECTOR_TYPE, kafka_producer).
@@ -721,7 +725,7 @@ parameters_field(ActionOrBridgeV1) ->
             required => true,
             required => true,
             aliases => [Alias],
             aliases => [Alias],
             desc => ?DESC(producer_kafka_opts),
             desc => ?DESC(producer_kafka_opts),
-            validator => fun producer_strategy_key_validator/1
+            validator => fun producer_parameters_validator/1
         })}.
         })}.
 
 
 %% -------------------------------------------------------------------------------------------------
 %% -------------------------------------------------------------------------------------------------
@@ -781,6 +785,26 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
             {error, "Kafka topics must not be repeated in a bridge"}
             {error, "Kafka topics must not be repeated in a bridge"}
     end.
     end.
 
 
+producer_parameters_validator(Conf) ->
+    maybe
+        ok ?= producer_strategy_key_validator(Conf),
+        ok ?= producer_buffer_mode_validator(Conf)
+    end.
+
+producer_buffer_mode_validator(#{buffer := _} = Conf) ->
+    producer_buffer_mode_validator(emqx_utils_maps:binary_key_map(Conf));
+producer_buffer_mode_validator(#{<<"buffer">> := #{<<"mode">> := disk}, <<"topic">> := Topic}) ->
+    Template = emqx_template:parse(Topic),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, <<"disk-mode buffering is disallowed when using dynamic topics">>}
+    end;
+producer_buffer_mode_validator(_) ->
+    %% `buffer' field is not required
+    ok.
+
 producer_strategy_key_validator(
 producer_strategy_key_validator(
     #{
     #{
         partition_strategy := _,
         partition_strategy := _,

+ 51 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -1227,3 +1227,54 @@ t_fixed_topic_recovers_in_disk_mode(Config) ->
         []
         []
     ),
     ),
     ok.
     ok.
+
+%% Verifies that we disallow disk mode when the kafka topic is dynamic.
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ActionName = <<"dynamic_topic_disk">>,
+    ActionConfig = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ?assertMatch(
+        #{},
+        emqx_bridge_v2_testlib:parse_and_check(
+            action,
+            Type,
+            ActionName,
+            emqx_utils_maps:deep_merge(
+                ActionConfig,
+                #{
+                    <<"parameters">> => #{
+                        <<"topic">> => <<"dynamic-${.payload.n}">>,
+                        <<"buffer">> => #{
+                            <<"mode">> => <<"hybrid">>
+                        }
+                    }
+                }
+            )
+        )
+    ),
+    ?assertThrow(
+        {_SchemaMod, [
+            #{
+                reason := <<"disk-mode buffering is disallowed when using dynamic topics">>,
+                kind := validation_error
+            }
+        ]},
+        emqx_bridge_v2_testlib:parse_and_check(
+            action,
+            Type,
+            ActionName,
+            emqx_utils_maps:deep_merge(
+                ActionConfig,
+                #{
+                    <<"parameters">> => #{
+                        <<"topic">> => <<"dynamic-${.payload.n}">>,
+                        <<"buffer">> => #{
+                            <<"mode">> => <<"disk">>
+                        }
+                    }
+                }
+            )
+        )
+    ),
+    ok.

+ 1 - 0
changes/ee/breaking-14015.en.md

@@ -0,0 +1 @@
+Kafka/Confluent/Azure Event Hub Producers with a dynamic topic (i.e., a topic that contains placeholders) no longer support disk buffering.  Only memory and hybrid modes are now supported.