Parcourir la source

fix: add non-empty validator

Thales Macedo Garitezi il y a 1 an
Parent
commit
eb113fa578

+ 3 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -90,6 +90,9 @@ fields(source_parameters) ->
                 binary(),
                 #{
                     required => false,
+                    validator => [
+                        emqx_resource_validator:not_empty("Group id must not be empty")
+                    ],
                     desc => ?DESC(group_id)
                 }
             )}

+ 53 - 6
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) ->
         [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
         [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
 
+%% assert compatibility
+bridge_schema_json_test() ->
+    JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
+    Map = emqx_utils_json:decode(JSON),
+    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
+    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
+
+custom_group_id_test() ->
+    BaseConfig = kafka_consumer_source_config(),
+    BadSourceConfig = emqx_utils_maps:deep_merge(
+        BaseConfig,
+        #{<<"parameters">> => #{<<"group_id">> => <<>>}}
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "sources.kafka_consumer.my_consumer.parameters.group_id",
+                reason := "Group id must not be empty"
+            }
+        ]},
+        emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig)
+    ),
+
+    CustomId = <<"custom_id">>,
+    OkSourceConfig = emqx_utils_maps:deep_merge(
+        BaseConfig,
+        #{<<"parameters">> => #{<<"group_id">> => CustomId}}
+    ),
+    ?assertMatch(
+        #{<<"parameters">> := #{<<"group_id">> := CustomId}},
+        emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig)
+    ),
+
+    ok.
+
 %%===========================================================================
 %% Helper functions
 %%===========================================================================
@@ -355,9 +390,21 @@ kafka_consumer_hocon() ->
     "\n   }"
     "\n }".
 
-%% assert compatibility
-bridge_schema_json_test() ->
-    JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
-    Map = emqx_utils_json:decode(JSON),
-    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
-    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
+kafka_consumer_source_config() ->
+    #{
+        <<"enable">> => true,
+        <<"connector">> => <<"my_connector">>,
+        <<"parameters">> =>
+            #{
+                <<"key_encoding_mode">> => <<"none">>,
+                <<"max_batch_bytes">> => <<"896KB">>,
+                <<"max_rejoin_attempts">> => <<"5">>,
+                <<"offset_reset_policy">> => <<"latest">>,
+                <<"topic">> => <<"please override">>,
+                <<"value_encoding_mode">> => <<"none">>
+            },
+        <<"resource_opts">> => #{
+            <<"health_check_interval">> => <<"2s">>,
+            <<"resume_interval">> => <<"2s">>
+        }
+    }.