Explorar el Código

fix: ingress only bridge causes egress bridge traffic stop

William Yang hace 3 años
padre
commit
6b6bb09d4a
Se han modificado 2 ficheros con 139 adiciones y 19 borrados
  1. 10 4
      apps/emqx_bridge/src/emqx_bridge.erl
  2. 129 15
      apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

+ 10 - 4
apps/emqx_bridge/src/emqx_bridge.erl

@@ -138,7 +138,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
     {ok, Message}.
 
 send_to_matched_egress_bridges(Topic, Msg) ->
-    MatchedBridgeIds = get_matched_bridges(Topic),
+    MatchedBridgeIds = get_matched_egress_bridges(Topic),
     lists:foreach(
         fun(Id) ->
             try send_message(Id, Msg) of
@@ -339,13 +339,19 @@ flatten_confs(Conf0) ->
 do_flatten_confs(Type, Conf0) ->
     [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
 
-get_matched_bridges(Topic) ->
+get_matched_egress_bridges(Topic) ->
     Bridges = emqx:get_config([bridges], #{}),
     maps:fold(
         fun(BType, Conf, Acc0) ->
             maps:fold(
-                fun(BName, BConf, Acc1) ->
-                    get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
+                fun
+                    (BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt ->
+                        get_matched_bridge_id(BType, BConf, Topic, BName, Acc1);
+                    (_BName, #{ingress := _}, Acc1) when BType =:= mqtt ->
+                        %% ignore ingress only bridge
+                        Acc1;
+                    (BName, BConf, Acc1) ->
+                        get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
                 end,
                 Acc0,
                 Conf

+ 129 - 15
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -34,6 +34,13 @@
 -define(NAME_MQTT, <<"my_mqtt_bridge">>).
 -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
 -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
+
+%% Having ingress/egress prefixs of topic names to avoid dead loop while bridging
+-define(INGRESS_REMOTE_TOPIC, "ingress_remote_topic").
+-define(INGRESS_LOCAL_TOPIC, "ingress_local_topic").
+-define(EGRESS_REMOTE_TOPIC, "egress_remote_topic").
+-define(EGRESS_LOCAL_TOPIC, "egress_local_topic").
+
 -define(SERVER_CONF(Username), #{
     <<"server">> => <<"127.0.0.1:1883">>,
     <<"username">> => Username,
@@ -44,11 +51,11 @@
 
 -define(INGRESS_CONF, #{
     <<"remote">> => #{
-        <<"topic">> => <<"remote_topic/#">>,
+        <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
         <<"qos">> => 2
     },
     <<"local">> => #{
-        <<"topic">> => <<"local_topic/${topic}">>,
+        <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
         <<"qos">> => <<"${qos}">>,
         <<"payload">> => <<"${payload}">>,
         <<"retain">> => <<"${retain}">>
@@ -57,10 +64,10 @@
 
 -define(EGRESS_CONF, #{
     <<"local">> => #{
-        <<"topic">> => <<"local_topic/#">>
+        <<"topic">> => <<?EGRESS_LOCAL_TOPIC, "/#">>
     },
     <<"remote">> => #{
-        <<"topic">> => <<"remote_topic/${topic}">>,
+        <<"topic">> => <<?EGRESS_REMOTE_TOPIC, "/${topic}">>,
         <<"payload">> => <<"${payload}">>,
         <<"qos">> => <<"${qos}">>,
         <<"retain">> => <<"${retain}">>
@@ -155,8 +162,8 @@ t_mqtt_conn_bridge_ingress(_) ->
     BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
 
     %% we now test if the bridge works as expected
-    RemoteTopic = <<"remote_topic/1">>,
-    LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
     timer:sleep(100),
@@ -219,8 +226,8 @@ t_mqtt_conn_bridge_egress(_) ->
     } = jsx:decode(Bridge),
     BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     %% we now test if the bridge works as expected
-    LocalTopic = <<"local_topic/1">>,
-    RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
     timer:sleep(100),
@@ -264,6 +271,113 @@ t_mqtt_conn_bridge_egress(_) ->
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
     ok.
 
+t_mqtt_conn_bridge_ingress_and_egress(_) ->
+    User1 = <<"user1">>,
+    %% create an MQTT bridge, using POST
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?SERVER_CONF(User1)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => ?BRIDGE_NAME_INGRESS,
+            <<"ingress">> => ?INGRESS_CONF
+        }
+    ),
+
+    #{
+        <<"type">> := ?TYPE_MQTT,
+        <<"name">> := ?BRIDGE_NAME_INGRESS
+    } = jsx:decode(Bridge),
+    BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
+    {ok, 201, Bridge2} = request(
+        post,
+        uri(["bridges"]),
+        ?SERVER_CONF(User1)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => ?BRIDGE_NAME_EGRESS,
+            <<"egress">> => ?EGRESS_CONF
+        }
+    ),
+    #{
+        <<"type">> := ?TYPE_MQTT,
+        <<"name">> := ?BRIDGE_NAME_EGRESS
+    } = jsx:decode(Bridge2),
+
+    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+    %% we now test if the bridge works as expected
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
+    Payload = <<"hello">>,
+    emqx:subscribe(RemoteTopic),
+
+    {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    #{
+        <<"metrics">> := #{
+            <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
+        },
+        <<"node_metrics">> :=
+            [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := NodeCntMatched1,
+                            <<"success">> := NodeCntSuccess1,
+                            <<"failed">> := 0
+                        }
+                }
+            ]
+    } = jsx:decode(BridgeStr1),
+    timer:sleep(100),
+    %% PUBLISH a message to the 'local' broker, as we have only one broker,
+    %% the remote broker is also the local one.
+    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+
+    %% we should receive a message on the "remote" broker, with specified topic
+    ?assert(
+        receive
+            {deliver, RemoteTopic, #message{payload = Payload}} ->
+                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
+                true;
+            Msg ->
+                ct:pal("Msg: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end
+    ),
+
+    %% verify the metrics of the bridge
+    timer:sleep(1000),
+    {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    #{
+        <<"metrics">> := #{
+            <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
+        },
+        <<"node_metrics">> :=
+            [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> :=
+                        #{
+                            <<"matched">> := NodeCntMatched2,
+                            <<"success">> := NodeCntSuccess2,
+                            <<"failed">> := 0
+                        }
+                }
+            ]
+    } = jsx:decode(BridgeStr2),
+    ?assertEqual(CntMatched2, CntMatched1 + 1),
+    ?assertEqual(CntSuccess2, CntSuccess1 + 1),
+    ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
+    ?assertEqual(NodeCntSuccess2, NodeCntSuccess1 + 1),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+    ok.
+
 t_ingress_mqtt_bridge_with_rules(_) ->
     {ok, 201, _} = request(
         post,
@@ -290,8 +404,8 @@ t_ingress_mqtt_bridge_with_rules(_) ->
 
     %% we now test if the bridge works as expected
 
-    RemoteTopic = <<"remote_topic/1">>,
-    LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
     timer:sleep(100),
@@ -400,8 +514,8 @@ t_egress_mqtt_bridge_with_rules(_) ->
     #{<<"id">> := RuleId} = jsx:decode(Rule),
 
     %% we now test if the bridge works as expected
-    LocalTopic = <<"local_topic/1">>,
-    RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
     timer:sleep(100),
@@ -426,7 +540,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     %% PUBLISH a message to the rule.
     Payload2 = <<"hi">>,
     RuleTopic = <<"t/1">>,
-    RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
+    RemoteTopic2 = <<?EGRESS_REMOTE_TOPIC, "/", RuleTopic/binary>>,
     emqx:subscribe(RemoteTopic2),
     timer:sleep(100),
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
@@ -517,8 +631,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     } = jsx:decode(Bridge),
     BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     %% we now test if the bridge works as expected
-    LocalTopic = <<"local_topic/1">>,
-    RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
     Payload0 = <<"hello">>,
     emqx:subscribe(RemoteTopic),
     timer:sleep(100),