Просмотр исходного кода

feat(bridges): start one mqtt bridge worker for each in/out channel

Shawn 4 лет назад
Родитель
Сommit
eb8822ce41

+ 16 - 11
apps/emqx_bridge/etc/emqx_bridge.conf

@@ -5,7 +5,8 @@
 bridges.mqtt.my_mqtt_bridge {
     server = "127.0.0.1:1883"
     proto_ver = "v4"
-    clientid = "client1"
+    ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`.
+    clientid_prefix = "emqx_bridge_"
     username = "username1"
     password = ""
     clean_start = true
@@ -26,20 +27,24 @@ bridges.mqtt.my_mqtt_bridge {
         certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
         cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
     }
+    ## we will create one MQTT connection for each element of the `in`
     in: [{
-        subscribe_remote_topic = "msg/#"
+        id = "pull_msgs_from_aws"
+        subscribe_remote_topic = "aws/#"
         subscribe_qos = 1
-        publish_local_topic = "from_aws/${topic}"
-        publish_payload = "${payload}"
-        publish_qos = "${qos}"
-        publish_retain = "${retain}"
+        local_topic = "from_aws/${topic}"
+        payload = "${payload}"
+        qos = "${qos}"
+        retain = "${retain}"
     }]
+    ## we will create one MQTT connection for each element of the `out`
     out: [{
-        subscribe_local_topic = "msg/#"
-        publish_remote_topic = "from_emqx/${topic}"
-        publish_payload = "${payload}"
-        publish_qos = 1
-        publish_retain = false
+        id = "push_msgs_to_aws"
+        subscribe_local_topic = "emqx/#"
+        remote_topic = "from_emqx/${topic}"
+        payload = "${payload}"
+        qos = 1
+        retain = false
     }]
 }
 

+ 27 - 16
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl

@@ -50,8 +50,9 @@ start(Config) ->
     Parent = self(),
     {Host, Port} = maps:get(server, Config),
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
-    Subscriptions = maps:get(subscriptions, Config, []),
-    Handlers = make_hdlr(Parent, Mountpoint),
+    Subscriptions = maps:get(subscriptions, Config),
+    Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Subscriptions),
+    Handlers = make_hdlr(Parent, Vars),
     Config1 = Config#{
         msg_handler => Handlers,
         host => Host,
@@ -59,7 +60,7 @@ start(Config) ->
         force_ping => true,
         proto_ver => maps:get(proto_ver, Config, v4)
     },
-    case emqtt:start_link(without_config(Config1)) of
+    case emqtt:start_link(process_config(Config1)) of
         {ok, Pid} ->
             case emqtt:connect(Pid) of
                 {ok, _} ->
@@ -156,25 +157,35 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
 handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
     ?LOG(warning, "Publish ~p to remote node falied, reason_code: ~p", [PktId, RC]).
 
-handle_publish(Msg, Mountpoint) ->
-    emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Mountpoint)).
+handle_publish(Msg, undefined) ->
+    ?LOG(error, "Cannot publish to local broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Msg]);
+handle_publish(Msg, Vars) ->
+    ?LOG(debug, "Publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]),
+    emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Vars)).
 
 handle_disconnected(Reason, Parent) ->
     Parent ! {disconnected, self(), Reason}.
 
-make_hdlr(Parent, Mountpoint) ->
+make_hdlr(Parent, Vars) ->
     #{puback => {fun ?MODULE:handle_puback/2, [Parent]},
-      publish => {fun ?MODULE:handle_publish/2, [Mountpoint]},
+      publish => {fun ?MODULE:handle_publish/2, [Vars]},
       disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
      }.
 
-subscribe_remote_topics(ClientPid, Subscriptions) ->
-    lists:foreach(fun(#{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) ->
-        case emqtt:subscribe(ClientPid, FromTopic, QoS) of
-            {ok, _, _} -> ok;
-            Error -> throw(Error)
-        end
-    end, Subscriptions).
+subscribe_remote_topics(_ClientPid, undefined) -> ok;
+subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) ->
+    case emqtt:subscribe(ClientPid, FromTopic, QoS) of
+        {ok, _, _} -> ok;
+        Error -> throw(Error)
+    end.
+
+process_config(#{name := Name, clientid_prefix := Prefix} = Config) ->
+    Conf0 = maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config),
+    Conf0#{clientid => iolist_to_binary([str(Prefix), str(Name)])}.
 
-without_config(Config) ->
-    maps:without([conn_type, address, receive_mountpoint, subscriptions], Config).
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 0 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl

@@ -65,6 +65,5 @@ drop_bridge(Name) ->
         ok ->
             supervisor:delete_child(?MODULE, Name);
         {error, Error} ->
-            ?LOG(error, "Delete bridge failed, error : ~p", [Error]),
             {error, Error}
     end.

+ 35 - 31
apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl

@@ -18,9 +18,8 @@
 
 -export([ to_binary/1
         , from_binary/1
-        , to_export/3
-        , to_broker_msgs/1
-        , to_broker_msg/1
+        , make_pub_vars/2
+        , to_remote_msg/3
         , to_broker_msg/2
         , estimate_size/1
         ]).
@@ -44,6 +43,12 @@
     payload := binary()
 }.
 
+make_pub_vars(_, undefined) -> undefined;
+make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, remote_topic := Topic} = Conf) ->
+    Conf#{topic => Topic, mountpoint => Mountpoint};
+make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, local_topic := Topic} = Conf) ->
+    Conf#{topic => Topic, mountpoint => Mountpoint}.
+
 %% @doc Make export format:
 %% 1. Mount topic to a prefix
 %% 2. Fix QoS to 1
@@ -51,37 +56,52 @@
 %% Shame that we have to know the callback module here
 %% would be great if we can get rid of #mqtt_msg{} record
 %% and use #message{} in all places.
--spec to_export(emqx_bridge_rpc | emqx_bridge_worker, variables(), msg())
+-spec to_remote_msg(emqx_bridge_rpc | emqx_bridge_worker, msg(), variables())
         -> exp_msg().
-to_export(emqx_bridge_mqtt, Vars, #message{flags = Flags0} = Msg) ->
+to_remote_msg(emqx_bridge_mqtt, #message{flags = Flags0} = Msg, Vars) ->
     Retain0 = maps:get(retain, Flags0, false),
     MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
-    to_export(emqx_bridge_mqtt, Vars, MapMsg);
-to_export(emqx_bridge_mqtt, #{topic := TopicToken, payload := PayloadToken,
-        qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint},
-        MapMsg) when is_map(MapMsg) ->
+    to_remote_msg(emqx_bridge_mqtt, MapMsg, Vars);
+to_remote_msg(emqx_bridge_mqtt, MapMsg, #{topic := TopicToken, payload := PayloadToken,
+        qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
     Payload = replace_vars_in_str(PayloadToken, MapMsg),
-    QoS = replace_vars(QoSToken, MapMsg),
-    Retain = replace_vars(RetainToken, MapMsg),
+    QoS = replace_simple_var(QoSToken, MapMsg),
+    Retain = replace_simple_var(RetainToken, MapMsg),
     #mqtt_msg{qos = QoS,
               retain = Retain,
               topic = topic(Mountpoint, Topic),
               props = #{},
               payload = Payload};
-to_export(_Module, #{mountpoint := Mountpoint},
-          #message{topic = Topic} = Msg) ->
+to_remote_msg(_Module, #message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
     Msg#message{topic = topic(Mountpoint, Topic)}.
 
+%% published from remote node over a MQTT connection
+to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
+            #{topic := TopicToken, payload := PayloadToken,
+              qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
+    Topic = replace_vars_in_str(TopicToken, MapMsg),
+    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    QoS = replace_simple_var(QoSToken, MapMsg),
+    Retain = replace_simple_var(RetainToken, MapMsg),
+    set_headers(Props,
+        emqx_message:set_flags(#{dup => Dup, retain => Retain},
+            emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
+
+%% Replace a string contains vars to another string in which the placeholders are replace by the
+%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
+%% "a: 1".
 replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
     emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
 replace_vars_in_str(Val, _Data) ->
     Val.
 
-replace_vars(Tokens, Data) when is_list(Tokens) ->
+%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
+%% value will be an integer 1.
+replace_simple_var(Tokens, Data) when is_list(Tokens) ->
     [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
     Var;
-replace_vars(Val, _Data) ->
+replace_simple_var(Val, _Data) ->
     Val.
 
 %% @doc Make `binary()' in order to make iodata to be persisted on disk.
@@ -98,22 +118,6 @@ from_binary(Bin) -> binary_to_term(Bin).
 estimate_size(#message{topic = Topic, payload = Payload}) ->
     size(Topic) + size(Payload).
 
-%% @doc By message/batch receiver, transform received batch into
-%% messages to deliver to local brokers.
-to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch).
-
-to_broker_msg(#message{} = Msg) ->
-    %% internal format from another EMQX node via rpc
-    Msg;
-to_broker_msg(Msg) ->
-    to_broker_msg(Msg, undefined).
-to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic,
-                properties := Props, payload := Payload}, Mountpoint) ->
-    %% published from remote node over a MQTT connection
-    set_headers(Props,
-        emqx_message:set_flags(#{dup => Dup, retain => Retain},
-            emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
-
 set_headers(undefined, Msg) ->
     Msg;
 set_headers(Val, Msg) ->

+ 41 - 145
apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl

@@ -90,13 +90,9 @@
         ]).
 
 -export([ get_forwards/1
-        , ensure_forward_present/2
-        , ensure_forward_absent/2
         ]).
 
 -export([ get_subscriptions/1
-        , ensure_subscription_present/3
-        , ensure_subscription_absent/2
         ]).
 
 %% Internal
@@ -183,45 +179,23 @@ get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(10
 -spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
 get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions).
 
-%% @doc Add a new forward (local topic subscription).
--spec ensure_forward_present(id(), topic()) -> ok.
-ensure_forward_present(Name, Topic) ->
-    gen_statem:call(name(Name), {ensure_forward_present, topic(Topic)}).
-
-%% @doc Ensure a forward topic is deleted.
--spec ensure_forward_absent(id(), topic()) -> ok.
-ensure_forward_absent(Name, Topic) ->
-    gen_statem:call(name(Name), {ensure_forward_absent, topic(Topic)}).
-
-%% @doc Ensure subscribed to remote topic.
-%% NOTE: only applicable when connection module is emqx_bridge_mqtt
-%%       return `{error, no_remote_subscription_support}' otherwise.
--spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
-ensure_subscription_present(Name, Topic, QoS) ->
-    gen_statem:call(name(Name), {ensure_subscription_present, topic(Topic), QoS}).
-
-%% @doc Ensure unsubscribed from remote topic.
-%% NOTE: only applicable when connection module is emqx_bridge_mqtt
--spec ensure_subscription_absent(id(), topic()) -> ok.
-ensure_subscription_absent(Name, Topic) ->
-    gen_statem:call(name(Name), {ensure_subscription_absent, topic(Topic)}).
-
 callback_mode() -> [state_functions].
 
 %% @doc Config should be a map().
-init(Opts) ->
+init(#{name := Name} = ConnectOpts) ->
+    ?LOG(info, "starting bridge worker for ~p", [Name]),
     erlang:process_flag(trap_exit, true),
-    ConnectOpts = maps:get(config, Opts),
     ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
-    Forwards = maps:get(forwards, Opts, []),
-    Queue = open_replayq(maps:get(replayq, Opts, #{})),
-    State = init_state(Opts),
+    Forwards = maps:get(forwards, ConnectOpts, #{}),
+    Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
+    State = init_state(ConnectOpts),
     self() ! idle,
-    {ok, idle, State#{connect_module => ConnectModule,
-                      connect_opts => pre_process_opts(ConnectOpts),
-                      forwards => Forwards,
-                      replayq => Queue
-                     }}.
+    {ok, idle, State#{
+        connect_module => ConnectModule,
+        connect_opts => pre_process_opts(ConnectOpts),
+        forwards => Forwards,
+        replayq => Queue
+    }}.
 
 init_state(Opts) ->
     IfRecordMetrics = maps:get(if_record_metrics, Opts, true),
@@ -241,27 +215,29 @@ init_state(Opts) ->
       if_record_metrics => IfRecordMetrics,
       name => Name}.
 
-open_replayq(QCfg) ->
+open_replayq(Name, QCfg) ->
     Dir = maps:get(dir, QCfg, undefined),
     SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
     MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE),
     QueueConfig = case Dir =:= undefined orelse Dir =:= "" of
         true -> #{mem_only => true};
-        false -> #{dir => Dir, seg_bytes => SegBytes, max_total_size => MaxTotalSize}
+        false -> #{dir => filename:join([Dir, node(), Name]),
+                   seg_bytes => SegBytes, max_total_size => MaxTotalSize}
     end,
     replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
                               marshaller => fun ?MODULE:msg_marshaller/1}).
 
 pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
-    ConnectOpts#{subscriptions => [pre_process_in_out(In) || In <- InConf],
-                 forwards => [pre_process_in_out(Out) || Out <- OutConf]}.
+    ConnectOpts#{subscriptions => pre_process_in_out(InConf),
+                 forwards => pre_process_in_out(OutConf)}.
 
-pre_process_in_out(Conf) ->
-    Conf1 = pre_process_conf(publish_local_topic, Conf),
-    Conf2 = pre_process_conf(publish_remote_topic, Conf1),
-    Conf3 = pre_process_conf(publish_payload, Conf2),
-    Conf4 = pre_process_conf(publish_qos, Conf3),
-    pre_process_conf(publish_retain, Conf4).
+pre_process_in_out(undefined) -> undefined;
+pre_process_in_out(Conf) when is_map(Conf) ->
+    Conf1 = pre_process_conf(local_topic, Conf),
+    Conf2 = pre_process_conf(remote_topic, Conf1),
+    Conf3 = pre_process_conf(payload, Conf2),
+    Conf4 = pre_process_conf(qos, Conf3),
+    pre_process_conf(retain, Conf4).
 
 pre_process_conf(Key, Conf) ->
     case maps:find(Key, Conf) of
@@ -350,19 +326,7 @@ common(_StateName, {call, From}, ensure_stopped, #{connection := Conn,
 common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
     {keep_state_and_data, [{reply, From, Forwards}]};
 common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
-    {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, [])}]};
-common(_StateName, {call, From}, {ensure_forward_present, Topic}, State) ->
-    {Result, NewState} = do_ensure_forward_present(Topic, State),
-    {keep_state, NewState, [{reply, From, Result}]};
-common(_StateName, {call, From}, {ensure_subscription_present, Topic, QoS}, State) ->
-    {Result, NewState} = do_ensure_subscription_present(Topic, QoS, State),
-    {keep_state, NewState, [{reply, From, Result}]};
-common(_StateName, {call, From}, {ensure_forward_absent, Topic}, State) ->
-    {Result, NewState} = do_ensure_forward_absent(Topic, State),
-    {keep_state, NewState, [{reply, From, Result}]};
-common(_StateName, {call, From}, {ensure_subscription_absent, Topic}, State) ->
-    {Result, NewState} = do_ensure_subscription_absent(Topic, State),
-    {keep_state, NewState, [{reply, From, Result}]};
+    {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
 common(_StateName, info, {deliver, _, Msg},
        State = #{replayq := Q, if_record_metrics := IfRecordMetric}) ->
     Msgs = collect([Msg]),
@@ -379,75 +343,15 @@ common(StateName, Type, Content, #{name := Name} = State) ->
           [Name, Type, StateName, Content]),
     {keep_state, State}.
 
-do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) ->
-    case is_local_sub_present(Topic, Forwards) of
-        true ->
-            {ok, State};
-        false ->
-            R = subscribe_local_topic(Topic, Name),
-            {R, State#{forwards => [Topic | Forwards]}}
-    end.
-
-do_ensure_subscription_present(_Topic, _QoS, #{connection := undefined} = State) ->
-    {{error, no_connection}, State};
-do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc} = State) ->
-    {{error, no_remote_subscription_support}, State};
-do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule,
-                                          connection := Conn} = State) ->
-    case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of
-        true ->
-            {ok, State};
-        false ->
-            case ConnectModule:ensure_subscribed(Conn, Topic, QoS) of
-                {error, Error} ->
-                    {{error, Error}, State};
-                Conn1 ->
-                    {ok, State#{connection => Conn1}}
-            end
-    end.
-
-do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) ->
-    case is_local_sub_present(Topic, Forwards) of
-        true ->
-            R = do_unsubscribe(Topic),
-            {R, State#{forwards => lists:delete(Topic, Forwards)}};
-        false ->
-            {ok, State}
-    end.
-do_ensure_subscription_absent(_Topic, #{connection := undefined} = State) ->
-    {{error, no_connection}, State};
-do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = State) ->
-    {{error, no_remote_subscription_support}, State};
-do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule,
-                                       connection := Conn} = State) ->
-    case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of
-        true ->
-            case ConnectModule:ensure_unsubscribed(Conn, Topic) of
-                {error, Error} ->
-                    {{error, Error}, State};
-                Conn1 ->
-                    {ok, State#{connection => Conn1}}
-            end;
-        false ->
-            {ok, State}
-    end.
-
-is_local_sub_present(Topic, Configs) ->
-    is_topic_present(subscribe_local_topic, Topic, Configs).
-is_remote_sub_present(Topic, Configs) ->
-    is_topic_present(subscribe_remote_topic, Topic, Configs).
-
-is_topic_present(Type, Topic, Configs) ->
-    lists:any(fun(Conf) ->
-            Topic == maps:get(Type, Conf, undefined)
-        end, Configs).
-
 do_connect(#{forwards := Forwards,
              connect_module := ConnectModule,
              connect_opts := ConnectOpts,
              inflight := Inflight,
              name := Name} = State) ->
-    ok = subscribe_local_topics(Forwards, Name),
+    case Forwards of
+        undefined -> ok;
+        #{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name)
+    end,
     case ConnectModule:start(ConnectOpts) of
         {ok, Conn} ->
             ?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
@@ -503,16 +407,18 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
     end.
 
 %% Assert non-empty batch because we have a is_empty check earlier.
+do_send(#{forwards := undefined}, _QAckRef, Batch) ->
+    ?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]);
 do_send(#{inflight := Inflight,
           connect_module := Module,
           connection := Connection,
           mountpoint := Mountpoint,
           forwards := Forwards,
           if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
-    Vars = make_export_variables(Mountpoint, Forwards),
+    Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Forwards),
     ExportMsg = fun(Message) ->
                     bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
-                    emqx_bridge_msg:to_export(Module, Vars, Message)
+                    emqx_bridge_msg:to_remote_msg(Module, Message, Vars)
                 end,
     case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
         {ok, Refs} ->
@@ -524,15 +430,6 @@ do_send(#{inflight := Inflight,
             {error, State}
     end.
 
-make_export_variables(Mountpoint, #{
-        publish_remote_topic := PubTopic,
-        publish_payload := PayloadTmpl,
-        publish_qos := PubQoS,
-        publish_retain := PubRetain}) ->
-    #{topic => PubTopic, payload => PayloadTmpl,
-      qos => PubQoS, retain => PubRetain,
-      mountpoint => Mountpoint}.
-
 %% map as set, ack-reference -> 1
 map_set(Ref) when is_reference(Ref) ->
     %% QoS-0 or RPC call returns a reference
@@ -578,9 +475,6 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs,
             All
     end.
 
-subscribe_local_topics(Topics, Name) ->
-    lists:foreach(fun(Topic) -> subscribe_local_topic(Topic, Name) end, Topics).
-
 subscribe_local_topic(Topic, Name) ->
     do_subscribe(Topic, Name).
 
@@ -597,14 +491,9 @@ validate(RawTopic) ->
 
 do_subscribe(RawTopic, Name) ->
     TopicFilter = validate(RawTopic),
-    {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_1}),
+    {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}),
     emqx_broker:subscribe(Topic, Name, SubOpts).
 
-do_unsubscribe(RawTopic) ->
-    TopicFilter = validate(RawTopic),
-    {Topic, _SubOpts} = emqx_topic:parse(TopicFilter),
-    emqx_broker:unsubscribe(Topic).
-
 disconnect(#{connection := Conn,
              connect_module := Module
             } = State) when Conn =/= undefined ->
@@ -622,7 +511,7 @@ format_mountpoint(undefined) ->
 format_mountpoint(Prefix) ->
     binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
-name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
+name(Id) -> list_to_atom(str(Id)).
 
 register_metrics() ->
     lists:foreach(fun emqx_metrics:ensure/1,
@@ -657,3 +546,10 @@ conn_type(mqtt) ->
     emqx_bridge_mqtt;
 conn_type(Mod) when is_atom(Mod) ->
     Mod.
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 117 - 61
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -40,7 +40,7 @@ fields("config") ->
     , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
     , {proto_ver, fun proto_ver/1}
     , {bridge_mode, emqx_schema:t(boolean(), undefined, true)}
-    , {clientid, emqx_schema:t(string())}
+    , {clientid_prefix, emqx_schema:t(string())}
     , {username, emqx_schema:t(string())}
     , {password, emqx_schema:t(string())}
     , {clean_start, emqx_schema:t(boolean(), undefined, true)}
@@ -54,14 +54,14 @@ fields("config") ->
 
 fields("in") ->
     [ {subscribe_remote_topic, #{type => binary(), nullable => false}}
-    , {publish_local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
+    , {local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
     , {subscribe_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, 1)}
-    ] ++ publish_confs();
+    ] ++ common_inout_confs();
 
 fields("out") ->
     [ {subscribe_local_topic, #{type => binary(), nullable => false}}
-    , {publish_remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
-    ] ++ publish_confs();
+    , {remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
+    ] ++ common_inout_confs();
 
 fields("replayq") ->
     [ {dir, hoconsc:union([boolean(), string()])}
@@ -70,10 +70,13 @@ fields("replayq") ->
     , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
     ].
 
+common_inout_confs() ->
+    [{id, #{type => binary(), nullable => false}}] ++ publish_confs().
+
 publish_confs() ->
-    [ {publish_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)}
-    , {publish_retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)}
-    , {publish_payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)}
+    [ {qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)}
+    , {retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)}
+    , {payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)}
     ].
 
 proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
@@ -81,72 +84,125 @@ proto_ver(default) -> v4;
 proto_ver(_) -> undefined.
 
 %% ===================================================================
-on_start(InstId, #{server := Server,
-                   reconnect_interval := ReconnIntv,
-                   proto_ver := ProtoVer,
-                   bridge_mode := BridgeMod,
-                   clientid := ClientID,
-                   username := User,
-                   password := Password,
-                   clean_start := CleanStart,
-                   keepalive := KeepAlive,
-                   retry_interval := RetryIntv,
-                   max_inflight := MaxInflight,
-                   replayq := ReplayQ,
-                   in := In,
-                   out := Out,
-                   ssl := #{enable := EnableSsl} = Ssl} = Conf) ->
+on_start(InstId, Conf) ->
     logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
-    BridgeName = binary_to_atom(InstId, latin1),
-    BridgeConf = Conf#{
-        name => BridgeName,
-        config => #{
-            conn_type => mqtt,
-            subscriptions => In,
-            forwards => Out,
-            replayq => ReplayQ,
-            %% connection opts
-            server => Server,
-            reconnect_interval => ReconnIntv,
-            proto_ver => ProtoVer,
-            bridge_mode => BridgeMod,
-            clientid => ClientID,
-            username => User,
-            password => Password,
-            clean_start => CleanStart,
-            keepalive => KeepAlive,
-            retry_interval => RetryIntv,
-            max_inflight => MaxInflight,
-            ssl => EnableSsl,
-            ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
-            if_record_metrics => true
-        }
-    },
-    case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of
-        {ok, _Pid} ->
-            start_bridge(BridgeName);
-        {error, {already_started, _Pid}} ->
-            start_bridge(BridgeName);
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    NamePrefix = binary_to_list(InstId),
+    BasicConf = basic_config(Conf),
+    InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
+    InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])),
+    lists:foldl(fun
+            (_InOutConf, {error, Reason}) ->
+                {error, Reason};
+            (InOutConf, {ok, #{sub_bridges := SubBridges} = Res}) ->
+                case create_channel(InOutConf, NamePrefix, BasicConf) of
+                    {error, Reason} -> {error, Reason};
+                    {ok, Name} -> {ok, Res#{sub_bridges => [Name | SubBridges]}}
+                end
+        end, InitRes, InOutConfigs).
 
 on_stop(InstId, #{}) ->
     logger:info("stopping mqtt connector: ~p", [InstId]),
-    emqx_bridge_mqtt_sup:drop_bridge(InstId).
+    case emqx_bridge_mqtt_sup:drop_bridge(InstId) of
+        ok -> ok;
+        {error, not_found} -> ok;
+        {error, Reason} ->
+            logger:error("stop bridge failed, error: ~p", [Reason])
+    end.
 
 %% TODO: let the emqx_resource trigger on_query/4 automatically according to the
 %%  `in` and `out` config
+on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
+        baisc_conf := BasicConf}) ->
+    logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]),
+    create_channel(Conf, Prefix, BasicConf);
 on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
     logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]);
 on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
     logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
 
-on_health_check(_InstId, #{bridge_name := Name}) ->
-    {ok, emqx_bridge_worker:ping(Name)}.
+on_health_check(_InstId, #{sub_bridges := NameList} = State) ->
+    Results = [{Name, emqx_bridge_worker:ping(Name)} || Name <- NameList],
+    case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of
+        true -> {ok, State};
+        false -> {error, {some_sub_bridge_down, Results}, State}
+    end.
+
+check_channel_id_dup(Confs) ->
+    lists:foreach(fun(#{id := Id}) ->
+            case length([Id || #{id := Id0} <- Confs, Id0 == Id]) of
+                1 -> ok;
+                L when L > 1 -> error({mqtt_bridge_conf, {duplicate_id_found, Id}})
+            end
+        end, Confs),
+    Confs.
+
+%% this is an `in` bridge
+create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix, BasicConf) ->
+    logger:info("creating 'in' channel for: ~p", [BridgeId]),
+    create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
+        subscriptions => InConf, forwards => undefined});
+%% this is an `out` bridge
+create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix, BasicConf) ->
+    logger:info("creating 'out' channel for: ~p", [BridgeId]),
+    create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
+        subscriptions => undefined, forwards => OutConf}).
+
+create_sub_bridge(#{name := Name} = Conf) ->
+    case emqx_bridge_mqtt_sup:create_bridge(Conf) of
+        {ok, _Pid} ->
+            start_sub_bridge(Name);
+        {error, {already_started, _Pid}} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
-start_bridge(Name) ->
+start_sub_bridge(Name) ->
     case emqx_bridge_worker:ensure_started(Name) of
-        ok -> {ok, #{bridge_name => Name}};
+        ok -> {ok, Name};
         {error, Reason} -> {error, Reason}
     end.
+
+basic_config(#{
+        server := Server,
+        reconnect_interval := ReconnIntv,
+        proto_ver := ProtoVer,
+        bridge_mode := BridgeMod,
+        clientid_prefix := ClientIdPrefix,
+        username := User,
+        password := Password,
+        clean_start := CleanStart,
+        keepalive := KeepAlive,
+        retry_interval := RetryIntv,
+        max_inflight := MaxInflight,
+        replayq := ReplayQ,
+        ssl := #{enable := EnableSsl} = Ssl}) ->
+    #{
+        conn_type => mqtt,
+        replayq => ReplayQ,
+        %% connection opts
+        server => Server,
+        reconnect_interval => ReconnIntv,
+        proto_ver => ProtoVer,
+        bridge_mode => BridgeMod,
+        clientid_prefix => ClientIdPrefix,
+        username => User,
+        password => Password,
+        clean_start => CleanStart,
+        keepalive => KeepAlive,
+        retry_interval => RetryIntv,
+        max_inflight => MaxInflight,
+        ssl => EnableSsl,
+        ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
+        if_record_metrics => true
+    }.
+
+bridge_name(Prefix, Id) ->
+    list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 1 - 0
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -222,6 +222,7 @@ do_compare('<=', L, R) -> L =< R;
 do_compare('>=', L, R) -> L >= R;
 do_compare('<>', L, R) -> L /= R;
 do_compare('!=', L, R) -> L /= R;
+do_compare('~=', T, F) -> emqx_topic:match(T, F);
 do_compare('=~', T, F) -> emqx_topic:match(T, F).
 
 number(Bin) ->