|
|
@@ -248,6 +248,50 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
|
|
|
ok.
|
|
|
|
|
|
+t_mqtt_conn_bridge_ingress_full_context(_Config) ->
|
|
|
+ User1 = <<"user1">>,
|
|
|
+ IngressConf =
|
|
|
+ emqx_utils_maps:deep_merge(
|
|
|
+ ?INGRESS_CONF,
|
|
|
+ #{<<"local">> => #{<<"payload">> => <<"${.}">>}}
|
|
|
+ ),
|
|
|
+ {ok, 201, _Bridge} = request(
|
|
|
+ post,
|
|
|
+ uri(["bridges"]),
|
|
|
+ ?SERVER_CONF(User1)#{
|
|
|
+ <<"type">> => ?TYPE_MQTT,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_INGRESS,
|
|
|
+ <<"ingress">> => IngressConf
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
|
|
+ LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ emqx:subscribe(LocalTopic),
|
|
|
+ timer:sleep(100),
|
|
|
+ %% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
|
|
+ %% the remote broker is also the local one.
|
|
|
+ emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
|
|
+ %% we should receive a message on the local broker, with specified topic
|
|
|
+ #message{payload = EncodedPayload} = assert_mqtt_msg_received(LocalTopic),
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"dup">> := false,
|
|
|
+ <<"id">> := _,
|
|
|
+ <<"message_received_at">> := _,
|
|
|
+ <<"payload">> := <<"hello">>,
|
|
|
+ <<"pub_props">> := #{},
|
|
|
+ <<"qos">> := 0,
|
|
|
+ <<"retain">> := false,
|
|
|
+ <<"server">> := <<"127.0.0.1:1883">>,
|
|
|
+ <<"topic">> := <<"ingress_remote_topic/1">>
|
|
|
+ },
|
|
|
+ emqx_utils_json:decode(EncodedPayload, [return_maps])
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
|
|
|
PoolSize = 4,
|
|
|
Ns = lists:seq(1, 10),
|