Просмотр исходного кода

feat(bridge-mqtt): Update the configuration file to hocon (#5142)

turtleDeng 4 лет назад
Родитель
Сommit
d4f726419f

Разница между файлами не показана из-за своего большого размера
+ 52 - 170
apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf


+ 0 - 244
apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema

@@ -1,244 +0,0 @@
-%%-*- mode: erlang -*-
-%%--------------------------------------------------------------------
-%% Bridges
-%%--------------------------------------------------------------------
-{mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [
-  {datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
-]}.
-
-{mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [
-  {default, false},
-  {datatype, {enum, [true, false]}}
-]}.
-
-{mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [
-  {datatype, {enum, [manual, auto]}},
-  {default, auto}
-]}.
-
-{mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [
-  {default, true},
-  {datatype, {enum, [true, false]}}
-]}.
-
-{mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [
-  {datatype, string},
-  {default, ""}
-]}.
-
-{mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [
-  {datatype, integer}
-]}.
-
-{mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [
-  {datatype, flag},
-  {default, off}
-]}.
-
-{mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [
-  {default, "10s"},
-  {datatype, {duration, s}}
-]}.
-
-{mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [
-  {datatype, string},
-  {default, "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"}
-]}.
-
-{mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [
-  {default, "30s"},
-  {datatype, {duration, ms}}
-]}.
-
-{mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
-  {default, "20s"},
-  {datatype, {duration, s}}
-]}.
-
-{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
-   {default, 0},
-   {datatype, integer}
- ]}.
-
-{mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [
-  {default, 0},
-  {datatype, integer}
-]}.
-
-{mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [
-  {datatype, string}
-]}.
-
-{mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [
-  {datatype, bytesize}
-]}.
-
-{mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [
-  {datatype, bytesize}
-]}.
-
-{translation, "emqx_bridge_mqtt.bridges", fun(Conf) ->
-
-    MapPSKCiphers = fun(PSKCiphers) ->
-                        lists:map(
-                          fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
-                             ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
-                             ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
-                             ("PSK-RC4-SHA") -> {psk, rc4_128, sha}
-                          end, PSKCiphers)
-                    end,
-
-    Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
-
-    IsSsl = fun(cacertfile)   -> true;
-               (certfile)     -> true;
-               (keyfile)      -> true;
-               (ciphers)      -> true;
-               (psk_ciphers)  -> true;
-               (tls_versions) -> true;
-               (_Opt)         -> false
-            end,
-
-    Parse = fun(tls_versions, Vers) ->
-                    [{versions, [list_to_atom(S) || S <- Split(Vers)]}];
-               (ciphers, Ciphers) ->
-                    [{ciphers, Split(Ciphers)}];
-               (psk_ciphers, Ciphers) ->
-                    [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
-               (Opt, Val) ->
-                    [{Opt, Val}]
-            end,
-
-    Merge = fun(forwards, Val, Opts) ->
-                  [{forwards, string:tokens(Val, ",")}|Opts];
-               (Opt, Val, Opts) ->
-                  case IsSsl(Opt) of
-                      true ->
-                          SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
-                          lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
-                      false ->
-                          [{Opt, Val}|Opts]
-                  end
-            end,
-    Queue = fun(Name) ->
-                Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf),
-
-                QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, _, "queue", QOpt], QValue} <- Configs],
-                maps:from_list(QOpts)
-            end,
-    Subscriptions = fun(Name) ->
-                        Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf),
-                        lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])],
-                                  [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])])
-                    end,
-    IsNodeAddr = fun(Addr) ->
-                      case string:tokens(Addr, "@") of
-                          [_NodeName, _Hostname] -> true;
-                           _ -> false
-                      end
-                 end,
-    ConnMod = fun(Name) ->
-
-                      [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf),
-                      {_, Addr} = AddrConfig,
-
-                      Subs = Subscriptions(Name),
-                      case IsNodeAddr(Addr) of
-                          true when Subs =/= [] ->
-                              error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
-                          true ->
-                              emqx_bridge_rpc;
-                          false ->
-                              emqx_bridge_mqtt
-                      end
-              end,
-
-    %% to be backward compatible
-    Translate =
-        fun Tr(queue, Q, Cfg) ->
-                NewQ = maps:fold(Tr, #{}, Q),
-                Cfg#{queue => NewQ};
-            Tr(address, Addr0, Cfg) ->
-                Addr = case IsNodeAddr(Addr0) of
-                           true -> list_to_atom(Addr0);
-                           false -> Addr0
-                       end,
-                Cfg#{address => Addr};
-            Tr(reconnect_interval, Ms, Cfg) ->
-                Cfg#{reconnect_delay_ms => Ms};
-            Tr(proto_ver, Ver, Cfg) ->
-                Cfg#{proto_ver =>
-                   case Ver of
-                       mqttv3 -> v3;
-                       mqttv4 -> v4;
-                       mqttv5 -> v5;
-                       _ -> v4
-                   end};
-            Tr(max_inflight_size, Size, Cfg) ->
-                Cfg#{max_inflight => Size};
-            Tr(Key, Value, Cfg) ->
-                Cfg#{Key => Value}
-         end,
-    C = lists:foldl(
-        fun({["bridge", "mqtt", Name, Opt], Val}, Acc) ->
-                %% e.g #{aws => [{OptKey, OptVal}]}
-                Init = [{list_to_atom(Opt), Val},
-                        {connect_module, ConnMod(Name)},
-                        {subscriptions, Subscriptions(Name)},
-                        {queue, Queue(Name)}],
-                maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
-           (_, Acc) -> Acc
-        end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))),
-    C1 = maps:map(fun(Bn, Bc) ->
-                      maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
-                  end, C),
-    maps:to_list(C1)
-end}.

+ 0 - 74
apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl

@@ -1,74 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_bridge_connect).
-
--export([start/2]).
-
--export_type([config/0, connection/0]).
-
--optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]).
-
-%% map fields depend on implementation
--type(config() :: map()).
--type(connection() :: term()).
--type(batch() :: emqx_protal:batch()).
--type(ack_ref() :: emqx_bridge_worker:ack_ref()).
--type(topic() :: emqx_topic:topic()).
--type(qos() :: emqx_mqtt_types:qos()).
-
--include_lib("emqx/include/logger.hrl").
-
--logger_header("[Bridge Connect]").
-
-%% establish the connection to remote node/cluster
-%% protal worker (the caller process) should be expecting
-%% a message {disconnected, conn_ref()} when disconnected.
--callback start(config()) -> {ok, connection()} | {error, any()}.
-
-%% send to remote node/cluster
-%% bridge worker (the caller process) should be expecting
-%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
--callback send(connection(), batch()) -> {ok, ack_ref()} | {ok, integer()} | {error, any()}.
-
-%% called when owner is shutting down.
--callback stop(connection()) -> ok.
-
--callback ensure_subscribed(connection(), topic(), qos()) -> ok.
-
--callback ensure_unsubscribed(connection(), topic()) -> ok.
-
-start(Module, Config) ->
-    case Module:start(Config) of
-        {ok, Conn} ->
-            {ok, Conn};
-        {error, Reason} ->
-            Config1 = obfuscate(Config),
-            ?LOG(error, "Failed to connect with module=~p\n"
-                 "config=~p\nreason:~p", [Module, Config1, Reason]),
-            {error, Reason}
-    end.
-
-obfuscate(Map) ->
-    maps:fold(fun(K, V, Acc) ->
-                      case is_sensitive(K) of
-                          true -> [{K, '***'} | Acc];
-                          false -> [{K, V} | Acc]
-                      end
-              end, [], Map).
-
-is_sensitive(password) -> true;
-is_sensitive(_) -> false.

+ 40 - 54
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl

@@ -18,15 +18,11 @@
 
 -module(emqx_bridge_mqtt).
 
--behaviour(emqx_bridge_connect).
-
-%% behaviour callbacks
 -export([ start/1
         , send/2
         , stop/1
         ]).
 
-%% optional behaviour callbacks
 -export([ ensure_subscribed/3
         , ensure_unsubscribed/2
         ]).
@@ -37,6 +33,9 @@
         , handle_disconnected/2
         ]).
 
+-export([ check_subscriptions/1
+        ]).
+
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
@@ -49,26 +48,31 @@
 %% emqx_bridge_connect callbacks
 %%--------------------------------------------------------------------
 
-start(Config = #{address := Address}) ->
+start(Config) ->
     Parent = self(),
+    Address = maps:get(address, Config),
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
+    Subscriptions = maps:get(subscriptions, Config, []),
+    Subscriptions1 = check_subscriptions(Subscriptions),
     Handlers = make_hdlr(Parent, Mountpoint),
     {Host, Port} = case string:tokens(Address, ":") of
                        [H] -> {H, 1883};
                        [H, P] -> {H, list_to_integer(P)}
                    end,
-    ClientConfig = Config#{msg_handler => Handlers,
-                           host => Host,
-                           port => Port,
-                           force_ping => true
-                          },
-    case emqtt:start_link(replvar(ClientConfig)) of
+    Config1 = Config#{
+        msg_handler => Handlers,
+        host => Host,
+        port => Port,
+        force_ping => true,
+        proto_ver => maps:get(proto_ver, Config, v4)
+    },
+    case emqtt:start_link(without_config(Config1)) of
         {ok, Pid} ->
             case emqtt:connect(Pid) of
                 {ok, _} ->
                     try
-                        subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])),
-                        {ok, #{client_pid => Pid}}
+                        Subscriptions2 = subscribe_remote_topics(Pid, Subscriptions1),
+                        {ok, #{client_pid => Pid, subscriptions => Subscriptions2}}
                     catch
                         throw : Reason ->
                             ok = stop(#{client_pid => Pid}),
@@ -86,25 +90,25 @@ stop(#{client_pid := Pid}) ->
     safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
     ok.
 
-ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
+ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) ->
     case emqtt:subscribe(Pid, Topic, QoS) of
-        {ok, _, _} -> ok;
-        Error -> Error
+        {ok, _, _} -> Conn#{subscriptions => [{Topic, QoS}|Subs]};
+        Error -> {error, Error}
     end;
 ensure_subscribed(_Conn, _Topic, _QoS) ->
     %% return ok for now
     %% next re-connect should should call start with new topic added to config
     ok.
 
-ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
+ensure_unsubscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic) when is_pid(Pid) ->
     case emqtt:unsubscribe(Pid, Topic) of
-        {ok, _, _} -> ok;
-        Error -> Error
+        {ok, _, _} -> Conn#{subscriptions => lists:keydelete(Topic, 1, Subs)};
+        Error -> {error, Error}
     end;
-ensure_unsubscribed(_, _) ->
+ensure_unsubscribed(Conn, _) ->
     %% return ok for now
     %% next re-connect should should call start with this topic deleted from config
-    ok.
+    Conn.
 
 safe_stop(Pid, StopF, Timeout) ->
     MRef = monitor(process, Pid),
@@ -169,36 +173,18 @@ make_hdlr(Parent, Mountpoint) ->
      }.
 
 subscribe_remote_topics(ClientPid, Subscriptions) ->
-    lists:foreach(fun({Topic, Qos}) ->
-                          case emqtt:subscribe(ClientPid, Topic, Qos) of
-                              {ok, _, _} -> ok;
-                              Error -> throw(Error)
-                          end
-                  end, Subscriptions).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-replvar(Options) ->
-    replvar([clientid, max_inflight], Options).
-
-replvar([], Options) ->
-    Options;
-replvar([Key|More], Options) ->
-    case maps:get(Key, Options, undefined) of
-        undefined ->
-            replvar(More, Options);
-        Val ->
-            replvar(More, maps:put(Key, feedvar(Key, Val, Options), Options))
-    end.
-
-%% ${node} => node()
-feedvar(clientid, ClientId, _) ->
-    iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node())));
-
-feedvar(max_inflight, 0, _) ->
-    infinity;
-
-feedvar(max_inflight, Size, _) ->
-    Size.
+    lists:map(fun({Topic, Qos}) ->
+        case emqtt:subscribe(ClientPid, Topic, Qos) of
+            {ok, _, _} -> {Topic, Qos};
+            Error -> throw(Error)
+        end
+    end, Subscriptions).
+
+without_config(Config) ->
+    maps:without([conn_type, address, receive_mountpoint, subscriptions], Config).
+
+check_subscriptions(Subscriptions) ->
+    lists:map(fun(#{qos := QoS, topic := Topic}) ->
+        true = emqx_topic:validate({filter, Topic}),
+        {Topic, QoS}
+    end, Subscriptions).

+ 89 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl

@@ -0,0 +1,89 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mqtt_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-behaviour(hocon_schema).
+
+-export([ structs/0
+        , fields/1]).
+
+structs() -> ["emqx_bridge_mqtt"].
+
+fields("emqx_bridge_mqtt") ->
+    [ {bridges, hoconsc:array("bridges")}
+    ];
+
+fields("bridges") ->
+    [ {name, emqx_schema:t(string(), undefined, true)}
+    , {start_type, fun start_type/1}
+    , {forwards, fun forwards/1}
+    , {forward_mountpoint, emqx_schema:t(string())}
+    , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
+    , {batch_size, emqx_schema:t(integer(), undefined, 100)}
+    , {queue, emqx_schema:t(hoconsc:ref("queue"))}
+    , {config, hoconsc:union([hoconsc:ref("mqtt"), hoconsc:ref("rpc")])}
+    ];
+
+fields("mqtt") ->
+    [ {conn_type, fun conn_type/1}
+    , {address, emqx_schema:t(string(), undefined, "127.0.0.1:1883")}
+    , {proto_ver, fun proto_ver/1}
+    , {bridge_mode, emqx_schema:t(boolean(), undefined, true)}
+    , {clientid, emqx_schema:t(string())}
+    , {username, emqx_schema:t(string())}
+    , {password, emqx_schema:t(string())}
+    , {clean_start, emqx_schema:t(boolean(), undefined, true)}
+    , {keepalive, emqx_schema:t(integer(), undefined, 300)}
+    , {subscriptions, hoconsc:array("subscriptions")}
+    , {receive_mountpoint, emqx_schema:t(string())}
+    , {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
+    , {max_inflight, emqx_schema:t(integer(), undefined, 32)}
+    ];
+
+fields("rpc") ->
+    [ {conn_type, fun conn_type/1}
+    , {node, emqx_schema:t(atom(), undefined, 'emqx@127.0.0.1')}
+    ];
+
+fields("subscriptions") ->
+    [ {topic, #{type => binary(), nullable => false}}
+    , {qos, emqx_schema:t(integer(), undefined, 1)}
+    ];
+
+fields("queue") ->
+    [ {replayq_dir, hoconsc:union([boolean(), string()])}
+    , {replayq_seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")}
+    , {replayq_offload_mode, emqx_schema:t(boolean(), undefined, false)}
+    , {replayq_max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
+    ].
+
+conn_type(type) -> hoconsc:enum([mqtt, rpc]);
+conn_type(_) -> undefined.
+
+proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
+proto_ver(default) -> v4;
+proto_ver(_) -> undefined.
+
+start_type(type) -> hoconsc:enum([auto, manual]);
+start_type(default) -> auto;
+start_type(_) -> undefined.
+
+forwards(type) -> hoconsc:array(binary());
+forwards(default) -> [];
+forwards(_) -> undefined.

+ 14 - 25
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl

@@ -24,37 +24,33 @@
 
 %% APIs
 -export([ start_link/0
-        , start_link/1
         ]).
 
--export([ create_bridge/2
+-export([ create_bridge/1
         , drop_bridge/1
         , bridges/0
-        , is_bridge_exist/1
         ]).
 
 %% supervisor callbacks
 -export([init/1]).
 
--define(SUP, ?MODULE).
 -define(WORKER_SUP, emqx_bridge_worker_sup).
 
-start_link() -> start_link(?SUP).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-start_link(Name) ->
-    supervisor:start_link({local, Name}, ?MODULE, Name).
-
-init(?SUP) ->
-    BridgesConf = application:get_env(?APP, bridges, []),
+init([]) ->
+    BridgesConf = emqx_config:get([?APP, bridges], []),
     BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
     SupFlag = #{strategy => one_for_one,
                 intensity => 100,
                 period => 10},
     {ok, {SupFlag, BridgeSpec}}.
 
-bridge_spec({Name, Config}) ->
+bridge_spec(Config) ->
+    Name = list_to_atom(maps:get(name, Config)),
     #{id => Name,
-      start => {emqx_bridge_worker, start_link, [Name, Config]},
+      start => {emqx_bridge_worker, start_link, [Config]},
       restart => permanent,
       shutdown => 5000,
       type => worker,
@@ -62,22 +58,15 @@ bridge_spec({Name, Config}) ->
 
 -spec(bridges() -> [{node(), map()}]).
 bridges() ->
-    [{Name, emqx_bridge_worker:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
-
--spec(is_bridge_exist(atom() | pid()) -> boolean()).
-is_bridge_exist(Id) ->
-    case supervisor:get_childspec(?SUP, Id) of
-        {ok, _ChildSpec} -> true;
-        {error, _Error} -> false
-    end.
+    [{Name, emqx_bridge_worker:status(Name)} || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)].
 
-create_bridge(Id, Config) ->
-    supervisor:start_child(?SUP, bridge_spec({Id, Config})).
+create_bridge(Config) ->
+    supervisor:start_child(?MODULE, bridge_spec(Config)).
 
-drop_bridge(Id) ->
-    case supervisor:terminate_child(?SUP, Id) of
+drop_bridge(Name) ->
+    case supervisor:terminate_child(?MODULE, Name) of
         ok ->
-            supervisor:delete_child(?SUP, Id);
+            supervisor:delete_child(?MODULE, Name);
         {error, Error} ->
             ?LOG(error, "Delete bridge failed, error : ~p", [Error]),
             {error, Error}

+ 10 - 15
apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl

@@ -18,9 +18,6 @@
 
 -module(emqx_bridge_rpc).
 
--behaviour(emqx_bridge_connect).
-
-%% behaviour callbacks
 -export([ start/1
         , send/2
         , stop/1
@@ -33,17 +30,15 @@
 
 -type ack_ref() :: emqx_bridge_worker:ack_ref().
 -type batch() :: emqx_bridge_worker:batch().
--type node_or_tuple() :: atom() | {atom(), term()}.
-
 -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
 
 -define(RPC, emqx_rpc).
 
-start(#{address := Remote}) ->
-    case poke(Remote) of
+start(#{node := RemoteNode}) ->
+    case poke(RemoteNode) of
         ok ->
-            Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]),
-            {ok, #{client_pid => Pid, address => Remote}};
+            Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), RemoteNode]),
+            {ok, #{client_pid => Pid, remote_node => RemoteNode}};
         Error ->
             Error
     end.
@@ -62,9 +57,9 @@ stop(#{client_pid := Pid}) when is_pid(Pid) ->
     ok.
 
 %% @doc Callback for `emqx_bridge_connect' behaviour
--spec send(#{address := node_or_tuple(), _ => _}, batch()) -> {ok, ack_ref()} | {error, any()}.
-send(#{address := Remote}, Batch) ->
-    case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of
+-spec send(#{remote_node := atom(), _ => _}, batch()) -> {ok, ack_ref()} | {error, any()}.
+send(#{remote_node := RemoteNode}, Batch) ->
+    case ?RPC:call(RemoteNode, ?MODULE, handle_send, [Batch]) of
         ok ->
             Ref = make_ref(),
             self() ! {batch_ack, Ref},
@@ -93,8 +88,8 @@ heartbeat(Parent, RemoteNode) ->
             end
     end.
 
-poke(Node) ->
-    case ?RPC:call(Node, erlang, node, []) of
-        Node -> ok;
+poke(RemoteNode) ->
+    case ?RPC:call(RemoteNode, erlang, node, []) of
+        RemoteNode -> ok;
         {badrpc, Reason} -> {error, Reason}
     end.

+ 131 - 160
apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl

@@ -66,7 +66,6 @@
 
 %% APIs
 -export([ start_link/1
-        , start_link/2
         , register_metrics/0
         , stop/1
         ]).
@@ -86,7 +85,6 @@
 %% management APIs
 -export([ ensure_started/1
         , ensure_stopped/1
-        , ensure_stopped/2
         , status/1
         ]).
 
@@ -125,14 +123,13 @@
 -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
 -define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)).
--define(NO_BRIDGE_HANDLER, undefined).
 
 %% @doc Start a bridge worker. Supported configs:
 %% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
 %%      at 'idle' state until a manual call to start it.
 %% connect_module: The module which implements emqx_bridge_connect behaviour
 %%      and work as message batch transport layer
-%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry
+%% reconnect_interval: Delay in milli-seconds for the bridge worker to retry
 %%      in case of transportation failure.
 %% max_inflight: Max number of batches allowed to send-ahead before receiving
 %%       confirmation from remote node/cluster
@@ -148,128 +145,98 @@
 %%
 %% Find more connection specific configs in the callback modules
 %% of emqx_bridge_connect behaviour.
-start_link(Config) when is_list(Config) ->
-    start_link(maps:from_list(Config));
-start_link(Config) ->
-    gen_statem:start_link(?MODULE, Config, []).
-
-start_link(Name, Config) when is_list(Config) ->
-    start_link(Name, maps:from_list(Config));
-start_link(Name, Config) ->
-    Name1 = name(Name),
-    gen_statem:start_link({local, Name1}, ?MODULE, Config#{name => Name1}, []).
+start_link(Opts) when is_list(Opts) ->
+    start_link(maps:from_list(Opts));
+start_link(Opts) ->
+    case maps:get(name, Opts, undefined) of
+        undefined ->
+            gen_statem:start_link(?MODULE, Opts, []);
+        Name ->
+            Name1 = name(Name),
+            gen_statem:start_link({local, Name1}, ?MODULE, Opts#{name => Name1}, [])
+    end.
 
 ensure_started(Name) ->
     gen_statem:call(name(Name), ensure_started).
 
 %% @doc Manually stop bridge worker. State idempotency ensured.
-ensure_stopped(Id) ->
-    ensure_stopped(Id, 1000).
-
-ensure_stopped(Id, Timeout) ->
-    Pid = case id(Id) of
-              P when is_pid(P) -> P;
-              N -> whereis(N)
-          end,
-    case Pid of
-        undefined ->
-            ok;
-        _ ->
-            MRef = monitor(process, Pid),
-            unlink(Pid),
-            _ = gen_statem:call(id(Id), ensure_stopped, Timeout),
-            receive
-                {'DOWN', MRef, _, _, _} ->
-                    ok
-            after
-                Timeout ->
-                    exit(Pid, kill)
-            end
-    end.
+ensure_stopped(Name) ->
+    gen_statem:call(name(Name), ensure_stopped, 5000).
 
 stop(Pid) -> gen_statem:stop(Pid).
 
 status(Pid) when is_pid(Pid) ->
     gen_statem:call(Pid, status);
-status(Id) ->
-    gen_statem:call(name(Id), status).
+status(Name) ->
+    gen_statem:call(name(Name), status).
 
 %% @doc Return all forwards (local subscriptions).
 -spec get_forwards(id()) -> [topic()].
-get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).
+get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
 
 %% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
 -spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
-get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
+get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions).
 
 %% @doc Add a new forward (local topic subscription).
 -spec ensure_forward_present(id(), topic()) -> ok.
-ensure_forward_present(Id, Topic) ->
-    gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}).
+ensure_forward_present(Name, Topic) ->
+    gen_statem:call(name(Name), {ensure_forward_present, topic(Topic)}).
 
 %% @doc Ensure a forward topic is deleted.
 -spec ensure_forward_absent(id(), topic()) -> ok.
-ensure_forward_absent(Id, Topic) ->
-    gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).
+ensure_forward_absent(Name, Topic) ->
+    gen_statem:call(name(Name), {ensure_forward_absent, topic(Topic)}).
 
 %% @doc Ensure subscribed to remote topic.
 %% NOTE: only applicable when connection module is emqx_bridge_mqtt
 %%       return `{error, no_remote_subscription_support}' otherwise.
 -spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
-ensure_subscription_present(Id, Topic, QoS) ->
-    gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).
+ensure_subscription_present(Name, Topic, QoS) ->
+    gen_statem:call(name(Name), {ensure_subscription_present, topic(Topic), QoS}).
 
 %% @doc Ensure unsubscribed from remote topic.
 %% NOTE: only applicable when connection module is emqx_bridge_mqtt
 -spec ensure_subscription_absent(id(), topic()) -> ok.
-ensure_subscription_absent(Id, Topic) ->
-    gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).
+ensure_subscription_absent(Name, Topic) ->
+    gen_statem:call(name(Name), {ensure_subscription_absent, topic(Topic)}).
 
 callback_mode() -> [state_functions].
 
 %% @doc Config should be a map().
-init(Config) ->
+init(Opts) ->
     erlang:process_flag(trap_exit, true),
-    ConnectModule = maps:get(connect_module, Config),
-    Subscriptions = maps:get(subscriptions, Config, []),
-    Forwards = maps:get(forwards, Config, []),
-    Queue = open_replayq(Config),
-    State = init_opts(Config),
-    Topics = [iolist_to_binary(T) || T <- Forwards],
-    Subs = check_subscriptions(Subscriptions),
-    ConnectCfg = get_conn_cfg(Config),
+    ConnectOpts = maps:get(config, Opts),
+    ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
+    Forwards = maps:get(forwards, Opts, []),
+    Queue = open_replayq(maps:get(queue, Opts, #{})),
+    State = init_opts(Opts),
     self() ! idle,
     {ok, idle, State#{connect_module => ConnectModule,
-                      connect_cfg => ConnectCfg,
-                      forwards => Topics,
-                      subscriptions => Subs,
+                      connect_opts => ConnectOpts,
+                      forwards => Forwards,
                       replayq => Queue
                      }}.
 
-init_opts(Config) ->
-    IfRecordMetrics = maps:get(if_record_metrics, Config, true),
-    ReconnDelayMs = maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS),
-    StartType = maps:get(start_type, Config, manual),
-    BridgeHandler = maps:get(bridge_handler, Config, ?NO_BRIDGE_HANDLER),
-    Mountpoint = maps:get(forward_mountpoint, Config, undefined),
-    ReceiveMountpoint = maps:get(receive_mountpoint, Config, undefined),
-    MaxInflightSize = maps:get(max_inflight, Config, ?DEFAULT_BATCH_SIZE),
-    BatchSize = maps:get(batch_size, Config, ?DEFAULT_BATCH_SIZE),
-    Name = maps:get(name, Config, undefined),
+init_opts(Opts) ->
+    IfRecordMetrics = maps:get(if_record_metrics, Opts, true),
+    ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
+    StartType = maps:get(start_type, Opts, manual),
+    Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
+    MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_BATCH_SIZE),
+    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
+    Name = maps:get(name, Opts, undefined),
     #{start_type => StartType,
-      reconnect_delay_ms => ReconnDelayMs,
+      reconnect_interval => ReconnDelayMs,
       batch_size => BatchSize,
       mountpoint => format_mountpoint(Mountpoint),
-      receive_mountpoint => ReceiveMountpoint,
       inflight => [],
       max_inflight => MaxInflightSize,
       connection => undefined,
-      bridge_handler => BridgeHandler,
       if_record_metrics => IfRecordMetrics,
       name => Name}.
 
-open_replayq(Config) ->
-    QCfg = maps:get(queue, Config, #{}),
+open_replayq(QCfg) ->
     Dir = maps:get(replayq_dir, QCfg, undefined),
     SegBytes = maps:get(replayq_seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
     MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE),
@@ -280,22 +247,6 @@ open_replayq(Config) ->
     replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
                               marshaller => fun ?MODULE:msg_marshaller/1}).
 
-check_subscriptions(Subscriptions) ->
-    lists:map(fun({Topic, QoS}) ->
-        Topic1 = iolist_to_binary(Topic),
-        true = emqx_topic:validate({filter, Topic1}),
-        {Topic1, QoS}
-    end, Subscriptions).
-
-get_conn_cfg(Config) ->
-    maps:without([connect_module,
-                  queue,
-                  reconnect_delay_ms,
-                  forwards,
-                  mountpoint,
-                  name
-                 ], Config).
-
 code_change(_Vsn, State, Data, _Extra) ->
     {ok, State, Data}.
 
@@ -321,14 +272,10 @@ idle(info, idle, #{start_type := auto} = State) ->
 idle(state_timeout, reconnect, State) ->
     connecting(State);
 
-idle(info, {batch_ack, Ref}, State) ->
-    NewState = handle_batch_ack(State, Ref),
-    {keep_state, NewState};
-
 idle(Type, Content, State) ->
     common(idle, Type, Content, State).
 
-connecting(#{reconnect_delay_ms := ReconnectDelayMs} = State) ->
+connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
     case do_connect(State) of
         {ok, State1} ->
             {next_state, connected, State1, {state_timeout, 0, connected}};
@@ -348,7 +295,7 @@ connected(internal, maybe_send, State) ->
     {keep_state, NewState};
 
 connected(info, {disconnected, Conn, Reason},
-          #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) ->
+          #{connection := Connection, name := Name, reconnect_interval := ReconnectDelayMs} = State) ->
     ?tp(info, disconnected, #{name => Name, reason => Reason}),
     case Conn =:= maps:get(client_pid, Connection, undefined)  of
         true ->
@@ -365,19 +312,27 @@ connected(Type, Content, State) ->
 %% Common handlers
 common(StateName, {call, From}, status, _State) ->
     {keep_state_and_data, [{reply, From, StateName}]};
-common(_StateName, {call, From}, ensure_started, _State) ->
-    {keep_state_and_data, [{reply, From, connected}]};
-common(_StateName, {call, From}, ensure_stopped, _State) ->
-    {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]};
+common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
+    {keep_state_and_data, [{reply, From, ok}]};
+common(_StateName, {call, From}, ensure_stopped, #{connection := Conn,
+                                                   connect_module := ConnectModule} = State) ->
+    Reply = ConnectModule:stop(Conn),
+    {next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]};
 common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
     {keep_state_and_data, [{reply, From, Forwards}]};
-common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
-    {keep_state_and_data, [{reply, From, Subs}]};
-common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
-    {Result, NewState} = ensure_present(What, Topic, State),
+common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
+    {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, [])}]};
+common(_StateName, {call, From}, {ensure_forward_present, Topic}, State) ->
+    {Result, NewState} = do_ensure_forward_present(Topic, State),
+    {keep_state, NewState, [{reply, From, Result}]};
+common(_StateName, {call, From}, {ensure_subscription_present, Topic, QoS}, State) ->
+    {Result, NewState} = do_ensure_subscription_present(Topic, QoS, State),
+    {keep_state, NewState, [{reply, From, Result}]};
+common(_StateName, {call, From}, {ensure_forward_absent, Topic}, State) ->
+    {Result, NewState} = do_ensure_forward_absent(Topic, State),
     {keep_state, NewState, [{reply, From, Result}]};
-common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
-    {Result, NewState} = ensure_absent(What, Topic, State),
+common(_StateName, {call, From}, {ensure_subscription_absent, Topic}, State) ->
+    {Result, NewState} = do_ensure_subscription_absent(Topic, State),
     {keep_state, NewState, [{reply, From, Result}]};
 common(_StateName, info, {deliver, _, Msg},
        State = #{replayq := Q, if_record_metrics := IfRecordMetric}) ->
@@ -395,77 +350,79 @@ common(StateName, Type, Content, #{name := Name} = State) ->
           [Name, Type, StateName, Content]),
     {keep_state, State}.
 
-eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
-    State;
-eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
-    Handler(Msg),
-    State.
+do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) ->
+    case is_topic_present(Topic, Forwards) of
+        true ->
+            {ok, State};
+        false ->
+            R = subscribe_local_topic(Topic, Name),
+            {R, State#{forwards => [Topic | Forwards]}}
+    end.
 
-ensure_present(Key, Topic, State) ->
-    Topics = maps:get(Key, State),
-    case is_topic_present(Topic, Topics) of
+do_ensure_subscription_present(_Topic, _QoS, #{connection := undefined} = State) ->
+    {{error, no_connection}, State};
+do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc} = State) ->
+    {{error, no_remote_subscription_support}, State};
+do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule,
+                                          connection := Conn} = State) ->
+    case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of
         true ->
             {ok, State};
         false ->
-            R = do_ensure_present(Key, Topic, State),
-            {R, State#{Key := lists:usort([Topic | Topics])}}
+            case ConnectModule:ensure_subscribed(Conn, Topic, QoS) of
+                {error, Error} ->
+                    {{error, Error}, State};
+                Conn1 ->
+                    {ok, State#{connection => Conn1}}
+            end
     end.
 
-ensure_absent(Key, Topic, State) ->
-    Topics = maps:get(Key, State),
-    case is_topic_present(Topic, Topics) of
+do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) ->
+    case is_topic_present(Topic, Forwards) of
         true ->
-            R = do_ensure_absent(Key, Topic, State),
-            {R, State#{Key := ensure_topic_absent(Topic, Topics)}};
+            R = do_unsubscribe(Topic),
+            {R, State#{forwards => lists:delete(Topic, Forwards)}};
+        false ->
+            {ok, State}
+    end.
+do_ensure_subscription_absent(_Topic, #{connection := undefined} = State) ->
+    {{error, no_connection}, State};
+do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = State) ->
+    {{error, no_remote_subscription_support}, State};
+do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule,
+                                       connection := Conn} = State) ->
+    case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of
+        true ->
+            case ConnectModule:ensure_unsubscribed(Conn, Topic) of
+                {error, Error} ->
+                    {{error, Error}, State};
+                Conn1 ->
+                    {ok, State#{connection => Conn1}}
+            end;
         false ->
             {ok, State}
     end.
 
-ensure_topic_absent(_Topic, []) -> [];
-ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L);
-ensure_topic_absent(Topic, L) -> lists:delete(Topic, L).
-
-is_topic_present({Topic, _QoS}, Topics) ->
-    is_topic_present(Topic, Topics);
 is_topic_present(Topic, Topics) ->
     lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
 
 do_connect(#{forwards := Forwards,
-             subscriptions := Subs,
              connect_module := ConnectModule,
-             connect_cfg := ConnectCfg,
+             connect_opts := ConnectOpts,
              inflight := Inflight,
              name := Name} = State) ->
     ok = subscribe_local_topics(Forwards, Name),
-    case emqx_bridge_connect:start(ConnectModule, ConnectCfg#{subscriptions => Subs}) of
+    case ConnectModule:start(ConnectOpts) of
         {ok, Conn} ->
-            Res = eval_bridge_handler(State#{connection => Conn}, connected),
             ?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
-            {ok, Res};
+            {ok, State#{connection => Conn}};
         {error, Reason} ->
+            ConnectOpts1 = obfuscate(ConnectOpts),
+            ?LOG(error, "Failed to connect with module=~p\n"
+                 "config=~p\nreason:~p", [ConnectModule, ConnectOpts1, Reason]),
             {error, Reason, State}
     end.
 
-do_ensure_present(forwards, Topic, #{name := Name}) ->
-    subscribe_local_topic(Topic, Name);
-do_ensure_present(subscriptions, _Topic, #{connection := undefined}) ->
-    {error, no_connection};
-do_ensure_present(subscriptions, _Topic, #{connect_module := emqx_bridge_rpc}) ->
-    {error, no_remote_subscription_support};
-do_ensure_present(subscriptions, {Topic, QoS}, #{connect_module := ConnectModule,
-                                                 connection := Conn}) ->
-    ConnectModule:ensure_subscribed(Conn, Topic, QoS).
-
-do_ensure_absent(forwards, Topic, _) ->
-    do_unsubscribe(Topic);
-do_ensure_absent(subscriptions, _Topic, #{connection := undefined}) ->
-    {error, no_connection};
-do_ensure_absent(subscriptions, _Topic, #{connect_module := emqx_bridge_rpc}) ->
-    {error, no_remote_subscription_support};
-do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
-                                         connection := Conn}) ->
-    ConnectModule:ensure_unsubscribed(Conn, Topic).
-
 collect(Acc) ->
     receive
         {deliver, _, Msg} ->
@@ -605,10 +562,9 @@ disconnect(#{connection := Conn,
              connect_module := Module
             } = State) when Conn =/= undefined ->
     Module:stop(Conn),
-    State0 = State#{connection => undefined},
-    eval_bridge_handler(State0, disconnected);
+    State#{connection => undefined};
 disconnect(State) ->
-    eval_bridge_handler(State, disconnected).
+        State.
 
 %% Called only when replayq needs to dump it to disk.
 msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
@@ -621,9 +577,6 @@ format_mountpoint(Prefix) ->
 
 name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
 
-id(Pid) when is_pid(Pid) -> Pid;
-id(Name) -> name(Name).
-
 register_metrics() ->
     lists:foreach(fun emqx_metrics:ensure/1,
                   ['bridge.mqtt.message_sent',
@@ -639,3 +592,21 @@ bridges_metrics_inc(true, Metric, Value) ->
     emqx_metrics:inc(Metric, Value);
 bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) ->
     ok.
+
+obfuscate(Map) ->
+    maps:fold(fun(K, V, Acc) ->
+                      case is_sensitive(K) of
+                          true -> [{K, '***'} | Acc];
+                          false -> [{K, V} | Acc]
+                      end
+              end, [], Map).
+
+is_sensitive(password) -> true;
+is_sensitive(_) -> false.
+
+conn_type(rpc) ->
+    emqx_bridge_rpc;
+conn_type(mqtt) ->
+    emqx_bridge_mqtt;
+conn_type(Mod) when is_atom(Mod) ->
+    Mod.

+ 1 - 1
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl

@@ -44,4 +44,4 @@ send_and_ack_test() ->
         ok = emqx_bridge_mqtt:stop(Conn)
     after
         meck:unload(emqtt)
-    end.
+    end.

+ 2 - 2
apps/emqx_bridge_mqtt/test/emqx_bridge_rpc_tests.erl

@@ -30,8 +30,8 @@ send_and_ack_test() ->
                 end),
     meck:new(emqx_bridge_worker, [passthrough, no_history]),
     try
-        {ok, #{client_pid := Pid, address := Node}} = emqx_bridge_rpc:start(#{address => node()}),
-        {ok, Ref} = emqx_bridge_rpc:send(#{address => Node}, []),
+        {ok, #{client_pid := Pid, remote_node := Node}} = emqx_bridge_rpc:start(#{node => node()}),
+        {ok, Ref} = emqx_bridge_rpc:send(#{remote_node => Node}, []),
         receive
             {batch_ack, Ref} ->
                 ok

+ 0 - 3
apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl

@@ -16,9 +16,6 @@
 
 -module(emqx_bridge_stub_conn).
 
--behaviour(emqx_bridge_connect).
-
-%% behaviour callbacks
 -export([ start/1
         , send/2
         , stop/1

+ 252 - 223
apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl

@@ -59,13 +59,12 @@ init_per_suite(Config) ->
         nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]);
         _ -> ok
     end,
-    ok = application:set_env(gen_rpc, tcp_client_num, 1),
-    emqx_ct_helpers:start_apps([emqx_modules, emqx_bridge_mqtt]),
+    emqx_ct_helpers:start_apps([emqx_bridge_mqtt]),
     emqx_logger:set_log_level(error),
     [{log_level, error} | Config].
 
 end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([emqx_bridge_mqtt, emqx_modules]).
+    emqx_ct_helpers:stop_apps([emqx_bridge_mqtt]).
 
 init_per_testcase(_TestCase, Config) ->
     ok = snabbkaffe:start_trace(),
@@ -74,260 +73,290 @@ init_per_testcase(_TestCase, Config) ->
 end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop().
 
-t_mngr(Config) when is_list(Config) ->
-    Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
-    Cfg = #{address => node(),
-            forwards => [<<"mngr">>],
-            connect_module => emqx_bridge_rpc,
-            mountpoint => <<"forwarded">>,
-            subscriptions => Subs,
-            start_type => auto},
-    Name = ?FUNCTION_NAME,
-    {ok, Pid} = emqx_bridge_worker:start_link(Name, Cfg),
-    try
-        ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
-        ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")),
-        ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")),
-        ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge_worker:get_forwards(Pid)),
-        ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")),
-        ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")),
-        ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Pid)),
-        ?assertEqual({error, no_remote_subscription_support},
-                     emqx_bridge_worker:ensure_subscription_present(Pid, <<"t">>, 0)),
-        ?assertEqual({error, no_remote_subscription_support},
-                     emqx_bridge_worker:ensure_subscription_absent(Pid, <<"t">>)),
-        ?assertEqual(Subs, emqx_bridge_worker:get_subscriptions(Pid))
-    after
-        ok = emqx_bridge_worker:stop(Pid)
-    end.
+t_rpc_mngr(_Config) ->
+    Name = "rpc_name",
+    Cfg = #{
+        name => Name,
+        forwards => [<<"mngr">>],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        config => #{
+            conn_type => rpc,
+            node => node()
+        }
+    },
+    {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
+    ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")),
+    ?assertEqual([<<"mngr2">>, <<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")),
+    ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual({error, no_remote_subscription_support},
+                    emqx_bridge_worker:ensure_subscription_present(Name, <<"t">>, 0)),
+    ?assertEqual({error, no_remote_subscription_support},
+                    emqx_bridge_worker:ensure_subscription_absent(Name, <<"t">>)),
+    ok = emqx_bridge_worker:stop(Pid).
+
+t_mqtt_mngr(_Config) ->
+    Name = "mqtt_name",
+    Cfg = #{
+        name => Name,
+        forwards => [<<"mngr">>],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        config => #{
+            address => "127.0.0.1:1883",
+            conn_type => mqtt,
+            clientid => <<"client1">>,
+            keepalive => 300,
+            subscriptions => [#{topic => <<"t/#">>, qos => 1}]
+        }
+    },
+    {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
+    ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")),
+    ?assertEqual([<<"mngr2">>, <<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")),
+    ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_subscription_present(Name, <<"t">>, 0)),
+    ?assertEqual(ok, emqx_bridge_worker:ensure_subscription_absent(Name, <<"t">>)),
+    ?assertEqual([{<<"t/#">>,1}], emqx_bridge_worker:get_subscriptions(Name)),
+    ok = emqx_bridge_worker:stop(Pid).
 
 %% A loopback RPC to local node
-t_rpc(Config) when is_list(Config) ->
-    Cfg = #{address => node(),
-            forwards => [<<"t_rpc/#">>],
-            connect_module => emqx_bridge_rpc,
-            forward_mountpoint => <<"forwarded">>,
-            start_type => auto},
-    {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
-    ClientId = <<"ClientId">>,
-    try
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _Props} = emqtt:connect(ConnPid),
-        {ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}),
-        timer:sleep(100),
-        {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1),
-        timer:sleep(100),
-        ?assertEqual(1, length(receive_messages(1))),
-        emqtt:disconnect(ConnPid)
-    after
-        ok = emqx_bridge_worker:stop(Pid)
-    end.
+t_rpc(_Config) ->
+    Name = "rpc",
+    Cfg = #{
+        name => Name,
+        forwards => [<<"t_rpc/#">>],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        config => #{
+            conn_type => rpc,
+            node => node()
+        }
+    },
+    {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
+    {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]),
+    {ok, _Props} = emqtt:connect(ConnPid),
+    {ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}),
+    timer:sleep(100),
+    {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1),
+    timer:sleep(100),
+    ?assertEqual(1, length(receive_messages(1))),
+    emqtt:disconnect(ConnPid),
+    emqx_bridge_worker:stop(Pid).
 
 %% Full data loopback flow explained:
 %% mqtt-client ----> local-broker ---(local-subscription)--->
 %% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
 %% bridge(import) --> mqtt-client
-t_mqtt(Config) when is_list(Config) ->
+t_mqtt(_Config) ->
     SendToTopic = <<"t_mqtt/one">>,
     SendToTopic2 = <<"t_mqtt/two">>,
     SendToTopic3 = <<"t_mqtt/three">>,
     Mountpoint = <<"forwarded/${node}/">>,
-    Cfg = #{address => "127.0.0.1:1883",
-            forwards => [SendToTopic],
-            connect_module => emqx_bridge_mqtt,
-            forward_mountpoint => Mountpoint,
-            username => "user",
-            clean_start => true,
-            clientid => "bridge_aws",
-            keepalive => 60000,
-            password => "passwd",
-            proto_ver => mqttv4,
-            queue => #{replayq_dir => "data/t_mqtt/",
-                       replayq_seg_bytes => 10000,
-                       batch_bytes_limit => 1000,
-                       batch_count_limit => 10
-                      },
-            reconnect_delay_ms => 1000,
-            ssl => false,
-            %% Consume back to forwarded message for verification
-            %% NOTE: this is a indefenite loopback without mocking emqx_bridge_worker:import_batch/1
-            subscriptions => [{SendToTopic2, _QoS = 1}],
-            receive_mountpoint => <<"receive/aws/">>,
-            start_type => auto},
-    {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
-    ClientId = <<"client-1">>,
-    try
-        ?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Pid)),
-        ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, _QoS = 1),
-        ?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}],
-                     emqx_bridge_worker:get_subscriptions(Pid)),
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _Props} = emqtt:connect(ConnPid),
-        emqtt:subscribe(ConnPid, <<"forwarded/+/t_mqtt/one">>, 1),
-        %% message from a different client, to avoid getting terminated by no-local
-        Max = 10,
-        Msgs = lists:seq(1, Max),
-        lists:foreach(fun(I) ->
-                          {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic, integer_to_binary(I), ?QOS_1)
-                      end, Msgs),
-        ?assertEqual(10, length(receive_messages(200))),
+    Name = "mqtt",
+    Cfg = #{
+        name => Name,
+        forwards => [SendToTopic],
+        forward_mountpoint => Mountpoint,
+        start_type => auto,
+        config => #{
+            address => "127.0.0.1:1883",
+            conn_type => mqtt,
+            clientid => <<"client1">>,
+            keepalive => 300,
+            subscriptions => [#{topic => SendToTopic2, qos => 1}],
+            receive_mountpoint => <<"receive/aws/">>
+        },
+        queue => #{
+            replayq_dir => "data/t_mqtt/",
+            replayq_seg_bytes => 10000,
+            batch_bytes_limit => 1000,
+            batch_count_limit => 10
+        }
+    },
+    {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
+    ?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Name)),
+    ok = emqx_bridge_worker:ensure_subscription_present(Name, SendToTopic3, _QoS = 1),
+    ?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}],
+                    emqx_bridge_worker:get_subscriptions(Name)),
+    {ok, ConnPid} = emqtt:start_link([{clientid, <<"client-1">>}]),
+    {ok, _Props} = emqtt:connect(ConnPid),
+    emqtt:subscribe(ConnPid, <<"forwarded/+/t_mqtt/one">>, 1),
+    %% message from a different client, to avoid getting terminated by no-local
+    Max = 10,
+    Msgs = lists:seq(1, Max),
+    lists:foreach(fun(I) ->
+                        {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic, integer_to_binary(I), ?QOS_1)
+                    end, Msgs),
+    ?assertEqual(10, length(receive_messages(200))),
 
-        emqtt:subscribe(ConnPid, <<"receive/aws/t_mqtt/two">>, 1),
-        %% message from a different client, to avoid getting terminated by no-local
-        Max = 10,
-        Msgs = lists:seq(1, Max),
-        lists:foreach(fun(I) ->
-                          {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic2, integer_to_binary(I), ?QOS_1)
-                      end, Msgs),
-        ?assertEqual(10, length(receive_messages(200))),
+    emqtt:subscribe(ConnPid, <<"receive/aws/t_mqtt/two">>, 1),
+    %% message from a different client, to avoid getting terminated by no-local
+    Max = 10,
+    Msgs = lists:seq(1, Max),
+    lists:foreach(fun(I) ->
+                        {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic2, integer_to_binary(I), ?QOS_1)
+                    end, Msgs),
+    ?assertEqual(10, length(receive_messages(200))),
 
-        emqtt:disconnect(ConnPid)
-    after
-        ok = emqx_bridge_worker:stop(Pid)
-    end.
+    emqtt:disconnect(ConnPid),
+    ok = emqx_bridge_worker:stop(Pid).
 
 t_stub_normal(Config) when is_list(Config) ->
-    Cfg = #{forwards => [<<"t_stub_normal/#">>],
-            connect_module => emqx_bridge_stub_conn,
-            forward_mountpoint => <<"forwarded">>,
-            start_type => auto,
+    Name = "stub_normal",
+    Cfg = #{
+        name => Name,
+        forwards => [<<"t_stub_normal/#">>],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        config => #{
+            conn_type => emqx_bridge_stub_conn,
             client_pid => self()
-           },
-    {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
+        }
+    },
+    {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
     receive
         {Pid, emqx_bridge_stub_conn, ready} -> ok
     after
         5000 ->
             error(timeout)
     end,
-    ClientId = <<"ClientId">>,
-    try
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _} = emqtt:connect(ConnPid),
-        {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1),
-        receive
-            {stub_message, WorkerPid, BatchRef, _Batch} ->
-                WorkerPid ! {batch_ack, BatchRef},
-                ok
-        after
-            5000 ->
-                error(timeout)
-        end,
-        ?SNK_WAIT(inflight_drained),
-        ?SNK_WAIT(replayq_drained),
-        emqtt:disconnect(ConnPid)
+    {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1),
+    receive
+        {stub_message, WorkerPid, BatchRef, _Batch} ->
+            WorkerPid ! {batch_ack, BatchRef},
+            ok
     after
-        ok = emqx_bridge_worker:stop(Pid)
-    end.
+        5000 ->
+            error(timeout)
+    end,
+    ?SNK_WAIT(inflight_drained),
+    ?SNK_WAIT(replayq_drained),
+    emqtt:disconnect(ConnPid),
+    ok = emqx_bridge_worker:stop(Pid).
 
-t_stub_overflow(Config) when is_list(Config) ->
+t_stub_overflow(_Config) ->
     Topic = <<"t_stub_overflow/one">>,
     MaxInflight = 20,
-    Cfg = #{forwards => [Topic],
-            connect_module => emqx_bridge_stub_conn,
-            forward_mountpoint => <<"forwarded">>,
-            start_type => auto,
-            client_pid => self(),
-            max_inflight => MaxInflight
-           },
-    {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
-    ClientId = <<"ClientId">>,
-    try
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _} = emqtt:connect(ConnPid),
-        lists:foreach(
-          fun(I) ->
-                  Data = integer_to_binary(I),
-                  _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
-          end, lists:seq(1, MaxInflight * 2)),
-        ?SNK_WAIT(inflight_full),
-        Acks = stub_receive(MaxInflight),
-        lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks),
-        Acks2 = stub_receive(MaxInflight),
-        lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2),
-        ?SNK_WAIT(inflight_drained),
-        ?SNK_WAIT(replayq_drained),
-        emqtt:disconnect(ConnPid)
-    after
-        ok = emqx_bridge_worker:stop(Worker)
-    end.
+    Name = "stub_overflow",
+    Cfg = #{
+        name => Name,
+        forwards => [<<"t_stub_overflow/one">>],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        max_inflight => MaxInflight,
+        config => #{
+            conn_type => emqx_bridge_stub_conn,
+            client_pid => self()
+        }
+    },
+    {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
+    {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    lists:foreach(
+        fun(I) ->
+                Data = integer_to_binary(I),
+                _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
+        end, lists:seq(1, MaxInflight * 2)),
+    ?SNK_WAIT(inflight_full),
+    Acks = stub_receive(MaxInflight),
+    lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks),
+    Acks2 = stub_receive(MaxInflight),
+    lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2),
+    ?SNK_WAIT(inflight_drained),
+    ?SNK_WAIT(replayq_drained),
+    emqtt:disconnect(ConnPid),
+    ok = emqx_bridge_worker:stop(Worker).
 
-t_stub_random_order(Config) when is_list(Config) ->
+t_stub_random_order(_Config) ->
     Topic = <<"t_stub_random_order/a">>,
     MaxInflight = 10,
-    Cfg = #{forwards => [Topic],
-            connect_module => emqx_bridge_stub_conn,
-            forward_mountpoint => <<"forwarded">>,
-            start_type => auto,
-            client_pid => self(),
-            max_inflight => MaxInflight
-           },
-    {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
+    Name = "stub_random_order",
+    Cfg = #{
+        name => Name,
+        forwards => [Topic],
+        forward_mountpoint => <<"forwarded">>,
+        start_type => auto,
+        max_inflight => MaxInflight,
+        config => #{
+            conn_type => emqx_bridge_stub_conn,
+            client_pid => self()
+        }
+    },
+    {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
     ClientId = <<"ClientId">>,
-    try
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _} = emqtt:connect(ConnPid),
-        lists:foreach(
-          fun(I) ->
-                  Data = integer_to_binary(I),
-                  _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
-          end, lists:seq(1, MaxInflight)),
-        Acks = stub_receive(MaxInflight),
-        lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
-                      lists:reverse(Acks)),
-        ?SNK_WAIT(inflight_drained),
-        ?SNK_WAIT(replayq_drained),
-        emqtt:disconnect(ConnPid)
-    after
-        ok = emqx_bridge_worker:stop(Worker)
-    end.
+    {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    lists:foreach(
+        fun(I) ->
+                Data = integer_to_binary(I),
+                _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
+        end, lists:seq(1, MaxInflight)),
+    Acks = stub_receive(MaxInflight),
+    lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
+                    lists:reverse(Acks)),
+    ?SNK_WAIT(inflight_drained),
+    ?SNK_WAIT(replayq_drained),
+    emqtt:disconnect(ConnPid),
+    ok = emqx_bridge_worker:stop(Worker).
 
-t_stub_retry_inflight(Config) when is_list(Config) ->
+t_stub_retry_inflight(_Config) ->
     Topic = <<"to_stub_retry_inflight/a">>,
     MaxInflight = 10,
-    Cfg = #{forwards => [Topic],
-            connect_module => emqx_bridge_stub_conn,
-            forward_mountpoint => <<"forwarded">>,
-            reconnect_delay_ms => 10,
-            start_type => auto,
-            client_pid => self(),
-            max_inflight => MaxInflight
-           },
-    {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
+    Name = "stub_retry_inflight",
+    Cfg = #{
+        name => Name,
+        forwards => [Topic],
+        forward_mountpoint => <<"forwarded">>,
+        reconnect_interval => 10,
+        start_type => auto,
+        max_inflight => MaxInflight,
+        config => #{
+            conn_type => emqx_bridge_stub_conn,
+            client_pid => self()
+        }
+    },
+    {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg),
     ClientId = <<"ClientId2">>,
-    try
-        case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of
-            {ok, #{inflight := 0}} -> ok;
-            Other -> ct:fail("~p", [Other])
-        end,
-        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
-        {ok, _} = emqtt:connect(ConnPid),
-        lists:foreach(
-          fun(I) ->
-                  Data = integer_to_binary(I),
-                  _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
-          end, lists:seq(1, MaxInflight)),
-        %% receive acks but do not ack
-        Acks1 = stub_receive(MaxInflight),
-        ?assertEqual(MaxInflight, length(Acks1)),
-        %% simulate a disconnect
-        Worker ! {disconnected, self(), test},
-        ?SNK_WAIT(disconnected),
-        case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of
-            {ok, _} -> ok;
-            Error -> ct:fail("~p", [Error])
-        end,
-        %% expect worker to retry inflight, so to receive acks again
-        Acks2 = stub_receive(MaxInflight),
-        ?assertEqual(MaxInflight, length(Acks2)),
-        lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
-                      lists:reverse(Acks2)),
-        ?SNK_WAIT(inflight_drained),
-        ?SNK_WAIT(replayq_drained),
-        emqtt:disconnect(ConnPid)
-    after
-        ok = emqx_bridge_worker:stop(Worker)
-    end.
+    case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of
+        {ok, #{inflight := 0}} -> ok;
+        Other -> ct:fail("~p", [Other])
+    end,
+    {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    lists:foreach(
+        fun(I) ->
+                Data = integer_to_binary(I),
+                _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
+        end, lists:seq(1, MaxInflight)),
+    %% receive acks but do not ack
+    Acks1 = stub_receive(MaxInflight),
+    ?assertEqual(MaxInflight, length(Acks1)),
+    %% simulate a disconnect
+    Worker ! {disconnected, self(), test},
+    ?SNK_WAIT(disconnected),
+    case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of
+        {ok, _} -> ok;
+        Error -> ct:fail("~p", [Error])
+    end,
+    %% expect worker to retry inflight, so to receive acks again
+    Acks2 = stub_receive(MaxInflight),
+    ?assertEqual(MaxInflight, length(Acks2)),
+    lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
+                    lists:reverse(Acks2)),
+    ?SNK_WAIT(inflight_drained),
+    ?SNK_WAIT(replayq_drained),
+    emqtt:disconnect(ConnPid),
+    ok = emqx_bridge_worker:stop(Worker).
 
 stub_receive(N) ->
     stub_receive(N, []).

+ 20 - 19
apps/emqx_bridge_mqtt/test/emqx_bridge_worker_tests.erl

@@ -15,7 +15,6 @@
 %%--------------------------------------------------------------------
 
 -module(emqx_bridge_worker_tests).
--behaviour(emqx_bridge_connect).
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx.hrl").
@@ -69,14 +68,14 @@ disturbance_test() ->
     emqx_bridge_worker:register_metrics(),
     Ref = make_ref(),
     TestPid = self(),
-    Config = make_config(Ref, TestPid, {ok, #{client_pid =>  TestPid}}),
-    {ok, Pid} = emqx_bridge_worker:start_link(?BRIDGE_NAME, Config),
-    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
+    Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
+    {ok, Pid} = emqx_bridge_worker:start_link(Config#{name => disturbance}),
+    ?assertEqual(Pid, whereis(emqx_bridge_worker_disturbance)),
     ?WAIT({connection_start_attempt, Ref}, 1000),
     Pid ! {disconnected, TestPid, test},
     ?WAIT({connection_start_attempt, Ref}, 1000),
     emqx_metrics:stop(),
-    ok = emqx_bridge_worker:stop(?BRIDGE_REG_NAME).
+    ok = emqx_bridge_worker:stop(Pid).
 
 % % %% buffer should continue taking in messages when disconnected
 % buffer_when_disconnected_test_() ->
@@ -113,22 +112,24 @@ manual_start_stop_test() ->
     emqx_bridge_worker:register_metrics(),
     Ref = make_ref(),
     TestPid = self(),
+    BridgeName = manual_start_stop,
     Config0 = make_config(Ref, TestPid, {ok, #{client_pid =>  TestPid}}),
     Config = Config0#{start_type := manual},
-    {ok, Pid} = emqx_bridge_worker:start_link(?BRIDGE_NAME, Config),
+    {ok, Pid} = emqx_bridge_worker:start_link(Config#{name => BridgeName}),
     %% call ensure_started again should yeld the same result
-    ok = emqx_bridge_worker:ensure_started(?BRIDGE_NAME),
-    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
-    emqx_bridge_worker:ensure_stopped(unknown),
-    emqx_bridge_worker:ensure_stopped(Pid),
-    emqx_bridge_worker:ensure_stopped(?BRIDGE_REG_NAME),
-    emqx_metrics:stop().
+    ok = emqx_bridge_worker:ensure_started(BridgeName),
+    emqx_bridge_worker:ensure_stopped(BridgeName),
+    emqx_metrics:stop(),
+    ok = emqx_bridge_worker:stop(Pid).
 
 make_config(Ref, TestPid, Result) ->
-    #{test_pid => TestPid,
-      test_ref => Ref,
-      connect_module => ?MODULE,
-      reconnect_delay_ms => 50,
-      connect_result => Result,
-      start_type => auto
-     }.
+    #{
+        start_type => auto,
+        reconnect_interval => 50,
+        config => #{
+            test_pid => TestPid,
+            test_ref => Ref,
+            conn_type => ?MODULE,
+            connect_result => Result
+        }
+    }.

+ 1 - 1
apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl

@@ -526,7 +526,7 @@ connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name :
             end
     end,
     Options2 = maps:without([ecpool_worker_id, pool_name, append], Options1),
-    emqx_bridge_worker:start_link(name(Pool, Id), Options2).
+    emqx_bridge_worker:start_link(Options2#{name => name(Pool, Id)}).
 name(Pool, Id) ->
     list_to_atom(atom_to_list(Pool) ++ ":" ++ integer_to_list(Id)).
 pool_name(ResId) ->

+ 0 - 1
data/loaded_plugins.tmpl

@@ -2,4 +2,3 @@
 {emqx_dashboard, true}.
 {emqx_modules, {{enable_plugin_emqx_modules}}}.
 {emqx_retainer, {{enable_plugin_emqx_retainer}}}.
-{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.

+ 4 - 3
rebar.config.erl

@@ -192,8 +192,7 @@ overlay_vars_rel(RelType) ->
                  cloud -> "vm.args";
                  edge -> "vm.args.edge"
              end,
-    [ {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
-    , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
+    [ {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
     , {enable_plugin_emqx_retainer, true}
     , {vm_args_file, VmArgs}
     ].
@@ -256,6 +255,7 @@ relx_apps(ReleaseType) ->
     , emqx_connector
     , emqx_data_bridge
     , emqx_rule_engine
+    , emqx_bridge_mqtt
     ]
     ++ [emqx_telemetry || not is_enterprise()]
     ++ [emqx_modules || not is_enterprise()]
@@ -282,7 +282,6 @@ relx_plugin_apps(ReleaseType) ->
     [ emqx_retainer
     , emqx_management
     , emqx_dashboard
-    , emqx_bridge_mqtt
     , emqx_sn
     , emqx_coap
     , emqx_stomp
@@ -375,6 +374,8 @@ emqx_etc_overlay_common() ->
      {"{{base_dir}}/lib/emqx_telemetry/etc/emqx_telemetry.conf", "etc/plugins/emqx_telemetry.conf"},
      {"{{base_dir}}/lib/emqx_authn/etc/emqx_authn.conf", "etc/plugins/emqx_authn.conf"},
      {"{{base_dir}}/lib/emqx_authz/etc/emqx_authz.conf", "etc/plugins/authz.conf"},
+     {"{{base_dir}}/lib/emqx_rule_engine/etc/emqx_rule_engine.conf", "etc/plugins/emqx_rule_engine.conf"},
+     {"{{base_dir}}/lib/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf", "etc/plugins/emqx_bridge_mqtt.conf"},
      %% TODO: check why it has to end with .paho
      %% and why it is put to etc/plugins dir
      {"{{base_dir}}/lib/emqx/etc/acl.conf.paho", "etc/plugins/acl.conf.paho"}].