Преглед изворни кода

feat: implement Pulsar Producer bridge (e5.0)

Fixes https://emqx.atlassian.net/browse/EMQX-8398
Thales Macedo Garitezi пре 2 година
родитељ
комит
ad4be08bb2
29 измењених фајлова са 2160 додато и 11 уклоњено
  1. 32 0
      .ci/docker-compose-file/docker-compose-pulsar-tcp.yaml
  2. 12 0
      .ci/docker-compose-file/toxiproxy.json
  3. 1 0
      LICENSE
  4. 22 2
      apps/emqx/test/emqx_test_janitor.erl
  5. 2 1
      apps/emqx_bridge/src/emqx_bridge.erl
  6. 2 0
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  7. 19 0
      apps/emqx_bridge_pulsar/.gitignore
  8. 94 0
      apps/emqx_bridge_pulsar/BSL.txt
  9. 30 0
      apps/emqx_bridge_pulsar/README.md
  10. 2 0
      apps/emqx_bridge_pulsar/docker-ct
  11. 0 0
      apps/emqx_bridge_pulsar/etc/emqx_bridge_pulsar.conf
  12. 14 0
      apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl
  13. 13 0
      apps/emqx_bridge_pulsar/rebar.config
  14. 15 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  15. 228 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl
  16. 14 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl
  17. 396 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
  18. 33 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl
  19. 819 0
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
  20. 25 0
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl
  21. 6 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  22. 1 0
      changes/ee/feat-10378.en.md
  23. 3 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
  24. 20 4
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
  25. 3 1
      mix.exs
  26. 1 0
      rebar.config.erl
  27. 176 0
      rel/i18n/emqx_bridge_pulsar.hocon
  28. 173 0
      rel/i18n/zh/emqx_bridge_pulsar.hocon
  29. 4 1
      scripts/ct/run.sh

+ 32 - 0
.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml

@@ -0,0 +1,32 @@
+version: '3'
+
+services:
+  pulsar:
+    container_name: pulsar
+    image: apachepulsar/pulsar:2.11.0
+    # ports:
+    #   - 6650:6650
+    #   - 8080:8080
+    networks:
+      emqx_bridge:
+    volumes:
+      - ../../apps/emqx/etc/certs/cert.pem:/etc/certs/server.pem
+      - ../../apps/emqx/etc/certs/key.pem:/etc/certs/key.pem
+      - ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.pem
+    restart: always
+    command:
+      - bash
+      - "-c"
+      - |
+        sed -i 's/^advertisedAddress=/#advertisedAddress=/' conf/standalone.conf
+        sed -ie 's/^brokerServicePort=.*/brokerServicePort=6649/' conf/standalone.conf
+        sed -i 's/^bindAddress=/#bindAddress=/' conf/standalone.conf
+        sed -i 's#^bindAddresses=#bindAddresses=plain:pulsar://0.0.0.0:6650,ssl:pulsar+ssl://0.0.0.0:6651,toxiproxy:pulsar://0.0.0.0:6652,toxiproxy_ssl:pulsar+ssl://0.0.0.0:6653#' conf/standalone.conf
+        sed -i 's#^advertisedAddress=#advertisedAddress=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653#' conf/standalone.conf
+        sed -i 's#^tlsCertificateFilePath=#tlsCertificateFilePath=/etc/certs/server.pem#' conf/standalone.conf
+        sed -i 's#^tlsTrustCertsFilePath=#tlsTrustCertsFilePath=/etc/certs/ca.pem#' conf/standalone.conf
+        sed -i 's#^tlsKeyFilePath=#tlsKeyFilePath=/etc/certs/key.pem#' conf/standalone.conf
+        sed -i 's#^tlsProtocols=#tlsProtocols=TLSv1.3,TLSv1.2#' conf/standalone.conf
+        sed -i 's#^tlsCiphers=#tlsCiphers=TLS_AES_256_GCM_SHA384#' conf/standalone.conf
+        echo 'advertisedListeners=plain:pulsar://pulsar:6650,ssl:pulsar+ssl://pulsar:6651,toxiproxy:pulsar://toxiproxy:6652,toxiproxy_ssl:pulsar+ssl://toxiproxy:6653' >> conf/standalone.conf
+        bin/pulsar standalone -nfw -nss

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -107,5 +107,17 @@
     "listen": "0.0.0.0:4242",
     "upstream": "opents:4242",
     "enabled": true
+  },
+  {
+    "name": "pulsar_plain",
+    "listen": "0.0.0.0:6652",
+    "upstream": "pulsar:6652",
+    "enabled": true
+  },
+  {
+    "name": "pulsar_tls",
+    "listen": "0.0.0.0:6653",
+    "upstream": "pulsar:6653",
+    "enabled": true
   }
 ]

+ 1 - 0
LICENSE

@@ -9,3 +9,4 @@ sub-directory and some of the apps under the apps directory.
 
 Source code under apps that uses BSL License:
 - apps/emqx_bridge_kafka
+- apps/emqx_bridge_pulsar

+ 22 - 2
apps/emqx/test/emqx_test_janitor.erl

@@ -60,12 +60,12 @@ init(Parent) ->
     {ok, #{callbacks => [], owner => Parent}}.
 
 terminate(_Reason, #{callbacks := Callbacks}) ->
-    lists:foreach(fun(Fun) -> catch Fun() end, Callbacks).
+    do_terminate(Callbacks).
 
 handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
     {reply, ok, State#{callbacks := [Callback | Callbacks]}};
 handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
-    lists:foreach(fun(Fun) -> catch Fun() end, Callbacks),
+    do_terminate(Callbacks),
     {stop, normal, ok, State};
 handle_call(_Req, _From, State) ->
     {reply, error, State}.
@@ -77,3 +77,23 @@ handle_info({'EXIT', Parent, _Reason}, State = #{owner := Parent}) ->
     {stop, normal, State};
 handle_info(_Msg, State) ->
     {noreply, State}.
+
+%%----------------------------------------------------------------------------------
+%% Internal fns
+%%----------------------------------------------------------------------------------
+
+do_terminate(Callbacks) ->
+    lists:foreach(
+        fun(Fun) ->
+            try
+                Fun()
+            catch
+                K:E:S ->
+                    ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]),
+                    ct:pal("stacktrace: ~p", [S]),
+                    ok
+            end
+        end,
+        Callbacks
+    ),
+    ok.

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -70,7 +70,8 @@
     T == dynamo;
     T == rocketmq;
     T == cassandra;
-    T == sqlserver
+    T == sqlserver;
+    T == pulsar_producer
 ).
 
 load() ->

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -340,6 +340,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
 %% to hocon; keeping this as just `kafka' for backwards compatibility.
 parse_confs(<<"kafka">> = _Type, Name, Conf) ->
     Conf#{bridge_name => Name};
+parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
+    Conf#{bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
 

+ 19 - 0
apps/emqx_bridge_pulsar/.gitignore

@@ -0,0 +1,19 @@
+.rebar3
+_*
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+_build
+.idea
+*.iml
+rebar3.crashdump
+*~

+ 94 - 0
apps/emqx_bridge_pulsar/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 30 - 0
apps/emqx_bridge_pulsar/README.md

@@ -0,0 +1,30 @@
+# Pulsar Data Integration Bridge
+
+This application houses the Pulsar Producer data integration bridge
+for EMQX Enterprise Edition.  It provides the means to connect to
+Pulsar and publish messages to it.
+
+Currently, our Pulsar Producer library has its own `replayq` buffering
+implementation, so this bridge does not require buffer workers from
+`emqx_resource`.  It implements the connection management and
+interaction without need for a separate connector app, since it's not
+used by authentication and authorization applications.
+
+# Documentation links
+
+For more information on Apache Pulsar, please see its [official
+site](https://pulsar.apache.org/).
+
+# Configurations
+
+Please see [our official
+documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-pulsar.html)
+for more detailed info.
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+See [BSL](./BSL.txt).

+ 2 - 0
apps/emqx_bridge_pulsar/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+pulsar

+ 0 - 0
apps/emqx_bridge_pulsar/etc/emqx_bridge_pulsar.conf


+ 14 - 0
apps/emqx_bridge_pulsar/include/emqx_bridge_pulsar.hrl

@@ -0,0 +1,14 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_BRIDGE_PULSAR_HRL).
+-define(EMQX_BRIDGE_PULSAR_HRL, true).
+
+-define(PULSAR_HOST_OPTIONS, #{
+    default_port => 6650,
+    default_scheme => "pulsar",
+    supported_schemes => ["pulsar", "pulsar+ssl"]
+}).
+
+-endif.

+ 13 - 0
apps/emqx_bridge_pulsar/rebar.config

@@ -0,0 +1,13 @@
+%% -*- mode: erlang; -*-
+
+{erl_opts, [debug_info]}.
+{deps, [ {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+  % {config, "config/sys.config"},
+    {apps, [emqx_bridge_pulsar]}
+]}.

+ 15 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -0,0 +1,15 @@
+{application, emqx_bridge_pulsar, [
+    {description, "EMQX Pulsar Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {mod, {emqx_bridge_pulsar_app, []}},
+    {applications, [
+        kernel,
+        stdlib,
+        pulsar
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 228 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl

@@ -0,0 +1,228 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar).
+
+-include("emqx_bridge_pulsar.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% hocon_schema API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+%% emqx_ee_bridge "unofficial" API
+-export([conn_bridge_examples/1]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_pulsar".
+
+roots() ->
+    [].
+
+fields(pulsar_producer) ->
+    fields(config) ++ fields(producer_opts);
+fields(config) ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {servers,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("servers"),
+                    validator => emqx_schema:servers_validator(
+                        ?PULSAR_HOST_OPTIONS, _Required = true
+                    )
+                }
+            )},
+        {authentication,
+            mk(hoconsc:union([none, ref(auth_basic), ref(auth_token)]), #{
+                default => none, desc => ?DESC("authentication")
+            })}
+    ] ++ emqx_connector_schema_lib:ssl_fields();
+fields(producer_opts) ->
+    [
+        {batch_size,
+            mk(
+                pos_integer(),
+                #{default => 100, desc => ?DESC("producer_batch_size")}
+            )},
+        {compression,
+            mk(
+                hoconsc:enum([no_compression, snappy, zlib]),
+                #{default => no_compression, desc => ?DESC("producer_compression")}
+            )},
+        {send_buffer,
+            mk(emqx_schema:bytesize(), #{
+                default => <<"1MB">>, desc => ?DESC("producer_send_buffer")
+            })},
+        {sync_timeout,
+            mk(emqx_schema:duration_ms(), #{
+                default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
+            })},
+        {retention_period,
+            mk(
+                hoconsc:union([infinity, emqx_schema:duration_ms()]),
+                #{default => infinity, desc => ?DESC("producer_retention_period")}
+            )},
+        {max_batch_bytes,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
+            )},
+        {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
+        {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
+        {strategy,
+            mk(
+                hoconsc:enum([random, roundrobin, first_key_dispatch]),
+                #{default => random, desc => ?DESC("producer_strategy")}
+            )},
+        {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})},
+        {message,
+            mk(ref(producer_pulsar_message), #{
+                required => false, desc => ?DESC("producer_message_opts")
+            })},
+        {resource_opts,
+            mk(
+                ref(producer_resource_opts),
+                #{
+                    required => false,
+                    desc => ?DESC(emqx_resource_schema, "creation_opts")
+                }
+            )}
+    ];
+fields(producer_buffer) ->
+    [
+        {mode,
+            mk(
+                hoconsc:enum([memory, disk, hybrid]),
+                #{default => memory, desc => ?DESC("buffer_mode")}
+            )},
+        {per_partition_limit,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => <<"2GB">>, desc => ?DESC("buffer_per_partition_limit")}
+            )},
+        {segment_bytes,
+            mk(
+                emqx_schema:bytesize(),
+                #{default => <<"100MB">>, desc => ?DESC("buffer_segment_bytes")}
+            )},
+        {memory_overload_protection,
+            mk(boolean(), #{
+                default => false,
+                desc => ?DESC("buffer_memory_overload_protection")
+            })}
+    ];
+fields(producer_pulsar_message) ->
+    [
+        {key,
+            mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})},
+        {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})}
+    ];
+fields(producer_resource_opts) ->
+    SupportedOpts = [
+        health_check_interval,
+        resume_interval,
+        start_after_created,
+        start_timeout,
+        auto_restart_interval
+    ],
+    lists:filtermap(
+        fun
+            ({health_check_interval = Field, MetaFn}) ->
+                {true, {Field, override_default(MetaFn, 1_000)}};
+            ({Field, _Meta}) ->
+                lists:member(Field, SupportedOpts)
+        end,
+        emqx_resource_schema:fields("creation_opts")
+    );
+fields(auth_basic) ->
+    [
+        {username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})},
+        {password,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC("auth_basic_password"),
+                sensitive => true,
+                converter => fun emqx_schema:password_converter/2
+            })}
+    ];
+fields(auth_token) ->
+    [
+        {jwt,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC("auth_token_jwt"),
+                sensitive => true,
+                converter => fun emqx_schema:password_converter/2
+            })}
+    ];
+fields("get_" ++ Type) ->
+    emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
+fields("put_" ++ Type) ->
+    fields("config_" ++ Type);
+fields("post_" ++ Type) ->
+    [type_field(), name_field() | fields("config_" ++ Type)];
+fields("config_producer") ->
+    fields(pulsar_producer).
+
+desc(pulsar_producer) ->
+    ?DESC(pulsar_producer_struct);
+desc(producer_resource_opts) ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc("get_" ++ Type) when Type =:= "producer" ->
+    ["Configuration for Pulsar using `GET` method."];
+desc("put_" ++ Type) when Type =:= "producer" ->
+    ["Configuration for Pulsar using `PUT` method."];
+desc("post_" ++ Type) when Type =:= "producer" ->
+    ["Configuration for Pulsar using `POST` method."];
+desc(Name) ->
+    lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
+    ?DESC(Name).
+
+conn_bridge_examples(_Method) ->
+    [
+        #{
+            <<"pulsar_producer">> => #{
+                summary => <<"Pulsar Producer Bridge">>,
+                value => #{todo => true}
+            }
+        }
+    ].
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
+type_field() ->
+    {type, mk(hoconsc:enum([pulsar_producer]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
+struct_names() ->
+    [
+        auth_basic,
+        auth_token,
+        producer_buffer,
+        producer_pulsar_message
+    ].
+
+override_default(OriginalFn, NewDefault) ->
+    fun
+        (default) -> NewDefault;
+        (Field) -> OriginalFn(Field)
+    end.

+ 14 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_app.erl

@@ -0,0 +1,14 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+    emqx_bridge_pulsar_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 396 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -0,0 +1,396 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_impl_producer).
+
+-include("emqx_bridge_pulsar.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2,
+    on_query/3,
+    on_query_async/4
+]).
+
+-type pulsar_client_id() :: atom().
+-type state() :: #{
+    pulsar_client_id := pulsar_client_id(),
+    producers := pulsar_producers:producers(),
+    sync_timeout := infinity | time:time(),
+    message_template := message_template()
+}.
+-type buffer_mode() :: memory | disk | hybrid.
+-type compression_mode() :: no_compression | snappy | zlib.
+-type partition_strategy() :: random | roundrobin | first_key_dispatch.
+-type message_template_raw() :: #{
+    key := binary(),
+    value := binary()
+}.
+-type message_template() :: #{
+    key := emqx_plugin_libs_rule:tmpl_token(),
+    value := emqx_plugin_libs_rule:tmpl_token()
+}.
+-type config() :: #{
+    authentication := _,
+    batch_size := pos_integer(),
+    bridge_name := atom(),
+    buffer := #{
+        mode := buffer_mode(),
+        per_partition_limit := emqx_schema:byte_size(),
+        segment_bytes := emqx_schema:byte_size(),
+        memory_overload_protection := boolean()
+    },
+    compression := compression_mode(),
+    max_batch_bytes := emqx_schema:bytesize(),
+    message := message_template_raw(),
+    pulsar_topic := binary(),
+    retention_period := infinity | emqx_schema:duration_ms(),
+    send_buffer := emqx_schema:bytesize(),
+    servers := binary(),
+    ssl := _,
+    strategy := partition_strategy(),
+    sync_timeout := emqx_schema:duration_ms()
+}.
+
+%%-------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------
+
+callback_mode() -> async_if_possible.
+
+%% there are no queries to be made to this bridge, so we say that
+%% buffer is supported so we don't spawn unused resource buffer
+%% workers.
+is_buffer_supported() -> true.
+
+-spec on_start(manager_id(), config()) -> {ok, state()}.
+on_start(InstanceId, Config) ->
+    #{
+        authentication := _Auth,
+        bridge_name := BridgeName,
+        servers := Servers0,
+        ssl := SSL
+    } = Config,
+    Servers = format_servers(Servers0),
+    ClientId = make_client_id(InstanceId, BridgeName),
+    SSLOpts = emqx_tls_lib:to_client_opts(SSL),
+    ClientOpts = #{
+        ssl_opts => SSLOpts,
+        conn_opts => conn_opts(Config)
+    },
+    case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
+        {ok, _Pid} ->
+            ?SLOG(info, #{
+                msg => "pulsar_client_started",
+                instance_id => InstanceId,
+                pulsar_hosts => Servers
+            });
+        {error, Error} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_client",
+                instance_id => InstanceId,
+                pulsar_hosts => Servers,
+                reason => Error
+            }),
+            throw(failed_to_start_pulsar_client)
+    end,
+    start_producer(Config, InstanceId, ClientId, ClientOpts).
+
+-spec on_stop(manager_id(), state()) -> ok.
+on_stop(_InstanceId, State) ->
+    #{
+        pulsar_client_id := ClientId,
+        producers := Producers
+    } = State,
+    stop_producers(ClientId, Producers),
+    stop_client(ClientId),
+    ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
+    ok.
+
+-spec on_get_status(manager_id(), state()) -> connected | disconnected.
+on_get_status(_InstanceId, State) ->
+    #{
+        pulsar_client_id := ClientId,
+        producers := Producers
+    } = State,
+    case pulsar_client_sup:find_client(ClientId) of
+        {ok, Pid} ->
+            try pulsar_client:get_status(Pid) of
+                true ->
+                    get_producer_status(Producers);
+                false ->
+                    disconnected
+            catch
+                error:timeout ->
+                    disconnected;
+                exit:{noproc, _} ->
+                    disconnected
+            end;
+        {error, _} ->
+            disconnected
+    end.
+
+-spec on_query(manager_id(), {send_message, map()}, state()) -> ok | {error, timeout}.
+on_query(_InstanceId, {send_message, Message}, State) ->
+    #{
+        producers := Producers,
+        sync_timeout := SyncTimeout,
+        message_template := MessageTemplate
+    } = State,
+    PulsarMessage = render_message(Message, MessageTemplate),
+    try
+        pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
+    catch
+        error:timeout ->
+            {error, timeout}
+    end.
+
+-spec on_query_async(
+    manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state()
+) ->
+    {ok, pid()}.
+on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
+    #{
+        producers := Producers,
+        message_template := MessageTemplate
+    } = State,
+    PulsarMessage = render_message(Message, MessageTemplate),
+    pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
+
+%%-------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------
+
+-spec to_bin(atom() | string() | binary()) -> binary().
+to_bin(A) when is_atom(A) ->
+    atom_to_binary(A);
+to_bin(L) when is_list(L) ->
+    list_to_binary(L);
+to_bin(B) when is_binary(B) ->
+    B.
+
+-spec format_servers(binary()) -> [string()].
+format_servers(Servers0) ->
+    Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
+    lists:map(
+        fun({Scheme, Host, Port}) ->
+            Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
+        end,
+        Servers1
+    ).
+
+-spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id().
+make_client_id(InstanceId, BridgeName) ->
+    case is_dry_run(InstanceId) of
+        true ->
+            pulsar_producer_probe;
+        false ->
+            ClientIdBin = iolist_to_binary([
+                <<"pulsar_producer:">>,
+                to_bin(BridgeName),
+                <<":">>,
+                to_bin(node())
+            ]),
+            binary_to_atom(ClientIdBin)
+    end.
+
+-spec is_dry_run(manager_id()) -> boolean().
+is_dry_run(InstanceId) ->
+    TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
+    case TestIdStart of
+        nomatch ->
+            false;
+        _ ->
+            string:equal(TestIdStart, InstanceId)
+    end.
+
+conn_opts(#{authentication := none}) ->
+    #{};
+conn_opts(#{authentication := #{username := Username, password := Password}}) ->
+    #{
+        auth_data => iolist_to_binary([Username, <<":">>, Password]),
+        auth_method_name => <<"basic">>
+    };
+conn_opts(#{authentication := #{jwt := JWT}}) ->
+    #{
+        auth_data => JWT,
+        auth_method_name => <<"token">>
+    }.
+
+-spec replayq_dir(pulsar_client_id()) -> string().
+replayq_dir(ClientId) ->
+    filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]).
+
+-spec producer_name(pulsar_client_id()) -> atom().
+producer_name(ClientId) ->
+    ClientIdBin = to_bin(ClientId),
+    binary_to_atom(
+        iolist_to_binary([
+            <<"producer-">>,
+            ClientIdBin
+        ])
+    ).
+
+-spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}.
+start_producer(Config, InstanceId, ClientId, ClientOpts) ->
+    #{
+        conn_opts := ConnOpts,
+        ssl_opts := SSLOpts
+    } = ClientOpts,
+    #{
+        batch_size := BatchSize,
+        buffer := #{
+            mode := BufferMode,
+            per_partition_limit := PerPartitionLimit,
+            segment_bytes := SegmentBytes,
+            memory_overload_protection := MemOLP0
+        },
+        compression := Compression,
+        max_batch_bytes := MaxBatchBytes,
+        message := MessageTemplateOpts,
+        pulsar_topic := PulsarTopic0,
+        retention_period := RetentionPeriod,
+        send_buffer := SendBuffer,
+        strategy := Strategy,
+        sync_timeout := SyncTimeout
+    } = Config,
+    {OffloadMode, ReplayQDir} =
+        case BufferMode of
+            memory -> {false, false};
+            disk -> {false, replayq_dir(ClientId)};
+            hybrid -> {true, replayq_dir(ClientId)}
+        end,
+    MemOLP =
+        case os:type() of
+            {unix, linux} -> MemOLP0;
+            _ -> false
+        end,
+    ReplayQOpts = #{
+        replayq_dir => ReplayQDir,
+        replayq_offload_mode => OffloadMode,
+        replayq_max_total_bytes => PerPartitionLimit,
+        replayq_seg_bytes => SegmentBytes,
+        drop_if_highmem => MemOLP
+    },
+    ProducerName = producer_name(ClientId),
+    MessageTemplate = compile_message_template(MessageTemplateOpts),
+    ProducerOpts0 =
+        #{
+            batch_size => BatchSize,
+            compression => Compression,
+            conn_opts => ConnOpts,
+            max_batch_bytes => MaxBatchBytes,
+            name => ProducerName,
+            retention_period => RetentionPeriod,
+            ssl_opts => SSLOpts,
+            strategy => Strategy,
+            tcp_opts => [{sndbuf, SendBuffer}]
+        },
+    ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
+    PulsarTopic = binary_to_list(PulsarTopic0),
+    try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
+        {ok, Producers} ->
+            State = #{
+                pulsar_client_id => ClientId,
+                producers => Producers,
+                sync_timeout => SyncTimeout,
+                message_template => MessageTemplate
+            },
+            ?tp(pulsar_producer_bridge_started, #{}),
+            {ok, State}
+    catch
+        Kind:Error:Stacktrace ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_pulsar_producer",
+                instance_id => InstanceId,
+                kind => Kind,
+                reason => Error,
+                stacktrace => Stacktrace
+            }),
+            stop_client(ClientId),
+            throw(failed_to_start_pulsar_producer)
+    end.
+
+-spec stop_client(pulsar_client_id()) -> ok.
+stop_client(ClientId) ->
+    _ = log_when_error(
+        fun() ->
+            ok = pulsar:stop_and_delete_supervised_client(ClientId),
+            ?tp(pulsar_bridge_client_stopped, #{pulsar_client_id => ClientId}),
+            ok
+        end,
+        #{
+            msg => "failed_to_delete_pulsar_client",
+            pulsar_client_id => ClientId
+        }
+    ),
+    ok.
+
+-spec stop_producers(pulsar_client_id(), pulsar_producers:producers()) -> ok.
+stop_producers(ClientId, Producers) ->
+    _ = log_when_error(
+        fun() ->
+            ok = pulsar:stop_and_delete_supervised_producers(Producers),
+            ?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}),
+            ok
+        end,
+        #{
+            msg => "failed_to_delete_pulsar_producer",
+            pulsar_client_id => ClientId
+        }
+    ),
+    ok.
+
+log_when_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.
+
+-spec compile_message_template(message_template_raw()) -> message_template().
+compile_message_template(TemplateOpts) ->
+    KeyTemplate = maps:get(key, TemplateOpts, <<"${.clientid}">>),
+    ValueTemplate = maps:get(value, TemplateOpts, <<"${.}">>),
+    #{
+        key => preproc_tmpl(KeyTemplate),
+        value => preproc_tmpl(ValueTemplate)
+    }.
+
+preproc_tmpl(Template) ->
+    emqx_plugin_libs_rule:preproc_tmpl(Template).
+
+render_message(
+    Message, #{key := KeyTemplate, value := ValueTemplate}
+) ->
+    #{
+        key => render(Message, KeyTemplate),
+        value => render(Message, ValueTemplate)
+    }.
+
+render(Message, Template) ->
+    Opts = #{
+        var_trans => fun
+            (undefined) -> <<"">>;
+            (X) -> emqx_plugin_libs_rule:bin(X)
+        end,
+        return => full_binary
+    },
+    emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
+
+get_producer_status(Producers) ->
+    case pulsar_producers:all_connected(Producers) of
+        true -> connected;
+        false -> connecting
+    end.

+ 33 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_sup.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% sup_flags() = #{strategy => strategy(),         % optional
+%%                 intensity => non_neg_integer(), % optional
+%%                 period => pos_integer()}        % optional
+%% child_spec() = #{id => child_id(),       % mandatory
+%%                  start => mfargs(),      % mandatory
+%%                  restart => restart(),   % optional
+%%                  shutdown => shutdown(), % optional
+%%                  type => worker(),       % optional
+%%                  modules => modules()}   % optional
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 0,
+        period => 1
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.

+ 819 - 0
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -0,0 +1,819 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_pulsar_impl_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
+-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]).
+-define(RULE_TOPIC, "mqtt/rule").
+-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, plain},
+        {group, tls}
+    ].
+
+groups() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    OnlyOnceTCs = only_once_tests(),
+    TCs = AllTCs -- OnlyOnceTCs,
+    [
+        {plain, AllTCs},
+        {tls, TCs}
+    ].
+
+only_once_tests() ->
+    [t_create_via_http].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
+    _ = application:stop(emqx_connector),
+    ok.
+
+init_per_group(plain = Type, Config) ->
+    PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
+    PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")),
+    ProxyName = "pulsar_plain",
+    case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
+        true ->
+            Config1 = common_init_per_group(),
+            [
+                {proxy_name, ProxyName},
+                {pulsar_host, PulsarHost},
+                {pulsar_port, PulsarPort},
+                {pulsar_type, Type},
+                {use_tls, false}
+                | Config1 ++ Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_pulsar);
+                _ ->
+                    {skip, no_pulsar}
+            end
+    end;
+init_per_group(tls = Type, Config) ->
+    PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"),
+    PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")),
+    ProxyName = "pulsar_tls",
+    case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
+        true ->
+            Config1 = common_init_per_group(),
+            [
+                {proxy_name, ProxyName},
+                {pulsar_host, PulsarHost},
+                {pulsar_port, PulsarPort},
+                {pulsar_type, Type},
+                {use_tls, true}
+                | Config1 ++ Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_pulsar);
+                _ ->
+                    {skip, no_pulsar}
+            end
+    end;
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when
+    Group =:= plain
+->
+    common_end_per_group(Config),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+common_init_per_group() ->
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    application:load(emqx_bridge),
+    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:start_apps(?APPS),
+    {ok, _} = application:ensure_all_started(emqx_connector),
+    emqx_mgmt_api_test_util:init_suite(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
+    [
+        {proxy_host, ProxyHost},
+        {proxy_port, ProxyPort},
+        {mqtt_topic, MQTTTopic}
+    ].
+
+common_end_per_group(Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    delete_all_bridges(),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            ProxyHost = ?config(proxy_host, Config),
+            ProxyPort = ?config(proxy_port, Config),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            delete_all_bridges(),
+            stop_consumer(Config),
+            %% in CI, apparently this needs more time since the
+            %% machines struggle with all the containers running...
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap(timer:seconds(60)),
+    delete_all_bridges(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    PulsarTopic =
+        <<
+            (atom_to_binary(TestCase))/binary,
+            UniqueNum/binary
+        >>,
+    PulsarType = ?config(pulsar_type, Config0),
+    Config1 = [{pulsar_topic, PulsarTopic} | Config0],
+    {Name, ConfigString, PulsarConfig} = pulsar_config(
+        TestCase, PulsarType, Config1
+    ),
+    ConsumerConfig = start_consumer(TestCase, Config1),
+    Config = ConsumerConfig ++ Config1,
+    ok = snabbkaffe:start_trace(),
+    [
+        {pulsar_name, Name},
+        {pulsar_config_string, ConfigString},
+        {pulsar_config, PulsarConfig}
+        | Config
+    ].
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ).
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+pulsar_config(TestCase, _PulsarType, Config) ->
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    PulsarHost = ?config(pulsar_host, Config),
+    PulsarPort = ?config(pulsar_port, Config),
+    PulsarTopic = ?config(pulsar_topic, Config),
+    AuthType = proplists:get_value(sasl_auth_mechanism, Config, none),
+    UseTLS = proplists:get_value(use_tls, Config, false),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
+    Prefix =
+        case UseTLS of
+            true -> <<"pulsar+ssl://">>;
+            false -> <<"pulsar://">>
+        end,
+    ServerURL = iolist_to_binary([
+        Prefix,
+        PulsarHost,
+        ":",
+        integer_to_binary(PulsarPort)
+    ]),
+    ConfigString =
+        io_lib:format(
+            "bridges.pulsar_producer.~s {\n"
+            "  enable = true\n"
+            "  servers = \"~s\"\n"
+            "  sync_timeout = 5s\n"
+            "  compression = no_compression\n"
+            "  send_buffer = 1MB\n"
+            "  retention_period = infinity\n"
+            "  max_batch_bytes = 900KB\n"
+            "  batch_size = 1\n"
+            "  strategy = random\n"
+            "  buffer {\n"
+            "    mode = memory\n"
+            "    per_partition_limit = 10MB\n"
+            "    segment_bytes = 5MB\n"
+            "    memory_overload_protection = true\n"
+            "  }\n"
+            "  message {\n"
+            "    key = \"${.clientid}\"\n"
+            "    value = \"${.}\"\n"
+            "  }\n"
+            "~s"
+            "  ssl {\n"
+            "    enable = ~p\n"
+            "    verify = verify_none\n"
+            "    server_name_indication = \"auto\"\n"
+            "  }\n"
+            "  pulsar_topic = \"~s\"\n"
+            "  local_topic = \"~s\"\n"
+            "}\n",
+            [
+                Name,
+                ServerURL,
+                authentication(AuthType),
+                UseTLS,
+                PulsarTopic,
+                MQTTTopic
+            ]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+
+parse_and_check(ConfigString, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+authentication(_) ->
+    "  authentication = none\n".
+
+resource_id(Config) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    emqx_bridge_resource:resource_id(Type, Name).
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    PulsarConfig0 = ?config(pulsar_config, Config),
+    PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
+    emqx_bridge:create(Type, Name, PulsarConfig).
+
+create_bridge_api(Config) ->
+    create_bridge_api(Config, _Overrides = #{}).
+
+create_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    PulsarConfig0 = ?config(pulsar_config, Config),
+    PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
+    Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge create result: ~p", [Res]),
+    Res.
+
+update_bridge_api(Config) ->
+    update_bridge_api(Config, _Overrides = #{}).
+
+update_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    PulsarConfig0 = ?config(pulsar_config, Config),
+    PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
+    BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name),
+    Params = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("updating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
+            {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
+            Error -> Error
+        end,
+    ct:pal("bridge update result: ~p", [Res]),
+    Res.
+
+probe_bridge_api(Config) ->
+    probe_bridge_api(Config, _Overrides = #{}).
+
+probe_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(pulsar_name, Config),
+    PulsarConfig = ?config(pulsar_config, Config),
+    Params0 = PulsarConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Params = maps:merge(Params0, Overrides),
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
+start_consumer(TestCase, Config) ->
+    PulsarHost = ?config(pulsar_host, Config),
+    PulsarPort = ?config(pulsar_port, Config),
+    PulsarTopic = ?config(pulsar_topic, Config),
+    UseTLS = ?config(use_tls, Config),
+    %% FIXME: patch pulsar to accept binary urls...
+    Scheme =
+        case UseTLS of
+            true -> <<"pulsar+ssl://">>;
+            false -> <<"pulsar://">>
+        end,
+    URL =
+        binary_to_list(
+            <<Scheme/binary, (list_to_binary(PulsarHost))/binary, ":",
+                (integer_to_binary(PulsarPort))/binary>>
+        ),
+    ConnOpts = #{},
+    ConsumerClientId = TestCase,
+    CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
+    SSLOpts = #{
+        enable => UseTLS,
+        keyfile => filename:join([CertsPath, "key.pem"]),
+        certfile => filename:join([CertsPath, "cert.pem"]),
+        cacertfile => filename:join([CertsPath, "cacert.pem"])
+    },
+    {ok, _ClientPid} = pulsar:ensure_supervised_client(
+        ConsumerClientId,
+        [URL],
+        #{
+            conn_opts => ConnOpts,
+            ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)
+        }
+    ),
+    ConsumerOpts = #{
+        cb_init_args => #{send_to => self()},
+        cb_module => pulsar_echo_consumer,
+        sub_type => 'Shared',
+        subscription => atom_to_list(TestCase),
+        max_consumer_num => 1,
+        %% Note!  This must not coincide with the client
+        %% id, or else weird bugs will happen, like the
+        %% consumer never starts...
+        name => test_consumer,
+        consumer_id => 1,
+        conn_opts => ConnOpts
+    },
+    {ok, Consumer} = pulsar:ensure_supervised_consumers(
+        ConsumerClientId,
+        PulsarTopic,
+        ConsumerOpts
+    ),
+    %% since connection is async, and there's currently no way to
+    %% specify the subscription initial position as `Earliest', we
+    %% need to wait until the consumer is connected to avoid
+    %% flakiness.
+    ok = wait_until_consumer_connected(Consumer),
+    [
+        {consumer_client_id, ConsumerClientId},
+        {pulsar_consumer, Consumer}
+    ].
+
+stop_consumer(Config) ->
+    ConsumerClientId = ?config(consumer_client_id, Config),
+    Consumer = ?config(pulsar_consumer, Config),
+    ok = pulsar:stop_and_delete_supervised_consumers(Consumer),
+    ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId),
+    ok.
+
+wait_until_consumer_connected(Consumer) ->
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        true = pulsar_consumers:all_connected(Consumer)
+    ),
+    ok.
+
+wait_until_producer_connected() ->
+    wait_until_connected(pulsar_producers_sup, pulsar_producer).
+
+wait_until_connected(SupMod, Mod) ->
+    Pids = [
+        P
+     || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
+        P <- element(2, process_info(SupPid, links)),
+        case proc_lib:initial_call(P) of
+            {Mod, init, _} -> true;
+            _ -> false
+        end
+    ],
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
+    ),
+    ok.
+
+create_rule_and_action_http(Config) ->
+    PulsarName = ?config(pulsar_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, PulsarName),
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+receive_consumed(Timeout) ->
+    receive
+        {pulsar_message, #{payloads := Payloads}} ->
+            lists:map(fun try_decode_json/1, Payloads)
+    after Timeout ->
+        ct:pal("mailbox: ~p", [process_info(self(), messages)]),
+        ct:fail("no message consumed")
+    end.
+
+try_decode_json(Payload) ->
+    case emqx_utils_json:safe_decode(Payload, [return_maps]) of
+        {error, _} ->
+            Payload;
+        {ok, JSON} ->
+            JSON
+    end.
+
+cluster(Config) ->
+    PrivDataDir = ?config(priv_dir, Config),
+    PeerModule =
+        case os:getenv("IS_CI") of
+            false ->
+                slave;
+            _ ->
+                ct_slave
+        end,
+    Cluster = emqx_common_test_helpers:emqx_cluster(
+        [core, core],
+        [
+            {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]},
+            {listener_ports, []},
+            {peer_mod, PeerModule},
+            {priv_data_dir, PrivDataDir},
+            {load_schema, true},
+            {start_autocluster, true},
+            {schema_mod, emqx_ee_conf_schema},
+            {env_handler, fun
+                (emqx) ->
+                    application:set_env(emqx, boot_modules, [broker, router]),
+                    ok;
+                (emqx_conf) ->
+                    ok;
+                (_) ->
+                    ok
+            end}
+        ]
+    ),
+    ct:pal("cluster: ~p", [Cluster]),
+    Cluster.
+
+start_cluster(Cluster) ->
+    Nodes =
+        [
+            emqx_common_test_helpers:start_slave(Name, Opts)
+         || {Name, Opts} <- Cluster
+        ],
+    on_exit(fun() ->
+        emqx_utils:pmap(
+            fun(N) ->
+                ct:pal("stopping ~p", [N]),
+                ok = emqx_common_test_helpers:stop_slave(N)
+            end,
+            Nodes
+        )
+    end),
+    Nodes.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_and_produce_ok(Config) ->
+    MQTTTopic = ?config(mqtt_topic, Config),
+    ResourceId = resource_id(Config),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    QoS = 0,
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {ok, _},
+                create_bridge(Config)
+            ),
+            {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            %% Publish using local topic.
+            Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
+            emqx:publish(Message0),
+            %% Publish using rule engine.
+            Message1 = emqx_message:make(ClientId, QoS, ?RULE_TOPIC_BIN, Payload),
+            emqx:publish(Message1),
+
+            #{rule_id => RuleId}
+        end,
+        fun(#{rule_id := RuleId}, _Trace) ->
+            Data0 = receive_consumed(5_000),
+            ?assertMatch(
+                [
+                    #{
+                        <<"clientid">> := ClientId,
+                        <<"event">> := <<"message.publish">>,
+                        <<"payload">> := Payload,
+                        <<"topic">> := MQTTTopic
+                    }
+                ],
+                Data0
+            ),
+            Data1 = receive_consumed(5_000),
+            ?assertMatch(
+                [
+                    #{
+                        <<"clientid">> := ClientId,
+                        <<"event">> := <<"message.publish">>,
+                        <<"payload">> := Payload,
+                        <<"topic">> := ?RULE_TOPIC_BIN
+                    }
+                ],
+                Data1
+            ),
+            ?retry(
+                _Sleep = 100,
+                _Attempts0 = 20,
+                begin
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                dropped := 0,
+                                failed := 0,
+                                late_reply := 0,
+                                matched := 2,
+                                received := 0,
+                                retried := 0,
+                                success := 2
+                            }
+                        },
+                        emqx_resource_manager:get_metrics(ResourceId)
+                    ),
+                    ?assertEqual(
+                        1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')
+                    ),
+                    ?assertEqual(
+                        0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')
+                    ),
+                    ok
+                end
+            ),
+            ok
+        end
+    ),
+    ok.
+
+%% Under normal operations, the bridge will be called async via
+%% `simple_async_query'.
+t_sync_query(Config) ->
+    ResourceId = resource_id(Config),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+            Message = {send_message, #{payload => Payload}},
+            ?assertMatch(
+                {ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_create_via_http(Config) ->
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+
+            %% lightweight matrix testing some configs
+            ?assertMatch(
+                {ok, _},
+                update_bridge_api(
+                    Config,
+                    #{
+                        <<"buffer">> =>
+                            #{<<"mode">> => <<"disk">>}
+                    }
+                )
+            ),
+            ?assertMatch(
+                {ok, _},
+                update_bridge_api(
+                    Config,
+                    #{
+                        <<"buffer">> =>
+                            #{
+                                <<"mode">> => <<"hybrid">>,
+                                <<"memory_overload_protection">> => true
+                            }
+                    }
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_start_stop(Config) ->
+    PulsarName = ?config(pulsar_name, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {ok, _},
+                create_bridge(Config)
+            ),
+            %% Since the connection process is async, we give it some time to
+            %% stabilize and avoid flakiness.
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+
+            %% Check that the bridge probe API doesn't leak atoms.
+            redbug:start(
+                [
+                    "emqx_resource_manager:health_check_interval -> return",
+                    "emqx_resource_manager:with_health_check -> return"
+                ],
+                [{msgs, 100}, {time, 30_000}]
+            ),
+            ProbeRes0 = probe_bridge_api(
+                Config,
+                #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
+            ),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+            AtomsBefore = erlang:system_info(atom_count),
+            %% Probe again; shouldn't have created more atoms.
+            ProbeRes1 = probe_bridge_api(
+                Config,
+                #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
+            ),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+            AtomsAfter = erlang:system_info(atom_count),
+            ?assertEqual(AtomsBefore, AtomsAfter),
+
+            %% Now stop the bridge.
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, PulsarName),
+                    #{?snk_kind := pulsar_bridge_stopped},
+                    5_000
+                )
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% one for each probe, one for real
+            ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_producer_stopped, Trace)),
+            ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_client_stopped, Trace)),
+            ?assertMatch([_, _, _], ?of_kind(pulsar_bridge_stopped, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_on_get_status(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = resource_id(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    %% Since the connection process is async, we give it some time to
+    %% stabilize and avoid flakiness.
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ct:sleep(500),
+        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+    end),
+    %% Check that it recovers itself.
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ok.
+
+t_cluster(Config) ->
+    MQTTTopic = ?config(mqtt_topic, Config),
+    ResourceId = resource_id(Config),
+    Cluster = cluster(Config),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    QoS = 0,
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ?check_trace(
+        begin
+            Nodes = [N1, N2 | _] = start_cluster(Cluster),
+            {ok, SRef0} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
+                length(Nodes),
+                15_000
+            ),
+            {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            lists:foreach(
+                fun(N) ->
+                    ?retry(
+                        _Sleep = 1_000,
+                        _Attempts0 = 20,
+                        ?assertEqual(
+                            {ok, connected},
+                            erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
+                            #{node => N}
+                        )
+                    )
+                end,
+                Nodes
+            ),
+            erpc:multicall(Nodes, fun wait_until_producer_connected/0),
+            Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
+            erpc:call(N2, emqx, publish, [Message0]),
+
+            lists:foreach(
+                fun(N) ->
+                    ?assertEqual(
+                        {ok, connected},
+                        erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
+                        #{node => N}
+                    )
+                end,
+                Nodes
+            ),
+
+            ok
+        end,
+        fun(_Trace) ->
+            Data0 = receive_consumed(10_000),
+            ?assertMatch(
+                [
+                    #{
+                        <<"clientid">> := ClientId,
+                        <<"event">> := <<"message.publish">>,
+                        <<"payload">> := Payload,
+                        <<"topic">> := MQTTTopic
+                    }
+                ],
+                Data0
+            ),
+            ok
+        end
+    ),
+    ok.

+ 25 - 0
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE_data/pulsar_echo_consumer.erl

@@ -0,0 +1,25 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(pulsar_echo_consumer).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% pulsar consumer API
+-export([init/2, handle_message/3]).
+
+init(Topic, Args) ->
+    ct:pal("consumer init: ~p", [#{topic => Topic, args => Args}]),
+    SendTo = maps:get(send_to, Args),
+    ?tp(pulsar_echo_consumer_init, #{topic => Topic}),
+    {ok, #{topic => Topic, send_to => SendTo}}.
+
+handle_message(Message, Payloads, State) ->
+    #{send_to := SendTo, topic := Topic} = State,
+    ct:pal(
+        "pulsar consumer received:\n  ~p",
+        [#{message => Message, payloads => Payloads}]
+    ),
+    SendTo ! {pulsar_message, #{topic => Topic, message => Message, payloads => Payloads}},
+    ?tp(pulsar_echo_consumer_message, #{topic => Topic, message => Message, payloads => Payloads}),
+    {ok, 'Individual', State}.

+ 6 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -165,8 +165,13 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
 create_dry_run(ResourceType, Config) ->
     ResId = make_test_id(),
     MgrId = set_new_owner(ResId),
+    Opts =
+        case is_map(Config) of
+            true -> maps:get(resource_opts, Config, #{});
+            false -> #{}
+        end,
     ok = emqx_resource_manager_sup:ensure_child(
-        MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
+        MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts
     ),
     case wait_for_ready(ResId, 5000) of
         ok ->

+ 1 - 0
changes/ee/feat-10378.en.md

@@ -0,0 +1 @@
+Implement Pulsar Producer Bridge, which supports publishing messages to Pulsar from MQTT topics.

+ 3 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -9,7 +9,9 @@
         telemetry,
         emqx_bridge_kafka,
         emqx_bridge_gcp_pubsub,
-        emqx_bridge_opents
+        emqx_bridge_cassandra,
+        emqx_bridge_opents,
+        emqx_bridge_pulsar
     ]},
     {env, []},
     {modules, []},

+ 20 - 4
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -36,7 +36,8 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_dynamo, Method),
         ref(emqx_ee_bridge_rocketmq, Method),
         ref(emqx_ee_bridge_sqlserver, Method),
-        ref(emqx_bridge_opents, Method)
+        ref(emqx_bridge_opents, Method),
+        ref(emqx_bridge_pulsar, Method ++ "_producer")
     ].
 
 schema_modules() ->
@@ -57,7 +58,8 @@ schema_modules() ->
         emqx_ee_bridge_dynamo,
         emqx_ee_bridge_rocketmq,
         emqx_ee_bridge_sqlserver,
-        emqx_bridge_opents
+        emqx_bridge_opents,
+        emqx_bridge_pulsar
     ].
 
 examples(Method) ->
@@ -97,7 +99,8 @@ resource_type(clickhouse) -> emqx_ee_connector_clickhouse;
 resource_type(dynamo) -> emqx_ee_connector_dynamo;
 resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
 resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
-resource_type(opents) -> emqx_bridge_opents_connector.
+resource_type(opents) -> emqx_bridge_opents_connector;
+resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer.
 
 fields(bridges) ->
     [
@@ -165,7 +168,8 @@ fields(bridges) ->
                     required => false
                 }
             )}
-    ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++
+    ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
+        redis_structs() ++
         pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs().
 
 mongodb_structs() ->
@@ -202,6 +206,18 @@ kafka_structs() ->
             )}
     ].
 
+pulsar_structs() ->
+    [
+        {pulsar_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)),
+                #{
+                    desc => <<"Pulsar Producer Bridge Config">>,
+                    required => false
+                }
+            )}
+    ].
+
 influxdb_structs() ->
     [
         {Protocol,

+ 3 - 1
mix.exs

@@ -169,7 +169,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_redis,
       :emqx_bridge_rocketmq,
       :emqx_bridge_tdengine,
-      :emqx_bridge_timescale
+      :emqx_bridge_timescale,
+      :emqx_bridge_pulsar
     ])
   end
 
@@ -360,6 +361,7 @@ defmodule EMQXUmbrella.MixProject do
           emqx_ee_connector: :permanent,
           emqx_ee_bridge: :permanent,
           emqx_bridge_kafka: :permanent,
+          emqx_bridge_pulsar: :permanent,
           emqx_bridge_gcp_pubsub: :permanent,
           emqx_bridge_cassandra: :permanent,
           emqx_bridge_opents: :permanent,

+ 1 - 0
rebar.config.erl

@@ -454,6 +454,7 @@ relx_apps_per_edition(ee) ->
         emqx_ee_connector,
         emqx_ee_bridge,
         emqx_bridge_kafka,
+        emqx_bridge_pulsar,
         emqx_bridge_gcp_pubsub,
         emqx_bridge_cassandra,
         emqx_bridge_opents,

+ 176 - 0
rel/i18n/emqx_bridge_pulsar.hocon

@@ -0,0 +1,176 @@
+emqx_bridge_pulsar {
+  auth_basic {
+    desc = "Parameters for basic authentication."
+    label = "Basic auth params"
+  }
+
+  auth_basic_password {
+    desc = "Basic authentication password."
+    label = "Password"
+  }
+
+  auth_basic_username {
+    desc = "Basic authentication username."
+    label = "Username"
+  }
+
+  auth_token {
+    desc = "Parameters for token authentication."
+    label = "Token auth params"
+  }
+
+  auth_token_jwt {
+    desc = "JWT authentication token."
+    label = "JWT"
+  }
+
+  authentication {
+    desc = "Authentication configs."
+    label = "Authentication"
+  }
+
+  buffer_memory_overload_protection {
+    desc = "Applicable when buffer mode is set to <code>memory</code>\n"
+           "EMQX will drop old buffered messages under high memory pressure."
+           " The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
+           " NOTE: This config only works on Linux."
+    label = "Memory Overload Protection"
+  }
+
+  buffer_mode {
+    desc = "Message buffer mode.\n"
+           "\n"
+           "<code>memory</code>: Buffer all messages in memory. The messages will be lost"
+           " in case of EMQX node restart\n<code>disk</code>: Buffer all messages on disk."
+           " The messages on disk are able to survive EMQX node restart.\n"
+           "<code>hybrid</code>: Buffer message in memory first, when up to certain limit"
+           " (see <code>segment_bytes</code> config for more information), then start offloading"
+           " messages to disk, Like <code>memory</code> mode, the messages will be lost in"
+           " case of EMQX node restart."
+    label = "Buffer Mode"
+  }
+
+  buffer_per_partition_limit {
+    desc = "Number of bytes allowed to buffer for each Pulsar partition."
+           " When this limit is exceeded, old messages will be dropped in a trade for credits"
+           " for new messages to be buffered."
+    label = "Per-partition Buffer Limit"
+  }
+
+  buffer_segment_bytes {
+    desc = "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
+           "This value is to specify the size of each on-disk buffer file."
+    label = "Segment File Bytes"
+  }
+
+  config_enable {
+    desc = "Enable (true) or disable (false) this Pulsar bridge."
+    label = "Enable or Disable"
+  }
+
+  desc_name {
+    desc = "Bridge name, used as a human-readable description of the bridge."
+    label = "Bridge Name"
+  }
+
+  desc_type {
+    desc = "The Bridge Type"
+    label = "Bridge Type"
+  }
+
+  producer_batch_size {
+    desc = "Maximum number of individual requests to batch in a Pulsar message."
+    label = "Batch size"
+  }
+
+  producer_buffer {
+    desc = "Configure producer message buffer.\n\n"
+           "Tell Pulsar producer how to buffer messages when EMQX has more messages to"
+           " send than Pulsar can keep up, or when Pulsar is down."
+    label = "Message Buffer"
+  }
+
+  producer_compression {
+    desc = "Compression method."
+    label = "Compression"
+  }
+
+  producer_key_template {
+    desc = "Template to render Pulsar message key."
+    label = "Message Key"
+  }
+
+  producer_local_topic {
+    desc = "MQTT topic or topic filter as data source (bridge input)."
+           " If rule action is used as data source, this config should be left empty,"
+           " otherwise messages will be duplicated in Pulsar."
+    label = "Source MQTT Topic"
+  }
+
+  producer_max_batch_bytes {
+    desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers"
+           " default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in"
+           " order to compensate Pulsar message encoding overheads (especially when each individual"
+           " message is very small). When a single message is over the limit, it is still"
+           " sent (as a single element batch)."
+    label = "Max Batch Bytes"
+  }
+
+  producer_message_opts {
+    desc = "Template to render a Pulsar message."
+    label = "Pulsar Message Template"
+  }
+
+  producer_pulsar_message {
+    desc = "Template to render a Pulsar message."
+    label = "Pulsar Message Template"
+  }
+
+  producer_pulsar_topic {
+    desc = "Pulsar topic name"
+    label = "Pulsar topic name"
+  }
+
+  producer_retention_period {
+    desc = "The amount of time messages will be buffered while there is no connection to"
+           " the Pulsar broker.  Longer times mean that more memory/disk will be used"
+    label = "Retention Period"
+  }
+
+  producer_send_buffer {
+    desc = "Fine tune the socket send buffer. The default value is tuned for high throughput."
+    label = "Socket Send Buffer Size"
+  }
+
+  producer_strategy {
+    desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n"
+           "\n"
+           "<code>random</code>: Randomly pick a partition for each message.\n"
+           "<code>roundrobin</code>: Pick each available producer in turn for each message.\n"
+           "<code>first_key_dispatch</code>: Hash Pulsar message key of the first message in a batch"
+           " to a partition number."
+    label = "Partition Strategy"
+  }
+
+  producer_sync_timeout {
+    desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously."
+    label = "Sync publish timeout"
+  }
+
+  producer_value_template {
+    desc = "Template to render Pulsar message value."
+    label = "Message Value"
+  }
+
+  pulsar_producer_struct {
+    desc = "Configuration for a Pulsar bridge."
+    label = "Pulsar Bridge Configuration"
+  }
+
+  servers {
+    desc = "A comma separated list of Pulsar URLs in the form <code>scheme://host[:port]</code>"
+           " for the client to connect to. The supported schemes are <code>pulsar://</code> (default)"
+           " and <code>pulsar+ssl://</code>. The default port is 6650."
+    label = "Servers"
+  }
+}

+ 173 - 0
rel/i18n/zh/emqx_bridge_pulsar.hocon

@@ -0,0 +1,173 @@
+emqx_bridge_pulsar {
+
+  pulsar_producer_struct {
+    desc = "Pulsar 桥接配置"
+    label = "Pulsar 桥接配置"
+  }
+
+  desc_type {
+    desc = "桥接类型"
+    label = "桥接类型"
+  }
+
+  desc_name {
+    desc = "桥接名字,可读描述"
+    label = "桥接名字"
+  }
+
+  config_enable {
+    desc = "启用(true)或停用该(false)Pulsar 数据桥接。"
+    label = "启用或停用"
+  }
+
+  servers {
+    desc = "以<code>scheme://host[:port]</code>形式分隔的Pulsar URL列表,"
+           "供客户端连接使用。支持的方案是 <code>pulsar://</code> (默认)"
+           "和<code>pulsar+ssl://</code>。默认的端口是6650。"
+    label = "服务员"
+  }
+
+  authentication {
+    desc = "认证参数。"
+    label = "认证"
+  }
+
+  producer_batch_size {
+    desc = "在一个Pulsar消息中批处理的单个请求的最大数量。"
+    label = "批量大小"
+  }
+
+  producer_compression {
+    desc = "压缩方法。"
+    label = "压缩"
+  }
+
+  producer_send_buffer {
+    desc = "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"
+    label = "Socket 发送缓存大小"
+  }
+
+  producer_sync_timeout {
+    desc = "同步发布时,从Pulsar接收发送回执的最长等待时间。"
+    label = "同步发布超时"
+  }
+
+  auth_basic_username {
+    desc = "基本认证用户名。"
+    label = "用户名"
+  }
+
+  auth_basic_password {
+    desc = "基本认证密码。"
+    label = "密码"
+  }
+
+  auth_token_jwt {
+    desc = "JWT认证令牌。"
+    label = "JWT"
+  }
+
+  producer_max_batch_bytes {
+    desc = "最大消息批量字节数。"
+           "大多数 Pulsar 环境的默认最低值是 5 MB,EMQX 的默认值比 5 MB 更小是因为需要"
+           "补偿 Pulsar 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。"
+           "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。"
+    label = "最大批量字节数"
+  }
+
+  producer_retention_period {
+    desc = "当没有连接到Pulsar代理时,信息将被缓冲的时间。 较长的时间意味着将使用更多的内存/磁盘"
+    label = "保留期"
+  }
+
+  producer_local_topic {
+    desc = "MQTT 主题数据源由桥接指定,或留空由规则动作指定。"
+    label = "源 MQTT 主题"
+  }
+
+  producer_pulsar_topic {
+    desc = "Pulsar 主题名称"
+    label = "Pulsar 主题名称"
+  }
+
+  producer_strategy {
+    desc = "设置消息发布时应该如何选择 Pulsar 分区。\n\n"
+           "<code>random</code>: 为每个消息随机选择一个分区。\n"
+           "<code>roundrobin</code>: 依次为每条信息挑选可用的生产商。\n"
+           "<code>first_key_dispatch</code>: 将一批信息中的第一条信息的Pulsar信息密钥哈希到一个分区编号。"
+    label = "分区选择策略"
+  }
+
+  producer_buffer {
+    desc = "配置消息缓存的相关参数。\n\n"
+           "当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时,EMQX 内部会将消息缓存起来。"
+    label = "消息缓存"
+  }
+
+  buffer_mode {
+    desc = "消息缓存模式。\n"
+           "<code>memory</code>: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n"
+           "<code>disk</code>: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n"
+           "<code>hybrid</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
+           "(配置项 <code>segment_bytes</code> 描述了该限制)后,后续的消息会缓存到磁盘上。"
+           "与 <code>memory</code> 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"
+    label = "缓存模式"
+  }
+
+  buffer_per_partition_limit {
+    desc = "为每个 Pulsar 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,"
+           "为新的消息腾出空间。"
+    label = "Pulsar 分区缓存上限"
+  }
+
+  buffer_segment_bytes {
+    desc = "当缓存模式是 <code>disk</code> 或 <code>hybrid</code> 时适用。"
+           "该配置用于指定缓存到磁盘上的文件的大小。"
+    label = "缓存文件大小"
+  }
+
+  buffer_memory_overload_protection {
+    desc = "缓存模式是 <code>memory</code> 或 <code>hybrid</code> 时适用。"
+           "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。"
+           "内存压力值由配置项 <code>sysmon.os.sysmem_high_watermark</code> 决定。"
+           "注意,该配置仅在 Linux 系统中有效。"
+    label = "内存过载保护"
+  }
+
+  producer_message_opts {
+    desc = "用于生成 Pulsar 消息的模版。"
+    label = "Pulsar 消息模版"
+  }
+
+  producer_key_template {
+    desc = "生成 Pulsar 消息 Key 的模版。"
+    label = "消息的 Key"
+  }
+
+  producer_value_template {
+    desc = "生成 Pulsar 消息 Value 的模版。"
+    label = "消息的 Value"
+  }
+
+  auth_basic {
+    desc = "基本认证的参数。"
+    label = "基本认证参数"
+  }
+
+  auth_token {
+    desc = "令牌认证的参数。"
+    label = "Token auth params"
+  }
+
+  producer_buffer {
+    desc = "配置消息缓存的相关参数。\n\n"
+           "当 EMQX 需要发送的消息超过 Pulsar 处理能力,或者当 Pulsar 临时下线时,EMQX 内部会将消息缓存起来。"
+    label = "消息缓存"
+  }
+
+  producer_pulsar_message {
+    desc = "用于生成 Pulsar 消息的模版。"
+    label = "Pulsar 消息模版"
+  }
+
+}

+ 4 - 1
scripts/ct/run.sh

@@ -190,7 +190,10 @@ for dep in ${CT_DEPS}; do
             ;;
         opents)
             FILES+=( '.ci/docker-compose-file/docker-compose-opents.yaml' )
-            ;; 
+            ;;
+        pulsar)
+            FILES+=( '.ci/docker-compose-file/docker-compose-pulsar-tcp.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1