
Merge pull request #8798 from zmstone/0815-feat-add-kafka-connector

feat: Add Kafka connector
Zaiming (Stone) Shi 3 years ago
parent
commit
f6ac4c3a76
28 changed files with 1988 additions and 36 deletions
  1. .ci/docker-compose-file/docker-compose-kafka.yaml (+73 -0)
  2. .ci/docker-compose-file/docker-compose-python.yaml (+1 -1)
  3. .ci/docker-compose-file/docker-compose.yaml (+6 -0)
  4. .ci/docker-compose-file/kafka/generate-certs.sh (+46 -0)
  5. .ci/docker-compose-file/kafka/jaas.conf (+16 -0)
  6. .ci/docker-compose-file/kafka/run_add_scram_users.sh (+49 -0)
  7. .ci/docker-compose-file/kerberos/krb5.conf (+23 -0)
  8. .ci/docker-compose-file/kerberos/run.sh (+25 -0)
  9. apps/emqx/rebar.config (+1 -1)
  10. apps/emqx/test/emqx_common_test_helpers.erl (+9 -0)
  11. apps/emqx_bridge/src/emqx_bridge_api.erl (+1 -1)
  12. apps/emqx_bridge/src/emqx_bridge_resource.erl (+2 -2)
  13. apps/emqx_resource/src/emqx_resource.erl (+24 -6)
  14. apps/emqx_resource/src/emqx_resource_manager.erl (+12 -6)
  15. lib-ee/emqx_ee_bridge/docker-ct (+1 -0)
  16. lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf (+471 -0)
  17. lib-ee/emqx_ee_bridge/rebar.config (+4 -0)
  18. lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src (+2 -1)
  19. lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl (+14 -4)
  20. lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl (+273 -0)
  21. lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl (+33 -0)
  22. lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl (+270 -0)
  23. lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl (+559 -0)
  24. lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src (+3 -1)
  25. mix.exs (+9 -3)
  26. rebar.config (+2 -2)
  27. scripts/ct/run.sh (+53 -6)
  28. scripts/find-suites.sh (+6 -2)

+ 73 - 0
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -0,0 +1,73 @@
+version: '3.9'
+
+services:
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+      - "2181:2181"
+    container_name: zookeeper
+    hostname: zookeeper
+    networks:
+      emqx_bridge:
+  ssl_cert_gen:
+    image: fredrikhgrelland/alpine-jdk11-openssl
+    container_name: ssl_cert_gen
+    volumes:
+      - emqx-shared-secret:/var/lib/secret
+      - ./kafka/generate-certs.sh:/bin/generate-certs.sh
+    entrypoint: /bin/sh
+    command: /bin/generate-certs.sh
+  kdc:
+    hostname: kdc.emqx.net
+    image:  ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04
+    container_name: kdc.emqx.net
+    networks:
+      emqx_bridge:
+    volumes:
+      - emqx-shared-secret:/var/lib/secret
+      - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+      - ./kerberos/krb5.conf:/etc/krb5.conf
+      - ./kerberos/run.sh:/usr/bin/run.sh
+    command: run.sh
+  kafka_1:
+    image: wurstmeister/kafka:2.13-2.7.0
+    ports:
+      - "9092:9092"
+      - "9093:9093"
+      - "9094:9094"
+      - "9095:9095"
+    container_name: kafka-1.emqx.net
+    hostname: kafka-1.emqx.net
+    depends_on:
+      - "kdc"
+      - "zookeeper"
+      - "ssl_cert_gen"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
+      KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+      KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
+      KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1,
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+      KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks
+      KAFKA_SSL_TRUSTSTORE_PASSWORD: password
+      KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/kafka.keystore.jks
+      KAFKA_SSL_KEYSTORE_PASSWORD: password
+      KAFKA_SSL_KEY_PASSWORD: password
+    networks:
+      emqx_bridge:
+    volumes:
+      - emqx-shared-secret:/var/lib/secret
+      - ./kafka/jaas.conf:/etc/kafka/jaas.conf
+      - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh
+      - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+      - ./kerberos/krb5.conf:/etc/krb5.conf
+    command: run_add_scram_users.sh
+

+ 1 - 1
.ci/docker-compose-file/docker-compose-python.yaml

@@ -2,7 +2,7 @@ version: '3.9'
 
 services:
   python:
-    container_name: python 
+    container_name: python
     image: python:3.7.2-alpine3.9
     depends_on:
       - emqx1

+ 6 - 0
.ci/docker-compose-file/docker-compose.yaml

@@ -18,6 +18,9 @@ services:
       - emqx_bridge
     volumes:
       - ../..:/emqx
+      - emqx-shared-secret:/var/lib/secret
+      - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+      - ./kerberos/krb5.conf:/etc/krb5.conf
     working_dir: /emqx
     tty: true
 
@@ -33,3 +36,6 @@ networks:
           gateway: 172.100.239.1
         - subnet: 2001:3200:3200::/64
           gateway: 2001:3200:3200::1
+
+volumes:   # add this section
+  emqx-shared-secret:    # does not need anything underneath this

+ 46 - 0
.ci/docker-compose-file/kafka/generate-certs.sh

@@ -0,0 +1,46 @@
+#!/usr/bin/bash
+
+set -euo pipefail
+
+set -x
+
+# Source https://github.com/zmstone/docker-kafka/blob/master/generate-certs.sh
+
+HOST="*."
+DAYS=3650
+PASS="password"
+
+cd /var/lib/secret/
+
+# Delete old files
+(rm ca.key ca.crt server.key server.csr server.crt client.key client.csr client.crt server.p12 kafka.keystore.jks kafka.truststore.jks 2>/dev/null || true)
+
+ls
+
+echo '== Generate self-signed server and client certificates'
+echo '= generate CA'
+openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '= generate server certificate request'
+openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '= sign server certificate'
+openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days "$DAYS" -CAcreateserial
+
+echo '= generate client certificate request'
+openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '== sign client certificate'
+openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl
+
+echo '= Convert self-signed certificate to PKCS#12 format'
+openssl pkcs12 -export -name "$HOST" -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:"$PASS"
+
+echo '= Import PKCS#12 into a java keystore'
+
+echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias "$HOST" -storepass "$PASS"
+
+
+echo '= Import CA into java truststore'
+
+echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass "$PASS"
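
On the EMQX side, the client certificate and CA written here end up as ordinary TLS client options. A minimal sketch, assuming the paths from this script; the real bridge derives the list via emqx_tls_lib:to_client_opts/1, so the exact option set shown is illustrative:

    %% Illustrative TLS client options matching the files written above.
    SslOpts = [
        {cacertfile, "/var/lib/secret/ca.crt"},
        {certfile,   "/var/lib/secret/client.crt"},
        {keyfile,    "/var/lib/secret/client.key"},
        {verify,     verify_peer}
    ].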

+ 16 - 0
.ci/docker-compose-file/kafka/jaas.conf

@@ -0,0 +1,16 @@
+KafkaServer {
+   org.apache.kafka.common.security.plain.PlainLoginModule required
+   user_admin="password"
+   user_emqxuser="password";
+
+   org.apache.kafka.common.security.scram.ScramLoginModule required
+   username="admin"
+   password="password";
+
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   storeKey=true
+   keyTab="/var/lib/secret/kafka.keytab"
+   principal="kafka/kafka-1.emqx.net@KDC.EMQX.NET";
+
+};

+ 49 - 0
.ci/docker-compose-file/kafka/run_add_scram_users.sh

@@ -0,0 +1,49 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+
+TIMEOUT=60
+
+echo "+++++++ Sleep for a while to make sure that old keytab and truststore are deleted ++++++++"
+
+sleep 5
+
+echo "+++++++ Wait until Kerberos Keytab is created ++++++++"
+
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1; done'
+
+
+echo "+++++++ Wait until SSL certs are generated ++++++++"
+
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done'
+
+sleep 3
+
+echo "+++++++ Starting Kafka ++++++++"
+
+start-kafka.sh &
+
+SERVER=localhost
+PORT1=9092
+PORT2=9093
+TIMEOUT=60
+
+echo "+++++++ Wait until Kafka ports are up ++++++++"
+
+# shellcheck disable=SC2016
+timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1
+
+# shellcheck disable=SC2016
+timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT2
+
+echo "+++++++ Run config commands ++++++++"
+
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=password],SCRAM-SHA-512=[password=password]' --entity-type users --entity-name emqxuser
+
+echo "+++++++ Wait until Kafka ports are down ++++++++"
+
+bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1
+
+echo "+++++++ Kafka ports are down ++++++++"
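
The /dev/tcp probes above have a simple Erlang analogue when the same wait-for-port logic is needed from a test suite. A sketch; host, port, and retry count are illustrative:

    %% Rough Erlang analogue of the bash /dev/tcp port probe.
    WaitForPort = fun Wait(_Host, _Port, 0) ->
                          error(timeout);
                      Wait(Host, Port, Retries) ->
                          case gen_tcp:connect(Host, Port, [], 1000) of
                              {ok, Sock} -> ok = gen_tcp:close(Sock);
                              {error, _} -> timer:sleep(1000), Wait(Host, Port, Retries - 1)
                          end
                  end,
    WaitForPort("localhost", 9092, 60).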
+

+ 23 - 0
.ci/docker-compose-file/kerberos/krb5.conf

@@ -0,0 +1,23 @@
+[libdefaults]
+  default_realm = KDC.EMQX.NET
+  ticket_lifetime = 24h
+  renew_lifetime = 7d
+  forwardable = true
+  rdns = false
+  dns_lookup_kdc   = no
+  dns_lookup_realm = no
+
+[realms]
+  KDC.EMQX.NET = {
+    kdc = kdc
+    admin_server = kadmin
+  }
+
+[domain_realm]
+  kdc.emqx.net = KDC.EMQX.NET
+  .kdc.emqx.net = KDC.EMQX.NET
+
+[logging]
+  kdc = FILE:/var/log/kerberos/krb5kdc.log
+  admin_server = FILE:/var/log/kerberos/kadmin.log
+  default = FILE:/var/log/kerberos/krb5lib.log

+ 25 - 0
.ci/docker-compose-file/kerberos/run.sh

@@ -0,0 +1,25 @@
+#!/bin/sh
+
+
+echo "Remove old keytabs"
+
+rm -f /var/lib/secret/kafka.keytab > /dev/null 2>&1
+rm -f /var/lib/secret/rig.keytab > /dev/null 2>&1
+
+echo "Create realm"
+
+kdb5_util -P emqx -r KDC.EMQX.NET create -s
+
+echo "Add principals"
+
+kadmin.local -w password -q "add_principal -randkey kafka/kafka-1.emqx.net@KDC.EMQX.NET"
+kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET"  > /dev/null
+
+
+echo "Create keytabs"
+
+kadmin.local -w password -q "ktadd  -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null
+kadmin.local -w password -q "ktadd  -k /var/lib/secret/rig.keytab -norandkey rig@KDC.EMQX.NET " > /dev/null
+
+echo STARTING KDC
+/usr/sbin/krb5kdc -n
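
The rig principal and keytab created above are the client-side credentials. A sketch of the bridge authentication config they would back, per the sasl/1 mapping in emqx_bridge_impl_kafka_producer further down:

    %% Client-side GSSAPI auth config backed by the artifacts above;
    %% sasl/1 in emqx_bridge_impl_kafka_producer maps it to
    %% {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
    Auth = #{
        kerberos_principal   => <<"rig@KDC.EMQX.NET">>,
        kerberos_keytab_file => <<"/var/lib/secret/rig.keytab">>
    }.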

+ 1 - 1
apps/emqx/rebar.config

@@ -22,7 +22,7 @@
 %% This rebar.config is necessary because the app may be used as a
 %% `git_subdir` dependency in other projects.
 {deps, [
-    {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}},
+    {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},

+ 9 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -32,6 +32,7 @@
     stop_apps/1,
     reload/2,
     app_path/2,
+    proj_root/0,
     deps_path/2,
     flush/0,
     flush/1
@@ -245,6 +246,14 @@ stop_apps(Apps) ->
     [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
     ok.
 
+proj_root() ->
+    filename:join(
+        lists:takewhile(
+            fun(X) -> iolist_to_binary(X) =/= <<"_build">> end,
+            filename:split(app_path(emqx, "."))
+        )
+    ).
+
 %% backward compatible
 deps_path(App, RelativePath) -> app_path(App, RelativePath).
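
The new proj_root/0 trims everything from the rebar3 _build segment onward. A self-contained sketch of the same logic; the hard-coded path is illustrative, the real function derives it from app_path(emqx, "."):

    %% Sketch: computing the project root from a build-directory path.
    -module(proj_root_demo).
    -export([main/0]).

    main() ->
        BuildPath = "/home/ci/emqx/_build/test/lib/emqx/.",  %% illustrative
        Root = filename:join(
            lists:takewhile(
                fun(X) -> iolist_to_binary(X) =/= <<"_build">> end,
                filename:split(BuildPath)
            )
        ),
        io:format("~s~n", [Root]).   %% prints: /home/ci/emqx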
 

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -92,7 +92,7 @@ param_path_operation_cluster() ->
             #{
                 in => path,
                 required => true,
-                example => <<"start">>,
+                example => <<"restart">>,
                 desc => ?DESC("desc_param_path_operation_cluster")
             }
         )}.

+ 2 - 2
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -44,7 +44,7 @@
 ]).
 
 %% bi-directional bridge with producer/consumer or ingress/egress configs
--define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>).
+-define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
@@ -261,7 +261,7 @@ parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
     %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
     %% receives a message from the external database.
     BName = bridge_id(Type, Name),
-    Conf#{hookpoint => <<"$bridges/", BName/binary>>};
+    Conf#{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
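
With <<"kafka">> matched by ?IS_BI_DIR_BRIDGE, a Kafka bridge config now flows through the first clause and picks up both the hookpoint and the new bridge_name key. A hypothetical sketch, assuming bridge_id/2 yields the usual <<"type:name">> form:

    %% Hypothetical illustration for a bridge of type <<"kafka">> named
    %% <<"my_kafka">>.
    -module(parse_confs_demo).
    -export([main/0]).

    main() ->
        Type = <<"kafka">>,
        Name = <<"my_kafka">>,
        BName = <<Type/binary, ":", Name/binary>>,  %% assumed bridge_id format
        Out = #{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name},
        %% prints: #{bridge_name => <<"my_kafka">>,
        %%           hookpoint => <<"$bridges/kafka:my_kafka">>}
        io:format("~p~n", [Out]).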
 

+ 24 - 6
apps/emqx_resource/src/emqx_resource.erl

@@ -93,7 +93,8 @@
     %% verify if the resource is working normally
     call_health_check/3,
     %% stop the instance
-    call_stop/3
+    call_stop/3,
+    is_buffer_supported/1
 ]).
 
 %% list all the instances, id only.
@@ -117,7 +118,8 @@
     on_batch_query/3,
     on_query_async/4,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    is_buffer_supported/0
 ]).
 
 %% when calling emqx_resource:start/1
@@ -155,6 +157,8 @@
     | {resource_status(), resource_state()}
     | {resource_status(), resource_state(), term()}.
 
+-callback is_buffer_supported() -> boolean().
+
 -spec list_types() -> [module()].
 list_types() ->
     discover_resource_mods().
@@ -256,10 +260,15 @@ query(ResId, Request) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:ets_lookup(ResId) of
-        {ok, _Group, #{query_mode := QM}} ->
-            case QM of
-                sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
-                async -> emqx_resource_worker:async_query(ResId, Request, Opts)
+        {ok, _Group, #{query_mode := QM, mod := Module}} ->
+            IsBufferSupported = is_buffer_supported(Module),
+            case {IsBufferSupported, QM} of
+                {true, _} ->
+                    emqx_resource_worker:simple_sync_query(ResId, Request);
+                {false, sync} ->
+                    emqx_resource_worker:sync_query(ResId, Request, Opts);
+                {false, async} ->
+                    emqx_resource_worker:async_query(ResId, Request, Opts)
             end;
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
@@ -336,6 +345,15 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 get_callback_mode(Mod) ->
     Mod:callback_mode().
 
+-spec is_buffer_supported(module()) -> boolean().
+is_buffer_supported(Module) ->
+    try
+        Module:is_buffer_supported()
+    catch
+        _:_ ->
+            false
+    end.
+
 -spec call_start(manager_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(MgrId, Mod, Config) ->
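
The new callback is optional: is_buffer_supported/1 treats a missing export as false via the try/catch, so existing resources keep their current behavior. A minimal sketch of a module that opts in; the module name and the trivial callback bodies are illustrative:

    %% Minimal sketch of a resource that manages its own buffering, so
    %% query/2,3 routes to simple_sync_query and (per the manager change
    %% below) no resource workers are started.
    -module(my_self_buffering_resource).
    -behaviour(emqx_resource).

    -export([callback_mode/0, is_buffer_supported/0,
             on_start/2, on_stop/2, on_query/3, on_get_status/2]).

    callback_mode() -> always_sync.
    is_buffer_supported() -> true.   %% the new optional callback
    on_start(_InstId, _Config) -> {ok, #{}}.
    on_stop(_InstId, _State) -> ok.
    on_query(_InstId, _Request, _State) -> ok.
    on_get_status(_InstId, _State) -> connected.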

+ 12 - 6
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -148,14 +148,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         ],
         [matched]
     ),
-    ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
-    case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+    case emqx_resource:is_buffer_supported(ResourceType) of
         true ->
-            wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
+            %% the resource itself supports
+            %% buffering, so there is no need for resource workers
+            ok;
         false ->
-            ok
-    end,
-    ok.
+            ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
+            case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+                true ->
+                    wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
+                false ->
+                    ok
+            end
+    end.
 
 %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
 %%

+ 1 - 0
lib-ee/emqx_ee_bridge/docker-ct

@@ -1,2 +1,3 @@
 mongo
 mongo_rs_sharded
+kafka

+ 471 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -0,0 +1,471 @@
+emqx_ee_bridge_kafka {
+    config_enable {
+        desc {
+            en: "Enable (true) or disable (false) this Kafka bridge."
+            zh: "启用(true)或停用(false)该 Kafka 数据桥接。"
+        }
+        label {
+            en: "Enable or Disable"
+            zh: "启用或停用"
+        }
+    }
+    desc_config {
+        desc {
+            en: """Configuration for a Kafka bridge."""
+            zh: """Kafka 桥接配置"""
+        }
+        label {
+            en: "Kafka Bridge Configuration"
+            zh: "Kafka 桥接配置"
+        }
+    }
+    desc_type {
+        desc {
+            en: """The Bridge Type"""
+            zh: """桥接类型"""
+        }
+        label {
+            en: "Bridge Type"
+            zh: "桥接类型"
+        }
+    }
+    desc_name {
+        desc {
+            en: """Bridge name, used as a human-readable description of the bridge."""
+            zh: """桥接名字,可读描述"""
+        }
+        label {
+            en: "Bridge Name"
+            zh: "桥接名字"
+        }
+    }
+    producer_opts {
+        desc {
+            en: "Local MQTT data source and Kafka bridge configs."
+            zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
+        }
+        label {
+            en: "MQTT to Kafka"
+            zh: "MQTT 到 Kafka"
+        }
+    }
+    producer_mqtt_opts {
+        desc {
+            en: "MQTT data source. Optional when used as a rule-engine action."
+            zh: "需要桥接到 MQTT 源主题。"
+        }
+        label {
+            en: "MQTT Source Topic"
+            zh: "MQTT 源主题"
+        }
+    }
+    mqtt_topic {
+        desc {
+            en: "MQTT topic or topic filter as data source (bridge input)."
+            zh: "指定 MQTT 主题作为桥接的数据源"
+        }
+        label {
+            en: "Source MQTT Topic"
+            zh: "源 MQTT 主题"
+        }
+    }
+    producer_kafka_opts {
+        desc {
+            en: "Kafka producer configs."
+            zh: "Kafka 生产者参数。"
+        }
+        label {
+            en: "Kafka Producer"
+            zh: "生产者参数"
+        }
+    }
+    bootstrap_hosts {
+        desc {
+            en: "A comma-separated list of Kafka <code>host:port</code> endpoints to bootstrap the client."
+            zh: "用逗号分隔的 <code>host:port</code> 主机列表。"
+        }
+        label {
+            en: "Bootstrap Hosts"
+            zh: "主机列表"
+        }
+    }
+    connect_timeout {
+        desc {
+            en: "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+            zh: "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+        }
+        label {
+            en: "Connect Timeout"
+            zh: "连接超时"
+        }
+    }
+    min_metadata_refresh_interval {
+        desc {
+            en: "Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. "
+                "Setting it too small may add extra load on Kafka."
+            zh: "刷新 Kafka broker 和 Kafka 主题元数据的最短时间间隔。设置太小可能会增加 Kafka 压力。"
+        }
+        label {
+            en: "Min Metadata Refresh Interval"
+            zh: "元数据刷新最小间隔"
+        }
+    }
+    metadata_request_timeout {
+        desc {
+            en: "Maximum wait time when fetching metadata from Kafka."
+            zh: "刷新元数据时最大等待时长。"
+        }
+        label {
+            en: "Metadata Request Timeout"
+            zh: "元数据请求超时"
+        }
+    }
+    authentication {
+        desc {
+            en: "Authentication configs."
+            zh: "认证参数。"
+        }
+        label {
+            en: "Authentication"
+            zh: "认证"
+        }
+    }
+    socket_opts {
+        desc {
+            en: "Extra socket options."
+            zh: "更多 Socket 参数设置。"
+        }
+        label {
+            en: "Socket Options"
+            zh: "Socket 参数"
+        }
+    }
+    auth_sasl_mechanism {
+        desc {
+            en: "SASL authentication mechanism."
+            zh: "SASL 认证方法名称。"
+        }
+        label {
+            en: "Mechanism"
+            zh: "认证方法"
+        }
+    }
+    auth_sasl_username {
+        desc {
+            en: "SASL authentication username."
+            zh: "SASL 认证的用户名。"
+        }
+        label {
+            en: "Username"
+            zh: "用户名"
+        }
+    }
+    auth_sasl_password {
+        desc {
+            en: "SASL authentication password."
+            zh: "SASL 认证的密码。"
+        }
+        label {
+            en: "Password"
+            zh: "密码"
+        }
+    }
+    auth_kerberos_principal {
+        desc {
+            en: "SASL GSSAPI authentication Kerberos principal. "
+                "For example <code>client_name@MY.KERBEROS.REALM.MYDOMAIN.COM</code>. "
+                "NOTE: The realm in use has to be configured in /etc/krb5.conf on EMQX nodes."
+            zh: "SASL GSSAPI 认证方法的 Kerberos principal,"
+                "例如 <code>client_name@MY.KERBEROS.REALM.MYDOMAIN.COM</code>"
+                "注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中"
+        }
+        label {
+            en: "Kerberos Principal"
+            zh: "Kerberos Principal"
+        }
+    }
+    auth_kerberos_keytab_file {
+        desc {
+            en: "SASL GSSAPI authentication Kerberos keytab file path. "
+                "NOTE: This file has to be placed on EMQX nodes, and the user running the EMQX service must have read permission."
+            zh: "SASL GSSAPI 认证方法的 Kerberos keytab 文件。"
+                "注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。"
+        }
+        label {
+            en: "Kerberos keytab file"
+            zh: "Kerberos keytab 文件"
+        }
+    }
+    socket_send_buffer {
+        desc {
+            en: "Fine-tune the socket send buffer. The default value is tuned for high throughput."
+            zh: "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"
+        }
+        label {
+            en: "Socket Send Buffer Size"
+            zh: "Socket 发送缓存大小"
+        }
+    }
+    socket_receive_buffer {
+        desc {
+            en: "Fine-tune the socket receive buffer. The default value is tuned for high throughput."
+            zh: "TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。"
+        }
+        label {
+            en: "Socket Receive Buffer Size"
+            zh: "Socket 收包缓存大小"
+        }
+    }
+    socket_nodelay {
+        desc {
+            en: "When set to 'true', TCP buffered data is sent as soon as possible. "
+                "Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."
+            zh: "设置为 'true' 时,让系统内核立即发送。否则,当需要发送的内容很少时,可能会有一定延迟(默认 40 毫秒)。"
+        }
+        label {
+            en: "No Delay"
+            zh: "是否延迟发送"
+        }
+    }
+    kafka_topic {
+        desc {
+            en: "Kafka topic name"
+            zh: "Kafka 主题名称"
+        }
+        label {
+            en: "Kafka Topic Name"
+            zh: "Kafka 主题名称"
+        }
+    }
+    kafka_message {
+        desc {
+            en: "Template to render a Kafka message."
+            zh: "用于生成 Kafka 消息的模版。"
+        }
+        label {
+            en: "Kafka Message Template"
+            zh: "Kafka 消息模版"
+        }
+    }
+    kafka_message_key {
+        desc {
+            en: "Template to render Kafka message key. "
+                "If the desired variable for this template is not found in the input data, "
+                "<code>NULL</code> is used."
+            zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
+        }
+        label {
+            en: "Message Key"
+            zh: "消息的 Key"
+        }
+    }
+    kafka_message_value {
+        desc {
+            en: "Template to render Kafka message value. "
+                "If the desired variable for this template is not found in the input data, "
+                "<code>NULL</code> is used."
+            zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
+        }
+        label {
+            en: "Message Value"
+            zh: "消息的 Value"
+        }
+    }
+    kafka_message_timestamp {
+        desc {
+            en: "Which timestamp to use. "
+                "The timestamp is expected to be a millisecond precision Unix epoch "
+                "which can be in string format, e.g. <code>1661326462115</code> or "
+                "<code>'1661326462115'</code>. "
+                "When the desired data field for this template is not found, "
+                "or if the found data is not a valid integer, "
+                "the current system timestamp will be used."
+            zh: "生成 Kafka 消息时间戳的模版。"
+                "该时间必需是一个整型数值(可以是字符串格式)例如 <code>1661326462115</code> "
+                "或 <code>'1661326462115'</code>。"
+                "当所需的输入字段不存在,或不是一个整型时,"
+                "则会使用当前系统时间。"
+        }
+        label {
+            en: "Message Timestamp"
+            zh: "消息的时间戳"
+        }
+    }
+    max_batch_bytes {
+        desc {
+            en: "Maximum bytes to collect in a Kafka message batch. "
+                "Most of the Kafka brokers default to a limit of 1 MB batch size. "
+                "EMQX's default value is less than 1 MB in order to compensate for "
+                "Kafka message encoding overheads (especially when each individual message is very small). "
+                "When a single message is over the limit, it is still sent (as a single element batch)."
+            zh: "最大消息批量字节数。"
+                "大多数 Kafka 环境的默认最低值是 1 MB,EMQX 的默认值比 1 MB 更小是因为需要"
+                "补偿 Kafka 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。"
+                "当单个消息的大小超过该限制时,它仍然会被发送(相当于该批量中只有单个消息)。"
+        }
+        label {
+            en: "Max Batch Bytes"
+            zh: "最大批量字节数"
+        }
+    }
+    compression {
+        desc {
+            en: "Compression method."
+            zh: "压缩方法。"
+        }
+        label {
+            en: "Compression"
+            zh: "压缩"
+        }
+    }
+    partition_strategy {
+        desc {
+            en: "The partition strategy tells the producer how to dispatch messages to Kafka partitions.\n\n"
+                "<code>random</code>: Randomly pick a partition for each message\n"
+                "<code>key_dispatch</code>: Hash Kafka message key to a partition number\n"
+            zh: "设置消息发布时应该如何选择 Kafka 分区。\n\n"
+                "<code>random</code>: 为每个消息随机选择一个分区。\n"
+                "<code>key_dispatch</code>: 根据 Kafka 消息 Key 的哈希值选择分区\n"
+        }
+        label {
+            en: "Partition Strategy"
+            zh: "分区选择策略"
+        }
+    }
+    required_acks {
+        desc {
+            en: "Required acknowledgements that the Kafka partition leader must collect from its followers "
+                "before sending an acknowledgement back to the EMQX Kafka producer.\n\n"
+                "<code>all_isr</code>: Require all in-sync replicas to acknowledge.\n"
+                "<code>leader_only</code>: Require only the partition-leader's acknowledgement.\n"
+                "<code>none</code>: No need for Kafka to acknowledge at all.\n"
+            zh: "设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。\n\n"
+                "<code>all_isr</code>: 需要所有的在线复制者都确认。\n"
+                "<code>leader_only</code>: 仅需要分区 leader 确认。\n"
+                "<code>none</code>: 无需 Kafka 回复任何确认。\n"
+        }
+        label {
+            en: "Required Acks"
+            zh: "Kafka 确认数量"
+        }
+    }
+    partition_count_refresh_interval {
+        desc {
+            en: "The time interval at which the Kafka producer checks for an increased number of partitions.\n"
+                "After the number of partitions is increased in Kafka, EMQX will start taking the "
+                "discovered partitions into account when dispatching messages per <code>partition_strategy</code>."
+            zh: "配置 Kafka 刷新分区数量的时间间隔。\n"
+                "EMQX 发现 Kafka 分区数量增加后,会开始按 <code>partition_strategy</code> 配置,把消息发送到新的分区中。"
+        }
+        label {
+            en: "Partition Count Refresh Interval"
+            zh: "分区数量刷新间隔"
+        }
+    }
+    max_inflight {
+        desc {
+            en: "Maximum number of batches allowed for the Kafka producer (per partition) to send before receiving acknowledgement from Kafka. "
+                "A greater value typically means better throughput. However, there can be a risk of message reordering when this "
+                "value is greater than 1."
+            zh: "设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。"
+                "调大这个值通常可以增加吞吐量,但是,当该值设置大于 1 时,存在消息乱序的风险。"
+        }
+        label {
+            en: "Max Inflight"
+            zh: "飞行窗口"
+        }
+    }
+    producer_buffer {
+        desc {
+            en: "Configure producer message buffer.\n\n"
+                "Tell Kafka producer how to buffer messages when EMQX has more messages to send than "
+                "Kafka can keep up with, or when Kafka is down.\n\n"
+            zh: "配置消息缓存的相关参数。\n\n"
+                "当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。"
+        }
+        label {
+            en: "Message Buffer"
+            zh: "消息缓存"
+        }
+    }
+    buffer_mode {
+        desc {
+            en: "Message buffer mode.\n\n"
+                "<code>memory</code>: Buffer all messages in memory. The messages will be lost in case of EMQX node restart.\n"
+                "<code>disk</code>: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.\n"
+                "<code>hybrid</code>: Buffer messages in memory first; when a certain limit is reached "
+                "(see the <code>segment_bytes</code> config for more information), start offloading "
+                "messages to disk. Like <code>memory</code> mode, the messages will be lost in case of "
+                "EMQX node restart."
+            zh: "消息缓存模式。\n"
+                "<code>memory</code>: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n"
+                "<code>disk</code>: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n"
+                "<code>hybrid</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
+                "(配置项 <code>segment_bytes</code> 描述了该限制)后,后续的消息会缓存到磁盘上。"
+                "与 <code>memory</code> 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"
+        }
+        label {
+            en: "Buffer Mode"
+            zh: "缓存模式"
+        }
+    }
+    buffer_per_partition_limit {
+        desc {
+            en: "Number of bytes allowed to buffer for each Kafka partition. "
+                "When this limit is exceeded, old messages will be dropped to make room "
+                "for new messages to be buffered."
+            zh: "为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,"
+                "为新的消息腾出空间。"
+        }
+        label {
+            en: "Per-partition Buffer Limit"
+            zh: "Kafka 分区缓存上限"
+        }
+    }
+    buffer_segment_bytes {
+        desc {
+            en: "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
+                "This value specifies the size of each on-disk buffer file."
+            zh: "当缓存模式是 <code>disk</code> 或 <code>hybrid</code> 时适用。"
+                "该配置用于指定缓存到磁盘上的文件的大小。"
+        }
+        label {
+            en: "Segment File Bytes"
+            zh: "缓存文件大小"
+        }
+    }
+    buffer_memory_overload_protection {
+        desc {
+            en: "Applicable when buffer mode is set to <code>memory</code> or <code>hybrid</code>.\n"
+                "EMQX will drop old cached messages under high memory pressure. "
+                "The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
+            zh: "缓存模式是 <code>memory</code> 或 <code>hybrid</code> 时适用。"
+                "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。"
+                "内存压力值由配置项 <code>sysmon.os.sysmem_high_watermark</code> 决定。"
+        }
+        label {
+            en: "Memory Overload Protection"
+            zh: "内存过载保护"
+        }
+    }
+    auth_username_password {
+        desc {
+            en: "Username/password based authentication."
+            zh: "基于用户名密码的认证。"
+        }
+        label {
+            en: "Username/password Auth"
+            zh: "用户名密码认证"
+        }
+    }
+    auth_gssapi_kerberos {
+        desc {
+            en: "Use GSSAPI/Kerberos authentication."
+            zh: "使用 GSSAPI/Kerberos 认证。"
+        }
+        label {
+            en: "GSSAPI/Kerberos"
+            zh: "GSSAPI/Kerberos"
+        }
+    }
+}
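
Putting the documented fields together, a producer bridge config might look as follows, expressed as the Erlang map the connector layer receives after HOCON checking. All values are examples rather than defaults, durations and bytesizes arrive converted to integers, and the shared ssl section is omitted:

    %% Illustrative Kafka producer bridge config (values are examples).
    Config = #{
        enable => true,
        bootstrap_hosts => <<"kafka-1.emqx.net:9092">>,
        connect_timeout => 5000,                   %% "5s"
        min_metadata_refresh_interval => 3000,     %% "3s"
        metadata_request_timeout => 5000,          %% "5s"
        authentication => #{mechanism => scram_sha_256,
                            username => <<"emqxuser">>,
                            password => <<"password">>},
        socket_opts => #{sndbuf => 1048576, recbuf => 1048576, nodelay => true},
        producer => #{
            mqtt => #{topic => "t/#"},
            kafka => #{
                topic => "test-topic-one-partition",
                message => #{key => "${clientid}",
                             value => "${payload}",
                             timestamp => "${timestamp}"},
                max_batch_bytes => 917504,         %% "896KB"
                compression => no_compression,
                partition_strategy => random,
                required_acks => all_isr,
                partition_count_refresh_interval => 60,
                max_inflight => 10,
                buffer => #{mode => memory,
                            per_partition_limit => 2147483648,   %% "2GB"
                            segment_bytes => 104857600,          %% "100MB"
                            memory_overload_protection => true}
            }
        }
    }.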

+ 4 - 0
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,5 +1,9 @@
 {erl_opts, [debug_info]}.
 {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
+       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
+       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
+       , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
+       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 2 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -3,7 +3,8 @@
     {registered, []},
     {applications, [
         kernel,
-        stdlib
+        stdlib,
+        emqx_ee_connector
     ]},
     {env, []},
     {modules, []},

+ 14 - 4
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -14,6 +14,7 @@
 
 api_schemas(Method) ->
     [
+        ref(emqx_ee_bridge_kafka, Method),
         ref(emqx_ee_bridge_mysql, Method),
         ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
         ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
@@ -26,6 +27,7 @@ api_schemas(Method) ->
 
 schema_modules() ->
     [
+        emqx_ee_bridge_kafka,
         emqx_ee_bridge_hstreamdb,
         emqx_ee_bridge_influxdb,
         emqx_ee_bridge_mongodb,
@@ -45,6 +47,7 @@ examples(Method) ->
     lists:foldl(Fun, #{}, schema_modules()).
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
+resource_type(kafka) -> emqx_bridge_impl_kafka;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(mongodb_rs) -> emqx_connector_mongo;
 resource_type(mongodb_sharded) -> emqx_connector_mongo;
@@ -56,6 +59,11 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb.
 
 fields(bridges) ->
     [
+        {kafka,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
+                #{desc => <<"EMQX Enterprise Config">>}
+            )},
         {hstreamdb,
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
@@ -66,8 +74,9 @@ fields(bridges) ->
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
                 #{desc => <<"EMQX Enterprise Config">>}
             )}
-    ] ++ fields(mongodb) ++ fields(influxdb);
-fields(mongodb) ->
+    ] ++ mongodb_structs() ++ influxdb_structs().
+
+mongodb_structs() ->
     [
         {Type,
             mk(
@@ -75,8 +84,9 @@ fields(mongodb) ->
                 #{desc => <<"EMQX Enterprise Config">>}
             )}
      || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
-    ];
-fields(influxdb) ->
+    ].
+
+influxdb_structs() ->
     [
         {Protocol,
             mk(

+ 273 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -0,0 +1,273 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_kafka).
+
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% allow atoms like scram_sha_256 and scram_sha_512
+%% i.e. the _256 part does not start with a-z
+-elvis([
+    {elvis_style, atom_naming_convention, #{
+        regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$",
+        enclosed_atoms => ".*"
+    }}
+]).
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"kafka">> => #{
+                summary => <<"Kafka Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(get) ->
+    maps:merge(values(post), ?METRICS_EXAMPLE);
+values(post) ->
+    #{
+        bootstrap_hosts => <<"localhost:9092">>
+    };
+values(put) ->
+    values(post).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "bridge_kafka".
+
+roots() -> ["config"].
+
+fields("post") ->
+    [type_field(), name_field() | fields("config")];
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})},
+        {connect_timeout,
+            mk(emqx_schema:duration_ms(), #{
+                default => "5s",
+                desc => ?DESC(connect_timeout)
+            })},
+        {min_metadata_refresh_interval,
+            mk(
+                emqx_schema:duration_ms(),
+                #{
+                    default => "3s",
+                    desc => ?DESC(min_metadata_refresh_interval)
+                }
+            )},
+        {metadata_request_timeout,
+            mk(emqx_schema:duration_ms(), #{
+                default => "5s",
+                desc => ?DESC(metadata_request_timeout)
+            })},
+        {authentication,
+            mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
+                default => none, desc => ?DESC("authentication")
+            })},
+        {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})},
+        %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})},
+        {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}
+    ] ++ emqx_connector_schema_lib:ssl_fields();
+fields(auth_username_password) ->
+    [
+        {mechanism,
+            mk(enum([plain, scram_sha_256, scram_sha_512]), #{
+                required => true, desc => ?DESC(auth_sasl_mechanism)
+            })},
+        {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})},
+        {password,
+            mk(binary(), #{required => true, sensitive => true, desc => ?DESC(auth_sasl_password)})}
+    ];
+fields(auth_gssapi_kerberos) ->
+    [
+        {kerberos_principal,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC(auth_kerberos_principal)
+            })},
+        {kerberos_keytab_file,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC(auth_kerberos_keytab_file)
+            })}
+    ];
+fields(socket_opts) ->
+    [
+        {sndbuf,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => "1024KB", desc => ?DESC(socket_send_buffer)}
+            )},
+        {recbuf,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => "1024KB", desc => ?DESC(socket_receive_buffer)}
+            )},
+        {nodelay,
+            mk(
+                boolean(),
+                #{default => true, desc => ?DESC(socket_nodelay)}
+            )}
+    ];
+fields(producer_opts) ->
+    [
+        {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})},
+        {kafka,
+            mk(ref(producer_kafka_opts), #{
+                required => true,
+                desc => ?DESC(producer_kafka_opts)
+            })}
+    ];
+fields(producer_mqtt_opts) ->
+    [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}];
+fields(producer_kafka_opts) ->
+    [
+        {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
+        {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
+        {max_batch_bytes,
+            mk(emqx_schema:bytesize(), #{default => "896KB", desc => ?DESC(max_batch_bytes)})},
+        {compression,
+            mk(enum([no_compression, snappy, gzip]), #{
+                default => no_compression, desc => ?DESC(compression)
+            })},
+        {partition_strategy,
+            mk(
+                enum([random, key_dispatch]),
+                #{default => random, desc => ?DESC(partition_strategy)}
+            )},
+        {required_acks,
+            mk(
+                enum([all_isr, leader_only, none]),
+                #{
+                    default => all_isr,
+                    desc => ?DESC(required_acks)
+                }
+            )},
+        {partition_count_refresh_interval,
+            mk(
+                emqx_schema:duration_s(),
+                #{
+                    default => "60s",
+                    desc => ?DESC(partition_count_refresh_interval)
+                }
+            )},
+        {max_inflight,
+            mk(
+                pos_integer(),
+                #{
+                    default => 10,
+                    desc => ?DESC(max_inflight)
+                }
+            )},
+        {buffer,
+            mk(ref(producer_buffer), #{
+                required => false,
+                desc => ?DESC(producer_buffer)
+            })}
+    ];
+fields(kafka_message) ->
+    [
+        {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
+        {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
+        {timestamp,
+            mk(string(), #{
+                default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
+            })}
+    ];
+fields(producer_buffer) ->
+    [
+        {mode,
+            mk(
+                enum([memory, disk, hybrid]),
+                #{default => memory, desc => ?DESC(buffer_mode)}
+            )},
+        {per_partition_limit,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => "2GB", desc => ?DESC(buffer_per_partition_limit)}
+            )},
+        {segment_bytes,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => "100MB", desc => ?DESC(buffer_segment_bytes)}
+            )},
+        {memory_overload_protection,
+            mk(boolean(), #{
+                %% different from 4.x
+                default => true,
+                desc => ?DESC(buffer_memory_overload_protection)
+            })}
+    ].
+
+% fields(consumer_opts) ->
+%     [
+%         {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
+%         {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
+%     ];
+% fields(consumer_mqtt_opts) ->
+%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
+%     ];
+% fields(consumer_kafka_opts) ->
+%     [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})}
+%     ].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Kafka using `", string:to_upper(Method), "` method."];
+desc(Name) ->
+    lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
+    ?DESC(Name).
+
+struct_names() ->
+    [
+        auth_gssapi_kerberos,
+        auth_username_password,
+        kafka_message,
+        producer_buffer,
+        producer_kafka_opts,
+        producer_mqtt_opts,
+        socket_opts,
+        producer_opts
+    ].
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+type_field() ->
+    {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
+ref(Name) ->
+    hoconsc:ref(?MODULE, Name).

+ 33 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Kafka connection configuration
+-module(emqx_bridge_impl_kafka).
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_get_status/2,
+    is_buffer_supported/0
+]).
+
+is_buffer_supported() -> true.
+
+callback_mode() -> async_if_possible.
+
+on_start(InstId, Config) ->
+    emqx_bridge_impl_kafka_producer:on_start(InstId, Config).
+
+on_stop(InstId, State) ->
+    emqx_bridge_impl_kafka_producer:on_stop(InstId, State).
+
+on_query(InstId, Msg, State) ->
+    emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State).
+
+on_get_status(InstId, State) ->
+    emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).

+ 270 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -0,0 +1,270 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_impl_kafka_producer).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_get_status/2
+]).
+
+-export([on_kafka_ack/3]).
+
+-include_lib("emqx/include/logger.hrl").
+
+callback_mode() -> async_if_possible.
+
+%% @doc Config schema is defined in emqx_ee_bridge_kafka.
+on_start(InstId, Config) ->
+    #{
+        bridge_name := BridgeName,
+        bootstrap_hosts := Hosts0,
+        connect_timeout := ConnTimeout,
+        metadata_request_timeout := MetaReqTimeout,
+        min_metadata_refresh_interval := MinMetaRefreshInterval,
+        socket_opts := SocketOpts,
+        authentication := Auth,
+        ssl := SSL
+    } = Config,
+    %% it's a bug if producer config is not found
+    %% the caller should not try to start a producer if
+    %% there is no producer config
+    ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config),
+    ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters),
+    MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template),
+    Hosts = hosts(Hosts0),
+    ClientId = make_client_id(BridgeName),
+    ClientConfig = #{
+        min_metadata_refresh_interval => MinMetaRefreshInterval,
+        connect_timeout => ConnTimeout,
+        client_id => ClientId,
+        request_timeout => MetaReqTimeout,
+        extra_sock_opts => socket_opts(SocketOpts),
+        sasl => sasl(Auth),
+        ssl => ssl(SSL)
+    },
+    #{
+        topic := KafkaTopic
+    } = ProducerConfig,
+    case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
+        {ok, _} ->
+            ?SLOG(info, #{
+                msg => "kafka_client_started",
+                instance_id => InstId,
+                kafka_hosts => Hosts
+            });
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_client",
+                instance_id => InstId,
+                kafka_hosts => Hosts,
+                reason => Reason
+            }),
+            throw(failed_to_start_kafka_client)
+    end,
+    WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig),
+    case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+        {ok, Producers} ->
+            {ok, #{
+                message_template => compile_message_template(MessageTemplate),
+                client_id => ClientId,
+                producers => Producers
+            }};
+        {error, Reason2} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_producer",
+                instance_id => InstId,
+                kafka_hosts => Hosts,
+                kafka_topic => KafkaTopic,
+                reason => Reason2
+            }),
+            throw(failed_to_start_kafka_producer)
+    end.
+
+on_stop(_InstId, #{client_id := ClientID, producers := Producers}) ->
+    with_log_at_error(
+        fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
+        #{
+            msg => "failed_to_delete_kafka_producer",
+            client_id => ClientID
+        }
+    ),
+    with_log_at_error(
+        fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
+        #{
+            msg => "failed_to_delete_kafka_client",
+            client_id => ClientID
+        }
+    ).
+
+%% @doc The callback API for rule-engine (or bridge without rules)
+%% The input argument `Message' is an enriched format (as a map())
+%% of the original #message{} record.
+%% The enrichment is done by rule-engine or by the data bridge framework.
+%% E.g. the output of rule-engine process chain
+%% or the direct mapping from an MQTT message.
+on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) ->
+    KafkaMessage = render_message(Template, Message),
+    %% The returned information is discarded here.
+    %% If the producer process is down when sending, this function would
+    %% raise an error exception which is to be caught by the caller of this callback
+    {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
+    ok.
+
+compile_message_template(#{
+    key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
+}) ->
+    #{
+        key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
+        value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
+        timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
+    }.
+
+render_message(
+    #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
+) ->
+    #{
+        key => render(KeyTemplate, Message),
+        value => render(ValueTemplate, Message),
+        ts => render_timestamp(TimestampTemplate, Message)
+    }.
+
+render(Template, Message) ->
+    emqx_plugin_libs_rule:proc_tmpl(Template, Message).
+
+render_timestamp(Template, Message) ->
+    try
+        binary_to_integer(render(Template, Message))
+    catch
+        _:_ ->
+            erlang:system_time(millisecond)
+    end.
+
+on_kafka_ack(_Partition, _Offset, _Extra) ->
+    %% Do nothing so far.
+    %% Maybe need to bump some counters?
+    ok.
+
+on_get_status(_InstId, _State) ->
+    connected.
+
+%% Parse a comma-separated host:port list into a [{Host,Port}] list
+hosts(Hosts) when is_binary(Hosts) ->
+    hosts(binary_to_list(Hosts));
+hosts(Hosts) when is_list(Hosts) ->
+    kpro:parse_endpoints(Hosts).
+
+%% Extra socket options, such as sndbuf size etc.
+socket_opts(Opts) when is_map(Opts) ->
+    socket_opts(maps:to_list(Opts));
+socket_opts(Opts) when is_list(Opts) ->
+    socket_opts_loop(Opts, []).
+
+socket_opts_loop([], Acc) ->
+    lists:reverse(Acc);
+socket_opts_loop([{T, Bytes} | Rest], Acc) when
+    T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer
+->
+    Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)],
+    socket_opts_loop(Rest, Acc1);
+socket_opts_loop([Other | Rest], Acc) ->
+    socket_opts_loop(Rest, [Other | Acc]).
+
+%% https://www.erlang.org/doc/man/inet.html
+%% For TCP it is recommended to have val(buffer) >= val(recbuf)
+%% to avoid performance issues because of unnecessary copying.
+adjust_socket_buffer(Bytes, Opts) ->
+    case lists:keytake(buffer, 1, Opts) of
+        false ->
+            [{buffer, Bytes} | Opts];
+        {value, {buffer, Bytes1}, Acc1} ->
+            [{buffer, max(Bytes1, Bytes)} | Acc1]
+    end.
+
+sasl(none) ->
+    undefined;
+sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
+    {Mechanism, Username, emqx_secret:wrap(Password)};
+sasl(#{
+    kerberos_principal := Principal,
+    kerberos_keytab_file := KeyTabFile
+}) ->
+    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
+
+ssl(#{enable := true} = SSL) ->
+    emqx_tls_lib:to_client_opts(SSL);
+ssl(_) ->
+    [].
+
+producers_config(BridgeName, ClientId, Input) ->
+    #{
+        max_batch_bytes := MaxBatchBytes,
+        compression := Compression,
+        partition_strategy := PartitionStrategy,
+        required_acks := RequiredAcks,
+        partition_count_refresh_interval := PCntRefreshInterval,
+        max_inflight := MaxInflight,
+        buffer := #{
+            mode := BufferMode,
+            per_partition_limit := PerPartitionLimit,
+            segment_bytes := SegmentBytes,
+            memory_overload_protection := MemOLP
+        }
+    } = Input,
+
+    {OffloadMode, ReplayqDir} =
+        case BufferMode of
+            memory -> {false, false};
+            disk -> {false, replayq_dir(ClientId)};
+            hybrid -> {true, replayq_dir(ClientId)}
+        end,
+    #{
+        name => make_producer_name(BridgeName),
+        partitioner => PartitionStrategy,
+        partition_count_refresh_interval_seconds => PCntRefreshInterval,
+        replayq_dir => ReplayqDir,
+        replayq_offload_mode => OffloadMode,
+        replayq_max_total_bytes => PerPartitionLimit,
+        replayq_seg_bytes => SegmentBytes,
+        drop_if_highmem => MemOLP,
+        required_acks => RequiredAcks,
+        max_batch_bytes => MaxBatchBytes,
+        max_send_ahead => MaxInflight - 1,
+        compression => Compression
+    }.
+
+replayq_dir(ClientId) ->
+    filename:join([emqx:data_dir(), "kafka", ClientId]).
+
+%% Client IDs should be unique, to make Kafka-side troubleshooting easier.
+make_client_id(BridgeName) when is_atom(BridgeName) ->
+    make_client_id(atom_to_list(BridgeName));
+make_client_id(BridgeName) ->
+    iolist_to_binary([BridgeName, ":", atom_to_list(node())]).
+
+%% Producer name must be an atom, which will be used as an ETS table name for
+%% partition worker lookup.
+make_producer_name(BridgeName) when is_atom(BridgeName) ->
+    make_producer_name(atom_to_list(BridgeName));
+make_producer_name(BridgeName) ->
+    list_to_atom("kafka_producer_" ++ BridgeName).
+
+with_log_at_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.
+
+get_required(Field, Config, Throw) ->
+    Value = maps:get(Field, Config, none),
+    Value =:= none andalso throw(Throw),
+    Value.
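
The message path above is preproc_tmpl/1 once at start-up and proc_tmpl/2 per message, with render_timestamp/2 falling back to the current system time when the field is missing or not an integer. A sketch of the round trip, as it might appear in a unit test of this module; the input map mimics a rule-engine-enriched message:

    %% Compile once, render per message.
    Template = compile_message_template(#{
        key => "${clientid}", value => "${payload}", timestamp => "${timestamp}"
    }),
    Message = #{clientid => <<"c1">>,
                payload => <<"hello">>,
                timestamp => 1661326462115},
    %% key/value render to binaries; the timestamp string is parsed back
    %% to an integer for the ts field.
    #{key := <<"c1">>, value := <<"hello">>, ts := 1661326462115} =
        render_message(Template, Message).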

+ 559 - 0
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -0,0 +1,559 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_impl_kafka_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("brod/include/brod.hrl").
+
+-define(PRODUCER, emqx_bridge_impl_kafka).
+
+%%------------------------------------------------------------------------------
+%% Things for REST API tests
+%%------------------------------------------------------------------------------
+
+-import(
+    emqx_common_test_http,
+    [
+        request_api/3,
+        request_api/5,
+        get_http_data/1
+    ]
+).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include("emqx_dashboard.hrl").
+
+-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
+
+-define(HOST, "http://127.0.0.1:18083").
+
+-define(BASE_PATH, "/api/v5").
+
+-define(APP_DASHBOARD, emqx_dashboard).
+-define(APP_MANAGEMENT, emqx_management).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+wait_until_kafka_is_up() ->
+    wait_until_kafka_is_up(0).
+
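+%% Give up after 300 attempts (roughly five minutes at one second each).
+%% Resolving the latest offset doubles as a cheap broker liveness probe.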
+wait_until_kafka_is_up(300) ->
+    ct:fail("Kafka did not come up after 300 attempts");
+wait_until_kafka_is_up(Attempts) ->
+    KafkaTopic = "test-topic-one-partition",
+    case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of
+        {ok, _} ->
+            ok;
+        _ ->
+            timer:sleep(1000),
+            wait_until_kafka_is_up(Attempts + 1)
+    end.
+
+init_per_suite(Config) ->
+    %% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for
+    %% more info.
+    application:unload(emqx_authz),
+    emqx_common_test_helpers:start_apps(
+        [emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard],
+        fun set_special_configs/1
+    ),
+    application:set_env(emqx_machine, applications, [
+        emqx_prometheus,
+        emqx_modules,
+        emqx_dashboard,
+        emqx_gateway,
+        emqx_statsd,
+        emqx_resource,
+        emqx_rule_engine,
+        emqx_bridge,
+        emqx_ee_bridge,
+        emqx_plugin_libs,
+        emqx_management,
+        emqx_retainer,
+        emqx_exhook,
+        emqx_authn,
+        emqx_authz,
+        emqx_plugin
+    ]),
+    {ok, _} = application:ensure_all_started(emqx_machine),
+    wait_until_kafka_is_up(),
+    %% Wait until bridges API is up
+    (fun WaitUntilRestApiUp() ->
+        case show(http_get(["bridges"])) of
+            {ok, 200, _Res} ->
+                ok;
+            Val ->
+                ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]),
+                timer:sleep(1000),
+                WaitUntilRestApiUp()
+        end
+    end)(),
+    Config.
+
+end_per_suite(Config) ->
+    emqx_common_test_helpers:stop_apps([
+        emqx_prometheus,
+        emqx_modules,
+        emqx_dashboard,
+        emqx_gateway,
+        emqx_statsd,
+        emqx_resource,
+        emqx_rule_engine,
+        emqx_bridge,
+        emqx_ee_bridge,
+        emqx_plugin_libs,
+        emqx_management,
+        emqx_retainer,
+        emqx_exhook,
+        emqx_authn,
+        emqx_authz,
+        emqx_plugin,
+        emqx_conf,
+        emqx_bridge,
+        emqx_management,
+        emqx_dashboard,
+        emqx_machine
+    ]),
+    mria:stop(),
+    Config.
+
+set_special_configs(emqx_management) ->
+    Listeners = #{http => #{port => 8081}},
+    Config = #{
+        listeners => Listeners,
+        applications => [#{id => "admin", secret => "public"}]
+    },
+    emqx_config:put([emqx_management], Config),
+    ok;
+set_special_configs(emqx_dashboard) ->
+    emqx_dashboard_api_test_helpers:set_default_config(),
+    ok;
+set_special_configs(_) ->
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Test cases for all combinations of SSL, no SSL and authentication types
+%%------------------------------------------------------------------------------
+
+t_publish_no_auth(_CtConfig) ->
+    publish_with_and_without_ssl("none").
+
+t_publish_sasl_plain(_CtConfig) ->
+    publish_with_and_without_ssl(valid_sasl_plain_settings()).
+
+t_publish_sasl_scram256(_CtConfig) ->
+    publish_with_and_without_ssl(valid_sasl_scram256_settings()).
+
+t_publish_sasl_scram512(_CtConfig) ->
+    publish_with_and_without_ssl(valid_sasl_scram512_settings()).
+
+t_publish_sasl_kerberos(_CtConfig) ->
+    publish_with_and_without_ssl(valid_sasl_kerberos_settings()).
+
+%%------------------------------------------------------------------------------
+%% Test cases for REST api
+%%------------------------------------------------------------------------------
+
+%% Identity passthrough, kept as a hook for ad-hoc debugging of REST calls.
+show(X) ->
+    X.
+
+t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
+    kafka_bridge_rest_api_all_auth_methods(false).
+
+t_kafka_bridge_rest_api_ssl(_CtConfig) ->
+    kafka_bridge_rest_api_all_auth_methods(true).
+
+kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
+    NormalHostsString =
+        case UseSSL of
+            true -> kafka_hosts_string_ssl();
+            false -> kafka_hosts_string()
+        end,
+    kafka_bridge_rest_api_helper(#{
+        <<"bootstrap_hosts">> => NormalHostsString,
+        <<"authentication">> => <<"none">>
+    }),
+    SASLHostsString =
+        case UseSSL of
+            true -> kafka_hosts_string_ssl_sasl();
+            false -> kafka_hosts_string_sasl()
+        end,
+    BinifyMap = fun(Map) ->
+        maps:from_list([
+            {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
+         || {K, V} <- maps:to_list(Map)
+        ])
+    end,
+    SSLSettings =
+        case UseSSL of
+            true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
+            false -> #{}
+        end,
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => SASLHostsString,
+                <<"authentication">> => BinifyMap(valid_sasl_plain_settings())
+            },
+            SSLSettings
+        )
+    ),
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => SASLHostsString,
+                <<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
+            },
+            SSLSettings
+        )
+    ),
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => SASLHostsString,
+                <<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
+            },
+            SSLSettings
+        )
+    ),
+    kafka_bridge_rest_api_helper(
+        maps:merge(
+            #{
+                <<"bootstrap_hosts">> => SASLHostsString,
+                <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
+            },
+            SSLSettings
+        )
+    ),
+    ok.
+
+kafka_bridge_rest_api_helper(Config) ->
+    UrlEscColon = "%3A",
+    BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
+    BridgesParts = ["bridges"],
+    BridgesPartsId = ["bridges", BridgeIdUrlEnc],
+    OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
+    BridgesPartsOpDisable = OpUrlFun("disable"),
+    BridgesPartsOpEnable = OpUrlFun("enable"),
+    BridgesPartsOpRestart = OpUrlFun("restart"),
+    BridgesPartsOpStop = OpUrlFun("stop"),
+    %% List bridges
+    MyKafkaBridgeExists = fun() ->
+        {ok, _Code, BridgesData} = show(http_get(BridgesParts)),
+        Bridges = show(json(BridgesData)),
+        lists:any(
+            fun
+                (#{<<"name">> := <<"my_kafka_bridge">>}) -> true;
+                (_) -> false
+            end,
+            Bridges
+        )
+    end,
+    %% Delete if my_kafka_bridge exists
+    case MyKafkaBridgeExists() of
+        true ->
+            %% Delete the pre-existing my_kafka_bridge
+            {ok, 204, <<>>} = show(http_delete(BridgesPartsId));
+        false ->
+            ok
+    end,
+    false = MyKafkaBridgeExists(),
+    %% Create new Kafka bridge
+    CreateBodyTmp = #{
+        <<"type">> => <<"kafka">>,
+        <<"name">> => <<"my_kafka_bridge">>,
+        <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config),
+        <<"enable">> => true,
+        <<"authentication">> => maps:get(<<"authentication">>, Config),
+        <<"producer">> => #{
+            <<"mqtt">> => #{
+                topic => <<"t/#">>
+            },
+            <<"kafka">> => #{
+                <<"topic">> => <<"test-topic-one-partition">>
+            }
+        }
+    },
+    CreateBody =
+        case maps:is_key(<<"ssl">>, Config) of
+            true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)};
+            false -> CreateBodyTmp
+        end,
+    {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
+    %% Check that the new bridge is in the list of bridges
+    true = MyKafkaBridgeExists(),
+    %% Perform operations (each issued twice to check that they are idempotent)
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
+    {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
+    %% Cleanup
+    {ok, 204, _} = show(http_delete(BridgesPartsId)),
+    false = MyKafkaBridgeExists(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper functions
+%%------------------------------------------------------------------------------
+
+publish_with_and_without_ssl(AuthSettings) ->
+    publish_helper(#{
+        auth_settings => AuthSettings,
+        ssl_settings => #{}
+    }),
+    publish_helper(#{
+        auth_settings => AuthSettings,
+        ssl_settings => valid_ssl_settings()
+    }).
+
+publish_helper(#{
+    auth_settings := AuthSettings,
+    ssl_settings := SSLSettings
+}) ->
+    HostsString =
+        case {AuthSettings, SSLSettings} of
+            {"none", Map} when map_size(Map) =:= 0 ->
+                kafka_hosts_string();
+            {"none", Map} when map_size(Map) =/= 0 ->
+                kafka_hosts_string_ssl();
+            {_, Map} when map_size(Map) =:= 0 ->
+                kafka_hosts_string_sasl();
+            {_, _} ->
+                kafka_hosts_string_ssl_sasl()
+        end,
+    Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    InstId = emqx_bridge_resource:resource_id("kafka", Name),
+    KafkaTopic = "test-topic-one-partition",
+    Conf = config(#{
+        "authentication" => AuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => InstId,
+        "ssl" => SSLSettings
+    }),
+    %% Sleep briefly so that the monotonic timestamp below is unique
+    timer:sleep(1),
+    Time = erlang:monotonic_time(),
+    BinTime = integer_to_binary(Time),
+    Msg = #{
+        clientid => BinTime,
+        payload => <<"payload">>,
+        timestamp => Time
+    },
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    ct:pal("base offset before testing ~p", [Offset]),
+    StartRes = ?PRODUCER:on_start(InstId, Conf),
+    {ok, State} = StartRes,
+    OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
+    ok = OnQueryRes,
+    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
+    ok = ?PRODUCER:on_stop(InstId, State),
+    ok.
+
+config(Args) ->
+    ConfText = hocon_config(Args),
+    ct:pal("Running tests with conf:\n~s", [ConfText]),
+    {ok, Conf} = hocon:binary(ConfText),
+    #{config := Parsed} = hocon_tconf:check_plain(
+        emqx_ee_bridge_kafka,
+        #{<<"config">> => Conf},
+        #{atom_key => true}
+    ),
+    InstId = maps:get("instance_id", Args),
+    <<"bridge:", BridgeId/binary>> = InstId,
+    Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}.
+
+hocon_config(Args) ->
+    AuthConf = maps:get("authentication", Args),
+    AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
+    AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
+    SSLConf = maps:get("ssl", Args, #{}),
+    SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
+    SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
+    Hocon = bbmustache:render(
+        iolist_to_binary(hocon_config_template()),
+        Args#{
+            "authentication" => AuthConfRendered,
+            "ssl" => SSLConfRendered
+        }
+    ),
+    Hocon.
+
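+%% Note: the triple-brace tags ({{{ }}}) in the templates below insert the
+%% pre-rendered authentication and ssl sub-configs without escaping.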
+%% erlfmt-ignore
+hocon_config_template() ->
+"""
+bootstrap_hosts = \"{{ kafka_hosts_string }}\"
+enable = true
+authentication = {{{ authentication }}} 
+ssl = {{{ ssl }}}
+producer = {
+    mqtt {
+       topic = \"t/#\"
+    }
+    kafka = {
+        topic = \"{{ kafka_topic }}\"
+    }
+}
+""".
+
+%% erlfmt-ignore
+hocon_config_template_authentication("none") ->
+    "none";
+hocon_config_template_authentication(#{"mechanism" := _}) ->
+"""
+{
+    mechanism = {{ mechanism }}
+    password = {{ password }}
+    username = {{ username }}
+}
+""";
+hocon_config_template_authentication(#{"kerberos_principal" := _}) ->
+"""
+{
+    kerberos_principal = \"{{ kerberos_principal }}\"
+    kerberos_keytab_file = \"{{ kerberos_keytab_file }}\"
+}
+""".
+
+%% erlfmt-ignore
+hocon_config_template_ssl(Map) when map_size(Map) =:= 0 ->
+"""
+{
+    enable = false
+}
+""";
+hocon_config_template_ssl(_) ->
+"""
+{
+    enable = true
+    cacertfile = \"{{{cacertfile}}}\"
+    certfile = \"{{{certfile}}}\"
+    keyfile = \"{{{keyfile}}}\"
+}
+""".
+
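+%% Each port below targets a differently-secured listener on the test
+%% broker: 9092 plain, 9093 SASL, 9094 SSL, 9095 SASL over SSL.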
+kafka_hosts_string() ->
+    "kafka-1.emqx.net:9092,".
+
+kafka_hosts_string_sasl() ->
+    "kafka-1.emqx.net:9093,".
+
+kafka_hosts_string_ssl() ->
+    "kafka-1.emqx.net:9094,".
+
+kafka_hosts_string_ssl_sasl() ->
+    "kafka-1.emqx.net:9095,".
+
+valid_ssl_settings() ->
+    #{
+        "cacertfile" => <<"/var/lib/secret/ca.crt">>,
+        "certfile" => <<"/var/lib/secret/client.crt">>,
+        "keyfile" => <<"/var/lib/secret/client.key">>,
+        "enable" => <<"true">>
+    }.
+
+valid_sasl_plain_settings() ->
+    #{
+        "mechanism" => "plain",
+        "username" => "emqxuser",
+        "password" => "password"
+    }.
+
+valid_sasl_scram256_settings() ->
+    (valid_sasl_plain_settings())#{
+        "mechanism" => "scram_sha_256"
+    }.
+
+valid_sasl_scram512_settings() ->
+    (valid_sasl_plain_settings())#{
+        "mechanism" => "scram_sha_512"
+    }.
+
+valid_sasl_kerberos_settings() ->
+    #{
+        "kerberos_principal" => "rig@KDC.EMQX.NET",
+        "kerberos_keytab_file" => "/var/lib/secret/rig.keytab"
+    }.
+
+kafka_hosts() ->
+    kpro:parse_endpoints(kafka_hosts_string()).
+
+resolve_kafka_offset(Hosts, Topic, Partition) ->
+    brod:resolve_offset(Hosts, Topic, Partition, latest).
+
+%%------------------------------------------------------------------------------
+%% Internal functions rest API helpers
+%%------------------------------------------------------------------------------
+
+bin(X) -> iolist_to_binary(X).
+
+%% Unique-enough number for test data; not cryptographically random.
+random_num() ->
+    erlang:system_time(nanosecond).
+
+http_get(Parts) ->
+    request_api(get, api_path(Parts), auth_header_()).
+
+http_delete(Parts) ->
+    request_api(delete, api_path(Parts), auth_header_()).
+
+http_post(Parts, Body) ->
+    request_api(post, api_path(Parts), [], auth_header_(), Body).
+
+http_put(Parts, Body) ->
+    request_api(put, api_path(Parts), [], auth_header_(), Body).
+
+request_dashboard(Method, Url, Auth) ->
+    Request = {Url, [Auth]},
+    do_request_dashboard(Method, Request).
+request_dashboard(Method, Url, QueryParams, Auth) ->
+    Request = {Url ++ "?" ++ QueryParams, [Auth]},
+    do_request_dashboard(Method, Request).
+do_request_dashboard(Method, Request) ->
+    ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+    case httpc:request(Method, Request, [], []) of
+        {error, socket_closed_remotely} ->
+            {error, socket_closed_remotely};
+        {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when
+            Code >= 200 andalso Code =< 299
+        ->
+            {ok, Return};
+        {ok, {Reason, _, _}} ->
+            {error, Reason}
+    end.
+
+auth_header_() ->
+    auth_header_(<<"admin">>, <<"public">>).
+
+auth_header_(Username, Password) ->
+    {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
+    {"Authorization", "Bearer " ++ binary_to_list(Token)}.
+
+api_path(Parts) ->
+    ?HOST ++ filename:join([?BASE_PATH | Parts]).
+
+json(Data) ->
+    {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]),
+    Jsx.

+ 3 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -5,7 +5,9 @@
         kernel,
         stdlib,
         hstreamdb_erl,
-        influxdb
+        influxdb,
+        wolff,
+        brod
     ]},
     {env, []},
     {modules, []},

+ 9 - 3
mix.exs

@@ -44,7 +44,7 @@ defmodule EMQXUmbrella.MixProject do
     # we need several overrides here because dependencies specify
     # other exact versions, and not ranges.
     [
-      {:lc, github: "emqx/lc", tag: "0.3.1"},
+      {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
       {:redbug, "2.0.7"},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
       {:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true},
@@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
-      {:replayq, "0.3.4", override: true},
+      {:replayq, github: "emqx/replayq", tag: "0.3.4", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
       {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
@@ -129,7 +129,13 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
-      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true}
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
+      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
+      {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
+      {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
+      {:snappyer, "1.2.8", override: true},
+      {:supervisor3, "1.1.11", override: true}
     ]
   end
 
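Note on the new enterprise dependencies: wolff and brod both pull in
kafka_protocol, snappyer and supervisor3, which is why those are pinned
here with override: true so the whole tree resolves to a single version
of each.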

+ 2 - 2
rebar.config

@@ -44,7 +44,7 @@
 {post_hooks,[]}.
 
 {deps,
-    [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
+    [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
     , {redbug, "2.0.7"}
     , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
@@ -59,7 +59,7 @@
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
-    , {replayq, "0.3.4"}
+    , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}

+ 53 - 6
scripts/ct/run.sh

@@ -10,12 +10,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
 help() {
     echo
     echo "-h|--help:              To display this usage info"
-    echo "--app lib_dir/app_name: Print apps in json"
+    echo "--app lib_dir/app_name: For which app to run start docker-compose, and run common tests"
+    echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl"
     echo "--console:              Start EMQX in console mode"
+    echo "--attach:               Attach to the Erlang docker container without running any test case"
+    echo "--only-up:              Only start the testbed but do not run CT"
+    echo "--keep-up:              Keep the testbed running after CT"
 }
 
 WHICH_APP='novalue'
 CONSOLE='no'
+KEEP_UP='no'
+ONLY_UP='no'
+SUITES=''
+ATTACH='no'
 while [ "$#" -gt 0 ]; do
     case $1 in
         -h|--help)
@@ -26,10 +34,26 @@ while [ "$#" -gt 0 ]; do
             WHICH_APP="$2"
             shift 2
             ;;
+        --only-up)
+            ONLY_UP='yes'
+            shift 1
+            ;;
+        --keep-up)
+            KEEP_UP='yes'
+            shift 1
+            ;;
+        --attach)
+            ATTACH='yes'
+            shift 1
+            ;;
         --console)
             CONSOLE='yes'
             shift 1
             ;;
+        --suites)
+            SUITES="$2"
+            shift 2
+            ;;
         *)
             echo "unknown option $1"
             exit 1
@@ -45,6 +69,16 @@ fi
 ERLANG_CONTAINER='erlang24'
 DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct"
 
+case "${WHICH_APP}" in
+    lib-ee*)
+        ## ensure enterprise profile when testing lib-ee applications
+        export PROFILE='emqx-enterprise'
+        ;;
+    *)
+        true
+        ;;
+esac
+
 if [ -f "$DOCKER_CT_ENVS_FILE" ]; then
     # shellcheck disable=SC2002
     CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)"
@@ -80,6 +114,9 @@ for dep in ${CT_DEPS}; do
             FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml'
                      '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' )
             ;;
+        kafka)
+            FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1
@@ -104,13 +141,23 @@ if [[ -t 1 ]]; then
 fi
 docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx'
 
-if [ "$CONSOLE" = 'yes' ]; then
+if [ "$ONLY_UP" = 'yes' ]; then
+    exit 0
+fi
+
+if [ "$ATTACH" = 'yes' ]; then
+    docker exec -it "$ERLANG_CONTAINER" bash
+elif [ "$CONSOLE" = 'yes' ]; then
     docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run"
 else
     set +e
-    docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct"
+    docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct"
     RESULT=$?
-    # shellcheck disable=2086 # no quotes for F_OPTIONS
-    docker-compose $F_OPTIONS down
-    exit $RESULT
+    if [ "$KEEP_UP" = 'yes' ]; then
+        exit $RESULT
+    else
+        # shellcheck disable=2086 # no quotes for F_OPTIONS
+        docker-compose $F_OPTIONS down
+        exit $RESULT
+    fi
 fi
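
With these new flags one can iterate on a single suite without tearing the
testbed down between runs, e.g. (hypothetical invocation):

    ./scripts/ct/run.sh --app lib-ee/emqx_ee_bridge \
        --suites lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl \
        --keep-up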

+ 6 - 2
scripts/find-suites.sh

@@ -8,5 +8,9 @@ set -euo pipefail
 # ensure dir
 cd -P -- "$(dirname -- "$0")/.."
 
-TESTDIR="$1/test"
-find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
+if [ -z "${EMQX_CT_SUITES:-}" ]; then
+    TESTDIR="$1/test"
+    find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
+else
+    echo "${EMQX_CT_SUITES}"
+fi