|
|
@@ -75,6 +75,29 @@
|
|
|
}
|
|
|
}).
|
|
|
|
|
|
+-define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
|
|
|
+ <<"remote">> => #{
|
|
|
+ <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
|
|
|
+ <<"qos">> => 2
|
|
|
+ },
|
|
|
+ <<"local">> => #{
|
|
|
+ <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
|
|
|
+ <<"qos">> => <<"${qos}">>,
|
|
|
+ <<"retain">> => <<"${retain}">>
|
|
|
+ }
|
|
|
+}).
|
|
|
+
|
|
|
+-define(EGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
|
|
|
+ <<"local">> => #{
|
|
|
+ <<"topic">> => <<?EGRESS_LOCAL_TOPIC, "/#">>
|
|
|
+ },
|
|
|
+ <<"remote">> => #{
|
|
|
+ <<"topic">> => <<?EGRESS_REMOTE_TOPIC, "/${topic}">>,
|
|
|
+ <<"qos">> => <<"${qos}">>,
|
|
|
+ <<"retain">> => <<"${retain}">>
|
|
|
+ }
|
|
|
+}).
|
|
|
+
|
|
|
inspect(Selected, _Envs, _Args) ->
|
|
|
persistent_term:put(?MODULE, #{inspect => Selected}).
|
|
|
|
|
|
@@ -209,6 +232,76 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
|
|
|
ok.
|
|
|
|
|
|
+t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
|
|
+ User1 = <<"user1">>,
|
|
|
+ %% create an MQTT bridge, using POST
|
|
|
+ {ok, 201, Bridge} = request(
|
|
|
+ post,
|
|
|
+ uri(["bridges"]),
|
|
|
+ ?SERVER_CONF(User1)#{
|
|
|
+ <<"type">> => ?TYPE_MQTT,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_INGRESS,
|
|
|
+ <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ #{
|
|
|
+ <<"type">> := ?TYPE_MQTT,
|
|
|
+ <<"name">> := ?BRIDGE_NAME_INGRESS
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
+ BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
|
|
+
|
|
|
+ %% we now test if the bridge works as expected
|
|
|
+ RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
|
|
+ LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ emqx:subscribe(LocalTopic),
|
|
|
+ timer:sleep(100),
|
|
|
+ %% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
|
|
+ %% the remote broker is also the local one.
|
|
|
+ emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
|
|
+ %% we should receive a message on the local broker, with specified topic
|
|
|
+ ?assert(
|
|
|
+ receive
|
|
|
+ {deliver, LocalTopic, #message{payload = MapMsg}} ->
|
|
|
+ ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]),
|
|
|
+ %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
|
|
+ case jsx:decode(MapMsg) of
|
|
|
+ #{<<"payload">> := Payload} ->
|
|
|
+ true;
|
|
|
+ _ ->
|
|
|
+ false
|
|
|
+ end;
|
|
|
+ Msg ->
|
|
|
+ ct:pal("Msg: ~p", [Msg]),
|
|
|
+ false
|
|
|
+ after 100 ->
|
|
|
+ false
|
|
|
+ end
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% verify the metrics of the bridge
|
|
|
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
|
|
+ <<"node_metrics">> :=
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ <<"node">> := _,
|
|
|
+ <<"metrics">> :=
|
|
|
+ #{<<"matched">> := 0, <<"received">> := 1}
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ jsx:decode(BridgeStr)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% delete the bridge
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
|
|
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
t_mqtt_conn_bridge_egress(_) ->
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
User1 = <<"user1">>,
|
|
|
@@ -276,6 +369,80 @@ t_mqtt_conn_bridge_egress(_) ->
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
ok.
|
|
|
|
|
|
+t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|
|
+ %% then we add a mqtt connector, using POST
|
|
|
+ User1 = <<"user1">>,
|
|
|
+
|
|
|
+ {ok, 201, Bridge} = request(
|
|
|
+ post,
|
|
|
+ uri(["bridges"]),
|
|
|
+ ?SERVER_CONF(User1)#{
|
|
|
+ <<"type">> => ?TYPE_MQTT,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_EGRESS,
|
|
|
+ <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ #{
|
|
|
+ <<"type">> := ?TYPE_MQTT,
|
|
|
+ <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
+ BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
|
+ ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
|
+ %% we now test if the bridge works as expected
|
|
|
+ LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
|
|
+ RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ emqx:subscribe(RemoteTopic),
|
|
|
+ timer:sleep(100),
|
|
|
+ %% PUBLISH a message to the 'local' broker, as we have only one broker,
|
|
|
+ %% the remote broker is also the local one.
|
|
|
+ emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
|
|
+
|
|
|
+ %% we should receive a message on the "remote" broker, with specified topic
|
|
|
+ ?assert(
|
|
|
+ receive
|
|
|
+ {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} ->
|
|
|
+ ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]),
|
|
|
+ %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
|
|
+ Size = byte_size(ResourceID),
|
|
|
+ ?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
|
|
|
+ case jsx:decode(MapMsg) of
|
|
|
+ #{<<"payload">> := Payload} ->
|
|
|
+ true;
|
|
|
+ _ ->
|
|
|
+ false
|
|
|
+ end;
|
|
|
+ Msg ->
|
|
|
+ ct:pal("Msg: ~p", [Msg]),
|
|
|
+ false
|
|
|
+ after 100 ->
|
|
|
+ false
|
|
|
+ end
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% verify the metrics of the bridge
|
|
|
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ ?assertMatch(
|
|
|
+ #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
|
|
+ <<"node_metrics">> :=
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ <<"node">> := _,
|
|
|
+ <<"metrics">> :=
|
|
|
+ #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ jsx:decode(BridgeStr)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% delete the bridge
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
t_egress_custom_clientid_prefix(_Config) ->
|
|
|
User1 = <<"user1">>,
|
|
|
{ok, 201, Bridge} = request(
|