Explorar el Código

feat: rabbitmq bridge v2 integration

zhongwencool hace 2 años
padre
commit
b444c82a42

+ 3 - 1
.ci/docker-compose-file/docker-compose-rabbitmq.yaml

@@ -9,10 +9,12 @@ services:
     expose:
       - "15672"
       - "5672"
+      - "5671"
     # We don't want to take ports from the host
-    # ports:
+    #ports:
     #   - "15672:15672"
     #   - "5672:5672"
+    #   - "5671:5671"
     volumes:
       - ./certs/ca.crt:/opt/certs/ca.crt
       - ./certs/server.crt:/opt/certs/server.crt

+ 8 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -584,7 +584,14 @@ is_tcp_server_available(Host, Port) ->
     Timeout :: integer()
 ) -> boolean.
 is_tcp_server_available(Host, Port, Timeout) ->
-    case gen_tcp:connect(Host, Port, [], Timeout) of
+    case
+        gen_tcp:connect(
+            emqx_utils_conv:str(Host),
+            emqx_utils_conv:int(Port),
+            [],
+            Timeout
+        )
+    of
         {ok, Socket} ->
             gen_tcp:close(Socket),
             true;

+ 6 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -39,6 +39,7 @@
     transform_bridge_v1_config_to_action_config/4,
     action_convert_from_connector/3
 ]).
+-export([clean_cache/0]).
 
 -callback bridge_v1_type_name() ->
     atom()
@@ -77,7 +78,7 @@
 ]).
 
 %% ====================================================================
-%% Hadcoded list of info modules for actions
+%% HardCoded list of info modules for actions
 %% TODO: Remove this list once we have made sure that all relevants
 %% apps are loaded before this module is called.
 %% ====================================================================
@@ -103,6 +104,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_iotdb_action_info,
         emqx_bridge_es_action_info,
         emqx_bridge_opents_action_info,
+        emqx_bridge_rabbitmq_action_info,
         emqx_bridge_greptimedb_action_info,
         emqx_bridge_tdengine_action_info
     ].
@@ -313,6 +315,9 @@ build_cache() ->
     persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
     ActionInfoMap.
 
+clean_cache() ->
+    persistent_term:erase(internal_emqx_action_persistent_term_info_key()).
+
 action_info_modules() ->
     ActionInfoModules = [
         action_info_modules(App)

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -44,6 +44,7 @@ stop(_State) ->
     emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
     ok = emqx_bridge:unload(),
     ok = emqx_bridge_v2:unload(),
+    emqx_action_info:clean_cache(),
     ok.
 
 -if(?EMQX_RELEASE_EDITION == ee).

+ 2 - 1
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -1,7 +1,8 @@
 {application, emqx_bridge_rabbitmq, [
     {description, "EMQX Enterprise RabbitMQ Bridge"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
+    {mod, {emqx_bridge_rabbitmq_app, []}},
     {applications, [
         kernel,
         stdlib,

+ 2 - 2
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 -module(emqx_bridge_rabbitmq).
 
@@ -22,7 +22,7 @@
 ]).
 
 %% -------------------------------------------------------------------------------------------------
-%% Callback used by HTTP API
+%% Callback used by HTTP API v1
 %% -------------------------------------------------------------------------------------------------
 
 conn_bridge_examples(Method) ->

+ 77 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_action_info.erl

@@ -0,0 +1,77 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_rabbitmq_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_config_to_action_config/2,
+    is_source/0,
+    is_action/0
+]).
+
+-define(SCHEMA_MODULE, emqx_bridge_rabbitmq_pubsub_schema).
+-import(emqx_utils_conv, [bin/1]).
+
+bridge_v1_type_name() -> rabbitmq.
+
+action_type_name() -> rabbitmq.
+
+connector_type_name() -> rabbitmq.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+is_source() -> true.
+is_action() -> true.
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)),
+    ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ConnectorTopLevelKeys = schema_keys(
+        emqx_bridge_rabbitmq_connector_schema:fields("config_connector")
+    ),
+    ConnectorKeys = (maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys)),
+    ConnectorConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
+        ConnectorConfig0
+    ).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(publisher_action)),
+    ActionParametersKeys = schema_keys(?SCHEMA_MODULE:fields(action_parameters)),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig0#{<<"connector">> => ConnectorName}
+    ).
+
+schema_keys(Schema) ->
+    [bin(Key) || {Key, _} <- Schema].
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).

+ 26 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_app.erl

@@ -0,0 +1,26 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_rabbitmq_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+    emqx_bridge_rabbitmq_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 291 - 416
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -1,9 +1,9 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
 -module(emqx_bridge_rabbitmq_connector).
-
+%-feature(maybe_expr, enable).
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("typerefl/include/types.hrl").
@@ -22,17 +22,16 @@
 %% hocon_schema callbacks
 -export([namespace/0, roots/0, fields/1]).
 
-%% HTTP API callbacks
--export([values/1]).
-
 %% emqx_resource callbacks
 -export([
-    %% Required callbacks
     on_start/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
     on_stop/2,
     callback_mode/0,
-    %% Optional callbacks
     on_get_status/2,
+    on_get_channel_status/3,
     on_query/3,
     on_batch_query/3
 ]).
@@ -41,142 +40,18 @@
 -export([connect/1]).
 
 %% Internal callbacks
--export([publish_messages/3]).
+-export([publish_messages/4]).
 
 namespace() -> "rabbitmq".
 
+%% bridge v1
 roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
+%% bridge v1 called by emqx_bridge_rabbitmq
 fields(config) ->
-    [
-        {server,
-            hoconsc:mk(
-                typerefl:binary(),
-                #{
-                    default => <<"localhost">>,
-                    desc => ?DESC("server")
-                }
-            )},
-        {port,
-            hoconsc:mk(
-                emqx_schema:port_number(),
-                #{
-                    default => 5672,
-                    desc => ?DESC("server")
-                }
-            )},
-        {username,
-            hoconsc:mk(
-                typerefl:binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("username")
-                }
-            )},
-        {password, emqx_connector_schema_lib:password_field(#{required => true})},
-        {pool_size,
-            hoconsc:mk(
-                typerefl:pos_integer(),
-                #{
-                    default => 8,
-                    desc => ?DESC("pool_size")
-                }
-            )},
-        {timeout,
-            hoconsc:mk(
-                emqx_schema:timeout_duration_ms(),
-                #{
-                    default => <<"5s">>,
-                    desc => ?DESC("timeout")
-                }
-            )},
-        {wait_for_publish_confirmations,
-            hoconsc:mk(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC("wait_for_publish_confirmations")
-                }
-            )},
-        {publish_confirmation_timeout,
-            hoconsc:mk(
-                emqx_schema:timeout_duration_ms(),
-                #{
-                    default => <<"30s">>,
-                    desc => ?DESC("timeout")
-                }
-            )},
-
-        {virtual_host,
-            hoconsc:mk(
-                typerefl:binary(),
-                #{
-                    default => <<"/">>,
-                    desc => ?DESC("virtual_host")
-                }
-            )},
-        {heartbeat,
-            hoconsc:mk(
-                emqx_schema:timeout_duration_ms(),
-                #{
-                    default => <<"30s">>,
-                    desc => ?DESC("heartbeat")
-                }
-            )},
-        %% Things related to sending messages to RabbitMQ
-        {exchange,
-            hoconsc:mk(
-                typerefl:binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("exchange")
-                }
-            )},
-        {routing_key,
-            hoconsc:mk(
-                typerefl:binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("routing_key")
-                }
-            )},
-        {delivery_mode,
-            hoconsc:mk(
-                hoconsc:enum([non_persistent, persistent]),
-                #{
-                    default => non_persistent,
-                    desc => ?DESC("delivery_mode")
-                }
-            )},
-        {payload_template,
-            hoconsc:mk(
-                binary(),
-                #{
-                    default => <<"${.}">>,
-                    desc => ?DESC("payload_template")
-                }
-            )}
-    ] ++ emqx_connector_schema_lib:ssl_fields().
-
-values(post) ->
-    maps:merge(values(put), #{name => <<"connector">>});
-values(get) ->
-    values(post);
-values(put) ->
-    #{
-        server => <<"localhost">>,
-        port => 5672,
-        enable => true,
-        pool_size => 8,
-        type => rabbitmq,
-        username => <<"guest">>,
-        password => <<"******">>,
-        routing_key => <<"my_routing_key">>,
-        payload_template => <<"">>
-    };
-values(_) ->
-    #{}.
+    emqx_bridge_rabbitmq_connector_schema:fields(connector) ++
+        emqx_bridge_rabbitmq_pubsub_schema:fields(action_parameters).
 
 %% ===================================================================
 %% Callbacks defined in emqx_resource
@@ -186,127 +61,84 @@ values(_) ->
 
 callback_mode() -> always_sync.
 
-%% emqx_resource callback
-
-%% emqx_resource callback called when the resource is started
-
--spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.
-on_start(
-    InstanceID,
-    #{
-        pool_size := PoolSize,
-        payload_template := PayloadTemplate,
-        delivery_mode := InitialDeliveryMode
-    } = InitialConfig
-) ->
-    DeliveryMode =
-        case InitialDeliveryMode of
-            non_persistent -> 1;
-            persistent -> 2
-        end,
-    Config = InitialConfig#{
-        delivery_mode => DeliveryMode
-    },
+on_start(InstanceID, Config) ->
     ?SLOG(info, #{
         msg => "starting_rabbitmq_connector",
         connector => InstanceID,
         config => emqx_utils:redact(Config)
     }),
+    init_secret(),
     Options = [
         {config, Config},
-        %% The pool_size is read by ecpool and decides the number of workers in
-        %% the pool
-        {pool_size, PoolSize},
+        {pool_size, maps:get(pool_size, Config)},
         {pool, InstanceID}
     ],
-    ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate),
-    State = #{
-        poolname => InstanceID,
-        processed_payload_template => ProcessedTemplate,
-        config => Config
-    },
-    %% Initialize RabbitMQ's secret library so that the password is encrypted
-    %% in the log files.
-    case credentials_obfuscation:secret() of
-        ?PENDING_SECRET ->
-            Bytes = crypto:strong_rand_bytes(128),
-            %% The password can appear in log files if we don't do this
-            credentials_obfuscation:set_secret(Bytes);
-        _ ->
-            %% Already initialized
-            ok
-    end,
     case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
         ok ->
-            {ok, State};
+            {ok, #{channels => #{}}};
         {error, Reason} ->
-            ?SLOG(info, #{
+            ?SLOG(error, #{
                 msg => "rabbitmq_connector_start_failed",
-                error_reason => Reason,
+                reason => Reason,
                 config => emqx_utils:redact(Config)
             }),
             {error, Reason}
     end.
 
-%% emqx_resource callback called when the resource is stopped
-
--spec on_stop(resource_id(), resource_state()) -> term().
-on_stop(
-    ResourceID,
-    _State
+on_add_channel(
+    InstanceId,
+    #{channels := Channels} = State,
+    ChannelId,
+    Config
 ) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        false ->
+            ProcParam = preproc_parameter(Config),
+            case make_channel(InstanceId, ChannelId, ProcParam) of
+                {ok, RabbitChannels} ->
+                    Channel = #{param => ProcParam, rabbitmq => RabbitChannels},
+                    NewChannels = maps:put(ChannelId, Channel, Channels),
+                    {ok, State#{channels => NewChannels}};
+                {error, Error} ->
+                    ?SLOG(error, #{
+                        msg => "failed_to_start_rabbitmq_channel",
+                        instance_id => InstanceId,
+                        params => emqx_utils:redact(Config),
+                        error => Error
+                    }),
+                    {error, Error}
+            end
+    end.
+
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
+    try_unsubscribe(ChannelId, Channels),
+    {ok, State#{channels => maps:remove(ChannelId, Channels)}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_stop(ResourceID, _State) ->
     ?SLOG(info, #{
         msg => "stopping_rabbitmq_connector",
         connector => ResourceID
     }),
-    stop_clients_and_pool(ResourceID).
-
-stop_clients_and_pool(PoolName) ->
-    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    Clients = [
-        begin
-            {ok, Client} = ecpool_worker:client(Worker),
-            Client
-        end
-     || Worker <- Workers
-    ],
-    %% We need to stop the pool before stopping the workers as the pool monitors the workers
-    StopResult = emqx_resource_pool:stop(PoolName),
-    lists:foreach(fun stop_worker/1, Clients),
-    StopResult.
-
-stop_worker({Channel, Connection}) ->
-    amqp_channel:close(Channel),
-    amqp_connection:close(Connection).
-
-%% This is the callback function that is called by ecpool when the pool is
-%% started
+    lists:foreach(
+        fun({_Name, Worker}) ->
+            case ecpool_worker:client(Worker) of
+                {ok, Conn} -> amqp_connection:close(Conn);
+                _ -> ok
+            end
+        end,
+        ecpool:workers(ResourceID)
+    ),
+    emqx_resource_pool:stop(ResourceID).
 
+%% This is the callback function that is called by ecpool
 -spec connect(term()) -> {ok, {pid(), pid()}, map()} | {error, term()}.
 connect(Options) ->
     Config = proplists:get_value(config, Options),
-    try
-        create_rabbitmq_connection_and_channel(Config)
-    catch
-        _:{error, Reason} ->
-            ?SLOG(error, #{
-                msg => "rabbitmq_connector_connection_failed",
-                error_type => error,
-                error_reason => Reason,
-                config => emqx_utils:redact(Config)
-            }),
-            {error, Reason};
-        Type:Reason ->
-            ?SLOG(error, #{
-                msg => "rabbitmq_connector_connection_failed",
-                error_type => Type,
-                error_reason => Reason,
-                config => emqx_utils:redact(Config)
-            }),
-            {error, Reason}
-    end.
-
-create_rabbitmq_connection_and_channel(Config) ->
     #{
         server := Host,
         port := Port,
@@ -314,237 +146,164 @@ create_rabbitmq_connection_and_channel(Config) ->
         password := WrappedPassword,
         timeout := Timeout,
         virtual_host := VirtualHost,
-        heartbeat := Heartbeat,
-        wait_for_publish_confirmations := WaitForPublishConfirmations
+        heartbeat := Heartbeat
     } = Config,
     %% TODO: teach `amqp` to accept 0-arity closures as passwords.
     Password = emqx_secret:unwrap(WrappedPassword),
-    SSLOptions =
-        case maps:get(ssl, Config, #{}) of
-            #{enable := true} = SSLOpts ->
-                emqx_tls_lib:to_client_opts(SSLOpts);
-            _ ->
-                none
-        end,
-    RabbitMQConnectionOptions =
+    RabbitMQConnOptions =
         #amqp_params_network{
-            host = erlang:binary_to_list(Host),
+            host = Host,
             port = Port,
-            ssl_options = SSLOptions,
+            ssl_options = to_ssl_options(Config),
             username = Username,
             password = Password,
             connection_timeout = Timeout,
             virtual_host = VirtualHost,
             heartbeat = Heartbeat
         },
-    {ok, RabbitMQConnection} =
-        case amqp_connection:start(RabbitMQConnectionOptions) of
-            {ok, Connection} ->
-                {ok, Connection};
-            {error, Reason} ->
-                erlang:error({error, Reason})
-        end,
-    {ok, RabbitMQChannel} =
-        case amqp_connection:open_channel(RabbitMQConnection) of
-            {ok, Channel} ->
-                {ok, Channel};
-            {error, OpenChannelErrorReason} ->
-                erlang:error({error, OpenChannelErrorReason})
-        end,
-    %% We need to enable confirmations if we want to wait for them
-    case WaitForPublishConfirmations of
-        true ->
-            case amqp_channel:call(RabbitMQChannel, #'confirm.select'{}) of
-                #'confirm.select_ok'{} ->
-                    ok;
-                Error ->
-                    ConfirmModeErrorReason =
-                        erlang:iolist_to_binary(
-                            io_lib:format(
-                                "Could not enable RabbitMQ confirmation mode ~p",
-                                [Error]
-                            )
-                        ),
-                    erlang:error({error, ConfirmModeErrorReason})
-            end;
-        false ->
-            ok
-    end,
-    {ok, {RabbitMQConnection, RabbitMQChannel}, #{
-        supervisees => [RabbitMQConnection, RabbitMQChannel]
-    }}.
-
-%% emqx_resource callback called to check the status of the resource
+    case amqp_connection:start(RabbitMQConnOptions) of
+        {ok, RabbitMQConn} ->
+            {ok, RabbitMQConn};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "rabbitmq_connector_connection_failed",
+                reason => Reason,
+                config => emqx_utils:redact(Config)
+            }),
+            {error, Reason}
+    end.
 
 -spec on_get_status(resource_id(), term()) ->
     {connected, resource_state()} | {disconnected, resource_state(), binary()}.
-on_get_status(
-    _InstId,
-    #{
-        poolname := PoolName
-    } = State
-) ->
-    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    Clients = [
-        begin
-            {ok, Client} = ecpool_worker:client(Worker),
-            Client
-        end
-     || Worker <- Workers
-    ],
-    CheckResults = [
-        check_worker(Client)
-     || Client <- Clients
-    ],
-    Connected = length(CheckResults) > 0 andalso lists:all(fun(R) -> R end, CheckResults),
-    case Connected of
-        true ->
-            {connected, State};
-        false ->
-            {disconnected, State, <<"not_connected">>}
-    end;
-on_get_status(
-    _InstId,
-    State
-) ->
-    {disconnect, State, <<"not_connected: no connection pool in state">>}.
-
-check_worker({Channel, Connection}) ->
-    erlang:is_process_alive(Channel) andalso erlang:is_process_alive(Connection).
+on_get_status(PoolName, #{channels := Channels} = State) ->
+    ChannelNum = maps:size(Channels),
+    Conns = get_rabbitmq_connections(PoolName),
+    Check =
+        lists:all(
+            fun(Conn) ->
+                [{num_channels, ActualNum}] = amqp_connection:info(Conn, [num_channels]),
+                ChannelNum >= ActualNum
+            end,
+            Conns
+        ),
+    case Check andalso Conns =/= [] of
+        true -> {connected, State};
+        false -> {disconnected, State, <<"not_connected">>}
+    end.
 
-%% emqx_resource callback that is called when a non-batch query is received
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+    case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of
+        {ok, RabbitMQ} ->
+            case lists:all(fun is_process_alive/1, maps:values(RabbitMQ)) of
+                true -> connected;
+                false -> {error, not_connected}
+            end;
+        _ ->
+            {error, not_exists}
+    end.
 
--spec on_query(resource_id(), Request, resource_state()) -> query_result() when
-    Request :: {RequestType, Data},
-    RequestType :: send_message,
-    Data :: map().
-on_query(
-    ResourceID,
-    {RequestType, Data},
-    #{
-        poolname := PoolName,
-        processed_payload_template := PayloadTemplate,
-        config := Config
-    } = State
-) ->
+on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->
     ?SLOG(debug, #{
         msg => "rabbitmq_connector_received_query",
         connector => ResourceID,
-        type => RequestType,
+        channel => ChannelId,
         data => Data,
         state => emqx_utils:redact(State)
     }),
-    MessageData = format_data(PayloadTemplate, Data),
-    Res = ecpool:pick_and_do(
-        PoolName,
-        {?MODULE, publish_messages, [Config, [MessageData]]},
-        no_handover
-    ),
-    handle_result(Res).
-
-%% emqx_resource callback that is called when a batch query is received
+    #{channels := Channels} = State,
+    case maps:find(ChannelId, Channels) of
+        {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
+            Res = ecpool:pick_and_do(
+                ResourceID,
+                {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]},
+                no_handover
+            ),
+            handle_result(Res);
+        error ->
+            {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
+    end.
 
--spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
-    BatchReq :: nonempty_list({'send_message', map()}).
-on_batch_query(
-    ResourceID,
-    BatchReq,
-    State
-) ->
+on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
     ?SLOG(debug, #{
         msg => "rabbitmq_connector_received_batch_query",
         connector => ResourceID,
-        data => BatchReq,
+        data => Batch,
         state => emqx_utils:redact(State)
     }),
-    %% Currently we only support batch requests with the send_message key
-    {Keys, MessagesToInsert} = lists:unzip(BatchReq),
-    ensure_keys_are_of_type_send_message(Keys),
-    %% Pick out the payload template
-    #{
-        processed_payload_template := PayloadTemplate,
-        poolname := PoolName,
-        config := Config
-    } = State,
-    %% Create batch payload
-    FormattedMessages = [
-        format_data(PayloadTemplate, Data)
-     || Data <- MessagesToInsert
-    ],
-    %% Publish the messages
-    Res = ecpool:pick_and_do(
-        PoolName,
-        {?MODULE, publish_messages, [Config, FormattedMessages]},
-        no_handover
-    ),
-    handle_result(Res).
+    #{channels := Channels} = State,
+    case maps:find(ChannelId, Channels) of
+        {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
+            Res = ecpool:pick_and_do(
+                ResourceID,
+                {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]},
+                no_handover
+            ),
+            handle_result(Res);
+        error ->
+            {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}}
+    end.
 
 publish_messages(
-    {_Connection, Channel},
+    Conn,
+    RabbitMQ,
     #{
         delivery_mode := DeliveryMode,
+        payload_template := PayloadTmpl,
         routing_key := RoutingKey,
         exchange := Exchange,
         wait_for_publish_confirmations := WaitForPublishConfirmations,
         publish_confirmation_timeout := PublishConfirmationTimeout
-    } = _Config,
+    },
     Messages
 ) ->
-    MessageProperties = #'P_basic'{
-        headers = [],
-        delivery_mode = DeliveryMode
-    },
-    Method = #'basic.publish'{
-        exchange = Exchange,
-        routing_key = RoutingKey
-    },
-    _ = [
-        amqp_channel:cast(
-            Channel,
-            Method,
-            #amqp_msg{
-                payload = Message,
-                props = MessageProperties
-            }
-        )
-     || Message <- Messages
-    ],
-    case WaitForPublishConfirmations of
-        true ->
-            case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of
+    case maps:find(Conn, RabbitMQ) of
+        {ok, Channel} ->
+            MessageProperties = #'P_basic'{
+                headers = [],
+                delivery_mode = DeliveryMode
+            },
+            Method = #'basic.publish'{
+                exchange = Exchange,
+                routing_key = RoutingKey
+            },
+            lists:foreach(
+                fun({_, MsgRaw}) ->
+                    amqp_channel:cast(
+                        Channel,
+                        Method,
+                        #amqp_msg{
+                            payload = format_data(PayloadTmpl, MsgRaw),
+                            props = MessageProperties
+                        }
+                    )
+                end,
+                Messages
+            ),
+            case WaitForPublishConfirmations of
                 true ->
-                    ok;
+                    case amqp_channel:wait_for_confirms(Channel, PublishConfirmationTimeout) of
+                        true ->
+                            ok;
+                        false ->
+                            erlang:error(
+                                {recoverable_error,
+                                    <<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>}
+                            );
+                        timeout ->
+                            erlang:error(
+                                {recoverable_error,
+                                    <<"RabbitMQ: Timeout when waiting for message acknowledgment.">>}
+                            )
+                    end;
                 false ->
-                    erlang:error(
-                        {recoverable_error,
-                            <<"RabbitMQ: Got NACK when waiting for message acknowledgment.">>}
-                    );
-                timeout ->
-                    erlang:error(
-                        {recoverable_error,
-                            <<"RabbitMQ: Timeout when waiting for message acknowledgment.">>}
-                    )
+                    ok
             end;
-        false ->
-            ok
-    end.
-
-ensure_keys_are_of_type_send_message(Keys) ->
-    case lists:all(fun is_send_message_atom/1, Keys) of
-        true ->
-            ok;
-        false ->
+        error ->
             erlang:error(
-                {unrecoverable_error,
-                    <<"Unexpected type for batch message (Expected send_message)">>}
+                {recoverable_error, {<<"RabbitMQ: channel_not_found">>, Conn, RabbitMQ}}
             )
     end.
 
-is_send_message_atom(send_message) ->
-    true;
-is_send_message_atom(_) ->
-    false.
-
 format_data([], Msg) ->
     emqx_utils_json:encode(Msg);
 format_data(Tokens, Msg) ->
@@ -554,3 +313,119 @@ handle_result({error, ecpool_empty}) ->
     {error, {recoverable_error, ecpool_empty}};
 handle_result(Res) ->
     Res.
+
+make_channel(PoolName, ChannelId, Params) ->
+    Conns = get_rabbitmq_connections(PoolName),
+    make_channel(Conns, PoolName, ChannelId, Params, #{}).
+
+make_channel([], _PoolName, _ChannelId, _Param, Acc) ->
+    {ok, Acc};
+make_channel([Conn | Conns], PoolName, ChannelId, Params, Acc) ->
+    maybe
+        {ok, RabbitMQChannel} ?= amqp_connection:open_channel(Conn),
+        ok ?= try_confirm_channel(Params, RabbitMQChannel),
+        ok ?= try_subscribe(Params, RabbitMQChannel, PoolName, ChannelId),
+        NewAcc = Acc#{Conn => RabbitMQChannel},
+        make_channel(Conns, PoolName, ChannelId, Params, NewAcc)
+    end.
+
+%% We need to enable confirmations if we want to wait for them
+try_confirm_channel(#{wait_for_publish_confirmations := true}, Channel) ->
+    case amqp_channel:call(Channel, #'confirm.select'{}) of
+        #'confirm.select_ok'{} ->
+            ok;
+        Error ->
+            Reason =
+                iolist_to_binary(
+                    io_lib:format(
+                        "Could not enable RabbitMQ confirmation mode ~p",
+                        [Error]
+                    )
+                ),
+            {error, Reason}
+    end;
+try_confirm_channel(#{wait_for_publish_confirmations := false}, _Channel) ->
+    ok.
+
+%% Initialize Rabbitmq's secret library so that the password is encrypted
+%% in the log files.
+init_secret() ->
+    case credentials_obfuscation:secret() of
+        ?PENDING_SECRET ->
+            Bytes = crypto:strong_rand_bytes(128),
+            %% The password can appear in log files if we don't do this
+            credentials_obfuscation:set_secret(Bytes);
+        _ ->
+            %% Already initialized
+            ok
+    end.
+
+preproc_parameter(#{config_root := actions, parameters := Parameter}) ->
+    #{
+        payload_template := PayloadTemplate,
+        delivery_mode := InitialDeliveryMode
+    } = Parameter,
+    Parameter#{
+        delivery_mode => delivery_mode(InitialDeliveryMode),
+        payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
+        config_root => actions
+    };
+preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
+    #{
+        payload_template := PayloadTmpl,
+        qos := QosTmpl,
+        topic := TopicTmpl
+    } = Parameter,
+    Parameter#{
+        payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl),
+        qos => preproc_qos(QosTmpl),
+        topic => emqx_placeholder:preproc_tmpl(TopicTmpl),
+        hookpoints => Hooks,
+        config_root => sources
+    }.
+
+preproc_qos(Qos) when is_integer(Qos) -> Qos;
+preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos).
+
+delivery_mode(non_persistent) -> 1;
+delivery_mode(persistent) -> 2.
+
+to_ssl_options(#{ssl := #{enable := true} = SSLOpts}) ->
+    emqx_tls_lib:to_client_opts(SSLOpts);
+to_ssl_options(_) ->
+    none.
+
+get_rabbitmq_connections(PoolName) ->
+    lists:filtermap(
+        fun({_Name, Worker}) ->
+            case ecpool_worker:client(Worker) of
+                {ok, Conn} -> {true, Conn};
+                _ -> false
+            end
+        end,
+        ecpool:workers(PoolName)
+    ).
+
+try_subscribe(
+    #{queue := Queue, no_ack := NoAck, config_root := sources} = Params,
+    RabbitChan,
+    PoolName,
+    ChannelId
+) ->
+    WorkState = {RabbitChan, PoolName, Params},
+    {ok, ConsumePid} = emqx_bridge_rabbitmq_sup:ensure_started(ChannelId, WorkState),
+    BasicConsume = #'basic.consume'{queue = Queue, no_ack = NoAck},
+    #'basic.consume_ok'{consumer_tag = _} =
+        amqp_channel:subscribe(RabbitChan, BasicConsume, ConsumePid),
+    ok;
+try_subscribe(#{config_root := actions}, _RabbitChan, _PoolName, _ChannelId) ->
+    ok.
+
+try_unsubscribe(ChannelId, Channels) ->
+    case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of
+        {ok, RabbitMQ} ->
+            lists:foreach(fun(Pid) -> catch amqp_channel:close(Pid) end, maps:values(RabbitMQ)),
+            emqx_bridge_rabbitmq_sup:ensure_deleted(ChannelId);
+        _ ->
+            ok
+    end.

+ 139 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector_schema.erl

@@ -0,0 +1,139 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_rabbitmq_connector_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-define(TYPE, rabbitmq).
+
+-export([roots/0, fields/1, desc/1, namespace/0]).
+-export([connector_examples/1, connector_example_values/0]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+namespace() -> ?TYPE.
+
+roots() -> [].
+
+fields("config_connector") ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        fields(connector) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector) ->
+    [
+        {server,
+            ?HOCON(
+                string(),
+                #{
+                    default => <<"localhost">>,
+                    desc => ?DESC("server")
+                }
+            )},
+        {port,
+            ?HOCON(
+                emqx_schema:port_number(),
+                #{
+                    default => 5672,
+                    desc => ?DESC("server")
+                }
+            )},
+        {username,
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("username")
+                }
+            )},
+        {password, emqx_connector_schema_lib:password_field(#{required => true})},
+        {pool_size,
+            ?HOCON(
+                pos_integer(),
+                #{
+                    default => 8,
+                    desc => ?DESC("pool_size")
+                }
+            )},
+        {timeout,
+            ?HOCON(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    desc => ?DESC("timeout")
+                }
+            )},
+        {virtual_host,
+            ?HOCON(
+                binary(),
+                #{
+                    default => <<"/">>,
+                    desc => ?DESC("virtual_host")
+                }
+            )},
+        {heartbeat,
+            ?HOCON(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"30s">>,
+                    desc => ?DESC("heartbeat")
+                }
+            )}
+    ] ++
+        emqx_connector_schema_lib:ssl_fields();
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("post") ->
+    emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector");
+fields("put") ->
+    fields("config_connector");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("config_connector").
+
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(_) ->
+    undefined.
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"rabbitmq">> =>
+                #{
+                    summary => <<"Rabbitmq Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?TYPE, connector_example_values()
+                    )
+                }
+        }
+    ].
+
+connector_example_values() ->
+    #{
+        name => <<"rabbitmq_connector">>,
+        type => rabbitmq,
+        enable => true,
+        server => <<"127.0.0.1">>,
+        port => 5672,
+        username => <<"guest">>,
+        password => <<"******">>,
+        pool_size => 8,
+        timeout => <<"5s">>,
+        virtual_host => <<"/">>,
+        heartbeat => <<"30s">>,
+        ssl => #{enable => false}
+    }.

+ 273 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl

@@ -0,0 +1,273 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_rabbitmq_pubsub_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([roots/0, fields/1, desc/1, namespace/0]).
+
+-export([
+    bridge_v2_examples/1,
+    source_examples/1
+]).
+
+-define(ACTION_TYPE, rabbitmq).
+-define(SOURCE_TYPE, rabbitmq).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+namespace() -> "bridge_rabbitmq".
+
+roots() -> [].
+
+fields(action) ->
+    {rabbitmq,
+        ?HOCON(
+            ?MAP(name, ?R_REF(publisher_action)),
+            #{
+                desc => <<"RabbitMQ Action Config">>,
+                required => false
+            }
+        )};
+fields(publisher_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        ?HOCON(
+            ?R_REF(action_parameters),
+            #{
+                required => true,
+                desc => ?DESC(action_parameters)
+            }
+        ),
+        #{resource_opts_ref => ?R_REF(action_resource_opts)}
+    );
+fields(action_parameters) ->
+    [
+        {wait_for_publish_confirmations,
+            hoconsc:mk(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC("wait_for_publish_confirmations")
+                }
+            )},
+        {publish_confirmation_timeout,
+            hoconsc:mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"30s">>,
+                    desc => ?DESC("timeout")
+                }
+            )},
+        {exchange,
+            hoconsc:mk(
+                typerefl:binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("exchange")
+                }
+            )},
+        {routing_key,
+            hoconsc:mk(
+                typerefl:binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("routing_key")
+                }
+            )},
+        {delivery_mode,
+            hoconsc:mk(
+                hoconsc:enum([non_persistent, persistent]),
+                #{
+                    default => non_persistent,
+                    desc => ?DESC("delivery_mode")
+                }
+            )},
+        {payload_template,
+            hoconsc:mk(
+                binary(),
+                #{
+                    default => <<"${.}">>,
+                    desc => ?DESC("payload_template")
+                }
+            )}
+    ];
+fields(source) ->
+    {rabbitmq,
+        ?HOCON(
+            hoconsc:map(name, ?R_REF(subscriber_source)),
+            #{
+                desc => <<"MQTT Subscriber Source Config">>,
+                required => false
+            }
+        )};
+fields(subscriber_source) ->
+    emqx_bridge_v2_schema:make_consumer_action_schema(
+        ?HOCON(
+            ?R_REF(ingress_parameters),
+            #{
+                required => true,
+                desc => ?DESC("source_parameters")
+            }
+        )
+    );
+fields(ingress_parameters) ->
+    [
+        {wait_for_publish_confirmations,
+            hoconsc:mk(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC("wait_for_publish_confirmations")
+                }
+            )},
+        {topic,
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    validator => fun emqx_schema:non_empty_string/1,
+                    desc => ?DESC("ingress_topic")
+                }
+            )},
+        {qos,
+            ?HOCON(
+                ?UNION([emqx_schema:qos(), binary()]),
+                #{
+                    default => 0,
+                    desc => ?DESC("ingress_qos")
+                }
+            )},
+        {payload_template,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("ingress_payload_template")
+                }
+            )},
+        {queue,
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("ingress_queue")
+                }
+            )},
+        {no_ack,
+            ?HOCON(
+                boolean(),
+                #{
+                    required => false,
+                    default => true,
+                    desc => ?DESC("ingress_no_ack")
+                }
+            )}
+    ];
+fields(action_resource_opts) ->
+    emqx_bridge_v2_schema:action_resource_opts_fields();
+fields(source_resource_opts) ->
+    emqx_bridge_v2_schema:source_resource_opts_fields();
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action));
+fields(Field) when
+    Field == "get_source";
+    Field == "post_source";
+    Field == "put_source"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source));
+fields(What) ->
+    error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
+%% v2: api schema
+%% The parameter equals to
+%%   `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
+%%   `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
+%%--------------------------------------------------------------------
+%% v1/v2
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(action_resource_opts) ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc(source_resource_opts) ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc(action_parameters) ->
+    ?DESC(action_parameters);
+desc(ingress_parameters) ->
+    ?DESC(ingress_parameters);
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc("http_action") ->
+    ?DESC("desc_config");
+desc("parameters_opts") ->
+    ?DESC("config_parameters_opts");
+desc(publisher_action) ->
+    ?DESC(publisher_action);
+desc(subscriber_source) ->
+    ?DESC(subscriber_source);
+desc(_) ->
+    undefined.
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"rabbitmq">> => #{
+                summary => <<"RabbitMQ Producer Action">>,
+                value => emqx_bridge_v2_schema:action_values(
+                    Method,
+                    _ActionType = ?ACTION_TYPE,
+                    _ConnectorType = rabbitmq,
+                    #{
+                        parameters => #{
+                            wait_for_publish_confirmations => true,
+                            publish_confirmation_timeout => <<"30s">>,
+                            exchange => <<"test_exchange">>,
+                            routing_key => <<"/">>,
+                            delivery_mode => <<"non_persistent">>,
+                            payload_template => <<"${.payload}">>
+                        }
+                    }
+                )
+            }
+        }
+    ].
+
+source_examples(Method) ->
+    [
+        #{
+            <<"rabbitmq">> => #{
+                summary => <<"RabbitMQ Subscriber Source">>,
+                value => emqx_bridge_v2_schema:source_values(
+                    Method,
+                    _SourceType = ?SOURCE_TYPE,
+                    _ConnectorType = rabbitmq,
+                    #{
+                        parameters => #{
+                            topic => <<"${payload.mqtt_topic}">>,
+                            qos => <<"${payload.mqtt_qos}">>,
+                            payload_template => <<"${payload.mqtt_payload}">>,
+                            queue => <<"test_queue">>,
+                            no_ack => true
+                        }
+                    }
+                )
+            }
+        }
+    ].

+ 28 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_sup.erl

@@ -0,0 +1,28 @@
+-module(emqx_bridge_rabbitmq_source_sup).
+
+-behaviour(supervisor).
+%% API
+-export([start_link/0]).
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link(?MODULE, []).
+
+init([]) ->
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 10
+    },
+    {ok, {SupFlags, [worker_spec()]}}.
+
+worker_spec() ->
+    Mod = emqx_bridge_rabbitmq_source_worker,
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
+        restart => transient,
+        shutdown => brutal_kill,
+        type => worker,
+        modules => [Mod]
+    }.

+ 82 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl

@@ -0,0 +1,82 @@
+-module(emqx_bridge_rabbitmq_source_worker).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+init({_RabbitChannel, _InstanceId, _Params} = State) ->
+    {ok, State, {continue, confirm_ok}}.
+
+handle_continue(confirm_ok, State) ->
+    receive
+        #'basic.consume_ok'{} -> {noreply, State}
+    end.
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info(
+    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
+        payload = Payload,
+        props = #'P_basic'{message_id = MessageId, headers = Headers}
+    }},
+    {Channel, InstanceId, Params} = State
+) ->
+    #{
+        hookpoints := Hooks,
+        payload_template := PayloadTmpl,
+        qos := QoSTmpl,
+        topic := TopicTmpl,
+        no_ack := NoAck
+    } = Params,
+    MQTTMsg = emqx_message:make(
+        make_message_id(MessageId),
+        InstanceId,
+        render(Payload, QoSTmpl),
+        render(Payload, TopicTmpl),
+        render(Payload, PayloadTmpl),
+        #{},
+        make_headers(Headers)
+    ),
+    _ = emqx:publish(MQTTMsg),
+    lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks),
+    (NoAck =:= false) andalso
+        amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
+    emqx_resource_metrics:received_inc(InstanceId),
+    {noreply, State};
+handle_info(#'basic.cancel_ok'{}, State) ->
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+render(_Message, QoS) when is_integer(QoS) -> QoS;
+render(Message, PayloadTmpl) ->
+    Opts = #{return => full_binary},
+    emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts).
+
+make_message_id(undefined) -> emqx_guid:gen();
+make_message_id(Id) -> Id.
+
+make_headers(undefined) ->
+    #{};
+make_headers(Headers) when is_list(Headers) ->
+    maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]).

+ 75 - 0
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_sup.erl

@@ -0,0 +1,75 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_rabbitmq_sup).
+
+-behaviour(supervisor).
+
+-export([ensure_started/2]).
+-export([ensure_deleted/1]).
+-export([start_link/0]).
+-export([init/1]).
+
+-define(BRIDGE_SUP, ?MODULE).
+
+ensure_started(SuperId, Config) ->
+    {ok, SuperPid} = ensure_supervisor_started(SuperId),
+    case supervisor:start_child(SuperPid, [Config]) of
+        {ok, WorkPid} ->
+            {ok, WorkPid};
+        {error, {already_started, WorkPid}} ->
+            {ok, WorkPid};
+        {error, Error} ->
+            {error, Error}
+    end.
+
+ensure_deleted(SuperId) ->
+    maybe
+        Pid = erlang:whereis(?BRIDGE_SUP),
+        true ?= Pid =/= undefined,
+        ok ?= supervisor:terminate_child(Pid, SuperId),
+        ok ?= supervisor:delete_child(Pid, SuperId)
+    else
+        false -> ok;
+        {error, not_found} -> ok;
+        Error -> Error
+    end.
+
+ensure_supervisor_started(Id) ->
+    SupervisorSpec =
+        #{
+            id => Id,
+            start => {emqx_bridge_rabbitmq_source_sup, start_link, []},
+            restart => permanent,
+            type => supervisor
+        },
+    case supervisor:start_child(?BRIDGE_SUP, SupervisorSpec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {error, {already_started, Pid}} ->
+            {ok, Pid}
+    end.
+
+start_link() ->
+    supervisor:start_link({local, ?BRIDGE_SUP}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 50,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.

+ 98 - 101
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl

@@ -14,7 +14,8 @@
 
 %% See comment in
 %% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to
-%% run this without bringing up the whole CI infrastucture
+%% run this without bringing up the whole CI infrastructure
+-define(TYPE, <<"rabbitmq">>).
 
 rabbit_mq_host() ->
     <<"rabbitmq">>.
@@ -34,8 +35,14 @@ rabbit_mq_routing_key() ->
 get_channel_connection(Config) ->
     proplists:get_value(channel_connection, Config).
 
+get_rabbitmq(Config) ->
+    proplists:get_value(rabbitmq, Config).
+
+get_tls(Config) ->
+    proplists:get_value(tls, Config).
+
 %%------------------------------------------------------------------------------
-%% Common Test Setup, Teardown and Testcase List
+%% Common Test Setup, Tear down and Testcase List
 %%------------------------------------------------------------------------------
 
 all() ->
@@ -101,35 +108,25 @@ common_init_per_group(Opts) ->
     {ok, _} = application:ensure_all_started(emqx_connector),
     {ok, _} = application:ensure_all_started(amqp_client),
     emqx_mgmt_api_test_util:init_suite(),
-    ChannelConnection = setup_rabbit_mq_exchange_and_queue(Opts),
-    [{channel_connection, ChannelConnection}].
+    #{host := Host, port := Port, tls := UseTLS} = Opts,
+    ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS),
+    [
+        {channel_connection, ChannelConnection},
+        {rabbitmq, #{server => Host, port => Port}},
+        {tls, UseTLS}
+    ].
 
-setup_rabbit_mq_exchange_and_queue(#{host := RabbitMQHost, port := RabbitMQPort, tls := UseTLS}) ->
+setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) ->
     SSLOptions =
         case UseTLS of
-            false ->
-                none;
-            true ->
-                CertsDir = filename:join([
-                    emqx_common_test_helpers:proj_root(),
-                    ".ci",
-                    "docker-compose-file",
-                    "certs"
-                ]),
-                emqx_tls_lib:to_client_opts(
-                    #{
-                        enable => true,
-                        cacertfile => filename:join([CertsDir, "ca.crt"]),
-                        certfile => filename:join([CertsDir, "client.pem"]),
-                        keyfile => filename:join([CertsDir, "client.key"])
-                    }
-                )
+            false -> none;
+            true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS))
         end,
-    %% Create an exachange and a queue
+    %% Create an exchange and a queue
     {ok, Connection} =
         amqp_connection:start(#amqp_params_network{
-            host = RabbitMQHost,
-            port = RabbitMQPort,
+            host = Host,
+            port = Port,
             ssl_options = SSLOptions
         }),
     {ok, Channel} = amqp_connection:open_channel(Connection),
@@ -184,8 +181,7 @@ init_per_testcase(_, Config) ->
 end_per_testcase(_, _Config) ->
     ok.
 
-rabbitmq_config(Config) ->
-    %%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
+rabbitmq_config(UseTLS, Config) ->
     BatchSize = maps:get(batch_size, Config, 1),
     BatchTime = maps:get(batch_time_ms, Config, 0),
     Name = atom_to_binary(?MODULE),
@@ -196,6 +192,7 @@ rabbitmq_config(Config) ->
         io_lib:format(
             "bridges.rabbitmq.~s {\n"
             "  enable = true\n"
+            "  ssl = ~s\n"
             "  server = \"~s\"\n"
             "  port = ~p\n"
             "  username = \"guest\"\n"
@@ -210,6 +207,7 @@ rabbitmq_config(Config) ->
             "}\n",
             [
                 Name,
+                hocon_pp:do(ssl_options(UseTLS), #{embedded => true}),
                 Server,
                 Port,
                 rabbit_mq_routing_key(),
@@ -222,80 +220,86 @@ rabbitmq_config(Config) ->
     ct:pal(ConfigString),
     parse_and_check(ConfigString, <<"rabbitmq">>, Name).
 
+ssl_options(true) ->
+    CertsDir = filename:join([
+        emqx_common_test_helpers:proj_root(),
+        ".ci",
+        "docker-compose-file",
+        "certs"
+    ]),
+    #{
+        enable => true,
+        cacertfile => filename:join([CertsDir, "ca.crt"]),
+        certfile => filename:join([CertsDir, "client.pem"]),
+        keyfile => filename:join([CertsDir, "client.key"])
+    };
+ssl_options(false) ->
+    #{
+        enable => false
+    }.
+
 parse_and_check(ConfigString, BridgeType, Name) ->
     {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
     hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
     #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
     RetConfig.
 
-make_bridge(Config) ->
-    Type = <<"rabbitmq">>,
-    Name = atom_to_binary(?MODULE),
-    BridgeConfig = rabbitmq_config(Config),
-    {ok, _} = emqx_bridge:create(
-        Type,
-        Name,
-        BridgeConfig
-    ),
-    emqx_bridge_resource:bridge_id(Type, Name).
+create_bridge(Name, UseTLS, Config) ->
+    BridgeConfig = rabbitmq_config(UseTLS, Config),
+    {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig),
+    emqx_bridge_resource:bridge_id(?TYPE, Name).
 
-delete_bridge() ->
-    Type = <<"rabbitmq">>,
-    Name = atom_to_binary(?MODULE),
-    ok = emqx_bridge:remove(Type, Name).
+delete_bridge(Name) ->
+    ok = emqx_bridge:remove(?TYPE, Name).
 
 %%------------------------------------------------------------------------------
 %% Test Cases
 %%------------------------------------------------------------------------------
 
-t_make_delete_bridge(_Config) ->
-    make_bridge(#{}),
-    %% Check that the new brige is in the list of bridges
+t_create_delete_bridge(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    RabbitMQ = get_rabbitmq(Config),
+    UseTLS = get_tls(Config),
+    create_bridge(Name, UseTLS, RabbitMQ),
     Bridges = emqx_bridge:list(),
-    Name = atom_to_binary(?MODULE),
-    IsRightName =
-        fun
-            (#{name := BName}) when BName =:= Name ->
-                true;
-            (_) ->
-                false
-        end,
-    ?assert(lists:any(IsRightName, Bridges)),
-    delete_bridge(),
+    Any = fun(#{name := BName}) -> BName =:= Name end,
+    ?assert(lists:any(Any, Bridges), Bridges),
+    ok = delete_bridge(Name),
     BridgesAfterDelete = emqx_bridge:list(),
-    ?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
+    ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete),
     ok.
 
-t_make_delete_bridge_non_existing_server(_Config) ->
-    make_bridge(#{server => <<"non_existing_server">>, port => 3174}),
-    %% Check that the new brige is in the list of bridges
+t_create_delete_bridge_non_existing_server(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    UseTLS = get_tls(Config),
+    create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}),
+    %% Check that the new bridge is in the list of bridges
     Bridges = emqx_bridge:list(),
-    Name = atom_to_binary(?MODULE),
-    IsRightName =
-        fun
-            (#{name := BName}) when BName =:= Name ->
-                true;
-            (_) ->
-                false
-        end,
-    ?assert(lists:any(IsRightName, Bridges)),
-    delete_bridge(),
+    Any = fun(#{name := BName}) -> BName =:= Name end,
+    ?assert(lists:any(Any, Bridges)),
+    ok = delete_bridge(Name),
     BridgesAfterDelete = emqx_bridge:list(),
-    ?assertNot(lists:any(IsRightName, BridgesAfterDelete)),
+    ?assertNot(lists:any(Any, BridgesAfterDelete)),
     ok.
 
 t_send_message_query(Config) ->
-    BridgeID = make_bridge(#{batch_size => 1}),
+    Name = atom_to_binary(?FUNCTION_NAME),
+    RabbitMQ = get_rabbitmq(Config),
+    UseTLS = get_tls(Config),
+    BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}),
     Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
     %% This will use the SQL template included in the bridge
     emqx_bridge:send_message(BridgeID, Payload),
     %% Check that the data got to the database
     ?assertEqual(Payload, receive_simple_test_message(Config)),
-    delete_bridge(),
+    ok = delete_bridge(Name),
     ok.
 
 t_send_message_query_with_template(Config) ->
-    BridgeID = make_bridge(#{
+    Name = atom_to_binary(?FUNCTION_NAME),
+    RabbitMQ = get_rabbitmq(Config),
+    UseTLS = get_tls(Config),
+    BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{
         batch_size => 1,
         payload_template =>
             <<
@@ -318,24 +322,27 @@ t_send_message_query_with_template(Config) ->
         <<"secret">> => 42
     },
     ?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
-    delete_bridge(),
+    ok = delete_bridge(Name),
     ok.
 
 t_send_simple_batch(Config) ->
-    BridgeConf =
-        #{
-            batch_size => 100
-        },
-    BridgeID = make_bridge(BridgeConf),
+    Name = atom_to_binary(?FUNCTION_NAME),
+    RabbitMQ = get_rabbitmq(Config),
+    BridgeConf = RabbitMQ#{batch_size => 100},
+    UseTLS = get_tls(Config),
+    BridgeID = create_bridge(Name, UseTLS, BridgeConf),
     Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000},
     emqx_bridge:send_message(BridgeID, Payload),
     ?assertEqual(Payload, receive_simple_test_message(Config)),
-    delete_bridge(),
+    ok = delete_bridge(Name),
     ok.
 
 t_send_simple_batch_with_template(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    RabbitMQ = get_rabbitmq(Config),
+    UseTLS = get_tls(Config),
     BridgeConf =
-        #{
+        RabbitMQ#{
             batch_size => 100,
             payload_template =>
                 <<
@@ -347,7 +354,7 @@ t_send_simple_batch_with_template(Config) ->
                     "}"
                 >>
         },
-    BridgeID = make_bridge(BridgeConf),
+    BridgeID = create_bridge(Name, UseTLS, BridgeConf),
     Payload = #{
         <<"key">> => 7,
         <<"data">> => <<"RabbitMQ">>,
@@ -358,20 +365,21 @@ t_send_simple_batch_with_template(Config) ->
         <<"secret">> => 42
     },
     ?assertEqual(ExpectedResult, receive_simple_test_message(Config)),
-    delete_bridge(),
+    ok = delete_bridge(Name),
     ok.
 
 t_heavy_batching(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
     NumberOfMessages = 20000,
-    BridgeConf = #{
+    RabbitMQ = get_rabbitmq(Config),
+    UseTLS = get_tls(Config),
+    BridgeConf = RabbitMQ#{
         batch_size => 10173,
         batch_time_ms => 50
     },
-    BridgeID = make_bridge(BridgeConf),
+    BridgeID = create_bridge(Name, UseTLS, BridgeConf),
     SendMessage = fun(Key) ->
-        Payload = #{
-            <<"key">> => Key
-        },
+        Payload = #{<<"key">> => Key},
         emqx_bridge:send_message(BridgeID, Payload)
     end,
     [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)],
@@ -385,7 +393,7 @@ t_heavy_batching(Config) ->
         lists:seq(1, NumberOfMessages)
     ),
     ?assertEqual(NumberOfMessages, maps:size(AllMessages)),
-    delete_bridge(),
+    ok = delete_bridge(Name),
     ok.
 
 receive_simple_test_message(Config) ->
@@ -410,17 +418,6 @@ receive_simple_test_message(Config) ->
             #'basic.cancel_ok'{consumer_tag = ConsumerTag} =
                 amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
             emqx_utils_json:decode(Content#amqp_msg.payload)
+    after 5000 ->
+        ?assert(false, "Did not receive message within 5 second")
     end.
-
-rabbitmq_config() ->
-    Config =
-        #{
-            server => rabbit_mq_host(),
-            port => 5672,
-            exchange => rabbit_mq_exchange(),
-            routing_key => rabbit_mq_routing_key()
-        },
-    #{<<"config">> => Config}.
-
-test_data() ->
-    #{<<"msg_field">> => <<"Hello">>}.

+ 43 - 22
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl

@@ -14,18 +14,18 @@
 -include_lib("amqp_client/include/amqp_client.hrl").
 
 %% This test SUITE requires a running RabbitMQ instance. If you don't want to
-%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
+%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
 %% you can create a clickhouse instance with the following command.
 %% 5672 is the default port for AMQP 0-9-1 and 15672 is the default port for
-%% the HTTP managament interface.
+%% the HTTP management interface.
 %%
 %% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management
 
 rabbit_mq_host() ->
-    <<"rabbitmq">>.
+    list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")).
 
 rabbit_mq_port() ->
-    5672.
+    list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")).
 
 rabbit_mq_password() ->
     <<"guest">>.
@@ -43,17 +43,16 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    case
-        emqx_common_test_helpers:is_tcp_server_available(
-            erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
-        )
-    of
+    Host = rabbit_mq_host(),
+    Port = rabbit_mq_port(),
+    ct:pal("rabbitmq:~p~n", [{Host, Port}]),
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
             Apps = emqx_cth_suite:start(
                 [emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
+            ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port),
             [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
         false ->
             case os:getenv("IS_CI") of
@@ -64,12 +63,12 @@ init_per_suite(Config) ->
             end
     end.
 
-setup_rabbit_mq_exchange_and_queue() ->
-    %% Create an exachange and a queue
+setup_rabbit_mq_exchange_and_queue(Host, Port) ->
+    %% Create an exchange and a queue
     {ok, Connection} =
         amqp_connection:start(#amqp_params_network{
-            host = erlang:binary_to_list(rabbit_mq_host()),
-            port = rabbit_mq_port()
+            host = binary_to_list(Host),
+            port = Port
         }),
     {ok, Channel} = amqp_connection:open_channel(Connection),
     %% Create an exchange
@@ -122,7 +121,7 @@ end_per_suite(Config) ->
 
 t_lifecycle(Config) ->
     perform_lifecycle_check(
-        erlang:atom_to_binary(?MODULE),
+        erlang:atom_to_binary(?FUNCTION_NAME),
         rabbitmq_config(),
         Config
     ).
@@ -144,12 +143,11 @@ t_start_passfile(Config) ->
     ).
 
 perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
-    #{
-        channel := Channel
-    } = get_channel_connection(TestConfig),
+    #{channel := Channel} = get_channel_connection(TestConfig),
     CheckedConfig = check_config(InitialConfig),
     #{
-        state := #{poolname := PoolName} = State,
+        id := PoolName,
+        state := State,
         status := InitialStatus
     } = create_local_resource(ResourceID, CheckedConfig),
     ?assertEqual(InitialStatus, connected),
@@ -211,9 +209,14 @@ create_local_resource(ResourceID, CheckedConfig) ->
 
 perform_query(PoolName, Channel) ->
     %% Send message to queue:
-    ok = emqx_resource:query(PoolName, {query, test_data()}),
+    ActionConfig = rabbitmq_action_config(),
+    ChannelId = <<"test_channel">>,
+    ?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)),
+    ok = emqx_resource:query(PoolName, {ChannelId, payload()}),
     %% Get the message from queue:
-    ok = receive_simple_test_message(Channel).
+    ok = receive_simple_test_message(Channel),
+    ?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)),
+    ok.
 
 receive_simple_test_message(Channel) ->
     #'basic.consume_ok'{consumer_tag = ConsumerTag} =
@@ -238,6 +241,8 @@ receive_simple_test_message(Channel) ->
             #'basic.cancel_ok'{consumer_tag = ConsumerTag} =
                 amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
             ok
+    after 5000 ->
+        ?assert(false, "Did not receive message within 5 second")
     end.
 
 rabbitmq_config() ->
@@ -255,5 +260,21 @@ rabbitmq_config(Overrides) ->
         },
     #{<<"config">> => maps:merge(Config, Overrides)}.
 
+payload() ->
+    #{<<"payload">> => test_data()}.
+
 test_data() ->
-    #{<<"msg_field">> => <<"Hello">>}.
+    #{<<"Hello">> => <<"World">>}.
+
+rabbitmq_action_config() ->
+    #{
+        config_root => actions,
+        parameters => #{
+            delivery_mode => non_persistent,
+            exchange => rabbit_mq_exchange(),
+            payload_template => <<"${.payload}">>,
+            publish_confirmation_timeout => 30000,
+            routing_key => rabbit_mq_routing_key(),
+            wait_for_publish_confirmations => true
+        }
+    }.

+ 14 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -64,6 +64,8 @@ resource_type(greptimedb) ->
     emqx_bridge_greptimedb_connector;
 resource_type(tdengine) ->
     emqx_bridge_tdengine_connector;
+resource_type(rabbitmq) ->
+    emqx_bridge_rabbitmq_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -82,6 +84,8 @@ connector_impl_module(opents) ->
     emqx_bridge_opents_connector;
 connector_impl_module(tdengine) ->
     emqx_bridge_tdengine_connector;
+connector_impl_module(rabbitmq) ->
+    emqx_bridge_rabbitmq_connector;
 connector_impl_module(_ConnectorType) ->
     undefined.
 
@@ -258,6 +262,14 @@ connector_structs() ->
                     desc => <<"TDengine Connector Config">>,
                     required => false
                 }
+            )},
+        {rabbitmq,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")),
+                #{
+                    desc => <<"RabbitMQ Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -281,6 +293,7 @@ schema_modules() ->
         emqx_bridge_redis_schema,
         emqx_bridge_iotdb_connector,
         emqx_bridge_es_connector,
+        emqx_bridge_rabbitmq_connector_schema,
         emqx_bridge_opents_connector,
         emqx_bridge_greptimedb,
         emqx_bridge_tdengine_connector
@@ -317,6 +330,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
         api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
         api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
+        api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
         api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method)
     ].

+ 3 - 2
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -91,7 +91,6 @@ api_schemas(Method) ->
         %% We need to map the `type' field of a request (binary) to a
         %% connector schema module.
         api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector"),
-        % api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt_subscriber">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt">>, Method ++ "_connector")
     ].
 
@@ -166,7 +165,9 @@ connector_type_to_bridge_types(opents) ->
 connector_type_to_bridge_types(greptimedb) ->
     [greptimedb];
 connector_type_to_bridge_types(tdengine) ->
-    [tdengine].
+    [tdengine];
+connector_type_to_bridge_types(rabbitmq) ->
+    [rabbitmq].
 
 actions_config_name(action) -> <<"actions">>;
 actions_config_name(source) -> <<"sources">>.

+ 1 - 0
rebar.config

@@ -14,6 +14,7 @@
     warn_unused_import,
     warn_obsolete_guard,
     compressed,
+    {feature, maybe_expr, enable},
     nowarn_unused_import,
     {d, snk_kind, msg}
 ]}.