Browse Source

fix(kafka_consumer): validate topic mapping in v2 schema

Fixes https://emqx.atlassian.net/browse/EMQX-12008
Thales Macedo Garitezi 2 năm trước cách đây
mục cha
commit
a852695950

+ 3 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -32,6 +32,9 @@
     producer_opts/1
 ]).
 
+%% Internal export to be used in v2 schema
+-export([consumer_topic_mapping_validator/1]).
+
 -export([
     kafka_connector_config_fields/0,
     kafka_producer_converter/2,

+ 7 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -65,7 +65,7 @@ fields(source_parameters) ->
                     type => hocon_schema:field_schema(Sc, type),
                     required => false,
                     default => [],
-                    validator => fun(_) -> ok end,
+                    validator => fun legacy_consumer_topic_mapping_validator/1,
                     importance => ?IMPORTANCE_HIDDEN
                 },
                 {Name, hocon_schema:override(Sc, Override)};
@@ -231,3 +231,9 @@ connector_example(put) ->
                 start_timeout => <<"5s">>
             }
     }.
+
+legacy_consumer_topic_mapping_validator(_TopicMapping = []) ->
+    %% Can be (and should be, unless it has migrated from v1) empty in v2.
+    ok;
+legacy_consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+    emqx_bridge_kafka:consumer_topic_mapping_validator(TopicMapping).

+ 33 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -74,6 +74,7 @@ testcases(once) ->
         t_node_joins_existing_cluster,
         t_cluster_node_down,
         t_multiple_topic_mappings,
+        t_duplicated_kafka_topics,
         t_dynamic_mqtt_topic,
         t_resource_manager_crash_after_subscriber_started,
         t_resource_manager_crash_before_subscriber_started
@@ -292,7 +293,10 @@ end_per_group(_Group, _Config) ->
 init_per_testcase(t_cluster_group = TestCase, Config0) ->
     Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
     common_init_per_testcase(TestCase, Config);
-init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) ->
+init_per_testcase(TestCase, Config0) when
+    TestCase =:= t_multiple_topic_mappings;
+    TestCase =:= t_duplicated_kafka_topics
+->
     KafkaTopicBase =
         <<
             (atom_to_binary(TestCase))/binary,
@@ -671,7 +675,12 @@ authentication(_) ->
 parse_and_check(ConfigString, Name) ->
     {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
     TypeBin = ?BRIDGE_TYPE_BIN,
-    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := _}}} =
+        hocon_tconf:check_plain(
+            emqx_bridge_schema,
+            RawConf,
+            #{required => false, atom_key => false}
+        ),
     #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
     Config.
 
@@ -1359,6 +1368,28 @@ t_multiple_topic_mappings(Config) ->
     ),
     ok.
 
+%% Although we have a test for the v1 schema, the v1 compatibility layer does some
+%% shenanigans that do not go through V1 schema validations...
+t_duplicated_kafka_topics(Config) ->
+    #{<<"topic_mapping">> := [#{<<"kafka_topic">> := KT} | _] = TM0} =
+        ?config(kafka_config, Config),
+    TM = [M#{<<"kafka_topic">> := KT} || M <- TM0],
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {error, {{_, 400, _}, _, _}},
+                create_bridge_api(
+                    Config,
+                    #{<<"topic_mapping">> => TM}
+                )
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_on_get_status(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),