Просмотр исходного кода

refactor(mqttconn): split ingress/egress into 2 separate pools

Each with a more refined set of responsibilities, at the cost of slight
code duplication. Also provide two different config fields for each pool
size.
Andrew Mayorov 2 лет назад
Родитель
Сommit
a5fc26736d

+ 8 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -47,7 +47,14 @@
     <<"server">> => SERVER,
     <<"username">> => <<"user1">>,
     <<"password">> => <<"">>,
-    <<"proto_ver">> => <<"v5">>
+    <<"proto_ver">> => <<"v5">>,
+    <<"egress">> => #{
+        <<"remote">> => #{
+            <<"topic">> => <<"emqx/${topic}">>,
+            <<"qos">> => <<"${qos}">>,
+            <<"retain">> => false
+        }
+    }
 }).
 -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
 

+ 5 - 5
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -22,9 +22,7 @@
 
 -include("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
--include("emqx_dashboard/include/emqx_dashboard.hrl").
 
 %% output functions
 -export([inspect/3]).
@@ -259,8 +257,8 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
         ?SERVER_CONF(<<>>)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => BridgeName,
-            <<"pool_size">> => PoolSize,
             <<"ingress">> => #{
+                <<"pool_size">> => PoolSize,
                 <<"remote">> => #{
                     <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>,
                     <<"qos">> => 1
@@ -305,9 +303,11 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
     ),
 
     ResourceID = emqx_bridge_resource:resource_id(BridgeID),
+    {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} =
+        emqx_resource_manager:lookup_cached(ResourceID),
     ClientInfo = ecpool:pick_and_do(
-        ResourceID,
-        {emqx_connector_mqtt_worker, info, []},
+        EgressPoolName,
+        {emqx_connector_mqtt_egress, info, []},
         no_handover
     ),
     ?assertMatch(

+ 182 - 60
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -52,34 +52,134 @@ on_start(ResourceId, Conf) ->
         connector => ResourceId,
         config => emqx_utils:redact(Conf)
     }),
-    BasicOpts = mk_worker_opts(ResourceId, Conf),
-    BridgeOpts = BasicOpts#{
-        ingress => mk_ingress_config(maps:get(ingress, Conf, #{}), Conf, ResourceId),
-        egress => maps:get(egress, Conf, #{})
-    },
-    {ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts),
-    case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of
+    case start_ingress(ResourceId, Conf) of
+        {ok, Result1} ->
+            case start_egress(ResourceId, Conf) of
+                {ok, Result2} ->
+                    {ok, maps:merge(Result1, Result2)};
+                {error, Reason} ->
+                    _ = stop_ingress(Result1),
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start_ingress(ResourceId, Conf) ->
+    ClientOpts = mk_client_opts(ResourceId, "ingress", Conf),
+    case mk_ingress_config(ResourceId, Conf) of
+        Ingress = #{} ->
+            start_ingress(ResourceId, Ingress, ClientOpts);
+        undefined ->
+            {ok, #{}}
+    end.
+
+start_ingress(ResourceId, Ingress, ClientOpts) ->
+    PoolName = <<ResourceId/binary, ":ingress">>,
+    PoolSize = choose_ingress_pool_size(Ingress),
+    Options = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {ingress, Ingress},
+        {client_opts, ClientOpts}
+    ],
+    case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_ingress, Options) of
         ok ->
-            {ok, #{config => WorkerConf}};
+            {ok, #{ingress_pool_name => PoolName}};
         {error, {start_pool_failed, _, Reason}} ->
             {error, Reason}
     end.
 
-on_stop(ResourceId, #{}) ->
+choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}) ->
+    case emqx_topic:parse(RemoteTopic) of
+        {_Filter, #{share := _Name}} ->
+            % NOTE: this is shared subscription, many workers may subscribe
+            PoolSize;
+        {_Filter, #{}} ->
+            % NOTE: this is regular subscription, only one worker should subscribe
+            ?SLOG(warning, #{
+                msg => "ingress_pool_size_ignored",
+                reason =>
+                    "Remote topic filter is not a shared subscription, "
+                    "ingress pool will start with a single worker",
+                config_pool_size => PoolSize,
+                pool_size => 1
+            }),
+            1
+    end.
+
+start_egress(ResourceId, Conf) ->
+    % NOTE
+    % We are ignoring the user configuration here because there's currently no reliable way
+    % to ensure proper session recovery according to the MQTT spec.
+    ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)),
+    case mk_egress_config(Conf) of
+        Egress = #{} ->
+            start_egress(ResourceId, Egress, ClientOpts);
+        undefined ->
+            {ok, #{}}
+    end.
+
+start_egress(ResourceId, Egress, ClientOpts) ->
+    PoolName = <<ResourceId/binary, ":egress">>,
+    PoolSize = maps:get(pool_size, Egress),
+    Options = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {client_opts, ClientOpts}
+    ],
+    case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_egress, Options) of
+        ok ->
+            {ok, #{
+                egress_pool_name => PoolName,
+                egress_config => emqx_connector_mqtt_egress:config(Egress)
+            }};
+        {error, {start_pool_failed, _, Reason}} ->
+            {error, Reason}
+    end.
+
+on_stop(ResourceId, State) ->
     ?SLOG(info, #{
         msg => "stopping_mqtt_connector",
         connector => ResourceId
     }),
-    emqx_resource_pool:stop(ResourceId).
+    ok = stop_ingress(State),
+    ok = stop_egress(State).
+
+stop_ingress(#{ingress_pool_name := PoolName}) ->
+    emqx_resource_pool:stop(PoolName);
+stop_ingress(#{}) ->
+    ok.
+
+stop_egress(#{egress_pool_name := PoolName}) ->
+    emqx_resource_pool:stop(PoolName);
+stop_egress(#{}) ->
+    ok.
 
-on_query(ResourceId, {send_message, Msg}, #{config := Config}) ->
+on_query(
+    ResourceId,
+    {send_message, Msg},
+    #{egress_pool_name := PoolName, egress_config := Config}
+) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
-    handle_send_result(with_worker(ResourceId, send_to_remote, [Msg, Config])).
+    handle_send_result(with_worker(PoolName, send, [Msg, Config]));
+on_query(ResourceId, {send_message, Msg}, #{}) ->
+    ?SLOG(error, #{
+        msg => "forwarding_unavailable",
+        connector => ResourceId,
+        message => Msg,
+        reason => "Egress is not configured"
+    }).
 
-on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config}) ->
+on_query_async(
+    ResourceId,
+    {send_message, Msg},
+    CallbackIn,
+    #{egress_pool_name := PoolName, egress_config := Config}
+) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
     Callback = {fun on_async_result/2, [CallbackIn]},
-    Result = with_worker(ResourceId, send_to_remote_async, [Msg, Callback, Config]),
+    Result = with_worker(PoolName, send_async, [Msg, Callback, Config]),
     case Result of
         ok ->
             ok;
@@ -87,13 +187,20 @@ on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config})
             {ok, Pid};
         {error, Reason} ->
             {error, classify_error(Reason)}
-    end.
+    end;
+on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) ->
+    ?SLOG(error, #{
+        msg => "forwarding_unavailable",
+        connector => ResourceId,
+        message => Msg,
+        reason => "Egress is not configured"
+    }).
 
 with_worker(ResourceId, Fun, Args) ->
     Worker = ecpool:get_client(ResourceId),
     case is_pid(Worker) andalso ecpool_worker:client(Worker) of
         {ok, Client} ->
-            erlang:apply(emqx_connector_mqtt_worker, Fun, [Client | Args]);
+            erlang:apply(emqx_connector_mqtt_egress, Fun, [Client | Args]);
         {error, Reason} ->
             {error, Reason};
         false ->
@@ -135,8 +242,9 @@ classify_error(shutdown = Reason) ->
 classify_error(Reason) ->
     {unrecoverable_error, Reason}.
 
-on_get_status(ResourceId, #{}) ->
-    Workers = [Worker || {_Name, Worker} <- ecpool:workers(ResourceId)],
+on_get_status(_ResourceId, State) ->
+    Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)),
+    Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
     try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
         Statuses ->
             combine_status(Statuses)
@@ -145,10 +253,12 @@ on_get_status(ResourceId, #{}) ->
             connecting
     end.
 
-get_status(Worker) ->
+get_status({Pool, Worker}) ->
     case ecpool_worker:client(Worker) of
-        {ok, Client} ->
-            emqx_connector_mqtt_worker:status(Client);
+        {ok, Client} when Pool == ingress_pool_name ->
+            emqx_connector_mqtt_ingress:status(Client);
+        {ok, Client} when Pool == egress_pool_name ->
+            emqx_connector_mqtt_egress:status(Client);
         {error, _} ->
             disconnected
     end.
@@ -165,56 +275,68 @@ combine_status(Statuses) ->
             disconnected
     end.
 
-mk_ingress_config(Ingress, _Conf, _) when map_size(Ingress) == 0 ->
-    Ingress;
-mk_ingress_config(Ingress, #{hookpoint := HookPoint}, ResourceId) ->
-    MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
-    Ingress#{on_message_received => MFA};
-mk_ingress_config(_Ingress, Conf, ResourceId) ->
-    error({no_hookpoint_provided, ResourceId, Conf}).
-
-mk_worker_opts(
+mk_ingress_config(
     ResourceId,
     #{
+        ingress := Ingress = #{remote := _},
+        server := Server,
+        hookpoint := HookPoint
+    }
+) ->
+    Ingress#{
+        server => Server,
+        on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]}
+    };
+mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) ->
+    error({no_hookpoint_provided, ResourceId, Conf});
+mk_ingress_config(_ResourceId, #{}) ->
+    undefined.
+
+mk_egress_config(#{egress := Egress = #{remote := _}}) ->
+    Egress;
+mk_egress_config(#{}) ->
+    undefined.
+
+mk_client_opts(
+    ResourceId,
+    ClientScope,
+    Config = #{
         server := Server,
-        pool_size := PoolSize,
-        proto_ver := ProtoVer,
-        bridge_mode := BridgeMode,
-        clean_start := CleanStart,
         keepalive := KeepAlive,
-        retry_interval := RetryIntv,
-        max_inflight := MaxInflight,
         ssl := #{enable := EnableSsl} = Ssl
-    } = Conf
+    }
 ) ->
-    Options = #{
-        server => Server,
-        pool_size => PoolSize,
-        %% 30s
+    HostPort = emqx_connector_mqtt_schema:parse_server(Server),
+    Options = maps:with(
+        [
+            proto_ver,
+            username,
+            password,
+            clean_start,
+            retry_interval,
+            max_inflight,
+            % Opening a connection in bridge mode will form a non-standard mqtt connection message.
+            % A load balancing server (such as haproxy) is often set up before the emqx broker server.
+            % When the load balancing server enables mqtt connection packet inspection,
+            % non-standard mqtt connection packets might be filtered out by LB.
+            bridge_mode
+        ],
+        Config
+    ),
+    Options#{
+        hosts => [HostPort],
+        clientid => clientid(ResourceId, ClientScope, Config),
         connect_timeout => 30,
-        proto_ver => ProtoVer,
-        %% Opening a connection in bridge mode will form a non-standard mqtt connection message.
-        %% A load balancing server (such as haproxy) is often set up before the emqx broker server.
-        %% When the load balancing server enables mqtt connection packet inspection,
-        %% non-standard mqtt connection packets might be filtered out by LB.
-        clientid => clientid(ResourceId, Conf),
-        bridge_mode => BridgeMode,
         keepalive => ms_to_s(KeepAlive),
-        clean_start => CleanStart,
-        retry_interval => RetryIntv,
-        max_inflight => MaxInflight,
+        force_ping => true,
         ssl => EnableSsl,
         ssl_opts => maps:to_list(maps:remove(enable, Ssl))
-    },
-    maps:merge(
-        Options,
-        maps:with([username, password], Conf)
-    ).
+    }.
 
 ms_to_s(Ms) ->
     erlang:ceil(Ms / 1000).
 
-clientid(Id, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) ->
-    iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]);
-clientid(Id, _Conf) ->
-    iolist_to_binary([Id, ":", atom_to_list(node())]).
+clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) ->
+    iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]);
+clientid(Id, ClientScope, _Conf) ->
+    iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]).

+ 162 - 0
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_mqtt_egress).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+-export([
+    config/1,
+    send/3,
+    send_async/4
+]).
+
+%% management APIs
+-export([
+    status/1,
+    info/1
+]).
+
+-type name() :: term().
+-type message() :: emqx_types:message() | map().
+-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}.
+-type remote_message() :: #mqtt_msg{}.
+
+-type option() ::
+    {name, name()}
+    %% see `emqtt:option()`
+    | {client_opts, map()}.
+
+-type egress() :: #{
+    local => #{
+        topic => emqx_topic:topic()
+    },
+    remote := emqx_connector_mqtt_msg:msgvars()
+}.
+
+%% @doc Start an ingress bridge worker.
+-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Options) ->
+    ?SLOG(debug, #{
+        msg => "egress_client_starting",
+        options => emqx_utils:redact(Options)
+    }),
+    Name = proplists:get_value(name, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    ClientOpts = proplists:get_value(client_opts, Options),
+    case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of
+        {ok, Pid} ->
+            connect(Pid, Name);
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "egress_client_start_failed",
+                config => emqx_utils:redact(ClientOpts),
+                reason => Reason
+            }),
+            Error
+    end.
+
+mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
+    ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
+
+mk_clientid(WorkerId, ClientId) ->
+    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
+
+connect(Pid, Name) ->
+    case emqtt:connect(Pid) of
+        {ok, _Props} ->
+            {ok, Pid};
+        {error, Reason} = Error ->
+            ?SLOG(warning, #{
+                msg => "egress_client_connect_failed",
+                reason => Reason,
+                name => Name
+            }),
+            _ = catch emqtt:stop(Pid),
+            Error
+    end.
+
+%%
+
+-spec config(map()) ->
+    egress().
+config(#{remote := RC = #{}} = Conf) ->
+    Conf#{remote => emqx_connector_mqtt_msg:parse(RC)}.
+
+-spec send(pid(), message(), egress()) ->
+    ok.
+send(Pid, MsgIn, Egress) ->
+    emqtt:publish(Pid, export_msg(MsgIn, Egress)).
+
+-spec send_async(pid(), message(), callback(), egress()) ->
+    ok | {ok, pid()}.
+send_async(Pid, MsgIn, Callback, Egress) ->
+    ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback),
+    {ok, Pid}.
+
+export_msg(Msg, #{remote := Remote}) ->
+    to_remote_msg(Msg, Remote).
+
+-spec to_remote_msg(message(), emqx_connector_mqtt_msg:msgvars()) ->
+    remote_message().
+to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
+    {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
+    to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars);
+to_remote_msg(Msg = #{}, Remote) ->
+    #{
+        topic := Topic,
+        payload := Payload,
+        qos := QoS,
+        retain := Retain
+    } = emqx_connector_mqtt_msg:render(Msg, Remote),
+    PubProps = maps:get(pub_props, Msg, #{}),
+    #mqtt_msg{
+        qos = QoS,
+        retain = Retain,
+        topic = Topic,
+        props = emqx_utils:pub_props_to_packet(PubProps),
+        payload = Payload
+    }.
+
+%%
+
+-spec info(pid()) ->
+    [{atom(), term()}].
+info(Pid) ->
+    emqtt:info(Pid).
+
+-spec status(pid()) ->
+    emqx_resource:resource_status().
+status(Pid) ->
+    try
+        case proplists:get_value(socket, info(Pid)) of
+            Socket when Socket /= undefined ->
+                connected;
+            undefined ->
+                connecting
+        end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.

+ 272 - 0
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl

@@ -0,0 +1,272 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_mqtt_ingress).
+
+-include_lib("emqx/include/logger.hrl").
+
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+%% management APIs
+-export([
+    status/1,
+    info/1
+]).
+
+-export([handle_publish/4]).
+-export([handle_disconnect/1]).
+
+-type name() :: term().
+
+-type option() ::
+    {name, name()}
+    | {ingress, map()}
+    %% see `emqtt:option()`
+    | {client_opts, map()}.
+
+-type ingress() :: #{
+    server := string(),
+    remote := #{
+        topic := emqx_topic:topic(),
+        qos => emqx_types:qos()
+    },
+    local := emqx_connector_mqtt_msg:msgvars(),
+    on_message_received := {module(), atom(), [term()]}
+}.
+
+%% @doc Start an ingress bridge worker.
+-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Options) ->
+    ?SLOG(debug, #{
+        msg => "ingress_client_starting",
+        options => emqx_utils:redact(Options)
+    }),
+    Name = proplists:get_value(name, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    Ingress = config(proplists:get_value(ingress, Options), Name),
+    ClientOpts = proplists:get_value(client_opts, Options),
+    case emqtt:start_link(mk_client_opts(WorkerId, Ingress, ClientOpts)) of
+        {ok, Pid} ->
+            connect(Pid, Name, Ingress);
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "client_start_failed",
+                config => emqx_utils:redact(ClientOpts),
+                reason => Reason
+            }),
+            Error
+    end.
+
+mk_client_opts(WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
+    ClientOpts#{
+        clientid := mk_clientid(WorkerId, ClientId),
+        msg_handler => mk_client_event_handler(Ingress)
+    }.
+
+mk_clientid(WorkerId, ClientId) ->
+    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
+
+mk_client_event_handler(Ingress = #{}) ->
+    IngressVars = maps:with([server], Ingress),
+    OnMessage = maps:get(on_message_received, Ingress, undefined),
+    LocalPublish =
+        case Ingress of
+            #{local := Local = #{topic := _}} ->
+                Local;
+            #{} ->
+                undefined
+        end,
+    #{
+        publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, IngressVars]},
+        disconnected => {fun ?MODULE:handle_disconnect/1, []}
+    }.
+
+-spec connect(pid(), name(), ingress()) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Pid, Name, Ingress) ->
+    case emqtt:connect(Pid) of
+        {ok, _Props} ->
+            case subscribe_remote_topic(Pid, Ingress) of
+                {ok, _, _RCs} ->
+                    {ok, Pid};
+                {error, Reason} = Error ->
+                    ?SLOG(error, #{
+                        msg => "ingress_client_subscribe_failed",
+                        ingress => Ingress,
+                        reason => Reason
+                    }),
+                    _ = catch emqtt:stop(Pid),
+                    Error
+            end;
+        {error, Reason} = Error ->
+            ?SLOG(warning, #{
+                msg => "ingress_client_connect_failed",
+                reason => Reason,
+                name => Name
+            }),
+            _ = catch emqtt:stop(Pid),
+            Error
+    end.
+
+subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
+    emqtt:subscribe(Pid, RemoteTopic, QoS).
+
+%%
+
+-spec config(map(), name()) ->
+    ingress().
+config(#{remote := RC, local := LC} = Conf, BridgeName) ->
+    Conf#{
+        remote => parse_remote(RC, BridgeName),
+        local => emqx_connector_mqtt_msg:parse(LC)
+    }.
+
+parse_remote(#{qos := QoSIn} = Conf, BridgeName) ->
+    QoS = downgrade_ingress_qos(QoSIn),
+    case QoS of
+        QoSIn ->
+            ok;
+        _ ->
+            ?SLOG(warning, #{
+                msg => "downgraded_unsupported_ingress_qos",
+                qos_configured => QoSIn,
+                qos_used => QoS,
+                name => BridgeName
+            })
+    end,
+    Conf#{qos => QoS}.
+
+downgrade_ingress_qos(2) ->
+    1;
+downgrade_ingress_qos(QoS) ->
+    QoS.
+
+%%
+
+-spec info(pid()) ->
+    [{atom(), term()}].
+info(Pid) ->
+    emqtt:info(Pid).
+
+-spec status(pid()) ->
+    emqx_resource:resource_status().
+status(Pid) ->
+    try
+        case proplists:get_value(socket, info(Pid)) of
+            Socket when Socket /= undefined ->
+                connected;
+            undefined ->
+                connecting
+        end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.
+
+%%
+
+handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, IngressVars) ->
+    Msg = import_msg(MsgIn, IngressVars),
+    ?SLOG(debug, #{
+        msg => "publish_local",
+        message => Msg
+    }),
+    maybe_on_message_received(Msg, OnMessage),
+    maybe_publish_local(Msg, LocalPublish, Props).
+
+handle_disconnect(_Reason) ->
+    ok.
+
+maybe_on_message_received(Msg, {Mod, Func, Args}) ->
+    erlang:apply(Mod, Func, [Msg | Args]);
+maybe_on_message_received(_Msg, undefined) ->
+    ok.
+
+maybe_publish_local(Msg, Local = #{}, Props) ->
+    emqx_broker:publish(to_broker_msg(Msg, Local, Props));
+maybe_publish_local(_Msg, undefined, _Props) ->
+    ok.
+
+%%
+
+import_msg(
+    #{
+        dup := Dup,
+        payload := Payload,
+        properties := Props,
+        qos := QoS,
+        retain := Retain,
+        topic := Topic
+    },
+    #{server := Server}
+) ->
+    #{
+        id => emqx_guid:to_hexstr(emqx_guid:gen()),
+        server => Server,
+        payload => Payload,
+        topic => Topic,
+        qos => QoS,
+        dup => Dup,
+        retain => Retain,
+        pub_props => printable_maps(Props),
+        message_received_at => erlang:system_time(millisecond)
+    }.
+
+printable_maps(undefined) ->
+    #{};
+printable_maps(Headers) ->
+    maps:fold(
+        fun
+            ('User-Property', V0, AccIn) when is_list(V0) ->
+                AccIn#{
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [
+                        #{
+                            key => Key,
+                            value => Value
+                        }
+                     || {Key, Value} <- V0
+                    ]
+                };
+            (K, V0, AccIn) ->
+                AccIn#{K => V0}
+        end,
+        #{},
+        Headers
+    ).
+
+%% published from remote node over a MQTT connection
+to_broker_msg(Msg, Vars, undefined) ->
+    to_broker_msg(Msg, Vars, #{});
+to_broker_msg(#{dup := Dup} = Msg, Local, Props) ->
+    #{
+        topic := Topic,
+        payload := Payload,
+        qos := QoS,
+        retain := Retain
+    } = emqx_connector_mqtt_msg:render(Msg, Local),
+    PubProps = maps:get(pub_props, Msg, #{}),
+    emqx_message:set_headers(
+        Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
+        emqx_message:set_flags(
+            #{dup => Dup, retain => Retain},
+            emqx_message:make(bridge, QoS, Topic, Payload)
+        )
+    ).

+ 95 - 0
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -0,0 +1,95 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_mqtt_msg).
+
+-export([parse/1]).
+-export([render/2]).
+
+-export_type([msgvars/0]).
+
+-type template() :: emqx_plugin_libs_rule:tmpl_token().
+
+-type msgvars() :: #{
+    topic => template(),
+    qos => template() | emqx_types:qos(),
+    retain => template() | boolean(),
+    payload => template() | undefined
+}.
+
+%%
+
+-spec parse(#{
+    topic => iodata(),
+    qos => iodata() | emqx_types:qos(),
+    retain => iodata() | boolean(),
+    payload => iodata()
+}) ->
+    msgvars().
+parse(Conf) ->
+    Acc1 = parse_field(topic, Conf, Conf),
+    Acc2 = parse_field(qos, Conf, Acc1),
+    Acc3 = parse_field(payload, Conf, Acc2),
+    parse_field(retain, Conf, Acc3).
+
+parse_field(Key, Conf, Acc) ->
+    case Conf of
+        #{Key := Val} when is_binary(Val) ->
+            Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
+        #{Key := Val} ->
+            Acc#{Key => Val};
+        #{} ->
+            Acc
+    end.
+
+render(
+    Msg,
+    #{
+        topic := TopicToken,
+        qos := QoSToken,
+        retain := RetainToken
+    } = Vars
+) ->
+    #{
+        topic => render_string(TopicToken, Msg),
+        payload => render_payload(Vars, Msg),
+        qos => render_simple_var(QoSToken, Msg),
+        retain => render_simple_var(RetainToken, Msg)
+    }.
+
+render_payload(From, MapMsg) ->
+    do_render_payload(maps:get(payload, From, undefined), MapMsg).
+
+do_render_payload(undefined, Msg) ->
+    emqx_utils_json:encode(Msg);
+do_render_payload(Tks, Msg) ->
+    render_string(Tks, Msg).
+
+%% Replace a string contains vars to another string in which the placeholders are replace by the
+%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
+%% "a: 1".
+render_string(Tokens, Data) when is_list(Tokens) ->
+    emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary});
+render_string(Val, _Data) ->
+    Val.
+
+%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
+%% value will be an integer 1.
+render_simple_var(Tokens, Data) when is_list(Tokens) ->
+    [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
+    Var;
+render_simple_var(Val, _Data) ->
+    Val.

+ 16 - 5
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -73,7 +73,6 @@ fields("server_configs") ->
                 }
             )},
         {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
-        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
         {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})},
         {reconnect_interval, mk(string(), #{deprecated => {since, "v5.0.16"}})},
         {proto_ver,
@@ -135,12 +134,13 @@ fields("server_configs") ->
     ] ++ emqx_connector_schema_lib:ssl_fields();
 fields("ingress") ->
     [
-        {"remote",
+        {pool_size, fun ingress_pool_size/1},
+        {remote,
             mk(
                 ref(?MODULE, "ingress_remote"),
                 #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")}
             )},
-        {"local",
+        {local,
             mk(
                 ref(?MODULE, "ingress_local"),
                 #{
@@ -206,7 +206,8 @@ fields("ingress_local") ->
     ];
 fields("egress") ->
     [
-        {"local",
+        {pool_size, fun egress_pool_size/1},
+        {local,
             mk(
                 ref(?MODULE, "egress_local"),
                 #{
@@ -214,7 +215,7 @@ fields("egress") ->
                     required => false
                 }
             )},
-        {"remote",
+        {remote,
             mk(
                 ref(?MODULE, "egress_remote"),
                 #{
@@ -274,6 +275,16 @@ fields("egress_remote") ->
             )}
     ].
 
+ingress_pool_size(desc) ->
+    ?DESC("ingress_pool_size");
+ingress_pool_size(Prop) ->
+    emqx_connector_schema_lib:pool_size(Prop).
+
+egress_pool_size(desc) ->
+    ?DESC("egress_pool_size");
+egress_pool_size(Prop) ->
+    emqx_connector_schema_lib:pool_size(Prop).
+
 desc("server_configs") ->
     ?DESC("server_configs");
 desc("ingress") ->

+ 0 - 490
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -1,490 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_mqtt_worker).
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx.hrl").
-
-%% APIs
--export([
-    init/2,
-    connect/1,
-    stop/1
-]).
-
-%% management APIs
--export([
-    status/1,
-    ping/1,
-    info/1,
-    send_to_remote/3,
-    send_to_remote_async/4
-]).
-
--export([handle_publish/4]).
--export([handle_disconnect/1]).
-
--export_type([config/0]).
-
--type template() :: emqx_plugin_libs_rule:tmpl_token().
-
--type name() :: term().
--type options() :: #{
-    % endpoint
-    server := iodata(),
-    pool_size := pos_integer(),
-    % emqtt client options
-    proto_ver := v3 | v4 | v5,
-    username := binary(),
-    password := binary(),
-    clientid := binary(),
-    clean_start := boolean(),
-    max_inflight := pos_integer(),
-    connect_timeout := pos_integer(),
-    retry_interval := timeout(),
-    keepalive := non_neg_integer(),
-    bridge_mode := boolean(),
-    ssl := boolean(),
-    ssl_opts := proplists:proplist(),
-    % bridge options
-    ingress := map(),
-    egress := map()
-}.
-
--type client_option() ::
-    emqtt:option()
-    | {pool_size, pos_integer()}
-    | {name, name()}
-    | {ingress, ingress() | undefined}.
-
--type config() :: egress() | undefined.
-
--type ingress() :: #{
-    remote := #{
-        topic := emqx_topic:topic(),
-        qos => emqx_types:qos()
-    },
-    local := msgvars(),
-    on_message_received := {module(), atom(), [term()]}
-}.
-
--type egress() :: #{
-    local => #{
-        topic => emqx_topic:topic()
-    },
-    remote := msgvars()
-}.
-
--type msgvars() :: #{
-    topic => template(),
-    qos => template() | emqx_types:qos(),
-    retain => template() | boolean(),
-    payload => template() | undefined
-}.
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
--spec init(name(), options()) ->
-    {ok, [client_option()], config()}.
-init(Name, BridgeOpts) ->
-    Ingress = pre_process_ingress(maps:get(ingress, BridgeOpts), Name, BridgeOpts),
-    Egress = pre_process_egress(maps:get(egress, BridgeOpts)),
-    ClientOpts = mk_client_options(Name, Ingress, BridgeOpts),
-    {ok, maps:to_list(ClientOpts), Egress}.
-
-%% @doc Start a bridge worker.
--spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) ->
-    {ok, pid()} | {error, _Reason}.
-connect(ClientOpts0) ->
-    ?SLOG(debug, #{
-        msg => "client_starting",
-        options => emqx_utils:redact(ClientOpts0)
-    }),
-    {value, {_, Name}, ClientOpts1} = lists:keytake(name, 1, ClientOpts0),
-    {value, {_, WorkerId}, ClientOpts} = lists:keytake(ecpool_worker_id, 1, ClientOpts1),
-    case emqtt:start_link(mk_emqtt_opts(WorkerId, ClientOpts)) of
-        {ok, Pid} ->
-            connect(Pid, Name, WorkerId, ClientOpts);
-        {error, Reason} = Error ->
-            ?SLOG(error, #{
-                msg => "client_start_failed",
-                config => emqx_utils:redact(ClientOpts),
-                reason => Reason
-            }),
-            Error
-    end.
-
-mk_emqtt_opts(WorkerId, ClientOpts) ->
-    ClientId = proplists:get_value(clientid, ClientOpts),
-    lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}).
-
-mk_clientid(WorkerId, ClientId) ->
-    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
-
-connect(Pid, Name, WorkerId, ClientOpts) ->
-    case emqtt:connect(Pid) of
-        {ok, _Props} ->
-            Ingress = proplists:get_value(ingress, ClientOpts),
-            case subscribe_remote_topic(Pid, WorkerId, Ingress) of
-                false ->
-                    {ok, Pid};
-                {ok, _, _RCs} ->
-                    {ok, Pid};
-                {error, Reason} = Error ->
-                    ?SLOG(error, #{
-                        msg => "client_subscribe_failed",
-                        ingress => Ingress,
-                        reason => Reason
-                    }),
-                    _ = catch emqtt:stop(Pid),
-                    Error
-            end;
-        {error, Reason} = Error ->
-            ?SLOG(warning, #{
-                msg => "client_connect_failed",
-                reason => Reason,
-                name => Name
-            }),
-            _ = catch emqtt:stop(Pid),
-            Error
-    end.
-
-subscribe_remote_topic(Pid, WorkerId, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
-    case emqx_topic:parse(RemoteTopic) of
-        {_Filter, #{share := _Name}} ->
-            % NOTE: this is shared subscription, each worker may subscribe
-            emqtt:subscribe(Pid, RemoteTopic, QoS);
-        {_Filter, #{}} when WorkerId =:= 1 ->
-            % NOTE: this is regular subscription, only the first worker should subscribe
-            emqtt:subscribe(Pid, RemoteTopic, QoS);
-        {_Filter, #{}} ->
-            false
-    end;
-subscribe_remote_topic(_Ref, _, undefined) ->
-    false.
-
-mk_client_options(Name, Ingress, BridgeOpts) ->
-    Server = iolist_to_binary(maps:get(server, BridgeOpts)),
-    HostPort = emqx_connector_mqtt_schema:parse_server(Server),
-    CleanStart =
-        case Ingress of
-            #{remote := _} ->
-                maps:get(clean_start, BridgeOpts);
-            undefined ->
-                %% NOTE
-                %% We are ignoring the user configuration here because there's currently no reliable way
-                %% to ensure proper session recovery according to the MQTT spec.
-                true
-        end,
-    Opts = maps:with(
-        [
-            pool_size,
-            proto_ver,
-            username,
-            password,
-            clientid,
-            max_inflight,
-            connect_timeout,
-            retry_interval,
-            keepalive,
-            bridge_mode,
-            ssl,
-            ssl_opts
-        ],
-        BridgeOpts
-    ),
-    Opts#{
-        name => Name,
-        ingress => Ingress,
-        msg_handler => mk_client_event_handler(Ingress, #{server => Server}),
-        hosts => [HostPort],
-        clean_start => CleanStart,
-        force_ping => true
-    }.
-
-mk_client_event_handler(Ingress = #{}, Opts) ->
-    OnMessage = maps:get(on_message_received, Ingress, undefined),
-    LocalPublish =
-        case Ingress of
-            #{local := Local = #{topic := _}} ->
-                Local;
-            #{} ->
-                undefined
-        end,
-    #{
-        publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, Opts]},
-        disconnected => {fun ?MODULE:handle_disconnect/1, []}
-    };
-mk_client_event_handler(undefined, _Opts) ->
-    undefined.
-
-stop(Pid) ->
-    emqtt:stop(Pid).
-
-info(Pid) ->
-    emqtt:info(Pid).
-
-status(Pid) ->
-    try
-        case proplists:get_value(socket, info(Pid)) of
-            Socket when Socket /= undefined ->
-                connected;
-            undefined ->
-                connecting
-        end
-    catch
-        exit:{noproc, _} ->
-            disconnected
-    end.
-
-ping(Pid) ->
-    emqtt:ping(Pid).
-
-send_to_remote(Pid, MsgIn, Conf) ->
-    do_send(Pid, export_msg(MsgIn, Conf)).
-
-do_send(Pid, Msg) when Msg /= undefined ->
-    emqtt:publish(Pid, Msg);
-do_send(_Name, undefined) ->
-    ok.
-
-send_to_remote_async(Pid, MsgIn, Callback, Conf) ->
-    do_send_async(Pid, export_msg(MsgIn, Conf), Callback).
-
-do_send_async(Pid, Msg, Callback) when Msg /= undefined ->
-    ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback),
-    {ok, Pid};
-do_send_async(_Pid, undefined, _Callback) ->
-    ok.
-
-pre_process_ingress(
-    #{remote := RC, local := LC} = Conf,
-    BridgeName,
-    BridgeOpts
-) when is_map(Conf) ->
-    Conf#{
-        remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
-        local => pre_process_common(LC)
-    };
-pre_process_ingress(Conf, _, _) when is_map(Conf) ->
-    %% have no 'local' field in the config
-    undefined.
-
-pre_process_egress(#{remote := RC} = Conf) when is_map(Conf) ->
-    Conf#{remote => pre_process_common(RC)};
-pre_process_egress(Conf) when is_map(Conf) ->
-    %% have no 'remote' field in the config
-    undefined.
-
-pre_process_common(Conf0) ->
-    Conf1 = pre_process_conf(topic, Conf0),
-    Conf2 = pre_process_conf(qos, Conf1),
-    Conf3 = pre_process_conf(payload, Conf2),
-    pre_process_conf(retain, Conf3).
-
-pre_process_conf(Key, Conf) ->
-    case maps:find(Key, Conf) of
-        error ->
-            Conf;
-        {ok, Val} when is_binary(Val) ->
-            Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
-        {ok, Val} ->
-            Conf#{Key => Val}
-    end.
-
-pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) ->
-    QoS = downgrade_ingress_qos(QoSIn),
-    case QoS of
-        QoSIn ->
-            ok;
-        _ ->
-            ?SLOG(warning, #{
-                msg => "downgraded_unsupported_ingress_qos",
-                qos_configured => QoSIn,
-                qos_used => QoS,
-                name => BridgeName,
-                options => BridgeOpts
-            })
-    end,
-    Conf#{qos => QoS}.
-
-downgrade_ingress_qos(2) ->
-    1;
-downgrade_ingress_qos(QoS) ->
-    QoS.
-
-export_msg(Msg, #{remote := Remote}) ->
-    to_remote_msg(Msg, Remote);
-export_msg(Msg, undefined) ->
-    ?SLOG(error, #{
-        msg => "forwarding_unavailable",
-        message => Msg,
-        reason => "egress is not configured"
-    }),
-    undefined.
-
-%%
-
-handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, Opts) ->
-    Msg = import_msg(MsgIn, Opts),
-    ?SLOG(debug, #{
-        msg => "publish_local",
-        message => Msg
-    }),
-    maybe_on_message_received(Msg, OnMessage),
-    maybe_publish_local(Msg, LocalPublish, Props).
-
-handle_disconnect(_Reason) ->
-    ok.
-
-maybe_on_message_received(Msg, {Mod, Func, Args}) ->
-    erlang:apply(Mod, Func, [Msg | Args]);
-maybe_on_message_received(_Msg, undefined) ->
-    ok.
-
-maybe_publish_local(Msg, Local = #{}, Props) ->
-    emqx_broker:publish(to_broker_msg(Msg, Local, Props));
-maybe_publish_local(_Msg, undefined, _Props) ->
-    ok.
-
-import_msg(
-    #{
-        dup := Dup,
-        payload := Payload,
-        properties := Props,
-        qos := QoS,
-        retain := Retain,
-        topic := Topic
-    },
-    #{server := Server}
-) ->
-    #{
-        id => emqx_guid:to_hexstr(emqx_guid:gen()),
-        server => Server,
-        payload => Payload,
-        topic => Topic,
-        qos => QoS,
-        dup => Dup,
-        retain => Retain,
-        pub_props => printable_maps(Props),
-        message_received_at => erlang:system_time(millisecond)
-    }.
-
-printable_maps(undefined) ->
-    #{};
-printable_maps(Headers) ->
-    maps:fold(
-        fun
-            ('User-Property', V0, AccIn) when is_list(V0) ->
-                AccIn#{
-                    'User-Property' => maps:from_list(V0),
-                    'User-Property-Pairs' => [
-                        #{
-                            key => Key,
-                            value => Value
-                        }
-                     || {Key, Value} <- V0
-                    ]
-                };
-            (K, V0, AccIn) ->
-                AccIn#{K => V0}
-        end,
-        #{},
-        Headers
-    ).
-
-%% Shame that we have to know the callback module here
-%% would be great if we can get rid of #mqtt_msg{} record
-%% and use #message{} in all places.
--spec to_remote_msg(emqx_types:message() | map(), msgvars()) ->
-    #mqtt_msg{}.
-to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
-    {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
-    to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars);
-to_remote_msg(
-    MapMsg,
-    #{
-        topic := TopicToken,
-        qos := QoSToken,
-        retain := RetainToken
-    } = Remote
-) when is_map(MapMsg) ->
-    Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(Remote, MapMsg),
-    QoS = replace_simple_var(QoSToken, MapMsg),
-    Retain = replace_simple_var(RetainToken, MapMsg),
-    PubProps = maps:get(pub_props, MapMsg, #{}),
-    #mqtt_msg{
-        qos = QoS,
-        retain = Retain,
-        topic = Topic,
-        props = emqx_utils:pub_props_to_packet(PubProps),
-        payload = Payload
-    }.
-
-%% published from remote node over a MQTT connection
-to_broker_msg(Msg, Vars, undefined) ->
-    to_broker_msg(Msg, Vars, #{});
-to_broker_msg(
-    #{dup := Dup} = MapMsg,
-    #{
-        topic := TopicToken,
-        qos := QoSToken,
-        retain := RetainToken
-    } = Local,
-    Props
-) ->
-    Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(Local, MapMsg),
-    QoS = replace_simple_var(QoSToken, MapMsg),
-    Retain = replace_simple_var(RetainToken, MapMsg),
-    PubProps = maps:get(pub_props, MapMsg, #{}),
-    set_headers(
-        Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
-        emqx_message:set_flags(
-            #{dup => Dup, retain => Retain},
-            emqx_message:make(bridge, QoS, Topic, Payload)
-        )
-    ).
-
-process_payload(From, MapMsg) ->
-    do_process_payload(maps:get(payload, From, undefined), MapMsg).
-
-do_process_payload(undefined, Msg) ->
-    emqx_utils_json:encode(Msg);
-do_process_payload(Tks, Msg) ->
-    replace_vars_in_str(Tks, Msg).
-
-%% Replace a string contains vars to another string in which the placeholders are replace by the
-%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
-%% "a: 1".
-replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
-    emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
-replace_vars_in_str(Val, _Data) ->
-    Val.
-
-%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
-%% value will be an integer 1.
-replace_simple_var(Tokens, Data) when is_list(Tokens) ->
-    [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
-    Var;
-replace_simple_var(Val, _Data) ->
-    Val.
-
-set_headers(Val, Msg) ->
-    emqx_message:set_headers(Val, Msg).

+ 2 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -121,7 +121,8 @@
 
 -export_type([
     resource_id/0,
-    resource_data/0
+    resource_data/0,
+    resource_status/0
 ]).
 
 -optional_callbacks([

+ 18 - 0
rel/i18n/emqx_connector_mqtt_schema.hocon

@@ -32,6 +32,14 @@ is configured, then both the data got from the rule and the MQTT messages that m
 egress_desc.label:
 """Egress Configs"""
 
+egress_pool_size.desc:
+"""Size of the pool of MQTT clients that will publish messages to the remote broker.<br/>
+        Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}'
+        where 'n' is the number of a client inside the pool."""
+
+egress_pool_size.label:
+"""Pool Size"""
+
 egress_local.desc:
 """The configs about receiving messages from local broker."""
 
@@ -75,6 +83,16 @@ ingress_desc.desc:
 ingress_desc.label:
 """Ingress Configs"""
 
+ingress_pool_size.desc:
+"""Size of the pool of MQTT clients that will ingest messages from the remote broker.<br/>
+        This value will be respected only if 'remote.topic' is a shared subscription topic filter,
+        otherwise only a single MQTT client will be used.
+        Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}'
+        where 'n' is the number of a client inside the pool."""
+
+ingress_pool_size.label:
+"""Pool Size"""
+
 ingress_local.desc:
 """The configs about sending message to the local broker."""