Browse Source

feat(bridge): let mqtt bridge work with rules

Shawn 4 years ago
parent
commit
c0258e1be6

+ 27 - 15
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -28,6 +28,8 @@
         , bridges/0
         ]).
 
+-export([on_message_received/2]).
+
 %% callbacks of behaviour emqx_resource
 -export([ on_start/2
         , on_stop/2
@@ -83,6 +85,12 @@ drop_bridge(Name) ->
             {error, Error}
     end.
 
+%% ===================================================================
+%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
+%% if the bridge received msgs from the remote broker.
+on_message_received(Msg, ChannelName) ->
+    emqx:run_hook(ChannelName, [Msg]).
+
 %% ===================================================================
 on_start(InstId, Conf) ->
     logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
@@ -112,10 +120,9 @@ on_stop(InstId, #{channels := NameList}) ->
 on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
         baisc_conf := BasicConf}) ->
     create_channel(Conf, Prefix, BasicConf);
-on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
-    logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]);
-on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
-    logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
+on_query(_InstId, {send_to_remote, ChannelName, Msg}, _AfterQuery, _State) ->
+    logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelName, Msg]),
+    emqx_connector_mqtt_worker:send_to_remote(ChannelName, Msg).
 
 on_health_check(_InstId, #{channels := NameList} = State) ->
     Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],
@@ -124,25 +131,30 @@ on_health_check(_InstId, #{channels := NameList} = State) ->
         false -> {error, {some_channel_down, Results}, State}
     end.
 
-create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT,
-        local_topic := LocalT} = Conf}, NamePrefix, BasicConf) ->
+create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf},
+        NamePrefix, BasicConf) ->
+    LocalT = maps:get(local_topic, Conf, undefined),
     Name = ingress_channel_name(NamePrefix, Id),
-    logger:info("creating ingress channel ~p, remote ~s -> local ~s",
-        [Name, RemoteT, LocalT]),
+    logger:info("creating ingress channel ~p, remote ~s -> local ~s", [Name, RemoteT, LocalT]),
     do_create_channel(BasicConf#{
         name => Name,
         clientid => clientid(Name),
-        subscriptions => Conf, forwards => undefined});
-
-create_channel({{egress_channels, Id}, #{subscribe_local_topic := LocalT,
-        remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) ->
+        subscriptions => Conf#{
+            local_topic => LocalT,
+            on_message_received => {fun ?MODULE:on_message_received/2, [Name]}
+        },
+        forwards => undefined});
+
+create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf},
+        NamePrefix, BasicConf) ->
+    LocalT = maps:get(subscribe_local_topic, Conf, undefined),
     Name = egress_channel_name(NamePrefix, Id),
-    logger:info("creating egress channel ~p, local ~s -> remote ~s",
-        [Name, LocalT, RemoteT]),
+    logger:info("creating egress channel ~p, local ~s -> remote ~s", [Name, LocalT, RemoteT]),
     do_create_channel(BasicConf#{
         name => Name,
         clientid => clientid(Name),
-        subscriptions => undefined, forwards => Conf}).
+        subscriptions => undefined,
+        forwards => Conf#{subscribe_local_topic => LocalT}}).
 
 remove_channel(ChannelName) ->
     logger:info("removing channel ~p", [ChannelName]),

+ 8 - 2
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -159,9 +159,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
 
 handle_publish(Msg, undefined) ->
     ?LOG(error, "cannot publish to local broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Msg]);
-handle_publish(Msg, Vars) ->
+handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
     ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]),
-    emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)).
+    emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
+    _ = erlang:apply(OnMsgRcvdFunc, [Msg, Args]),
+    case maps:get(local_topic, Vars, undefined) of
+        undefined -> ok;
+        _Topic ->
+            emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
+    end.
 
 handle_disconnected(Reason, Parent) ->
     Parent ! {disconnected, self(), Reason}.

+ 7 - 7
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -36,17 +36,15 @@
 
 -type variables() :: #{
     mountpoint := undefined | binary(),
-    topic := binary(),
+    remote_topic := binary(),
     qos := original | integer(),
     retain := original | boolean(),
     payload := binary()
 }.
 
 make_pub_vars(_, undefined) -> undefined;
-make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, remote_topic := Topic} = Conf) ->
-    Conf#{topic => Topic, mountpoint => Mountpoint};
-make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, local_topic := Topic} = Conf) ->
-    Conf#{topic => Topic, mountpoint => Mountpoint}.
+make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
+    Conf#{mountpoint => Mountpoint}.
 
 %% @doc Make export format:
 %% 1. Mount topic to a prefix
@@ -61,7 +59,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
     Retain0 = maps:get(retain, Flags0, false),
     MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
     to_remote_msg(MapMsg, Vars);
-to_remote_msg(MapMsg, #{topic := TopicToken, payload := PayloadToken,
+to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
         qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
     Payload = replace_vars_in_str(PayloadToken, MapMsg),
@@ -77,7 +75,7 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
 
 %% published from remote node over a MQTT connection
 to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
-            #{topic := TopicToken, payload := PayloadToken,
+            #{local_topic := TopicToken, payload := PayloadToken,
               qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
     Payload = replace_vars_in_str(PayloadToken, MapMsg),
@@ -115,6 +113,8 @@ from_binary(Bin) -> binary_to_term(Bin).
 %% Count only the topic length + payload size
 -spec estimate_size(msg()) -> integer().
 estimate_size(#message{topic = Topic, payload = Payload}) ->
+    size(Topic) + size(Payload);
+estimate_size(#{topic := Topic, payload := Payload}) ->
     size(Topic) + size(Payload).
 
 set_headers(undefined, Msg) ->

+ 5 - 3
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -43,13 +43,15 @@ fields("config") ->
     ] ++ emqx_connector_schema_lib:ssl_fields();
 
 fields("ingress_channels") ->
-    [ {subscribe_remote_topic, #{type => binary(), nullable => false}}
-    , {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
+    %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
+    [ {subscribe_remote_topic, hoconsc:mk(binary(), #{nullable => false})}
+    , {local_topic, hoconsc:mk(binary())}
     , {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
     ] ++ common_inout_confs();
 
 fields("egress_channels") ->
-    [ {subscribe_local_topic, #{type => binary(), nullable => false}}
+    %% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary
+    [ {subscribe_local_topic, hoconsc:mk(binary())}
     , {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
     ] ++ common_inout_confs();
 

+ 17 - 24
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -87,6 +87,7 @@
         , ensure_stopped/1
         , status/1
         , ping/1
+        , send_to_remote/2
         ]).
 
 -export([ get_forwards/1
@@ -171,6 +172,11 @@ ping(Pid) when is_pid(Pid) ->
 ping(Name) ->
     gen_statem:call(name(Name), ping).
 
+send_to_remote(Pid, Msg) when is_pid(Pid) ->
+    gen_statem:cast(Pid, {send_to_remote, Msg});
+send_to_remote(Name, Msg) ->
+    gen_statem:cast(name(Name), {send_to_remote, Msg}).
+
 %% @doc Return all forwards (local subscriptions).
 -spec get_forwards(id()) -> [topic()].
 get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
@@ -194,7 +200,6 @@ init(#{name := Name} = ConnectOpts) ->
     }}.
 
 init_state(Opts) ->
-    IfRecordMetrics = maps:get(if_record_metrics, Opts, true),
     ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
     StartType = maps:get(start_type, Opts, manual),
     Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
@@ -208,7 +213,6 @@ init_state(Opts) ->
       inflight => [],
       max_inflight => MaxInflightSize,
       connection => undefined,
-      if_record_metrics => IfRecordMetrics,
       name => Name}.
 
 open_replayq(Name, QCfg) ->
@@ -321,17 +325,15 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F
     {keep_state_and_data, [{reply, From, Forwards}]};
 common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
     {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
-common(_StateName, info, {deliver, _, Msg},
-       State = #{replayq := Q, if_record_metrics := IfRecordMetric}) ->
+common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q}) ->
     Msgs = collect([Msg]),
-    bridges_metrics_inc(IfRecordMetric,
-                        'bridge.mqtt.message_received',
-                        length(Msgs)
-                       ),
     NewQ = replayq:append(Q, Msgs),
     {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
 common(_StateName, info, {'EXIT', _, _}, State) ->
     {keep_state, State};
+common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
+    NewQ = replayq:append(Q, [Msg]),
+    {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
 common(StateName, Type, Content, #{name := Name} = State) ->
     ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:~p",
           [Name, Type, StateName, Content]),
@@ -401,11 +403,10 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
 do_send(#{inflight := Inflight,
           connection := Connection,
           mountpoint := Mountpoint,
-          connect_opts := #{forwards := Forwards},
-          if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
+          connect_opts := #{forwards := Forwards}} = State, QAckRef, [_ | _] = Batch) ->
     Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
     ExportMsg = fun(Message) ->
-                    bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
+                    emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
                     emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
                 end,
     ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]),
@@ -464,6 +465,8 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs,
             All
     end.
 
+subscribe_local_topic(undefined, _Name) ->
+    ok;
 subscribe_local_topic(Topic, Name) ->
     do_subscribe(Topic, Name).
 
@@ -487,7 +490,7 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
     emqx_connector_mqtt_mod:stop(Conn),
     State#{connection => undefined};
 disconnect(State) ->
-        State.
+    State.
 
 %% Called only when replayq needs to dump it to disk.
 msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin);
@@ -502,20 +505,10 @@ name(Id) -> list_to_atom(str(Id)).
 
 register_metrics() ->
     lists:foreach(fun emqx_metrics:ensure/1,
-                  ['bridge.mqtt.message_sent',
-                   'bridge.mqtt.message_received'
+                  ['bridge.mqtt.message_sent_to_remote',
+                   'bridge.mqtt.message_received_from_remote'
                   ]).
 
-bridges_metrics_inc(true, Metric) ->
-    emqx_metrics:inc(Metric);
-bridges_metrics_inc(_IsRecordMetric, _Metric) ->
-    ok.
-
-bridges_metrics_inc(true, Metric, Value) ->
-    emqx_metrics:inc(Metric, Value);
-bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) ->
-    ok.
-
 obfuscate(Map) ->
     maps:fold(fun(K, V, Acc) ->
                       case is_sensitive(K) of