Просмотр исходного кода

test: fix flaky kafka consumer test

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
64faccf50b

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.13"},
+    {vsn, "0.1.14"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.0.11"},
+    {vsn, "5.0.12"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},

+ 21 - 5
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -1744,7 +1744,18 @@ t_node_joins_existing_cluster(Config) ->
             setup_group_subscriber_spy(N1),
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
-                    erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end),
+                    erpc:call(N1, fun() ->
+                        {ok, _} = create_bridge(
+                            Config,
+                            #{
+                                <<"kafka">> =>
+                                    #{
+                                        <<"offset_reset_policy">> =>
+                                            <<"earliest">>
+                                    }
+                            }
+                        )
+                    end),
                     #{?snk_kind := kafka_consumer_subscriber_started},
                     15_000
                 ),
@@ -1775,14 +1786,19 @@ t_node_joins_existing_cluster(Config) ->
             wait_for_cluster_rpc(N2),
 
             {ok, _} = snabbkaffe:receive_events(SRef0),
-            ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])),
+            ?retry(
+                _Sleep1 = 100,
+                _Attempts1 = 50,
+                ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId]))
+            ),
+
             %% Give some time for the consumers in both nodes to
             %% rebalance.
             {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
             %% Publish some messages so we can check they came from each node.
             ?retry(
-                _Sleep1 = 100,
-                _Attempts1 = 50,
+                _Sleep2 = 100,
+                _Attempts2 = 50,
                 true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
             ),
             {ok, SRef1} =
@@ -1792,7 +1808,7 @@ t_node_joins_existing_cluster(Config) ->
                         ?snk_span := {complete, _}
                     }),
                     NPartitions,
-                    10_000
+                    20_000
                 ),
             lists:foreach(
                 fun(N) ->