瀏覽代碼

feat: implement kafka consumer

Thales Macedo Garitezi 2 年之前
父節點
當前提交
969fa03ecc

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -2,7 +2,7 @@
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
     {vsn, "0.1.13"},
-    {registered, []},
+    {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [
         kernel,

+ 6 - 6
apps/emqx_bridge/src/emqx_bridge.erl

@@ -55,6 +55,7 @@
     T == gcp_pubsub;
     T == influxdb_api_v1;
     T == influxdb_api_v2;
+    T == kafka_producer;
     T == redis_single;
     T == redis_sentinel;
     T == redis_cluster;
@@ -137,12 +138,12 @@ load_hook(Bridges) ->
         maps:to_list(Bridges)
     ).
 
-do_load_hook(Type, #{local_topic := _}) when ?EGRESS_DIR_BRIDGES(Type) ->
+do_load_hook(Type, #{local_topic := LocalTopic}) when
+    ?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic)
+->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
 do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
-do_load_hook(kafka, #{producer := #{mqtt := #{topic := _}}}) ->
-    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
 do_load_hook(_Type, _Conf) ->
     ok.
 
@@ -223,6 +224,7 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
     ]),
     ok = unload_hook(),
     ok = load_hook(NewConf),
+    ?tp(bridge_post_config_update_done, #{}),
     Result.
 
 list() ->
@@ -406,9 +408,7 @@ get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(B
             do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc)
     end;
 get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
-    do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
-get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->
-    do_get_matched_bridge_id(Topic, Filter, kafka, BName, Acc).
+    do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc).
 
 do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
     case emqx_topic:match(Topic, Filter) of

+ 9 - 2
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -45,7 +45,12 @@
 ]).
 
 %% bi-directional bridge with producer/consumer or ingress/egress configs
--define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>).
+-define(IS_BI_DIR_BRIDGE(TYPE),
+    TYPE =:= <<"mqtt">>
+).
+-define(IS_INGRESS_BRIDGE(TYPE),
+    TYPE =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
+).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
@@ -297,12 +302,14 @@ parse_confs(
                 max_retries => Retry
             }
     };
-parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
+parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
     %% For some drivers that can be used as data-sources, we need to provide a
     %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
     %% receives a message from the external database.
     BId = bridge_id(Type, Name),
     Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name};
+parse_confs(<<"kafka_producer">> = _Type, Name, Conf) ->
+    Conf#{bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
 

+ 0 - 2
apps/emqx_bridge/src/emqx_bridge_sup.erl

@@ -34,5 +34,3 @@ init([]) ->
     },
     ChildSpecs = [],
     {ok, {SupFlags, ChildSpecs}}.
-
-%% internal functions

+ 3 - 0
changes/ee/feat-9564.en.md

@@ -0,0 +1,3 @@
+Implemented Kafka Consumer bridge.
+Now it's possible to consume messages from Kafka and publish them to
+MQTT topics.

+ 2 - 0
changes/ee/feat-9564.zh.md

@@ -0,0 +1,2 @@
+实现了Kafka消费者桥。
+现在可以从Kafka消费消息并将其发布到 MQTT主题。

+ 135 - 11
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -39,24 +39,24 @@ emqx_ee_bridge_kafka {
             zh: "桥接名字"
         }
     }
-    producer_opts {
+    kafka_producer {
         desc {
-            en: "Local MQTT data source and Kafka bridge configs."
-            zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
+            en: "Kafka Producer configuration."
+            zh: "Kafka Producer 配置。"
         }
         label {
-            en: "MQTT to Kafka"
-            zh: "MQTT 到 Kafka"
+            en: "Kafka Producer"
+            zh: "Kafka Producer"
         }
     }
-    producer_mqtt_opts {
+    producer_opts {
         desc {
-            en: "MQTT data source. Optional when used as a rule-engine action."
-            zh: "需要桥接到 MQTT 源主题。"
+            en: "Local MQTT data source and Kafka bridge configs."
+            zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
         }
         label {
-            en: "MQTT Source Topic"
-            zh: "MQTT 源主题"
+            en: "MQTT to Kafka"
+            zh: "MQTT 到 Kafka"
         }
     }
     mqtt_topic {
@@ -218,7 +218,7 @@ emqx_ee_bridge_kafka {
     }
     socket_nodelay {
         desc {
-            en: "When set to 'true', TCP buffer sent as soon as possible. "
+            en: "When set to 'true', TCP buffer is sent as soon as possible. "
                 "Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."
             zh: "设置‘true’让系统内核立即发送。否则当需要发送的内容很少时,可能会有一定延迟(默认 40 毫秒)。"
         }
@@ -473,4 +473,128 @@ emqx_ee_bridge_kafka {
             zh: "GSSAPI/Kerberos"
         }
     }
+
+    kafka_consumer {
+        desc {
+            en: "Kafka Consumer configuration."
+            zh: "Kafka Consumer的配置。"
+        }
+        label {
+            en: "Kafka Consumer"
+            zh: "Kafka Consumer"
+        }
+    }
+    consumer_opts {
+        desc {
+            en: "Local MQTT data sink and Kafka bridge configs."
+            zh: "本地MQTT数据汇和Kafka桥配置。"
+        }
+        label {
+            en: "MQTT to Kafka"
+            zh: "MQTT 到 Kafka"
+        }
+    }
+    consumer_kafka_opts {
+        desc {
+            en: "Kafka consumer configs."
+            zh: "Kafka消费者配置。"
+        }
+        label {
+            en: "Kafka Consumer"
+            zh: "卡夫卡消费者"
+        }
+    }
+    consumer_mqtt_opts {
+        desc {
+            en: "MQTT data sink."
+            zh: "MQTT数据汇。"
+        }
+        label {
+            en: "MQTT data sink."
+            zh: "MQTT数据汇。"
+        }
+    }
+    consumer_mqtt_topic {
+        desc {
+            en: "Local topic to which consumed Kafka messages should be published to."
+            zh: "消耗的Kafka消息应该被发布到的本地主题。"
+        }
+        label {
+            en: "MQTT Topic"
+            zh: "MQTT主题"
+        }
+    }
+    consumer_mqtt_qos {
+        desc {
+            en: "MQTT QoS used to publish messages consumed from Kafka."
+            zh: "MQTT QoS用于发布从Kafka消耗的消息。"
+        }
+        label {
+            en: "MQTT Topic QoS"
+            zh: "MQTT 主题服务质量"
+        }
+    }
+    consumer_mqtt_payload {
+        desc {
+            en: "The payload of the MQTT message to be published.\n"
+                "<code>full_message</code> will encode all data available as a JSON object,"
+                "<code>message_value</code> will directly use the Kafka message value as the "
+                "MQTT message payload."
+            zh: "要发布的MQTT消息的有效载荷。"
+                "<code>full_message</code>将把所有可用数据编码为JSON对象,"
+                "<code>message_value</code>将直接使用Kafka消息值作为MQTT消息的有效载荷。"
+        }
+        label {
+            en: "MQTT Payload"
+            zh: "MQTT有效载荷"
+        }
+    }
+    consumer_kafka_topic {
+        desc {
+            en: "Kafka topic to consume from."
+            zh: "从Kafka主题消费。"
+        }
+        label {
+            en: "Kafka topic"
+            zh: "卡夫卡主题 "
+        }
+    }
+    consumer_max_batch_bytes {
+        desc {
+            en: "Maximum bytes to fetch in a batch of messages."
+                "NOTE: this value might be expanded to retry when "
+                "it is not enough to fetch even a single message, "
+                "then slowly shrink back to the given value."
+            zh: "在一批消息中要取的最大字节数。"
+                "注意:这个值可能会被扩大,"
+                "当它甚至不足以取到一条消息时,就会重试,"
+                "然后慢慢缩回到给定的值。"
+        }
+        label {
+            en: "Max Bytes"
+            zh: "最大字节数"
+        }
+    }
+    consumer_max_rejoin_attempts {
+        desc {
+            en: "Maximum number of times allowed for a member to re-join the group."
+            zh: "允许一个成员重新加入小组的最大次数。"
+        }
+        label {
+            en: "Max Rejoin Attempts"
+            zh: "最大的重新加入尝试"
+        }
+    }
+    consumer_offset_reset_policy {
+        desc {
+            en: "Defines how the consumers should reset the start offset when "
+                "a topic partition has and invalid or no initial offset."
+            zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时,"
+                "消费者应如何重置开始偏移量。"
+        }
+        label {
+            en: "Offset Reset Policy"
+            zh: "偏移重置政策"
+        }
+    }
 }

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -2,7 +2,7 @@
 {deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
-       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}}
+       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,7 +1,7 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
     {vsn, "0.1.7"},
-    {registered, []},
+    {registered, [emqx_ee_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,
         stdlib,

+ 14 - 11
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -64,7 +64,8 @@ examples(Method) ->
     lists:foldl(Fun, #{}, schema_modules()).
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
-resource_type(kafka) -> emqx_bridge_impl_kafka;
+resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer;
+resource_type(kafka_producer) -> emqx_bridge_impl_kafka_producer;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
 resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
@@ -85,14 +86,6 @@ resource_type(dynamo) -> emqx_ee_connector_dynamo.
 
 fields(bridges) ->
     [
-        {kafka,
-            mk(
-                hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
-                #{
-                    desc => <<"Kafka Bridge Config">>,
-                    required => false
-                }
-            )},
         {hstreamdb,
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
@@ -133,8 +126,8 @@ fields(bridges) ->
                     required => false
                 }
             )}
-    ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
-        clickhouse_structs().
+    ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
+        pgsql_structs() ++ clickhouse_structs().
 
 mongodb_structs() ->
     [
@@ -149,6 +142,16 @@ mongodb_structs() ->
      || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
     ].
 
+kafka_structs() ->
+    [
+        {Type,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_kafka, Type)),
+                #{desc => <<"EMQX Enterprise Config">>, required => false}
+            )}
+     || Type <- [kafka_producer, kafka_consumer]
+    ].
+
 influxdb_structs() ->
     [
         {Protocol,

+ 51 - 23
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -68,6 +68,10 @@ fields("put") ->
     fields("config");
 fields("get") ->
     emqx_bridge_schema:status_fields() ++ fields("post");
+fields(kafka_producer) ->
+    fields("config") ++ fields(producer_opts);
+fields(kafka_consumer) ->
+    fields("config") ++ fields(consumer_opts);
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -104,8 +108,6 @@ fields("config") ->
             mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
                 default => none, desc => ?DESC("authentication")
             })},
-        {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})},
-        %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})},
         {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}
     ] ++ emqx_connector_schema_lib:ssl_fields();
 fields(auth_username_password) ->
@@ -156,15 +158,16 @@ fields(socket_opts) ->
     ];
 fields(producer_opts) ->
     [
-        {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})},
+        %% Note: there's an implicit convention in `emqx_bridge' that,
+        %% for egress bridges with this config, the published messages
+        %% will be forwarded to such bridges.
+        {local_topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})},
         {kafka,
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 desc => ?DESC(producer_kafka_opts)
             })}
     ];
-fields(producer_mqtt_opts) ->
-    [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}];
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
@@ -241,24 +244,45 @@ fields(producer_buffer) ->
                 default => false,
                 desc => ?DESC(buffer_memory_overload_protection)
             })}
+    ];
+fields(consumer_opts) ->
+    [
+        {kafka,
+            mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
+        {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
+    ];
+fields(consumer_mqtt_opts) ->
+    [
+        {topic,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC(consumer_mqtt_topic)
+            })},
+        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+        {payload,
+            mk(
+                enum([full_message, message_value]),
+                #{default => full_message, desc => ?DESC(consumer_mqtt_payload)}
+            )}
+    ];
+fields(consumer_kafka_opts) ->
+    [
+        {topic, mk(binary(), #{desc => ?DESC(consumer_kafka_topic)})},
+        {max_batch_bytes,
+            mk(emqx_schema:bytesize(), #{
+                default => "896KB", desc => ?DESC(consumer_max_batch_bytes)
+            })},
+        {max_rejoin_attempts,
+            mk(non_neg_integer(), #{
+                default => 5, desc => ?DESC(consumer_max_rejoin_attempts)
+            })},
+        {offset_reset_policy,
+            mk(
+                enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
+                #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
+            )}
     ].
 
-% fields(consumer_opts) ->
-%     [
-%         {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
-%         {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
-%     ];
-% fields(consumer_mqtt_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
-%     ];
-
-% fields(consumer_mqtt_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
-%     ];
-% fields(consumer_kafka_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})}
-%     ].
-
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@@ -272,11 +296,15 @@ struct_names() ->
         auth_gssapi_kerberos,
         auth_username_password,
         kafka_message,
+        kafka_producer,
+        kafka_consumer,
         producer_buffer,
         producer_kafka_opts,
-        producer_mqtt_opts,
         socket_opts,
-        producer_opts
+        producer_opts,
+        consumer_opts,
+        consumer_kafka_opts,
+        consumer_mqtt_opts
     ].
 
 %% -------------------------------------------------------------------------------------------------

+ 31 - 27
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl

@@ -4,34 +4,38 @@
 
 %% Kafka connection configuration
 -module(emqx_bridge_impl_kafka).
--behaviour(emqx_resource).
 
-%% callbacks of behaviour emqx_resource
 -export([
-    callback_mode/0,
-    on_start/2,
-    on_stop/2,
-    on_query/3,
-    on_query_async/4,
-    on_get_status/2,
-    is_buffer_supported/0
+    hosts/1,
+    make_client_id/2,
+    sasl/1
 ]).
 
-is_buffer_supported() -> true.
-
-callback_mode() -> async_if_possible.
-
-on_start(InstId, Config) ->
-    emqx_bridge_impl_kafka_producer:on_start(InstId, Config).
-
-on_stop(InstId, State) ->
-    emqx_bridge_impl_kafka_producer:on_stop(InstId, State).
-
-on_query(InstId, Req, State) ->
-    emqx_bridge_impl_kafka_producer:on_query(InstId, Req, State).
-
-on_query_async(InstId, Req, ReplyFn, State) ->
-    emqx_bridge_impl_kafka_producer:on_query_async(InstId, Req, ReplyFn, State).
-
-on_get_status(InstId, State) ->
-    emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).
+%% Parse comma separated host:port list into a [{Host,Port}] list
+hosts(Hosts) when is_binary(Hosts) ->
+    hosts(binary_to_list(Hosts));
+hosts(Hosts) when is_list(Hosts) ->
+    kpro:parse_endpoints(Hosts).
+
+%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
+make_client_id(KafkaType0, BridgeName0) ->
+    KafkaType = to_bin(KafkaType0),
+    BridgeName = to_bin(BridgeName0),
+    iolist_to_binary([KafkaType, ":", BridgeName, ":", atom_to_list(node())]).
+
+sasl(none) ->
+    undefined;
+sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
+    {Mechanism, Username, emqx_secret:wrap(Password)};
+sasl(#{
+    kerberos_principal := Principal,
+    kerberos_keytab_file := KeyTabFile
+}) ->
+    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
+
+to_bin(A) when is_atom(A) ->
+    atom_to_binary(A);
+to_bin(L) when is_list(L) ->
+    list_to_binary(L);
+to_bin(B) when is_binary(B) ->
+    B.

+ 381 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -0,0 +1,381 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_impl_kafka_consumer).
+
+-behaviour(emqx_resource).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2
+]).
+
+%% `brod_group_consumer' API
+-export([
+    init/2,
+    handle_message/2
+]).
+
+-ifdef(TEST).
+-export([consumer_group_id/1]).
+-endif.
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+%% needed for the #kafka_message record definition
+-include_lib("brod/include/brod.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-type config() :: #{
+    authentication := term(),
+    bootstrap_hosts := binary(),
+    bridge_name := atom(),
+    kafka := #{
+        max_batch_bytes := emqx_schema:bytesize(),
+        max_rejoin_attempts := non_neg_integer(),
+        offset_reset_policy := offset_reset_policy(),
+        topic := binary()
+    },
+    mqtt := #{
+        topic := emqx_types:topic(),
+        qos := emqx_types:qos(),
+        payload := mqtt_payload()
+    },
+    ssl := _,
+    any() => term()
+}.
+-type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id().
+-type state() :: #{
+    kafka_topic := binary(),
+    subscriber_id := subscriber_id(),
+    kafka_client_id := brod:client_id()
+}.
+-type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
+-type mqtt_payload() :: full_message | message_value.
+-type consumer_state() :: #{
+    resource_id := resource_id(),
+    mqtt := #{
+        payload := mqtt_payload(),
+        topic => emqx_types:topic(),
+        qos => emqx_types:qos()
+    },
+    hookpoint := binary(),
+    kafka_topic := binary()
+}.
+
+%%-------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------
+
+callback_mode() ->
+    async_if_possible.
+
+%% there are no queries to be made to this bridge, so we say that
+%% buffer is supported so we don't spawn unused resource buffer
+%% workers.
+is_buffer_supported() ->
+    true.
+
+-spec on_start(manager_id(), config()) -> {ok, state()}.
+on_start(InstanceId, Config) ->
+    ensure_consumer_supervisor_started(),
+    #{
+        authentication := Auth,
+        bootstrap_hosts := BootstrapHosts0,
+        bridge_name := BridgeName,
+        hookpoint := Hookpoint,
+        kafka := #{
+            max_batch_bytes := MaxBatchBytes,
+            max_rejoin_attempts := MaxRejoinAttempts,
+            offset_reset_policy := OffsetResetPolicy,
+            topic := KafkaTopic
+        },
+        mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload},
+        ssl := SSL
+    } = Config,
+    BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+    GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}],
+    ConsumerConfig = [
+        {max_bytes, MaxBatchBytes},
+        {offset_reset_policy, OffsetResetPolicy}
+    ],
+    InitialState = #{
+        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
+        mqtt => #{
+            payload => MQTTPayload,
+            topic => MQTTTopic,
+            qos => MQTTQoS
+        },
+        hookpoint => Hookpoint,
+        kafka_topic => KafkaTopic
+    },
+    KafkaType = kafka_consumer,
+    %% Note: this is distinct per node.
+    ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
+    ClientID = binary_to_atom(ClientID0),
+    ClientOpts0 =
+        case Auth of
+            none -> [];
+            Auth -> [{sasl, emqx_bridge_impl_kafka:sasl(Auth)}]
+        end,
+    ClientOpts = add_ssl_opts(ClientOpts0, SSL),
+    case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
+        ok ->
+            ?tp(
+                kafka_consumer_client_started,
+                #{client_id => ClientID, instance_id => InstanceId}
+            ),
+            ?SLOG(info, #{
+                msg => "kafka_consumer_client_started",
+                instance_id => InstanceId,
+                kafka_hosts => BootstrapHosts
+            });
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_consumer_client",
+                instance_id => InstanceId,
+                kafka_hosts => BootstrapHosts,
+                reason => Reason
+            }),
+            throw(failed_to_start_kafka_client)
+    end,
+    %% note: the group id should be the same for all nodes in the
+    %% cluster, so that the load gets distributed between all
+    %% consumers and we don't repeat messages in the same cluster.
+    GroupID = consumer_group_id(BridgeName),
+    GroupSubscriberConfig =
+        #{
+            client => ClientID,
+            group_id => GroupID,
+            topics => [KafkaTopic],
+            cb_module => ?MODULE,
+            init_data => InitialState,
+            message_type => message,
+            consumer_config => ConsumerConfig,
+            group_config => GroupConfig
+        },
+    %% Below, we spawn a single `brod_group_consumer_v2' worker, with
+    %% no option for a pool of those. This is because that worker
+    %% spawns one worker for each assigned topic-partition
+    %% automatically, so we should not spawn duplicate workers.
+    SubscriberId = make_subscriber_id(BridgeName),
+    case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
+        {ok, _ConsumerPid} ->
+            ?tp(
+                kafka_consumer_subscriber_started,
+                #{instance_id => InstanceId, subscriber_id => SubscriberId}
+            ),
+            {ok, #{
+                subscriber_id => SubscriberId,
+                kafka_client_id => ClientID,
+                kafka_topic => KafkaTopic
+            }};
+        {error, Reason2} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_consumer",
+                instance_id => InstanceId,
+                kafka_hosts => BootstrapHosts,
+                kafka_topic => KafkaTopic,
+                reason => Reason2
+            }),
+            stop_client(ClientID),
+            throw(failed_to_start_kafka_consumer)
+    end.
+
+-spec on_stop(manager_id(), state()) -> ok.
+on_stop(_InstanceID, State) ->
+    #{
+        subscriber_id := SubscriberId,
+        kafka_client_id := ClientID
+    } = State,
+    stop_subscriber(SubscriberId),
+    stop_client(ClientID),
+    ok.
+
+-spec on_get_status(manager_id(), state()) -> connected | disconnected.
+on_get_status(_InstanceID, State) ->
+    #{
+        subscriber_id := SubscriberId,
+        kafka_client_id := ClientID,
+        kafka_topic := KafkaTopic
+    } = State,
+    case brod:get_partitions_count(ClientID, KafkaTopic) of
+        {ok, NPartitions} ->
+            do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions);
+        _ ->
+            disconnected
+    end.
+
+%%-------------------------------------------------------------------------------------
+%% `brod_group_subscriber' API
+%%-------------------------------------------------------------------------------------
+
+-spec init(_, consumer_state()) -> {ok, consumer_state()}.
+init(_GroupData, State) ->
+    ?tp(kafka_consumer_subscriber_init, #{group_data => _GroupData, state => State}),
+    {ok, State}.
+
+-spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}.
+handle_message(Message, State) ->
+    ?tp_span(
+        kafka_consumer_handle_message,
+        #{message => Message, state => State},
+        begin
+            #{
+                resource_id := ResourceId,
+                hookpoint := Hookpoint,
+                kafka_topic := KafkaTopic,
+                mqtt := #{
+                    topic := MQTTTopic,
+                    payload := MQTTPayload,
+                    qos := MQTTQoS
+                }
+            } = State,
+            FullMessage = #{
+                offset => Message#kafka_message.offset,
+                key => Message#kafka_message.key,
+                value => Message#kafka_message.value,
+                ts => Message#kafka_message.ts,
+                ts_type => Message#kafka_message.ts_type,
+                headers => maps:from_list(Message#kafka_message.headers),
+                topic => KafkaTopic
+            },
+            Payload =
+                case MQTTPayload of
+                    full_message ->
+                        FullMessage;
+                    message_value ->
+                        Message#kafka_message.value
+                end,
+            EncodedPayload = emqx_json:encode(Payload),
+            MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload),
+            _ = emqx:publish(MQTTMessage),
+            emqx:run_hook(Hookpoint, [FullMessage]),
+            emqx_resource_metrics:received_inc(ResourceId),
+            %% note: just `ack' does not commit the offset to the
+            %% kafka consumer group.
+            {ok, commit, State}
+        end
+    ).
+
+%%-------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------
+
+add_ssl_opts(ClientOpts, #{enable := false}) ->
+    ClientOpts;
+add_ssl_opts(ClientOpts, SSL) ->
+    [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts].
+
+-spec make_subscriber_id(atom()) -> emqx_ee_bridge_kafka_consumer_sup:child_id().
+make_subscriber_id(BridgeName) ->
+    BridgeNameBin = atom_to_binary(BridgeName),
+    <<"kafka_subscriber:", BridgeNameBin/binary>>.
+
+ensure_consumer_supervisor_started() ->
+    Mod = emqx_ee_bridge_kafka_consumer_sup,
+    ChildSpec =
+        #{
+            id => Mod,
+            start => {Mod, start_link, []},
+            restart => permanent,
+            shutdown => infinity,
+            type => supervisor,
+            modules => [Mod]
+        },
+    case supervisor:start_child(emqx_bridge_sup, ChildSpec) of
+        {ok, _Pid} ->
+            ok;
+        {error, already_present} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok
+    end.
+
+-spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok.
+stop_subscriber(SubscriberId) ->
+    _ = log_when_error(
+        fun() ->
+            emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
+        end,
+        #{
+            msg => "failed_to_delete_kafka_subscriber",
+            subscriber_id => SubscriberId
+        }
+    ),
+    ok.
+
+-spec stop_client(brod:client_id()) -> ok.
+stop_client(ClientID) ->
+    _ = log_when_error(
+        fun() ->
+            brod:stop_client(ClientID)
+        end,
+        #{
+            msg => "failed_to_delete_kafka_consumer_client",
+            client_id => ClientID
+        }
+    ),
+    ok.
+
+-spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
+    connected | disconnected.
+do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
+    Results =
+        lists:map(
+            fun(N) ->
+                brod_client:get_leader_connection(ClientID, KafkaTopic, N)
+            end,
+            lists:seq(0, NPartitions - 1)
+        ),
+    AllLeadersOk =
+        length(Results) > 0 andalso
+            lists:all(
+                fun
+                    ({ok, _}) ->
+                        true;
+                    (_) ->
+                        false
+                end,
+                Results
+            ),
+    WorkersAlive = are_subscriber_workers_alive(SubscriberId),
+    case AllLeadersOk andalso WorkersAlive of
+        true ->
+            connected;
+        false ->
+            disconnected
+    end.
+
+are_subscriber_workers_alive(SubscriberId) ->
+    Children = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup),
+    case lists:keyfind(SubscriberId, 1, Children) of
+        false ->
+            false;
+        {_, Pid, _, _} ->
+            Workers = brod_group_subscriber_v2:get_workers(Pid),
+            %% we can't enforce the number of partitions on a single
+            %% node, as the group might be spread across an emqx
+            %% cluster.
+            lists:all(fun is_process_alive/1, maps:values(Workers))
+    end.
+
+log_when_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.
+
+-spec consumer_group_id(atom()) -> binary().
+consumer_group_id(BridgeName0) ->
+    BridgeName = atom_to_binary(BridgeName0),
+    <<"emqx-kafka-consumer:", BridgeName/binary>>.

+ 13 - 49
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -27,39 +27,31 @@ callback_mode() -> async_if_possible.
 %% @doc Config schema is defined in emqx_ee_bridge_kafka.
 on_start(InstId, Config) ->
     #{
-        bridge_name := BridgeName,
+        authentication := Auth,
         bootstrap_hosts := Hosts0,
+        bridge_name := BridgeName,
         connect_timeout := ConnTimeout,
+        kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic},
         metadata_request_timeout := MetaReqTimeout,
         min_metadata_refresh_interval := MinMetaRefreshInterval,
         socket_opts := SocketOpts,
-        authentication := Auth,
         ssl := SSL
     } = Config,
-    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
-    BridgeType = kafka,
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
-    _ = maybe_install_wolff_telemetry_handlers(ResourceID),
-    %% it's a bug if producer config is not found
-    %% the caller should not try to start a producer if
-    %% there is no producer config
-    ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config),
-    ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters),
-    MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template),
-    Hosts = hosts(Hosts0),
-    ClientId = make_client_id(BridgeName),
+    BridgeType = kafka_consumer,
+    ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(ResourceId),
+    Hosts = emqx_bridge_impl_kafka:hosts(Hosts0),
+    KafkaType = kafka_producer,
+    ClientId = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
     ClientConfig = #{
         min_metadata_refresh_interval => MinMetaRefreshInterval,
         connect_timeout => ConnTimeout,
         client_id => ClientId,
         request_timeout => MetaReqTimeout,
         extra_sock_opts => socket_opts(SocketOpts),
-        sasl => sasl(Auth),
+        sasl => emqx_bridge_impl_kafka:sasl(Auth),
         ssl => ssl(SSL)
     },
-    #{
-        topic := KafkaTopic
-    } = ProducerConfig,
     case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
         {ok, _} ->
             ?SLOG(info, #{
@@ -85,7 +77,7 @@ on_start(InstId, Config) ->
             _ ->
                 string:equal(TestIdStart, InstId)
         end,
-    WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig, IsDryRun),
+    WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
     case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
         {ok, Producers} ->
             {ok, #{
@@ -93,7 +85,7 @@ on_start(InstId, Config) ->
                 client_id => ClientId,
                 kafka_topic => KafkaTopic,
                 producers => Producers,
-                resource_id => ResourceID
+                resource_id => ResourceId
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -265,12 +257,6 @@ do_get_status(Client, KafkaTopic) ->
             disconnected
     end.
 
-%% Parse comma separated host:port list into a [{Host,Port}] list
-hosts(Hosts) when is_binary(Hosts) ->
-    hosts(binary_to_list(Hosts));
-hosts(Hosts) when is_list(Hosts) ->
-    kpro:parse_endpoints(Hosts).
-
 %% Extra socket options, such as sndbuf size etc.
 socket_opts(Opts) when is_map(Opts) ->
     socket_opts(maps:to_list(Opts));
@@ -298,16 +284,6 @@ adjust_socket_buffer(Bytes, Opts) ->
             [{buffer, max(Bytes1, Bytes)} | Acc1]
     end.
 
-sasl(none) ->
-    undefined;
-sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
-    {Mechanism, Username, emqx_secret:wrap(Password)};
-sasl(#{
-    kerberos_principal := Principal,
-    kerberos_keytab_file := KeyTabFile
-}) ->
-    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
-
 ssl(#{enable := true} = SSL) ->
     emqx_tls_lib:to_client_opts(SSL);
 ssl(_) ->
@@ -339,8 +315,7 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) ->
             disk -> {false, replayq_dir(ClientId)};
             hybrid -> {true, replayq_dir(ClientId)}
         end,
-    %% TODO: change this once we add kafka source
-    BridgeType = kafka,
+    BridgeType = kafka_producer,
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     #{
         name => make_producer_name(BridgeName, IsDryRun),
@@ -366,12 +341,6 @@ partitioner(key_dispatch) -> first_key_dispatch.
 replayq_dir(ClientId) ->
     filename:join([emqx:data_dir(), "kafka", ClientId]).
 
-%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
-make_client_id(BridgeName) when is_atom(BridgeName) ->
-    make_client_id(atom_to_list(BridgeName));
-make_client_id(BridgeName) ->
-    iolist_to_binary([BridgeName, ":", atom_to_list(node())]).
-
 %% Producer name must be an atom which will be used as a ETS table name for
 %% partition worker lookup.
 make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
@@ -400,11 +369,6 @@ with_log_at_error(Fun, Log) ->
             })
     end.
 
-get_required(Field, Config, Throw) ->
-    Value = maps:get(Field, Config, none),
-    Value =:= none andalso throw(Throw),
-    Value.
-
 %% we *must* match the bridge id in the event metadata with that in
 %% the handler config; otherwise, multiple kafka producer bridges will
 %% install multiple handlers to the same wolff events, multiplying the

+ 79 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl

@@ -0,0 +1,79 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_kafka_consumer_sup).
+
+-behaviour(supervisor).
+
+%% `supervisor' API
+-export([init/1]).
+
+%% API
+-export([
+    start_link/0,
+    child_spec/2,
+    start_child/2,
+    ensure_child_deleted/1
+]).
+
+-type child_id() :: binary().
+-export_type([child_id/0]).
+
+%%--------------------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+-spec child_spec(child_id(), map()) -> supervisor:child_spec().
+child_spec(Id, GroupSubscriberConfig) ->
+    Mod = brod_group_subscriber_v2,
+    #{
+        id => Id,
+        start => {Mod, start_link, [GroupSubscriberConfig]},
+        restart => permanent,
+        shutdown => 10_000,
+        type => worker,
+        modules => [Mod]
+    }.
+
+-spec start_child(child_id(), map()) -> {ok, pid()} | {error, term()}.
+start_child(Id, GroupSubscriberConfig) ->
+    ChildSpec = child_spec(Id, GroupSubscriberConfig),
+    case supervisor:start_child(?MODULE, ChildSpec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {ok, Pid, _Info} ->
+            {ok, Pid};
+        {error, already_present} ->
+            supervisor:restart_child(?MODULE, Id);
+        {error, {already_started, Pid}} ->
+            {ok, Pid};
+        {error, Error} ->
+            {error, Error}
+    end.
+
+-spec ensure_child_deleted(child_id()) -> ok.
+ensure_child_deleted(Id) ->
+    case supervisor:terminate_child(?MODULE, Id) of
+        ok ->
+            ok = supervisor:delete_child(?MODULE, Id),
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+%%--------------------------------------------------------------------------------------------
+%% `supervisor' API
+%%--------------------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 100,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.

文件差異過大導致無法顯示
+ 1351 - 0
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl


+ 138 - 66
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -11,7 +11,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("brod/include/brod.hrl").
 
--define(PRODUCER, emqx_bridge_impl_kafka).
+-define(PRODUCER, emqx_bridge_impl_kafka_producer).
 
 %%------------------------------------------------------------------------------
 %% Things for REST API tests
@@ -71,6 +71,10 @@ wait_until_kafka_is_up(Attempts) ->
     end.
 
 init_per_suite(Config) ->
+    %% ensure loaded
+    _ = application:load(emqx_ee_bridge),
+    _ = emqx_ee_bridge:module_info(),
+    application:load(emqx_bridge),
     ok = emqx_common_test_helpers:start_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
     {ok, _} = application:ensure_all_started(emqx_connector),
@@ -102,6 +106,13 @@ init_per_group(GroupName, Config) ->
 end_per_group(_, _) ->
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    delete_all_bridges(),
+    ok.
+
 set_special_configs(emqx_management) ->
     Listeners = #{http => #{port => 8081}},
     Config = #{
@@ -222,7 +233,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
     ok.
 
 kafka_bridge_rest_api_helper(Config) ->
-    BridgeType = "kafka",
+    BridgeType = "kafka_producer",
     BridgeName = "my_kafka_bridge",
     BridgeID = emqx_bridge_resource:bridge_id(
         erlang:list_to_binary(BridgeType),
@@ -266,24 +277,18 @@ kafka_bridge_rest_api_helper(Config) ->
     %% Create new Kafka bridge
     KafkaTopic = "test-topic-one-partition",
     CreateBodyTmp = #{
-        <<"type">> => <<"kafka">>,
+        <<"type">> => <<"kafka_producer">>,
         <<"name">> => <<"my_kafka_bridge">>,
         <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)),
         <<"enable">> => true,
         <<"authentication">> => maps:get(<<"authentication">>, Config),
-        <<"producer">> => #{
-            <<"mqtt">> => #{
-                topic => <<"t/#">>
-            },
-            <<"kafka">> => #{
-                <<"topic">> => iolist_to_binary(KafkaTopic),
-                <<"buffer">> => #{
-                    <<"memory_overload_protection">> => <<"false">>
-                },
-                <<"message">> => #{
-                    <<"key">> => <<"${clientid}">>,
-                    <<"value">> => <<"${.payload}">>
-                }
+        <<"local_topic">> => <<"t/#">>,
+        <<"kafka">> => #{
+            <<"topic">> => iolist_to_binary(KafkaTopic),
+            <<"buffer">> => #{<<"memory_overload_protection">> => <<"false">>},
+            <<"message">> => #{
+                <<"key">> => <<"${clientid}">>,
+                <<"value">> => <<"${.payload}">>
             }
         }
     },
@@ -355,6 +360,7 @@ kafka_bridge_rest_api_helper(Config) ->
     %% Cleanup
     {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
     false = MyKafkaBridgeExists(),
+    delete_all_bridges(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -371,9 +377,10 @@ t_failed_creation_then_fix(Config) ->
     ValidAuthSettings = valid_sasl_plain_settings(),
     WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
     Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = kafka_producer,
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
-    ResourceId = emqx_bridge_resource:resource_id("kafka", Name),
-    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
+    ResourceId = emqx_bridge_resource:resource_id("kafka_producer", Name),
+    BridgeId = emqx_bridge_resource:bridge_id("kafka_producer", Name),
     KafkaTopic = "test-topic-one-partition",
     WrongConf = config(#{
         "authentication" => WrongAuthSettings,
@@ -397,15 +404,19 @@ t_failed_creation_then_fix(Config) ->
         "ssl" => #{}
     }),
     %% creates, but fails to start producers
-    %% FIXME: change to kafka_producer after config refactoring
-    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})),
-    ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)),
+    {ok, #{config := WrongConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), WrongConf
+    ),
+    WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
+    ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
     %% before throwing, it should cleanup the client process.
     ?assertEqual([], supervisor:which_children(wolff_client_sup)),
-    %% FIXME: change to kafka_producer after config refactoring
     %% must succeed with correct config
-    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})),
-    {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf),
+    {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), ValidConf
+    ),
+    ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
+    {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConfigAtom),
     %% To make sure we get unique value
     timer:sleep(1),
     Time = erlang:monotonic_time(),
@@ -423,6 +434,7 @@ t_failed_creation_then_fix(Config) ->
     %% TODO: refactor those into init/end per testcase
     ok = ?PRODUCER:on_stop(ResourceId, State),
     ok = emqx_bridge_resource:remove(BridgeId),
+    delete_all_bridges(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -487,6 +499,7 @@ publish_helper(
     },
     Conf0
 ) ->
+    delete_all_bridges(),
     HostsString =
         case {AuthSettings, SSLSettings} of
             {"none", Map} when map_size(Map) =:= 0 ->
@@ -500,8 +513,8 @@ publish_helper(
         end,
     Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
-    InstId = emqx_bridge_resource:resource_id("kafka", Name),
-    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
+    Type = "kafka_producer",
+    InstId = emqx_bridge_resource:resource_id(Type, Name),
     KafkaTopic = "test-topic-one-partition",
     Conf = config(
         #{
@@ -509,30 +522,40 @@ publish_helper(
             "kafka_hosts_string" => HostsString,
             "kafka_topic" => KafkaTopic,
             "instance_id" => InstId,
+            "local_topic" => <<"mqtt/local">>,
             "ssl" => SSLSettings
         },
         Conf0
     ),
-
-    emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
+    {ok, _} = emqx_bridge:create(
+        <<"kafka_producer">>, list_to_binary(Name), Conf
+    ),
     %% To make sure we get unique value
     timer:sleep(1),
     Time = erlang:monotonic_time(),
     BinTime = integer_to_binary(Time),
+    Partition = 0,
     Msg = #{
         clientid => BinTime,
         payload => <<"payload">>,
         timestamp => Time
     },
-    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
-    ct:pal("base offset before testing ~p", [Offset]),
-    StartRes = ?PRODUCER:on_start(InstId, Conf),
-    {ok, State} = StartRes,
+    {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
+    ct:pal("base offset before testing ~p", [Offset0]),
+    {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
     ok = send(CtConfig, InstId, Msg, State),
-    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
-    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
-    ok = ?PRODUCER:on_stop(InstId, State),
-    ok = emqx_bridge_resource:remove(BridgeId),
+    {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
+    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0),
+
+    %% test that it forwards from local mqtt topic as well
+    {ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
+    ct:pal("base offset before testing (2) ~p", [Offset1]),
+    emqx:publish(emqx_message:make(<<"mqtt/local">>, <<"payload">>)),
+    ct:sleep(2_000),
+    {ok, {_, [KafkaMsg1]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset1),
+    ?assertMatch(#kafka_message{value = <<"payload">>}, KafkaMsg1),
+
+    delete_all_bridges(),
     ok.
 
 default_config() ->
@@ -545,18 +568,24 @@ config(Args0, More) ->
     Args1 = maps:merge(default_config(), Args0),
     Args = maps:merge(Args1, More),
     ConfText = hocon_config(Args),
-    ct:pal("Running tests with conf:\n~s", [ConfText]),
-    {ok, Conf} = hocon:binary(ConfText),
-    #{config := Parsed} = hocon_tconf:check_plain(
-        emqx_ee_bridge_kafka,
-        #{<<"config">> => Conf},
-        #{atom_key => true}
-    ),
+    {ok, Conf} = hocon:binary(ConfText, #{format => map}),
+    ct:pal("Running tests with conf:\n~p", [Conf]),
     InstId = maps:get("instance_id", Args),
     <<"bridge:", BridgeId/binary>> = InstId,
-    Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}.
+    {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
+    TypeBin = atom_to_binary(Type),
+    hocon_tconf:check_plain(
+        emqx_bridge_schema,
+        Conf,
+        #{atom_key => false, required => false}
+    ),
+    #{<<"bridges">> := #{TypeBin := #{Name := Parsed}}} = Conf,
+    Parsed.
 
 hocon_config(Args) ->
+    InstId = maps:get("instance_id", Args),
+    <<"bridge:", BridgeId/binary>> = InstId,
+    {_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
     AuthConf = maps:get("authentication", Args),
     AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
     AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
@@ -567,6 +596,7 @@ hocon_config(Args) ->
         iolist_to_binary(hocon_config_template()),
         Args#{
             "authentication" => AuthConfRendered,
+            "bridge_name" => Name,
             "ssl" => SSLConfRendered
         }
     ),
@@ -575,22 +605,30 @@ hocon_config(Args) ->
 %% erlfmt-ignore
 hocon_config_template() ->
 """
-bootstrap_hosts = \"{{ kafka_hosts_string }}\"
-enable = true
-authentication = {{{ authentication }}}
-ssl = {{{ ssl }}}
-producer = {
-    mqtt {
-       topic = \"t/#\"
+bridges.kafka_producer.{{ bridge_name }} {
+  bootstrap_hosts = \"{{ kafka_hosts_string }}\"
+  enable = true
+  authentication = {{{ authentication }}}
+  ssl = {{{ ssl }}}
+  local_topic = \"{{ local_topic }}\"
+  kafka = {
+    message = {
+      key = \"${clientid}\"
+      value = \"${.payload}\"
+      timestamp = \"${timestamp}\"
     }
-    kafka = {
-        topic = \"{{ kafka_topic }}\"
-        message = {key = \"${clientid}\", value = \"${.payload}\"}
-        partition_strategy = {{ partition_strategy }}
-        buffer = {
-            memory_overload_protection = false
-        }
+    buffer = {
+      memory_overload_protection = false
     }
+    partition_strategy = {{ partition_strategy }}
+    topic = \"{{ kafka_topic }}\"
+  }
+  metadata_request_timeout = 5s
+  min_metadata_refresh_interval = 3s
+  socket_opts {
+    nodelay = true
+  }
+  connect_timeout = 5s
 }
 """.
 
@@ -631,22 +669,42 @@ hocon_config_template_ssl(_) ->
 """.
 
 kafka_hosts_string() ->
-    "kafka-1.emqx.net:9092,".
+    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_sasl() ->
-    "kafka-1.emqx.net:9093,".
+    KafkaHost = os:getenv("KAFKA_SASL_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SASL_PLAIN_PORT", "9093"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_ssl() ->
-    "kafka-1.emqx.net:9094,".
+    KafkaHost = os:getenv("KAFKA_SSL_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SSL_PORT", "9094"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_ssl_sasl() ->
-    "kafka-1.emqx.net:9095,".
+    KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SASL_SSL_PORT", "9095"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
+
+shared_secret_path() ->
+    os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
+
+shared_secret(client_keyfile) ->
+    filename:join([shared_secret_path(), "client.key"]);
+shared_secret(client_certfile) ->
+    filename:join([shared_secret_path(), "client.crt"]);
+shared_secret(client_cacertfile) ->
+    filename:join([shared_secret_path(), "ca.crt"]);
+shared_secret(rig_keytab) ->
+    filename:join([shared_secret_path(), "rig.keytab"]).
 
 valid_ssl_settings() ->
     #{
-        "cacertfile" => <<"/var/lib/secret/ca.crt">>,
-        "certfile" => <<"/var/lib/secret/client.crt">>,
-        "keyfile" => <<"/var/lib/secret/client.key">>,
+        "cacertfile" => shared_secret(client_cacertfile),
+        "certfile" => shared_secret(client_certfile),
+        "keyfile" => shared_secret(client_keyfile),
         "enable" => <<"true">>
     }.
 
@@ -670,7 +728,7 @@ valid_sasl_scram512_settings() ->
 valid_sasl_kerberos_settings() ->
     #{
         "kerberos_principal" => "rig@KDC.EMQX.NET",
-        "kerberos_keytab_file" => "/var/lib/secret/rig.keytab"
+        "kerberos_keytab_file" => shared_secret(rig_keytab)
     }.
 
 kafka_hosts() ->
@@ -732,3 +790,17 @@ api_path(Parts) ->
 json(Data) ->
     {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]),
     Jsx.
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ),
+    %% at some point during the tests, sometimes `emqx_bridge:list()'
+    %% returns an empty list, but `emqx:get_config([bridges])' returns
+    %% a bunch of orphan test bridges...
+    lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()),
+    emqx_config:put([bridges], #{}),
+    ok.

+ 1 - 1
mix.exs

@@ -135,7 +135,7 @@ defmodule EMQXUmbrella.MixProject do
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.16.7"},
+      {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
       {:snappyer, "1.2.8", override: true},
       {:supervisor3, "1.1.11", override: true}
     ]