|
@@ -706,7 +706,9 @@ prop_all_pulled_are_acked(Trace) ->
|
|
|
|| #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
|
|
|| #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
|
|
|
#{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
|
|
#{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
|
|
|
],
|
|
],
|
|
|
- AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)),
|
|
|
|
|
|
|
+ %% we just need to check that it _tries_ to ack each id; the result itself doesn't
|
|
|
|
|
+ %% matter, as it might timeout.
|
|
|
|
|
+ AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_will_acknowledge, Trace)),
|
|
|
AckedMsgIds1 = [
|
|
AckedMsgIds1 = [
|
|
|
MsgId
|
|
MsgId
|
|
|
|| PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks)
|
|
|| PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks)
|
|
@@ -1172,7 +1174,12 @@ t_multiple_topic_mappings(Config) ->
|
|
|
?assertMatch(
|
|
?assertMatch(
|
|
|
{{ok, _}, {ok, _}},
|
|
{{ok, _}, {ok, _}},
|
|
|
?wait_async_action(
|
|
?wait_async_action(
|
|
|
- create_bridge(Config),
|
|
|
|
|
|
|
+ create_bridge(
|
|
|
|
|
+ Config,
|
|
|
|
|
+ #{
|
|
|
|
|
+ <<"consumer">> => #{<<"ack_deadline">> => <<"10m">>}
|
|
|
|
|
+ }
|
|
|
|
|
+ ),
|
|
|
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
|
|
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
|
|
|
40_000
|
|
40_000
|
|
|
)
|
|
)
|
|
@@ -1233,7 +1240,7 @@ t_multiple_topic_mappings(Config) ->
|
|
|
],
|
|
],
|
|
|
Published
|
|
Published
|
|
|
),
|
|
),
|
|
|
- wait_acked(#{n => 2}),
|
|
|
|
|
|
|
+ ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 20_000),
|
|
|
?retry(
|
|
?retry(
|
|
|
_Interval = 200,
|
|
_Interval = 200,
|
|
|
_NAttempts = 20,
|
|
_NAttempts = 20,
|