Просмотр исходного кода

Merge pull request #13617 from thalesmg/20240814-r58-kafka-group-id-schema

chore(kafka consumer): treat provided empty string as absent group id
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
b408cccfcc

+ 0 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -90,9 +90,6 @@ fields(source_parameters) ->
                 binary(),
                 #{
                     required => false,
-                    validator => [
-                        emqx_resource_validator:not_empty("Group id must not be empty")
-                    ],
                     desc => ?DESC(group_id)
                 }
             )}

+ 3 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -627,7 +627,9 @@ log_when_error(Fun, Log) ->
     end.
 
 -spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
-consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) ->
+consumer_group_id(#{group_id := GroupId}, _BridgeName) when
+    is_binary(GroupId) andalso GroupId =/= <<"">>
+->
     GroupId;
 consumer_group_id(_ConsumerParams, BridgeName0) ->
     BridgeName = to_bin(BridgeName0),

+ 3 - 7
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -217,13 +217,9 @@ custom_group_id_test() ->
         BaseConfig,
         #{<<"parameters">> => #{<<"group_id">> => <<>>}}
     ),
-    ?assertThrow(
-        {_, [
-            #{
-                path := "sources.kafka_consumer.my_consumer.parameters.group_id",
-                reason := "Group id must not be empty"
-            }
-        ]},
+    %% Empty strings will be treated as absent by the connector.
+    ?assertMatch(
+        #{<<"parameters">> := #{<<"group_id">> := <<"">>}},
         emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig)
     ),
 

+ 63 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -352,6 +352,69 @@ t_bad_bootstrap_host(Config) ->
     ),
     ok.
 
+%% Checks that a group id is automatically generated if a custom one is not provided in
+%% the config.
+t_absent_group_id(Config) ->
+    ?check_trace(
+        begin
+            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
+            SourceConfig = ?config(source_config, Config),
+            SourceName = ?config(source_name, Config),
+            ?assertEqual(
+                undefined,
+                emqx_utils_maps:deep_get(
+                    [<<"parameters">>, <<"group_id">>],
+                    SourceConfig,
+                    undefined
+                )
+            ),
+            {ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
+            GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, SourceName),
+            ct:pal("generated group id: ~p", [GroupId]),
+            ?retry(100, 10, begin
+                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+                ?assertMatch(
+                    [_],
+                    [Group || Group = {_, Id, _} <- Groups, Id == GroupId],
+                    #{groups => Groups}
+                )
+            end),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% Checks that a group id is automatically generated if an empty string is provided in the
+%% config.
+t_empty_group_id(Config) ->
+    ?check_trace(
+        begin
+            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
+            SourceName = ?config(source_name, Config),
+            {ok, {{_, 201, _}, _, _}} =
+                emqx_bridge_v2_testlib:create_bridge_api(
+                    Config,
+                    #{<<"parameters">> => #{<<"group_id">> => <<"">>}}
+                ),
+            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
+            GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, SourceName),
+            ct:pal("generated group id: ~p", [GroupId]),
+            ?retry(100, 10, begin
+                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+                ?assertMatch(
+                    [_],
+                    [Group || Group = {_, Id, _} <- Groups, Id == GroupId],
+                    #{groups => Groups}
+                )
+            end),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_custom_group_id(Config) ->
     ?check_trace(
         begin