
feat: migrate kafka consumer bridge to source + connector

Fixes https://emqx.atlassian.net/browse/EMQX-11848
Thales Macedo Garitezi 1 year ago
Parent
Commit
6b9844ae82
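
Note: after this migration a single v1 kafka_consumer bridge is split into a connector (connection-level settings) plus a source (per-topic consumption settings) that references it. A minimal sketch of the resulting raw configs, with illustrative names and values only; the authoritative field sets are defined by the schema modules further down in this diff:

    %% Connector side: where to reach Kafka (illustrative values).
    ConnectorRawConf = #{
        <<"bootstrap_hosts">> => <<"kafka.emqx.net:9092">>,
        <<"resource_opts">> => #{<<"health_check_interval">> => <<"30s">>}
    },
    %% Source side: references the connector and carries consumption parameters.
    SourceRawConf = #{
        <<"connector">> => <<"my_connector">>,
        <<"parameters">> => #{<<"topic">> => <<"mytopic">>},
        <<"resource_opts">> => #{<<"health_check_interval">> => <<"30s">>}
    }.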

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_gcp_pubsub_consumer_action_info,
         emqx_bridge_gcp_pubsub_producer_action_info,
         emqx_bridge_kafka_action_info,
+        emqx_bridge_kafka_consumer_action_info,
         emqx_bridge_kinesis_action_info,
         emqx_bridge_hstreamdb_action_info,
         emqx_bridge_matrix_action_info,

+ 17 - 7
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -765,19 +765,26 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
             {error, Reason1}
     end.
 
-create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
+create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
     BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
-    ConnectorType = connector_type(BridgeType),
+    ConnectorType = connector_type(BridgeV2Type),
     OnReadyCallback =
         fun(ConnectorId) ->
             {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
-            ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
-            Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
+            ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName),
+            BridgeV2Conf0 = fill_defaults(
+                BridgeV2Type,
+                BridgeV2RawConf,
+                bin(ConfRootKey),
+                emqx_bridge_v2_schema,
+                #{make_serializable => false}
+            ),
+            BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2Conf0),
             AugmentedConf = augment_channel_config(
                 ConfRootKey,
-                BridgeType,
+                BridgeV2Type,
                 BridgeName,
-                Conf
+                BridgeV2Conf
             ),
             case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of
                 {error, Reason} ->
@@ -1204,8 +1211,11 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], E
     perform_bridge_changes(Tasks, Errors).
 
 fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
+    fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, _Opts = #{}).
+
+fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) ->
     PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
-    FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}),
+    FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, Opts),
     unpack_bridge_conf(Type, FullConf, TopLevelConf).
 
 pack_bridge_conf(Type, RawConf, TopLevelConf) ->

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -775,7 +775,7 @@ handle_update(ConfRootKey, Id, Conf0) ->
         Id,
         case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
             {ok, _} ->
-                RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
+                RawConf = emqx:get_raw_config([ConfRootKey, BridgeType, BridgeName], #{}),
                 Conf = emqx_utils:deobfuscate(Conf1, RawConf),
                 update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
             {error, not_found} ->
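
Note: the raw config used for deobfuscation is now read under the caller's config root rather than the hard-coded bridges root, so updates to sources resolve the correct entry. A one-line sketch, assuming ConfRootKey is the atom sources:

    %% For a source update, the lookup now targets the sources root:
    RawConf = emqx:get_raw_config([sources, BridgeType, BridgeName], #{}).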

+ 41 - 21
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -89,6 +89,7 @@ end_per_testcase(_Testcase, Config) ->
             %% in CI, apparently this needs more time since the
             %% machines struggle with all the containers running...
             emqx_common_test_helpers:call_janitor(60_000),
+            delete_all_bridges_and_connectors(),
             ok = snabbkaffe:stop(),
             ok
     end.
@@ -132,7 +133,13 @@ parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
     TypeBin = emqx_utils_conv:bin(Type),
     RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
     #{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
-        emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
+        emqx_bridge_v2_schema,
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => true
+        }
     ),
     InnerConfigMap.
 
@@ -140,7 +147,13 @@ parse_and_check_connector(Type, Name, InnerConfigMap0) ->
     TypeBin = emqx_utils_conv:bin(Type),
     RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
     #{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
-        emqx_connector_schema, RawConf, #{required => false, atom_key => false}
+        emqx_connector_schema,
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => true
+        }
     ),
     InnerConfigMap.
 
@@ -282,20 +295,23 @@ list_bridges_api() ->
     ct:pal("list bridges result: ~p", [Res]),
     Res.
 
+get_source_api(BridgeType, BridgeName) ->
+    get_bridge_api(source, BridgeType, BridgeName).
+
 get_bridge_api(BridgeType, BridgeName) ->
+    get_bridge_api(action, BridgeType, BridgeName).
+
+get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
     Params = [],
-    Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
-    ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
-            {ok, {Status, Headers, Body0}} ->
-                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
-            Error ->
-                Error
+    Root =
+        case BridgeKind of
+            source -> "sources";
+            action -> "actions"
         end,
+    Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
+    ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
+    Res = request(get, Path, Params),
     ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
     Res.
 
@@ -672,7 +688,8 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     end,
     ok.
 
-%% - `ProduceFn': produces a message in the remote system that shall be consumed.
+%% - `ProduceFn': produces a message in the remote system that shall be consumed.  May be
+%%    a `{function(), integer()}' tuple.
 %% - `Tracepoint': marks the end of consumed message processing.
 t_consume(Config, Opts) ->
     #{
@@ -683,14 +700,17 @@ t_consume(Config, Opts) ->
     } = Opts,
     ?check_trace(
         begin
-            ?assertMatch(
-                {{ok, _}, {ok, _}},
-                snabbkaffe:wait_async_action(
-                    fun() -> create_bridge_api(Config) end,
-                    ConsumerReadyTPFn,
-                    15_000
-                )
-            ),
+            ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
+            case ConsumerReadyTPFn of
+                {Predicate, NEvents} when is_function(Predicate) ->
+                    {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout);
+                Predicate when is_function(Predicate) ->
+                    {ok, SRef0} = snabbkaffe:subscribe(
+                        Predicate, _NEvents = 1, ConsumerReadyTimeout
+                    )
+            end,
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
             ok = add_source_hookpoint(Config),
             ?retry(
                 _Sleep = 200,
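
Note: t_consume/2 now subscribes to the consumer-ready tracepoint itself, and the option may be either a predicate fun or a {Predicate, NEvents} tuple when several events must be observed (for example one per partition). A sketch of the two forms a suite might pass (the option key name is elided here; see the Opts map destructured above):

    %% Single event: ready once one subscriber has started.
    ReadyTPFn = fun(Event) ->
        maps:get(?snk_kind, Event, undefined) =:= kafka_consumer_subscriber_started
    end,
    %% Multiple events: wait for three such events before producing messages.
    ReadyTPFnMulti = {ReadyTPFn, 3}.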

+ 6 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -12,7 +12,12 @@
         brod,
         brod_gssapi
     ]},
-    {env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]},
+    {env, [
+        {emqx_action_info_modules, [
+            emqx_bridge_kafka_action_info,
+            emqx_bridge_kafka_consumer_action_info
+        ]}
+    ]},
     {modules, []},
 
     {links, []}

+ 105 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl

@@ -0,0 +1,105 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kafka_consumer_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    is_source/0,
+    is_action/0,
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    connector_action_config_to_bridge_v1_config/2,
+    bridge_v1_config_to_action_config/2
+]).
+
+is_source() -> true.
+
+is_action() -> false.
+
+bridge_v1_type_name() -> kafka_consumer.
+
+action_type_name() -> kafka_consumer.
+
+connector_type_name() -> kafka_consumer.
+
+schema_module() -> emqx_bridge_kafka_consumer_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    V1Config1 = maps:remove(<<"connector">>, ActionConfig),
+    V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1),
+    V1Config3 = maybe_fabricate_topic_mapping(V1Config2),
+    {Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
+    TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
+    TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
+    %% `topic' is v2-only
+    Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
+    V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
+    V1Config = emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        %% Slightly different from default source resource opts...
+        fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end,
+        V1Config5
+    ),
+    maps:put(<<"kafka">>, Params, V1Config).
+
+bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
+    Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
+        BridgeV1Conf, ConnectorName, schema_module(), source_parameters
+    ),
+    TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []),
+    Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
+    Params1 = maps:with(source_parameters_field_keys(), Params0),
+    Params2 = emqx_utils_maps:put_if(
+        Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= []
+    ),
+    Params = maybe_set_kafka_topic(Params2),
+    {source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}.
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper functions
+%%------------------------------------------------------------------------------------------
+
+%% The new schema has a single kafka topic, so we take it from topic mapping when
+%% converting from v1.
+maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
+    Params#{<<"topic">> => Topic};
+maybe_set_kafka_topic(Params) ->
+    Params.
+
+%% The old schema requires `topic_mapping', which is now hidden.
+maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
+    #{<<"topic">> := Topic} = Params0,
+    case maps:get(<<"topic_mapping">>, Params0, undefined) of
+        [_ | _] ->
+            BridgeV1Config0;
+        _ ->
+            %% Have to fabricate an MQTT topic, unfortunately...  QoS and payload already
+            %% have defaults.
+            FakeTopicMapping = #{
+                <<"kafka_topic">> => Topic,
+                <<"mqtt_topic">> => <<>>
+            },
+            Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
+            BridgeV1Config0#{<<"parameters">> := Params}
+    end.
+
+v1_fields(StructName) ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_kafka:fields(StructName)
+    ].
+
+source_parameters_field_keys() ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters)
+    ].
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(L) when is_list(L) -> list_to_binary(L);
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
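
Note: for the v1 -> v2 direction, a sketch of the shape bridge_v1_config_to_action_config/2 returns for a legacy kafka_consumer bridge. The field subset is illustrative; the full parameter set is whatever source_parameters_field_keys/0 selects from the old kafka section:

    %% Tagged as a source (not an action) of the kafka_consumer type.
    {source, kafka_consumer, #{
        <<"connector">> => <<"my_connector">>,
        <<"parameters">> => #{
            %% Taken from the first topic_mapping entry by maybe_set_kafka_topic/1.
            <<"topic">> => <<"my-kafka-topic">>
        }
    }}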

+ 233 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl

@@ -0,0 +1,233 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_kafka_consumer_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    source_examples/1,
+    connector_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(CONNECTOR_TYPE, kafka_consumer).
+-define(SOURCE_TYPE, kafka_consumer).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() -> "kafka_consumer".
+
+roots() -> [].
+
+%%=========================================
+%% Source fields
+%%=========================================
+fields(source) ->
+    {kafka_consumer,
+        mk(
+            hoconsc:map(name, ref(?MODULE, consumer_source)),
+            #{
+                desc => <<"Kafka Consumer Source Config">>,
+                required => false
+            }
+        )};
+fields(consumer_source) ->
+    emqx_bridge_v2_schema:make_consumer_action_schema(
+        mk(
+            ref(?MODULE, source_parameters),
+            #{
+                required => true,
+                desc => ?DESC(consumer_source)
+            }
+        )
+    );
+fields(source_parameters) ->
+    Fields0 = emqx_bridge_kafka:fields(consumer_kafka_opts),
+    Fields1 = emqx_bridge_kafka:fields(consumer_opts),
+    Fields2 = proplists:delete(kafka, Fields1),
+    Fields = lists:map(
+        fun
+            ({topic_mapping = Name, Sc}) ->
+                %% to please dialyzer...
+                Override = #{
+                    type => hocon_schema:field_schema(Sc, type),
+                    required => false,
+                    default => [],
+                    validator => fun(_) -> ok end,
+                    importance => ?IMPORTANCE_HIDDEN
+                },
+                {Name, hocon_schema:override(Sc, Override)};
+            (FieldSchema) ->
+                FieldSchema
+        end,
+        Fields0 ++ Fields2
+    ),
+    [
+        {topic,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
+                }
+            )}
+        | Fields
+    ];
+%%=========================================
+%% HTTP API fields: source
+%%=========================================
+fields(Field) when
+    Field == "get_source";
+    Field == "post_source";
+    Field == "put_source"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(consumer_source));
+%%=========================================
+%% Connector fields
+%%=========================================
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        emqx_bridge_kafka:kafka_connector_config_fields();
+%%=========================================
+%% HTTP API fields: connector
+%%=========================================
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    emqx_connector_schema:api_fields(
+        Field,
+        ?CONNECTOR_TYPE,
+        emqx_bridge_kafka:kafka_connector_config_fields()
+    ).
+
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(source_parameters) ->
+    ?DESC(source_parameters);
+desc(consumer_source) ->
+    ?DESC(consumer_source);
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
+desc(source_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
+desc(Field) when
+    Field =:= "get_connector";
+    Field =:= "put_connector";
+    Field =:= "post_connector"
+->
+    "Configuration for Kafka Consumer Connector.";
+desc(Field) when
+    Field =:= "get_source";
+    Field =:= "put_source";
+    Field =:= "post_source"
+->
+    "Configuration for Kafka Consumer Source.";
+desc(Name) ->
+    throw({missing_desc, ?MODULE, Name}).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_bridge_v2_schema' "unofficial" API
+%%-------------------------------------------------------------------------------------------------
+
+source_examples(Method) ->
+    [
+        #{
+            <<"kafka_consumer">> => #{
+                summary => <<"Kafka Consumer Source">>,
+                value => source_example(Method)
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"kafka_consumer">> => #{
+                summary => <<"Kafka Consumer Connector">>,
+                value => connector_example(Method)
+            }
+        }
+    ].
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+source_example(post) ->
+    maps:merge(
+        source_example(put),
+        #{
+            type => <<"kafka_consumer">>,
+            name => <<"my_source">>
+        }
+    );
+source_example(get) ->
+    maps:merge(
+        source_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+source_example(put) ->
+    #{
+        parameters =>
+            #{
+                topic => <<"mytopic">>
+            },
+        resource_opts =>
+            #{
+                health_check_interval => <<"30s">>
+            }
+    }.
+
+connector_example(get) ->
+    maps:merge(
+        connector_example(post),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+connector_example(post) ->
+    maps:merge(
+        connector_example(put),
+        #{
+            type => <<"kafka_consumer">>,
+            name => <<"my_connector">>
+        }
+    );
+connector_example(put) ->
+    #{
+        bootstrap_hosts => <<"kafka.emqx.net:9092">>,
+        resource_opts =>
+            #{
+                start_after_created => true,
+                health_check_interval => <<"30s">>,
+                start_timeout => <<"5s">>
+            }
+    }.
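
Note: apart from the new required topic field, source_parameters above reuses the v1 consumer option fields (consumer_kafka_opts plus consumer_opts minus kafka), with topic_mapping demoted to a hidden, optional field. An illustrative parameters map such a source could carry (values are examples, not defaults):

    #{
        <<"topic">> => <<"mytopic">>,
        <<"max_batch_bytes">> => <<"896KB">>,
        <<"offset_reset_policy">> => <<"latest">>,
        <<"key_encoding_mode">> => <<"none">>,
        <<"value_encoding_mode">> => <<"none">>
    }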

+ 236 - 121
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -11,7 +11,12 @@
     query_mode/1,
     on_start/2,
     on_stop/2,
-    on_get_status/2
+    on_get_status/2,
+
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 %% `brod_group_consumer' API
@@ -30,45 +35,57 @@
 -include_lib("brod/include/brod.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
--type config() :: #{
+-type connector_config() :: #{
     authentication := term(),
     bootstrap_hosts := binary(),
-    bridge_name := atom(),
-    kafka := #{
-        max_batch_bytes := emqx_schema:bytesize(),
-        max_rejoin_attempts := non_neg_integer(),
-        offset_commit_interval_seconds := pos_integer(),
-        offset_reset_policy := offset_reset_policy(),
-        topic := binary()
-    },
-    topic_mapping := nonempty_list(
-        #{
-            kafka_topic := kafka_topic(),
-            mqtt_topic := emqx_types:topic(),
-            qos := emqx_types:qos(),
-            payload_template := string()
-        }
-    ),
+    connector_name := atom() | binary(),
+    connector_type := atom() | binary(),
+    socket_opts := _,
     ssl := _,
     any() => term()
 }.
+-type source_config() :: #{
+    bridge_name := atom(),
+    hookpoints := [binary()],
+    parameters := source_parameters()
+}.
+-type source_parameters() :: #{
+    key_encoding_mode := encoding_mode(),
+    max_batch_bytes := emqx_schema:bytesize(),
+    max_rejoin_attempts := non_neg_integer(),
+    offset_commit_interval_seconds := pos_integer(),
+    offset_reset_policy := offset_reset_policy(),
+    topic := kafka_topic(),
+    value_encoding_mode := encoding_mode(),
+    topic_mapping => [one_topic_mapping()]
+}.
+-type one_topic_mapping() :: #{
+    kafka_topic => kafka_topic(),
+    mqtt_topic => emqx_types:topic(),
+    qos => emqx_types:qos(),
+    payload_template => string()
+}.
 -type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
 -type kafka_topic() :: brod:topic().
 -type kafka_message() :: #kafka_message{}.
--type state() :: #{
-    kafka_topics := nonempty_list(kafka_topic()),
+-type connector_state() :: #{
+    kafka_client_id := brod:client_id(),
+    installed_sources := #{source_resource_id() => source_state()}
+}.
+-type source_state() :: #{
     subscriber_id := subscriber_id(),
-    kafka_client_id := brod:client_id()
+    kafka_client_id := brod:client_id(),
+    kafka_topics := [kafka_topic()]
 }.
 -type offset_reset_policy() :: latest | earliest.
 -type encoding_mode() :: none | base64.
 -type consumer_init_data() :: #{
-    hookpoint := binary(),
+    hookpoints := [binary()],
     key_encoding_mode := encoding_mode(),
-    resource_id := resource_id(),
+    resource_id := source_resource_id(),
     topic_mapping := #{
         kafka_topic() := #{
-            payload_template := emqx_placeholder:tmpl_token(),
+            payload_template => emqx_placeholder:tmpl_token(),
             mqtt_topic_template => emqx_placeholder:tmpl_token(),
             qos => emqx_types:qos()
         }
@@ -76,13 +93,13 @@
     value_encoding_mode := encoding_mode()
 }.
 -type consumer_state() :: #{
-    hookpoint := binary(),
-    kafka_topic := binary(),
+    hookpoints := [binary()],
+    kafka_topic := kafka_topic(),
     key_encoding_mode := encoding_mode(),
-    resource_id := resource_id(),
+    resource_id := source_resource_id(),
     topic_mapping := #{
         kafka_topic() := #{
-            payload_template := emqx_placeholder:tmpl_token(),
+            payload_template => emqx_placeholder:tmpl_token(),
             mqtt_topic_template => emqx_placeholder:tmpl_token(),
             qos => emqx_types:qos()
         }
@@ -90,7 +107,7 @@
     value_encoding_mode := encoding_mode()
 }.
 -type subscriber_init_info() :: #{
-    topic => brod:topic(),
+    topic := brod:topic(),
     parition => brod:partition(),
     group_id => brod:group_id(),
     commit_fun => brod_group_subscriber_v2:commit_fun()
@@ -103,7 +120,7 @@
 
 %% Allocatable resources
 -define(kafka_client_id, kafka_client_id).
--define(kafka_subscriber_id, kafka_subscriber_id).
+-define(kafka_subscriber_ids, kafka_subscriber_ids).
 
 %%-------------------------------------------------------------------------------------
 %% `emqx_resource' API
@@ -116,27 +133,19 @@ callback_mode() ->
 query_mode(_Config) ->
     no_queries.
 
--spec on_start(resource_id(), config()) -> {ok, state()}.
-on_start(ResourceId, Config) ->
+-spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()}.
+on_start(ConnectorResId, Config) ->
     #{
         authentication := Auth,
         bootstrap_hosts := BootstrapHosts0,
-        bridge_type := BridgeType,
-        bridge_name := BridgeName,
-        hookpoint := _,
-        kafka := #{
-            max_batch_bytes := _,
-            max_rejoin_attempts := _,
-            offset_commit_interval_seconds := _,
-            offset_reset_policy := _
-        },
+        connector_type := ConnectorType,
+        connector_name := ConnectorName,
         socket_opts := SocketOpts0,
-        ssl := SSL,
-        topic_mapping := _
+        ssl := SSL
     } = Config,
     BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
     %% Note: this is distinct per node.
-    ClientID = make_client_id(ResourceId, BridgeType, BridgeName),
+    ClientID = make_client_id(ConnectorResId, ConnectorType, ConnectorName),
     ClientOpts0 =
         case Auth of
             none -> [];
@@ -145,34 +154,37 @@ on_start(ResourceId, Config) ->
     ClientOpts = add_ssl_opts(ClientOpts0, SSL),
     SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
     ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
-    ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
+    ok = emqx_resource:allocate_resource(ConnectorResId, ?kafka_client_id, ClientID),
     case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
         ok ->
             ?tp(
                 kafka_consumer_client_started,
-                #{client_id => ClientID, resource_id => ResourceId}
+                #{client_id => ClientID, resource_id => ConnectorResId}
             ),
             ?SLOG(info, #{
                 msg => "kafka_consumer_client_started",
-                resource_id => ResourceId,
+                resource_id => ConnectorResId,
                 kafka_hosts => BootstrapHosts
             });
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_consumer_client",
-                resource_id => ResourceId,
+                resource_id => ConnectorResId,
                 kafka_hosts => BootstrapHosts,
                 reason => emqx_utils:redact(Reason)
             }),
             throw(?CLIENT_DOWN_MESSAGE)
     end,
-    start_consumer(Config, ResourceId, ClientID).
-
--spec on_stop(resource_id(), state()) -> ok.
-on_stop(ResourceId, _State = undefined) ->
-    case emqx_resource:get_allocated_resources(ResourceId) of
-        #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} ->
-            stop_subscriber(SubscriberId),
+    {ok, #{
+        kafka_client_id => ClientID,
+        installed_sources => #{}
+    }}.
+
+-spec on_stop(resource_id(), connector_state()) -> ok.
+on_stop(ConnectorResId, _State = undefined) ->
+    case emqx_resource:get_allocated_resources(ConnectorResId) of
+        #{?kafka_client_id := ClientID, ?kafka_subscriber_ids := SubscriberIds} ->
+            lists:foreach(fun stop_subscriber/1, SubscriberIds),
             stop_client(ClientID),
             ?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
             ok;
@@ -183,29 +195,91 @@ on_stop(ResourceId, _State = undefined) ->
         _ ->
             ok
     end;
-on_stop(_ResourceId, State) ->
+on_stop(ConnectorResId, State) ->
     #{
-        subscriber_id := SubscriberId,
+        installed_sources := InstalledSources,
         kafka_client_id := ClientID
     } = State,
-    stop_subscriber(SubscriberId),
+    maps:foreach(
+        fun(_SourceResId, #{subscriber_id := SubscriberId}) ->
+            stop_subscriber(SubscriberId)
+        end,
+        InstalledSources
+    ),
     stop_client(ClientID),
+    ?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}),
     ok.
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
-on_get_status(_ResourceID, State) ->
+-spec on_get_status(resource_id(), connector_state()) -> connected | disconnected.
+on_get_status(_ResourceID, _State) ->
+    ?status_connected.
+
+-spec on_add_channel(
+    connector_resource_id(),
+    connector_state(),
+    source_resource_id(),
+    source_config()
+) ->
+    {ok, connector_state()}.
+on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
     #{
-        subscriber_id := SubscriberId,
         kafka_client_id := ClientID,
-        kafka_topics := KafkaTopics
-    } = State,
-    case do_get_status(ClientID, KafkaTopics, SubscriberId) of
-        {disconnected, Message} ->
-            {disconnected, State, Message};
-        Res ->
-            Res
+        installed_sources := InstalledSources0
+    } = ConnectorState0,
+    case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of
+        {ok, SourceState} ->
+            InstalledSources = InstalledSources0#{SourceResId => SourceState},
+            ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
+            {ok, ConnectorState};
+        Error = {error, _} ->
+            Error
     end.
 
+-spec on_remove_channel(
+    connector_resource_id(),
+    connector_state(),
+    source_resource_id()
+) ->
+    {ok, connector_state()}.
+on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) ->
+    #{installed_sources := InstalledSources0} = ConnectorState0,
+    case maps:take(SourceResId, InstalledSources0) of
+        {SourceState, InstalledSources} ->
+            #{subscriber_id := SubscriberId} = SourceState,
+            stop_subscriber(SubscriberId),
+            deallocate_subscriber_id(ConnectorResId, SubscriberId),
+            ok;
+        error ->
+            InstalledSources = InstalledSources0
+    end,
+    ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
+    {ok, ConnectorState}.
+
+-spec on_get_channels(connector_resource_id()) ->
+    [{action_resource_id(), source_config()}].
+on_get_channels(ConnectorResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
+
+-spec on_get_channel_status(
+    connector_resource_id(),
+    source_resource_id(),
+    connector_state()
+) ->
+    ?status_connected | ?status_disconnected.
+on_get_channel_status(
+    _ConnectorResId,
+    SourceResId,
+    ConnectorState = #{installed_sources := InstalledSources}
+) when is_map_key(SourceResId, InstalledSources) ->
+    #{kafka_client_id := ClientID} = ConnectorState,
+    #{
+        kafka_topics := KafkaTopics,
+        subscriber_id := SubscriberId
+    } = maps:get(SourceResId, InstalledSources),
+    do_get_status(ClientID, KafkaTopics, SubscriberId);
+on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) ->
+    ?status_disconnected.
+
 %%-------------------------------------------------------------------------------------
 %% `brod_group_subscriber' API
 %%-------------------------------------------------------------------------------------
@@ -227,18 +301,13 @@ handle_message(Message, State) ->
 
 do_handle_message(Message, State) ->
     #{
-        hookpoint := Hookpoint,
+        hookpoints := Hookpoints,
         kafka_topic := KafkaTopic,
         key_encoding_mode := KeyEncodingMode,
-        resource_id := ResourceId,
+        resource_id := SourceResId,
         topic_mapping := TopicMapping,
         value_encoding_mode := ValueEncodingMode
     } = State,
-    #{
-        mqtt_topic_template := MQTTTopicTemplate,
-        qos := MQTTQoS,
-        payload_template := PayloadTemplate
-    } = maps:get(KafkaTopic, TopicMapping),
     FullMessage = #{
         headers => maps:from_list(Message#kafka_message.headers),
         key => encode(Message#kafka_message.key, KeyEncodingMode),
@@ -248,16 +317,31 @@ do_handle_message(Message, State) ->
         ts_type => Message#kafka_message.ts_type,
         value => encode(Message#kafka_message.value, ValueEncodingMode)
     },
-    Payload = render(FullMessage, PayloadTemplate),
-    MQTTTopic = render(FullMessage, MQTTTopicTemplate),
-    MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
-    _ = emqx_broker:safe_publish(MQTTMessage),
-    emqx_hooks:run(Hookpoint, [FullMessage]),
-    emqx_resource_metrics:received_inc(ResourceId),
+    LegacyMQTTConfig = maps:get(KafkaTopic, TopicMapping, #{}),
+    legacy_maybe_publish_mqtt_message(LegacyMQTTConfig, SourceResId, FullMessage),
+    lists:foreach(fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, Hookpoints),
+    emqx_resource_metrics:received_inc(SourceResId),
     %% note: just `ack' does not commit the offset to the
     %% kafka consumer group.
     {ok, commit, State}.
 
+legacy_maybe_publish_mqtt_message(
+    _MQTTConfig = #{
+        payload_template := PayloadTemplate,
+        qos := MQTTQoS,
+        mqtt_topic_template := MQTTTopicTemplate
+    },
+    SourceResId,
+    FullMessage
+) when MQTTTopicTemplate =/= <<>> ->
+    Payload = render(FullMessage, PayloadTemplate),
+    MQTTTopic = render(FullMessage, MQTTTopicTemplate),
+    MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
+    _ = emqx_broker:safe_publish(MQTTMessage),
+    ok;
+legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
+    ok.
+
 %%-------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------
@@ -292,28 +376,33 @@ ensure_consumer_supervisor_started() ->
             ok
     end.
 
--spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}.
-start_consumer(Config, ResourceId, ClientID) ->
+-spec start_consumer(
+    source_config(),
+    connector_resource_id(),
+    source_resource_id(),
+    brod:client_id()
+) ->
+    {ok, source_state()} | {error, term()}.
+start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
     #{
-        bootstrap_hosts := BootstrapHosts0,
         bridge_name := BridgeName,
-        hookpoint := Hookpoint,
-        kafka := #{
+        hookpoints := Hookpoints,
+        parameters := #{
+            key_encoding_mode := KeyEncodingMode,
             max_batch_bytes := MaxBatchBytes,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
-            offset_reset_policy := OffsetResetPolicy0
-        },
-        key_encoding_mode := KeyEncodingMode,
-        topic_mapping := TopicMapping0,
-        value_encoding_mode := ValueEncodingMode
+            offset_reset_policy := OffsetResetPolicy0,
+            topic := _Topic,
+            value_encoding_mode := ValueEncodingMode
+        } = Params0
     } = Config,
     ok = ensure_consumer_supervisor_started(),
-    TopicMapping = convert_topic_mapping(TopicMapping0),
+    TopicMapping = ensure_topic_mapping(Params0),
     InitialState = #{
         key_encoding_mode => KeyEncodingMode,
-        hookpoint => Hookpoint,
-        resource_id => ResourceId,
+        hookpoints => Hookpoints,
+        resource_id => SourceResId,
         topic_mapping => TopicMapping,
         value_encoding_mode => ValueEncodingMode
     },
@@ -355,30 +444,38 @@ start_consumer(Config, ResourceId, ClientID) ->
     %% automatically, so we should not spawn duplicate workers.
     SubscriberId = make_subscriber_id(BridgeName),
     ?tp(kafka_consumer_about_to_start_subscriber, #{}),
-    ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId),
+    ok = allocate_subscriber_id(ConnectorResId, SubscriberId),
     ?tp(kafka_consumer_subscriber_allocated, #{}),
     case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
         {ok, _ConsumerPid} ->
             ?tp(
                 kafka_consumer_subscriber_started,
-                #{resource_id => ResourceId, subscriber_id => SubscriberId}
+                #{resource_id => SourceResId, subscriber_id => SubscriberId}
             ),
             {ok, #{
                 subscriber_id => SubscriberId,
                 kafka_client_id => ClientID,
                 kafka_topics => KafkaTopics
             }};
-        {error, Reason2} ->
+        {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_consumer",
-                resource_id => ResourceId,
-                kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
-                reason => emqx_utils:redact(Reason2)
+                resource_id => SourceResId,
+                reason => emqx_utils:redact(Reason)
             }),
-            stop_client(ClientID),
-            throw(failed_to_start_kafka_consumer)
+            {error, Reason}
     end.
 
+%% This is to ensure backwards compatibility with the deprecated topic mapping.
+-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
+ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) ->
+    %% There is an existing topic mapping: legacy config.  We use it and ignore the single
+    %% pubsub topic so that the bridge keeps working as before.
+    convert_topic_mapping(TM);
+ensure_topic_mapping(#{topic := KafkaTopic}) ->
+    %% No topic mapping: generate one without MQTT templates.
+    #{KafkaTopic => #{}}.
+
 -spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
 stop_subscriber(SubscriberId) ->
     _ = log_when_error(
@@ -415,36 +512,38 @@ do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
     case brod:get_partitions_count(ClientID, KafkaTopic) of
         {ok, NPartitions} ->
             case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
-                connected -> do_get_status(ClientID, RestTopics, SubscriberId);
-                disconnected -> disconnected
+                ?status_connected ->
+                    do_get_status(ClientID, RestTopics, SubscriberId);
+                ?status_disconnected ->
+                    ?status_disconnected
             end;
         {error, {client_down, Context}} ->
             case infer_client_error(Context) of
                 auth_error ->
                     Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
-                    {disconnected, Message};
+                    {?status_disconnected, Message};
                 {auth_error, Message0} ->
                     Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
-                    {disconnected, Message};
+                    {?status_disconnected, Message};
                 connection_refused ->
                     Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
-                    {disconnected, Message};
+                    {?status_disconnected, Message};
                 _ ->
-                    {disconnected, ?CLIENT_DOWN_MESSAGE}
+                    {?status_disconnected, ?CLIENT_DOWN_MESSAGE}
             end;
         {error, leader_not_available} ->
             Message =
                 "Leader connection not available. Please check the Kafka topic used,"
                 " the connection parameters and Kafka cluster health",
-            {disconnected, Message};
+            {?status_disconnected, Message};
         _ ->
-            disconnected
+            ?status_disconnected
     end;
 do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
-    connected.
+    ?status_connected.
 
 -spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
-    connected | disconnected.
+    ?status_connected | ?status_disconnected.
 do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
     Results =
         lists:map(
@@ -467,9 +566,9 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
     WorkersAlive = are_subscriber_workers_alive(SubscriberId),
     case AllLeadersOk andalso WorkersAlive of
         true ->
-            connected;
+            ?status_connected;
         false ->
-            disconnected
+            ?status_disconnected
     end.
 
 are_subscriber_workers_alive(SubscriberId) ->
@@ -507,19 +606,19 @@ consumer_group_id(BridgeName0) ->
     BridgeName = to_bin(BridgeName0),
     <<"emqx-kafka-consumer-", BridgeName/binary>>.
 
--spec is_dry_run(resource_id()) -> boolean().
-is_dry_run(ResourceId) ->
-    TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX),
+-spec is_dry_run(connector_resource_id()) -> boolean().
+is_dry_run(ConnectorResId) ->
+    TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX),
     case TestIdStart of
         nomatch ->
             false;
         _ ->
-            string:equal(TestIdStart, ResourceId)
+            string:equal(TestIdStart, ConnectorResId)
     end.
 
--spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom().
-make_client_id(ResourceId, BridgeType, BridgeName) ->
-    case is_dry_run(ResourceId) of
+-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
+make_client_id(ConnectorResId, BridgeType, BridgeName) ->
+    case is_dry_run(ConnectorResId) of
         false ->
             ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
             binary_to_atom(ClientID0);
@@ -583,3 +682,19 @@ infer_client_error(Error) ->
         _ ->
             undefined
     end.
+
+allocate_subscriber_id(ConnectorResId, SubscriberId) ->
+    AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId),
+    AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []),
+    AllocatedSubscriberIds = lists:usort([SubscriberId | AllocatedSubscriberIds0]),
+    ok = emqx_resource:allocate_resource(
+        ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds
+    ).
+
+deallocate_subscriber_id(ConnectorResId, SubscriberId) ->
+    AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId),
+    AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []),
+    AllocatedSubscriberIds = AllocatedSubscriberIds0 -- [SubscriberId],
+    ok = emqx_resource:allocate_resource(
+        ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds
+    ).
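
Note: with the connector/source split, resource state is now two-level: the connector owns the shared brod client, and each installed source tracks its own subscriber and topics. A sketch of the connector state that on_add_channel/4 builds up, following the connector_state()/source_state() specs above (identifiers illustrative):

    %% Hypothetical helper showing the nested state shape after one source is added.
    example_connector_state(SourceResId, SubscriberId, ClientID) ->
        #{
            kafka_client_id => ClientID,
            installed_sources => #{
                SourceResId => #{
                    subscriber_id => SubscriberId,
                    kafka_client_id => ClientID,
                    kafka_topics => [<<"mytopic">>]
                }
            }
        }.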

+ 69 - 53
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -15,6 +15,8 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
+-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
+-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
 -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).
 
 %%------------------------------------------------------------------------------
@@ -78,13 +80,29 @@ testcases(once) ->
     ].
 
 init_per_suite(Config) ->
-    [{bridge_type, <<"kafka_consumer">>} | Config].
+    emqx_common_test_helpers:clear_screen(),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_bridge_kafka,
+            emqx_bridge,
+            emqx_rule_engine,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _Api} = emqx_common_test_http:create_default_app(),
+    [
+        {apps, Apps},
+        {bridge_type, <<"kafka_consumer">>}
+        | Config
+    ].
 
-end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
-    _ = application:stop(emqx_connector),
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
     ok.
 
 init_per_group(plain = Type, Config) ->
@@ -242,11 +260,6 @@ common_init_per_group() ->
     ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
     ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    application:load(emqx_bridge),
-    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:start_apps(?APPS),
-    {ok, _} = application:ensure_all_started(emqx_connector),
-    emqx_mgmt_api_test_util:init_suite(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
     [
@@ -262,7 +275,7 @@ common_end_per_group(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok.
 
 end_per_group(Group, Config) when
@@ -327,7 +340,7 @@ init_per_testcase(TestCase, Config) ->
 
 common_init_per_testcase(TestCase, Config0) ->
     ct:timetrap(timer:seconds(60)),
-    delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_config:delete_override_conf_files(),
     KafkaTopic0 =
         <<
@@ -363,7 +376,12 @@ common_init_per_testcase(TestCase, Config0) ->
         {kafka_name, Name},
         {kafka_config_string, ConfigString},
         {kafka_config, KafkaConfig},
-        {kafka_producers, ProducersConfigs}
+        {kafka_producers, ProducersConfigs},
+        {bridge_kind, source},
+        {connector_name, Name},
+        {connector_type, ?CONNECTOR_TYPE_BIN},
+        {source_type, ?SOURCE_TYPE_BIN},
+        {source_name, Name}
         | Config
     ].
 
@@ -372,7 +390,7 @@ end_per_testcase(_Testcase, Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProducersConfigs = ?config(kafka_producers, Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     #{clientid := KafkaProducerClientId, producers := ProducersMapping} =
         ProducersConfigs,
     lists:foreach(
@@ -681,19 +699,6 @@ create_bridge_wait_for_balance(Config) ->
         kill_group_subscriber_spy()
     end.
 
-delete_bridge(Config) ->
-    Type = ?BRIDGE_TYPE_BIN,
-    Name = ?config(kafka_name, Config),
-    emqx_bridge:remove(Type, Name).
-
-delete_all_bridges() ->
-    lists:foreach(
-        fun(#{name := Name, type := Type}) ->
-            emqx_bridge:remove(Type, Name)
-        end,
-        emqx_bridge:list()
-    ).
-
 create_bridge_api(Config) ->
     create_bridge_api(Config, _Overrides = #{}).
 
@@ -752,9 +757,8 @@ send_message(Config, Payload) ->
     emqx_bridge:send_message(BridgeId, Payload).
 
 resource_id(Config) ->
-    Type = ?BRIDGE_TYPE_BIN,
     Name = ?config(kafka_name, Config),
-    emqx_bridge_resource:resource_id(Type, Name).
+    emqx_bridge_v2:source_id(?SOURCE_TYPE_BIN, Name, Name).
 
 instance_id(Config) ->
     ResourceId = resource_id(Config),
@@ -1084,6 +1088,12 @@ cluster(Config) ->
     ct:pal("cluster: ~p", [Cluster]),
     Cluster.
 
+start_peer(Name, Opts) ->
+    Node = emqx_common_test_helpers:start_peer(Name, Opts),
+    % Make it possible to call `ct:pal` and friends (if running under rebar3)
+    _ = emqx_cth_cluster:share_load_module(Node, cthr),
+    Node.
+
 start_async_publisher(Config, KafkaTopic) ->
     TId = ets:new(kafka_payloads, [public, ordered_set]),
     Loop = fun Go() ->
@@ -1129,6 +1139,15 @@ kill_resource_managers() ->
         supervisor:which_children(emqx_resource_manager_sup)
     ).
 
+health_check(Config) ->
+    health_check(node(), Config).
+
+health_check(Node, Config) ->
+    erpc:call(Node, fun() ->
+        #{status := Status} = emqx_bridge_v2_testlib:health_check_channel(Config),
+        {ok, Status}
+    end).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1344,19 +1363,13 @@ t_on_get_status(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
-    KafkaName = ?config(kafka_name, Config),
-    ResourceId = emqx_bridge_resource:resource_id(kafka_consumer, KafkaName),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
-    %% Since the connection process is async, we give it some time to
-    %% stabilize and avoid flakiness.
-    ct:sleep(1_200),
-    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    ?retry(100, 20, ?assertEqual({ok, connected}, health_check(Config))),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ct:sleep(500),
-        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+        ?retry(100, 20, ?assertEqual({ok, disconnected}, health_check(Config)))
     end),
     ok.
 
@@ -1390,14 +1403,16 @@ t_failed_creation_then_fixed(Config) ->
     ?assertMatch(
         {{ok, _}, {ok, _}},
         ?wait_async_action(
-            update_bridge_api(Config),
+            update_bridge_api(Config, #{
+                <<"resource_opts">> =>
+                    #{<<"health_check_interval">> => <<"1s">>}
+            }),
             #{?snk_kind := kafka_consumer_subscriber_started},
             60_000
         )
     ),
     wait_until_subscribers_are_ready(NPartitions, 120_000),
-    ResourceId = resource_id(Config),
-    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    ?assertEqual({ok, connected}, health_check(Config)),
     ping_until_healthy(Config, _Period = 1_500, _Timeout = 24_000),
 
     {ok, C} = emqtt:start_link(),
@@ -1459,7 +1474,6 @@ t_receive_after_recovery(Config) ->
     KafkaName = ?config(kafka_name, Config),
     KafkaNameA = binary_to_atom(KafkaName),
     KafkaClientId = consumer_clientid(Config),
-    ResourceId = resource_id(Config),
     ?check_trace(
         begin
             {ok, _} = create_bridge(
@@ -1467,7 +1481,7 @@ t_receive_after_recovery(Config) ->
                 #{<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}}
             ),
             ping_until_healthy(Config, _Period = 1_500, _Timeout0 = 24_000),
-            {ok, connected} = emqx_resource_manager:health_check(ResourceId),
+            {ok, connected} = health_check(Config),
             %% 0) ensure each partition commits its offset so it can
             %% recover later.
             Messages0 = [
@@ -1718,14 +1732,13 @@ t_cluster_group(Config) ->
     NPartitions = ?config(num_partitions, Config),
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
-    ResourceId = resource_id(Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
     Cluster = cluster(Config),
     ?check_trace(
         begin
             Nodes =
                 [_N1, N2 | _] = [
-                    emqx_common_test_helpers:start_peer(Name, Opts)
+                    start_peer(Name, Opts)
                  || {Name, Opts} <- Cluster
                 ],
             on_exit(fun() ->
@@ -1765,7 +1778,7 @@ t_cluster_group(Config) ->
                 fun(N) ->
                     ?assertEqual(
                         {ok, connected},
-                        erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
+                        health_check(N, Config),
                         #{node => N}
                     )
                 end,
@@ -1801,14 +1814,13 @@ t_node_joins_existing_cluster(Config) ->
     NPartitions = ?config(num_partitions, Config),
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
-    ResourceId = resource_id(Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
     Cluster = cluster(Config),
     ?check_trace(
         begin
             [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
             ct:pal("starting ~p", [Name1]),
-            N1 = emqx_common_test_helpers:start_peer(Name1, Opts1),
+            N1 = start_peer(Name1, Opts1),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N1]),
                 ok = emqx_common_test_helpers:stop_peer(N1)
@@ -1834,7 +1846,7 @@ t_node_joins_existing_cluster(Config) ->
             {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N1], 30_000),
             ?assertEqual(
                 {ok, connected},
-                erpc:call(N1, emqx_resource_manager, health_check, [ResourceId])
+                health_check(N1, Config)
             ),
 
             %% Now, we start the second node and have it join the cluster.
@@ -1851,7 +1863,7 @@ t_node_joins_existing_cluster(Config) ->
                 30_000
             ),
             ct:pal("starting ~p", [Name2]),
-            N2 = emqx_common_test_helpers:start_peer(Name2, Opts2),
+            N2 = start_peer(Name2, Opts2),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N2]),
                 ok = emqx_common_test_helpers:stop_peer(N2)
@@ -1944,7 +1956,7 @@ t_cluster_node_down(Config) ->
                 lists:map(
                     fun({Name, Opts}) ->
                         ct:pal("starting ~p", [Name]),
-                        emqx_common_test_helpers:start_peer(Name, Opts)
+                        start_peer(Name, Opts)
                     end,
                     Cluster
                 ),
@@ -2130,7 +2142,6 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
                 _ ->
                     ct:fail("unexpected result: ~p", [Res])
             end,
-            ?assertMatch(ok, delete_bridge(Config)),
             ?retry(
                 _Sleep = 50,
                 _Attempts = 50,
@@ -2143,6 +2154,7 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
     ok.
 
 t_resource_manager_crash_before_subscriber_started(Config) ->
+    Name = ?config(kafka_name, Config),
     ?check_trace(
         begin
             ?force_ordering(
@@ -2183,11 +2195,15 @@ t_resource_manager_crash_before_subscriber_started(Config) ->
                 {ok, _} ->
                     %% the new manager may have had time to startup
                     %% before the resource status cache is read...
+                    {ok, {{_, 204, _}, _, _}} =
+                        emqx_bridge_testlib:delete_bridge_http_api_v1(#{
+                            name => Name,
+                            type => ?BRIDGE_TYPE_BIN
+                        }),
                     ok;
                 _ ->
                     ct:fail("unexpected result: ~p", [Res])
             end,
-            ?assertMatch(ok, delete_bridge(Config)),
             ?retry(
                 _Sleep = 50,
                 _Attempts = 50,

+ 341 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -0,0 +1,341 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_v2_kafka_consumer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
+-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    All0 = emqx_common_test_helpers:all(?MODULE),
+    All = All0 -- matrix_cases(),
+    Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
+    Groups ++ All.
+
+groups() ->
+    emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
+
+matrix_cases() ->
+    [
+        t_start_stop
+    ].
+
+init_per_suite(Config) ->
+    emqx_bridge_kafka_impl_consumer_SUITE:init_per_suite(Config).
+
+end_per_suite(Config) ->
+    emqx_bridge_kafka_impl_consumer_SUITE:end_per_suite(Config).
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap({seconds, 60}),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
+    ConnectorConfig = connector_config(Name, Config0),
+    Topic = Name,
+    SourceConfig = source_config(#{
+        connector => Name,
+        parameters => #{topic => Topic}
+    }),
+    Config1 = ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, Config0),
+    ct:comment(get_matrix_params(Config1)),
+    [
+        {kafka_topic, Topic},
+        {bridge_kind, source},
+        {source_type, ?SOURCE_TYPE_BIN},
+        {source_name, Name},
+        {source_config, SourceConfig},
+        {connector_name, Name},
+        {connector_type, ?CONNECTOR_TYPE_BIN},
+        {connector_config, ConnectorConfig},
+        {proxy_host, "toxiproxy"},
+        {proxy_port, 8474}
+        | Config1
+    ].
+
+end_per_testcase(TestCase, Config) ->
+    emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config),
+    ok.
+
+auth_config(Config) ->
+    AuthType0 = maps:get(auth, get_matrix_params(Config)),
+    AuthType =
+        case AuthType0 of
+            none -> none;
+            sasl_auth_plain -> plain;
+            sasl_auth_scram256 -> scram_sha_256;
+            sasl_auth_scram512 -> scram_sha_512;
+            sasl_auth_kerberos -> kerberos
+        end,
+    {ok, #{<<"authentication">> := Auth}} =
+        hocon:binary(emqx_bridge_kafka_impl_consumer_SUITE:authentication(AuthType)),
+    Auth.
+
+get_matrix_params(Config) ->
+    case group_path(Config) of
+        undefined ->
+            #{
+                host => <<"toxiproxy.emqx.net">>,
+                port => 9292,
+                tls => plain,
+                auth => none,
+                proxy_name => "kafka_plain"
+            };
+        [TLS, Auth | _] ->
+            #{
+                host => <<"toxiproxy.emqx.net">>,
+                port => toxiproxy_kafka_port(#{tls => TLS, auth => Auth}),
+                tls => TLS,
+                auth => Auth,
+                proxy_name => toxiproxy_proxy_name(#{tls => TLS, auth => Auth})
+            }
+    end.
+
+toxiproxy_kafka_port(#{tls := plain, auth := none}) -> 9292;
+toxiproxy_kafka_port(#{tls := tls, auth := none}) -> 9294;
+toxiproxy_kafka_port(#{tls := tls, auth := sasl_auth_kerberos}) -> 9095;
+toxiproxy_kafka_port(#{tls := plain, auth := sasl_auth_kerberos}) -> 9093;
+toxiproxy_kafka_port(#{tls := plain, auth := _}) -> 9293;
+toxiproxy_kafka_port(#{tls := tls, auth := _}) -> 9295.
+
+toxiproxy_proxy_name(#{tls := plain, auth := none}) -> "kafka_plain";
+toxiproxy_proxy_name(#{tls := tls, auth := none}) -> "kafka_ssl";
+toxiproxy_proxy_name(#{tls := plain, auth := _}) -> "kafka_sasl_plain";
+toxiproxy_proxy_name(#{tls := tls, auth := _}) -> "kafka_sasl_ssl".
+
+toxiproxy_host(#{auth := sasl_auth_kerberos}) -> <<"kafka-1.emqx.net">>;
+toxiproxy_host(_) -> <<"toxiproxy.emqx.net">>.
+
+group_path(Config) ->
+    case emqx_common_test_helpers:group_path(Config) of
+        [] ->
+            undefined;
+        Path ->
+            Path
+    end.
+
+merge(Maps) ->
+    lists:foldl(fun(M, Acc) -> emqx_utils_maps:deep_merge(Acc, M) end, #{}, Maps).
+
+ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, TCConfig) ->
+    #{tls := TLS, auth := Auth} = get_matrix_params(TCConfig),
+    Topic = emqx_utils_maps:deep_get([<<"parameters">>, <<"topic">>], SourceConfig),
+    [{Host, Port}] = emqx_bridge_kafka_impl:hosts(maps:get(<<"bootstrap_hosts">>, ConnectorConfig)),
+    CreateConfig = maps:to_list(#{
+        topic_mapping => [#{kafka_topic => Topic}],
+        kafka_host => Host,
+        kafka_port => Port,
+        direct_kafka_host => Host,
+        direct_kafka_port => Port,
+        use_tls => TLS =:= tls,
+        use_sasl => Auth =/= none,
+        num_partitions => 1
+    }),
+    ok = emqx_bridge_kafka_impl_consumer_SUITE:ensure_topics(CreateConfig),
+    ProducerConfigs = emqx_bridge_kafka_impl_consumer_SUITE:start_producers(TestCase, CreateConfig),
+    [{kafka_producers, ProducerConfigs} | TCConfig].
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, Config) ->
+    connector_config1(
+        Name,
+        connector_overrides(Config)
+    ).
+
+connector_config1(Name, Overrides0 = #{}) ->
+    Overrides = emqx_utils_maps:binary_key_map(Overrides0),
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"tags">> => [<<"bridge">>],
+            <<"description">> => <<"my cool bridge">>,
+
+            <<"authentication">> => <<"please override">>,
+            <<"bootstrap_hosts">> => <<"please override">>,
+            <<"connect_timeout">> => <<"5s">>,
+            <<"metadata_request_timeout">> => <<"5s">>,
+            <<"min_metadata_refresh_interval">> => <<"3s">>,
+
+            <<"resource_opts">> =>
+                #{
+                    <<"health_check_interval">> => <<"2s">>,
+                    <<"start_after_created">> => true,
+                    <<"start_timeout">> => <<"5s">>
+                }
+        },
+    InnerConfigMap = emqx_utils_maps:deep_merge(InnerConfigMap0, Overrides),
+    emqx_bridge_v2_testlib:parse_and_check_connector(?SOURCE_TYPE_BIN, Name, InnerConfigMap).
+
+connector_overrides(TCConfig) ->
+    MatrixParams = #{tls := TLS} = get_matrix_params(TCConfig),
+    Host = toxiproxy_host(MatrixParams),
+    Port = toxiproxy_kafka_port(MatrixParams),
+    BootstrapHosts = <<Host/binary, ":", (integer_to_binary(Port))/binary>>,
+    AuthConfig = auth_config(TCConfig),
+    #{
+        <<"bootstrap_hosts">> => BootstrapHosts,
+        <<"authentication">> => AuthConfig,
+        <<"ssl">> => #{<<"enable">> => TLS =:= tls}
+    }.
+
+source_config(Overrides0) ->
+    Overrides = emqx_utils_maps:binary_key_map(Overrides0),
+    CommonConfig =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => <<"please override">>,
+            <<"parameters">> =>
+                #{
+                    <<"key_encoding_mode">> => <<"none">>,
+                    <<"max_batch_bytes">> => <<"896KB">>,
+                    <<"max_rejoin_attempts">> => <<"5">>,
+                    <<"offset_reset_policy">> => <<"latest">>,
+                    <<"topic">> => <<"please override">>,
+                    <<"value_encoding_mode">> => <<"none">>
+                },
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"2s">>,
+                <<"resume_interval">> => <<"2s">>
+            }
+        },
+    maps:merge(CommonConfig, Overrides).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_stop(matrix) ->
+    [
+        [plain, none],
+        [plain, sasl_auth_plain],
+        [plain, sasl_auth_scram256],
+        [plain, sasl_auth_scram512],
+        [plain, sasl_auth_kerberos],
+        [tls, none],
+        [tls, sasl_auth_plain]
+    ];
+t_start_stop(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_stop(Config, kafka_consumer_subcriber_and_client_stopped),
+    ok.
+
+t_create_via_http(Config) ->
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_consume(Config) ->
+    Topic = ?config(kafka_topic, Config),
+    NumPartitions = 1,
+    Key = <<"mykey">>,
+    Payload = #{<<"key">> => <<"value">>},
+    Encoded = emqx_utils_json:encode(Payload),
+    Headers = [{<<"hkey">>, <<"hvalue">>}],
+    HeadersMap = maps:from_list(Headers),
+    ProduceFn = fun() ->
+        emqx_bridge_kafka_impl_consumer_SUITE:publish(
+            Config,
+            Topic,
+            [
+                #{
+                    key => Key,
+                    value => Encoded,
+                    headers => Headers
+                }
+            ]
+        )
+    end,
+    CheckFn = fun(Message) ->
+        ?assertMatch(
+            #{
+                headers := HeadersMap,
+                key := Key,
+                offset := _,
+                topic := Topic,
+                ts := _,
+                ts_type := _,
+                value := Encoded
+            },
+            Message
+        )
+    end,
+    ok = emqx_bridge_v2_testlib:t_consume(
+        Config,
+        #{
+            consumer_ready_tracepoint => ?match_n_events(
+                NumPartitions,
+                #{?snk_kind := kafka_consumer_subscriber_init}
+            ),
+            produce_fn => ProduceFn,
+            check_fn => CheckFn,
+            produce_tracepoint => ?match_event(
+                #{
+                    ?snk_kind := kafka_consumer_handle_message,
+                    ?snk_span := {complete, _}
+                }
+            )
+        }
+    ),
+    ok.
+
+t_update_topic(Config) ->
+    %% Tests that, if a bridge originally has the legacy `topic_mapping' field filled in
+    %% and is later updated via the v2 APIs, the legacy field is cleared and the new
+    %% `topic' field is used.
+    ConnectorConfig = ?config(connector_config, Config),
+    SourceConfig = ?config(source_config, Config),
+    Name = ?config(source_name, Config),
+    V1Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ?SOURCE_TYPE_BIN,
+        ConnectorConfig,
+        SourceConfig
+    ),
+    V1Config = emqx_utils_maps:deep_put(
+        [<<"kafka">>, <<"topic_mapping">>],
+        V1Config0,
+        [
+            #{
+                <<"kafka_topic">> => <<"old_topic">>,
+                <<"mqtt_topic">> => <<"">>,
+                <<"qos">> => 2,
+                <<"payload_template">> => <<"template">>
+            }
+        ]
+    ),
+    %% Note: using v1 API
+    {ok, {{_, 201, _}, _, _}} = emqx_bridge_testlib:create_bridge_api(
+        ?SOURCE_TYPE_BIN,
+        Name,
+        V1Config
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"old_topic">>}}}},
+        emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
+    ),
+    %% Note: we don't add `topic_mapping' again here to the parameters.
+    {ok, {{_, 200, _}, _, _}} = emqx_bridge_v2_testlib:update_bridge_api(
+        Config,
+        #{<<"parameters">> => #{<<"topic">> => <<"new_topic">>}}
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"new_topic">>}}}},
+        emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
+    ),
+    ok.

+ 2 - 1
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -159,7 +159,7 @@ update(ConnectorId, {OldConf, Conf}) ->
 update(Type, Name, {OldConf, Conf}) ->
     update(Type, Name, {OldConf, Conf}, #{}).
 
-update(Type, Name, {OldConf, Conf}, Opts) ->
+update(Type, Name, {OldConf, Conf0}, Opts) ->
     %% TODO: sometimes it's not necessary to restart the connector connection.
     %%
     %% - if the connection-related configs like `servers` are updated, we should restart/start
@@ -168,6 +168,7 @@ update(Type, Name, {OldConf, Conf}, Opts) ->
     %% the `method` or `headers` of a WebHook is changed, then the connector can be updated
     %% without restarting the connector.
     %%
+    Conf = Conf0#{connector_type => bin(Type), connector_name => bin(Name)},
     case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
         false ->
             ?SLOG(info, #{

+ 13 - 1
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -34,6 +34,8 @@ resource_type(gcp_pubsub_producer) ->
     emqx_bridge_gcp_pubsub_impl_producer;
 resource_type(hstreamdb) ->
     emqx_bridge_hstreamdb_connector;
+resource_type(kafka_consumer) ->
+    emqx_bridge_kafka_impl_consumer;
 resource_type(kafka_producer) ->
     emqx_bridge_kafka_impl_producer;
 resource_type(kinesis) ->
@@ -156,11 +158,19 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {kafka_consumer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_kafka_consumer_schema, "config_connector")),
+                #{
+                    desc => <<"Kafka Consumer Connector Config">>,
+                    required => false
+                }
+            )},
         {kafka_producer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
                 #{
-                    desc => <<"Kafka Connector Config">>,
+                    desc => <<"Kafka Producer Connector Config">>,
                     required => false
                 }
             )},
@@ -344,6 +354,7 @@ schema_modules() ->
         emqx_bridge_gcp_pubsub_producer_schema,
         emqx_bridge_hstreamdb,
         emqx_bridge_kafka,
+        emqx_bridge_kafka_consumer_schema,
         emqx_bridge_kinesis,
         emqx_bridge_matrix,
         emqx_bridge_mongodb,
@@ -392,6 +403,7 @@ api_schemas(Method) ->
         ),
         api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_kafka_consumer_schema, <<"kafka_consumer">>, Method ++ "_connector"),
         api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
         api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),

+ 7 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -134,6 +134,8 @@ connector_type_to_bridge_types(gcp_pubsub_producer) ->
     [gcp_pubsub, gcp_pubsub_producer];
 connector_type_to_bridge_types(hstreamdb) ->
     [hstreamdb];
+connector_type_to_bridge_types(kafka_consumer) ->
+    [kafka_consumer];
 connector_type_to_bridge_types(kafka_producer) ->
     [kafka, kafka_producer];
 connector_type_to_bridge_types(kinesis) ->
@@ -205,7 +207,11 @@ bridge_configs_to_transform(
                 emqx_utils_maps:deep_get(
                     [<<"actions">>, to_bin(BridgeType), to_bin(BridgeName)],
                     RawConfig,
-                    undefined
+                    emqx_utils_maps:deep_get(
+                        [<<"sources">>, to_bin(BridgeType), to_bin(BridgeName)],
+                        RawConfig,
+                        undefined
+                    )
                 ),
             [
                 {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}

+ 14 - 4
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -136,20 +136,30 @@ t_connector_lifecycle(_Config) ->
     ?assert(meck:validate(?CONNECTOR)),
     ?assertMatch(
         [
-            {_, {?CONNECTOR, callback_mode, []}, _},
             {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
             {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
             {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
-            {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
             {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
             {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
             {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
-            {_, {?CONNECTOR, callback_mode, []}, _},
             {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
             {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
             {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}
         ],
-        meck:history(?CONNECTOR)
+        lists:filter(
+            fun({_, {?CONNECTOR, Fun, _Args}, _}) ->
+                lists:member(
+                    Fun, [
+                        on_start,
+                        on_stop,
+                        on_get_channels,
+                        on_get_status,
+                        on_add_channel
+                    ]
+                )
+            end,
+            meck:history(?CONNECTOR)
+        )
     ),
     ok.
 

+ 1 - 0
changes/ee/feat-12595.en.md

@@ -0,0 +1 @@
+The Kafka Consumer bridge has been split into connector and source components. Old Kafka Consumer bridges will be upgraded automatically.
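
As a rough illustration of the split described above, the sketch below shows a raw Kafka Consumer connector config and a source config that references it, using the same field names as connector_config1/2 and source_config/1 in the new test suite; the module and function names, <<"my_kafka">>, and the concrete values are illustrative assumptions, not output of the automatic upgrade.

-module(kafka_consumer_split_example).

-export([example_connector_conf/0, example_source_conf/0]).

%% Illustrative only: a raw Kafka Consumer connector config. Field names
%% follow the test suite defaults above; values are assumptions.
example_connector_conf() ->
    #{
        <<"enable">> => true,
        <<"bootstrap_hosts">> => <<"kafka-1.emqx.net:9092">>,
        <<"authentication">> => <<"none">>,
        <<"connect_timeout">> => <<"5s">>,
        <<"metadata_request_timeout">> => <<"5s">>,
        <<"min_metadata_refresh_interval">> => <<"3s">>
    }.

%% Illustrative only: a source config referencing the connector by name
%% (<<"my_kafka">> is a hypothetical connector name).
example_source_conf() ->
    #{
        <<"enable">> => true,
        <<"connector">> => <<"my_kafka">>,
        <<"parameters">> => #{
            <<"topic">> => <<"my-topic">>,
            <<"offset_reset_policy">> => <<"latest">>,
            <<"key_encoding_mode">> => <<"none">>,
            <<"value_encoding_mode">> => <<"none">>
        }
    }.

Maps like these correspond to entries under the `connectors.kafka_consumer.<name>` and `sources.kafka_consumer.<name>` config roots that the schema additions above register, which is what the automatic upgrade of old bridges targets.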

+ 18 - 0
rel/i18n/emqx_bridge_kafka_consumer_schema.hocon

@@ -0,0 +1,18 @@
+emqx_bridge_kafka_consumer_schema {
+
+  source_parameters.desc:
+  """Source specific configs."""
+  source_parameters.label:
+  """Source Specific Configs"""
+
+  consumer_source.desc:
+  """Source configs."""
+  consumer_source.label:
+  """Source"""
+
+  config_connector.desc:
+  """Configuration for a Kafka Consumer Client."""
+  config_connector.label:
+  """Kafka Consumer Client Configuration"""
+
+}