Browse Source

refactor(kafka_bridge): move kafka bridge into its own app

Fixes https://emqx.atlassian.net/browse/EMQX-9481
Thales Macedo Garitezi 2 năm trước cách đây
mục cha
commit
871ee90b3e

+ 11 - 0
.github/workflows/elixir_deps_check.yaml

@@ -23,7 +23,18 @@ jobs:
           mix local.hex --force
           mix local.rebar --force
           mix deps.get
+        # we check only enterprise because `rebar3 tree`, even if an
+        # enterprise app is excluded from `project_app_dirs` in
+        # `rebar.config.erl`, will still list dependencies from it.
+        # Since the enterprise profile is a superset of the
+        # community one and thus more complete, we use the former.
+        env:
+          MIX_ENV: emqx-enterprise
+          PROFILE: emqx-enterprise
       - name: check elixir deps
         run: ./scripts/check-elixir-deps-discrepancies.exs
+        env:
+          MIX_ENV: emqx-enterprise
+          PROFILE: emqx-enterprise
 
 ...

+ 94 - 0
apps/emqx_bridge_kafka/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 19 - 0
apps/emqx_bridge_kafka/README.md

@@ -0,0 +1,19 @@
+# Kafka Data Integration Bridge
+
+This application houses the Kafka Producer and Consumer data
+integration bridges for EMQX Enterprise Edition.  It provides the
+means to connect to Kafka and publish/consume messages to/from it.
+
+Currently, our Kafka Producer library (`wolff`) has its own `replayq`
+buffering implementation, so this bridge does not require buffer
+workers from `emqx_resource`.  It implements the connection management
+and interaction without need for a separate connector app, since it's
+not used by authentication and authorization applications.
+
+## Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+## License
+
+See [BSL](./BSL.txt).

+ 2 - 0
apps/emqx_bridge_kafka/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+kafka

+ 0 - 0
apps/emqx_bridge_kafka/etc/emqx_bridge_kafka.conf


+ 14 - 0
apps/emqx_bridge_kafka/rebar.config

@@ -0,0 +1,14 @@
+%% -*- mode: erlang; -*-
+{erl_opts, [debug_info]}.
+{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
+       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
+       , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
+       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+    {apps, [emqx_bridge_kafka]}
+]}.

+ 16 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -0,0 +1,16 @@
+{application, emqx_bridge_kafka, [
+    {description, "EMQX Enterprise Kafka Bridge"},
+    {vsn, "0.1.0"},
+    {registered, [emqx_bridge_kafka_consumer_sup]},
+    {applications, [
+        kernel,
+        stdlib,
+        telemetry,
+        wolff,
+        brod
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_ee_bridge_kafka).
+-module(emqx_bridge_kafka).
 
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").

+ 1 - 1
lib-ee/emqx_ee_bridge/src/kafka/emqx_ee_bridge_kafka_consumer_sup.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_ee_bridge_kafka_consumer_sup).
+-module(emqx_bridge_kafka_consumer_sup).
 
 -behaviour(supervisor).
 

+ 1 - 1
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl

@@ -3,7 +3,7 @@
 %%--------------------------------------------------------------------
 
 %% Kafka connection configuration
--module(emqx_bridge_impl_kafka).
+-module(emqx_bridge_kafka_impl).
 
 -export([
     hosts/1,

+ 14 - 13
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_bridge_impl_kafka_consumer).
+-module(emqx_bridge_kafka_impl_consumer).
 
 -behaviour(emqx_resource).
 
@@ -52,8 +52,9 @@
     ssl := _,
     any() => term()
 }.
--type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id().
+-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
 -type kafka_topic() :: brod:topic().
+-type kafka_message() :: #kafka_message{}.
 -type state() :: #{
     kafka_topics := nonempty_list(kafka_topic()),
     subscriber_id := subscriber_id(),
@@ -129,14 +130,14 @@ on_start(InstanceId, Config) ->
         ssl := SSL,
         topic_mapping := _
     } = Config,
-    BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+    BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
     KafkaType = kafka_consumer,
     %% Note: this is distinct per node.
     ClientID = make_client_id(InstanceId, KafkaType, BridgeName),
     ClientOpts0 =
         case Auth of
             none -> [];
-            Auth -> [{sasl, emqx_bridge_impl_kafka:sasl(Auth)}]
+            Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
         end,
     ClientOpts = add_ssl_opts(ClientOpts0, SSL),
     case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
@@ -191,7 +192,7 @@ init(GroupData, State0) ->
     State = State0#{kafka_topic => KafkaTopic},
     {ok, State}.
 
--spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}.
+-spec handle_message(kafka_message(), consumer_state()) -> {ok, commit, consumer_state()}.
 handle_message(Message, State) ->
     ?tp_span(
         kafka_consumer_handle_message,
@@ -240,13 +241,13 @@ add_ssl_opts(ClientOpts, #{enable := false}) ->
 add_ssl_opts(ClientOpts, SSL) ->
     [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts].
 
--spec make_subscriber_id(atom() | binary()) -> emqx_ee_bridge_kafka_consumer_sup:child_id().
+-spec make_subscriber_id(atom() | binary()) -> emqx_bridge_kafka_consumer_sup:child_id().
 make_subscriber_id(BridgeName) ->
     BridgeNameBin = to_bin(BridgeName),
     <<"kafka_subscriber:", BridgeNameBin/binary>>.
 
 ensure_consumer_supervisor_started() ->
-    Mod = emqx_ee_bridge_kafka_consumer_sup,
+    Mod = emqx_bridge_kafka_consumer_sup,
     ChildSpec =
         #{
             id => Mod,
@@ -327,7 +328,7 @@ start_consumer(Config, InstanceId, ClientID) ->
     %% spawns one worker for each assigned topic-partition
     %% automatically, so we should not spawn duplicate workers.
     SubscriberId = make_subscriber_id(BridgeName),
-    case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
+    case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
         {ok, _ConsumerPid} ->
             ?tp(
                 kafka_consumer_subscriber_started,
@@ -342,18 +343,18 @@ start_consumer(Config, InstanceId, ClientID) ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_consumer",
                 instance_id => InstanceId,
-                kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+                kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
                 reason => emqx_misc:redact(Reason2)
             }),
             stop_client(ClientID),
             throw(failed_to_start_kafka_consumer)
     end.
 
--spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok.
+-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
 stop_subscriber(SubscriberId) ->
     _ = log_when_error(
         fun() ->
-            emqx_ee_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
+            emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
         end,
         #{
             msg => "failed_to_delete_kafka_subscriber",
@@ -437,7 +438,7 @@ do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
     end.
 
 are_subscriber_workers_alive(SubscriberId) ->
-    Children = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup),
+    Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
     case lists:keyfind(SubscriberId, 1, Children) of
         false ->
             false;
@@ -479,7 +480,7 @@ is_dry_run(InstanceId) ->
 make_client_id(InstanceId, KafkaType, KafkaName) ->
     case is_dry_run(InstanceId) of
         false ->
-            ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, KafkaName),
+            ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName),
             binary_to_atom(ClientID0);
         true ->
             %% It is a dry run and we don't want to leak too many

+ 4 - 4
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_bridge_impl_kafka_producer).
+-module(emqx_bridge_kafka_impl_producer).
 
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
@@ -47,15 +47,15 @@ on_start(InstId, Config) ->
     BridgeType = ?BRIDGE_TYPE,
     ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     _ = maybe_install_wolff_telemetry_handlers(ResourceId),
-    Hosts = emqx_bridge_impl_kafka:hosts(Hosts0),
-    ClientId = emqx_bridge_impl_kafka:make_client_id(BridgeType, BridgeName),
+    Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
+    ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
     ClientConfig = #{
         min_metadata_refresh_interval => MinMetaRefreshInterval,
         connect_timeout => ConnTimeout,
         client_id => ClientId,
         request_timeout => MetaReqTimeout,
         extra_sock_opts => socket_opts(SocketOpts),
-        sasl => emqx_bridge_impl_kafka:sasl(Auth),
+        sasl => emqx_bridge_kafka_impl:sasl(Auth),
         ssl => ssl(SSL)
     },
     case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of

+ 23 - 11
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_bridge_impl_kafka_consumer_SUITE).
+-module(emqx_bridge_kafka_impl_consumer_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -15,6 +15,7 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
+-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -67,7 +68,7 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(),
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
     _ = application:stop(emqx_connector),
     ok.
 
@@ -228,7 +229,7 @@ common_init_per_group() ->
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     application:load(emqx_bridge),
     ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
+    ok = emqx_connector_test_helpers:start_apps(?APPS),
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
@@ -408,7 +409,7 @@ start_producers(TestCase, Config) ->
     DirectKafkaPort = ?config(direct_kafka_port, Config),
     UseTLS = ?config(use_tls, Config),
     UseSASL = ?config(use_sasl, Config),
-    Hosts = emqx_bridge_impl_kafka:hosts(
+    Hosts = emqx_bridge_kafka_impl:hosts(
         DirectKafkaHost ++ ":" ++ integer_to_list(DirectKafkaPort)
     ),
     SSL =
@@ -876,7 +877,7 @@ ensure_connected(Config) ->
 
 consumer_clientid(Config) ->
     KafkaName = ?config(kafka_name, Config),
-    binary_to_atom(emqx_bridge_impl_kafka:make_client_id(kafka_consumer, KafkaName)).
+    binary_to_atom(emqx_bridge_kafka_impl:make_client_id(kafka_consumer, KafkaName)).
 
 get_client_connection(Config) ->
     KafkaHost = ?config(kafka_host, Config),
@@ -885,7 +886,7 @@ get_client_connection(Config) ->
     brod_client:get_connection(ClientID, KafkaHost, KafkaPort).
 
 get_subscriber_workers() ->
-    [{_, SubscriberPid, _, _}] = supervisor:which_children(emqx_ee_bridge_kafka_consumer_sup),
+    [{_, SubscriberPid, _, _}] = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
     brod_group_subscriber_v2:get_workers(SubscriberPid).
 
 wait_downs(Refs, _Timeout) when map_size(Refs) =:= 0 ->
@@ -1069,7 +1070,7 @@ cluster(Config) ->
     Cluster = emqx_common_test_helpers:emqx_cluster(
         [core, core],
         [
-            {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]},
+            {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]},
             {listener_ports, []},
             {peer_mod, PeerModule},
             {priv_data_dir, PrivDataDir},
@@ -1504,7 +1505,7 @@ do_t_receive_after_recovery(Config) ->
                 _Interval = 500,
                 _NAttempts = 20,
                 begin
-                    GroupId = emqx_bridge_impl_kafka_consumer:consumer_group_id(KafkaNameA),
+                    GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA),
                     {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
                         KafkaClientId, GroupId
                     ),
@@ -1745,8 +1746,12 @@ t_node_joins_existing_cluster(Config) ->
     ?check_trace(
         begin
             [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
+            ct:pal("starting ~p", [Name1]),
             N1 = emqx_common_test_helpers:start_slave(Name1, Opts1),
-            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N1) end),
+            on_exit(fun() ->
+                ct:pal("stopping ~p", [N1]),
+                ok = emqx_common_test_helpers:stop_slave(N1)
+            end),
             setup_group_subscriber_spy(N1),
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
@@ -1785,8 +1790,12 @@ t_node_joins_existing_cluster(Config) ->
                 1,
                 30_000
             ),
+            ct:pal("starting ~p", [Name2]),
             N2 = emqx_common_test_helpers:start_slave(Name2, Opts2),
-            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N2) end),
+            on_exit(fun() ->
+                ct:pal("stopping ~p", [N2]),
+                ok = emqx_common_test_helpers:stop_slave(N2)
+            end),
             setup_group_subscriber_spy(N2),
             Nodes = [N1, N2],
             wait_for_cluster_rpc(N2),
@@ -1873,7 +1882,10 @@ t_cluster_node_down(Config) ->
             Nodes =
                 [N1, N2 | _] =
                 lists:map(
-                    fun({Name, Opts}) -> emqx_common_test_helpers:start_slave(Name, Opts) end,
+                    fun({Name, Opts}) ->
+                        ct:pal("starting ~p", [Name]),
+                        emqx_common_test_helpers:start_slave(Name, Opts)
+                    end,
                     Cluster
                 ),
             on_exit(fun() ->

+ 6 - 4
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_impl_kafka_producer_SUITE).
+-module(emqx_bridge_kafka_impl_producer_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -12,7 +12,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
 
--define(PRODUCER, emqx_bridge_impl_kafka_producer).
+-define(PRODUCER, emqx_bridge_kafka_impl_producer).
 
 %%------------------------------------------------------------------------------
 %% Things for REST API tests
@@ -41,6 +41,8 @@
 %% to hocon; keeping this as just `kafka' for backwards compatibility.
 -define(BRIDGE_TYPE, "kafka").
 
+-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -76,7 +78,7 @@ init_per_suite(Config) ->
     _ = emqx_ee_bridge:module_info(),
     application:load(emqx_bridge),
     ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
+    ok = emqx_connector_test_helpers:start_apps(?APPS),
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
     wait_until_kafka_is_up(),
@@ -96,7 +98,7 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(),
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
-    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
     _ = application:stop(emqx_connector),
     ok.
 

+ 1 - 1
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_ee_bridge_kafka_tests).
+-module(emqx_bridge_kafka_tests).
 
 -include_lib("eunit/include/eunit.hrl").
 

+ 0 - 1
lib-ee/emqx_ee_bridge/docker-ct

@@ -1,6 +1,5 @@
 toxiproxy
 influxdb
-kafka
 mongo
 mongo_rs_sharded
 mysql

+ 1 - 5
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,9 +1,5 @@
 {erl_opts, [debug_info]}.
-{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
-       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
-       , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
-       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
-       , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}}
+{deps, [ {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 3 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,12 +1,13 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
     {vsn, "0.1.9"},
-    {registered, [emqx_ee_bridge_kafka_consumer_sup]},
+    {registered, []},
     {applications, [
         kernel,
         stdlib,
         emqx_ee_connector,
-        telemetry
+        telemetry,
+        emqx_bridge_kafka
     ]},
     {env, []},
     {modules, []},

+ 8 - 8
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -15,8 +15,8 @@
 api_schemas(Method) ->
     [
         ref(emqx_ee_bridge_gcp_pubsub, Method),
-        ref(emqx_ee_bridge_kafka, Method ++ "_consumer"),
-        ref(emqx_ee_bridge_kafka, Method ++ "_producer"),
+        ref(emqx_bridge_kafka, Method ++ "_consumer"),
+        ref(emqx_bridge_kafka, Method ++ "_producer"),
         ref(emqx_ee_bridge_mysql, Method),
         ref(emqx_ee_bridge_pgsql, Method),
         ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
@@ -39,7 +39,7 @@ api_schemas(Method) ->
 
 schema_modules() ->
     [
-        emqx_ee_bridge_kafka,
+        emqx_bridge_kafka,
         emqx_ee_bridge_hstreamdb,
         emqx_ee_bridge_gcp_pubsub,
         emqx_ee_bridge_influxdb,
@@ -69,10 +69,10 @@ examples(Method) ->
     lists:foldl(Fun, #{}, schema_modules()).
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
-resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer;
+resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer;
 %% TODO: rename this to `kafka_producer' after alias support is added
 %% to hocon; keeping this as just `kafka' for backwards compatibility.
-resource_type(kafka) -> emqx_bridge_impl_kafka_producer;
+resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
 resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
@@ -174,16 +174,16 @@ kafka_structs() ->
         %% backwards compatibility.
         {kafka,
             mk(
-                hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_producer)),
+                hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)),
                 #{
                     desc => <<"Kafka Producer Bridge Config">>,
                     required => false,
-                    converter => fun emqx_ee_bridge_kafka:kafka_producer_converter/2
+                    converter => fun emqx_bridge_kafka:kafka_producer_converter/2
                 }
             )},
         {kafka_consumer,
             mk(
-                hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_consumer)),
+                hoconsc:map(name, ref(emqx_bridge_kafka, kafka_consumer)),
                 #{desc => <<"Kafka Consumer Bridge Config">>, required => false}
             )}
     ].

+ 0 - 2
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -8,8 +8,6 @@
         hstreamdb_erl,
         influxdb,
         tdengine,
-        wolff,
-        brod,
         clickhouse,
         erlcloud,
         rocketmq,

+ 24 - 1
mix.exs

@@ -107,6 +107,8 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   defp umbrella_apps() do
+    enterprise_apps = enterprise_umbrella_apps()
+
     "apps/*"
     |> Path.wildcard()
     |> Enum.map(fn path ->
@@ -117,9 +119,20 @@ defmodule EMQXUmbrella.MixProject do
 
       {app, path: path, manager: :rebar3, override: true}
     end)
+    |> Enum.reject(fn dep_spec ->
+      dep_spec
+      |> elem(0)
+      |> then(&MapSet.member?(enterprise_apps, &1))
+    end)
   end
 
   defp enterprise_apps(_profile_info = %{edition_type: :enterprise}) do
+    umbrella_apps =
+      Enum.map(enterprise_umbrella_apps(), fn app_name ->
+        path = "apps/#{app_name}"
+        {app_name, path: path, manager: :rebar3, override: true}
+      end)
+
     "lib-ee/*"
     |> Path.wildcard()
     |> Enum.filter(&File.dir?/1)
@@ -131,12 +144,20 @@ defmodule EMQXUmbrella.MixProject do
 
       {app, path: path, manager: :rebar3, override: true}
     end)
+    |> Enum.concat(umbrella_apps)
   end
 
   defp enterprise_apps(_profile_info) do
     []
   end
 
+  # need to remove those when listing `/apps/`...
+  defp enterprise_umbrella_apps() do
+    MapSet.new([
+      :emqx_bridge_kafka
+    ])
+  end
+
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
@@ -146,7 +167,8 @@ defmodule EMQXUmbrella.MixProject do
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
       {:snappyer, "1.2.8", override: true},
-      {:supervisor3, "1.1.11", override: true}
+      {:crc32cer, "0.1.8", override: true},
+      {:supervisor3, "1.1.12", override: true}
     ]
   end
 
@@ -320,6 +342,7 @@ defmodule EMQXUmbrella.MixProject do
           emqx_ee_conf: :load,
           emqx_ee_connector: :permanent,
           emqx_ee_bridge: :permanent,
+          emqx_bridge_kafka: :permanent,
           emqx_ee_schema_registry: :permanent
         ],
         else: []

+ 12 - 2
rebar.config.erl

@@ -78,6 +78,9 @@ is_cover_enabled() ->
 is_enterprise(ce) -> false;
 is_enterprise(ee) -> true.
 
+is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
+is_community_umbrella_app(_) -> true.
+
 is_jq_supported() ->
     not (false =/= os:getenv("BUILD_WITHOUT_JQ") orelse
         is_win32()) orelse
@@ -122,8 +125,14 @@ project_app_dirs() ->
     project_app_dirs(get_edition_from_profile_env()).
 
 project_app_dirs(Edition) ->
-    ["apps/*"] ++
-        case is_enterprise(Edition) of
+    IsEnterprise = is_enterprise(Edition),
+    UmbrellaApps = [
+        Path
+     || Path <- filelib:wildcard("apps/*"),
+        is_community_umbrella_app(Path) orelse IsEnterprise
+    ],
+    UmbrellaApps ++
+        case IsEnterprise of
             true -> ["lib-ee/*"];
             false -> []
         end.
@@ -428,6 +437,7 @@ relx_apps_per_edition(ee) ->
         {emqx_ee_conf, load},
         emqx_ee_connector,
         emqx_ee_bridge,
+        emqx_bridge_kafka,
         emqx_ee_schema_registry
     ];
 relx_apps_per_edition(ce) ->

+ 1 - 1
rel/i18n/emqx_ee_bridge_kafka.hocon

@@ -1,4 +1,4 @@
-emqx_ee_bridge_kafka {
+emqx_bridge_kafka {
     config_enable {
         desc {
             en: "Enable (true) or disable (false) this Kafka bridge."

+ 5 - 1
scripts/ct/run.sh

@@ -107,6 +107,10 @@ case "${WHICH_APP}" in
         ## ensure enterprise profile when testing lib-ee applications
         export PROFILE='emqx-enterprise'
         ;;
+    apps/emqx_bridge_kafka)
+        ## ensure enterprise profile when testing ee applications
+        export PROFILE='emqx-enterprise'
+        ;;
     *)
         export PROFILE="${PROFILE:-emqx}"
         ;;
@@ -172,7 +176,7 @@ for dep in ${CT_DEPS}; do
             ;;
         rocketmq)
             FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' )
-            ;; 
+            ;;
         cassandra)
             FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )
             ;;

+ 3 - 0
scripts/find-apps.sh

@@ -72,6 +72,9 @@ describe_app() {
         runner="docker"
     fi
     case "${app}" in
+        apps/emqx_bridge_kafka)
+            profile='emqx-enterprise'
+            ;;
         apps/*)
             profile='emqx'
             ;;