Просмотр исходного кода

Merge pull request #9672 from JimMoen/0103-fix-mqtt-bridge

Fix the problem that the bridge is not available when the Payload template is empty in the MQTT bridge.
JimMoen 3 лет назад
Родитель
Сommit
54ebc27d24

+ 167 - 0
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -75,6 +75,29 @@
     }
 }).
 
+-define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
+    <<"remote">> => #{
+        <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
+        <<"qos">> => 2
+    },
+    <<"local">> => #{
+        <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
+        <<"qos">> => <<"${qos}">>,
+        <<"retain">> => <<"${retain}">>
+    }
+}).
+
+-define(EGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
+    <<"local">> => #{
+        <<"topic">> => <<?EGRESS_LOCAL_TOPIC, "/#">>
+    },
+    <<"remote">> => #{
+        <<"topic">> => <<?EGRESS_REMOTE_TOPIC, "/${topic}">>,
+        <<"qos">> => <<"${qos}">>,
+        <<"retain">> => <<"${retain}">>
+    }
+}).
+
 inspect(Selected, _Envs, _Args) ->
     persistent_term:put(?MODULE, #{inspect => Selected}).
 
@@ -209,6 +232,76 @@ t_mqtt_conn_bridge_ingress(_) ->
 
     ok.
 
+t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
+    User1 = <<"user1">>,
+    %% create an MQTT bridge, using POST
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?SERVER_CONF(User1)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => ?BRIDGE_NAME_INGRESS,
+            <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
+        }
+    ),
+    #{
+        <<"type">> := ?TYPE_MQTT,
+        <<"name">> := ?BRIDGE_NAME_INGRESS
+    } = jsx:decode(Bridge),
+    BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
+
+    %% we now test if the bridge works as expected
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
+    Payload = <<"hello">>,
+    emqx:subscribe(LocalTopic),
+    timer:sleep(100),
+    %% PUBLISH a message to the 'remote' broker, as we have only one broker,
+    %% the remote broker is also the local one.
+    emqx:publish(emqx_message:make(RemoteTopic, Payload)),
+    %% we should receive a message on the local broker, with specified topic
+    ?assert(
+        receive
+            {deliver, LocalTopic, #message{payload = MapMsg}} ->
+                ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]),
+                %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
+                case jsx:decode(MapMsg) of
+                    #{<<"payload">> := Payload} ->
+                        true;
+                    _ ->
+                        false
+                end;
+            Msg ->
+                ct:pal("Msg: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end
+    ),
+
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 0, <<"received">> := 1}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    ok.
+
 t_mqtt_conn_bridge_egress(_) ->
     %% then we add a mqtt connector, using POST
     User1 = <<"user1">>,
@@ -276,6 +369,80 @@ t_mqtt_conn_bridge_egress(_) ->
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
     ok.
 
+t_mqtt_conn_bridge_egress_no_payload_template(_) ->
+    %% then we add a mqtt connector, using POST
+    User1 = <<"user1">>,
+
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?SERVER_CONF(User1)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => ?BRIDGE_NAME_EGRESS,
+            <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
+        }
+    ),
+    #{
+        <<"type">> := ?TYPE_MQTT,
+        <<"name">> := ?BRIDGE_NAME_EGRESS
+    } = jsx:decode(Bridge),
+    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+    ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+    %% we now test if the bridge works as expected
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
+    Payload = <<"hello">>,
+    emqx:subscribe(RemoteTopic),
+    timer:sleep(100),
+    %% PUBLISH a message to the 'local' broker, as we have only one broker,
+    %% the remote broker is also the local one.
+    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+
+    %% we should receive a message on the "remote" broker, with specified topic
+    ?assert(
+        receive
+            {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} ->
+                ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]),
+                %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
+                Size = byte_size(ResourceID),
+                ?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
+                case jsx:decode(MapMsg) of
+                    #{<<"payload">> := Payload} ->
+                        true;
+                    _ ->
+                        false
+                end;
+            Msg ->
+                ct:pal("Msg: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end
+    ),
+
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    ok.
+
 t_egress_custom_clientid_prefix(_Config) ->
     User1 = <<"user1">>,
     {ok, 201, Bridge} = request(

+ 9 - 8
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -71,14 +71,13 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
 to_remote_msg(MapMsg, #{
     remote := #{
         topic := TopicToken,
-        payload := PayloadToken,
         qos := QoSToken,
         retain := RetainToken
-    },
+    } = Remote,
     mountpoint := Mountpoint
 }) when is_map(MapMsg) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(PayloadToken, MapMsg),
+    Payload = process_payload(Remote, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     PubProps = maps:get(pub_props, MapMsg, #{}),
@@ -100,16 +99,15 @@ to_broker_msg(
     #{
         local := #{
             topic := TopicToken,
-            payload := PayloadToken,
             qos := QoSToken,
             retain := RetainToken
-        },
+        } = Local,
         mountpoint := Mountpoint
     },
     Props
 ) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = process_payload(PayloadToken, MapMsg),
+    Payload = process_payload(Local, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     PubProps = maps:get(pub_props, MapMsg, #{}),
@@ -121,9 +119,12 @@ to_broker_msg(
         )
     ).
 
-process_payload([], Msg) ->
+process_payload(From, MapMsg) ->
+    do_process_payload(maps:get(payload, From, undefined), MapMsg).
+
+do_process_payload(undefined, Msg) ->
     emqx_json:encode(Msg);
-process_payload(Tks, Msg) ->
+do_process_payload(Tks, Msg) ->
     replace_vars_in_str(Tks, Msg).
 
 %% Replace a string contains vars to another string in which the placeholders are replace by the

+ 1 - 0
changes/v5.0.14/fix-9672.en.md

@@ -0,0 +1 @@
+Fix the problem that the bridge is not available when the Payload template is empty in the MQTT bridge.

+ 1 - 0
changes/v5.0.14/fix-9672.zh.md

@@ -0,0 +1 @@
+修复 MQTT 桥接中 Payload 模板为空时桥接不可用的问题。