Parcourir la source

Merge pull request #10754 from fix/EMQX-10056/mqtt

feat(mqttconn): employ ecpool instead of single worker
Andrew Mayorov il y a 2 ans
Parent
commit
a2688325e5

+ 4 - 4
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -58,14 +58,14 @@
 ).
 
 -if(?EMQX_RELEASE_EDITION == ee).
-bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
-bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
+bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
+bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
 bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
 bridge_to_resource_type(webhook) -> emqx_connector_http;
 bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType).
 -else.
-bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
-bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
+bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
+bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
 bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
 bridge_to_resource_type(webhook) -> emqx_connector_http.
 -endif.

+ 8 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -47,7 +47,14 @@
     <<"server">> => SERVER,
     <<"username">> => <<"user1">>,
     <<"password">> => <<"">>,
-    <<"proto_ver">> => <<"v5">>
+    <<"proto_ver">> => <<"v5">>,
+    <<"egress">> => #{
+        <<"remote">> => #{
+            <<"topic">> => <<"emqx/${topic}">>,
+            <<"qos">> => <<"${qos}">>,
+            <<"retain">> => false
+        }
+    }
 }).
 -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
 

+ 32 - 0
apps/emqx_bridge_mqtt/README.md

@@ -0,0 +1,32 @@
+# EMQX MQTT Broker Bridge
+
+This application connects EMQX to virtually any MQTT broker adhering to either [MQTTv3][1] or [MQTTv5][2] standard. The connection is facilitated through the _MQTT bridge_ abstraction, allowing for the flow of data in both directions: from the remote broker to EMQX (ingress) and from EMQX to the remote broker (egress).
+
+User can create a rule and easily ingest into a remote MQTT broker by leveraging [EMQX Rules][3].
+
+
+# Documentation
+
+- Refer to [Bridge Data into MQTT Broker][4] for how to use EMQX dashboard to set up ingress or egress bridge, or even both at the same time.
+
+- Refer to [EMQX Rules][3] for the EMQX rules engine introduction.
+
+
+# HTTP APIs
+
+Several APIs are provided for bridge management, refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information.
+
+
+# Contributing
+
+Please see our [contributing guide](../../CONTRIBUTING.md).
+
+
+# License
+
+Apache License 2.0, see [LICENSE](../../APL.txt).
+
+[1]: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1
+[2]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
+[3]: https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html
+[4]: https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-mqtt.html

+ 3 - 0
apps/emqx_bridge_mqtt/rebar.config

@@ -0,0 +1,3 @@
+{deps, [
+    {emqx, {path, "../../apps/emqx"}}
+]}.

+ 18 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -0,0 +1,18 @@
+%% -*- mode: erlang -*-
+{application, emqx_bridge_mqtt, [
+    {description, "EMQX MQTT Broker Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx,
+        emqx_resource,
+        emqx_bridge,
+        emqtt
+    ]},
+    {env, []},
+    {modules, []},
+    {licenses, ["Apache 2.0"]},
+    {links, []}
+]}.

+ 340 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -0,0 +1,340 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_mqtt_connector).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-behaviour(emqx_resource).
+
+-export([on_message_received/3]).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_query_async/4,
+    on_get_status/2
+]).
+
+-export([on_async_result/2]).
+
+-define(HEALTH_CHECK_TIMEOUT, 1000).
+
+%% ===================================================================
+%% When use this bridge as a data source, ?MODULE:on_message_received will be called
+%% if the bridge received msgs from the remote broker.
+on_message_received(Msg, HookPoint, ResId) ->
+    emqx_resource_metrics:received_inc(ResId),
+    emqx:run_hook(HookPoint, [Msg]).
+
+%% ===================================================================
+callback_mode() -> async_if_possible.
+
+on_start(ResourceId, Conf) ->
+    ?SLOG(info, #{
+        msg => "starting_mqtt_connector",
+        connector => ResourceId,
+        config => emqx_utils:redact(Conf)
+    }),
+    case start_ingress(ResourceId, Conf) of
+        {ok, Result1} ->
+            case start_egress(ResourceId, Conf) of
+                {ok, Result2} ->
+                    {ok, maps:merge(Result1, Result2)};
+                {error, Reason} ->
+                    _ = stop_ingress(Result1),
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start_ingress(ResourceId, Conf) ->
+    ClientOpts = mk_client_opts(ResourceId, "ingress", Conf),
+    case mk_ingress_config(ResourceId, Conf) of
+        Ingress = #{} ->
+            start_ingress(ResourceId, Ingress, ClientOpts);
+        undefined ->
+            {ok, #{}}
+    end.
+
+start_ingress(ResourceId, Ingress, ClientOpts) ->
+    PoolName = <<ResourceId/binary, ":ingress">>,
+    PoolSize = choose_ingress_pool_size(ResourceId, Ingress),
+    Options = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {ingress, Ingress},
+        {client_opts, ClientOpts}
+    ],
+    case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of
+        ok ->
+            {ok, #{ingress_pool_name => PoolName}};
+        {error, {start_pool_failed, _, Reason}} ->
+            {error, Reason}
+    end.
+
+choose_ingress_pool_size(
+    ResourceId,
+    #{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
+) ->
+    case emqx_topic:parse(RemoteTopic) of
+        {_Filter, #{share := _Name}} ->
+            % NOTE: this is shared subscription, many workers may subscribe
+            PoolSize;
+        {_Filter, #{}} ->
+            % NOTE: this is regular subscription, only one worker should subscribe
+            ?SLOG(warning, #{
+                msg => "mqtt_bridge_ingress_pool_size_ignored",
+                connector => ResourceId,
+                reason =>
+                    "Remote topic filter is not a shared subscription, "
+                    "ingress pool will start with a single worker",
+                config_pool_size => PoolSize,
+                pool_size => 1
+            }),
+            1
+    end.
+
+start_egress(ResourceId, Conf) ->
+    % NOTE
+    % We are ignoring the user configuration here because there's currently no reliable way
+    % to ensure proper session recovery according to the MQTT spec.
+    ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)),
+    case mk_egress_config(Conf) of
+        Egress = #{} ->
+            start_egress(ResourceId, Egress, ClientOpts);
+        undefined ->
+            {ok, #{}}
+    end.
+
+start_egress(ResourceId, Egress, ClientOpts) ->
+    PoolName = <<ResourceId/binary, ":egress">>,
+    PoolSize = maps:get(pool_size, Egress),
+    Options = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {client_opts, ClientOpts}
+    ],
+    case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of
+        ok ->
+            {ok, #{
+                egress_pool_name => PoolName,
+                egress_config => emqx_bridge_mqtt_egress:config(Egress)
+            }};
+        {error, {start_pool_failed, _, Reason}} ->
+            {error, Reason}
+    end.
+
+on_stop(ResourceId, State) ->
+    ?SLOG(info, #{
+        msg => "stopping_mqtt_connector",
+        connector => ResourceId
+    }),
+    ok = stop_ingress(State),
+    ok = stop_egress(State).
+
+stop_ingress(#{ingress_pool_name := PoolName}) ->
+    emqx_resource_pool:stop(PoolName);
+stop_ingress(#{}) ->
+    ok.
+
+stop_egress(#{egress_pool_name := PoolName}) ->
+    emqx_resource_pool:stop(PoolName);
+stop_egress(#{}) ->
+    ok.
+
+on_query(
+    ResourceId,
+    {send_message, Msg},
+    #{egress_pool_name := PoolName, egress_config := Config}
+) ->
+    ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
+    handle_send_result(with_egress_client(PoolName, send, [Msg, Config]));
+on_query(ResourceId, {send_message, Msg}, #{}) ->
+    ?SLOG(error, #{
+        msg => "forwarding_unavailable",
+        connector => ResourceId,
+        message => Msg,
+        reason => "Egress is not configured"
+    }).
+
+on_query_async(
+    ResourceId,
+    {send_message, Msg},
+    CallbackIn,
+    #{egress_pool_name := PoolName, egress_config := Config}
+) ->
+    ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
+    Callback = {fun on_async_result/2, [CallbackIn]},
+    Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]),
+    case Result of
+        ok ->
+            ok;
+        {ok, Pid} when is_pid(Pid) ->
+            {ok, Pid};
+        {error, Reason} ->
+            {error, classify_error(Reason)}
+    end;
+on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) ->
+    ?SLOG(error, #{
+        msg => "forwarding_unavailable",
+        connector => ResourceId,
+        message => Msg,
+        reason => "Egress is not configured"
+    }).
+
+with_egress_client(ResourceId, Fun, Args) ->
+    ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover).
+
+on_async_result(Callback, Result) ->
+    apply_callback_function(Callback, handle_send_result(Result)).
+
+apply_callback_function(F, Result) when is_function(F) ->
+    erlang:apply(F, [Result]);
+apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
+    erlang:apply(F, A ++ [Result]);
+apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
+    erlang:apply(M, F, A ++ [Result]).
+
+handle_send_result(ok) ->
+    ok;
+handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) ->
+    ok;
+handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) ->
+    ok;
+handle_send_result({ok, Reply}) ->
+    {error, classify_reply(Reply)};
+handle_send_result({error, Reason}) ->
+    {error, classify_error(Reason)}.
+
+classify_reply(Reply = #{reason_code := _}) ->
+    {unrecoverable_error, Reply}.
+
+classify_error(disconnected = Reason) ->
+    {recoverable_error, Reason};
+classify_error(ecpool_empty) ->
+    {recoverable_error, disconnected};
+classify_error({disconnected, _RC, _} = Reason) ->
+    {recoverable_error, Reason};
+classify_error({shutdown, _} = Reason) ->
+    {recoverable_error, Reason};
+classify_error(shutdown = Reason) ->
+    {recoverable_error, Reason};
+classify_error(Reason) ->
+    {unrecoverable_error, Reason}.
+
+on_get_status(_ResourceId, State) ->
+    Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)),
+    Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
+    try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
+        Statuses ->
+            combine_status(Statuses)
+    catch
+        exit:timeout ->
+            connecting
+    end.
+
+get_status({Pool, Worker}) ->
+    case ecpool_worker:client(Worker) of
+        {ok, Client} when Pool == ingress_pool_name ->
+            emqx_bridge_mqtt_ingress:status(Client);
+        {ok, Client} when Pool == egress_pool_name ->
+            emqx_bridge_mqtt_egress:status(Client);
+        {error, _} ->
+            disconnected
+    end.
+
+combine_status(Statuses) ->
+    %% NOTE
+    %% Natural order of statuses: [connected, connecting, disconnected]
+    %% * `disconnected` wins over any other status
+    %% * `connecting` wins over `connected`
+    case lists:reverse(lists:usort(Statuses)) of
+        [Status | _] ->
+            Status;
+        [] ->
+            disconnected
+    end.
+
+mk_ingress_config(
+    ResourceId,
+    #{
+        ingress := Ingress = #{remote := _},
+        server := Server,
+        hookpoint := HookPoint
+    }
+) ->
+    Ingress#{
+        server => Server,
+        on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]}
+    };
+mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) ->
+    error({no_hookpoint_provided, ResourceId, Conf});
+mk_ingress_config(_ResourceId, #{}) ->
+    undefined.
+
+mk_egress_config(#{egress := Egress = #{remote := _}}) ->
+    Egress;
+mk_egress_config(#{}) ->
+    undefined.
+
+mk_client_opts(
+    ResourceId,
+    ClientScope,
+    Config = #{
+        server := Server,
+        keepalive := KeepAlive,
+        ssl := #{enable := EnableSsl} = Ssl
+    }
+) ->
+    HostPort = emqx_bridge_mqtt_connector_schema:parse_server(Server),
+    Options = maps:with(
+        [
+            proto_ver,
+            username,
+            password,
+            clean_start,
+            retry_interval,
+            max_inflight,
+            % Opening a connection in bridge mode will form a non-standard mqtt connection message.
+            % A load balancing server (such as haproxy) is often set up before the emqx broker server.
+            % When the load balancing server enables mqtt connection packet inspection,
+            % non-standard mqtt connection packets might be filtered out by LB.
+            bridge_mode
+        ],
+        Config
+    ),
+    Options#{
+        hosts => [HostPort],
+        clientid => clientid(ResourceId, ClientScope, Config),
+        connect_timeout => 30,
+        keepalive => ms_to_s(KeepAlive),
+        force_ping => true,
+        ssl => EnableSsl,
+        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
+    }.
+
+ms_to_s(Ms) ->
+    erlang:ceil(Ms / 1000).
+
+clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) ->
+    iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]);
+clientid(Id, ClientScope, _Conf) ->
+    iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]).

+ 23 - 10
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_connector_mqtt_schema).
+-module(emqx_bridge_mqtt_connector_schema).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -68,7 +68,8 @@ fields("server_configs") ->
                 hoconsc:enum([cluster_shareload]),
                 #{
                     default => cluster_shareload,
-                    desc => ?DESC("mode")
+                    desc => ?DESC("mode"),
+                    deprecated => {since, "v5.1.0 & e5.1.0"}
                 }
             )},
         {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
@@ -133,16 +134,17 @@ fields("server_configs") ->
     ] ++ emqx_connector_schema_lib:ssl_fields();
 fields("ingress") ->
     [
-        {"remote",
+        {pool_size, fun ingress_pool_size/1},
+        {remote,
             mk(
                 ref(?MODULE, "ingress_remote"),
-                #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")}
+                #{desc => ?DESC("ingress_remote")}
             )},
-        {"local",
+        {local,
             mk(
                 ref(?MODULE, "ingress_local"),
                 #{
-                    desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")
+                    desc => ?DESC("ingress_local")
                 }
             )}
     ];
@@ -204,19 +206,20 @@ fields("ingress_local") ->
     ];
 fields("egress") ->
     [
-        {"local",
+        {pool_size, fun egress_pool_size/1},
+        {local,
             mk(
                 ref(?MODULE, "egress_local"),
                 #{
-                    desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"),
+                    desc => ?DESC("egress_local"),
                     required => false
                 }
             )},
-        {"remote",
+        {remote,
             mk(
                 ref(?MODULE, "egress_remote"),
                 #{
-                    desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"),
+                    desc => ?DESC("egress_remote"),
                     required => true
                 }
             )}
@@ -272,6 +275,16 @@ fields("egress_remote") ->
             )}
     ].
 
+ingress_pool_size(desc) ->
+    ?DESC("ingress_pool_size");
+ingress_pool_size(Prop) ->
+    emqx_connector_schema_lib:pool_size(Prop).
+
+egress_pool_size(desc) ->
+    ?DESC("egress_pool_size");
+egress_pool_size(Prop) ->
+    emqx_connector_schema_lib:pool_size(Prop).
+
 desc("server_configs") ->
     ?DESC("server_configs");
 desc("ingress") ->

+ 162 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mqtt_egress).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+-export([
+    config/1,
+    send/3,
+    send_async/4
+]).
+
+%% management APIs
+-export([
+    status/1,
+    info/1
+]).
+
+-type name() :: term().
+-type message() :: emqx_types:message() | map().
+-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}.
+-type remote_message() :: #mqtt_msg{}.
+
+-type option() ::
+    {name, name()}
+    %% see `emqtt:option()`
+    | {client_opts, map()}.
+
+-type egress() :: #{
+    local => #{
+        topic => emqx_topic:topic()
+    },
+    remote := emqx_bridge_mqtt_msg:msgvars()
+}.
+
+%% @doc Start an ingress bridge worker.
+-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Options) ->
+    ?SLOG(debug, #{
+        msg => "egress_client_starting",
+        options => emqx_utils:redact(Options)
+    }),
+    Name = proplists:get_value(name, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    ClientOpts = proplists:get_value(client_opts, Options),
+    case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of
+        {ok, Pid} ->
+            connect(Pid, Name);
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "egress_client_start_failed",
+                config => emqx_utils:redact(ClientOpts),
+                reason => Reason
+            }),
+            Error
+    end.
+
+mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
+    ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
+
+mk_clientid(WorkerId, ClientId) ->
+    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
+
+connect(Pid, Name) ->
+    case emqtt:connect(Pid) of
+        {ok, _Props} ->
+            {ok, Pid};
+        {error, Reason} = Error ->
+            ?SLOG(warning, #{
+                msg => "egress_client_connect_failed",
+                reason => Reason,
+                name => Name
+            }),
+            _ = catch emqtt:stop(Pid),
+            Error
+    end.
+
+%%
+
+-spec config(map()) ->
+    egress().
+config(#{remote := RC = #{}} = Conf) ->
+    Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
+
+-spec send(pid(), message(), egress()) ->
+    ok.
+send(Pid, MsgIn, Egress) ->
+    emqtt:publish(Pid, export_msg(MsgIn, Egress)).
+
+-spec send_async(pid(), message(), callback(), egress()) ->
+    ok | {ok, pid()}.
+send_async(Pid, MsgIn, Callback, Egress) ->
+    ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback),
+    {ok, Pid}.
+
+export_msg(Msg, #{remote := Remote}) ->
+    to_remote_msg(Msg, Remote).
+
+-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) ->
+    remote_message().
+to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
+    {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
+    to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars);
+to_remote_msg(Msg = #{}, Remote) ->
+    #{
+        topic := Topic,
+        payload := Payload,
+        qos := QoS,
+        retain := Retain
+    } = emqx_bridge_mqtt_msg:render(Msg, Remote),
+    PubProps = maps:get(pub_props, Msg, #{}),
+    #mqtt_msg{
+        qos = QoS,
+        retain = Retain,
+        topic = Topic,
+        props = emqx_utils:pub_props_to_packet(PubProps),
+        payload = Payload
+    }.
+
+%%
+
+-spec info(pid()) ->
+    [{atom(), term()}].
+info(Pid) ->
+    emqtt:info(Pid).
+
+-spec status(pid()) ->
+    emqx_resource:resource_status().
+status(Pid) ->
+    try
+        case proplists:get_value(socket, info(Pid)) of
+            Socket when Socket /= undefined ->
+                connected;
+            undefined ->
+                connecting
+        end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.

+ 274 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl

@@ -0,0 +1,274 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mqtt_ingress).
+
+-include_lib("emqx/include/logger.hrl").
+
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+%% management APIs
+-export([
+    status/1,
+    info/1
+]).
+
+-export([handle_publish/5]).
+-export([handle_disconnect/1]).
+
+-type name() :: term().
+
+-type option() ::
+    {name, name()}
+    | {ingress, map()}
+    %% see `emqtt:option()`
+    | {client_opts, map()}.
+
+-type ingress() :: #{
+    server := string(),
+    remote := #{
+        topic := emqx_topic:topic(),
+        qos => emqx_types:qos()
+    },
+    local := emqx_bridge_mqtt_msg:msgvars(),
+    on_message_received := {module(), atom(), [term()]}
+}.
+
+%% @doc Start an ingress bridge worker.
+-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Options) ->
+    ?SLOG(debug, #{
+        msg => "ingress_client_starting",
+        options => emqx_utils:redact(Options)
+    }),
+    Name = proplists:get_value(name, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    Ingress = config(proplists:get_value(ingress, Options), Name),
+    ClientOpts = proplists:get_value(client_opts, Options),
+    case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of
+        {ok, Pid} ->
+            connect(Pid, Name, Ingress);
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "client_start_failed",
+                config => emqx_utils:redact(ClientOpts),
+                reason => Reason
+            }),
+            Error
+    end.
+
+mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
+    ClientOpts#{
+        clientid := mk_clientid(WorkerId, ClientId),
+        msg_handler => mk_client_event_handler(Name, Ingress)
+    }.
+
+mk_clientid(WorkerId, ClientId) ->
+    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
+
+mk_client_event_handler(Name, Ingress = #{}) ->
+    IngressVars = maps:with([server], Ingress),
+    OnMessage = maps:get(on_message_received, Ingress, undefined),
+    LocalPublish =
+        case Ingress of
+            #{local := Local = #{topic := _}} ->
+                Local;
+            #{} ->
+                undefined
+        end,
+    #{
+        publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]},
+        disconnected => {fun ?MODULE:handle_disconnect/1, []}
+    }.
+
+-spec connect(pid(), name(), ingress()) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Pid, Name, Ingress) ->
+    case emqtt:connect(Pid) of
+        {ok, _Props} ->
+            case subscribe_remote_topic(Pid, Ingress) of
+                {ok, _, _RCs} ->
+                    {ok, Pid};
+                {error, Reason} = Error ->
+                    ?SLOG(error, #{
+                        msg => "ingress_client_subscribe_failed",
+                        ingress => Ingress,
+                        name => Name,
+                        reason => Reason
+                    }),
+                    _ = catch emqtt:stop(Pid),
+                    Error
+            end;
+        {error, Reason} = Error ->
+            ?SLOG(warning, #{
+                msg => "ingress_client_connect_failed",
+                reason => Reason,
+                name => Name
+            }),
+            _ = catch emqtt:stop(Pid),
+            Error
+    end.
+
+subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
+    emqtt:subscribe(Pid, RemoteTopic, QoS).
+
+%%
+
+-spec config(map(), name()) ->
+    ingress().
+config(#{remote := RC, local := LC} = Conf, BridgeName) ->
+    Conf#{
+        remote => parse_remote(RC, BridgeName),
+        local => emqx_bridge_mqtt_msg:parse(LC)
+    }.
+
+parse_remote(#{qos := QoSIn} = Conf, BridgeName) ->
+    QoS = downgrade_ingress_qos(QoSIn),
+    case QoS of
+        QoSIn ->
+            ok;
+        _ ->
+            ?SLOG(warning, #{
+                msg => "downgraded_unsupported_ingress_qos",
+                qos_configured => QoSIn,
+                qos_used => QoS,
+                name => BridgeName
+            })
+    end,
+    Conf#{qos => QoS}.
+
+downgrade_ingress_qos(2) ->
+    1;
+downgrade_ingress_qos(QoS) ->
+    QoS.
+
+%%
+
+-spec info(pid()) ->
+    [{atom(), term()}].
+info(Pid) ->
+    emqtt:info(Pid).
+
+-spec status(pid()) ->
+    emqx_resource:resource_status().
+status(Pid) ->
+    try
+        case proplists:get_value(socket, info(Pid)) of
+            Socket when Socket /= undefined ->
+                connected;
+            undefined ->
+                connecting
+        end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.
+
+%%
+
+handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) ->
+    Msg = import_msg(MsgIn, IngressVars),
+    ?SLOG(debug, #{
+        msg => "ingress_publish_local",
+        message => Msg,
+        name => Name
+    }),
+    maybe_on_message_received(Msg, OnMessage),
+    maybe_publish_local(Msg, LocalPublish, Props).
+
+handle_disconnect(_Reason) ->
+    ok.
+
+maybe_on_message_received(Msg, {Mod, Func, Args}) ->
+    erlang:apply(Mod, Func, [Msg | Args]);
+maybe_on_message_received(_Msg, undefined) ->
+    ok.
+
+maybe_publish_local(Msg, Local = #{}, Props) ->
+    emqx_broker:publish(to_broker_msg(Msg, Local, Props));
+maybe_publish_local(_Msg, undefined, _Props) ->
+    ok.
+
+%%
+
+import_msg(
+    #{
+        dup := Dup,
+        payload := Payload,
+        properties := Props,
+        qos := QoS,
+        retain := Retain,
+        topic := Topic
+    },
+    #{server := Server}
+) ->
+    #{
+        id => emqx_guid:to_hexstr(emqx_guid:gen()),
+        server => Server,
+        payload => Payload,
+        topic => Topic,
+        qos => QoS,
+        dup => Dup,
+        retain => Retain,
+        pub_props => printable_maps(Props),
+        message_received_at => erlang:system_time(millisecond)
+    }.
+
+printable_maps(undefined) ->
+    #{};
+printable_maps(Headers) ->
+    maps:fold(
+        fun
+            ('User-Property', V0, AccIn) when is_list(V0) ->
+                AccIn#{
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [
+                        #{
+                            key => Key,
+                            value => Value
+                        }
+                     || {Key, Value} <- V0
+                    ]
+                };
+            (K, V0, AccIn) ->
+                AccIn#{K => V0}
+        end,
+        #{},
+        Headers
+    ).
+
+%% published from remote node over a MQTT connection
+to_broker_msg(Msg, Vars, undefined) ->
+    to_broker_msg(Msg, Vars, #{});
+to_broker_msg(#{dup := Dup} = Msg, Local, Props) ->
+    #{
+        topic := Topic,
+        payload := Payload,
+        qos := QoS,
+        retain := Retain
+    } = emqx_bridge_mqtt_msg:render(Msg, Local),
+    PubProps = maps:get(pub_props, Msg, #{}),
+    emqx_message:set_headers(
+        Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
+        emqx_message:set_flags(
+            #{dup => Dup, retain => Retain},
+            emqx_message:make(bridge, QoS, Topic, Payload)
+        )
+    ).

+ 95 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl

@@ -0,0 +1,95 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mqtt_msg).
+
+-export([parse/1]).
+-export([render/2]).
+
+-export_type([msgvars/0]).
+
+-type template() :: emqx_plugin_libs_rule:tmpl_token().
+
+-type msgvars() :: #{
+    topic => template(),
+    qos => template() | emqx_types:qos(),
+    retain => template() | boolean(),
+    payload => template() | undefined
+}.
+
+%%
+
+-spec parse(#{
+    topic => iodata(),
+    qos => iodata() | emqx_types:qos(),
+    retain => iodata() | boolean(),
+    payload => iodata()
+}) ->
+    msgvars().
+parse(Conf) ->
+    Acc1 = parse_field(topic, Conf, Conf),
+    Acc2 = parse_field(qos, Conf, Acc1),
+    Acc3 = parse_field(payload, Conf, Acc2),
+    parse_field(retain, Conf, Acc3).
+
+parse_field(Key, Conf, Acc) ->
+    case Conf of
+        #{Key := Val} when is_binary(Val) ->
+            Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
+        #{Key := Val} ->
+            Acc#{Key => Val};
+        #{} ->
+            Acc
+    end.
+
+render(
+    Msg,
+    #{
+        topic := TopicToken,
+        qos := QoSToken,
+        retain := RetainToken
+    } = Vars
+) ->
+    #{
+        topic => render_string(TopicToken, Msg),
+        payload => render_payload(Vars, Msg),
+        qos => render_simple_var(QoSToken, Msg),
+        retain => render_simple_var(RetainToken, Msg)
+    }.
+
+render_payload(From, MapMsg) ->
+    do_render_payload(maps:get(payload, From, undefined), MapMsg).
+
+do_render_payload(undefined, Msg) ->
+    emqx_utils_json:encode(Msg);
+do_render_payload(Tks, Msg) ->
+    render_string(Tks, Msg).
+
+%% Replace a string contains vars to another string in which the placeholders are replace by the
+%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
+%% "a: 1".
+render_string(Tokens, Data) when is_list(Tokens) ->
+    emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary});
+render_string(Val, _Data) ->
+    Val.
+
+%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
+%% value will be an integer 1.
+render_simple_var(Tokens, Data) when is_list(Tokens) ->
+    [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
+    Var;
+render_simple_var(Val, _Data) ->
+    Val.

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl

@@ -42,7 +42,7 @@ fields("config") ->
                     }
                 )}
         ] ++
-        emqx_connector_mqtt_schema:fields("config");
+        emqx_bridge_mqtt_connector_schema:fields("config");
 fields("creation_opts") ->
     Opts = emqx_resource_schema:fields("creation_opts"),
     [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];

+ 61 - 10
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -22,9 +22,7 @@
 
 -include("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
--include("emqx_dashboard/include/emqx_dashboard.hrl").
 
 %% output functions
 -export([inspect/3]).
@@ -132,13 +130,11 @@ suite() ->
 
 init_per_suite(Config) ->
     _ = application:load(emqx_conf),
-    %% some testcases (may from other app) already get emqx_connector started
-    _ = application:stop(emqx_resource),
-    _ = application:stop(emqx_connector),
     ok = emqx_common_test_helpers:start_apps(
         [
             emqx_rule_engine,
             emqx_bridge,
+            emqx_bridge_mqtt,
             emqx_dashboard
         ],
         fun set_special_configs/1
@@ -152,9 +148,10 @@ init_per_suite(Config) ->
 
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([
-        emqx_rule_engine,
+        emqx_dashboard,
+        emqx_bridge_mqtt,
         emqx_bridge,
-        emqx_dashboard
+        emqx_rule_engine
     ]),
     ok.
 
@@ -221,6 +218,12 @@ t_mqtt_conn_bridge_ingress(_) ->
         request(put, uri(["bridges", BridgeIDIngress]), ServerConf)
     ),
 
+    %% non-shared subscription, verify that only one client is subscribed
+    ?assertEqual(
+        1,
+        length(emqx:subscribers(<<?INGRESS_REMOTE_TOPIC, "/#">>))
+    ),
+
     %% we now test if the bridge works as expected
     RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
     LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
@@ -245,6 +248,48 @@ t_mqtt_conn_bridge_ingress(_) ->
 
     ok.
 
+t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
+    PoolSize = 4,
+    Ns = lists:seq(1, 10),
+    BridgeName = atom_to_binary(?FUNCTION_NAME),
+    BridgeID = create_bridge(
+        ?SERVER_CONF(<<>>)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => BridgeName,
+            <<"ingress">> => #{
+                <<"pool_size">> => PoolSize,
+                <<"remote">> => #{
+                    <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>,
+                    <<"qos">> => 1
+                },
+                <<"local">> => #{
+                    <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
+                    <<"qos">> => <<"${qos}">>,
+                    <<"payload">> => <<"${clientid}">>,
+                    <<"retain">> => <<"${retain}">>
+                }
+            }
+        }
+    ),
+
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
+    ok = emqx:subscribe(LocalTopic),
+
+    _ = emqx_utils:pmap(
+        fun emqx:publish/1,
+        [emqx_message:make(RemoteTopic, <<>>) || _ <- Ns]
+    ),
+    _ = [assert_mqtt_msg_received(LocalTopic) || _ <- Ns],
+
+    ?assertEqual(
+        PoolSize,
+        length(emqx_shared_sub:subscribers(<<"ingress">>, <<?INGRESS_REMOTE_TOPIC, "/#">>))
+    ),
+
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+    ok.
+
 t_mqtt_egress_bridge_ignores_clean_start(_) ->
     BridgeName = atom_to_binary(?FUNCTION_NAME),
     BridgeID = create_bridge(
@@ -256,11 +301,17 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
         }
     ),
 
-    {ok, _, #{state := #{name := WorkerName}}} =
-        emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeID),
+    {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} =
+        emqx_resource_manager:lookup_cached(ResourceID),
+    ClientInfo = ecpool:pick_and_do(
+        EgressPoolName,
+        {emqx_bridge_mqtt_egress, info, []},
+        no_handover
+    ),
     ?assertMatch(
         #{clean_start := true},
-        maps:from_list(emqx_connector_mqtt_worker:info(WorkerName))
+        maps:from_list(ClientInfo)
     ),
 
     %% delete the bridge

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "EMQX Data Integration Connectors"},
-    {vsn, "0.1.24"},
+    {vsn, "0.1.25"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 0 - 319
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -1,319 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_connector_mqtt).
-
--include("emqx_connector.hrl").
-
--include_lib("typerefl/include/types.hrl").
--include_lib("hocon/include/hoconsc.hrl").
--include_lib("emqx/include/logger.hrl").
-
--behaviour(supervisor).
--behaviour(emqx_resource).
-
-%% API and callbacks for supervisor
--export([
-    callback_mode/0,
-    start_link/0,
-    init/1,
-    create_bridge/1,
-    drop_bridge/1,
-    bridges/0
-]).
-
--export([on_message_received/3]).
-
-%% callbacks of behaviour emqx_resource
--export([
-    on_start/2,
-    on_stop/2,
-    on_query/3,
-    on_query_async/4,
-    on_get_status/2
-]).
-
--export([on_async_result/2]).
-
--behaviour(hocon_schema).
-
--import(hoconsc, [mk/2]).
-
--export([
-    roots/0,
-    fields/1
-]).
-
-%%=====================================================================
-%% Hocon schema
-roots() ->
-    fields("config").
-
-fields("config") ->
-    emqx_connector_mqtt_schema:fields("config");
-fields("get") ->
-    [
-        {num_of_bridges,
-            mk(
-                integer(),
-                #{desc => ?DESC("num_of_bridges")}
-            )}
-    ] ++ fields("post");
-fields("put") ->
-    emqx_connector_mqtt_schema:fields("server_configs");
-fields("post") ->
-    [
-        {type,
-            mk(
-                mqtt,
-                #{
-                    required => true,
-                    desc => ?DESC("type")
-                }
-            )},
-        {name,
-            mk(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("name")
-                }
-            )}
-    ] ++ fields("put").
-
-%% ===================================================================
-%% supervisor APIs
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init([]) ->
-    SupFlag = #{
-        strategy => one_for_one,
-        intensity => 100,
-        period => 10
-    },
-    {ok, {SupFlag, []}}.
-
-bridge_spec(Config) ->
-    {Name, NConfig} = maps:take(name, Config),
-    #{
-        id => Name,
-        start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
-        restart => temporary,
-        shutdown => 1000
-    }.
-
--spec bridges() -> [{_Name, _Status}].
-bridges() ->
-    [
-        {Name, emqx_connector_mqtt_worker:status(Name)}
-     || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)
-    ].
-
-create_bridge(Config) ->
-    supervisor:start_child(?MODULE, bridge_spec(Config)).
-
-drop_bridge(Name) ->
-    case supervisor:terminate_child(?MODULE, Name) of
-        ok ->
-            supervisor:delete_child(?MODULE, Name);
-        {error, not_found} ->
-            ok;
-        {error, Error} ->
-            {error, Error}
-    end.
-
-%% ===================================================================
-%% When use this bridge as a data source, ?MODULE:on_message_received will be called
-%% if the bridge received msgs from the remote broker.
-on_message_received(Msg, HookPoint, ResId) ->
-    emqx_resource_metrics:received_inc(ResId),
-    emqx:run_hook(HookPoint, [Msg]).
-
-%% ===================================================================
-callback_mode() -> async_if_possible.
-
-on_start(InstanceId, Conf) ->
-    ?SLOG(info, #{
-        msg => "starting_mqtt_connector",
-        connector => InstanceId,
-        config => emqx_utils:redact(Conf)
-    }),
-    BasicConf = basic_config(Conf),
-    BridgeConf = BasicConf#{
-        name => InstanceId,
-        clientid => clientid(InstanceId, Conf),
-        subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId),
-        forwards => make_forward_confs(maps:get(egress, Conf, undefined))
-    },
-    case ?MODULE:create_bridge(BridgeConf) of
-        {ok, _Pid} ->
-            ensure_mqtt_worker_started(InstanceId, BridgeConf);
-        {error, {already_started, _Pid}} ->
-            ok = ?MODULE:drop_bridge(InstanceId),
-            {ok, _} = ?MODULE:create_bridge(BridgeConf),
-            ensure_mqtt_worker_started(InstanceId, BridgeConf);
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-on_stop(_InstId, #{name := InstanceId}) ->
-    ?SLOG(info, #{
-        msg => "stopping_mqtt_connector",
-        connector => InstanceId
-    }),
-    case ?MODULE:drop_bridge(InstanceId) of
-        ok ->
-            ok;
-        {error, not_found} ->
-            ok;
-        {error, Reason} ->
-            ?SLOG(error, #{
-                msg => "stop_mqtt_connector_error",
-                connector => InstanceId,
-                reason => Reason
-            })
-    end.
-
-on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
-    ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of
-        ok ->
-            ok;
-        {error, Reason} ->
-            classify_error(Reason)
-    end.
-
-on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) ->
-    ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    Callback = {fun on_async_result/2, [CallbackIn]},
-    case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
-        ok ->
-            ok;
-        {ok, Pid} ->
-            {ok, Pid};
-        {error, Reason} ->
-            classify_error(Reason)
-    end.
-
-on_async_result(Callback, ok) ->
-    apply_callback_function(Callback, ok);
-on_async_result(Callback, {ok, _} = Ok) ->
-    apply_callback_function(Callback, Ok);
-on_async_result(Callback, {error, Reason}) ->
-    apply_callback_function(Callback, classify_error(Reason)).
-
-apply_callback_function(F, Result) when is_function(F) ->
-    erlang:apply(F, [Result]);
-apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
-    erlang:apply(F, A ++ [Result]);
-apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
-    erlang:apply(M, F, A ++ [Result]).
-
-on_get_status(_InstId, #{name := InstanceId}) ->
-    emqx_connector_mqtt_worker:status(InstanceId).
-
-classify_error(disconnected = Reason) ->
-    {error, {recoverable_error, Reason}};
-classify_error({disconnected, _RC, _} = Reason) ->
-    {error, {recoverable_error, Reason}};
-classify_error({shutdown, _} = Reason) ->
-    {error, {recoverable_error, Reason}};
-classify_error(shutdown = Reason) ->
-    {error, {recoverable_error, Reason}};
-classify_error(Reason) ->
-    {error, {unrecoverable_error, Reason}}.
-
-ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
-    case emqx_connector_mqtt_worker:connect(InstanceId) of
-        {ok, Properties} ->
-            {ok, #{name => InstanceId, config => BridgeConf, props => Properties}};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
-    undefined;
-make_sub_confs(undefined, _Conf, _) ->
-    undefined;
-make_sub_confs(SubRemoteConf, Conf, ResourceId) ->
-    case maps:find(hookpoint, Conf) of
-        error ->
-            error({no_hookpoint_provided, Conf});
-        {ok, HookPoint} ->
-            MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
-            SubRemoteConf#{on_message_received => MFA}
-    end.
-
-make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
-    undefined;
-make_forward_confs(undefined) ->
-    undefined;
-make_forward_confs(FrowardConf) ->
-    FrowardConf.
-
-basic_config(
-    #{
-        server := Server,
-        proto_ver := ProtoVer,
-        bridge_mode := BridgeMode,
-        clean_start := CleanStart,
-        keepalive := KeepAlive,
-        retry_interval := RetryIntv,
-        max_inflight := MaxInflight,
-        ssl := #{enable := EnableSsl} = Ssl
-    } = Conf
-) ->
-    BasicConf = #{
-        %% connection opts
-        server => Server,
-        %% 30s
-        connect_timeout => 30,
-        auto_reconnect => true,
-        proto_ver => ProtoVer,
-        %% Opening bridge_mode will form a non-standard mqtt connection message.
-        %% A load balancing server (such as haproxy) is often set up before the emqx broker server.
-        %% When the load balancing server enables mqtt connection packet inspection,
-        %% non-standard mqtt connection packets will be filtered out by LB.
-        %% So let's disable bridge_mode.
-        bridge_mode => BridgeMode,
-        keepalive => ms_to_s(KeepAlive),
-        clean_start => CleanStart,
-        retry_interval => RetryIntv,
-        max_inflight => MaxInflight,
-        ssl => EnableSsl,
-        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
-    },
-    maybe_put_fields([username, password], Conf, BasicConf).
-
-maybe_put_fields(Fields, Conf, Acc0) ->
-    lists:foldl(
-        fun(Key, Acc) ->
-            case maps:find(Key, Conf) of
-                error -> Acc;
-                {ok, Val} -> Acc#{Key => Val}
-            end
-        end,
-        Acc0,
-        Fields
-    ).
-
-ms_to_s(Ms) ->
-    erlang:ceil(Ms / 1000).
-
-clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) ->
-    iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]);
-clientid(Id, _Conf) ->
-    iolist_to_binary([Id, ":", atom_to_list(node())]).

+ 0 - 1
apps/emqx_connector/src/emqx_connector_sup.erl

@@ -33,7 +33,6 @@ init([]) ->
         period => 20
     },
     ChildSpecs = [
-        child_spec(emqx_connector_mqtt),
         child_spec(emqx_connector_jwt_sup)
     ],
     {ok, {SupFlags, ChildSpecs}}.

+ 0 - 168
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -1,168 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_mqtt_msg).
-
--export([
-    to_binary/1,
-    from_binary/1,
-    make_pub_vars/2,
-    to_remote_msg/2,
-    to_broker_msg/3,
-    estimate_size/1
-]).
-
--export([
-    replace_vars_in_str/2,
-    replace_simple_var/2
-]).
-
--export_type([msg/0]).
-
--include_lib("emqx/include/emqx.hrl").
-
--include_lib("emqtt/include/emqtt.hrl").
-
--type msg() :: emqx_types:message().
--type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
--type remote_config() :: #{
-    topic := binary(),
-    qos := original | integer(),
-    retain := original | boolean(),
-    payload := binary()
-}.
--type variables() :: #{
-    mountpoint := undefined | binary(),
-    remote := remote_config()
-}.
-
-make_pub_vars(_, undefined) ->
-    undefined;
-make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
-    Conf#{mountpoint => Mountpoint}.
-
-%% @doc Make export format:
-%% 1. Mount topic to a prefix
-%% 2. Fix QoS to 1
-%% @end
-%% Shame that we have to know the callback module here
-%% would be great if we can get rid of #mqtt_msg{} record
-%% and use #message{} in all places.
--spec to_remote_msg(msg() | map(), variables()) ->
-    exp_msg().
-to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
-    Retain0 = maps:get(retain, Flags0, false),
-    {Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
-    MapMsg = maps:put(retain, Retain0, Columns),
-    to_remote_msg(MapMsg, Vars);
-to_remote_msg(MapMsg, #{
-    remote := #{
-        topic := TopicToken,
-        qos := QoSToken,
-        retain := RetainToken
-    } = Remote,
-    mountpoint := Mountpoint
-}) when is_map(MapMsg) ->
-    Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(Remote, MapMsg),
-    QoS = replace_simple_var(QoSToken, MapMsg),
-    Retain = replace_simple_var(RetainToken, MapMsg),
-    PubProps = maps:get(pub_props, MapMsg, #{}),
-    #mqtt_msg{
-        qos = QoS,
-        retain = Retain,
-        topic = topic(Mountpoint, Topic),
-        props = emqx_utils:pub_props_to_packet(PubProps),
-        payload = Payload
-    };
-to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
-    Msg#message{topic = topic(Mountpoint, Topic)}.
-
-%% published from remote node over a MQTT connection
-to_broker_msg(Msg, Vars, undefined) ->
-    to_broker_msg(Msg, Vars, #{});
-to_broker_msg(
-    #{dup := Dup} = MapMsg,
-    #{
-        local := #{
-            topic := TopicToken,
-            qos := QoSToken,
-            retain := RetainToken
-        } = Local,
-        mountpoint := Mountpoint
-    },
-    Props
-) ->
-    Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(Local, MapMsg),
-    QoS = replace_simple_var(QoSToken, MapMsg),
-    Retain = replace_simple_var(RetainToken, MapMsg),
-    PubProps = maps:get(pub_props, MapMsg, #{}),
-    set_headers(
-        Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
-        emqx_message:set_flags(
-            #{dup => Dup, retain => Retain},
-            emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
-        )
-    ).
-
-process_payload(From, MapMsg) ->
-    do_process_payload(maps:get(payload, From, undefined), MapMsg).
-
-do_process_payload(undefined, Msg) ->
-    emqx_utils_json:encode(Msg);
-do_process_payload(Tks, Msg) ->
-    replace_vars_in_str(Tks, Msg).
-
-%% Replace a string contains vars to another string in which the placeholders are replace by the
-%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
-%% "a: 1".
-replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
-    emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
-replace_vars_in_str(Val, _Data) ->
-    Val.
-
-%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
-%% value will be an integer 1.
-replace_simple_var(Tokens, Data) when is_list(Tokens) ->
-    [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
-    Var;
-replace_simple_var(Val, _Data) ->
-    Val.
-
-%% @doc Make `binary()' in order to make iodata to be persisted on disk.
--spec to_binary(msg()) -> binary().
-to_binary(Msg) -> term_to_binary(Msg).
-
-%% @doc Unmarshal binary into `msg()'.
--spec from_binary(binary()) -> msg().
-from_binary(Bin) -> binary_to_term(Bin).
-
-%% @doc Estimate the size of a message.
-%% Count only the topic length + payload size
-%% There is no topic and payload for event message. So count all `Msg` term
--spec estimate_size(msg()) -> integer().
-estimate_size(#message{topic = Topic, payload = Payload}) ->
-    size(Topic) + size(Payload);
-estimate_size(#{topic := Topic, payload := Payload}) ->
-    size(Topic) + size(Payload);
-estimate_size(Term) ->
-    erlang:external_size(Term).
-
-set_headers(Val, Msg) ->
-    emqx_message:set_headers(Val, Msg).
-topic(undefined, Topic) -> Topic;
-topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic).

+ 0 - 465
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -1,465 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc Bridge works in two layers (1) batching layer (2) transport layer
-%% The `bridge' batching layer collects local messages in batches and sends over
-%% to remote MQTT node/cluster via `connection' transport layer.
-%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be
-%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection'
-%% has to be `emqx_connector_mqtt_mod'.
-%%
-%% ```
-%% +------+                        +--------+
-%% | EMQX |                        | REMOTE |
-%% |      |                        |        |
-%% |   (bridge) <==(connection)==> |        |
-%% |      |                        |        |
-%% |      |                        |        |
-%% +------+                        +--------+
-%% '''
-%%
-%%
-%% This module implements 2 kinds of APIs with regards to batching and
-%% messaging protocol. (1) A `gen_statem' based local batch collector;
-%% (2) APIs for incoming remote batches/messages.
-%%
-%% Batch collector state diagram
-%%
-%% [idle] --(0) --> [connecting] --(2)--> [connected]
-%%                  |        ^                 |
-%%                  |        |                 |
-%%                  '--(1)---'--------(3)------'
-%%
-%% (0): auto or manual start
-%% (1): retry timeout
-%% (2): successfully connected to remote node/cluster
-%% (3): received {disconnected, Reason} OR
-%%      failed to send to remote node/cluster.
-%%
-%% NOTE: A bridge worker may subscribe to multiple (including wildcard)
-%% local topics, and the underlying `emqx_bridge_connect' may subscribe to
-%% multiple remote topics, however, worker/connections are not designed
-%% to support automatic load-balancing, i.e. in case it can not keep up
-%% with the amount of messages coming in, administrator should split and
-%% balance topics between worker/connections manually.
-%%
-%% NOTES:
-%% * Local messages are all normalised to QoS-1 when exporting to remote
-
--module(emqx_connector_mqtt_worker).
-
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
--include_lib("emqx/include/logger.hrl").
-
-%% APIs
--export([
-    start_link/2,
-    stop/1
-]).
-
-%% management APIs
--export([
-    connect/1,
-    status/1,
-    ping/1,
-    info/1,
-    send_to_remote/2,
-    send_to_remote_async/3
-]).
-
--export([handle_publish/3]).
--export([handle_disconnect/1]).
-
--export_type([
-    config/0,
-    ack_ref/0
-]).
-
--type name() :: term().
-% -type qos() :: emqx_types:qos().
--type config() :: map().
--type ack_ref() :: term().
-% -type topic() :: emqx_types:topic().
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
--define(REF(Name), {via, gproc, ?NAME(Name)}).
--define(NAME(Name), {n, l, Name}).
-
-%% @doc Start a bridge worker. Supported configs:
-%% mountpoint: The topic mount point for messages sent to remote node/cluster
-%%      `undefined', `<<>>' or `""' to disable
-%% forwards: Local topics to subscribe.
-%%
-%% Find more connection specific configs in the callback modules
-%% of emqx_bridge_connect behaviour.
--spec start_link(name(), map()) ->
-    {ok, pid()} | {error, _Reason}.
-start_link(Name, BridgeOpts) ->
-    ?SLOG(debug, #{
-        msg => "client_starting",
-        name => Name,
-        options => BridgeOpts
-    }),
-    Conf = init_config(Name, BridgeOpts),
-    Options = mk_client_options(Conf, BridgeOpts),
-    case emqtt:start_link(Options) of
-        {ok, Pid} ->
-            true = gproc:reg_other(?NAME(Name), Pid, Conf),
-            {ok, Pid};
-        {error, Reason} = Error ->
-            ?SLOG(error, #{
-                msg => "client_start_failed",
-                config => emqx_utils:redact(BridgeOpts),
-                reason => Reason
-            }),
-            Error
-    end.
-
-init_config(Name, Opts) ->
-    Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
-    Subscriptions = maps:get(subscriptions, Opts, undefined),
-    Forwards = maps:get(forwards, Opts, undefined),
-    #{
-        mountpoint => format_mountpoint(Mountpoint),
-        subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
-        forwards => pre_process_forwards(Forwards)
-    }.
-
-mk_client_options(Conf, BridgeOpts) ->
-    Server = iolist_to_binary(maps:get(server, BridgeOpts)),
-    HostPort = emqx_connector_mqtt_schema:parse_server(Server),
-    Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
-    Subscriptions = maps:get(subscriptions, Conf),
-    Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
-    CleanStart =
-        case Subscriptions of
-            #{remote := _} ->
-                maps:get(clean_start, BridgeOpts);
-            undefined ->
-                %% NOTE
-                %% We are ignoring the user configuration here because there's currently no reliable way
-                %% to ensure proper session recovery according to the MQTT spec.
-                true
-        end,
-    Opts = maps:without(
-        [
-            address,
-            auto_reconnect,
-            conn_type,
-            mountpoint,
-            forwards,
-            receive_mountpoint,
-            subscriptions
-        ],
-        BridgeOpts
-    ),
-    Opts#{
-        msg_handler => mk_client_event_handler(Vars, #{server => Server}),
-        hosts => [HostPort],
-        clean_start => CleanStart,
-        force_ping => true,
-        proto_ver => maps:get(proto_ver, BridgeOpts, v4)
-    }.
-
-mk_client_event_handler(Vars, Opts) when Vars /= undefined ->
-    #{
-        publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
-        disconnected => {fun ?MODULE:handle_disconnect/1, []}
-    };
-mk_client_event_handler(undefined, _Opts) ->
-    undefined.
-
-connect(Name) ->
-    #{subscriptions := Subscriptions} = get_config(Name),
-    case emqtt:connect(get_pid(Name)) of
-        {ok, Properties} ->
-            case subscribe_remote_topics(Name, Subscriptions) of
-                ok ->
-                    {ok, Properties};
-                {ok, _, _RCs} ->
-                    {ok, Properties};
-                {error, Reason} = Error ->
-                    ?SLOG(error, #{
-                        msg => "client_subscribe_failed",
-                        subscriptions => Subscriptions,
-                        reason => Reason
-                    }),
-                    Error
-            end;
-        {error, Reason} = Error ->
-            ?SLOG(warning, #{
-                msg => "client_connect_failed",
-                reason => Reason,
-                name => Name
-            }),
-            Error
-    end.
-subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
-    emqtt:subscribe(ref(Ref), FromTopic, QoS);
-subscribe_remote_topics(_Ref, undefined) ->
-    ok.
-
-stop(Ref) ->
-    emqtt:stop(ref(Ref)).
-
-info(Ref) ->
-    emqtt:info(ref(Ref)).
-
-status(Ref) ->
-    try
-        case proplists:get_value(socket, info(Ref)) of
-            Socket when Socket /= undefined ->
-                connected;
-            undefined ->
-                connecting
-        end
-    catch
-        exit:{noproc, _} ->
-            disconnected
-    end.
-
-ping(Ref) ->
-    emqtt:ping(ref(Ref)).
-
-send_to_remote(Name, MsgIn) ->
-    trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end).
-
-do_send(Name, {true, Msg}) ->
-    case emqtt:publish(get_pid(Name), Msg) of
-        ok ->
-            ok;
-        {ok, #{reason_code := RC}} when
-            RC =:= ?RC_SUCCESS;
-            RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
-        ->
-            ok;
-        {ok, #{reason_code := RC, reason_code_name := Reason}} ->
-            ?SLOG(warning, #{
-                msg => "remote_publish_failed",
-                message => Msg,
-                reason_code => RC,
-                reason_code_name => Reason
-            }),
-            {error, Reason};
-        {error, Reason} ->
-            ?SLOG(info, #{
-                msg => "client_failed",
-                reason => Reason
-            }),
-            {error, Reason}
-    end;
-do_send(_Name, false) ->
-    ok.
-
-send_to_remote_async(Name, MsgIn, Callback) ->
-    trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end).
-
-do_send_async(Name, {true, Msg}, Callback) ->
-    Pid = get_pid(Name),
-    ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback),
-    {ok, Pid};
-do_send_async(_Name, false, _Callback) ->
-    ok.
-
-ref(Pid) when is_pid(Pid) ->
-    Pid;
-ref(Term) ->
-    ?REF(Term).
-
-trycall(Fun) ->
-    try
-        Fun()
-    catch
-        throw:noproc ->
-            {error, disconnected};
-        exit:{noproc, _} ->
-            {error, disconnected}
-    end.
-
-format_mountpoint(undefined) ->
-    undefined;
-format_mountpoint(Prefix) ->
-    binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
-
-pre_process_subscriptions(undefined, _, _) ->
-    undefined;
-pre_process_subscriptions(
-    #{remote := RC, local := LC} = Conf,
-    BridgeName,
-    BridgeOpts
-) when is_map(Conf) ->
-    Conf#{
-        remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
-        local => pre_process_in_out_common(LC)
-    };
-pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
-    %% have no 'local' field in the config
-    undefined.
-
-pre_process_forwards(undefined) ->
-    undefined;
-pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) ->
-    Conf#{remote => pre_process_in_out_common(RC)};
-pre_process_forwards(Conf) when is_map(Conf) ->
-    %% have no 'remote' field in the config
-    undefined.
-
-pre_process_in_out_common(Conf0) ->
-    Conf1 = pre_process_conf(topic, Conf0),
-    Conf2 = pre_process_conf(qos, Conf1),
-    Conf3 = pre_process_conf(payload, Conf2),
-    pre_process_conf(retain, Conf3).
-
-pre_process_conf(Key, Conf) ->
-    case maps:find(Key, Conf) of
-        error ->
-            Conf;
-        {ok, Val} when is_binary(Val) ->
-            Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
-        {ok, Val} ->
-            Conf#{Key => Val}
-    end.
-
-pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) ->
-    QoS = downgrade_ingress_qos(QoSIn),
-    case QoS of
-        QoSIn ->
-            ok;
-        _ ->
-            ?SLOG(warning, #{
-                msg => "downgraded_unsupported_ingress_qos",
-                qos_configured => QoSIn,
-                qos_used => QoS,
-                name => BridgeName,
-                options => BridgeOpts
-            })
-    end,
-    Conf#{qos => QoS}.
-
-downgrade_ingress_qos(2) ->
-    1;
-downgrade_ingress_qos(QoS) ->
-    QoS.
-
-get_pid(Name) ->
-    case gproc:where(?NAME(Name)) of
-        Pid when is_pid(Pid) ->
-            Pid;
-        undefined ->
-            throw(noproc)
-    end.
-
-get_config(Name) ->
-    try
-        gproc:lookup_value(?NAME(Name))
-    catch
-        error:badarg ->
-            throw(noproc)
-    end.
-
-export_msg(Name, Msg) ->
-    case get_config(Name) of
-        #{forwards := Forwards = #{}, mountpoint := Mountpoint} ->
-            {true, export_msg(Mountpoint, Forwards, Msg)};
-        #{forwards := undefined} ->
-            ?SLOG(error, #{
-                msg => "forwarding_unavailable",
-                message => Msg,
-                reason => "egress is not configured"
-            }),
-            false
-    end.
-
-export_msg(Mountpoint, Forwards, Msg) ->
-    Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
-    emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars).
-
-%%
-
-handle_publish(#{properties := Props} = MsgIn, Vars, Opts) ->
-    Msg = import_msg(MsgIn, Opts),
-    ?SLOG(debug, #{
-        msg => "publish_local",
-        message => Msg,
-        vars => Vars
-    }),
-    case Vars of
-        #{on_message_received := {Mod, Func, Args}} ->
-            _ = erlang:apply(Mod, Func, [Msg | Args]);
-        _ ->
-            ok
-    end,
-    maybe_publish_local(Msg, Vars, Props).
-
-handle_disconnect(_Reason) ->
-    ok.
-
-maybe_publish_local(Msg, Vars, Props) ->
-    case emqx_utils_maps:deep_get([local, topic], Vars, undefined) of
-        %% local topic is not set, discard it
-        undefined ->
-            ok;
-        _ ->
-            emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
-    end.
-
-import_msg(
-    #{
-        dup := Dup,
-        payload := Payload,
-        properties := Props,
-        qos := QoS,
-        retain := Retain,
-        topic := Topic
-    },
-    #{server := Server}
-) ->
-    #{
-        id => emqx_guid:to_hexstr(emqx_guid:gen()),
-        server => Server,
-        payload => Payload,
-        topic => Topic,
-        qos => QoS,
-        dup => Dup,
-        retain => Retain,
-        pub_props => printable_maps(Props),
-        message_received_at => erlang:system_time(millisecond)
-    }.
-
-printable_maps(undefined) ->
-    #{};
-printable_maps(Headers) ->
-    maps:fold(
-        fun
-            ('User-Property', V0, AccIn) when is_list(V0) ->
-                AccIn#{
-                    'User-Property' => maps:from_list(V0),
-                    'User-Property-Pairs' => [
-                        #{
-                            key => Key,
-                            value => Value
-                        }
-                     || {Key, Value} <- V0
-                    ]
-                };
-            (K, V0, AccIn) ->
-                AccIn#{K => V0}
-        end,
-        #{},
-        Headers
-    ).

+ 2 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -121,7 +121,8 @@
 
 -export_type([
     resource_id/0,
-    resource_data/0
+    resource_data/0,
+    resource_status/0
 ]).
 
 -optional_callbacks([

+ 3 - 0
changes/ce/perf-10754.en.md

@@ -0,0 +1,3 @@
+The MQTT bridge has been enhanced to utilize connection pooling and leverage available parallelism, substantially improving throughput.
+
+As a consequence, single MQTT bridge now uses a pool of `clientid`s to connect to the remote broker.

+ 3 - 1
mix.exs

@@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true},
-      {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
+      {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer
@@ -310,6 +310,7 @@ defmodule EMQXUmbrella.MixProject do
             :emqx_connector,
             :emqx_exhook,
             :emqx_bridge,
+            :emqx_bridge_mqtt,
             :emqx_modules,
             :emqx_management,
             :emqx_retainer,
@@ -372,6 +373,7 @@ defmodule EMQXUmbrella.MixProject do
         emqx_gateway_exproto: :permanent,
         emqx_exhook: :permanent,
         emqx_bridge: :permanent,
+        emqx_bridge_mqtt: :permanent,
         emqx_rule_engine: :permanent,
         emqx_modules: :permanent,
         emqx_management: :permanent,

+ 1 - 1
rebar.config

@@ -66,7 +66,7 @@
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}}
-    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
+    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}

+ 1 - 0
rebar.config.erl

@@ -427,6 +427,7 @@ relx_apps(ReleaseType, Edition) ->
             emqx_gateway_exproto,
             emqx_exhook,
             emqx_bridge,
+            emqx_bridge_mqtt,
             emqx_rule_engine,
             emqx_modules,
             emqx_management,

+ 20 - 1
rel/i18n/emqx_connector_mqtt_schema.hocon

@@ -1,4 +1,4 @@
-emqx_connector_mqtt_schema {
+emqx_bridge_mqtt_connector_schema {
 
 bridge_mode.desc:
 """If enable bridge mode.
@@ -32,6 +32,14 @@ is configured, then both the data got from the rule and the MQTT messages that m
 egress_desc.label:
 """Egress Configs"""
 
+egress_pool_size.desc:
+"""Size of the pool of MQTT clients that will publish messages to the remote broker.<br/>
+Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}'
+where 'n' is the number of a client inside the pool."""
+
+egress_pool_size.label:
+"""Pool Size"""
+
 egress_local.desc:
 """The configs about receiving messages from local broker."""
 
@@ -75,6 +83,17 @@ ingress_desc.desc:
 ingress_desc.label:
 """Ingress Configs"""
 
+ingress_pool_size.desc:
+"""Size of the pool of MQTT clients that will ingest messages from the remote broker.<br/>
+This value will be respected only if 'remote.topic' is a shared subscription topic or topic-filter
+(for example `$share/name1/topic1` or `$share/name2/topic2/#`), otherwise only a single MQTT client will be used.
+Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}'
+where 'n' is the number of a client inside the pool.
+NOTE: Non-shared subscription will not work well when EMQX is clustered."""
+
+ingress_pool_size.label:
+"""Pool Size"""
+
 ingress_local.desc:
 """The configs about sending message to the local broker."""
 

+ 0 - 21
rel/i18n/emqx_connector_mqtt.hocon

@@ -1,21 +0,0 @@
-emqx_connector_mqtt {
-
-name.desc:
-"""Connector name, used as a human-readable description of the connector."""
-
-name.label:
-"""Connector Name"""
-
-num_of_bridges.desc:
-"""The current number of bridges that are using this connector."""
-
-num_of_bridges.label:
-"""Num of Bridges"""
-
-type.desc:
-"""The Connector Type."""
-
-type.label:
-"""Connector Type"""
-
-}