|
|
@@ -698,6 +698,20 @@ create_bridge(Config, Overrides) ->
|
|
|
KafkaConfig = emqx_utils_maps:deep_merge(KafkaConfig0, Overrides),
|
|
|
emqx_bridge:create(Type, Name, KafkaConfig).
|
|
|
|
|
|
+create_bridge_wait_for_balance(Config) ->
|
|
|
+ setup_group_subscriber_spy(self()),
|
|
|
+ try
|
|
|
+ Res = create_bridge(Config),
|
|
|
+ receive
|
|
|
+ {kafka_assignment, _, _} ->
|
|
|
+ Res
|
|
|
+ after 20_000 ->
|
|
|
+ ct:fail("timed out waiting for kafka assignment")
|
|
|
+ end
|
|
|
+ after
|
|
|
+ kill_group_subscriber_spy()
|
|
|
+ end.
|
|
|
+
|
|
|
delete_bridge(Config) ->
|
|
|
Type = ?BRIDGE_TYPE_BIN,
|
|
|
Name = ?config(kafka_name, Config),
|
|
|
@@ -1020,31 +1034,37 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
|
|
|
setup_group_subscriber_spy_fn() ->
|
|
|
TestPid = self(),
|
|
|
fun() ->
|
|
|
- ok = meck:new(brod_group_subscriber_v2, [
|
|
|
- passthrough, no_link, no_history, non_strict
|
|
|
- ]),
|
|
|
- ok = meck:expect(
|
|
|
- brod_group_subscriber_v2,
|
|
|
- assignments_received,
|
|
|
- fun(Pid, MemberId, GenerationId, TopicAssignments) ->
|
|
|
- ?tp(
|
|
|
- kafka_assignment,
|
|
|
- #{
|
|
|
- node => node(),
|
|
|
- pid => Pid,
|
|
|
- member_id => MemberId,
|
|
|
- generation_id => GenerationId,
|
|
|
- topic_assignments => TopicAssignments
|
|
|
- }
|
|
|
- ),
|
|
|
- TestPid !
|
|
|
- {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
|
|
|
- meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
|
|
|
- end
|
|
|
- ),
|
|
|
- ok
|
|
|
+ setup_group_subscriber_spy(TestPid)
|
|
|
end.
|
|
|
|
|
|
+setup_group_subscriber_spy(TestPid) ->
|
|
|
+ ok = meck:new(brod_group_subscriber_v2, [
|
|
|
+ passthrough, no_link, no_history, non_strict
|
|
|
+ ]),
|
|
|
+ ok = meck:expect(
|
|
|
+ brod_group_subscriber_v2,
|
|
|
+ assignments_received,
|
|
|
+ fun(Pid, MemberId, GenerationId, TopicAssignments) ->
|
|
|
+ ?tp(
|
|
|
+ kafka_assignment,
|
|
|
+ #{
|
|
|
+ node => node(),
|
|
|
+ pid => Pid,
|
|
|
+ member_id => MemberId,
|
|
|
+ generation_id => GenerationId,
|
|
|
+ topic_assignments => TopicAssignments
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ TestPid !
|
|
|
+ {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
|
|
|
+ meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
|
|
|
+ end
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+kill_group_subscriber_spy() ->
|
|
|
+ meck:unload(brod_group_subscriber_v2).
|
|
|
+
|
|
|
wait_for_cluster_rpc(Node) ->
|
|
|
%% need to wait until the config handler is ready after
|
|
|
%% restarting during the cluster join.
|
|
|
@@ -1702,10 +1722,7 @@ t_dynamic_mqtt_topic(Config) ->
|
|
|
MQTTTopic = emqx_topic:join([KafkaTopic, '#']),
|
|
|
?check_trace(
|
|
|
begin
|
|
|
- ?assertMatch(
|
|
|
- {ok, _},
|
|
|
- create_bridge(Config)
|
|
|
- ),
|
|
|
+ ?assertMatch({ok, _}, create_bridge_wait_for_balance(Config)),
|
|
|
wait_until_subscribers_are_ready(NPartitions, 40_000),
|
|
|
{ok, C} = emqtt:start_link(),
|
|
|
on_exit(fun() -> emqtt:stop(C) end),
|