Explorar o código

Merge pull request #14104 from thalesmg/20241029-r58-test-flaky-kconsu

test(kafka consumer): fix flaky test
Thales Macedo Garitezi hai 1 ano
pai
achega
144c48a847

+ 12 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -15,6 +15,10 @@
 
 -define(ROOT_KEY_ACTIONS, actions).
 -define(ROOT_KEY_SOURCES, sources).
+-define(tpal(MSG), begin
+    ct:pal(MSG),
+    ?tp(notice, MSG, #{})
+end).
 
 %% ct setup helpers
 
@@ -946,7 +950,9 @@ t_consume(Config, Opts) ->
         check_fn := CheckFn,
         produce_tracepoint := TracePointFn
     } = Opts,
+    TestTimeout = maps:get(test_timeout, Opts, 60_000),
     ?check_trace(
+        #{timetrap => TestTimeout},
         begin
             ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
             case ConsumerReadyTPFn of
@@ -957,9 +963,12 @@ t_consume(Config, Opts) ->
                         Predicate, _NEvents = 1, ConsumerReadyTimeout
                     )
             end,
+            ?tpal("creating connector and source"),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
+            ?tpal("adding hookpoint"),
             ok = add_source_hookpoint(Config),
+            ?tpal("waiting until connected"),
             ?retry(
                 _Sleep = 200,
                 _Attempts = 20,
@@ -968,14 +977,16 @@ t_consume(Config, Opts) ->
                     health_check_channel(Config)
                 )
             ),
+            ?tpal("producing message and waiting for it to be consumed"),
             ?assertMatch(
                 {_, {ok, _}},
                 snabbkaffe:wait_async_action(
                     ProduceFn,
                     TracePointFn,
-                    15_000
+                    infinity
                 )
             ),
+            ?tpal("waiting for consumed message"),
             receive
                 {consumed_message, Message} ->
                     CheckFn(Message)

+ 3 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -209,7 +209,7 @@ source_config(Overrides0) ->
                     <<"max_batch_bytes">> => <<"896KB">>,
                     <<"max_wait_time">> => <<"500ms">>,
                     <<"max_rejoin_attempts">> => <<"5">>,
-                    <<"offset_reset_policy">> => <<"latest">>,
+                    <<"offset_reset_policy">> => <<"earliest">>,
                     <<"topic">> => <<"please override">>,
                     <<"value_encoding_mode">> => <<"none">>
                 },
@@ -218,7 +218,7 @@ source_config(Overrides0) ->
                 <<"resume_interval">> => <<"2s">>
             }
         },
-    maps:merge(CommonConfig, Overrides).
+    emqx_utils_maps:deep_merge(CommonConfig, Overrides).
 
 %%------------------------------------------------------------------------------
 %% Testcases
@@ -280,6 +280,7 @@ t_consume(Config) ->
     ok = emqx_bridge_v2_testlib:t_consume(
         Config,
         #{
+            test_timeout => timer:seconds(20),
             consumer_ready_tracepoint => ?match_n_events(
                 NumPartitions,
                 #{?snk_kind := kafka_consumer_subscriber_init}