Просмотр исходного кода

Merge pull request #14375 from thalesmg/20241209-r58-kconsu-pretty-hc-reason

feat(kafka consumer): expose more error details when doing health checks / dry runs
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
50b8ee20cc

+ 18 - 3
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -16,7 +16,7 @@ services:
     user: "${DOCKER_USER:-root}"
     volumes:
       - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
-  kafka_1:
+  kafka_1: &kafka
     image: wurstmeister/kafka:2.13-2.8.1
     # ports:
     #   - "9192-9195:9192-9195"
@@ -29,7 +29,7 @@ services:
         condition: service_started
       ssl_cert_gen:
         condition: service_completed_successfully
-    environment:
+    environment: &kafka-env
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095,LOCAL_PLAINTEXT://:9192,LOCAL_SASL_PLAINTEXT://:9193,LOCAL_SSL://:9194,LOCAL_SASL_SSL://:9195,TOXIPROXY_PLAINTEXT://:9292,TOXIPROXY_SASL_PLAINTEXT://:9293,TOXIPROXY_SSL://:9294,TOXIPROXY_SASL_SSL://:9295
@@ -52,9 +52,24 @@ services:
     networks:
       emqx_bridge:
     volumes:
-      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret-ro:ro
       - ./kafka/jaas.conf:/etc/kafka/jaas.conf
       - ./kafka/kafka-entrypoint.sh:/bin/kafka-entrypoint.sh
       - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
       - ./kerberos/krb5.conf:/etc/krb5.conf
     command: kafka-entrypoint.sh
+  kafka_2:
+    <<: *kafka
+    container_name: kafka-2.emqx.net
+    hostname: kafka-2.emqx.net
+    environment:
+      <<: *kafka-env
+      KAFKA_BROKER_ID: 2
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2.emqx.net:9092,SASL_PLAINTEXT://kafka-2.emqx.net:9093,SSL://kafka-2.emqx.net:9094,SASL_SSL://kafka-2.emqx.net:9095,LOCAL_PLAINTEXT://localhost:19192,LOCAL_SASL_PLAINTEXT://localhost:19193,LOCAL_SSL://localhost:19194,LOCAL_SASL_SSL://localhost:19195,TOXIPROXY_PLAINTEXT://toxiproxy.emqx.net:19292,TOXIPROXY_SASL_PLAINTEXT://toxiproxy.emqx.net:19293,TOXIPROXY_SSL://toxiproxy.emqx.net:19294,TOXIPROXY_SASL_SSL://toxiproxy.emqx.net:19295
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+    volumes:
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret-ro:ro
+      - ./kafka/jaas2.conf:/etc/kafka/jaas.conf
+      - ./kafka/kafka-entrypoint.sh:/bin/kafka-entrypoint.sh
+      - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+      - ./kerberos/krb5.conf:/etc/krb5.conf

+ 16 - 0
.ci/docker-compose-file/kafka/jaas2.conf

@@ -0,0 +1,16 @@
+KafkaServer {
+   org.apache.kafka.common.security.plain.PlainLoginModule required
+   user_admin="password"
+   user_emqxuser="password";
+
+   org.apache.kafka.common.security.scram.ScramLoginModule required
+   username="admin"
+   password="password";
+
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   storeKey=true
+   keyTab="/var/lib/secret/kafka.keytab"
+   principal="kafka/kafka-2.emqx.net@KDC.EMQX.NET";
+
+};

+ 15 - 8
.ci/docker-compose-file/kafka/kafka-entrypoint.sh

@@ -11,12 +11,18 @@ sleep 5
 
 echo "+++++++ Wait until Kerberos Keytab is created ++++++++"
 
-timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1; done'
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret-ro/kafka.keytab ]; do sleep 1; done'
 
 
 echo "+++++++ Wait until SSL certs are generated ++++++++"
 
-timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done'
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret-ro/kafka.truststore.jks ]; do sleep 1; done'
+
+mkdir -p /var/lib/secret
+
+## make our own copy for this container
+cp /var/lib/secret-ro/* /var/lib/secret/
+
 keytool -list -v -keystore /var/lib/secret/kafka.keystore.jks -storepass password
 
 sleep 3
@@ -43,18 +49,19 @@ echo "+++++++ Run config commands ++++++++"
 
 kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=password],SCRAM-SHA-512=[password=password]' --entity-type users --entity-name emqxuser
 
-echo "+++++++ Creating Kafka Topics ++++++++"
-
 # create topics after re-configuration
 # there seem to be a race condition when creating the topics (too early)
-env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh
+if [[ "$KAFKA_BROKER_ID" == 1 ]]; then
+  echo "+++++++ Creating Kafka Topics ++++++++"
+
+  env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh
 
-# create a topic with max.message.bytes=100
-/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100
+  # create a topic with max.message.bytes=100
+  /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100
+fi
 
 echo "+++++++ Wait until Kafka ports are down ++++++++"
 
 bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1
 
 echo "+++++++ Kafka ports are down ++++++++"
-

+ 2 - 0
.ci/docker-compose-file/kerberos/run.sh

@@ -16,6 +16,7 @@ kdb5_util -P emqx -r KDC.EMQX.NET create -s
 echo "Add principals"
 
 kadmin.local -w password -q "add_principal -randkey kafka/kafka-1.emqx.net@KDC.EMQX.NET" > /dev/null
+kadmin.local -w password -q "add_principal -randkey kafka/kafka-2.emqx.net@KDC.EMQX.NET" > /dev/null
 kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET"  > /dev/null
 
 # For Kerberos Authn
@@ -26,6 +27,7 @@ kadmin.local -w password -q "add_principal -randkey krb_authn_cli@KDC.EMQX.NET"
 echo "Create keytabs"
 
 kadmin.local -w password -q "ktadd  -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null
+kadmin.local -w password -q "ktadd  -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-2.emqx.net@KDC.EMQX.NET " > /dev/null
 kadmin.local -w password -q "ktadd  -k /var/lib/secret/rig.keytab -norandkey rig@KDC.EMQX.NET " > /dev/null
 
 # For Kerberos Authn

+ 24 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -78,6 +78,30 @@
     "upstream": "kafka-1.emqx.net:9295",
     "enabled": true
   },
+  {
+    "name": "kafka_2_plain",
+    "listen": "0.0.0.0:19292",
+    "upstream": "kafka-2.emqx.net:9292",
+    "enabled": true
+  },
+  {
+    "name": "kafka_2_sasl_plain",
+    "listen": "0.0.0.0:19293",
+    "upstream": "kafka-2.emqx.net:9293",
+    "enabled": true
+  },
+  {
+    "name": "kafka_2_ssl",
+    "listen": "0.0.0.0:19294",
+    "upstream": "kafka-2.emqx.net:9294",
+    "enabled": true
+  },
+  {
+    "name": "kafka_2_sasl_ssl",
+    "listen": "0.0.0.0:19295",
+    "upstream": "kafka-2.emqx.net:9295",
+    "enabled": true
+  },
   {
     "name": "rocketmq",
     "listen": "0.0.0.0:9876",

+ 36 - 21
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -285,7 +285,7 @@ on_get_channels(ConnectorResId) ->
     source_resource_id(),
     connector_state()
 ) ->
-    ?status_connected | ?status_disconnected.
+    ?status_connected | {?status_disconnected | ?status_connecting, _Msg :: binary()}.
 on_get_channel_status(
     _ConnectorResId,
     SourceResId,
@@ -542,8 +542,8 @@ do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
             case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
                 ?status_connected ->
                     do_get_status(ClientID, RestTopics, SubscriberId);
-                ?status_disconnected ->
-                    ?status_disconnected
+                {Status, Message} when Status =/= ?status_connected ->
+                    {Status, Message}
             end;
         {error, {client_down, Context}} ->
             case infer_client_error(Context) of
@@ -571,41 +571,54 @@ do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
     ?status_connected.
 
 -spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
-    ?status_connected | ?status_disconnected.
+    ?status_connected | {?status_disconnected | ?status_connecting, _Msg :: binary()}.
 do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
     Results =
         lists:map(
             fun(N) ->
-                brod_client:get_leader_connection(ClientID, KafkaTopic, N)
+                {N, brod_client:get_leader_connection(ClientID, KafkaTopic, N)}
             end,
             lists:seq(0, NPartitions - 1)
         ),
-    AllLeadersOk =
-        length(Results) > 0 andalso
-            lists:all(
-                fun
-                    ({ok, _}) ->
-                        true;
-                    (_) ->
-                        false
-                end,
-                Results
-            ),
     WorkersAlive = are_subscriber_workers_alive(SubscriberId),
-    case AllLeadersOk andalso WorkersAlive of
-        true ->
+    case check_leader_connection_results(Results) of
+        ok when WorkersAlive ->
             ?status_connected;
-        false ->
-            ?status_disconnected
+        {error, no_leaders} ->
+            {?status_disconnected, <<"No leaders available (no partitions?)">>};
+        {error, {N, Reason}} ->
+            Msg = iolist_to_binary(
+                io_lib:format(
+                    "Leader for partition ~b unavailable; reason: ~0p",
+                    [N, emqx_utils:redact(Reason)]
+                )
+            ),
+            {?status_disconnected, Msg};
+        ok when not WorkersAlive ->
+            {?status_connecting, <<"Subscription workers restarting">>}
     end.
 
+check_leader_connection_results(Results) ->
+    emqx_utils:foldl_while(
+        fun
+            ({_N, {ok, _}}, _Acc) ->
+                {cont, ok};
+            ({N, {error, Reason}}, _Acc) ->
+                {halt, {error, {N, Reason}}}
+        end,
+        {error, no_leaders},
+        Results
+    ).
+
 are_subscriber_workers_alive(SubscriberId) ->
     try
         Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
         case lists:keyfind(SubscriberId, 1, Children) of
             false ->
                 false;
-            {_, Pid, _, _} ->
+            {_, undefined, _, _} ->
+                false;
+            {_, Pid, _, _} when is_pid(Pid) ->
                 Workers = brod_group_subscriber_v2:get_workers(Pid),
                 %% we can't enforce the number of partitions on a single
                 %% node, as the group might be spread across an emqx
@@ -613,6 +626,8 @@ are_subscriber_workers_alive(SubscriberId) ->
                 lists:all(fun is_process_alive/1, maps:values(Workers))
         end
     catch
+        exit:{noproc, _} ->
+            false;
         exit:{shutdown, _} ->
             %% may happen if node is shutting down
             false

+ 27 - 7
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -80,7 +80,6 @@ testcases(once) ->
     ].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [
             emqx,
@@ -93,7 +92,6 @@ init_per_suite(Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
-    {ok, _Api} = emqx_common_test_http:create_default_app(),
     [
         {apps, Apps},
         {bridge_type, <<"kafka_consumer">>}
@@ -111,11 +109,13 @@ init_per_group(plain = Type, Config) ->
     DirectKafkaHost = os:getenv("KAFKA_DIRECT_PLAIN_HOST", "kafka-1.emqx.net"),
     DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_PLAIN_PORT", "9092")),
     ProxyName = "kafka_plain",
+    ProxyName2 = "kafka_2_plain",
     case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
         true ->
             Config1 = common_init_per_group(),
             [
                 {proxy_name, ProxyName},
+                {proxy_name_2, ProxyName2},
                 {kafka_host, KafkaHost},
                 {kafka_port, KafkaPort},
                 {direct_kafka_host, DirectKafkaHost},
@@ -139,11 +139,13 @@ init_per_group(sasl_plain = Type, Config) ->
     DirectKafkaHost = os:getenv("KAFKA_DIRECT_SASL_HOST", "kafka-1.emqx.net"),
     DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_SASL_PORT", "9093")),
     ProxyName = "kafka_sasl_plain",
+    ProxyName2 = "kafka_2_sasl_plain",
     case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
         true ->
             Config1 = common_init_per_group(),
             [
                 {proxy_name, ProxyName},
+                {proxy_name_2, ProxyName2},
                 {kafka_host, KafkaHost},
                 {kafka_port, KafkaPort},
                 {direct_kafka_host, DirectKafkaHost},
@@ -167,11 +169,13 @@ init_per_group(ssl = Type, Config) ->
     DirectKafkaHost = os:getenv("KAFKA_DIRECT_SSL_HOST", "kafka-1.emqx.net"),
     DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_SSL_PORT", "9094")),
     ProxyName = "kafka_ssl",
+    ProxyName2 = "kafka_2_ssl",
     case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
         true ->
             Config1 = common_init_per_group(),
             [
                 {proxy_name, ProxyName},
+                {proxy_name_2, ProxyName2},
                 {kafka_host, KafkaHost},
                 {kafka_port, KafkaPort},
                 {direct_kafka_host, DirectKafkaHost},
@@ -195,11 +199,13 @@ init_per_group(sasl_ssl = Type, Config) ->
     DirectKafkaHost = os:getenv("KAFKA_DIRECT_SASL_SSL_HOST", "kafka-1.emqx.net"),
     DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_SASL_SSL_PORT", "9095")),
     ProxyName = "kafka_sasl_ssl",
+    ProxyName2 = "kafka_2_sasl_ssl",
     case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
         true ->
             Config1 = common_init_per_group(),
             [
                 {proxy_name, ProxyName},
+                {proxy_name_2, ProxyName2},
                 {kafka_host, KafkaHost},
                 {kafka_port, KafkaPort},
                 {direct_kafka_host, DirectKafkaHost},
@@ -536,8 +542,12 @@ ensure_topics(Config) ->
                 ConnConfig0#{sasl => undefined}
         end,
     case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
-        ok -> ok;
-        {error, topic_already_exists} -> ok
+        ok ->
+            %% Need some time for all brokers to stabilize...
+            ct:sleep(500),
+            ok;
+        {error, topic_already_exists} ->
+            ok
     end.
 
 shared_secret_path() ->
@@ -1137,6 +1147,17 @@ get_mqtt_port(Node) ->
     {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
     Port.
 
+with_brokers_down(Config, Fun) ->
+    ProxyName1 = ?config(proxy_name, Config),
+    ProxyName2 = ?config(proxy_name_2, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:with_failure(
+        down, ProxyName1, ProxyHost, ProxyPort, fun() ->
+            emqx_common_test_helpers:with_failure(down, ProxyName2, ProxyHost, ProxyPort, Fun)
+        end
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1537,8 +1558,7 @@ t_receive_after_recovery(Config) ->
             %% 1) cut the connection with kafka.
             WorkerRefs = maps:from_list([
                 {monitor(process, Pid), Pid}
-             || {_TopicPartition, Pid} <-
-                    maps:to_list(get_subscriber_workers())
+             || Pid <- maps:values(get_subscriber_workers())
             ]),
             NumMsgs = 50,
             Messages1 = [
@@ -1555,7 +1575,7 @@ t_receive_after_recovery(Config) ->
             on_exit(fun() -> emqtt:stop(C) end),
             {ok, _} = emqtt:connect(C),
             {ok, _, [0]} = emqtt:subscribe(C, MQTTTopic),
-            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            with_brokers_down(Config, fun() ->
                 wait_downs(WorkerRefs, _Timeout2 = 1_000),
                 %% 2) publish messages while the consumer is down.
                 %% we use `pmap' to avoid wolff sending the whole

+ 135 - 20
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -35,15 +35,19 @@ matrix_cases() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_bridge_kafka_impl_consumer_SUITE:init_per_suite(Config).
+    [
+        {proxy_host, "toxiproxy"},
+        {proxy_port, 8474}
+        | emqx_bridge_kafka_impl_consumer_SUITE:init_per_suite(Config)
+    ].
 
 end_per_suite(Config) ->
     emqx_bridge_kafka_impl_consumer_SUITE:end_per_suite(Config).
 
-init_per_testcase(TestCase, Config) ->
-    common_init_per_testcase(TestCase, Config).
-
-common_init_per_testcase(TestCase, Config0) ->
+init_per_testcase(TestCase, Config0) ->
+    ProxyHost = ?config(proxy_host, Config0),
+    ProxyPort = ?config(proxy_port, Config0),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     ct:timetrap({seconds, 60}),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
@@ -63,9 +67,7 @@ common_init_per_testcase(TestCase, Config0) ->
         {source_config, SourceConfig},
         {connector_name, Name},
         {connector_type, ?CONNECTOR_TYPE_BIN},
-        {connector_config, ConnectorConfig},
-        {proxy_host, "toxiproxy"},
-        {proxy_port, 8474}
+        {connector_config, ConnectorConfig}
         | Config1
     ].
 
@@ -148,6 +150,9 @@ ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, TCConfig) ->
         num_partitions => 1
     }),
     ok = emqx_bridge_kafka_impl_consumer_SUITE:ensure_topics(CreateConfig),
+    %% Apparently, Kafka in 2+ brokers needs a moment to replicate kafka creation...  We
+    %% need to wait or else starting producers to quickly will crash...
+    ct:sleep(500),
     ProducerConfigs = emqx_bridge_kafka_impl_consumer_SUITE:start_producers(TestCase, CreateConfig),
     [{kafka_producers, ProducerConfigs} | TCConfig].
 
@@ -220,6 +225,58 @@ source_config(Overrides0) ->
         },
     emqx_utils_maps:deep_merge(CommonConfig, Overrides).
 
+create_connector_api(Config) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_connector_api(
+            Config
+        )
+    ).
+
+probe_source_api(Config, Overrides) ->
+    #{
+        kind := Kind,
+        type := Type,
+        name := Name
+    } = emqx_bridge_v2_testlib:get_common_values(Config),
+    SourceConfig = ?config(source_config, Config),
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:probe_bridge_api(
+            Kind,
+            Type,
+            Name,
+            emqx_utils_maps:deep_merge(SourceConfig, Overrides)
+        )
+    ).
+
+get_source_api(Config) ->
+    #{
+        type := Type,
+        name := Name
+    } = emqx_bridge_v2_testlib:get_common_values(Config),
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_source_api(
+            Type, Name
+        )
+    ).
+
+%% For things like listing groups, apparently different brokers return different
+%% responses, and we need to query more than one...
+all_bootstrap_hosts() ->
+    [
+        {<<"kafka-1.emqx.net">>, 9092},
+        {<<"kafka-2.emqx.net">>, 9092}
+    ].
+
+get_groups() ->
+    lists:foldl(
+        fun(Endpoint, Acc) ->
+            {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+            Groups ++ Acc
+        end,
+        [],
+        all_bootstrap_hosts()
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -359,7 +416,6 @@ t_bad_bootstrap_host(Config) ->
 t_absent_group_id(Config) ->
     ?check_trace(
         begin
-            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
             SourceConfig = ?config(source_config, Config),
             SourceName = ?config(source_name, Config),
             ?assertEqual(
@@ -371,11 +427,18 @@ t_absent_group_id(Config) ->
                 )
             ),
             {ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
-            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_source_api(Config)
+                )
+            ),
             GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, SourceName),
             ct:pal("generated group id: ~p", [GroupId]),
-            ?retry(100, 10, begin
-                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+            ?retry(1_000, 10, begin
+                Groups = get_groups(),
                 ?assertMatch(
                     [_],
                     [Group || Group = {_, Id, _} <- Groups, Id == GroupId],
@@ -393,18 +456,24 @@ t_absent_group_id(Config) ->
 t_empty_group_id(Config) ->
     ?check_trace(
         begin
-            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
             SourceName = ?config(source_name, Config),
             {ok, {{_, 201, _}, _, _}} =
                 emqx_bridge_v2_testlib:create_bridge_api(
                     Config,
                     #{<<"parameters">> => #{<<"group_id">> => <<"">>}}
                 ),
-            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_source_api(Config)
+                )
+            ),
             GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, SourceName),
             ct:pal("generated group id: ~p", [GroupId]),
-            ?retry(100, 10, begin
-                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+            ?retry(1_000, 10, begin
+                Groups = get_groups(),
                 ?assertMatch(
                     [_],
                     [Group || Group = {_, Id, _} <- Groups, Id == GroupId],
@@ -420,16 +489,22 @@ t_empty_group_id(Config) ->
 t_custom_group_id(Config) ->
     ?check_trace(
         begin
-            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
             CustomGroupId = <<"my_group_id">>,
             {ok, {{_, 201, _}, _, _}} =
                 emqx_bridge_v2_testlib:create_bridge_api(
                     Config,
                     #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}}
                 ),
-            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
-            ?retry(100, 10, begin
-                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_source_api(Config)
+                )
+            ),
+            ?retry(1_000, 10, begin
+                Groups = get_groups(),
                 ?assertMatch(
                     [_],
                     [Group || Group = {_, Id, _} <- Groups, Id == CustomGroupId],
@@ -467,3 +542,43 @@ t_repeated_topics(Config) ->
         []
     ),
     ok.
+
+%% Verifies that we return an error containing information to debug connection issues when
+%% one of the partition leaders is unreachable.
+t_pretty_api_dry_run_reason(Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyName = "kafka_2_plain",
+    ?check_trace(
+        begin
+            {ok, {{_, 201, _}, _, _}} =
+                emqx_bridge_v2_testlib:create_bridge_api(Config),
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                Res = probe_source_api(
+                    Config,
+                    #{<<"parameters">> => #{<<"topic">> => <<"test-topic-three-partitions">>}}
+                ),
+                ?assertMatch({400, _}, Res),
+                {400, #{<<"message">> := Msg}} = Res,
+                ?assertEqual(
+                    match,
+                    re:run(Msg, <<"Leader for partition . unavailable; reason: ">>, [
+                        {capture, none}
+                    ]),
+                    #{message => Msg}
+                )
+            end),
+            %% Wait for recovery; avoids affecting other test cases due to Kafka restabilizing...
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_source_api(Config)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 31 - 27
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -44,6 +44,7 @@ init_per_suite(Config) ->
     KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"),
     KafkaPort = list_to_integer(os:getenv("KAFKA_PLAIN_PORT", "9292")),
     ProxyName = "kafka_plain",
+    ProxyName2 = "kafka_2_plain",
     DirectKafkaHost = os:getenv("KAFKA_DIRECT_PLAIN_HOST", "kafka-1.emqx.net"),
     DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_PLAIN_PORT", "9092")),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
@@ -66,6 +67,7 @@ init_per_suite(Config) ->
         {proxy_host, ProxyHost},
         {proxy_port, ProxyPort},
         {proxy_name, ProxyName},
+        {proxy_name_2, ProxyName2},
         {kafka_host, KafkaHost},
         {kafka_port, KafkaPort},
         {direct_kafka_host, DirectKafkaHost},
@@ -379,6 +381,17 @@ tap_telemetry(HandlerId) ->
 simplify_result(Res) ->
     emqx_bridge_v2_testlib:simplify_result(Res).
 
+with_brokers_down(Config, Fun) ->
+    ProxyName1 = ?config(proxy_name, Config),
+    ProxyName2 = ?config(proxy_name_2, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:with_failure(
+        down, ProxyName1, ProxyHost, ProxyPort, fun() ->
+            emqx_common_test_helpers:with_failure(down, ProxyName2, ProxyHost, ProxyPort, Fun)
+        end
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -588,9 +601,6 @@ t_http_api_get(_Config) ->
     ok.
 
 t_create_connector_while_connection_is_down(Config) ->
-    ProxyName = ?config(proxy_name, Config),
-    ProxyHost = ?config(proxy_host, Config),
-    ProxyPort = ?config(proxy_port, Config),
     KafkaHost = ?config(kafka_host, Config),
     KafkaPort = ?config(kafka_port, Config),
     Host = iolist_to_binary([KafkaHost, ":", integer_to_binary(KafkaPort)]),
@@ -622,7 +632,7 @@ t_create_connector_while_connection_is_down(Config) ->
             Disconnected = atom_to_binary(?status_disconnected),
             %% Initially, the connection cannot be stablished.  Messages are not buffered,
             %% hence the status is `?status_disconnected'.
-            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            with_brokers_down(Config, fun() ->
                 {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} =
                     emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
                 {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} =
@@ -673,7 +683,7 @@ t_create_connector_while_connection_is_down(Config) ->
             %% `?status_connecting' to avoid destroying wolff_producers and their replayq
             %% buffers.
             Connecting = atom_to_binary(?status_connecting),
-            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+            with_brokers_down(Config, fun() ->
                 ?retry(
                     _Sleep0 = 1_100,
                     _Attempts0 = 10,
@@ -1133,9 +1143,6 @@ t_dynamic_topics(Config) ->
 %% Checks that messages accumulated in disk mode for a fixed topic producer are kicked off
 %% when the action is later restarted and kafka is online.
 t_fixed_topic_recovers_in_disk_mode(Config) ->
-    ProxyName = ?config(proxy_name, Config),
-    ProxyHost = ?config(proxy_host, Config),
-    ProxyPort = ?config(proxy_port, Config),
     Type = proplists:get_value(type, Config, ?TYPE),
     ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
     ConnectorConfig = proplists:get_value(
@@ -1191,26 +1198,23 @@ t_fixed_topic_recovers_in_disk_mode(Config) ->
             ),
             %% Cut connection to kafka and enqueue some messages
             ActionId = emqx_bridge_v2:id(Type, ActionName),
-            SentMessages =
-                emqx_common_test_helpers:with_failure(
-                    down, ProxyName, ProxyHost, ProxyPort, fun() ->
-                        ct:sleep(100),
-                        SentMessages = [send_message(ActionName) || _ <- lists:seq(1, 5)],
-                        ?assertEqual(5, emqx_resource_metrics:matched_get(ActionId)),
-                        ?retry(
-                            _Sleep = 200,
-                            _Attempts = 20,
-                            ?assertEqual(5, emqx_resource_metrics:queuing_get(ActionId))
-                        ),
-                        ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
-                        %% Turn off action, restore kafka connection
-                        ?assertMatch(
-                            {204, _},
-                            emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName)
-                        ),
-                        SentMessages
-                    end
+            SentMessages = with_brokers_down(Config, fun() ->
+                ct:sleep(100),
+                SentMessages = [send_message(ActionName) || _ <- lists:seq(1, 5)],
+                ?assertEqual(5, emqx_resource_metrics:matched_get(ActionId)),
+                ?retry(
+                    _Sleep = 200,
+                    _Attempts = 20,
+                    ?assertEqual(5, emqx_resource_metrics:queuing_get(ActionId))
+                ),
+                ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+                %% Turn off action, restore kafka connection
+                ?assertMatch(
+                    {204, _},
+                    emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName)
                 ),
+                SentMessages
+            end),
             %% Restart action; should've shot enqueued messages
             ?tapTelemetry(),
             ?assertMatch(

+ 1 - 0
changes/ee/fix-14375.en.md

@@ -0,0 +1 @@
+Added more information to Kafka Consumer source dry run results, to ease debugging issues with Kafka brokers.