|
|
@@ -1264,87 +1264,6 @@ t_multiple_topic_mappings(Config) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-%% 2+ pull workers do not duplicate delivered messages
|
|
|
-t_multiple_pull_workers(Config) ->
|
|
|
- ct:timetrap({seconds, 121}),
|
|
|
- BridgeName = ?config(consumer_name, Config),
|
|
|
- TopicMapping = ?config(topic_mapping, Config),
|
|
|
- ResourceId = resource_id(Config),
|
|
|
- ?check_trace(
|
|
|
- #{timetrap => 120_000},
|
|
|
- begin
|
|
|
- NConsumers = 3,
|
|
|
- start_and_subscribe_mqtt(Config),
|
|
|
- {ok, SRef0} =
|
|
|
- snabbkaffe:subscribe(
|
|
|
- ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}),
|
|
|
- NConsumers,
|
|
|
- infinity
|
|
|
- ),
|
|
|
- {ok, _} = create_bridge(
|
|
|
- Config,
|
|
|
- #{
|
|
|
- <<"consumer">> => #{
|
|
|
- %% reduce flakiness
|
|
|
- <<"ack_deadline">> => <<"10m">>,
|
|
|
- <<"ack_retry_interval">> => <<"1s">>,
|
|
|
- <<"consumer_workers_per_topic">> => NConsumers
|
|
|
- },
|
|
|
- <<"resource_opts">> => #{
|
|
|
- %% Used by worker when patching subscritpion; we increase it a bit
|
|
|
- %% here because (at least) the gcp emulator tends to time out /
|
|
|
- %% throttle (?) workers targeting the same subscription, making
|
|
|
- %% the test flakier.
|
|
|
- <<"request_ttl">> => <<"5s">>,
|
|
|
- <<"resume_interval">> => <<"1s">>
|
|
|
- }
|
|
|
- }
|
|
|
- ),
|
|
|
- {ok, _} = snabbkaffe:receive_events(SRef0),
|
|
|
- [#{pubsub_topic := Topic}] = TopicMapping,
|
|
|
- Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
- Messages = [#{<<"data">> => Payload}],
|
|
|
- ?retry(
|
|
|
- 500,
|
|
|
- 20,
|
|
|
- ?assertMatch(
|
|
|
- {ok, connected},
|
|
|
- health_check(Config)
|
|
|
- )
|
|
|
- ),
|
|
|
- pubsub_publish(Config, Topic, Messages),
|
|
|
- {ok, Published} = receive_published(),
|
|
|
- ?assertMatch(
|
|
|
- [#{payload := #{<<"value">> := Payload}}],
|
|
|
- Published
|
|
|
- ),
|
|
|
- ?retry(
|
|
|
- _Interval = 200,
|
|
|
- _NAttempts = 20,
|
|
|
- ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
|
|
|
- ),
|
|
|
-
|
|
|
- assert_non_received_metrics(BridgeName),
|
|
|
-
|
|
|
- wait_acked(#{n => 1, timeout => 90_000}),
|
|
|
-
|
|
|
- ok
|
|
|
- end,
|
|
|
- [
|
|
|
- prop_all_pulled_are_acked(),
|
|
|
- prop_handled_only_once(),
|
|
|
- {"message is processed only once", fun(Trace) ->
|
|
|
- ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
|
|
|
- ?assertMatch(
|
|
|
- [#{?snk_span := start}, #{?snk_span := {complete, _}}],
|
|
|
- ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace)
|
|
|
- ),
|
|
|
- ok
|
|
|
- end}
|
|
|
- ]
|
|
|
- ),
|
|
|
- ok.
|
|
|
-
|
|
|
t_nonexistent_topic(Config) ->
|
|
|
BridgeName = ?config(bridge_name, Config),
|
|
|
[Mapping0] = ?config(topic_mapping, Config),
|