Browse Source

feat(bridge): new configs for mqtt bridge

Shawn 4 years ago
parent
commit
4f82debbe7

+ 11 - 7
apps/emqx_bridge/etc/emqx_bridge.conf

@@ -27,15 +27,19 @@ bridges.mqtt.my_mqtt_bridge {
         cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
         cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
     }
     }
     in: [{
     in: [{
-        from_remote_topic = "msg/#"
-        to_local_topic = "from_aws/${topic}"
-        payload_template = "${message}"
-        qos = 1
+        subscribe_remote_topic = "msg/#"
+        subscribe_qos = 1
+        publish_local_topic = "from_aws/${topic}"
+        publish_payload = "${payload}"
+        publish_qos = "${qos}"
+        publish_retain = "${retain}"
     }]
     }]
     out: [{
     out: [{
-        from_local_topic = "msg/#"
-        to_remote_topic = "from_emqx/${topic}"
-        payload_template = "${message}"
+        subscribe_local_topic = "msg/#"
+        publish_remote_topic = "from_emqx/${topic}"
+        publish_payload = "${payload}"
+        publish_qos = 1
+        publish_retain = false
     }]
     }]
 }
 }
 
 

+ 5 - 15
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl

@@ -34,9 +34,6 @@
         , handle_disconnected/2
         , handle_disconnected/2
         ]).
         ]).
 
 
--export([ check_subscriptions/1
-        ]).
-
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
 
@@ -54,7 +51,6 @@ start(Config) ->
     {Host, Port} = maps:get(server, Config),
     {Host, Port} = maps:get(server, Config),
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
     Subscriptions = maps:get(subscriptions, Config, []),
     Subscriptions = maps:get(subscriptions, Config, []),
-    Subscriptions1 = check_subscriptions(Subscriptions),
     Handlers = make_hdlr(Parent, Mountpoint),
     Handlers = make_hdlr(Parent, Mountpoint),
     Config1 = Config#{
     Config1 = Config#{
         msg_handler => Handlers,
         msg_handler => Handlers,
@@ -68,8 +64,8 @@ start(Config) ->
             case emqtt:connect(Pid) of
             case emqtt:connect(Pid) of
                 {ok, _} ->
                 {ok, _} ->
                     try
                     try
-                        Subscriptions2 = subscribe_remote_topics(Pid, Subscriptions1),
-                        {ok, #{client_pid => Pid, subscriptions => Subscriptions2}}
+                        ok = subscribe_remote_topics(Pid, Subscriptions),
+                        {ok, #{client_pid => Pid, subscriptions => Subscriptions}}
                     catch
                     catch
                         throw : Reason ->
                         throw : Reason ->
                             ok = stop(#{client_pid => Pid}),
                             ok = stop(#{client_pid => Pid}),
@@ -173,18 +169,12 @@ make_hdlr(Parent, Mountpoint) ->
      }.
      }.
 
 
 subscribe_remote_topics(ClientPid, Subscriptions) ->
 subscribe_remote_topics(ClientPid, Subscriptions) ->
-    lists:map(fun({Topic, Qos}) ->
-        case emqtt:subscribe(ClientPid, Topic, Qos) of
-            {ok, _, _} -> {Topic, Qos};
+    lists:foreach(fun(#{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) ->
+        case emqtt:subscribe(ClientPid, FromTopic, QoS) of
+            {ok, _, _} -> ok;
             Error -> throw(Error)
             Error -> throw(Error)
         end
         end
     end, Subscriptions).
     end, Subscriptions).
 
 
 without_config(Config) ->
 without_config(Config) ->
     maps:without([conn_type, address, receive_mountpoint, subscriptions], Config).
     maps:without([conn_type, address, receive_mountpoint, subscriptions], Config).
-
-check_subscriptions(Subscriptions) ->
-    lists:map(fun(#{qos := QoS, topic := Topic}) ->
-        true = emqx_topic:validate({filter, Topic}),
-        {Topic, QoS}
-    end, Subscriptions).

+ 33 - 10
apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl

@@ -36,6 +36,14 @@
 -type msg() :: emqx_types:message().
 -type msg() :: emqx_types:message().
 -type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
 -type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
 
 
+-type variables() :: #{
+    mountpoint := undefined | binary(),
+    topic := binary(),
+    qos := original | integer(),
+    retain := original | boolean(),
+    payload := binary()
+}.
+
 %% @doc Make export format:
 %% @doc Make export format:
 %% 1. Mount topic to a prefix
 %% 1. Mount topic to a prefix
 %% 2. Fix QoS to 1
 %% 2. Fix QoS to 1
@@ -43,24 +51,39 @@
 %% Shame that we have to know the callback module here
 %% Shame that we have to know the callback module here
 %% would be great if we can get rid of #mqtt_msg{} record
 %% would be great if we can get rid of #mqtt_msg{} record
 %% and use #message{} in all places.
 %% and use #message{} in all places.
--spec to_export(emqx_bridge_rpc | emqx_bridge_worker,
-                undefined | binary(), msg()) -> exp_msg().
-to_export(emqx_bridge_mqtt, Mountpoint,
-          #message{topic = Topic,
-                   payload = Payload,
-                   flags = Flags,
-                   qos = QoS
-                  }) ->
-    Retain = maps:get(retain, Flags, false),
+-spec to_export(emqx_bridge_rpc | emqx_bridge_worker, variables(), msg())
+        -> exp_msg().
+to_export(emqx_bridge_mqtt, Vars, #message{flags = Flags0} = Msg) ->
+    Retain0 = maps:get(retain, Flags0, false),
+    MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
+    to_export(emqx_bridge_mqtt, Vars, MapMsg);
+to_export(emqx_bridge_mqtt, #{topic := TopicToken, payload := PayloadToken,
+        qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint},
+        MapMsg) when is_map(MapMsg) ->
+    Topic = replace_vars_in_str(TopicToken, MapMsg),
+    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    QoS = replace_vars(QoSToken, MapMsg),
+    Retain = replace_vars(RetainToken, MapMsg),
     #mqtt_msg{qos = QoS,
     #mqtt_msg{qos = QoS,
               retain = Retain,
               retain = Retain,
               topic = topic(Mountpoint, Topic),
               topic = topic(Mountpoint, Topic),
               props = #{},
               props = #{},
               payload = Payload};
               payload = Payload};
-to_export(_Module, Mountpoint,
+to_export(_Module, #{mountpoint := Mountpoint},
           #message{topic = Topic} = Msg) ->
           #message{topic = Topic} = Msg) ->
     Msg#message{topic = topic(Mountpoint, Topic)}.
     Msg#message{topic = topic(Mountpoint, Topic)}.
 
 
+replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
+    emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
+replace_vars_in_str(Val, _Data) ->
+    Val.
+
+replace_vars(Tokens, Data) when is_list(Tokens) ->
+    [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
+    Var;
+replace_vars(Val, _Data) ->
+    Val.
+
 %% @doc Make `binary()' in order to make iodata to be persisted on disk.
 %% @doc Make `binary()' in order to make iodata to be persisted on disk.
 -spec to_binary(msg()) -> binary().
 -spec to_binary(msg()) -> binary().
 to_binary(Msg) -> term_to_binary(Msg).
 to_binary(Msg) -> term_to_binary(Msg).

+ 49 - 11
apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl

@@ -215,15 +215,15 @@ init(Opts) ->
     ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
     ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
     Forwards = maps:get(forwards, Opts, []),
     Forwards = maps:get(forwards, Opts, []),
     Queue = open_replayq(maps:get(replayq, Opts, #{})),
     Queue = open_replayq(maps:get(replayq, Opts, #{})),
-    State = init_opts(Opts),
+    State = init_state(Opts),
     self() ! idle,
     self() ! idle,
     {ok, idle, State#{connect_module => ConnectModule,
     {ok, idle, State#{connect_module => ConnectModule,
-                      connect_opts => ConnectOpts,
+                      connect_opts => pre_process_opts(ConnectOpts),
                       forwards => Forwards,
                       forwards => Forwards,
                       replayq => Queue
                       replayq => Queue
                      }}.
                      }}.
 
 
-init_opts(Opts) ->
+init_state(Opts) ->
     IfRecordMetrics = maps:get(if_record_metrics, Opts, true),
     IfRecordMetrics = maps:get(if_record_metrics, Opts, true),
     ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
     ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
     StartType = maps:get(start_type, Opts, manual),
     StartType = maps:get(start_type, Opts, manual),
@@ -252,6 +252,26 @@ open_replayq(QCfg) ->
     replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
     replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
                               marshaller => fun ?MODULE:msg_marshaller/1}).
                               marshaller => fun ?MODULE:msg_marshaller/1}).
 
 
+pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
+    ConnectOpts#{subscriptions => [pre_process_in_out(In) || In <- InConf],
+                 forwards => [pre_process_in_out(Out) || Out <- OutConf]}.
+
+pre_process_in_out(Conf) ->
+    Conf1 = pre_process_conf(publish_local_topic, Conf),
+    Conf2 = pre_process_conf(publish_remote_topic, Conf1),
+    Conf3 = pre_process_conf(publish_payload, Conf2),
+    Conf4 = pre_process_conf(publish_qos, Conf3),
+    pre_process_conf(publish_retain, Conf4).
+
+pre_process_conf(Key, Conf) ->
+    case maps:find(Key, Conf) of
+        error -> Conf;
+        {ok, Val} when is_binary(Val) ->
+            Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
+        {ok, Val} ->
+            Conf#{Key => Val}
+    end.
+
 code_change(_Vsn, State, Data, _Extra) ->
 code_change(_Vsn, State, Data, _Extra) ->
     {ok, State, Data}.
     {ok, State, Data}.
 
 
@@ -360,7 +380,7 @@ common(StateName, Type, Content, #{name := Name} = State) ->
     {keep_state, State}.
     {keep_state, State}.
 
 
 do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) ->
 do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) ->
-    case is_topic_present(Topic, Forwards) of
+    case is_local_sub_present(Topic, Forwards) of
         true ->
         true ->
             {ok, State};
             {ok, State};
         false ->
         false ->
@@ -374,7 +394,7 @@ do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc
     {{error, no_remote_subscription_support}, State};
     {{error, no_remote_subscription_support}, State};
 do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule,
 do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule,
                                           connection := Conn} = State) ->
                                           connection := Conn} = State) ->
-    case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of
+    case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of
         true ->
         true ->
             {ok, State};
             {ok, State};
         false ->
         false ->
@@ -387,7 +407,7 @@ do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule,
     end.
     end.
 
 
 do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) ->
 do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) ->
-    case is_topic_present(Topic, Forwards) of
+    case is_local_sub_present(Topic, Forwards) of
         true ->
         true ->
             R = do_unsubscribe(Topic),
             R = do_unsubscribe(Topic),
             {R, State#{forwards => lists:delete(Topic, Forwards)}};
             {R, State#{forwards => lists:delete(Topic, Forwards)}};
@@ -400,7 +420,7 @@ do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = Sta
     {{error, no_remote_subscription_support}, State};
     {{error, no_remote_subscription_support}, State};
 do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule,
 do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule,
                                        connection := Conn} = State) ->
                                        connection := Conn} = State) ->
-    case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of
+    case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of
         true ->
         true ->
             case ConnectModule:ensure_unsubscribed(Conn, Topic) of
             case ConnectModule:ensure_unsubscribed(Conn, Topic) of
                 {error, Error} ->
                 {error, Error} ->
@@ -412,8 +432,15 @@ do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule,
             {ok, State}
             {ok, State}
     end.
     end.
 
 
-is_topic_present(Topic, Topics) ->
-    lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
+is_local_sub_present(Topic, Configs) ->
+    is_topic_present(subscribe_local_topic, Topic, Configs).
+is_remote_sub_present(Topic, Configs) ->
+    is_topic_present(subscribe_remote_topic, Topic, Configs).
+
+is_topic_present(Type, Topic, Configs) ->
+    lists:any(fun(Conf) ->
+            Topic == maps:get(Type, Conf, undefined)
+        end, Configs).
 
 
 do_connect(#{forwards := Forwards,
 do_connect(#{forwards := Forwards,
              connect_module := ConnectModule,
              connect_module := ConnectModule,
@@ -451,7 +478,7 @@ retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf)
             {error, State1#{inflight := NewInf ++ OldInf}}
             {error, State1#{inflight := NewInf ++ OldInf}}
     end.
     end.
 
 
-pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) ->
+pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) ->
     pop_and_send_loop(State, Max - length(Inflight)).
     pop_and_send_loop(State, Max - length(Inflight)).
 
 
 pop_and_send_loop(State, 0) ->
 pop_and_send_loop(State, 0) ->
@@ -480,10 +507,12 @@ do_send(#{inflight := Inflight,
           connect_module := Module,
           connect_module := Module,
           connection := Connection,
           connection := Connection,
           mountpoint := Mountpoint,
           mountpoint := Mountpoint,
+          forwards := Forwards,
           if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
           if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
+    Vars = make_export_variables(Mountpoint, Forwards),
     ExportMsg = fun(Message) ->
     ExportMsg = fun(Message) ->
                     bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
                     bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
-                    emqx_bridge_msg:to_export(Module, Mountpoint, Message)
+                    emqx_bridge_msg:to_export(Module, Vars, Message)
                 end,
                 end,
     case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
     case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
         {ok, Refs} ->
         {ok, Refs} ->
@@ -495,6 +524,15 @@ do_send(#{inflight := Inflight,
             {error, State}
             {error, State}
     end.
     end.
 
 
+make_export_variables(Mountpoint, #{
+        publish_remote_topic := PubTopic,
+        publish_payload := PayloadTmpl,
+        publish_qos := PubQoS,
+        publish_retain := PubRetain}) ->
+    #{topic => PubTopic, payload => PayloadTmpl,
+      qos => PubQoS, retain => PubRetain,
+      mountpoint => Mountpoint}.
+
 %% map as set, ack-reference -> 1
 %% map as set, ack-reference -> 1
 map_set(Ref) when is_reference(Ref) ->
 map_set(Ref) when is_reference(Ref) ->
     %% QoS-0 or RPC call returns a reference
     %% QoS-0 or RPC call returns a reference

+ 15 - 11
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -53,17 +53,15 @@ fields("config") ->
     ] ++ emqx_connector_schema_lib:ssl_fields();
     ] ++ emqx_connector_schema_lib:ssl_fields();
 
 
 fields("in") ->
 fields("in") ->
-    [ {from_remote_topic, #{type => binary(), nullable => false}}
-    , {to_local_topic, #{type => binary(), nullable => false}}
-    , {qos, emqx_schema:t(integer(), undefined, 1)}
-    , {payload_template, emqx_schema:t(binary(), undefined, <<"${message}">>)}
-    ];
+    [ {subscribe_remote_topic, #{type => binary(), nullable => false}}
+    , {publish_local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
+    , {subscribe_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, 1)}
+    ] ++ publish_confs();
 
 
 fields("out") ->
 fields("out") ->
-    [ {to_remote_topic, #{type => binary(), nullable => false}}
-    , {from_local_topic, #{type => binary(), nullable => false}}
-    , {payload_template, emqx_schema:t(binary(), undefined, <<"${payload}">>)}
-    ];
+    [ {subscribe_local_topic, #{type => binary(), nullable => false}}
+    , {publish_remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)}
+    ] ++ publish_confs();
 
 
 fields("replayq") ->
 fields("replayq") ->
     [ {dir, hoconsc:union([boolean(), string()])}
     [ {dir, hoconsc:union([boolean(), string()])}
@@ -72,6 +70,12 @@ fields("replayq") ->
     , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
     , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
     ].
     ].
 
 
+publish_confs() ->
+    [ {publish_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)}
+    , {publish_retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)}
+    , {publish_payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)}
+    ].
+
 proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
 proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
 proto_ver(default) -> v4;
 proto_ver(default) -> v4;
 proto_ver(_) -> undefined.
 proto_ver(_) -> undefined.
@@ -138,8 +142,8 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
 on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
 on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
     logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
     logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
 
 
-on_health_check(_InstId, #{bridge_worker := Worker}) ->
-    {ok, emqx_bridge_worker:ping(Worker)}.
+on_health_check(_InstId, #{bridge_name := Name}) ->
+    {ok, emqx_bridge_worker:ping(Name)}.
 
 
 start_bridge(Name) ->
 start_bridge(Name) ->
     case emqx_bridge_worker:ensure_started(Name) of
     case emqx_bridge_worker:ensure_started(Name) of