|
|
@@ -15,6 +15,10 @@
|
|
|
|
|
|
-define(ROOT_KEY_ACTIONS, actions).
|
|
|
-define(ROOT_KEY_SOURCES, sources).
|
|
|
+-define(tpal(MSG), begin
|
|
|
+ ct:pal(MSG),
|
|
|
+ ?tp(notice, MSG, #{})
|
|
|
+end).
|
|
|
|
|
|
%% ct setup helpers
|
|
|
|
|
|
@@ -946,7 +950,9 @@ t_consume(Config, Opts) ->
|
|
|
check_fn := CheckFn,
|
|
|
produce_tracepoint := TracePointFn
|
|
|
} = Opts,
|
|
|
+ TestTimeout = maps:get(test_timeout, Opts, 60_000),
|
|
|
?check_trace(
|
|
|
+ #{timetrap => TestTimeout},
|
|
|
begin
|
|
|
ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
|
|
|
case ConsumerReadyTPFn of
|
|
|
@@ -957,9 +963,12 @@ t_consume(Config, Opts) ->
|
|
|
Predicate, _NEvents = 1, ConsumerReadyTimeout
|
|
|
)
|
|
|
end,
|
|
|
+ ?tpal("creating connector and source"),
|
|
|
?assertMatch({ok, _}, create_bridge_api(Config)),
|
|
|
?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
|
|
|
+ ?tpal("adding hookpoint"),
|
|
|
ok = add_source_hookpoint(Config),
|
|
|
+ ?tpal("waiting until connected"),
|
|
|
?retry(
|
|
|
_Sleep = 200,
|
|
|
_Attempts = 20,
|
|
|
@@ -968,14 +977,16 @@ t_consume(Config, Opts) ->
|
|
|
health_check_channel(Config)
|
|
|
)
|
|
|
),
|
|
|
+ ?tpal("producing message and waiting for it to be consumed"),
|
|
|
?assertMatch(
|
|
|
{_, {ok, _}},
|
|
|
snabbkaffe:wait_async_action(
|
|
|
ProduceFn,
|
|
|
TracePointFn,
|
|
|
- 15_000
|
|
|
+ infinity
|
|
|
)
|
|
|
),
|
|
|
+ ?tpal("waiting for consumed message"),
|
|
|
receive
|
|
|
{consumed_message, Message} ->
|
|
|
CheckFn(Message)
|