Просмотр исходного кода

feat: refactor MQTT bridge to source, action, and connector

This commit:

* refactors the MQTT V1 bridge into connector, source and action
* Extends the compatibility layer so it works for sources
* Fixes the MQTT bridge test suite so that all test cases passes

We still need to add a HTTP API handling sources. Also, we still need to
add HTTP API example schemes and examples for the MQTT
connector/action/source.

We should also make sure that we handle the corner cases of the MQTT V1
bridge automatic upgrade downgrade in a sufficiently good way:

* An error is currently thrown when converting an MQTT V1 bridge without
  egress or ingress config.
* If there is a source and action with the same name we will currently
  throw an error in the compatibility layer.
* We will also throw an error when converting an MQTT V1 bridge with
  both ingress and egress.

The above is probably the right thing to do  but we have to make sure
that we return a reasonable error to the user when this happens.

(partly)
Fixes:
https://emqx.atlassian.net/browse/EMQX-11489
Kjell Winblad 2 лет назад
Родитель
Сommit
f199a0f24a

+ 67 - 9
apps/emqx_bridge/src/emqx_action_info.erl

@@ -26,7 +26,10 @@
     bridge_v1_type_to_action_type/1,
     bridge_v1_type_name/1,
     is_action_type/1,
-    registered_schema_modules/0,
+    is_source/1,
+    is_action/1,
+    registered_schema_modules_actions/0,
+    registered_schema_modules_sources/0,
     connector_action_config_to_bridge_v1_config/2,
     connector_action_config_to_bridge_v1_config/3,
     bridge_v1_config_to_connector_config/2,
@@ -51,19 +54,26 @@
     ConnectorConfig :: map(), ActionConfig :: map()
 ) -> map().
 %% Define this if the automatic config upgrade is not enough for the connector.
--callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map().
+-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) ->
+    map() | {ConnectorTypeName :: atom(), map()}.
 %% Define this if the automatic config upgrade is not enough for the bridge.
 %% If you want to make use of the automatic config upgrade, you can call
 %% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your
 %% implementation and do some adjustments on the result.
 -callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) ->
-    map().
+    map() | {source | action, ActionTypeName :: atom(), map()} | 'none'.
+-callback is_source() ->
+    boolean().
+-callback is_action() ->
+    boolean().
 
 -optional_callbacks([
     bridge_v1_type_name/0,
     connector_action_config_to_bridge_v1_config/2,
     bridge_v1_config_to_connector_config/1,
-    bridge_v1_config_to_action_config/2
+    bridge_v1_config_to_action_config/2,
+    is_source/0,
+    is_action/0
 ]).
 
 %% ====================================================================
@@ -96,7 +106,10 @@ hard_coded_action_info_modules_ee() ->
 -endif.
 
 hard_coded_action_info_modules_common() ->
-    [emqx_bridge_http_action_info].
+    [
+        emqx_bridge_http_action_info,
+        emqx_bridge_mqtt_pubsub_action_info
+    ].
 
 hard_coded_action_info_modules() ->
     hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
@@ -178,10 +191,33 @@ is_action_type(Type) ->
         _ -> true
     end.
 
-registered_schema_modules() ->
+%% Returns true if the action is an ingress action, false otherwise.
+is_source(Bin) when is_binary(Bin) ->
+    is_source(binary_to_existing_atom(Bin));
+is_source(Type) ->
+    ActionInfoMap = info_map(),
+    IsSourceMap = maps:get(is_source, ActionInfoMap),
+    maps:get(Type, IsSourceMap, false).
+
+%% Returns true if the action is an egress action, false otherwise.
+is_action(Bin) when is_binary(Bin) ->
+    is_action(binary_to_existing_atom(Bin));
+is_action(Type) ->
+    ActionInfoMap = info_map(),
+    IsActionMap = maps:get(is_action, ActionInfoMap),
+    maps:get(Type, IsActionMap, true).
+
+registered_schema_modules_actions() ->
+    InfoMap = info_map(),
+    Schemas = maps:get(action_type_to_schema_module, InfoMap),
+    All = maps:to_list(Schemas),
+    [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_action(Type)].
+
+registered_schema_modules_sources() ->
     InfoMap = info_map(),
     Schemas = maps:get(action_type_to_schema_module, InfoMap),
-    maps:to_list(Schemas).
+    All = maps:to_list(Schemas),
+    [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_source(Type)].
 
 connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
     Module = get_action_info_module(ActionOrBridgeType),
@@ -293,7 +329,9 @@ initial_info_map() ->
         action_type_to_bridge_v1_type => #{},
         action_type_to_connector_type => #{},
         action_type_to_schema_module => #{},
-        action_type_to_info_module => #{}
+        action_type_to_info_module => #{},
+        is_source => #{},
+        is_action => #{}
     }.
 
 get_info_map(Module) ->
@@ -312,6 +350,20 @@ get_info_map(Module) ->
             false ->
                 {ActionType, [ActionType]}
         end,
+    IsIngress =
+        case erlang:function_exported(Module, is_source, 0) of
+            true ->
+                Module:is_source();
+            false ->
+                false
+        end,
+    IsEgress =
+        case erlang:function_exported(Module, is_action, 0) of
+            true ->
+                Module:is_action();
+            false ->
+                true
+        end,
     #{
         action_type_names =>
             lists:foldl(
@@ -351,5 +403,11 @@ get_info_map(Module) ->
                 end,
                 #{ActionType => Module},
                 BridgeV1Types
-            )
+            ),
+        is_source => #{
+            ActionType => IsIngress
+        },
+        is_action => #{
+            ActionType => IsEgress
+        }
     }.

+ 7 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -353,7 +353,13 @@ get_metrics(Type, Name) ->
             case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of
                 true ->
                     BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
-                    emqx_bridge_v2:get_metrics(BridgeV2Type, Name);
+                    try
+                        ConfRootKey = emqx_bridge_v2:get_conf_root_key_if_only_one(Type, Name),
+                        emqx_bridge_v2:get_metrics(ConfRootKey, BridgeV2Type, Name)
+                    catch
+                        error:Reason ->
+                            {error, Reason}
+                    end;
                 false ->
                     {error, not_bridge_v1_compatible}
             end;

+ 11 - 3
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -548,9 +548,17 @@ schema("/bridges_probe") ->
         Id,
         case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
             true ->
-                BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType),
-                ok = emqx_bridge_v2:reset_metrics(BridgeV2Type, BridgeName),
-                ?NO_CONTENT;
+                try
+                    ConfRootKey = emqx_bridge_v2:get_conf_root_key_if_only_one(
+                        BridgeType, BridgeName
+                    ),
+                    BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType),
+                    ok = emqx_bridge_v2:reset_metrics(ConfRootKey, BridgeV2Type, BridgeName),
+                    ?NO_CONTENT
+                catch
+                    error:Reason ->
+                        ?BAD_REQUEST(Reason)
+                end;
             false ->
                 ok = emqx_bridge_resource:reset_metrics(
                     emqx_bridge_resource:resource_id(BridgeType, BridgeName)

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge_lib.erl

@@ -82,7 +82,8 @@ external_ids(Type, Name) ->
 get_conf(BridgeType, BridgeName) ->
     case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
         true ->
-            emqx_conf:get_raw([actions, BridgeType, BridgeName]);
+            ConfRootName = emqx_bridge_v2:get_conf_root_key_if_only_one(BridgeType, BridgeName),
+            emqx_conf:get_raw([ConfRootName, BridgeType, BridgeName]);
         false ->
             undefined
     end.

Разница между файлами не показана из-за своего большого размера
+ 392 - 147
apps/emqx_bridge/src/emqx_bridge_v2.erl


+ 28 - 14
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -79,7 +79,7 @@ api_schema(Method) ->
     hoconsc:union(bridge_api_union(APISchemas)).
 
 registered_api_schemas(Method) ->
-    RegisteredSchemas = emqx_action_info:registered_schema_modules(),
+    RegisteredSchemas = emqx_action_info:registered_schema_modules_actions(),
     [
         api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
      || {BridgeV2Type, SchemaModule} <- RegisteredSchemas
@@ -189,29 +189,43 @@ tags() ->
 -dialyzer({nowarn_function, roots/0}).
 
 roots() ->
-    case fields(actions) of
-        [] ->
-            [
-                {actions,
-                    ?HOCON(hoconsc:map(name, typerefl:map()), #{importance => ?IMPORTANCE_LOW})}
-            ];
-        _ ->
-            [{actions, ?HOCON(?R_REF(actions), #{importance => ?IMPORTANCE_LOW})}]
-    end.
+    ActionsRoot =
+        case fields(actions) of
+            [] ->
+                [
+                    {actions,
+                        ?HOCON(hoconsc:map(name, typerefl:map()), #{importance => ?IMPORTANCE_LOW})}
+                ];
+            _ ->
+                [{actions, ?HOCON(?R_REF(actions), #{importance => ?IMPORTANCE_LOW})}]
+        end,
+    SourcesRoot =
+        [{sources, ?HOCON(?R_REF(sources), #{importance => ?IMPORTANCE_LOW})}],
+    ActionsRoot ++ SourcesRoot.
 
 fields(actions) ->
-    registered_schema_fields();
+    registered_schema_fields_actions();
+fields(sources) ->
+    registered_schema_fields_sources();
 fields(resource_opts) ->
     resource_opts_fields(_Overrides = []).
 
-registered_schema_fields() ->
+registered_schema_fields_actions() ->
     [
         Module:fields(action)
-     || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules()
+     || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules_actions()
+    ].
+
+registered_schema_fields_sources() ->
+    [
+        Module:fields(source)
+     || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules_sources()
     ].
 
 desc(actions) ->
     ?DESC("desc_bridges_v2");
+desc(sources) ->
+    ?DESC("desc_sources");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
@@ -264,7 +278,7 @@ examples(Method) ->
             ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
             lists:foldl(MergeFun, Examples, ConnectorExamples)
         end,
-    SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
+    SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()],
     lists:foldl(Fun, #{}, SchemaModules).
 
 top_level_common_action_keys() ->

+ 271 - 133
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -20,8 +20,13 @@
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -behaviour(emqx_resource).
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
 
 -export([on_message_received/3]).
+-export([handle_disconnect/1]).
 
 %% callbacks of behaviour emqx_resource
 -export([
@@ -30,11 +35,25 @@
     on_stop/2,
     on_query/3,
     on_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channel_status/3,
+    on_get_channels/1
 ]).
 
 -export([on_async_result/2]).
 
+-type name() :: term().
+
+-type option() ::
+    {name, name()}
+    | {ingress, map()}
+    %% see `emqtt:option()`
+    | {client_opts, map()}.
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -define(HEALTH_CHECK_TIMEOUT, 1000).
 -define(INGRESS, "I").
 -define(EGRESS, "E").
@@ -42,142 +61,205 @@
 %% ===================================================================
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
 %% if the bridge received msgs from the remote broker.
-on_message_received(Msg, HookPoint, ResId) ->
+
+on_message_received(Msg, HookPoints, ResId) ->
     emqx_resource_metrics:received_inc(ResId),
-    emqx_hooks:run(HookPoint, [Msg]).
+    lists:foreach(
+        fun(HookPoint) ->
+            emqx_hooks:run(HookPoint, [Msg])
+        end,
+        HookPoints
+    ),
+    ok.
 
 %% ===================================================================
 callback_mode() -> async_if_possible.
 
-on_start(ResourceId, Conf) ->
+on_start(ResourceId, #{server := Server} = Conf) ->
     ?SLOG(info, #{
         msg => "starting_mqtt_connector",
         connector => ResourceId,
         config => emqx_utils:redact(Conf)
     }),
-    case start_ingress(ResourceId, Conf) of
+    TopicToHandlerIndex = emqx_topic_index:new(),
+    StartConf = Conf#{topic_to_handler_index => TopicToHandlerIndex},
+    case start_mqtt_clients(ResourceId, StartConf) of
         {ok, Result1} ->
-            case start_egress(ResourceId, Conf) of
-                {ok, Result2} ->
-                    {ok, maps:merge(Result1, Result2)};
-                {error, Reason} ->
-                    _ = stop_ingress(Result1),
-                    {error, Reason}
-            end;
+            {ok, Result1#{
+                installed_channels => #{},
+                clean_start => maps:get(clean_start, Conf),
+                topic_to_handler_index => TopicToHandlerIndex,
+                server => Server
+            }};
         {error, Reason} ->
             {error, Reason}
     end.
 
-start_ingress(ResourceId, Conf) ->
-    ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf),
-    case mk_ingress_config(ResourceId, Conf) of
-        Ingress = #{} ->
-            start_ingress(ResourceId, Ingress, ClientOpts);
-        undefined ->
-            {ok, #{}}
-    end.
-
-start_ingress(ResourceId, Ingress, ClientOpts) ->
-    PoolName = <<ResourceId/binary, ":ingress">>,
-    PoolSize = choose_ingress_pool_size(ResourceId, Ingress),
-    Options = [
-        {name, PoolName},
-        {pool_size, PoolSize},
-        {ingress, Ingress},
-        {client_opts, ClientOpts}
-    ],
-    ok = emqx_resource:allocate_resource(ResourceId, ingress_pool_name, PoolName),
-    case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of
-        ok ->
-            {ok, #{ingress_pool_name => PoolName}};
-        {error, {start_pool_failed, _, Reason}} ->
-            {error, Reason}
-    end.
-
-choose_ingress_pool_size(<<?TEST_ID_PREFIX, _/binary>>, _) ->
-    1;
-choose_ingress_pool_size(
-    ResourceId,
-    #{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels,
+        clean_start := CleanStart
+    } = OldState,
+    ChannelId,
+    #{config_root := actions} = ChannelConfig
 ) ->
-    case emqx_topic:parse(RemoteTopic) of
-        {#share{} = _Filter, _SubOpts} ->
-            % NOTE: this is shared subscription, many workers may subscribe
-            PoolSize;
-        {_Filter, #{}} when PoolSize > 1 ->
-            % NOTE: this is regular subscription, only one worker should subscribe
+    %% Publisher channel
+    %% make a warning if clean_start is set to false
+    case CleanStart of
+        false ->
+            ?tp(
+                mqtt_clean_start_egress_action_warning,
+                #{
+                    channel_id => ChannelId,
+                    resource_id => _InstId
+                }
+            ),
             ?SLOG(warning, #{
-                msg => "mqtt_bridge_ingress_pool_size_ignored",
-                connector => ResourceId,
-                reason =>
-                    "Remote topic filter is not a shared subscription, "
-                    "ingress pool will start with a single worker",
-                config_pool_size => PoolSize,
-                pool_size => 1
-            }),
-            1;
-        {_Filter, #{}} when PoolSize == 1 ->
-            1
-    end.
+                msg => "mqtt_publisher_clean_start_false",
+                reason => "clean_start is set to false when using MQTT publisher action, " ++
+                    "which may cause unexpected behavior. " ++
+                    "For example, if the client ID is already subscribed to topics, " ++
+                    "we might receive messages that are unhanded.",
+                channel => ChannelId,
+                config => emqx_utils:redact(ChannelConfig)
+            });
+        true ->
+            ok
+    end,
+    ChannelState0 = maps:get(parameters, ChannelConfig),
+    ChannelState = emqx_bridge_mqtt_egress:config(ChannelState0),
+    NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState};
+on_add_channel(
+    _ResourceId,
+    #{
+        installed_channels := InstalledChannels,
+        pool_name := PoolName,
+        topic_to_handler_index := TopicToHandlerIndex,
+        server := Server
+    } = OldState,
+    ChannelId,
+    #{hookpoints := HookPoints} = ChannelConfig
+) ->
+    %% Add ingress channel
+    ChannelState0 = maps:get(parameters, ChannelConfig),
+    ChannelState1 = ChannelState0#{
+        hookpoints => HookPoints,
+        server => Server,
+        config_root => sources
+    },
+    ChannelState2 = mk_ingress_config(ChannelId, ChannelState1, TopicToHandlerIndex),
+    ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState2),
+    NewInstalledChannels = maps:put(ChannelId, ChannelState2, InstalledChannels),
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_remove_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels,
+        pool_name := PoolName,
+        topic_to_handler_index := TopicToHandlerIndex
+    } = OldState,
+    ChannelId
+) ->
+    ChannelState = maps:get(ChannelId, InstalledChannels),
+    case ChannelState of
+        #{
+            config_root := sources
+        } ->
+            emqx_bridge_mqtt_ingress:unsubscribe_channel(
+                PoolName, ChannelState, ChannelId, TopicToHandlerIndex
+            ),
+            ok;
+        _ ->
+            ok
+    end,
+    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_get_channel_status(
+    _ResId,
+    ChannelId,
+    #{
+        installed_channels := Channels
+    } = _State
+) when is_map_key(ChannelId, Channels) ->
+    %% The channel should be ok as long as the MQTT client is ok
+    connected.
 
-start_egress(ResourceId, Conf) ->
-    % NOTE
-    % We are ignoring the user configuration here because there's currently no reliable way
-    % to ensure proper session recovery according to the MQTT spec.
-    ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)),
-    case mk_egress_config(Conf) of
-        Egress = #{} ->
-            start_egress(ResourceId, Egress, ClientOpts);
-        undefined ->
-            {ok, #{}}
-    end.
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
 
-start_egress(ResourceId, Egress, ClientOpts) ->
-    PoolName = <<ResourceId/binary, ":egress">>,
-    PoolSize = maps:get(pool_size, Egress),
+start_mqtt_clients(ResourceId, Conf) ->
+    ClientOpts = mk_client_opts(ResourceId, Conf),
+    start_mqtt_clients(ResourceId, Conf, ClientOpts).
+
+start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
+    PoolName = <<ResourceId/binary>>,
+    #{
+        pool_size := PoolSize
+    } = StartConf,
     Options = [
         {name, PoolName},
         {pool_size, PoolSize},
         {client_opts, ClientOpts}
     ],
-    ok = emqx_resource:allocate_resource(ResourceId, egress_pool_name, PoolName),
-    case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of
+    ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
+    case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
         ok ->
-            {ok, #{
-                egress_pool_name => PoolName,
-                egress_config => emqx_bridge_mqtt_egress:config(Egress)
-            }};
+            {ok, #{pool_name => PoolName}};
         {error, {start_pool_failed, _, Reason}} ->
             {error, Reason}
     end.
 
-on_stop(ResourceId, _State) ->
+on_stop(ResourceId, State) ->
     ?SLOG(info, #{
         msg => "stopping_mqtt_connector",
         connector => ResourceId
     }),
+    %% on_stop can be called with State = undefined
+    StateMap =
+        case State of
+            Map when is_map(State) ->
+                Map;
+            _ ->
+                #{}
+        end,
+    case maps:get(topic_to_handler_index, StateMap, undefined) of
+        undefined ->
+            ok;
+        TopicToHandlerIndex ->
+            emqx_topic_index:delete(TopicToHandlerIndex)
+    end,
     Allocated = emqx_resource:get_allocated_resources(ResourceId),
-    ok = stop_ingress(Allocated),
-    ok = stop_egress(Allocated).
-
-stop_ingress(#{ingress_pool_name := PoolName}) ->
-    emqx_resource_pool:stop(PoolName);
-stop_ingress(#{}) ->
-    ok.
+    ok = stop_helper(Allocated).
 
-stop_egress(#{egress_pool_name := PoolName}) ->
-    emqx_resource_pool:stop(PoolName);
-stop_egress(#{}) ->
-    ok.
+stop_helper(#{pool_name := PoolName}) ->
+    emqx_resource_pool:stop(PoolName).
 
 on_query(
     ResourceId,
-    {send_message, Msg},
-    #{egress_pool_name := PoolName, egress_config := Config}
+    {ChannelId, Msg},
+    #{pool_name := PoolName} = State
 ) ->
-    ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
-    handle_send_result(with_egress_client(PoolName, send, [Msg, Config]));
-on_query(ResourceId, {send_message, Msg}, #{}) ->
+    ?TRACE(
+        "QUERY",
+        "send_msg_to_remote_node",
+        #{
+            message => Msg,
+            connector => ResourceId,
+            channel_id => ChannelId
+        }
+    ),
+    Channels = maps:get(installed_channels, State),
+    ChannelConfig = maps:get(ChannelId, Channels),
+    handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig]));
+on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
         connector => ResourceId,
@@ -187,13 +269,15 @@ on_query(ResourceId, {send_message, Msg}, #{}) ->
 
 on_query_async(
     ResourceId,
-    {send_message, Msg},
+    {ChannelId, Msg},
     CallbackIn,
-    #{egress_pool_name := PoolName, egress_config := Config}
+    #{pool_name := PoolName} = State
 ) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
     Callback = {fun on_async_result/2, [CallbackIn]},
-    Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]),
+    Channels = maps:get(installed_channels, State),
+    ChannelConfig = maps:get(ChannelId, Channels),
+    Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]),
     case Result of
         ok ->
             ok;
@@ -202,7 +286,7 @@ on_query_async(
         {error, Reason} ->
             {error, classify_error(Reason)}
     end;
-on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) ->
+on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
         connector => ResourceId,
@@ -251,7 +335,7 @@ classify_error(Reason) ->
     {unrecoverable_error, Reason}.
 
 on_get_status(_ResourceId, State) ->
-    Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)),
+    Pools = maps:to_list(maps:with([pool_name], State)),
     Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
     try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
         Statuses ->
@@ -261,12 +345,10 @@ on_get_status(_ResourceId, State) ->
             connecting
     end.
 
-get_status({Pool, Worker}) ->
+get_status({_Pool, Worker}) ->
     case ecpool_worker:client(Worker) of
-        {ok, Client} when Pool == ingress_pool_name ->
+        {ok, Client} ->
             emqx_bridge_mqtt_ingress:status(Client);
-        {ok, Client} when Pool == egress_pool_name ->
-            emqx_bridge_mqtt_egress:status(Client);
         {error, _} ->
             disconnected
     end.
@@ -284,30 +366,19 @@ combine_status(Statuses) ->
     end.
 
 mk_ingress_config(
-    ResourceId,
-    #{
-        ingress := Ingress = #{remote := _},
-        server := Server,
-        hookpoint := HookPoint
-    }
+    ChannelId,
+    IngressChannelConfig,
+    TopicToHandlerIndex
 ) ->
-    Ingress#{
-        server => Server,
-        on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]}
-    };
-mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) ->
-    error({no_hookpoint_provided, ResourceId, Conf});
-mk_ingress_config(_ResourceId, #{}) ->
-    undefined.
-
-mk_egress_config(#{egress := Egress = #{remote := _}}) ->
-    Egress;
-mk_egress_config(#{}) ->
-    undefined.
+    HookPoints = maps:get(hookpoints, IngressChannelConfig, []),
+    NewConf = IngressChannelConfig#{
+        on_message_received => {?MODULE, on_message_received, [HookPoints, ChannelId]},
+        ingress_list => [IngressChannelConfig]
+    },
+    emqx_bridge_mqtt_ingress:config(NewConf, ChannelId, TopicToHandlerIndex).
 
 mk_client_opts(
     ResourceId,
-    ClientScope,
     Config = #{
         server := Server,
         keepalive := KeepAlive,
@@ -327,14 +398,15 @@ mk_client_opts(
             % A load balancing server (such as haproxy) is often set up before the emqx broker server.
             % When the load balancing server enables mqtt connection packet inspection,
             % non-standard mqtt connection packets might be filtered out by LB.
-            bridge_mode
+            bridge_mode,
+            topic_to_handler_index
         ],
         Config
     ),
     Name = parse_id_to_name(ResourceId),
     mk_client_opt_password(Options#{
         hosts => [HostPort],
-        clientid => clientid(Name, ClientScope, Config),
+        clientid => clientid(Name, Config),
         connect_timeout => 30,
         keepalive => ms_to_s(KeepAlive),
         force_ping => true,
@@ -357,9 +429,75 @@ mk_client_opt_password(Options) ->
 ms_to_s(Ms) ->
     erlang:ceil(Ms / 1000).
 
-clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when
+clientid(Name, _Conf = #{clientid_prefix := Prefix}) when
     is_binary(Prefix) andalso Prefix =/= <<>>
 ->
-    emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]);
-clientid(Name, ClientScope, _Conf) ->
-    emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]).
+    emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name]);
+clientid(Name, _Conf) ->
+    emqx_bridge_mqtt_lib:clientid_base([Name]).
+
+%% @doc Start an ingress bridge worker.
+-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Options) ->
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    ?SLOG(debug, #{
+        msg => "ingress_client_starting",
+        options => emqx_utils:redact(Options)
+    }),
+    Name = proplists:get_value(name, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    ClientOpts = proplists:get_value(client_opts, Options),
+    case emqtt:start_link(mk_client_opts(Name, WorkerId, ClientOpts)) of
+        {ok, Pid} ->
+            connect(Pid, Name);
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "client_start_failed",
+                config => emqx_utils:redact(ClientOpts),
+                reason => Reason
+            }),
+            Error
+    end.
+
+mk_client_opts(
+    Name,
+    WorkerId,
+    ClientOpts = #{
+        clientid := ClientId,
+        topic_to_handler_index := TopicToHandlerIndex
+    }
+) ->
+    ClientOpts#{
+        clientid := mk_clientid(WorkerId, ClientId),
+        msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex)
+    }.
+
+mk_clientid(WorkerId, ClientId) ->
+    iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
+
+mk_client_event_handler(Name, TopicToHandlerIndex) ->
+    #{
+        publish => {fun emqx_bridge_mqtt_ingress:handle_publish/3, [Name, TopicToHandlerIndex]},
+        disconnected => {fun ?MODULE:handle_disconnect/1, []}
+    }.
+
+-spec connect(pid(), name()) ->
+    {ok, pid()} | {error, _Reason}.
+connect(Pid, Name) ->
+    case emqtt:connect(Pid) of
+        {ok, _Props} ->
+            {ok, Pid};
+        {error, Reason} = Error ->
+            ?SLOG(warning, #{
+                msg => "ingress_client_connect_failed",
+                reason => Reason,
+                name => Name
+            }),
+            _ = catch emqtt:stop(Pid),
+            Error
+    end.
+
+handle_disconnect(_Reason) ->
+    ok.

+ 67 - 2
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl

@@ -1,4 +1,4 @@
-%%--------------------------------------------------------------------
+%%-------------------------------------------------------------------
 %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
@@ -30,6 +30,10 @@
     parse_server/1
 ]).
 
+-export([
+    connector_examples/1
+]).
+
 -import(emqx_schema, [mk_duration/2]).
 
 -import(hoconsc, [mk/2, ref/2]).
@@ -61,6 +65,39 @@ fields("config") ->
                     }
                 )}
         ];
+fields("config_connector") ->
+    [
+        {enable,
+            mk(
+                boolean(),
+                #{
+                    desc => <<"Enable or disable this connector">>,
+                    default => true
+                }
+            )},
+        {description, emqx_schema:description_schema()},
+        {resource_opts,
+            mk(
+                hoconsc:ref(creation_opts),
+                #{
+                    required => false,
+                    desc => ?DESC(emqx_resource_schema, "creation_opts")
+                }
+            )},
+        {pool_size, fun egress_pool_size/1}
+        % {ingress,
+        %     mk(
+        %         hoconsc:array(
+        %             hoconsc:ref(connector_ingress)
+        %         ),
+        %         #{
+        %             required => {false, recursively},
+        %             desc => ?DESC("ingress_desc")
+        %         }
+        %     )}
+    ] ++ fields("server_configs");
+fields(creation_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields("server_configs") ->
     [
         {mode,
@@ -131,6 +168,7 @@ fields("server_configs") ->
 fields("ingress") ->
     [
         {pool_size, fun ingress_pool_size/1},
+        %% array
         {remote,
             mk(
                 ref(?MODULE, "ingress_remote"),
@@ -144,6 +182,22 @@ fields("ingress") ->
                 }
             )}
     ];
+fields(connector_ingress) ->
+    [
+        {remote,
+            mk(
+                ref(?MODULE, "ingress_remote"),
+                #{desc => ?DESC("ingress_remote")}
+            )},
+        {local,
+            mk(
+                ref(?MODULE, "ingress_local"),
+                #{
+                    desc => ?DESC("ingress_local"),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ];
 fields("ingress_remote") ->
     [
         {topic,
@@ -269,7 +323,15 @@ fields("egress_remote") ->
                     desc => ?DESC("payload")
                 }
             )}
-    ].
+    ];
+fields("get_connector") ->
+    fields("config_connector");
+fields("post_connector") ->
+    fields("config_connector");
+fields("put_connector") ->
+    fields("config_connector");
+fields(What) ->
+    error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}).
 
 ingress_pool_size(desc) ->
     ?DESC("ingress_pool_size");
@@ -304,3 +366,6 @@ qos() ->
 parse_server(Str) ->
     #{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS),
     {Host, Port}.
+
+connector_examples(_Method) ->
+    [#{}].

+ 0 - 84
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl

@@ -20,33 +20,16 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
--behaviour(ecpool_worker).
-
-%% ecpool
--export([connect/1]).
-
 -export([
     config/1,
     send/3,
     send_async/4
 ]).
 
-%% management APIs
--export([
-    status/1,
-    info/1
-]).
-
--type name() :: term().
 -type message() :: emqx_types:message() | map().
 -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}.
 -type remote_message() :: #mqtt_msg{}.
 
--type option() ::
-    {name, name()}
-    %% see `emqtt:option()`
-    | {client_opts, map()}.
-
 -type egress() :: #{
     local => #{
         topic => emqx_types:topic()
@@ -54,51 +37,6 @@
     remote := emqx_bridge_mqtt_msg:msgvars()
 }.
 
-%% @doc Start an ingress bridge worker.
--spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
-    {ok, pid()} | {error, _Reason}.
-connect(Options) ->
-    ?SLOG(debug, #{
-        msg => "egress_client_starting",
-        options => emqx_utils:redact(Options)
-    }),
-    Name = proplists:get_value(name, Options),
-    WorkerId = proplists:get_value(ecpool_worker_id, Options),
-    ClientOpts = proplists:get_value(client_opts, Options),
-    case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of
-        {ok, Pid} ->
-            connect(Pid, Name);
-        {error, Reason} = Error ->
-            ?SLOG(error, #{
-                msg => "egress_client_start_failed",
-                config => emqx_utils:redact(ClientOpts),
-                reason => Reason
-            }),
-            Error
-    end.
-
-mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
-    ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
-
-mk_clientid(WorkerId, ClientId) ->
-    emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
-
-connect(Pid, Name) ->
-    case emqtt:connect(Pid) of
-        {ok, _Props} ->
-            {ok, Pid};
-        {error, Reason} = Error ->
-            ?SLOG(warning, #{
-                msg => "egress_client_connect_failed",
-                reason => Reason,
-                name => Name
-            }),
-            _ = catch emqtt:stop(Pid),
-            Error
-    end.
-
-%%
-
 -spec config(map()) ->
     egress().
 config(#{remote := RC = #{}} = Conf) ->
@@ -137,25 +75,3 @@ to_remote_msg(Msg = #{}, Remote) ->
         props = emqx_utils:pub_props_to_packet(PubProps),
         payload = Payload
     }.
-
-%%
-
--spec info(pid()) ->
-    [{atom(), term()}].
-info(Pid) ->
-    emqtt:info(Pid).
-
--spec status(pid()) ->
-    emqx_resource:resource_status().
-status(Pid) ->
-    try
-        case proplists:get_value(socket, info(Pid)) of
-            Socket when Socket /= undefined ->
-                connected;
-            undefined ->
-                connecting
-        end
-    catch
-        exit:{noproc, _} ->
-            disconnected
-    end.

+ 185 - 104
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl

@@ -17,129 +17,188 @@
 -module(emqx_bridge_mqtt_ingress).
 
 -include_lib("emqx/include/logger.hrl").
-
--behaviour(ecpool_worker).
-
-%% ecpool
--export([connect/1]).
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 %% management APIs
 -export([
     status/1,
-    info/1
+    info/1,
+    subscribe_channel/2,
+    unsubscribe_channel/4,
+    config/3
 ]).
 
--export([handle_publish/5]).
--export([handle_disconnect/1]).
-
--type name() :: term().
+-export([handle_publish/3]).
+
+subscribe_channel(PoolName, ChannelConfig) ->
+    Workers = ecpool:workers(PoolName),
+    PoolSize = length(Workers),
+    Results = [
+        subscribe_channel(Pid, Name, ChannelConfig, Idx, PoolSize)
+     || {{Name, Idx}, Pid} <- Workers
+    ],
+    case proplists:get_value(error, Results, ok) of
+        ok ->
+            ok;
+        Error ->
+            Error
+    end.
 
--type option() ::
-    {name, name()}
-    | {ingress, map()}
-    %% see `emqtt:option()`
-    | {client_opts, map()}.
+subscribe_channel(WorkerPid, Name, Ingress, WorkerIdx, PoolSize) ->
+    case ecpool_worker:client(WorkerPid) of
+        {ok, Client} ->
+            subscribe_channel_helper(Client, Name, Ingress, WorkerIdx, PoolSize);
+        {error, Reason} ->
+            error({client_not_found, Reason})
+    end.
 
--type ingress() :: #{
-    server := string(),
-    remote := #{
-        topic := emqx_types:topic(),
-        qos => emqx_types:qos()
-    },
-    local := emqx_bridge_mqtt_msg:msgvars(),
-    on_message_received := {module(), atom(), [term()]}
-}.
-
-%% @doc Start an ingress bridge worker.
--spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
-    {ok, pid()} | {error, _Reason}.
-connect(Options) ->
-    ?SLOG(debug, #{
-        msg => "ingress_client_starting",
-        options => emqx_utils:redact(Options)
-    }),
-    Name = proplists:get_value(name, Options),
-    WorkerId = proplists:get_value(ecpool_worker_id, Options),
-    Ingress = config(proplists:get_value(ingress, Options), Name),
-    ClientOpts = proplists:get_value(client_opts, Options),
-    case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of
-        {ok, Pid} ->
-            connect(Pid, Name, Ingress);
+subscribe_channel_helper(Client, Name, Ingress, WorkerIdx, PoolSize) ->
+    IngressList = maps:get(ingress_list, Ingress, []),
+    SubscribeResults = subscribe_remote_topics(
+        Client, IngressList, WorkerIdx, PoolSize, Name
+    ),
+    %% Find error if any using proplists:get_value/2
+    case proplists:get_value(error, SubscribeResults, ok) of
+        ok ->
+            ok;
         {error, Reason} = Error ->
             ?SLOG(error, #{
-                msg => "client_start_failed",
-                config => emqx_utils:redact(ClientOpts),
+                msg => "ingress_client_subscribe_failed",
+                ingress => Ingress,
+                name => Name,
                 reason => Reason
             }),
             Error
     end.
 
-mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
-    ClientOpts#{
-        clientid := mk_clientid(WorkerId, ClientId),
-        msg_handler => mk_client_event_handler(Name, Ingress)
-    }.
+subscribe_remote_topics(Pid, IngressList, WorkerIdx, PoolSize, Name) ->
+    [subscribe_remote_topic(Pid, Ingress, WorkerIdx, PoolSize, Name) || Ingress <- IngressList].
 
-mk_clientid(WorkerId, ClientId) ->
-    emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
-
-mk_client_event_handler(Name, Ingress = #{}) ->
-    IngressVars = maps:with([server], Ingress),
-    OnMessage = maps:get(on_message_received, Ingress, undefined),
-    LocalPublish =
-        case Ingress of
-            #{local := Local = #{topic := _}} ->
-                Local;
-            #{} ->
-                undefined
-        end,
-    #{
-        publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]},
-        disconnected => {fun ?MODULE:handle_disconnect/1, []}
-    }.
+subscribe_remote_topic(
+    Pid, #{remote := #{topic := RemoteTopic, qos := QoS}} = _Remote, WorkerIdx, PoolSize, Name
+) ->
+    case should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, _LogWarn = true) of
+        true ->
+            emqtt:subscribe(Pid, RemoteTopic, QoS);
+        false ->
+            ok
+    end.
 
--spec connect(pid(), name(), ingress()) ->
-    {ok, pid()} | {error, _Reason}.
-connect(Pid, Name, Ingress) ->
-    case emqtt:connect(Pid) of
-        {ok, _Props} ->
-            case subscribe_remote_topic(Pid, Ingress) of
-                {ok, _, _RCs} ->
-                    {ok, Pid};
-                {error, Reason} = Error ->
-                    ?SLOG(error, #{
-                        msg => "ingress_client_subscribe_failed",
-                        ingress => Ingress,
-                        name => Name,
-                        reason => Reason
-                    }),
-                    _ = catch emqtt:stop(Pid),
-                    Error
-            end;
-        {error, Reason} = Error ->
+should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, LogWarn) ->
+    IsFirstWorker = WorkerIdx == 1,
+    case emqx_topic:parse(RemoteTopic) of
+        {#share{} = _Filter, _SubOpts} ->
+            % NOTE: this is shared subscription, many workers may subscribe
+            true;
+        {_Filter, #{}} when PoolSize > 1, IsFirstWorker, LogWarn ->
+            % NOTE: this is regular subscription, only one worker should subscribe
             ?SLOG(warning, #{
-                msg => "ingress_client_connect_failed",
-                reason => Reason,
-                name => Name
+                msg => "mqtt_pool_size_ignored",
+                connector => Name,
+                reason =>
+                    "Remote topic filter is not a shared subscription, "
+                    "only a single connection will be used from the connection pool",
+                config_pool_size => PoolSize,
+                pool_size => PoolSize
             }),
-            _ = catch emqtt:stop(Pid),
-            Error
+            IsFirstWorker;
+        {_Filter, #{}} ->
+            % NOTE: this is regular subscription, only one worker should subscribe
+            IsFirstWorker
     end.
 
-subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
-    emqtt:subscribe(Pid, RemoteTopic, QoS).
+unsubscribe_channel(PoolName, ChannelConfig, ChannelId, TopicToHandlerIndex) ->
+    Workers = ecpool:workers(PoolName),
+    PoolSize = length(Workers),
+    _ = [
+        unsubscribe_channel(Pid, Name, ChannelConfig, Idx, PoolSize, ChannelId, TopicToHandlerIndex)
+     || {{Name, Idx}, Pid} <- Workers
+    ],
+    ok.
 
-%%
+unsubscribe_channel(WorkerPid, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex) ->
+    case ecpool_worker:client(WorkerPid) of
+        {ok, Client} ->
+            unsubscribe_channel_helper(
+                Client, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex
+            );
+        {error, Reason} ->
+            error({client_not_found, Reason})
+    end.
 
--spec config(map(), name()) ->
-    ingress().
-config(#{remote := RC, local := LC} = Conf, BridgeName) ->
-    Conf#{
+unsubscribe_channel_helper(
+    Client, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex
+) ->
+    IngressList = maps:get(ingress_list, Ingress, []),
+    unsubscribe_remote_topics(
+        Client, IngressList, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex
+    ).
+
+unsubscribe_remote_topics(
+    Pid, IngressList, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex
+) ->
+    [
+        unsubscribe_remote_topic(
+            Pid, Ingress, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex
+        )
+     || Ingress <- IngressList
+    ].
+
+unsubscribe_remote_topic(
+    Pid,
+    #{remote := #{topic := RemoteTopic}} = _Remote,
+    WorkerIdx,
+    PoolSize,
+    Name,
+    ChannelId,
+    TopicToHandlerIndex
+) ->
+    emqx_topic_index:delete(RemoteTopic, ChannelId, TopicToHandlerIndex),
+    case should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, _NoWarn = false) of
+        true ->
+            case emqtt:unsubscribe(Pid, RemoteTopic) of
+                {ok, _Properties, _ReasonCodes} ->
+                    ok;
+                {error, Reason} ->
+                    ?SLOG(warning, #{
+                        msg => "unsubscribe_mqtt_topic_failed",
+                        channel_id => Name,
+                        reason => Reason
+                    }),
+                    ok
+            end;
+        false ->
+            ok
+    end.
+
+config(#{ingress_list := IngressList} = Conf, Name, TopicToHandlerIndex) ->
+    NewIngressList = [
+        fix_remote_config(Ingress, Name, TopicToHandlerIndex, Conf)
+     || Ingress <- IngressList
+    ],
+    Conf#{ingress_list => NewIngressList}.
+
+fix_remote_config(#{remote := RC, local := LC}, BridgeName, TopicToHandlerIndex, Conf) ->
+    FixedConf = Conf#{
         remote => parse_remote(RC, BridgeName),
         local => emqx_bridge_mqtt_msg:parse(LC)
-    }.
+    },
+    insert_to_topic_to_handler_index(FixedConf, TopicToHandlerIndex, BridgeName),
+    FixedConf.
+
+insert_to_topic_to_handler_index(
+    #{remote := #{topic := Topic}} = Conf, TopicToHandlerIndex, BridgeName
+) ->
+    TopicPattern =
+        case emqx_topic:parse(Topic) of
+            {#share{group = _Group, topic = TP}, _} ->
+                TP;
+            _ ->
+                Topic
+        end,
+    emqx_topic_index:insert(TopicPattern, BridgeName, Conf, TopicToHandlerIndex).
 
-parse_remote(#{qos := QoSIn} = Conf, BridgeName) ->
+parse_remote(#{qos := QoSIn} = Remote, BridgeName) ->
     QoS = downgrade_ingress_qos(QoSIn),
     case QoS of
         QoSIn ->
@@ -152,7 +211,7 @@ parse_remote(#{qos := QoSIn} = Conf, BridgeName) ->
                 name => BridgeName
             })
     end,
-    Conf#{qos => QoS}.
+    Remote#{qos => QoS}.
 
 downgrade_ingress_qos(2) ->
     1;
@@ -183,17 +242,39 @@ status(Pid) ->
 
 %%
 
-handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) ->
-    Msg = import_msg(MsgIn, IngressVars),
+handle_publish(
+    #{properties := Props, topic := Topic} = MsgIn,
+    Name,
+    TopicToHandlerIndex
+) ->
     ?SLOG(debug, #{
         msg => "ingress_publish_local",
-        message => Msg,
+        message => MsgIn,
         name => Name
     }),
-    maybe_on_message_received(Msg, OnMessage),
-    maybe_publish_local(Msg, LocalPublish, Props).
+    Matches = emqx_topic_index:matches(Topic, TopicToHandlerIndex, []),
+    lists:foreach(
+        fun(Match) ->
+            handle_match(TopicToHandlerIndex, Match, MsgIn, Name, Props)
+        end,
+        Matches
+    ),
+    ok.
 
-handle_disconnect(_Reason) ->
+handle_match(
+    TopicToHandlerIndex,
+    Match,
+    MsgIn,
+    _Name,
+    Props
+) ->
+    [ChannelConfig] = emqx_topic_index:get_record(Match, TopicToHandlerIndex),
+    #{on_message_received := OnMessage} = ChannelConfig,
+    Msg = import_msg(MsgIn, ChannelConfig),
+
+    maybe_on_message_received(Msg, OnMessage),
+    LocalPublish = maps:get(local, ChannelConfig, undefined),
+    _ = maybe_publish_local(Msg, LocalPublish, Props),
     ok.
 
 maybe_on_message_received(Msg, {Mod, Func, Args}) ->

+ 221 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl

@@ -0,0 +1,221 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mqtt_pubsub_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_config_to_action_config/2,
+    connector_action_config_to_bridge_v1_config/2,
+    is_source/0
+]).
+
+bridge_v1_type_name() -> mqtt.
+
+action_type_name() -> mqtt.
+
+connector_type_name() -> mqtt.
+
+schema_module() -> emqx_bridge_mqtt_pubsub_schema.
+
+is_source() -> true.
+
+bridge_v1_config_to_connector_config(Config) ->
+    %% Transform the egress part to mqtt_publisher connector config
+    SimplifiedConfig = check_and_simplify_bridge_v1_config(Config),
+    ConnectorConfigMap = make_connector_config_from_bridge_v1_config(SimplifiedConfig),
+    {mqtt, ConnectorConfigMap}.
+
+make_connector_config_from_bridge_v1_config(Config) ->
+    ConnectorConfigSchema = emqx_bridge_mqtt_connector_schema:fields("config_connector"),
+    ConnectorTopFields = [
+        erlang:atom_to_binary(FieldName, utf8)
+     || {FieldName, _} <- ConnectorConfigSchema
+    ],
+    ConnectorConfigMap = maps:with(ConnectorTopFields, Config),
+    ResourceOptsSchema = emqx_bridge_mqtt_connector_schema:fields(creation_opts),
+    ResourceOptsTopFields = [
+        erlang:atom_to_binary(FieldName, utf8)
+     || {FieldName, _} <- ResourceOptsSchema
+    ],
+    ResourceOptsMap = maps:get(<<"resource_opts">>, ConnectorConfigMap, #{}),
+    ResourceOptsMap2 = maps:with(ResourceOptsTopFields, ResourceOptsMap),
+    ConnectorConfigMap2 = maps:put(<<"resource_opts">>, ResourceOptsMap2, ConnectorConfigMap),
+    IngressMap0 = maps:get(<<"ingress">>, Config, #{}),
+    EgressMap = maps:get(<<"egress">>, Config, #{}),
+    % %% Move pool_size to the top level
+    PoolSizeIngress = maps:get(<<"pool_size">>, IngressMap0, undefined),
+    PoolSize =
+        case PoolSizeIngress of
+            undefined ->
+                DefaultPoolSize = emqx_connector_schema_lib:pool_size(default),
+                maps:get(<<"pool_size">>, EgressMap, DefaultPoolSize);
+            _ ->
+                PoolSizeIngress
+        end,
+    % IngressMap1 = maps:remove(<<"pool_size">>, IngressMap0),
+    %% Remove ingress part from the config
+    ConnectorConfigMap3 = maps:remove(<<"ingress">>, ConnectorConfigMap2),
+    %% Remove egress part from the config
+    ConnectorConfigMap4 = maps:remove(<<"egress">>, ConnectorConfigMap3),
+    ConnectorConfigMap5 = maps:put(<<"pool_size">>, PoolSize, ConnectorConfigMap4),
+    % ConnectorConfigMap4 =
+    %     case IngressMap1 =:= #{} of
+    %         true ->
+    %             ConnectorConfigMap3;
+    %         false ->
+    %             maps:put(<<"ingress">>, [IngressMap1], ConnectorConfigMap3)
+    %     end,
+    ConnectorConfigMap5.
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    SimplifiedConfig = check_and_simplify_bridge_v1_config(BridgeV1Config),
+    bridge_v1_config_to_action_config_helper(
+        SimplifiedConfig, ConnectorName
+    ).
+
+bridge_v1_config_to_action_config_helper(
+    #{
+        <<"egress">> := EgressMap0
+    } = Config,
+    ConnectorName
+) ->
+    %% Transform the egress part to mqtt_publisher connector config
+    SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_publisher_action"),
+    ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("resource_opts"),
+    ConfigMap1 = general_action_conf_map_from_bridge_v1_config(
+        Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields
+    ),
+    LocalTopicMap = maps:get(<<"local">>, EgressMap0, #{}),
+    LocalTopic = maps:get(<<"topic">>, LocalTopicMap, undefined),
+    EgressMap1 = maps:remove(<<"local">>, EgressMap0),
+    %% Add parameters field (Egress map) to the action config
+    ConfigMap2 = maps:put(<<"parameters">>, EgressMap1, ConfigMap1),
+    ConfigMap3 =
+        case LocalTopic of
+            undefined ->
+                ConfigMap2;
+            _ ->
+                maps:put(<<"local_topic">>, LocalTopic, ConfigMap2)
+        end,
+    {action, mqtt, ConfigMap3};
+bridge_v1_config_to_action_config_helper(
+    #{
+        <<"ingress">> := IngressMap
+    } = Config,
+    ConnectorName
+) ->
+    %% Transform the egress part to mqtt_publisher connector config
+    SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_subscriber_source"),
+    ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("resource_opts"),
+    ConfigMap1 = general_action_conf_map_from_bridge_v1_config(
+        Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields
+    ),
+    IngressMap1 = maps:remove(<<"pool_size">>, IngressMap),
+    %% Add parameters field (Egress map) to the action config
+    ConfigMap2 = maps:put(<<"parameters">>, IngressMap1, ConfigMap1),
+    {source, mqtt, ConfigMap2};
+bridge_v1_config_to_action_config_helper(
+    _Config,
+    _ConnectorName
+) ->
+    error({incompatible_bridge_v1, no_matching_action_or_source}).
+
+general_action_conf_map_from_bridge_v1_config(
+    Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields
+) ->
+    ShemaFieldsNames = [
+        erlang:atom_to_binary(FieldName, utf8)
+     || {FieldName, _} <- SchemaFields
+    ],
+    ActionConfig0 = maps:with(ShemaFieldsNames, Config),
+    ResourceOptsSchemaFieldsNames = [
+        erlang:atom_to_binary(FieldName, utf8)
+     || {FieldName, _} <- ResourceOptsSchemaFields
+    ],
+    ResourceOptsMap = maps:get(<<"resource_opts">>, ActionConfig0, #{}),
+    ResourceOptsMap2 = maps:with(ResourceOptsSchemaFieldsNames, ResourceOptsMap),
+    %% Only put resource_opts if the original config has it
+    ActionConfig1 =
+        case maps:is_key(<<"resource_opts">>, ActionConfig0) of
+            true ->
+                maps:put(<<"resource_opts">>, ResourceOptsMap2, ActionConfig0);
+            false ->
+                ActionConfig0
+        end,
+    ActionConfig2 = maps:put(<<"connector">>, ConnectorName, ActionConfig1),
+    ActionConfig2.
+
+check_and_simplify_bridge_v1_config(
+    #{
+        <<"egress">> := EgressMap
+    } = Config
+) when map_size(EgressMap) =:= 0 ->
+    check_and_simplify_bridge_v1_config(maps:remove(<<"egress">>, Config));
+check_and_simplify_bridge_v1_config(
+    #{
+        <<"ingress">> := IngressMap
+    } = Config
+) when map_size(IngressMap) =:= 0 ->
+    check_and_simplify_bridge_v1_config(maps:remove(<<"ingress">>, Config));
+check_and_simplify_bridge_v1_config(#{
+    <<"egress">> := _EGressMap,
+    <<"ingress">> := _InGressMap
+}) ->
+    %% We should crash beacuse we don't support upgrading when ingress and egress exist at the same time
+    error(
+        {unsupported_config,
+            <<"Upgrade not supported when ingress and egress exist in the same MQTT bridge. Please divide the egress and ingress part to separate bridges in the configuration.">>}
+    );
+check_and_simplify_bridge_v1_config(SimplifiedConfig) ->
+    SimplifiedConfig.
+
+connector_action_config_to_bridge_v1_config(
+    ConnectorConfig, ActionConfig
+) ->
+    Params = maps:get(<<"parameters">>, ActionConfig, #{}),
+    ResourceOptsConnector = maps:get(<<"resource_opts">>, ConnectorConfig, #{}),
+    ResourceOptsAction = maps:get(<<"resource_opts">>, ActionConfig, #{}),
+    ResourceOpts = maps:merge(ResourceOptsConnector, ResourceOptsAction),
+    %% Check the direction of the action
+    Direction =
+        case maps:get(<<"remote">>, Params) of
+            #{<<"retain">> := _} ->
+                %% Only source has retain
+                <<"publisher">>;
+            _ ->
+                <<"subscriber">>
+        end,
+    Parms2 = maps:remove(<<"direction">>, Params),
+    DefaultPoolSize = emqx_connector_schema_lib:pool_size(default),
+    PoolSize = maps:get(<<"pool_size">>, ConnectorConfig, DefaultPoolSize),
+    Parms3 = maps:put(<<"pool_size">>, PoolSize, Parms2),
+    ConnectorConfig2 = maps:remove(<<"pool_size">>, ConnectorConfig),
+    LocalTopic = maps:get(<<"local_topic">>, ActionConfig, undefined),
+    BridgeV1Conf0 =
+        case {Direction, LocalTopic} of
+            {<<"publisher">>, undefined} ->
+                #{<<"egress">> => Parms3};
+            {<<"publisher">>, LocalT} ->
+                #{
+                    <<"egress">> => Parms3,
+                    <<"local">> =>
+                        #{
+                            <<"topic">> => LocalT
+                        }
+                };
+            {<<"subscriber">>, _} ->
+                #{<<"ingress">> => Parms3}
+        end,
+    BridgeV1Conf1 = maps:merge(BridgeV1Conf0, ConnectorConfig2),
+    BridgeV1Conf2 = BridgeV1Conf1#{
+        <<"resource_opts">> => ResourceOpts
+    },
+    BridgeV1Conf2.

+ 129 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl

@@ -0,0 +1,129 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_mqtt_pubsub_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, ref/2]).
+
+-export([roots/0, fields/1, desc/1, namespace/0]).
+
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1
+]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+namespace() -> "bridge_mqtt_publisher".
+
+roots() -> [].
+
+fields(action) ->
+    {mqtt,
+        mk(
+            hoconsc:map(name, ref(?MODULE, "mqtt_publisher_action")),
+            #{
+                desc => <<"MQTT Publisher Action Config">>,
+                required => false
+            }
+        )};
+fields("mqtt_publisher_action") ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => ?DESC("action_parameters")
+            }
+        )
+    );
+fields(action_parameters) ->
+    Fields0 = emqx_bridge_mqtt_connector_schema:fields("egress"),
+    Fields1 = proplists:delete(pool_size, Fields0),
+    Fields2 = proplists:delete(local, Fields1),
+    Fields2;
+fields(source) ->
+    {mqtt,
+        mk(
+            hoconsc:map(name, ref(?MODULE, "mqtt_subscriber_source")),
+            #{
+                desc => <<"MQTT Subscriber Source Config">>,
+                required => false
+            }
+        )};
+fields("mqtt_subscriber_source") ->
+    emqx_bridge_v2_schema:make_consumer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, ingress_parameters),
+            #{
+                required => true,
+                desc => ?DESC("source_parameters")
+            }
+        )
+    );
+fields(ingress_parameters) ->
+    Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"),
+    Fields1 = proplists:delete(pool_size, Fields0),
+    Fields1;
+fields("resource_opts") ->
+    UnsupportedOpts = [enable_batch, batch_size, batch_time],
+    lists:filter(
+        fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
+        emqx_resource_schema:fields("creation_opts")
+    );
+fields("get_connector") ->
+    emqx_bridge_mqtt_connector_schema:fields("config_connector");
+fields("get_bridge_v2") ->
+    fields("mqtt_publisher_action");
+fields("post_bridge_v2") ->
+    fields("mqtt_publisher_action");
+fields("put_bridge_v2") ->
+    fields("mqtt_publisher_action");
+fields(What) ->
+    error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
+%% v2: api schema
+%% The parameter equls to
+%%   `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
+%%   `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
+%%--------------------------------------------------------------------
+%% v1/v2
+
+desc("config") ->
+    ?DESC("desc_config");
+desc("resource_opts") ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc("http_action") ->
+    ?DESC("desc_config");
+desc("parameters_opts") ->
+    ?DESC("config_parameters_opts");
+desc(_) ->
+    undefined.
+
+bridge_v2_examples(_Method) ->
+    [
+        #{}
+    ].
+
+conn_bridge_examples(_Method) ->
+    [
+        #{}
+    ].

+ 18 - 24
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl

@@ -238,7 +238,8 @@ t_conf_bridge_authn_passfile(Config) ->
             post,
             uri(["bridges"]),
             ?SERVER_CONF(<<>>, <<"file://im/pretty/sure/theres/no/such/file">>)#{
-                <<"name">> => <<"t_conf_bridge_authn_no_passfile">>
+                <<"name">> => <<"t_conf_bridge_authn_no_passfile">>,
+                <<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1}
             }
         ),
     ?assertMatch({match, _}, re:run(Reason, <<"failed_to_read_secret_file">>)).
@@ -397,32 +398,25 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     ok.
 
-t_mqtt_egress_bridge_ignores_clean_start(_) ->
+t_mqtt_egress_bridge_warns_clean_start(_) ->
     BridgeName = atom_to_binary(?FUNCTION_NAME),
-    BridgeID = create_bridge(
-        ?SERVER_CONF#{
-            <<"name">> => BridgeName,
-            <<"egress">> => ?EGRESS_CONF,
-            <<"clean_start">> => false
-        }
-    ),
+    Action = fun() ->
+        BridgeID = create_bridge(
+            ?SERVER_CONF#{
+                <<"name">> => BridgeName,
+                <<"egress">> => ?EGRESS_CONF,
+                <<"clean_start">> => false
+            }
+        ),
 
-    ResourceID = emqx_bridge_resource:resource_id(BridgeID),
-    {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} =
-        emqx_resource_manager:lookup_cached(ResourceID),
-    ClientInfo = ecpool:pick_and_do(
-        EgressPoolName,
-        {emqx_bridge_mqtt_egress, info, []},
-        no_handover
-    ),
-    ?assertMatch(
-        #{clean_start := true},
-        maps:from_list(ClientInfo)
+        %% delete the bridge
+        {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), [])
+    end,
+    ?wait_async_action(
+        Action(),
+        #{?snk_kind := mqtt_clean_start_egress_action_warning},
+        10000
     ),
-
-    %% delete the bridge
-    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
-
     ok.
 
 t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->

+ 20 - 3
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -51,11 +51,11 @@
 
 -export([parse_url/1]).
 
--callback connector_config(ParsedConfig) ->
+-callback connector_config(ParsedConfig, Context) ->
     ParsedConfig
 when
-    ParsedConfig :: #{atom() => any()}.
--optional_callbacks([connector_config/1]).
+    ParsedConfig :: #{atom() => any()}, Context :: #{atom() => any()}.
+-optional_callbacks([connector_config/2]).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 connector_to_resource_type(ConnectorType) ->
@@ -81,6 +81,10 @@ connector_impl_module(_ConnectorType) ->
 
 connector_to_resource_type_ce(http) ->
     emqx_bridge_http_connector;
+connector_to_resource_type_ce(mqtt) ->
+    emqx_bridge_mqtt_connector;
+% connector_to_resource_type_ce(mqtt_subscriber) ->
+%     emqx_bridge_mqtt_subscriber_connector;
 connector_to_resource_type_ce(ConnectorType) ->
     error({no_bridge_v2, ConnectorType}).
 
@@ -276,6 +280,12 @@ remove(Type, Name, _Conf, _Opts) ->
     emqx_resource:remove_local(resource_id(Type, Name)).
 
 %% convert connector configs to what the connector modules want
+parse_confs(
+    <<"mqtt">> = Type,
+    Name,
+    Conf
+) ->
+    insert_hookpoints(Type, Name, Conf);
 parse_confs(
     <<"http">>,
     _Name,
@@ -307,6 +317,13 @@ parse_confs(
 parse_confs(ConnectorType, Name, Config) ->
     connector_config(ConnectorType, Name, Config).
 
+insert_hookpoints(Type, Name, Conf) ->
+    BId = emqx_bridge_resource:bridge_id(Type, Name),
+    BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId),
+    ConnectorHookpoint = connector_hookpoint(BId),
+    HookPoints = [BridgeHookpoint, ConnectorHookpoint],
+    Conf#{hookpoints => HookPoints}.
+
 connector_config(ConnectorType, Name, Config) ->
     Mod = connector_impl_module(ConnectorType),
     case erlang:function_exported(Mod, connector_config, 2) of

+ 114 - 47
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -90,7 +90,9 @@ api_schemas(Method) ->
     [
         %% We need to map the `type' field of a request (binary) to a
         %% connector schema module.
-        api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
+        api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector"),
+        % api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt_subscriber">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt">>, Method ++ "_connector")
     ].
 
 api_ref(Module, Type, Method) ->
@@ -110,10 +112,11 @@ examples(Method) ->
 
 -if(?EMQX_RELEASE_EDITION == ee).
 schema_modules() ->
-    [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
+    [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++
+        emqx_connector_ee_schema:schema_modules().
 -else.
 schema_modules() ->
-    [emqx_bridge_http_schema].
+    [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema].
 -endif.
 
 %% @doc Return old bridge(v1) and/or connector(v2) type
@@ -136,6 +139,8 @@ connector_type_to_bridge_types(influxdb) ->
     [influxdb, influxdb_api_v1, influxdb_api_v2];
 connector_type_to_bridge_types(mysql) ->
     [mysql];
+connector_type_to_bridge_types(mqtt) ->
+    [mqtt];
 connector_type_to_bridge_types(pgsql) ->
     [pgsql];
 connector_type_to_bridge_types(redis) ->
@@ -151,7 +156,8 @@ connector_type_to_bridge_types(iotdb) ->
 connector_type_to_bridge_types(elasticsearch) ->
     [elasticsearch].
 
-actions_config_name() -> <<"actions">>.
+actions_config_name(action) -> <<"actions">>;
+actions_config_name(source) -> <<"sources">>.
 
 has_connector_field(BridgeConf, ConnectorFields) ->
     lists:any(
@@ -185,40 +191,58 @@ bridge_configs_to_transform(
     end.
 
 split_bridge_to_connector_and_action(
-    {ConnectorsMap, {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}}
+    {
+        {ConnectorsMap, OrgConnectorType},
+        {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}
+    }
 ) ->
-    ConnectorMap =
+    {ConnectorMap, ConnectorType} =
         case emqx_action_info:has_custom_bridge_v1_config_to_connector_config(BridgeType) of
             true ->
-                emqx_action_info:bridge_v1_config_to_connector_config(
-                    BridgeType, BridgeV1Conf
-                );
+                case
+                    emqx_action_info:bridge_v1_config_to_connector_config(
+                        BridgeType, BridgeV1Conf
+                    )
+                of
+                    {ConType, ConMap} ->
+                        {ConMap, ConType};
+                    ConMap ->
+                        {ConMap, OrgConnectorType}
+                end;
             false ->
                 %% We do an automatic transformation to get the connector config
                 %% if the callback is not defined.
                 %% Get connector fields from bridge config
-                lists:foldl(
-                    fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
-                        ConnectorFieldNameBin = to_bin(ConnectorFieldName),
-                        case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of
-                            true ->
-                                PrevFieldConfig =
-                                    maybe_project_to_connector_resource_opts(
+                NewCConMap =
+                    lists:foldl(
+                        fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
+                            ConnectorFieldNameBin = to_bin(ConnectorFieldName),
+                            case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of
+                                true ->
+                                    PrevFieldConfig =
+                                        maybe_project_to_connector_resource_opts(
+                                            ConnectorFieldNameBin,
+                                            maps:get(ConnectorFieldNameBin, BridgeV1Conf)
+                                        ),
+                                    NewToTransform0 = maps:put(
                                         ConnectorFieldNameBin,
-                                        maps:get(ConnectorFieldNameBin, BridgeV1Conf)
+                                        PrevFieldConfig,
+                                        ToTransformSoFar
                                     ),
-                                maps:put(
-                                    ConnectorFieldNameBin,
-                                    PrevFieldConfig,
+                                    NewToTransform1 = maps:put(
+                                        to_bin(ConnectorFieldName),
+                                        maps:get(to_bin(ConnectorFieldName), BridgeV1Conf),
+                                        NewToTransform0
+                                    ),
+                                    NewToTransform1;
+                                false ->
                                     ToTransformSoFar
-                                );
-                            false ->
-                                ToTransformSoFar
-                        end
-                    end,
-                    #{},
-                    ConnectorFields
-                )
+                            end
+                        end,
+                        #{},
+                        ConnectorFields
+                    ),
+                {NewCConMap, OrgConnectorType}
         end,
     %% Generate a connector name, if needed.  Avoid doing so if there was a previous config.
     ConnectorName =
@@ -226,18 +250,29 @@ split_bridge_to_connector_and_action(
             #{<<"connector">> := ConnectorName0} -> ConnectorName0;
             _ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
         end,
-    ActionMap =
+    OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
+    {ActionMap, ActionType, ActionOrSource} =
         case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of
             true ->
-                emqx_action_info:bridge_v1_config_to_action_config(
-                    BridgeType, BridgeV1Conf, ConnectorName
-                );
+                case
+                    emqx_action_info:bridge_v1_config_to_action_config(
+                        BridgeType, BridgeV1Conf, ConnectorName
+                    )
+                of
+                    {ActionOrSource0, ActionType0, ActionMap0} ->
+                        {ActionMap0, ActionType0, ActionOrSource0};
+                    ActionMap0 ->
+                        {ActionMap0, OrgActionType, action}
+                end;
             false ->
-                transform_bridge_v1_config_to_action_config(
-                    BridgeV1Conf, ConnectorName, ConnectorFields
-                )
+                ActionMap0 =
+                    transform_bridge_v1_config_to_action_config(
+                        BridgeV1Conf, ConnectorName, ConnectorFields
+                    ),
+                {ActionMap0, OrgActionType}
         end,
-    {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
+    {BridgeType, BridgeName, ActionMap, ActionType, ActionOrSource, ConnectorName, ConnectorMap,
+        ConnectorType}.
 
 maybe_project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) ->
     project_to_connector_resource_opts(OldResourceOpts);
@@ -307,9 +342,9 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
     ConnectorNameList =
         case Attempt of
             0 ->
-                io_lib:format("connector_~s", [BridgeName]);
+                io_lib:format("~s", [BridgeName]);
             _ ->
-                io_lib:format("connector_~s_~p", [BridgeName, Attempt + 1])
+                io_lib:format("~s_~p", [BridgeName, Attempt + 1])
         end,
     ConnectorName = iolist_to_binary(ConnectorNameList),
     case maps:is_key(ConnectorName, ConnectorsMap) of
@@ -340,7 +375,10 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
         ),
     ConnectorsWithTypeMap = maps:get(to_bin(ConnectorType), ConnectorsConfMap, #{}),
     BridgeConfigsToTransformWithConnectorConf = lists:zip(
-        lists:duplicate(length(BridgeConfigsToTransform), ConnectorsWithTypeMap),
+        lists:duplicate(
+            length(BridgeConfigsToTransform),
+            {ConnectorsWithTypeMap, ConnectorType}
+        ),
         BridgeConfigsToTransform
     ),
     ActionConnectorTuples = lists:map(
@@ -349,10 +387,14 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
     ),
     %% Add connectors and actions and remove bridges
     lists:foldl(
-        fun({BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
+        fun(
+            {BridgeType, BridgeName, ActionMap, NewActionType, ActionOrSource, ConnectorName,
+                ConnectorMap, NewConnectorType},
+            RawConfigSoFar
+        ) ->
             %% Add connector
             RawConfigSoFar1 = emqx_utils_maps:deep_put(
-                [<<"connectors">>, to_bin(ConnectorType), ConnectorName],
+                [<<"connectors">>, to_bin(NewConnectorType), ConnectorName],
                 RawConfigSoFar,
                 ConnectorMap
             ),
@@ -362,12 +404,21 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
                 RawConfigSoFar1
             ),
             %% Add action
-            ActionType = emqx_action_info:bridge_v1_type_to_action_type(to_bin(BridgeType)),
-            RawConfigSoFar3 = emqx_utils_maps:deep_put(
-                [actions_config_name(), to_bin(ActionType), BridgeName],
-                RawConfigSoFar2,
-                ActionMap
-            ),
+            RawConfigSoFar3 =
+                case ActionMap of
+                    none ->
+                        RawConfigSoFar2;
+                    _ ->
+                        emqx_utils_maps:deep_put(
+                            [
+                                actions_config_name(ActionOrSource),
+                                to_bin(NewActionType),
+                                BridgeName
+                            ],
+                            RawConfigSoFar2,
+                            ActionMap
+                        )
+                end,
             RawConfigSoFar3
         end,
         RawConfig,
@@ -454,7 +505,23 @@ fields(connectors) ->
                     desc => <<"HTTP Connector Config">>,
                     required => false
                 }
+            )},
+        {mqtt,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")),
+                #{
+                    desc => <<"MQTT Publisher Connector Config">>,
+                    required => false
+                }
             )}
+        % {mqtt_subscriber,
+        %     mk(
+        %         hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")),
+        %         #{
+        %             desc => <<"MQTT Subscriber Connector Config">>,
+        %             required => false
+        %         }
+        %     )}
     ] ++ enterprise_fields_connectors();
 fields("node_status") ->
     [