
Merge pull request #9564 from thalesmg/kafka-source-ee50

feat: implement kafka consumer (ee5.0)
Thales Macedo Garitezi, 3 years ago
parent
commit cf1cce011b
38 files changed, 3659 additions and 371 deletions
  1. +19 -16    .ci/docker-compose-file/docker-compose-kafka.yaml
  2. +4 -1      .ci/docker-compose-file/docker-compose-toxiproxy.yaml
  3. +2 -5      .ci/docker-compose-file/docker-compose.yaml
  4. +0 -46     .ci/docker-compose-file/kafka/generate-certs.sh
  5. +1 -0      .ci/docker-compose-file/kafka/kafka-entrypoint.sh
  6. +24 -0     .ci/docker-compose-file/toxiproxy.json
  7. +1 -1      Makefile
  8. +1 -1      apps/emqx/rebar.config
  9. +113 -19   apps/emqx/test/emqx_common_test_helpers.erl
  10. +12 -1    apps/emqx/test/emqx_test_janitor.erl
  11. +3 -1     apps/emqx_bridge/rebar.config
  12. +1 -1     apps/emqx_bridge/src/emqx_bridge.app.src
  13. +10 -5    apps/emqx_bridge/src/emqx_bridge.erl
  14. +13 -3    apps/emqx_bridge/src/emqx_bridge_resource.erl
  15. +0 -2     apps/emqx_bridge/src/emqx_bridge_sup.erl
  16. +0 -1     apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
  17. +1 -1     apps/emqx_conf/src/emqx_cluster_rpc.erl
  18. +1 -3     apps/emqx_conf/test/emqx_conf_app_SUITE.erl
  19. +2 -2     apps/emqx_plugins/test/emqx_plugins_SUITE.erl
  20. +1 -1     apps/emqx_resource/include/emqx_resource.hrl
  21. +2 -0     changes/ee/feat-9564.en.md
  22. +2 -0     changes/ee/feat-9564.zh.md
  23. +186 -13  lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
  24. +1 -1     lib-ee/emqx_ee_bridge/rebar.config
  25. +1 -1     lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
  26. +29 -12   lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
  27. +208 -41  lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
  28. +31 -27   lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl
  29. +499 -0   lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
  30. +16 -49   lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
  31. +79 -0    lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl
  32. +1917 -0  lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl
  33. +154 -76  lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
  34. +2 -2     lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
  35. +287 -0   lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl
  36. +2 -2     mix.exs
  37. +1 -1     rebar.config
  38. +33 -36   scripts/ct/run.sh

+ 19 - 16
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -10,32 +10,34 @@ services:
     networks:
       emqx_bridge:
   ssl_cert_gen:
-    image: fredrikhgrelland/alpine-jdk11-openssl
+    # see https://github.com/emqx/docker-images
+    image:  ghcr.io/emqx/certgen:latest
     container_name: ssl_cert_gen
+    user: "${DOCKER_USER:-root}"
     volumes:
-      - emqx-shared-secret:/var/lib/secret
-      - ./kafka/generate-certs.sh:/bin/generate-certs.sh
-    entrypoint: /bin/sh
-    command: /bin/generate-certs.sh
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
   kdc:
     hostname: kdc.emqx.net
     image:  ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-ubuntu20.04
     container_name: kdc.emqx.net
+    expose:
+      - 88 # kdc
+      - 749 # admin server
+    # ports:
+    #   - 88:88
+    #   - 749:749
     networks:
       emqx_bridge:
     volumes:
-      - emqx-shared-secret:/var/lib/secret
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
       - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
       - ./kerberos/krb5.conf:/etc/krb5.conf
       - ./kerberos/run.sh:/usr/bin/run.sh
     command: run.sh
   kafka_1:
-    image: wurstmeister/kafka:2.13-2.7.0
-    ports:
-      - "9092:9092"
-      - "9093:9093"
-      - "9094:9094"
-      - "9095:9095"
+    image: wurstmeister/kafka:2.13-2.8.1
+    # ports:
+    #   - "9192-9195:9192-9195"
     container_name: kafka-1.emqx.net
     hostname: kafka-1.emqx.net
     depends_on:
@@ -48,9 +50,9 @@ services:
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL
+      KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095,LOCAL_PLAINTEXT://:9192,LOCAL_SASL_PLAINTEXT://:9193,LOCAL_SSL://:9194,LOCAL_SASL_SSL://:9195,TOXIPROXY_PLAINTEXT://:9292,TOXIPROXY_SASL_PLAINTEXT://:9293,TOXIPROXY_SSL://:9294,TOXIPROXY_SASL_SSL://:9295
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095,LOCAL_PLAINTEXT://localhost:9192,LOCAL_SASL_PLAINTEXT://localhost:9193,LOCAL_SSL://localhost:9194,LOCAL_SASL_SSL://localhost:9195,TOXIPROXY_PLAINTEXT://toxiproxy.emqx.net:9292,TOXIPROXY_SASL_PLAINTEXT://toxiproxy.emqx.net:9293,TOXIPROXY_SSL://toxiproxy.emqx.net:9294,TOXIPROXY_SASL_SSL://toxiproxy.emqx.net:9295
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,LOCAL_PLAINTEXT:PLAINTEXT,LOCAL_SASL_PLAINTEXT:SASL_PLAINTEXT,LOCAL_SSL:SSL,LOCAL_SASL_SSL:SASL_SSL,TOXIPROXY_PLAINTEXT:PLAINTEXT,TOXIPROXY_SASL_PLAINTEXT:SASL_PLAINTEXT,TOXIPROXY_SSL:SSL,TOXIPROXY_SASL_SSL:SASL_SSL
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
       KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
@@ -58,6 +60,7 @@ services:
       KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
       KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
       KAFKA_CREATE_TOPICS_NG: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1,
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
       KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
       KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks
       KAFKA_SSL_TRUSTSTORE_PASSWORD: password
@@ -67,7 +70,7 @@ services:
     networks:
       emqx_bridge:
     volumes:
-      - emqx-shared-secret:/var/lib/secret
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
       - ./kafka/jaas.conf:/etc/kafka/jaas.conf
       - ./kafka/kafka-entrypoint.sh:/bin/kafka-entrypoint.sh
       - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
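
The broker now exposes three listener families on the same container: the original in-network ones (9092-9095), LOCAL_* listeners advertised as localhost (9192-9195) for suites running on the host, and TOXIPROXY_* listeners advertised as toxiproxy.emqx.net (9292-9295) for fault injection. A quick smoke check against the local plaintext listener, as a sketch that assumes the compose file is up and brod is on the code path:

%% assumes docker-compose is up and LOCAL_PLAINTEXT (9192) is reachable
%% from the host; the client name is arbitrary
{ok, _} = application:ensure_all_started(brod),
ok = brod:start_client([{"localhost", 9192}], smoke_check_client),
{ok, NPartitions} =
    brod:get_partitions_count(smoke_check_client, <<"test-topic-one-partition">>),
%% NPartitions =:= 1 for this topic pre-created via KAFKA_CREATE_TOPICS_NG
ok = brod:stop_client(smoke_check_client).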

+ 4 - 1
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -6,7 +6,10 @@ services:
     image: ghcr.io/shopify/toxiproxy:2.5.0
     restart: always
     networks:
-      - emqx_bridge
+      emqx_bridge:
+        aliases:
+          - toxiproxy
+          - toxiproxy.emqx.net
     volumes:
       - "./toxiproxy.json:/config/toxiproxy.json"
     ports:

+ 2 - 5
.ci/docker-compose-file/docker-compose.yaml

@@ -18,12 +18,12 @@ services:
       - emqx_bridge
     volumes:
       - ../..:/emqx
-      - emqx-shared-secret:/var/lib/secret
+      - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
       - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
       - ./kerberos/krb5.conf:/etc/krb5.conf
     working_dir: /emqx
     tty: true
-    user: "${UID_GID}"
+    user: "${DOCKER_USER:-root}"
 
 networks:
   emqx_bridge:
@@ -37,6 +37,3 @@ networks:
           gateway: 172.100.239.1
         - subnet: 2001:3200:3200::/64
           gateway: 2001:3200:3200::1
-
-volumes:   # add this section
-  emqx-shared-secret:    # does not need anything underneath this

+ 0 - 46
.ci/docker-compose-file/kafka/generate-certs.sh

@@ -1,46 +0,0 @@
-#!/usr/bin/bash
-
-set -euo pipefail
-
-set -x
-
-# Source https://github.com/zmstone/docker-kafka/blob/master/generate-certs.sh
-
-HOST="*."
-DAYS=3650
-PASS="password"
-
-cd /var/lib/secret/
-
-# Delete old files
-(rm ca.key ca.crt server.key server.csr server.crt client.key client.csr client.crt server.p12 kafka.keystore.jks kafka.truststore.jks 2>/dev/null || true)
-
-ls
-
-echo '== Generate self-signed server and client certificates'
-echo '= generate CA'
-openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
-
-echo '= generate server certificate request'
-openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
-
-echo '= sign server certificate'
-openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days "$DAYS" -CAcreateserial
-
-echo '= generate client certificate request'
-openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
-
-echo '== sign client certificate'
-openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl
-
-echo '= Convert self-signed certificate to PKCS#12 format'
-openssl pkcs12 -export -name "$HOST" -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:"$PASS"
-
-echo '= Import PKCS#12 into a java keystore'
-
-echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias "$HOST" -storepass "$PASS"
-
-
-echo '= Import CA into java truststore'
-
-echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass "$PASS"

+ 1 - 0
.ci/docker-compose-file/kafka/kafka-entrypoint.sh

@@ -17,6 +17,7 @@ timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1;
 echo "+++++++ Wait until SSL certs are generated ++++++++"
 
 timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done'
+keytool -list -v -keystore /var/lib/secret/kafka.keystore.jks -storepass password
 
 sleep 3
 

+ 24 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -53,5 +53,29 @@
     "listen": "0.0.0.0:8000",
     "upstream": "dynamo:8000",
     "enabled": true
+  },
+  {
+    "name": "kafka_plain",
+    "listen": "0.0.0.0:9292",
+    "upstream": "kafka-1.emqx.net:9292",
+    "enabled": true
+  },
+  {
+    "name": "kafka_sasl_plain",
+    "listen": "0.0.0.0:9293",
+    "upstream": "kafka-1.emqx.net:9293",
+    "enabled": true
+  },
+  {
+    "name": "kafka_ssl",
+    "listen": "0.0.0.0:9294",
+    "upstream": "kafka-1.emqx.net:9294",
+    "enabled": true
+  },
+  {
+    "name": "kafka_sasl_ssl",
+    "listen": "0.0.0.0:9295",
+    "upstream": "kafka-1.emqx.net:9295",
+    "enabled": true
   }
 ]
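
Each new proxy mirrors one of the TOXIPROXY_* broker listeners, so a test can cut or degrade the Kafka connection by proxy name. A sketch using the Toxiproxy helpers exported from emqx_common_test_helpers (see the "Toxiproxy API" exports touched below); the host/port values are assumptions based on the compose files:

ProxyHost = "toxiproxy.emqx.net",
ProxyPort = 8474,  %% toxiproxy's default API port (assumed mapping)
emqx_common_test_helpers:with_failure(
    down, "kafka_sasl_ssl", ProxyHost, ProxyPort,
    fun() ->
        %% while inside this fun, traffic through 0.0.0.0:9295 is severed;
        %% assert here that the bridge reports disconnected
        ok
    end).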

+ 1 - 1
Makefile

@@ -107,7 +107,7 @@ endef
 $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
 
 .PHONY: ct-suite
-ct-suite: $(REBAR)
+ct-suite: $(REBAR) merge-config
 ifneq ($(TESTCASE),)
 ifneq ($(GROUP),)
 	$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE)  --case $(TESTCASE) --group $(GROUP)

+ 1 - 1
apps/emqx/rebar.config

@@ -33,7 +33,7 @@
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
-    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
+    {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
 ]}.
 
 {plugins, [{rebar3_proper, "0.12.1"}]}.

+ 113 - 19
apps/emqx/test/emqx_common_test_helpers.erl

@@ -37,6 +37,7 @@
     deps_path/2,
     flush/0,
     flush/1,
+    load/1,
     render_and_load_app_config/1,
     render_and_load_app_config/2
 ]).
@@ -66,14 +67,16 @@
     emqx_cluster/2,
     start_epmd/0,
     start_slave/2,
-    stop_slave/1
+    stop_slave/1,
+    listener_port/2
 ]).
 
 -export([clear_screen/0]).
 -export([with_mock/4]).
 -export([
     on_exit/1,
-    call_janitor/0
+    call_janitor/0,
+    call_janitor/1
 ]).
 
 %% Toxiproxy API
@@ -587,6 +590,12 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
     %% Whether to execute `emqx_config:init_load(SchemaMod)`
     %% default: true
     load_schema => boolean(),
+    %% If we want to exercise the scenario where a node joins an
+    %% existing cluster where there has already been some
+    %% configuration changes (via cluster rpc), then we need to enable
+    %% autocluster so that the joining node will restart the
+    %% `emqx_conf' app and correctly catch up the config.
+    start_autocluster => boolean(),
     %% Eval by emqx_config:put/2
     conf => [{KeyPath :: list(), Val :: term()}],
     %% Fast option to config listener port
@@ -637,25 +646,53 @@ emqx_cluster(Specs0, CommonOpts) ->
 %% Lower level starting API
 
 -spec start_slave(shortname(), node_opts()) -> nodename().
-start_slave(Name, Opts) ->
-    {ok, Node} = ct_slave:start(
-        list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
-        [
-            {kill_if_fail, true},
-            {monitor_master, true},
-            {init_timeout, 10000},
-            {startup_timeout, 10000},
-            {erl_flags, erl_flags()}
-        ]
-    ),
-
+start_slave(Name, Opts) when is_list(Opts) ->
+    start_slave(Name, maps:from_list(Opts));
+start_slave(Name, Opts) when is_map(Opts) ->
+    SlaveMod = maps:get(peer_mod, Opts, ct_slave),
+    Node = node_name(Name),
+    DoStart =
+        fun() ->
+            case SlaveMod of
+                ct_slave ->
+                    ct_slave:start(
+                        Node,
+                        [
+                            {kill_if_fail, true},
+                            {monitor_master, true},
+                            {init_timeout, 10000},
+                            {startup_timeout, 10000},
+                            {erl_flags, erl_flags()}
+                        ]
+                    );
+                slave ->
+                    slave:start_link(host(), Name, ebin_path())
+            end
+        end,
+    case DoStart() of
+        {ok, _} ->
+            ok;
+        {error, started_not_connected, _} ->
+            ok;
+        Other ->
+            throw(Other)
+    end,
     pong = net_adm:ping(Node),
+    put_peer_mod(Node, SlaveMod),
     setup_node(Node, Opts),
+    ok = snabbkaffe:forward_trace(Node),
     Node.
 
 %% Node stopping
-stop_slave(Node) ->
-    ct_slave:stop(Node).
+stop_slave(Node0) ->
+    Node = node_name(Node0),
+    SlaveMod = get_peer_mod(Node),
+    erase_peer_mod(Node),
+    case SlaveMod:stop(Node) of
+        ok -> ok;
+        {ok, _} -> ok;
+        {error, not_started, _} -> ok
+    end.
 
 %% EPMD starting
 start_epmd() ->
@@ -693,9 +730,27 @@ setup_node(Node, Opts) when is_map(Opts) ->
         {Type, listener_port(BasePort, Type)}
      || Type <- [tcp, ssl, ws, wss]
     ]),
+    %% we need a fresh data dir for each peer node to avoid unintended
+    %% successes due to sharing of data in the cluster.
+    PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
+    %% If we want to exercise the scenario where a node joins an
+    %% existing cluster where there has already been some
+    %% configuration changes (via cluster rpc), then we need to enable
+    %% autocluster so that the joining node will restart the
+    %% `emqx_conf' app and correctly catch up the config.
+    StartAutocluster = maps:get(start_autocluster, Opts, false),
 
     %% Load env before doing anything to avoid overriding
-    [ok = rpc:call(Node, application, load, [App]) || App <- LoadApps],
+    lists:foreach(fun(App) -> rpc:call(Node, ?MODULE, load, [App]) end, LoadApps),
+    %% Ensure a clean mnesia directory for each run to avoid
+    %% inter-test flakiness.
+    MnesiaDataDir = filename:join([
+        PrivDataDir,
+        node(),
+        integer_to_list(erlang:unique_integer()),
+        "mnesia"
+    ]),
+    erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]),
 
     %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
     %% in emqx_common_test_helpers:start_apps(...)
@@ -721,7 +776,19 @@ setup_node(Node, Opts) when is_map(Opts) ->
             %% Otherwise, configuration gets loaded and all preset env in EnvHandler is lost
             LoadSchema andalso
                 begin
+                    %% to avoid sharing data between executions and/or
+                    %% nodes.  these variables might not be in the
+                    %% config file (e.g.: emqx_ee_conf_schema).
+                    NodeDataDir = filename:join([
+                        PrivDataDir,
+                        node(),
+                        integer_to_list(erlang:unique_integer())
+                    ]),
+                    os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
+                    os:putenv("EMQX_NODE__COOKIE", atom_to_list(erlang:get_cookie())),
                     emqx_config:init_load(SchemaMod),
+                    os:unsetenv("EMQX_NODE__DATA_DIR"),
+                    os:unsetenv("EMQX_NODE__COOKIE"),
                     application:set_env(emqx, init_config_load_done, true)
                 end,
 
@@ -748,6 +815,8 @@ setup_node(Node, Opts) when is_map(Opts) ->
         undefined ->
             ok;
         _ ->
+            StartAutocluster andalso
+                (ok = rpc:call(Node, emqx_machine_boot, start_autocluster, [])),
             case rpc:call(Node, ekka, join, [JoinTo]) of
                 ok ->
                     ok;
@@ -762,8 +831,27 @@ setup_node(Node, Opts) when is_map(Opts) ->
 
 %% Helpers
 
+put_peer_mod(Node, SlaveMod) ->
+    put({?MODULE, Node}, SlaveMod),
+    ok.
+
+get_peer_mod(Node) ->
+    case get({?MODULE, Node}) of
+        undefined -> ct_slave;
+        SlaveMod -> SlaveMod
+    end.
+
+erase_peer_mod(Node) ->
+    erase({?MODULE, Node}).
+
 node_name(Name) ->
-    list_to_atom(lists:concat([Name, "@", host()])).
+    case string:tokens(atom_to_list(Name), "@") of
+        [_Name, _Host] ->
+            %% the name already has a @
+            Name;
+        _ ->
+            list_to_atom(atom_to_list(Name) ++ "@" ++ host())
+    end.
 
 gen_node_name(Num) ->
     list_to_atom("autocluster_node" ++ integer_to_list(Num)).
@@ -804,6 +892,9 @@ base_port(Number) ->
 gen_rpc_port(BasePort) ->
     BasePort - 1.
 
+listener_port(Opts, Type) when is_map(Opts) ->
+    BasePort = maps:get(base_port, Opts),
+    listener_port(BasePort, Type);
 listener_port(BasePort, tcp) ->
     BasePort;
 listener_port(BasePort, ssl) ->
@@ -988,8 +1079,11 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
 %% stop the janitor gracefully to ensure proper cleanup order and less
 %% noise in the logs.
 call_janitor() ->
+    call_janitor(15_000).
+
+call_janitor(Timeout) ->
     Janitor = get_or_spawn_janitor(),
-    exit(Janitor, normal),
+    ok = emqx_test_janitor:stop(Janitor, Timeout),
     ok.
 
 get_or_spawn_janitor() ->
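
start_slave/2 now accepts a map of options, can start peers with either ct_slave or the plain slave module (peer_mod), records which module started each node so stop_slave/1 can tear it down symmetrically, and forwards snabbkaffe traces from the peer. A usage sketch with hypothetical option values (base_port and priv_data_dir are whatever the suite allocates):

Opts = #{peer_mod => slave, base_port => 20000, priv_data_dir => PrivDir},
Node = emqx_common_test_helpers:start_slave(consumer_peer, Opts),
try
    %% node_name/1 normalization means the short name works everywhere
    pong = net_adm:ping(Node)
after
    ok = emqx_common_test_helpers:stop_slave(consumer_peer)
end.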

+ 12 - 1
apps/emqx/test/emqx_test_janitor.erl

@@ -30,6 +30,8 @@
 %% API
 -export([
     start_link/0,
+    stop/1,
+    stop/2,
     push_on_exit_callback/2
 ]).
 
@@ -40,6 +42,12 @@
 start_link() ->
     gen_server:start_link(?MODULE, self(), []).
 
+stop(Server) ->
+    stop(Server, 15_000).
+
+stop(Server, Timeout) ->
+    gen_server:call(Server, terminate, Timeout).
+
 push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
     gen_server:call(Server, {push, Callback}).
 
@@ -52,10 +60,13 @@ init(Parent) ->
     {ok, #{callbacks => [], owner => Parent}}.
 
 terminate(_Reason, #{callbacks := Callbacks}) ->
-    lists:foreach(fun(Fun) -> Fun() end, Callbacks).
+    lists:foreach(fun(Fun) -> catch Fun() end, Callbacks).
 
 handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
     {reply, ok, State#{callbacks := [Callback | Callbacks]}};
+handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
+    lists:foreach(fun(Fun) -> Fun() end, Callbacks),
+    {stop, normal, ok, State};
 handle_call(_Req, _From, State) ->
     {reply, error, State}.
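
With this change the janitor is stopped by a synchronous terminate call rather than an exit signal, so the callbacks registered via on_exit/1 reliably run, and the catch in terminate/2 keeps one crashing callback from aborting the rest. The typical pattern, as a sketch:

%% in a test case: register cleanup as resources are allocated
emqx_common_test_helpers:on_exit(fun() -> emqx_bridge:remove(Type, Name) end),
%% ... exercise the bridge ...
%% in end_per_testcase/2: run all callbacks, waiting up to 15 s by default
ok = emqx_common_test_helpers:call_janitor().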
 

+ 3 - 1
apps/emqx_bridge/rebar.config

@@ -1,5 +1,7 @@
 {erl_opts, [debug_info]}.
-{deps, [{emqx, {path, "../emqx"}}]}.
+{deps, [ {emqx, {path, "../emqx"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       ]}.
 
 {shell, [
     % {config, "config/sys.config"},

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -2,7 +2,7 @@
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
     {vsn, "0.1.13"},
-    {registered, []},
+    {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [
         kernel,

+ 10 - 5
apps/emqx_bridge/src/emqx_bridge.erl

@@ -55,6 +55,10 @@
     T == gcp_pubsub;
     T == influxdb_api_v1;
     T == influxdb_api_v2;
+    %% TODO: rename this to `kafka_producer' after alias support is
+    %% added to hocon; keeping this as just `kafka' for backwards
+    %% compatibility.
+    T == kafka;
     T == redis_single;
     T == redis_sentinel;
     T == redis_cluster;
@@ -137,12 +141,12 @@ load_hook(Bridges) ->
         maps:to_list(Bridges)
     ).
 
-do_load_hook(Type, #{local_topic := _}) when ?EGRESS_DIR_BRIDGES(Type) ->
+do_load_hook(Type, #{local_topic := LocalTopic}) when
+    ?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic)
+->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
 do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
-do_load_hook(kafka, #{producer := #{mqtt := #{topic := _}}}) ->
-    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
 do_load_hook(_Type, _Conf) ->
     ok.
 
@@ -223,6 +227,7 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
     ]),
     ok = unload_hook(),
     ok = load_hook(NewConf),
+    ?tp(bridge_post_config_update_done, #{}),
     Result.
 
 list() ->
@@ -407,8 +412,8 @@ get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(B
     end;
 get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
     do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
-get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->
-    do_get_matched_bridge_id(Topic, Filter, kafka, BName, Acc).
+get_matched_bridge_id(_BType, _Conf, _Topic, _BName, Acc) ->
+    Acc.
 
 do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
     case emqx_topic:match(Topic, Filter) of
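
After this change the publish hook is installed only when local_topic is actually a binary, and Kafka producer bridges are matched by the generic ?EGRESS_DIR_BRIDGES clause instead of a kafka-specific one; anything else falls through to the new catch-all that returns the accumulator. Topic matching itself is plain emqx_topic:match/2, e.g.:

%% how a published topic is matched against a bridge's local_topic:
true  = emqx_topic:match(<<"mqtt/local/topic">>, <<"mqtt/local/#">>),
false = emqx_topic:match(<<"other/topic">>, <<"mqtt/local/#">>).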

+ 13 - 3
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -16,6 +16,7 @@
 -module(emqx_bridge_resource).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -export([
     bridge_to_resource_type/1,
@@ -45,7 +46,12 @@
 ]).
 
 %% bi-directional bridge with producer/consumer or ingress/egress configs
--define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>).
+-define(IS_BI_DIR_BRIDGE(TYPE),
+    (TYPE) =:= <<"mqtt">>
+).
+-define(IS_INGRESS_BRIDGE(TYPE),
+    (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
+).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
@@ -219,7 +225,7 @@ recreate(Type, Name, Conf, Opts) ->
     ).
 
 create_dry_run(Type, Conf0) ->
-    TmpPath0 = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
+    TmpPath0 = iolist_to_binary([?TEST_ID_PREFIX, emqx_misc:gen_id(8)]),
     TmpPath = emqx_misc:safe_filename(TmpPath0),
     Conf = emqx_map_lib:safe_atom_key_map(Conf0),
     case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
@@ -297,12 +303,16 @@ parse_confs(
                 max_retries => Retry
             }
     };
-parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
+parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
     %% For some drivers that can be used as data-sources, we need to provide a
     %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
     %% receives a message from the external database.
     BId = bridge_id(Type, Name),
     Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name};
+%% TODO: rename this to `kafka_producer' after alias support is added
+%% to hocon; keeping this as just `kafka' for backwards compatibility.
+parse_confs(<<"kafka">> = _Type, Name, Conf) ->
+    Conf#{bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
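
Ingress bridge types (mqtt, and now kafka_consumer) receive a hookpoint derived from the bridge id; the driver fires it through emqx_hooks:run/3 for every message consumed from the external system, while the kafka producer clause only injects bridge_name. A sketch of the values this produces:

BId = emqx_bridge_resource:bridge_id(<<"kafka_consumer">>, <<"my_consumer">>),
%% BId = <<"kafka_consumer:my_consumer">>
Hookpoint = <<"$bridges/", BId/binary>>.
%% Hookpoint = <<"$bridges/kafka_consumer:my_consumer">>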
 

+ 0 - 2
apps/emqx_bridge/src/emqx_bridge_sup.erl

@@ -34,5 +34,3 @@ init([]) ->
     },
     ChildSpecs = [],
     {ok, {SupFlags, ChildSpecs}}.
-
-%% internal functions

+ 0 - 1
apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl

@@ -38,7 +38,6 @@ init_per_suite(_Config) ->
     ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
     {ok, _} = application:ensure_all_started(emqx_connector),
-    snabbkaffe:fix_ct_logging(),
     [].
 
 end_per_suite(_Config) ->

+ 1 - 1
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -204,7 +204,7 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
 query(TnxId) ->
     transaction(fun ?MODULE:trans_query/1, [TnxId]).
 
--spec reset() -> reset.
+-spec reset() -> ok.
 reset() -> gen_server:call(?MODULE, reset).
 
 -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.

+ 1 - 3
apps/emqx_conf/test/emqx_conf_app_SUITE.erl

@@ -25,7 +25,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 t_copy_conf_override_on_restarts(_Config) ->
-    net_kernel:start(['master@127.0.0.1', longnames]),
     ct:timetrap({seconds, 120}),
     snabbkaffe:fix_ct_logging(),
     Cluster = cluster([core, core, core]),
@@ -165,11 +164,10 @@ cluster(Specs) ->
         {env, Env},
         {apps, [emqx_conf]},
         {load_schema, false},
-        {join_to, false},
+        {join_to, true},
         {env_handler, fun
             (emqx) ->
                 application:set_env(emqx, boot_modules, []),
-                io:format("~p~p~n", [node(), application:get_all_env(emqx)]),
                 ok;
             (_) ->
                 ok

+ 2 - 2
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -559,8 +559,8 @@ group_t_copy_plugin_to_a_new_node({'end', Config}) ->
     ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
     rpc:call(CopyToNode, ekka, leave, []),
     rpc:call(CopyFromNode, ekka, leave, []),
-    {ok, _} = emqx_common_test_helpers:stop_slave(CopyToNode),
-    {ok, _} = emqx_common_test_helpers:stop_slave(CopyFromNode),
+    ok = emqx_common_test_helpers:stop_slave(CopyToNode),
+    ok = emqx_common_test_helpers:stop_slave(CopyFromNode),
     ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
     ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
 group_t_copy_plugin_to_a_new_node(Config) ->

+ 1 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -119,5 +119,5 @@
 -define(AUTO_RESTART_INTERVAL, 60000).
 -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
 
--define(TEST_ID_PREFIX, "_test_:").
+-define(TEST_ID_PREFIX, "_probe_:").
 -define(RES_METRICS, resource_metrics).
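
Dry-run (probe) resources are now prefixed with "_probe_:" instead of "_test_:"; create_dry_run/2 above builds such ids. A sketch of what they look like:

%% sketch: a probe resource id as built by create_dry_run/2
TmpPath0 = iolist_to_binary(["_probe_:", emqx_misc:gen_id(8)]),
%% e.g. <<"_probe_:a1b2c3d4">>
TmpPath = emqx_misc:safe_filename(TmpPath0).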

+ 2 - 0
changes/ee/feat-9564.en.md

@@ -0,0 +1,2 @@
+Implemented Kafka Consumer bridge.
+Now it's possible to consume messages from Kafka and publish them to MQTT topics.

+ 2 - 0
changes/ee/feat-9564.zh.md

@@ -0,0 +1,2 @@
+实现了 Kafka 消费者桥接。
+现在可以从 Kafka 消费消息并将其发布到 MQTT 主题。

+ 186 - 13
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -39,30 +39,30 @@ emqx_ee_bridge_kafka {
             zh: "桥接名字"
         }
     }
-    producer_opts {
+    kafka_producer {
         desc {
-            en: "Local MQTT data source and Kafka bridge configs."
-            zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
+            en: "Kafka Producer configuration."
+            zh: "Kafka Producer 配置。"
         }
         label {
-            en: "MQTT to Kafka"
-            zh: "MQTT 到 Kafka"
+            en: "Kafka Producer"
+            zh: "Kafka Producer"
         }
     }
-    producer_mqtt_opts {
+    producer_opts {
         desc {
-            en: "MQTT data source. Optional when used as a rule-engine action."
-            zh: "需要桥接到 MQTT 源主题。"
+            en: "Local MQTT data source and Kafka bridge configs."
+            zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
         }
         label {
-            en: "MQTT Source Topic"
-            zh: "MQTT 源主题"
+            en: "MQTT to Kafka"
+            zh: "MQTT 到 Kafka"
         }
     }
     mqtt_topic {
         desc {
-            en: "MQTT topic or topic as data source (bridge input)."
-            zh: "指定 MQTT 主题作为桥接的数据源"
+            en: "MQTT topic or topic as data source (bridge input).  Should not configure this if the bridge is used as a rule action."
+            zh: "指定 MQTT 主题作为桥接的数据源。 若该桥接用于规则的动作,则必须将该配置项删除。"
         }
         label {
             en: "Source MQTT Topic"
@@ -218,7 +218,7 @@ emqx_ee_bridge_kafka {
     }
     socket_nodelay {
         desc {
-            en: "When set to 'true', TCP buffer sent as soon as possible. "
+            en: "When set to 'true', TCP buffer is sent as soon as possible. "
                 "Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."
             zh: "设置‘true’让系统内核立即发送。否则当需要发送的内容很少时,可能会有一定延迟(默认 40 毫秒)。"
         }
@@ -473,4 +473,177 @@ emqx_ee_bridge_kafka {
             zh: "GSSAPI/Kerberos"
         }
     }
+
+    kafka_consumer {
+        desc {
+            en: "Kafka Consumer configuration."
+            zh: "Kafka 消费者配置。"
+        }
+        label {
+            en: "Kafka Consumer"
+            zh: "Kafka 消费者"
+        }
+    }
+    consumer_opts {
+        desc {
+            en: "Local MQTT publish and Kafka consumer configs."
+            zh: "本地 MQTT 转发 和 Kafka 消费者配置。"
+        }
+        label {
+            en: "MQTT to Kafka"
+            zh: "MQTT 到 Kafka"
+        }
+    }
+    consumer_kafka_opts {
+        desc {
+            en: "Kafka consumer configs."
+            zh: "Kafka消费者配置。"
+        }
+        label {
+            en: "Kafka Consumer"
+            zh: "Kafka 消费者"
+        }
+    }
+    consumer_mqtt_opts {
+        desc {
+            en: "Local MQTT message publish."
+            zh: "本地 MQTT 消息转发。"
+        }
+        label {
+            en: "MQTT publish"
+            zh: "MQTT 转发"
+        }
+    }
+    consumer_mqtt_topic {
+        desc {
+            en: "Local topic to which consumed Kafka messages should be published to."
+            zh: "设置 Kafka 消息向哪个本地 MQTT 主题转发消息。"
+        }
+        label {
+            en: "MQTT Topic"
+            zh: "MQTT主题"
+        }
+    }
+    consumer_mqtt_qos {
+        desc {
+            en: "MQTT QoS used to publish messages consumed from Kafka."
+            zh: "转发 MQTT 消息时使用的 QoS。"
+        }
+        label {
+            en: "QoS"
+            zh: "QoS"
+        }
+    }
+    consumer_mqtt_payload {
+        desc {
+            en: "The template for transforming the incoming Kafka message."
+                "  By default, it will use JSON format to serialize"
+                " inputs from the Kafka message.  Such fields are:\n"
+                "<code>headers</code>: an object containing string key-value pairs.\n"
+                "<code>key</code>: Kafka message key (uses the chosen key encoding).\n"
+                "<code>offset</code>: offset for the message.\n"
+                "<code>topic</code>: Kafka topic.\n"
+                "<code>ts</code>: message timestamp.\n"
+                "<code>ts_type</code>: message timestamp type, which is one of"
+                " <code>create</code>, <code>append</code> or <code>undefined</code>.\n"
+                "<code>value</code>: Kafka message value (uses the chosen value encoding).\n"
+            zh: "用于转换收到的 Kafka 消息的模板。 "
+                "默认情况下,它将使用 JSON 格式来序列化来自 Kafka 的所有字段。 "
+                "这些字段包括:"
+                "<code>headers</code>:一个包含字符串键值对的 JSON 对象。\n"
+                "<code>key</code>:Kafka 消息的键(使用选择的编码方式编码)。\n"
+                "<code>offset</code>:消息的偏移量。\n"
+                "<code>topic</code>:Kafka 主题。\n"
+                "<code>ts</code>: 消息的时间戳。\n"
+                "<code>ts_type</code>:消息的时间戳类型,值可能是:"
+                " <code>create</code>, <code>append</code> 或 <code>undefined</code>。\n"
+                "<code>value</code>: Kafka 消息值(使用选择的编码方式编码)。\n"
+
+        }
+        label {
+            en: "MQTT Payload Template"
+            zh: "MQTT Payload Template"
+        }
+    }
+    consumer_kafka_topic {
+        desc {
+            en: "Kafka topic to consume from."
+            zh: "指定从哪个 Kafka 主题消费消息。"
+        }
+        label {
+            en: "Kafka topic"
+            zh: "Kafka 主题 "
+        }
+    }
+    consumer_max_batch_bytes {
+        desc {
+            en: "Maximum bytes to fetch in a batch of messages."
+                "Please note that if the configured value is smaller than the message size in Kafka, it may negatively impact the fetch performance."
+            zh: "在一批消息中要取的最大字节数。"
+                "如果该配置小于 Kafka 中消息到大小,则可能会影响消费性能。"
+        }
+        label {
+            en: "Max Bytes"
+            zh: "最大字节数"
+        }
+    }
+    consumer_max_rejoin_attempts {
+        desc {
+            en: "Maximum number of times allowed for a member to re-join the group. If the consumer group can not reach balance after this configured number of attempts, the consumer group member will restart after a delay."
+            zh: "允许一个成员重新加入小组的最大次数。如果超过改配置次数后仍不能成功加入消费组,则会在延迟一段时间后再重试。"
+        }
+        label {
+            en: "Max Rejoin Attempts"
+            zh: "最大的重新加入尝试"
+        }
+    }
+    consumer_offset_reset_policy {
+        desc {
+            en: "Defines how the consumers should reset the start offset when "
+                "a topic partition has and invalid or no initial offset."
+            zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时,"
+                "消费者应如何重置开始偏移量。"
+        }
+        label {
+            en: "Offset Reset Policy"
+            zh: "偏移重置策略"
+        }
+    }
+    consumer_offset_commit_interval_seconds {
+        desc {
+            en: "Defines the time interval between two offset commit requests sent for each consumer group."
+            zh: "指定 Kafka 消费组偏移量提交的时间间隔。"
+        }
+        label {
+            en: "Offset Commit Interval"
+            zh: "偏移承诺间隔"
+        }
+    }
+    consumer_topic_mapping {
+        desc {
+            en: "Defines the mapping between Kafka topics and MQTT topics.  Must contain at least one item."
+            zh: "指定 Kafka 主题和 MQTT 主题之间的映射。 必须至少包含一个项目。"
+        }
+        label {
+            en: "Topic Mapping"
+            zh: "主题映射关系"
+        }
+    }
+    consumer_encoding_mode {
+        desc {
+            en: "Defines how the key or value from the Kafka message is"
+                " dealt with before being forwarded via MQTT.\n"
+                "<code>none</code> Uses the key or value from the Kafka message unchanged."
+                "  Note: in this case, then the key or value must be a valid UTF-8 string.\n"
+                "<code>base64</code> Uses base-64 encoding on the received key or value."
+            zh: "定义了在通过MQTT转发之前如何处理Kafka消息的键或值。"
+                "<code>none</code> 使用Kafka消息中的键或值,不改变。"
+                "  注意:在这种情况下,那么键或值必须是一个有效的UTF-8字符串。\n"
+                "<code>base64</code> 对收到的密钥或值使用base-64编码。"
+        }
+        label {
+            en: "Encoding Mode"
+            zh: "编码模式"
+        }
+    }
 }
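
With the default payload_template of "${.}", the consumer serializes the whole message map to JSON before publishing. A purely illustrative sketch of the documented field set, encoded the way the bridge would (assuming emqx_json for the JSON step):

%% illustrative only: the field set documented above, as an Erlang map
Msg = #{
    headers => #{<<"header-key">> => <<"header-value">>},
    key => <<"k1">>,         %% after the configured key_encoding_mode
    offset => 42,
    topic => <<"kafka-topic-1">>,
    ts => 1676000000000,
    ts_type => create,
    value => <<"hello">>     %% after the configured value_encoding_mode
},
JSON = emqx_json:encode(Msg).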

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -2,7 +2,7 @@
 {deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
-       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.7"}}}
+       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,7 +1,7 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
     {vsn, "0.1.7"},
-    {registered, []},
+    {registered, [emqx_ee_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,
         stdlib,

+ 29 - 12
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -15,7 +15,8 @@
 api_schemas(Method) ->
     [
         ref(emqx_ee_bridge_gcp_pubsub, Method),
-        ref(emqx_ee_bridge_kafka, Method),
+        ref(emqx_ee_bridge_kafka, Method ++ "_consumer"),
+        ref(emqx_ee_bridge_kafka, Method ++ "_producer"),
         ref(emqx_ee_bridge_mysql, Method),
         ref(emqx_ee_bridge_pgsql, Method),
         ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
@@ -64,7 +65,10 @@ examples(Method) ->
     lists:foldl(Fun, #{}, schema_modules()).
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
-resource_type(kafka) -> emqx_bridge_impl_kafka;
+resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer;
+%% TODO: rename this to `kafka_producer' after alias support is added
+%% to hocon; keeping this as just `kafka' for backwards compatibility.
+resource_type(kafka) -> emqx_bridge_impl_kafka_producer;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
 resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
@@ -85,14 +89,6 @@ resource_type(dynamo) -> emqx_ee_connector_dynamo.
 
 fields(bridges) ->
     [
-        {kafka,
-            mk(
-                hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
-                #{
-                    desc => <<"Kafka Bridge Config">>,
-                    required => false
-                }
-            )},
         {hstreamdb,
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
@@ -133,8 +129,8 @@ fields(bridges) ->
                     required => false
                 }
             )}
-    ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
-        clickhouse_structs().
+    ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
+        pgsql_structs() ++ clickhouse_structs().
 
 mongodb_structs() ->
     [
@@ -149,6 +145,27 @@ mongodb_structs() ->
      || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
     ].
 
+kafka_structs() ->
+    [
+        %% TODO: rename this to `kafka_producer' after alias support
+        %% is added to hocon; keeping this as just `kafka' for
+        %% backwards compatibility.
+        {kafka,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_producer)),
+                #{
+                    desc => <<"Kafka Producer Bridge Config">>,
+                    required => false,
+                    converter => fun emqx_ee_bridge_kafka:kafka_producer_converter/2
+                }
+            )},
+        {kafka_consumer,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_consumer)),
+                #{desc => <<"Kafka Consumer Bridge Config">>, required => false}
+            )}
+    ].
+
 influxdb_structs() ->
     [
         {Protocol,

+ 208 - 41
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -30,27 +30,102 @@
     host_opts/0
 ]).
 
+-export([kafka_producer_converter/2]).
+
 %% -------------------------------------------------------------------------------------------------
 %% api
 
 conn_bridge_examples(Method) ->
     [
         #{
+            %% TODO: rename this to `kafka_producer' after alias
+            %% support is added to hocon; keeping this as just `kafka'
+            %% for backwards compatibility.
             <<"kafka">> => #{
-                summary => <<"Kafka Bridge">>,
-                value => values(Method)
+                summary => <<"Kafka Producer Bridge">>,
+                value => values({Method, producer})
+            }
+        },
+        #{
+            <<"kafka_consumer">> => #{
+                summary => <<"Kafka Consumer Bridge">>,
+                value => values({Method, consumer})
             }
         }
     ].
 
-values(get) ->
-    maps:merge(values(post), ?METRICS_EXAMPLE);
-values(post) ->
+values({get, KafkaType}) ->
+    maps:merge(values({post, KafkaType}), ?METRICS_EXAMPLE);
+values({post, KafkaType}) ->
+    maps:merge(values(common_config), values(KafkaType));
+values({put, KafkaType}) ->
+    values({post, KafkaType});
+values(common_config) ->
+    #{
+        authentication => #{
+            mechanism => <<"plain">>,
+            username => <<"username">>,
+            password => <<"password">>
+        },
+        bootstrap_hosts => <<"localhost:9092">>,
+        connect_timeout => <<"5s">>,
+        enable => true,
+        metadata_request_timeout => <<"4s">>,
+        min_metadata_refresh_interval => <<"3s">>,
+        socket_opts => #{
+            sndbuf => <<"1024KB">>,
+            recbuf => <<"1024KB">>,
+            nodelay => true
+        }
+    };
+values(producer) ->
     #{
-        bootstrap_hosts => <<"localhost:9092">>
+        kafka => #{
+            topic => <<"kafka-topic">>,
+            message => #{
+                key => <<"${.clientid}">>,
+                value => <<"${.}">>,
+                timestamp => <<"${.timestamp}">>
+            },
+            max_batch_bytes => <<"896KB">>,
+            compression => <<"no_compression">>,
+            partition_strategy => <<"random">>,
+            required_acks => <<"all_isr">>,
+            partition_count_refresh_interval => <<"60s">>,
+            max_inflight => 10,
+            buffer => #{
+                mode => <<"hybrid">>,
+                per_partition_limit => <<"2GB">>,
+                segment_bytes => <<"100MB">>,
+                memory_overload_protection => true
+            }
+        },
+        local_topic => <<"mqtt/local/topic">>
     };
-values(put) ->
-    values(post).
+values(consumer) ->
+    #{
+        kafka => #{
+            max_batch_bytes => <<"896KB">>,
+            offset_reset_policy => <<"reset_to_latest">>,
+            offset_commit_interval_seconds => 5
+        },
+        key_encoding_mode => <<"none">>,
+        topic_mapping => [
+            #{
+                kafka_topic => <<"kafka-topic-1">>,
+                mqtt_topic => <<"mqtt/topic/1">>,
+                qos => 1,
+                payload_template => <<"${.}">>
+            },
+            #{
+                kafka_topic => <<"kafka-topic-2">>,
+                mqtt_topic => <<"mqtt/topic/2">>,
+                qos => 2,
+                payload_template => <<"v = ${.value}">>
+            }
+        ],
+        value_encoding_mode => <<"none">>
+    }.
 
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
@@ -60,14 +135,22 @@ host_opts() ->
 
 namespace() -> "bridge_kafka".
 
-roots() -> ["config"].
+roots() -> ["config_consumer", "config_producer"].
 
-fields("post") ->
-    [type_field(), name_field() | fields("config")];
-fields("put") ->
-    fields("config");
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post");
+fields("post_" ++ Type) ->
+    [type_field(), name_field() | fields("config_" ++ Type)];
+fields("put_" ++ Type) ->
+    fields("config_" ++ Type);
+fields("get_" ++ Type) ->
+    emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
+fields("config_producer") ->
+    fields(kafka_producer);
+fields("config_consumer") ->
+    fields(kafka_consumer);
+fields(kafka_producer) ->
+    fields("config") ++ fields(producer_opts);
+fields(kafka_consumer) ->
+    fields("config") ++ fields(consumer_opts);
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -104,8 +187,6 @@ fields("config") ->
             mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
                 default => none, desc => ?DESC("authentication")
             })},
-        {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})},
-        %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})},
         {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}
     ] ++ emqx_connector_schema_lib:ssl_fields();
 fields(auth_username_password) ->
@@ -156,15 +237,16 @@ fields(socket_opts) ->
     ];
 fields(producer_opts) ->
     [
-        {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})},
+        %% Note: there's an implicit convention in `emqx_bridge' that,
+        %% for egress bridges with this config, the published messages
+        %% will be forwarded to such bridges.
+        {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
         {kafka,
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 desc => ?DESC(producer_kafka_opts)
             })}
     ];
-fields(producer_mqtt_opts) ->
-    [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}];
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
@@ -241,28 +323,72 @@ fields(producer_buffer) ->
                 default => false,
                 desc => ?DESC(buffer_memory_overload_protection)
             })}
+    ];
+fields(consumer_opts) ->
+    [
+        {kafka,
+            mk(ref(consumer_kafka_opts), #{required => false, desc => ?DESC(consumer_kafka_opts)})},
+        {topic_mapping,
+            mk(
+                hoconsc:array(ref(consumer_topic_mapping)),
+                #{
+                    required => true,
+                    desc => ?DESC(consumer_topic_mapping),
+                    validator => fun consumer_topic_mapping_validator/1
+                }
+            )},
+        {key_encoding_mode,
+            mk(enum([none, base64]), #{
+                default => none, desc => ?DESC(consumer_encoding_mode)
+            })},
+        {value_encoding_mode,
+            mk(enum([none, base64]), #{
+                default => none, desc => ?DESC(consumer_encoding_mode)
+            })}
+    ];
+fields(consumer_topic_mapping) ->
+    [
+        {kafka_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_kafka_topic)})},
+        {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})},
+        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+        {payload_template,
+            mk(
+                string(),
+                #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
+            )}
+    ];
+fields(consumer_kafka_opts) ->
+    [
+        {max_batch_bytes,
+            mk(emqx_schema:bytesize(), #{
+                default => "896KB", desc => ?DESC(consumer_max_batch_bytes)
+            })},
+        {max_rejoin_attempts,
+            mk(non_neg_integer(), #{
+                hidden => true,
+                default => 5,
+                desc => ?DESC(consumer_max_rejoin_attempts)
+            })},
+        {offset_reset_policy,
+            mk(
+                enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
+                #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
+            )},
+        {offset_commit_interval_seconds,
+            mk(
+                pos_integer(),
+                #{default => 5, desc => ?DESC(consumer_offset_commit_interval_seconds)}
+            )}
     ].
 
-% fields(consumer_opts) ->
-%     [
-%         {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
-%         {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
-%     ];
-% fields(consumer_mqtt_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
-%     ];
-
-% fields(consumer_mqtt_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
-%     ];
-% fields(consumer_kafka_opts) ->
-%     [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})}
-%     ].
-
 desc("config") ->
     ?DESC("desc_config");
-desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
-    ["Configuration for Kafka using `", string:to_upper(Method), "` method."];
+desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
+    ["Configuration for Kafka using `GET` method."];
+desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
+    ["Configuration for Kafka using `PUT` method."];
+desc("post_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
+    ["Configuration for Kafka using `POST` method."];
 desc(Name) ->
     lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
     ?DESC(Name).
@@ -272,20 +398,61 @@ struct_names() ->
         auth_gssapi_kerberos,
         auth_username_password,
         kafka_message,
+        kafka_producer,
+        kafka_consumer,
         producer_buffer,
         producer_kafka_opts,
-        producer_mqtt_opts,
         socket_opts,
-        producer_opts
+        producer_opts,
+        consumer_opts,
+        consumer_kafka_opts,
+        consumer_topic_mapping
     ].
 
 %% -------------------------------------------------------------------------------------------------
 %% internal
 type_field() ->
-    {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}.
+    {type,
+        %% TODO: rename `kafka' to `kafka_producer' after alias
+        %% support is added to hocon; keeping this as just `kafka' for
+        %% backwards compatibility.
+        mk(enum([kafka_consumer, kafka]), #{required => true, desc => ?DESC("desc_type")})}.
 
 name_field() ->
     {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
 
 ref(Name) ->
     hoconsc:ref(?MODULE, Name).
+
+kafka_producer_converter(undefined, _HoconOpts) ->
+    undefined;
+kafka_producer_converter(
+    #{<<"producer">> := OldOpts0, <<"bootstrap_hosts">> := _} = Config0, _HoconOpts
+) ->
+    %% old schema
+    MQTTOpts = maps:get(<<"mqtt">>, OldOpts0, #{}),
+    LocalTopic = maps:get(<<"topic">>, MQTTOpts, undefined),
+    KafkaOpts = maps:get(<<"kafka">>, OldOpts0),
+    Config = maps:without([<<"producer">>], Config0),
+    case LocalTopic =:= undefined of
+        true ->
+            Config#{<<"kafka">> => KafkaOpts};
+        false ->
+            Config#{<<"kafka">> => KafkaOpts, <<"local_topic">> => LocalTopic}
+    end;
+kafka_producer_converter(Config, _HoconOpts) ->
+    %% new schema
+    Config.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->
+    {error, "There must be at least one Kafka-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+    NumEntries = length(TopicMapping),
+    KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping],
+    DistinctKafkaTopics = length(lists:usort(KafkaTopics)),
+    case DistinctKafkaTopics =:= NumEntries of
+        true ->
+            ok;
+        false ->
+            {error, "Kafka topics must not be repeated in a bridge"}
+    end.
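
kafka_producer_converter/2 upgrades configs written against the old nested schema (producer.mqtt.topic / producer.kafka) to the new flat one (local_topic + kafka); new-style configs pass through untouched, and the consumer_topic_mapping_validator/1 just above rejects empty mappings and duplicate kafka_topic entries. A sketch of the converter:

Old = #{
    <<"bootstrap_hosts">> => <<"localhost:9092">>,
    <<"producer">> => #{
        <<"mqtt">> => #{<<"topic">> => <<"mqtt/local/topic">>},
        <<"kafka">> => #{<<"topic">> => <<"kafka-topic">>}
    }
},
New = emqx_ee_bridge_kafka:kafka_producer_converter(Old, #{}),
%% New = #{<<"bootstrap_hosts">> => <<"localhost:9092">>,
%%         <<"kafka">> => #{<<"topic">> => <<"kafka-topic">>},
%%         <<"local_topic">> => <<"mqtt/local/topic">>}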

+ 31 - 27
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl

@@ -4,34 +4,38 @@
 
 %% Kafka connection configuration
 -module(emqx_bridge_impl_kafka).
--behaviour(emqx_resource).
 
-%% callbacks of behaviour emqx_resource
 -export([
-    callback_mode/0,
-    on_start/2,
-    on_stop/2,
-    on_query/3,
-    on_query_async/4,
-    on_get_status/2,
-    is_buffer_supported/0
+    hosts/1,
+    make_client_id/2,
+    sasl/1
 ]).
 
-is_buffer_supported() -> true.
-
-callback_mode() -> async_if_possible.
-
-on_start(InstId, Config) ->
-    emqx_bridge_impl_kafka_producer:on_start(InstId, Config).
-
-on_stop(InstId, State) ->
-    emqx_bridge_impl_kafka_producer:on_stop(InstId, State).
-
-on_query(InstId, Req, State) ->
-    emqx_bridge_impl_kafka_producer:on_query(InstId, Req, State).
-
-on_query_async(InstId, Req, ReplyFn, State) ->
-    emqx_bridge_impl_kafka_producer:on_query_async(InstId, Req, ReplyFn, State).
-
-on_get_status(InstId, State) ->
-    emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).
+%% Parse comma separated host:port list into a [{Host,Port}] list
+hosts(Hosts) when is_binary(Hosts) ->
+    hosts(binary_to_list(Hosts));
+hosts(Hosts) when is_list(Hosts) ->
+    kpro:parse_endpoints(Hosts).
+
+%% Client IDs are best kept unique to make Kafka-side troubleshooting easier.
+make_client_id(KafkaType0, BridgeName0) ->
+    KafkaType = to_bin(KafkaType0),
+    BridgeName = to_bin(BridgeName0),
+    iolist_to_binary([KafkaType, ":", BridgeName, ":", atom_to_list(node())]).
+
+sasl(none) ->
+    undefined;
+sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
+    {Mechanism, Username, emqx_secret:wrap(Password)};
+sasl(#{
+    kerberos_principal := Principal,
+    kerberos_keytab_file := KeyTabFile
+}) ->
+    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
+
+to_bin(A) when is_atom(A) ->
+    atom_to_binary(A);
+to_bin(L) when is_list(L) ->
+    list_to_binary(L);
+to_bin(B) when is_binary(B) ->
+    B.
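
emqx_bridge_impl_kafka is no longer an emqx_resource implementation itself; it is now a small helper library shared by the producer and consumer modules. What the three helpers return, as a sketch (the node name is an example):

[{"kafka-1.emqx.net", 9092}] =
    emqx_bridge_impl_kafka:hosts(<<"kafka-1.emqx.net:9092">>),
ClientId = emqx_bridge_impl_kafka:make_client_id(kafka_consumer, <<"my_bridge">>),
%% e.g. <<"kafka_consumer:my_bridge:emqx@127.0.0.1">>, distinct per node
{plain, <<"user">>, _WrappedSecret} =
    emqx_bridge_impl_kafka:sasl(#{
        mechanism => plain, username => <<"user">>, password => <<"secret">>
    }).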

+ 499 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -0,0 +1,499 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_impl_kafka_consumer).
+
+-behaviour(emqx_resource).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2
+]).
+
+%% `brod_group_consumer' API
+-export([
+    init/2,
+    handle_message/2
+]).
+
+-ifdef(TEST).
+-export([consumer_group_id/1]).
+-endif.
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+%% needed for the #kafka_message record definition
+-include_lib("brod/include/brod.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-type config() :: #{
+    authentication := term(),
+    bootstrap_hosts := binary(),
+    bridge_name := atom(),
+    kafka := #{
+        max_batch_bytes := emqx_schema:bytesize(),
+        max_rejoin_attempts := non_neg_integer(),
+        offset_commit_interval_seconds := pos_integer(),
+        offset_reset_policy := offset_reset_policy(),
+        topic := binary()
+    },
+    topic_mapping := nonempty_list(
+        #{
+            kafka_topic := kafka_topic(),
+            mqtt_topic := emqx_types:topic(),
+            qos := emqx_types:qos(),
+            payload_template := string()
+        }
+    ),
+    ssl := _,
+    any() => term()
+}.
+-type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id().
+-type kafka_topic() :: brod:topic().
+-type state() :: #{
+    kafka_topics := nonempty_list(kafka_topic()),
+    subscriber_id := subscriber_id(),
+    kafka_client_id := brod:client_id()
+}.
+-type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
+%% -type mqtt_payload() :: full_message | message_value.
+-type encoding_mode() :: none | base64.
+-type consumer_init_data() :: #{
+    hookpoint := binary(),
+    key_encoding_mode := encoding_mode(),
+    resource_id := resource_id(),
+    topic_mapping := #{
+        kafka_topic() := #{
+            payload_template := emqx_plugin_libs_rule:tmpl_token(),
+            mqtt_topic => emqx_types:topic(),
+            qos => emqx_types:qos()
+        }
+    },
+    value_encoding_mode := encoding_mode()
+}.
+-type consumer_state() :: #{
+    hookpoint := binary(),
+    kafka_topic := binary(),
+    key_encoding_mode := encoding_mode(),
+    resource_id := resource_id(),
+    topic_mapping := #{
+        kafka_topic() := #{
+            payload_template := emqx_plugin_libs_rule:tmpl_token(),
+            mqtt_topic => emqx_types:topic(),
+            qos => emqx_types:qos()
+        }
+    },
+    value_encoding_mode := encoding_mode()
+}.
+-type subscriber_init_info() :: #{
+    topic => brod:topic(),
+    partition => brod:partition(),
+    group_id => brod:group_id(),
+    commit_fun => brod_group_subscriber_v2:commit_fun()
+}.
+
+%%-------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------
+
+callback_mode() ->
+    async_if_possible.
+
+%% there are no queries to be made to this bridge, so we declare that
+%% buffering is supported in order to avoid spawning unused resource
+%% buffer workers.
+is_buffer_supported() ->
+    true.
+
+-spec on_start(manager_id(), config()) -> {ok, state()}.
+on_start(InstanceId, Config) ->
+    #{
+        authentication := Auth,
+        bootstrap_hosts := BootstrapHosts0,
+        bridge_name := BridgeName,
+        hookpoint := _,
+        kafka := #{
+            max_batch_bytes := _,
+            max_rejoin_attempts := _,
+            offset_commit_interval_seconds := _,
+            offset_reset_policy := _
+        },
+        ssl := SSL,
+        topic_mapping := _
+    } = Config,
+    BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+    KafkaType = kafka_consumer,
+    %% Note: this is distinct per node.
+    ClientID = make_client_id(InstanceId, KafkaType, BridgeName),
+    ClientOpts0 =
+        case Auth of
+            none -> [];
+            Auth -> [{sasl, emqx_bridge_impl_kafka:sasl(Auth)}]
+        end,
+    ClientOpts = add_ssl_opts(ClientOpts0, SSL),
+    case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
+        ok ->
+            ?tp(
+                kafka_consumer_client_started,
+                #{client_id => ClientID, instance_id => InstanceId}
+            ),
+            ?SLOG(info, #{
+                msg => "kafka_consumer_client_started",
+                instance_id => InstanceId,
+                kafka_hosts => BootstrapHosts
+            });
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_consumer_client",
+                instance_id => InstanceId,
+                kafka_hosts => BootstrapHosts,
+                reason => emqx_misc:redact(Reason)
+            }),
+            throw(failed_to_start_kafka_client)
+    end,
+    start_consumer(Config, InstanceId, ClientID).
+
+-spec on_stop(manager_id(), state()) -> ok.
+on_stop(_InstanceID, State) ->
+    #{
+        subscriber_id := SubscriberId,
+        kafka_client_id := ClientID
+    } = State,
+    stop_subscriber(SubscriberId),
+    stop_client(ClientID),
+    ok.
+
+-spec on_get_status(manager_id(), state()) -> connected | disconnected.
+on_get_status(_InstanceID, State) ->
+    #{
+        subscriber_id := SubscriberId,
+        kafka_client_id := ClientID,
+        kafka_topics := KafkaTopics
+    } = State,
+    do_get_status(ClientID, KafkaTopics, SubscriberId).
+
+%%-------------------------------------------------------------------------------------
+%% `brod_group_subscriber_v2' API
+%%-------------------------------------------------------------------------------------
+
+-spec init(subscriber_init_info(), consumer_init_data()) -> {ok, consumer_state()}.
+init(GroupData, State0) ->
+    ?tp(kafka_consumer_subscriber_init, #{group_data => GroupData, state => State0}),
+    #{topic := KafkaTopic} = GroupData,
+    State = State0#{kafka_topic => KafkaTopic},
+    {ok, State}.
+
+-spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}.
+handle_message(Message, State) ->
+    ?tp_span(
+        kafka_consumer_handle_message,
+        #{message => Message, state => State},
+        do_handle_message(Message, State)
+    ).
+
+do_handle_message(Message, State) ->
+    #{
+        hookpoint := Hookpoint,
+        kafka_topic := KafkaTopic,
+        key_encoding_mode := KeyEncodingMode,
+        resource_id := ResourceId,
+        topic_mapping := TopicMapping,
+        value_encoding_mode := ValueEncodingMode
+    } = State,
+    #{
+        mqtt_topic := MQTTTopic,
+        qos := MQTTQoS,
+        payload_template := PayloadTemplate
+    } = maps:get(KafkaTopic, TopicMapping),
+    FullMessage = #{
+        headers => maps:from_list(Message#kafka_message.headers),
+        key => encode(Message#kafka_message.key, KeyEncodingMode),
+        offset => Message#kafka_message.offset,
+        topic => KafkaTopic,
+        ts => Message#kafka_message.ts,
+        ts_type => Message#kafka_message.ts_type,
+        value => encode(Message#kafka_message.value, ValueEncodingMode)
+    },
+    Payload = render(FullMessage, PayloadTemplate),
+    MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
+    _ = emqx:publish(MQTTMessage),
+    emqx:run_hook(Hookpoint, [FullMessage]),
+    emqx_resource_metrics:received_inc(ResourceId),
+    %% note: just `ack' does not commit the offset to the
+    %% kafka consumer group.
+    {ok, commit, State}.
+
+%%-------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------
+
+add_ssl_opts(ClientOpts, #{enable := false}) ->
+    ClientOpts;
+add_ssl_opts(ClientOpts, SSL) ->
+    [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts].
+
+-spec make_subscriber_id(atom() | binary()) -> emqx_ee_bridge_kafka_consumer_sup:child_id().
+make_subscriber_id(BridgeName) ->
+    BridgeNameBin = to_bin(BridgeName),
+    <<"kafka_subscriber:", BridgeNameBin/binary>>.
+
+ensure_consumer_supervisor_started() ->
+    Mod = emqx_ee_bridge_kafka_consumer_sup,
+    ChildSpec =
+        #{
+            id => Mod,
+            start => {Mod, start_link, []},
+            restart => permanent,
+            shutdown => infinity,
+            type => supervisor,
+            modules => [Mod]
+        },
+    case supervisor:start_child(emqx_bridge_sup, ChildSpec) of
+        {ok, _Pid} ->
+            ok;
+        {error, already_present} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok
+    end.
+
+-spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}.
+start_consumer(Config, InstanceId, ClientID) ->
+    #{
+        bootstrap_hosts := BootstrapHosts0,
+        bridge_name := BridgeName,
+        hookpoint := Hookpoint,
+        kafka := #{
+            max_batch_bytes := MaxBatchBytes,
+            max_rejoin_attempts := MaxRejoinAttempts,
+            offset_commit_interval_seconds := OffsetCommitInterval,
+            offset_reset_policy := OffsetResetPolicy
+        },
+        key_encoding_mode := KeyEncodingMode,
+        topic_mapping := TopicMapping0,
+        value_encoding_mode := ValueEncodingMode
+    } = Config,
+    ok = ensure_consumer_supervisor_started(),
+    TopicMapping = convert_topic_mapping(TopicMapping0),
+    InitialState = #{
+        key_encoding_mode => KeyEncodingMode,
+        hookpoint => Hookpoint,
+        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
+        topic_mapping => TopicMapping,
+        value_encoding_mode => ValueEncodingMode
+    },
+    %% note: the group id should be the same for all nodes in the
+    %% cluster, so that the load gets distributed between all
+    %% consumers and we don't repeat messages in the same cluster.
+    GroupID = consumer_group_id(BridgeName),
+    ConsumerConfig = [
+        {max_bytes, MaxBatchBytes},
+        {offset_reset_policy, OffsetResetPolicy}
+    ],
+    GroupConfig = [
+        {max_rejoin_attempts, MaxRejoinAttempts},
+        {offset_commit_interval_seconds, OffsetCommitInterval}
+    ],
+    KafkaTopics = maps:keys(TopicMapping),
+    GroupSubscriberConfig =
+        #{
+            client => ClientID,
+            group_id => GroupID,
+            topics => KafkaTopics,
+            cb_module => ?MODULE,
+            init_data => InitialState,
+            message_type => message,
+            consumer_config => ConsumerConfig,
+            group_config => GroupConfig
+        },
+    %% Below, we spawn a single `brod_group_subscriber_v2' worker
+    %% rather than a pool of them: that subscriber already spawns one
+    %% worker per assigned topic-partition automatically, so adding
+    %% more would only duplicate work.
+    SubscriberId = make_subscriber_id(BridgeName),
+    case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
+        {ok, _ConsumerPid} ->
+            ?tp(
+                kafka_consumer_subscriber_started,
+                #{instance_id => InstanceId, subscriber_id => SubscriberId}
+            ),
+            {ok, #{
+                subscriber_id => SubscriberId,
+                kafka_client_id => ClientID,
+                kafka_topics => KafkaTopics
+            }};
+        {error, Reason2} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_consumer",
+                instance_id => InstanceId,
+                kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+                reason => emqx_misc:redact(Reason2)
+            }),
+            stop_client(ClientID),
+            throw(failed_to_start_kafka_consumer)
+    end.
+
+-spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok.
+stop_subscriber(SubscriberId) ->
+    _ = log_when_error(
+        fun() ->
+            emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
+        end,
+        #{
+            msg => "failed_to_delete_kafka_subscriber",
+            subscriber_id => SubscriberId
+        }
+    ),
+    ok.
+
+-spec stop_client(brod:client_id()) -> ok.
+stop_client(ClientID) ->
+    _ = log_when_error(
+        fun() ->
+            brod:stop_client(ClientID)
+        end,
+        #{
+            msg => "failed_to_delete_kafka_consumer_client",
+            client_id => ClientID
+        }
+    ),
+    ok.
+
+do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
+    case brod:get_partitions_count(ClientID, KafkaTopic) of
+        {ok, NPartitions} ->
+            case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
+                connected -> do_get_status(ClientID, RestTopics, SubscriberId);
+                disconnected -> disconnected
+            end;
+        _ ->
+            disconnected
+    end;
+do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
+    connected.
+
+-spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
+    connected | disconnected.
+do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
+    Results =
+        lists:map(
+            fun(N) ->
+                brod_client:get_leader_connection(ClientID, KafkaTopic, N)
+            end,
+            lists:seq(0, NPartitions - 1)
+        ),
+    AllLeadersOk =
+        length(Results) > 0 andalso
+            lists:all(
+                fun
+                    ({ok, _}) ->
+                        true;
+                    (_) ->
+                        false
+                end,
+                Results
+            ),
+    WorkersAlive = are_subscriber_workers_alive(SubscriberId),
+    case AllLeadersOk andalso WorkersAlive of
+        true ->
+            connected;
+        false ->
+            disconnected
+    end.
+
+are_subscriber_workers_alive(SubscriberId) ->
+    Children = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup),
+    case lists:keyfind(SubscriberId, 1, Children) of
+        false ->
+            false;
+        {_, Pid, _, _} ->
+            Workers = brod_group_subscriber_v2:get_workers(Pid),
+            %% we can't enforce the number of partitions on a single
+            %% node, as the group might be spread across an emqx
+            %% cluster.
+            lists:all(fun is_process_alive/1, maps:values(Workers))
+    end.
+
+log_when_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.
+
+-spec consumer_group_id(atom() | binary()) -> binary().
+consumer_group_id(BridgeName0) ->
+    BridgeName = to_bin(BridgeName0),
+    <<"emqx-kafka-consumer-", BridgeName/binary>>.
+
+-spec is_dry_run(manager_id()) -> boolean().
+is_dry_run(InstanceId) ->
+    TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
+    case TestIdStart of
+        nomatch ->
+            false;
+        _ ->
+            string:equal(TestIdStart, InstanceId)
+    end.
+
+-spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom().
+make_client_id(InstanceId, KafkaType, KafkaName) ->
+    case is_dry_run(InstanceId) of
+        false ->
+            ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, KafkaName),
+            binary_to_atom(ClientID0);
+        true ->
+            %% It is a dry run and we don't want to leak too many
+            %% atoms.
+            probing_brod_consumers
+    end.
+
+convert_topic_mapping(TopicMappingList) ->
+    lists:foldl(
+        fun(Fields, Acc) ->
+            #{
+                kafka_topic := KafkaTopic,
+                mqtt_topic := MQTTTopic,
+                qos := QoS,
+                payload_template := PayloadTemplate0
+            } = Fields,
+            PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0),
+            Acc#{
+                KafkaTopic => #{
+                    payload_template => PayloadTemplate,
+                    mqtt_topic => MQTTTopic,
+                    qos => QoS
+                }
+            }
+        end,
+        #{},
+        TopicMappingList
+    ).
+
+render(FullMessage, PayloadTemplate) ->
+    Opts = #{
+        return => full_binary,
+        var_trans => fun
+            (undefined) ->
+                <<>>;
+            (X) ->
+                emqx_plugin_libs_rule:bin(X)
+        end
+    },
+    emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
+
+encode(Value, none) ->
+    Value;
+encode(Value, base64) ->
+    base64:encode(Value).
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
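
To make the consumer flow concrete, here is a hedged sketch of what
`do_handle_message/2' produces for a message on a mapped topic (all values
below are illustrative, not taken from a real run):

    %% topic_mapping entry (after convert_topic_mapping/1) for <<"kafka-topic-1">>:
    %%   #{payload_template => preproc_tmpl(<<"v = ${.value}">>),
    %%     mqtt_topic => <<"mqtt/topic/1">>, qos => 1}
    %% with key_encoding_mode = none and value_encoding_mode = none:
    FullMessage = #{headers => #{}, key => <<"k1">>, offset => 42,
                    topic => <<"kafka-topic-1">>, ts => 1674000000000,
                    ts_type => create, value => <<"hello">>},
    %% render(FullMessage, PayloadTemplate) => <<"v = hello">>, which is
    %% published to <<"mqtt/topic/1">> at QoS 1; returning {ok, commit, State}
    %% then commits the offset for the consumer group, whose id is
    %% consumer_group_id(<<"my_consumer">>) =:= <<"emqx-kafka-consumer-my_consumer">>.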

+ 16 - 49
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -22,44 +22,39 @@
 
 -include_lib("emqx/include/logger.hrl").
 
+%% TODO: rename this to `kafka_producer' after alias support is added
+%% to hocon; keeping this as just `kafka' for backwards compatibility.
+-define(BRIDGE_TYPE, kafka).
+
 callback_mode() -> async_if_possible.
 
 %% @doc Config schema is defined in emqx_ee_bridge_kafka.
 on_start(InstId, Config) ->
     #{
-        bridge_name := BridgeName,
+        authentication := Auth,
         bootstrap_hosts := Hosts0,
+        bridge_name := BridgeName,
         connect_timeout := ConnTimeout,
+        kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic},
         metadata_request_timeout := MetaReqTimeout,
         min_metadata_refresh_interval := MinMetaRefreshInterval,
         socket_opts := SocketOpts,
-        authentication := Auth,
         ssl := SSL
     } = Config,
-    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
-    BridgeType = kafka,
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
-    _ = maybe_install_wolff_telemetry_handlers(ResourceID),
-    %% it's a bug if producer config is not found
-    %% the caller should not try to start a producer if
-    %% there is no producer config
-    ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config),
-    ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters),
-    MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template),
-    Hosts = hosts(Hosts0),
-    ClientId = make_client_id(BridgeName),
+    BridgeType = ?BRIDGE_TYPE,
+    ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(ResourceId),
+    Hosts = emqx_bridge_impl_kafka:hosts(Hosts0),
+    ClientId = emqx_bridge_impl_kafka:make_client_id(BridgeType, BridgeName),
     ClientConfig = #{
         min_metadata_refresh_interval => MinMetaRefreshInterval,
         connect_timeout => ConnTimeout,
         client_id => ClientId,
         request_timeout => MetaReqTimeout,
         extra_sock_opts => socket_opts(SocketOpts),
-        sasl => sasl(Auth),
+        sasl => emqx_bridge_impl_kafka:sasl(Auth),
         ssl => ssl(SSL)
     },
-    #{
-        topic := KafkaTopic
-    } = ProducerConfig,
     case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
         {ok, _} ->
             ?SLOG(info, #{
@@ -85,7 +80,7 @@ on_start(InstId, Config) ->
             _ ->
                 string:equal(TestIdStart, InstId)
         end,
-    WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig, IsDryRun),
+    WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
     case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
         {ok, Producers} ->
             {ok, #{
@@ -93,7 +88,7 @@ on_start(InstId, Config) ->
                 client_id => ClientId,
                 kafka_topic => KafkaTopic,
                 producers => Producers,
-                resource_id => ResourceID
+                resource_id => ResourceId
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -265,12 +260,6 @@ do_get_status(Client, KafkaTopic) ->
             disconnected
     end.
 
-%% Parse comma separated host:port list into a [{Host,Port}] list
-hosts(Hosts) when is_binary(Hosts) ->
-    hosts(binary_to_list(Hosts));
-hosts(Hosts) when is_list(Hosts) ->
-    kpro:parse_endpoints(Hosts).
-
 %% Extra socket options, such as sndbuf size etc.
 socket_opts(Opts) when is_map(Opts) ->
     socket_opts(maps:to_list(Opts));
@@ -298,16 +287,6 @@ adjust_socket_buffer(Bytes, Opts) ->
             [{buffer, max(Bytes1, Bytes)} | Acc1]
     end.
 
-sasl(none) ->
-    undefined;
-sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
-    {Mechanism, Username, emqx_secret:wrap(Password)};
-sasl(#{
-    kerberos_principal := Principal,
-    kerberos_keytab_file := KeyTabFile
-}) ->
-    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
-
 ssl(#{enable := true} = SSL) ->
     emqx_tls_lib:to_client_opts(SSL);
 ssl(_) ->
@@ -339,8 +318,7 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) ->
             disk -> {false, replayq_dir(ClientId)};
             hybrid -> {true, replayq_dir(ClientId)}
         end,
-    %% TODO: change this once we add kafka source
-    BridgeType = kafka,
+    BridgeType = ?BRIDGE_TYPE,
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     #{
         name => make_producer_name(BridgeName, IsDryRun),
@@ -366,12 +344,6 @@ partitioner(key_dispatch) -> first_key_dispatch.
 replayq_dir(ClientId) ->
     filename:join([emqx:data_dir(), "kafka", ClientId]).
 
-%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
-make_client_id(BridgeName) when is_atom(BridgeName) ->
-    make_client_id(atom_to_list(BridgeName));
-make_client_id(BridgeName) ->
-    iolist_to_binary([BridgeName, ":", atom_to_list(node())]).
-
 %% Producer name must be an atom which will be used as a ETS table name for
 %% partition worker lookup.
 make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
@@ -400,11 +372,6 @@ with_log_at_error(Fun, Log) ->
             })
     end.
 
-get_required(Field, Config, Throw) ->
-    Value = maps:get(Field, Config, none),
-    Value =:= none andalso throw(Throw),
-    Value.
-
 %% we *must* match the bridge id in the event metadata with that in
 %% the handler config; otherwise, multiple kafka producer bridges will
 %% install multiple handlers to the same wolff events, multiplying the

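Note that `hosts/1', `sasl/1' and `make_client_id/1' were hoisted into the
shared `emqx_bridge_impl_kafka' module rather than deleted (see the calls
above), while `get_required/3' is obsolete now that the flattened config is
pattern-matched directly. Host parsing is still expected to go through
`kpro:parse_endpoints/1'; an illustrative sketch:

    emqx_bridge_impl_kafka:hosts(<<"kafka-1.emqx.net:9092,kafka-2.emqx.net:9092">>).
    %% => [{"kafka-1.emqx.net", 9092}, {"kafka-2.emqx.net", 9092}]
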
+ 79 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl

@@ -0,0 +1,79 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_kafka_consumer_sup).
+
+-behaviour(supervisor).
+
+%% `supervisor' API
+-export([init/1]).
+
+%% API
+-export([
+    start_link/0,
+    child_spec/2,
+    start_child/2,
+    ensure_child_deleted/1
+]).
+
+-type child_id() :: binary().
+-export_type([child_id/0]).
+
+%%--------------------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+-spec child_spec(child_id(), map()) -> supervisor:child_spec().
+child_spec(Id, GroupSubscriberConfig) ->
+    Mod = brod_group_subscriber_v2,
+    #{
+        id => Id,
+        start => {Mod, start_link, [GroupSubscriberConfig]},
+        restart => permanent,
+        shutdown => 10_000,
+        type => worker,
+        modules => [Mod]
+    }.
+
+-spec start_child(child_id(), map()) -> {ok, pid()} | {error, term()}.
+start_child(Id, GroupSubscriberConfig) ->
+    ChildSpec = child_spec(Id, GroupSubscriberConfig),
+    case supervisor:start_child(?MODULE, ChildSpec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {ok, Pid, _Info} ->
+            {ok, Pid};
+        {error, already_present} ->
+            supervisor:restart_child(?MODULE, Id);
+        {error, {already_started, Pid}} ->
+            {ok, Pid};
+        {error, Error} ->
+            {error, Error}
+    end.
+
+-spec ensure_child_deleted(child_id()) -> ok.
+ensure_child_deleted(Id) ->
+    case supervisor:terminate_child(?MODULE, Id) of
+        ok ->
+            ok = supervisor:delete_child(?MODULE, Id),
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+%%--------------------------------------------------------------------------------------------
+%% `supervisor' API
+%%--------------------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 100,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
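
A sketch of the intended child lifecycle, assuming this supervisor is running
and `GroupSubscriberConfig' is built as in
`emqx_bridge_impl_kafka_consumer:start_consumer/3': starting is idempotent per
subscriber id, and deletion tolerates a child that no longer exists:

    SubscriberId = <<"kafka_subscriber:my_consumer">>,
    {ok, Pid} = emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig),
    %% a second call with the same id resolves to the running child:
    {ok, Pid} = emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig),
    %% tear-down is idempotent ({error, not_found} is treated as ok):
    ok = emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId),
    ok = emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId).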

The diff for this file was suppressed because it is too large
+ 1917 - 0
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl


+ 154 - 76
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -11,7 +11,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("brod/include/brod.hrl").
 
--define(PRODUCER, emqx_bridge_impl_kafka).
+-define(PRODUCER, emqx_bridge_impl_kafka_producer).
 
 %%------------------------------------------------------------------------------
 %% Things for REST API tests
@@ -30,16 +30,15 @@
 -include_lib("emqx/include/emqx.hrl").
 -include("emqx_dashboard.hrl").
 
--define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-
 -define(HOST, "http://127.0.0.1:18083").
 
 %% -define(API_VERSION, "v5").
 
 -define(BASE_PATH, "/api/v5").
 
--define(APP_DASHBOARD, emqx_dashboard).
--define(APP_MANAGEMENT, emqx_management).
+%% TODO: rename this to `kafka_producer' after alias support is added
+%% to hocon; keeping this as just `kafka' for backwards compatibility.
+-define(BRIDGE_TYPE, "kafka").
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -71,6 +70,10 @@ wait_until_kafka_is_up(Attempts) ->
     end.
 
 init_per_suite(Config) ->
+    %% ensure loaded
+    _ = application:load(emqx_ee_bridge),
+    _ = emqx_ee_bridge:module_info(),
+    application:load(emqx_bridge),
     ok = emqx_common_test_helpers:start_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
     {ok, _} = application:ensure_all_started(emqx_connector),
@@ -102,6 +105,13 @@ init_per_group(GroupName, Config) ->
 end_per_group(_, _) ->
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    delete_all_bridges(),
+    ok.
+
 set_special_configs(emqx_management) ->
     Listeners = #{http => #{port => 8081}},
     Config = #{
@@ -222,7 +232,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
     ok.
 
 kafka_bridge_rest_api_helper(Config) ->
-    BridgeType = "kafka",
+    BridgeType = ?BRIDGE_TYPE,
     BridgeName = "my_kafka_bridge",
     BridgeID = emqx_bridge_resource:bridge_id(
         erlang:list_to_binary(BridgeType),
@@ -233,6 +243,7 @@ kafka_bridge_rest_api_helper(Config) ->
         erlang:list_to_binary(BridgeName)
     ),
     UrlEscColon = "%3A",
+    BridgesProbeParts = ["bridges_probe"],
     BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
     BridgesParts = ["bridges"],
     BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
@@ -266,24 +277,18 @@ kafka_bridge_rest_api_helper(Config) ->
     %% Create new Kafka bridge
     KafkaTopic = "test-topic-one-partition",
     CreateBodyTmp = #{
-        <<"type">> => <<"kafka">>,
+        <<"type">> => <<?BRIDGE_TYPE>>,
         <<"name">> => <<"my_kafka_bridge">>,
         <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)),
         <<"enable">> => true,
         <<"authentication">> => maps:get(<<"authentication">>, Config),
-        <<"producer">> => #{
-            <<"mqtt">> => #{
-                topic => <<"t/#">>
-            },
-            <<"kafka">> => #{
-                <<"topic">> => iolist_to_binary(KafkaTopic),
-                <<"buffer">> => #{
-                    <<"memory_overload_protection">> => <<"false">>
-                },
-                <<"message">> => #{
-                    <<"key">> => <<"${clientid}">>,
-                    <<"value">> => <<"${.payload}">>
-                }
+        <<"local_topic">> => <<"t/#">>,
+        <<"kafka">> => #{
+            <<"topic">> => iolist_to_binary(KafkaTopic),
+            <<"buffer">> => #{<<"memory_overload_protection">> => <<"false">>},
+            <<"message">> => #{
+                <<"key">> => <<"${clientid}">>,
+                <<"value">> => <<"${.payload}">>
             }
         }
     },
@@ -295,6 +300,13 @@ kafka_bridge_rest_api_helper(Config) ->
     {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
     %% Check that the new bridge is in the list of bridges
     true = MyKafkaBridgeExists(),
+    %% Probe should work
+    {ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
+    %% no extra atoms should be created when probing
+    AtomsBefore = erlang:system_info(atom_count),
+    {ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
+    AtomsAfter = erlang:system_info(atom_count),
+    ?assertEqual(AtomsBefore, AtomsAfter),
     %% Create a rule that uses the bridge
     {ok, 201, _Rule} = http_post(
         ["rules"],
@@ -355,6 +367,7 @@ kafka_bridge_rest_api_helper(Config) ->
     %% Cleanup
     {ok, 204, _} = show(http_delete(BridgesPartsIdDeleteAlsoActions)),
     false = MyKafkaBridgeExists(),
+    delete_all_bridges(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -371,9 +384,10 @@ t_failed_creation_then_fix(Config) ->
     ValidAuthSettings = valid_sasl_plain_settings(),
     WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
     Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = ?BRIDGE_TYPE,
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
-    ResourceId = emqx_bridge_resource:resource_id("kafka", Name),
-    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
     KafkaTopic = "test-topic-one-partition",
     WrongConf = config(#{
         "authentication" => WrongAuthSettings,
@@ -397,18 +411,20 @@ t_failed_creation_then_fix(Config) ->
         "ssl" => #{}
     }),
     %% creates, but fails to start producers
-    %% FIXME: change to kafka_producer after config refactoring
-    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})),
-    ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)),
+    {ok, #{config := WrongConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), WrongConf
+    ),
+    WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
+    ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
     %% before throwing, it should cleanup the client process.
     ?assertEqual([], supervisor:which_children(wolff_client_sup)),
-    %% FIXME: change to kafka_producer after config refactoring
     %% must succeed with correct config
-    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})),
-    {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf),
-    %% To make sure we get unique value
-    timer:sleep(1),
-    Time = erlang:monotonic_time(),
+    {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), ValidConf
+    ),
+    ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
+    {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConfigAtom),
+    Time = erlang:unique_integer(),
     BinTime = integer_to_binary(Time),
     Msg = #{
         clientid => BinTime,
@@ -423,6 +439,7 @@ t_failed_creation_then_fix(Config) ->
     %% TODO: refactor those into init/end per testcase
     ok = ?PRODUCER:on_stop(ResourceId, State),
     ok = emqx_bridge_resource:remove(BridgeId),
+    delete_all_bridges(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -487,6 +504,7 @@ publish_helper(
     },
     Conf0
 ) ->
+    delete_all_bridges(),
     HostsString =
         case {AuthSettings, SSLSettings} of
             {"none", Map} when map_size(Map) =:= 0 ->
@@ -500,8 +518,8 @@ publish_helper(
         end,
     Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
     Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
-    InstId = emqx_bridge_resource:resource_id("kafka", Name),
-    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
+    Type = ?BRIDGE_TYPE,
+    InstId = emqx_bridge_resource:resource_id(Type, Name),
     KafkaTopic = "test-topic-one-partition",
     Conf = config(
         #{
@@ -509,30 +527,38 @@ publish_helper(
             "kafka_hosts_string" => HostsString,
             "kafka_topic" => KafkaTopic,
             "instance_id" => InstId,
+            "local_topic" => <<"mqtt/local">>,
             "ssl" => SSLSettings
         },
         Conf0
     ),
-
-    emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
-    %% To make sure we get unique value
-    timer:sleep(1),
-    Time = erlang:monotonic_time(),
+    {ok, _} = emqx_bridge:create(
+        <<?BRIDGE_TYPE>>, list_to_binary(Name), Conf
+    ),
+    Time = erlang:unique_integer(),
     BinTime = integer_to_binary(Time),
+    Partition = 0,
     Msg = #{
         clientid => BinTime,
         payload => <<"payload">>,
         timestamp => Time
     },
-    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
-    ct:pal("base offset before testing ~p", [Offset]),
-    StartRes = ?PRODUCER:on_start(InstId, Conf),
-    {ok, State} = StartRes,
+    {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
+    ct:pal("base offset before testing ~p", [Offset0]),
+    {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
     ok = send(CtConfig, InstId, Msg, State),
-    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
-    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
-    ok = ?PRODUCER:on_stop(InstId, State),
-    ok = emqx_bridge_resource:remove(BridgeId),
+    {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
+    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0),
+
+    %% test that it forwards from local mqtt topic as well
+    {ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
+    ct:pal("base offset before testing (2) ~p", [Offset1]),
+    emqx:publish(emqx_message:make(<<"mqtt/local">>, <<"payload">>)),
+    ct:sleep(2_000),
+    {ok, {_, [KafkaMsg1]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset1),
+    ?assertMatch(#kafka_message{value = <<"payload">>}, KafkaMsg1),
+
+    delete_all_bridges(),
     ok.
 
 default_config() ->
@@ -545,18 +571,24 @@ config(Args0, More) ->
     Args1 = maps:merge(default_config(), Args0),
     Args = maps:merge(Args1, More),
     ConfText = hocon_config(Args),
-    ct:pal("Running tests with conf:\n~s", [ConfText]),
-    {ok, Conf} = hocon:binary(ConfText),
-    #{config := Parsed} = hocon_tconf:check_plain(
-        emqx_ee_bridge_kafka,
-        #{<<"config">> => Conf},
-        #{atom_key => true}
-    ),
+    {ok, Conf} = hocon:binary(ConfText, #{format => map}),
+    ct:pal("Running tests with conf:\n~p", [Conf]),
     InstId = maps:get("instance_id", Args),
     <<"bridge:", BridgeId/binary>> = InstId,
-    Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}.
+    {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
+    TypeBin = atom_to_binary(Type),
+    hocon_tconf:check_plain(
+        emqx_bridge_schema,
+        Conf,
+        #{atom_key => false, required => false}
+    ),
+    #{<<"bridges">> := #{TypeBin := #{Name := Parsed}}} = Conf,
+    Parsed.
 
 hocon_config(Args) ->
+    InstId = maps:get("instance_id", Args),
+    <<"bridge:", BridgeId/binary>> = InstId,
+    {_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
     AuthConf = maps:get("authentication", Args),
     AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
     AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
@@ -567,6 +599,7 @@ hocon_config(Args) ->
         iolist_to_binary(hocon_config_template()),
         Args#{
             "authentication" => AuthConfRendered,
+            "bridge_name" => Name,
             "ssl" => SSLConfRendered
         }
     ),
@@ -574,23 +607,34 @@ hocon_config(Args) ->
 
 %% erlfmt-ignore
 hocon_config_template() ->
+%% TODO: rename the type to `kafka_producer' after alias support is
+%% added to hocon; keeping this as just `kafka' for backwards
+%% compatibility.
 """
-bootstrap_hosts = \"{{ kafka_hosts_string }}\"
-enable = true
-authentication = {{{ authentication }}}
-ssl = {{{ ssl }}}
-producer = {
-    mqtt {
-       topic = \"t/#\"
+bridges.kafka.{{ bridge_name }} {
+  bootstrap_hosts = \"{{ kafka_hosts_string }}\"
+  enable = true
+  authentication = {{{ authentication }}}
+  ssl = {{{ ssl }}}
+  local_topic = \"{{ local_topic }}\"
+  kafka = {
+    message = {
+      key = \"${clientid}\"
+      value = \"${.payload}\"
+      timestamp = \"${timestamp}\"
     }
-    kafka = {
-        topic = \"{{ kafka_topic }}\"
-        message = {key = \"${clientid}\", value = \"${.payload}\"}
-        partition_strategy = {{ partition_strategy }}
-        buffer = {
-            memory_overload_protection = false
-        }
+    buffer = {
+      memory_overload_protection = false
     }
+    partition_strategy = {{ partition_strategy }}
+    topic = \"{{ kafka_topic }}\"
+  }
+  metadata_request_timeout = 5s
+  min_metadata_refresh_interval = 3s
+  socket_opts {
+    nodelay = true
+  }
+  connect_timeout = 5s
 }
 """.
 
@@ -631,22 +675,42 @@ hocon_config_template_ssl(_) ->
 """.
 
 kafka_hosts_string() ->
-    "kafka-1.emqx.net:9092,".
+    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_sasl() ->
-    "kafka-1.emqx.net:9093,".
+    KafkaHost = os:getenv("KAFKA_SASL_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SASL_PLAIN_PORT", "9093"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_ssl() ->
-    "kafka-1.emqx.net:9094,".
+    KafkaHost = os:getenv("KAFKA_SSL_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SSL_PORT", "9094"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
 
 kafka_hosts_string_ssl_sasl() ->
-    "kafka-1.emqx.net:9095,".
+    KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_SASL_SSL_PORT", "9095"),
+    KafkaHost ++ ":" ++ KafkaPort ++ ",".
+
+shared_secret_path() ->
+    os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
+
+shared_secret(client_keyfile) ->
+    filename:join([shared_secret_path(), "client.key"]);
+shared_secret(client_certfile) ->
+    filename:join([shared_secret_path(), "client.crt"]);
+shared_secret(client_cacertfile) ->
+    filename:join([shared_secret_path(), "ca.crt"]);
+shared_secret(rig_keytab) ->
+    filename:join([shared_secret_path(), "rig.keytab"]).
 
 valid_ssl_settings() ->
     #{
-        "cacertfile" => <<"/var/lib/secret/ca.crt">>,
-        "certfile" => <<"/var/lib/secret/client.crt">>,
-        "keyfile" => <<"/var/lib/secret/client.key">>,
+        "cacertfile" => shared_secret(client_cacertfile),
+        "certfile" => shared_secret(client_certfile),
+        "keyfile" => shared_secret(client_keyfile),
         "enable" => <<"true">>
     }.
 
@@ -670,7 +734,7 @@ valid_sasl_scram512_settings() ->
 valid_sasl_kerberos_settings() ->
     #{
         "kerberos_principal" => "rig@KDC.EMQX.NET",
-        "kerberos_keytab_file" => "/var/lib/secret/rig.keytab"
+        "kerberos_keytab_file" => shared_secret(rig_keytab)
     }.
 
 kafka_hosts() ->
@@ -732,3 +796,17 @@ api_path(Parts) ->
 json(Data) ->
     {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]),
     Jsx.
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ),
+    %% at some point during the tests, `emqx_bridge:list()' sometimes
+    %% returns an empty list, while `emqx:get_config([bridges])' still
+    %% returns a bunch of orphan test bridges...
+    lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()),
+    emqx_config:put([bridges], #{}),
+    ok.

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -1105,13 +1105,13 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             ?assertMatch(
                 #{
                     dropped := Dropped,
-                    failed := 0,
+                    failed := Failed,
                     inflight := Inflight,
                     matched := Matched,
                     queuing := Queueing,
                     retried := 0,
                     success := 0
-                } when Matched >= 1 andalso Inflight + Queueing + Dropped =< 2,
+                } when Matched >= 1 andalso Inflight + Queueing + Dropped + Failed =< 2,
                 CurrentMetrics
             );
         {timeout, async} ->

+ 287 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl

@@ -0,0 +1,287 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_kafka_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%===========================================================================
+%% Test cases
+%%===========================================================================
+
+kafka_producer_test() ->
+    Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
+    Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
+    Conf3 = parse(kafka_producer_new_hocon()),
+
+    ?assertMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka">> :=
+                        #{
+                            <<"myproducer">> :=
+                                #{<<"kafka">> := #{}}
+                        }
+                }
+        },
+        check(Conf1)
+    ),
+    ?assertNotMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka">> :=
+                        #{
+                            <<"myproducer">> :=
+                                #{<<"local_topic">> := _}
+                        }
+                }
+        },
+        check(Conf1)
+    ),
+    ?assertMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka">> :=
+                        #{
+                            <<"myproducer">> :=
+                                #{
+                                    <<"kafka">> := #{},
+                                    <<"local_topic">> := <<"mqtt/local">>
+                                }
+                        }
+                }
+        },
+        check(Conf2)
+    ),
+    ?assertMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka">> :=
+                        #{
+                            <<"myproducer">> :=
+                                #{
+                                    <<"kafka">> := #{},
+                                    <<"local_topic">> := <<"mqtt/local">>
+                                }
+                        }
+                }
+        },
+        check(Conf3)
+    ),
+
+    ok.
+
+kafka_consumer_test() ->
+    Conf1 = parse(kafka_consumer_hocon()),
+    ?assertMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka_consumer">> :=
+                        #{
+                            <<"my_consumer">> := _
+                        }
+                }
+        },
+        check(Conf1)
+    ),
+
+    %% Bad: can't repeat kafka topics.
+    BadConf1 = emqx_map_lib:deep_put(
+        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+        Conf1,
+        [
+            #{
+                <<"kafka_topic">> => <<"t1">>,
+                <<"mqtt_topic">> => <<"mqtt/t1">>,
+                <<"qos">> => 1,
+                <<"payload_template">> => <<"${.}">>
+            },
+            #{
+                <<"kafka_topic">> => <<"t1">>,
+                <<"mqtt_topic">> => <<"mqtt/t2">>,
+                <<"qos">> => 2,
+                <<"payload_template">> => <<"v = ${.value}">>
+            }
+        ]
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+                reason := "Kafka topics must not be repeated in a bridge"
+            }
+        ]},
+        check(BadConf1)
+    ),
+
+    %% Bad: there must be at least 1 mapping.
+    BadConf2 = emqx_map_lib:deep_put(
+        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+        Conf1,
+        []
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+                reason := "There must be at least one Kafka-MQTT topic mapping"
+            }
+        ]},
+        check(BadConf2)
+    ),
+
+    ok.
+
+%%===========================================================================
+%% Helper functions
+%%===========================================================================
+
+parse(Hocon) ->
+    {ok, Conf} = hocon:binary(Hocon),
+    Conf.
+
+check(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf).
+
+%%===========================================================================
+%% Data section
+%%===========================================================================
+
+%% erlfmt-ignore
+kafka_producer_old_hocon(_WithLocalTopic = true) ->
+    kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
+kafka_producer_old_hocon(_WithLocalTopic = false) ->
+    kafka_producer_old_hocon("mqtt {}\n");
+kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
+"""
+bridges.kafka {
+  myproducer {
+    authentication = \"none\"
+    bootstrap_hosts = \"toxiproxy:9292\"
+    connect_timeout = \"5s\"
+    metadata_request_timeout = \"5s\"
+    min_metadata_refresh_interval = \"3s\"
+    producer {
+      kafka {
+        buffer {
+          memory_overload_protection = false
+          mode = \"memory\"
+          per_partition_limit = \"2GB\"
+          segment_bytes = \"100MB\"
+        }
+        compression = \"no_compression\"
+        max_batch_bytes = \"896KB\"
+        max_inflight = 10
+        message {
+          key = \"${.clientid}\"
+          timestamp = \"${.timestamp}\"
+          value = \"${.}\"
+        }
+        partition_count_refresh_interval = \"60s\"
+        partition_strategy = \"random\"
+        required_acks = \"all_isr\"
+        topic = \"test-topic-two-partitions\"
+      }
+""" ++ MQTTConfig ++
+"""
+    }
+    socket_opts {
+      nodelay = true
+      recbuf = \"1024KB\"
+      sndbuf = \"1024KB\"
+    }
+    ssl {enable = false, verify = \"verify_peer\"}
+  }
+}
+""".
+
+kafka_producer_new_hocon() ->
+    ""
+    "\n"
+    "bridges.kafka {\n"
+    "  myproducer {\n"
+    "    authentication = \"none\"\n"
+    "    bootstrap_hosts = \"toxiproxy:9292\"\n"
+    "    connect_timeout = \"5s\"\n"
+    "    metadata_request_timeout = \"5s\"\n"
+    "    min_metadata_refresh_interval = \"3s\"\n"
+    "    kafka {\n"
+    "      buffer {\n"
+    "        memory_overload_protection = false\n"
+    "        mode = \"memory\"\n"
+    "        per_partition_limit = \"2GB\"\n"
+    "        segment_bytes = \"100MB\"\n"
+    "      }\n"
+    "      compression = \"no_compression\"\n"
+    "      max_batch_bytes = \"896KB\"\n"
+    "      max_inflight = 10\n"
+    "      message {\n"
+    "        key = \"${.clientid}\"\n"
+    "        timestamp = \"${.timestamp}\"\n"
+    "        value = \"${.}\"\n"
+    "      }\n"
+    "      partition_count_refresh_interval = \"60s\"\n"
+    "      partition_strategy = \"random\"\n"
+    "      required_acks = \"all_isr\"\n"
+    "      topic = \"test-topic-two-partitions\"\n"
+    "    }\n"
+    "    local_topic = \"mqtt/local\"\n"
+    "    socket_opts {\n"
+    "      nodelay = true\n"
+    "      recbuf = \"1024KB\"\n"
+    "      sndbuf = \"1024KB\"\n"
+    "    }\n"
+    "    ssl {enable = false, verify = \"verify_peer\"}\n"
+    "  }\n"
+    "}\n"
+    "".
+
+%% erlfmt-ignore
+kafka_consumer_hocon() ->
+"""
+bridges.kafka_consumer.my_consumer {
+  enable = true
+  bootstrap_hosts = \"kafka-1.emqx.net:9292\"
+  connect_timeout = 5s
+  min_metadata_refresh_interval = 3s
+  metadata_request_timeout = 5s
+  authentication = {
+    mechanism = plain
+    username = emqxuser
+    password = password
+  }
+  kafka {
+    max_batch_bytes = 896KB
+    max_rejoin_attempts = 5
+    offset_commit_interval_seconds = 3
+    offset_reset_policy = reset_to_latest
+  }
+  topic_mapping = [
+    {
+      kafka_topic = \"kafka-topic-1\"
+      mqtt_topic = \"mqtt/topic/1\"
+      qos = 1
+      payload_template = \"${.}\"
+    },
+    {
+      kafka_topic = \"kafka-topic-2\"
+      mqtt_topic = \"mqtt/topic/2\"
+      qos = 2
+      payload_template = \"v = ${.value}\"
+    }
+  ]
+  key_encoding_mode = none
+  value_encoding_mode = none
+  ssl {
+    enable = false
+    verify = verify_none
+    server_name_indication = \"auto\"
+  }
+}
+""".

+ 2 - 2
mix.exs

@@ -68,7 +68,7 @@ defmodule EMQXUmbrella.MixProject do
       {:telemetry, "1.1.0"},
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
-      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
+      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true},
       {:hocon, github: "emqx/hocon", tag: "0.37.0", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
@@ -135,7 +135,7 @@ defmodule EMQXUmbrella.MixProject do
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.16.7"},
+      {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
       {:snappyer, "1.2.8", override: true},
       {:supervisor3, "1.1.11", override: true}
     ]

+ 1 - 1
rebar.config

@@ -68,7 +68,7 @@
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
-    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
+    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.37.0"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}

+ 33 - 36
scripts/ct/run.sh

@@ -21,11 +21,16 @@ help() {
     echo "                        otherwise it runs the entire app's CT"
 }
 
-if command -v docker-compose; then
+set +e
+if docker compose version; then
+    DC='docker compose'
+elif command -v docker-compose; then
     DC='docker-compose'
 else
-    DC='docker compose'
+    echo 'Neither "docker compose" nor "docker-compose" is available, stopping.'
+    exit 1
 fi
+set -e
 
 WHICH_APP='novalue'
 CONSOLE='no'
@@ -154,14 +159,11 @@ for dep in ${CT_DEPS}; do
                      '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' )
             ;;
         kafka)
-            # Kafka container generates root owned ssl files
-            # the files are shared with EMQX (with a docker volume)
-            NEED_ROOT=yes
             FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' )
             ;;
         tdengine)
             FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' )
-            ;; 
+            ;;
         clickhouse)
             FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' )
             ;;
@@ -180,47 +182,43 @@ F_OPTIONS=""
 for file in "${FILES[@]}"; do
     F_OPTIONS="$F_OPTIONS -f $file"
 done
-ORIG_UID_GID="$UID:$UID"
-if [[ "${NEED_ROOT:-}" == 'yes' ]]; then
-    export UID_GID='root:root'
-else
-    # Passing $UID to docker-compose to be used in erlang container
-    # as owner of the main process to avoid git repo permissions issue.
-    # Permissions issue happens because we are mounting local filesystem
-    # where files are owned by $UID to docker container where it's using
-    # root (UID=0) by default, and git is not happy about it.
-    export UID_GID="$ORIG_UID_GID"
-fi
 
-# /emqx is where the source dir is mounted to the Erlang container
-# in .ci/docker-compose-file/docker-compose.yaml
+DOCKER_USER="$(id -u)"
+export DOCKER_USER
+
 TTY=''
 if [[ -t 1 ]]; then
     TTY='-t'
 fi
 
-function restore_ownership {
-    if [[ -n ${EMQX_TEST_DO_NOT_RUN_SUDO+x} ]] || ! sudo chown -R "$ORIG_UID_GID" . >/dev/null 2>&1; then
-        docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx" >/dev/null 2>&1 || true
-    fi
-}
-
-restore_ownership
-trap restore_ownership EXIT
-
+# ensure directory with secrets is created by current user before running compose
+mkdir -p /tmp/emqx-ci/emqx-shared-secret
 
 if [ "$STOP" = 'no' ]; then
     # some left-over log file has to be deleted before a new docker-compose up
     rm -f '.ci/docker-compose-file/redis/*.log'
+    set +e
     # shellcheck disable=2086 # no quotes for F_OPTIONS
     $DC $F_OPTIONS up -d --build --remove-orphans
+    RESULT=$?
+    if [ $RESULT -ne 0 ]; then
+        mkdir -p _build/test/logs
+        LOG='_build/test/logs/docker-compose.log'
+        echo "Dumping docker-compose log to $LOG"
+        # shellcheck disable=2086 # no quotes for F_OPTIONS
+        $DC $F_OPTIONS logs --no-color --timestamps > "$LOG"
+        exit 1
+    fi
+    set -e
 fi
 
-echo "Fixing file owners and permissions for $UID_GID"
-# rebar and hex cache directory need to be writable by $UID
-docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx/.git /emqx/.ci /emqx/_build/default/lib"
-# need to initialize .erlang.cookie manually here because / is not writable by $UID
-docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $UID_GID /.erlang.cookie && chmod 0400 /.erlang.cookie"
+# rebar, mix and hex cache directory need to be writable by $DOCKER_USER
+docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache /.hex /.mix && chown $DOCKER_USER /.cache /.hex /.mix"
+# need to initialize .erlang.cookie manually here because / is not writable by $DOCKER_USER
+docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $DOCKER_USER /.erlang.cookie && chmod 0400 /.erlang.cookie"
+# the user must exist inside the container for `whoami` to work
+docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "useradd --uid $DOCKER_USER -M -d / emqx" || true
+docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $DOCKER_USER /var/lib/secret" || true
 
 if [ "$ONLY_UP" = 'yes' ]; then
     exit 0
@@ -242,8 +240,7 @@ else
         docker exec -e IS_CI="$IS_CI" -e PROFILE="$PROFILE" -i $TTY "$ERLANG_CONTAINER" bash -c "./rebar3 ct $REBAR3CT"
     fi
     RESULT=$?
-    restore_ownership
-    if [ $RESULT -ne 0 ]; then
+    if [ "$RESULT" -ne 0 ]; then
         LOG='_build/test/logs/docker-compose.log'
         echo "Dumping docker-compose log to $LOG"
         # shellcheck disable=2086 # no quotes for F_OPTIONS
@@ -253,5 +250,5 @@ else
         # shellcheck disable=2086 # no quotes for F_OPTIONS
         $DC $F_OPTIONS down
     fi
-    exit $RESULT
+    exit "$RESULT"
 fi