|
|
@@ -251,7 +251,7 @@ on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
|
|
|
kafka_client_id := ClientID,
|
|
|
installed_sources := InstalledSources0
|
|
|
} = ConnectorState0,
|
|
|
- case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of
|
|
|
+ case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID, ConnectorState0) of
|
|
|
{ok, SourceState} ->
|
|
|
InstalledSources = InstalledSources0#{SourceResId => SourceState},
|
|
|
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
|
|
|
@@ -405,10 +405,11 @@ ensure_consumer_supervisor_started() ->
|
|
|
source_config(),
|
|
|
connector_resource_id(),
|
|
|
source_resource_id(),
|
|
|
- brod:client_id()
|
|
|
+ brod:client_id(),
|
|
|
+ connector_state()
|
|
|
) ->
|
|
|
{ok, source_state()} | {error, term()}.
|
|
|
-start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
|
|
|
+start_consumer(Config, ConnectorResId, SourceResId, ClientID, ConnState) ->
|
|
|
#{
|
|
|
bridge_name := BridgeName,
|
|
|
hookpoints := Hookpoints,
|
|
|
@@ -455,6 +456,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
|
|
|
{offset_commit_interval_seconds, OffsetCommitInterval}
|
|
|
],
|
|
|
KafkaTopics = maps:keys(TopicMapping),
|
|
|
+ ensure_no_repeated_topics(KafkaTopics, ConnState),
|
|
|
GroupSubscriberConfig =
|
|
|
#{
|
|
|
client => ClientID,
|
|
|
@@ -494,6 +496,30 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
+%% Currently, brod treats a consumer process to a specific topic as a singleton (per
|
|
|
+%% client id / connector), meaning that the first subscriber to a given topic will define
|
|
|
+%% the consumer options for all other consumers, and those options persist even after the
|
|
|
+%% original consumer group is terminated. We enforce that, if the user wants to consume
|
|
|
+%% multiple times from the same topic, then they must create a different connector.
|
|
|
+ensure_no_repeated_topics(KafkaTopics, ConnState) ->
|
|
|
+ #{installed_sources := Sources} = ConnState,
|
|
|
+ InstalledTopics = lists:flatmap(fun(#{kafka_topics := Ts}) -> Ts end, maps:values(Sources)),
|
|
|
+ case KafkaTopics -- InstalledTopics of
|
|
|
+ KafkaTopics ->
|
|
|
+ %% all new topics
|
|
|
+ ok;
|
|
|
+ NewTopics ->
|
|
|
+ ExistingTopics0 = KafkaTopics -- NewTopics,
|
|
|
+ ExistingTopics = lists:join(<<", ">>, ExistingTopics0),
|
|
|
+ Message = iolist_to_binary([
|
|
|
+ <<"Topics ">>,
|
|
|
+ ExistingTopics,
|
|
|
+ <<" already exist in other sources associated with this connector.">>,
|
|
|
+ <<" If you want to repeat topics, create new connector and source(s).">>
|
|
|
+ ]),
|
|
|
+ throw(Message)
|
|
|
+ end.
|
|
|
+
|
|
|
%% This is to ensure backwards compatibility with the deprectated topic mapping.
|
|
|
-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
|
|
|
ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) ->
|