
Merge pull request #14106 from thalesmg/20241029-r58-kconsu-validate-topics-same-connector

fix(kafka consumer): ensure no repeated topics in the same connector
Thales Macedo Garitezi, 1 year ago
Parent
Commit 71b790bb32

+ 18 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -458,6 +458,24 @@ create_action_api(Config, Overrides) ->
     ct:pal("action create (http) result:\n  ~p", [Res]),
     Res.
 
+create_source_api(Config) ->
+    create_source_api(Config, _Overrides = #{}).
+
+create_source_api(Config, Overrides) ->
+    #{
+        kind := source,
+        type := Type,
+        name := Name
+    } = get_common_values(Config),
+    SourceConfig0 = get_value(source_config, Config),
+    SourceConfig = emqx_utils_maps:deep_merge(SourceConfig0, Overrides),
+    Params = SourceConfig#{<<"type">> => Type, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["sources"]),
+    ct:pal("creating source (http):\n  ~p", [Params]),
+    Res = request(post, Path, Params),
+    ct:pal("source create (http) result:\n  ~p", [Res]),
+    simplify_result(Res).
+
 get_action_api(Config) ->
     ActionName = ?config(action_name, Config),
     ActionType = ?config(action_type, Config),
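
For context, a quick usage sketch of the create_source_api/2 helper added above (hedged: the CT suite Config is assumed to already carry the source type, name and config read by get_common_values/1 and get_value/2; the override map and function name are placeholders, not part of this PR):

%% Hypothetical call site, for illustration only.  create_source_api/2
%% deep-merges the override map into source_config before POSTing to /sources
%% and returns the simplified {StatusCode, Body} tuple.
create_source_with_override(Config) ->
    Overrides = #{<<"parameters">> => #{<<"topic">> => <<"another-topic">>}},
    {Status, Body} =
        emqx_bridge_v2_testlib:create_source_api(
            [{source_name, <<"my_second_source">>} | Config],
            Overrides
        ),
    ct:pal("source create returned ~p:~n  ~p", [Status, Body]),
    {Status, Body}.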

+ 29 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -251,7 +251,7 @@ on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
         kafka_client_id := ClientID,
         installed_sources := InstalledSources0
     } = ConnectorState0,
-    case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of
+    case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID, ConnectorState0) of
         {ok, SourceState} ->
             InstalledSources = InstalledSources0#{SourceResId => SourceState},
             ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
@@ -405,10 +405,11 @@ ensure_consumer_supervisor_started() ->
     source_config(),
     connector_resource_id(),
     source_resource_id(),
-    brod:client_id()
+    brod:client_id(),
+    connector_state()
 ) ->
     {ok, source_state()} | {error, term()}.
-start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
+start_consumer(Config, ConnectorResId, SourceResId, ClientID, ConnState) ->
     #{
         bridge_name := BridgeName,
         hookpoints := Hookpoints,
@@ -455,6 +456,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
         {offset_commit_interval_seconds, OffsetCommitInterval}
     ],
     KafkaTopics = maps:keys(TopicMapping),
+    ensure_no_repeated_topics(KafkaTopics, ConnState),
     GroupSubscriberConfig =
         #{
             client => ClientID,
@@ -494,6 +496,30 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
             {error, Reason}
     end.
 
+%% Currently, brod treats a consumer process for a specific topic as a singleton (per
+%% client id / connector), meaning that the first subscriber to a given topic defines
+%% the consumer options for all other consumers, and those options persist even after
+%% the original consumer group is terminated.  We therefore enforce that a user who
+%% wants to consume the same topic more than once must create a different connector.
+ensure_no_repeated_topics(KafkaTopics, ConnState) ->
+    #{installed_sources := Sources} = ConnState,
+    InstalledTopics = lists:flatmap(fun(#{kafka_topics := Ts}) -> Ts end, maps:values(Sources)),
+    case KafkaTopics -- InstalledTopics of
+        KafkaTopics ->
+            %% all new topics
+            ok;
+        NewTopics ->
+            ExistingTopics0 = KafkaTopics -- NewTopics,
+            ExistingTopics = lists:join(<<", ">>, ExistingTopics0),
+            Message = iolist_to_binary([
+                <<"Topics ">>,
+                ExistingTopics,
+                <<" already exist in other sources associated with this connector.">>,
+                <<" If you want to repeat topics, create a new connector and source(s).">>
+            ]),
+            throw(Message)
+    end.
+
 %% This is to ensure backwards compatibility with the deprecated topic mapping.
 -spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
 ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) ->
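
One note for readers less familiar with the list-subtraction idiom above: in ensure_no_repeated_topics/2 the first case clause matches only because KafkaTopics is already bound, so it fires exactly when the subtraction removed nothing, i.e. when every requested topic is new to this connector. A minimal standalone sketch of the same check (hypothetical module name and sample data, not part of this PR):

-module(repeated_topics_sketch).
-export([check/2]).

%% Returns ok when none of the requested topics is already installed on the
%% connector, otherwise {error, Duplicates}.  Mirrors the case-clause idiom of
%% ensure_no_repeated_topics/2; names and data are for illustration only.
check(NewTopics, InstalledTopics) ->
    case NewTopics -- InstalledTopics of
        NewTopics ->
            %% the subtraction removed nothing, so every topic is new
            ok;
        Remaining ->
            %% whatever the subtraction removed is already installed elsewhere
            Duplicates = NewTopics -- Remaining,
            {error, Duplicates}
    end.

%% Example calls:
%%   repeated_topics_sketch:check([<<"t1">>], [<<"t2">>])           -> ok
%%   repeated_topics_sketch:check([<<"t1">>, <<"t2">>], [<<"t2">>]) -> {error, [<<"t2">>]}

In the real code, the message built above is thrown, and it ultimately surfaces as the error reported for the offending source, which the consumer suite below asserts on.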

+ 25 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -441,3 +441,28 @@ t_custom_group_id(Config) ->
         []
     ),
     ok.
+
+%% Currently, brod treats a consumer process for a specific topic as a singleton (per
+%% client id / connector), meaning that the first subscriber to a given topic defines
+%% the consumer options for all other consumers, and those options persist even after
+%% the original consumer group is terminated.  We therefore enforce that a user who
+%% wants to consume the same topic more than once must create a different connector.
+t_repeated_topics(Config) ->
+    ?check_trace(
+        begin
+            %% first source is fine
+            {ok, {{_, 201, _}, _, _}} =
+                emqx_bridge_v2_testlib:create_bridge_api(Config),
+            %% second source is created (201) but reports an error for the repeated topic
+            Name2 = <<"duplicated">>,
+            {201, #{<<"error">> := Error}} =
+                emqx_bridge_v2_testlib:create_source_api([{source_name, Name2} | Config]),
+            ?assertEqual(
+                match,
+                re:run(Error, <<"Topics .* already exist in other sources">>, [{capture, none}])
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/breaking-14106.en.md

@@ -0,0 +1 @@
+Added a validation that forbids a single Kafka Consumer connector from containing sources with repeated Kafka topics. If you want to repeat topics, create a new connector and source(s).