|
|
@@ -22,15 +22,15 @@
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
|
|
--define(CONF_DEFAULT, <<"connectors: {}">>).
|
|
|
+%% output functions
|
|
|
+-export([ inspect/3
|
|
|
+ ]).
|
|
|
+
|
|
|
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
|
|
-define(CONNECTR_TYPE, <<"mqtt">>).
|
|
|
-define(CONNECTR_NAME, <<"test_connector">>).
|
|
|
--define(CONNECTR_ID, <<"mqtt:test_connector">>).
|
|
|
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
|
|
|
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
|
|
|
--define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
|
|
|
--define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
|
|
|
-define(MQTT_CONNECOTR(Username),
|
|
|
#{
|
|
|
<<"server">> => <<"127.0.0.1:1883">>,
|
|
|
@@ -70,6 +70,9 @@
|
|
|
<<"failed">> := FAILED, <<"rate">> := SPEED,
|
|
|
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
|
|
|
|
|
|
+inspect(Selected, _Envs, _Args) ->
|
|
|
+ persistent_term:put(?MODULE, #{inspect => Selected}).
|
|
|
+
|
|
|
all() ->
|
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
|
|
@@ -92,21 +95,38 @@ init_per_suite(Config) ->
|
|
|
%% some testcases (may from other app) already get emqx_connector started
|
|
|
_ = application:stop(emqx_resource),
|
|
|
_ = application:stop(emqx_connector),
|
|
|
- ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
|
|
|
- ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
|
|
|
+ ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
|
|
|
+ emqx_bridge, emqx_dashboard]),
|
|
|
+ ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>),
|
|
|
+ ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
|
|
|
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
- emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
|
|
|
+ emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
|
|
|
ok.
|
|
|
|
|
|
init_per_testcase(_, Config) ->
|
|
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
|
|
+ %% assert we there's no connectors and no bridges at first
|
|
|
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
Config.
|
|
|
end_per_testcase(_, _Config) ->
|
|
|
+ clear_resources(),
|
|
|
ok.
|
|
|
|
|
|
+clear_resources() ->
|
|
|
+ lists:foreach(fun(#{id := Id}) ->
|
|
|
+ ok = emqx_rule_engine:delete_rule(Id)
|
|
|
+ end, emqx_rule_engine:get_rules()),
|
|
|
+ lists:foreach(fun(#{id := Id}) ->
|
|
|
+ ok = emqx_bridge:remove(Id)
|
|
|
+ end, emqx_bridge:list()),
|
|
|
+ lists:foreach(fun(#{<<"id">> := Id}) ->
|
|
|
+ ok = emqx_connector:delete(Id)
|
|
|
+ end, emqx_connector:list()).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Testcases
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -123,32 +143,21 @@ t_mqtt_crud_apis(_) ->
|
|
|
, <<"name">> => ?CONNECTR_NAME
|
|
|
}),
|
|
|
|
|
|
- %ct:pal("---connector: ~p", [Connector]),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
- , <<"type">> := ?CONNECTR_TYPE
|
|
|
- , <<"name">> := ?CONNECTR_NAME
|
|
|
- , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
- , <<"username">> := User1
|
|
|
- , <<"password">> := <<"">>
|
|
|
- , <<"proto_ver">> := <<"v4">>
|
|
|
- , <<"ssl">> := #{<<"enable">> := false}
|
|
|
- }, jsx:decode(Connector)),
|
|
|
-
|
|
|
- %% create a again returns an error
|
|
|
- {ok, 400, RetMsg} = request(post, uri(["connectors"]),
|
|
|
- ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
|
|
|
- , <<"name">> => ?CONNECTR_NAME
|
|
|
- }),
|
|
|
- ?assertMatch(
|
|
|
- #{ <<"code">> := _
|
|
|
- , <<"message">> := <<"connector already exists">>
|
|
|
- }, jsx:decode(RetMsg)),
|
|
|
+ #{ <<"id">> := ConnctorID
|
|
|
+ , <<"type">> := ?CONNECTR_TYPE
|
|
|
+ , <<"name">> := ?CONNECTR_NAME
|
|
|
+ , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
+ , <<"username">> := User1
|
|
|
+ , <<"password">> := <<"">>
|
|
|
+ , <<"proto_ver">> := <<"v4">>
|
|
|
+ , <<"ssl">> := #{<<"enable">> := false}
|
|
|
+ } = jsx:decode(Connector),
|
|
|
|
|
|
%% update the request-path of the connector
|
|
|
User2 = <<"user2">>,
|
|
|
- {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
|
|
+ {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]),
|
|
|
?MQTT_CONNECOTR(User2)),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
+ ?assertMatch(#{ <<"id">> := ConnctorID
|
|
|
, <<"server">> := <<"127.0.0.1:1883">>
|
|
|
, <<"username">> := User2
|
|
|
, <<"password">> := <<"">>
|
|
|
@@ -158,7 +167,7 @@ t_mqtt_crud_apis(_) ->
|
|
|
|
|
|
%% list all connectors again, assert Connector2 is in it
|
|
|
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
|
|
|
- ?assertMatch([#{ <<"id">> := ?CONNECTR_ID
|
|
|
+ ?assertMatch([#{ <<"id">> := ConnctorID
|
|
|
, <<"type">> := ?CONNECTR_TYPE
|
|
|
, <<"name">> := ?CONNECTR_NAME
|
|
|
, <<"server">> := <<"127.0.0.1:1883">>
|
|
|
@@ -169,8 +178,8 @@ t_mqtt_crud_apis(_) ->
|
|
|
}], jsx:decode(Connector2Str)),
|
|
|
|
|
|
%% get the connector by id
|
|
|
- {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
+ {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
|
|
|
+ ?assertMatch(#{ <<"id">> := ConnctorID
|
|
|
, <<"type">> := ?CONNECTR_TYPE
|
|
|
, <<"name">> := ?CONNECTR_NAME
|
|
|
, <<"server">> := <<"127.0.0.1:1883">>
|
|
|
@@ -181,11 +190,11 @@ t_mqtt_crud_apis(_) ->
|
|
|
}, jsx:decode(Connector3Str)),
|
|
|
|
|
|
%% delete the connector
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
|
|
|
%% update a deleted connector returns an error
|
|
|
- {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
|
|
+ {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]),
|
|
|
?MQTT_CONNECOTR(User2)),
|
|
|
?assertMatch(
|
|
|
#{ <<"code">> := _
|
|
|
@@ -194,10 +203,6 @@ t_mqtt_crud_apis(_) ->
|
|
|
ok.
|
|
|
|
|
|
t_mqtt_conn_bridge_ingress(_) ->
|
|
|
- %% assert we there's no connectors and no bridges at first
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
-
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
User1 = <<"user1">>,
|
|
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
@@ -205,28 +210,28 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
, <<"name">> => ?CONNECTR_NAME
|
|
|
}),
|
|
|
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
- , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
- , <<"num_of_bridges">> := 0
|
|
|
- , <<"username">> := User1
|
|
|
- , <<"password">> := <<"">>
|
|
|
- , <<"proto_ver">> := <<"v4">>
|
|
|
- , <<"ssl">> := #{<<"enable">> := false}
|
|
|
- }, jsx:decode(Connector)),
|
|
|
+ #{ <<"id">> := ConnctorID
|
|
|
+ , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
+ , <<"num_of_bridges">> := 0
|
|
|
+ , <<"username">> := User1
|
|
|
+ , <<"password">> := <<"">>
|
|
|
+ , <<"proto_ver">> := <<"v4">>
|
|
|
+ , <<"ssl">> := #{<<"enable">> := false}
|
|
|
+ } = jsx:decode(Connector),
|
|
|
|
|
|
%% ... and a MQTT bridge, using POST
|
|
|
%% we bind this bridge to the connector created just now
|
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
- ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{
|
|
|
+ ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
|
|
<<"type">> => ?CONNECTR_TYPE,
|
|
|
<<"name">> => ?BRIDGE_NAME_INGRESS
|
|
|
}),
|
|
|
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
|
|
|
- , <<"type">> := <<"mqtt">>
|
|
|
- , <<"status">> := <<"connected">>
|
|
|
- , <<"connector">> := ?CONNECTR_ID
|
|
|
- }, jsx:decode(Bridge)),
|
|
|
+ #{ <<"id">> := BridgeIDIngress
|
|
|
+ , <<"type">> := <<"mqtt">>
|
|
|
+ , <<"status">> := <<"connected">>
|
|
|
+ , <<"connector">> := ConnctorID
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
|
|
|
%% we now test if the bridge works as expected
|
|
|
|
|
|
@@ -236,8 +241,8 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
emqx:subscribe(LocalTopic),
|
|
|
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
|
|
%% the remote broker is also the local one.
|
|
|
+ wait_for_resource_ready(BridgeIDIngress, 5),
|
|
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
|
|
-
|
|
|
%% we should receive a message on the local broker, with specified topic
|
|
|
?assert(
|
|
|
receive
|
|
|
@@ -252,25 +257,21 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|
|
end),
|
|
|
|
|
|
%% get the connector by id, verify the num_of_bridges now is 1
|
|
|
- {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
+ {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
|
|
|
+ ?assertMatch(#{ <<"id">> := ConnctorID
|
|
|
, <<"num_of_bridges">> := 1
|
|
|
}, jsx:decode(Connector1Str)),
|
|
|
|
|
|
%% delete the bridge
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
|
|
%% delete the connector
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
ok.
|
|
|
|
|
|
t_mqtt_conn_bridge_egress(_) ->
|
|
|
- %% assert we there's no connectors and no bridges at first
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
-
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
User1 = <<"user1">>,
|
|
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
@@ -279,29 +280,28 @@ t_mqtt_conn_bridge_egress(_) ->
|
|
|
}),
|
|
|
|
|
|
%ct:pal("---connector: ~p", [Connector]),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
- , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
- , <<"username">> := User1
|
|
|
- , <<"password">> := <<"">>
|
|
|
- , <<"proto_ver">> := <<"v4">>
|
|
|
- , <<"ssl">> := #{<<"enable">> := false}
|
|
|
- }, jsx:decode(Connector)),
|
|
|
+ #{ <<"id">> := ConnctorID
|
|
|
+ , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
+ , <<"username">> := User1
|
|
|
+ , <<"password">> := <<"">>
|
|
|
+ , <<"proto_ver">> := <<"v4">>
|
|
|
+ , <<"ssl">> := #{<<"enable">> := false}
|
|
|
+ } = jsx:decode(Connector),
|
|
|
|
|
|
%% ... and a MQTT bridge, using POST
|
|
|
%% we bind this bridge to the connector created just now
|
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
|
|
|
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
|
|
<<"type">> => ?CONNECTR_TYPE,
|
|
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
}),
|
|
|
|
|
|
- %ct:pal("---bridge: ~p", [Bridge]),
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
|
|
- , <<"type">> := ?CONNECTR_TYPE
|
|
|
- , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
- , <<"status">> := <<"connected">>
|
|
|
- , <<"connector">> := ?CONNECTR_ID
|
|
|
- }, jsx:decode(Bridge)),
|
|
|
+ #{ <<"id">> := BridgeIDEgress
|
|
|
+ , <<"type">> := ?CONNECTR_TYPE
|
|
|
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
+ , <<"status">> := <<"connected">>
|
|
|
+ , <<"connector">> := ConnctorID
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
|
|
|
%% we now test if the bridge works as expected
|
|
|
LocalTopic = <<"local_topic/1">>,
|
|
|
@@ -310,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
|
|
emqx:subscribe(RemoteTopic),
|
|
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
|
|
%% the remote broker is also the local one.
|
|
|
+ wait_for_resource_ready(BridgeIDEgress, 5),
|
|
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
|
|
|
|
|
%% we should receive a message on the "remote" broker, with specified topic
|
|
|
@@ -326,19 +327,19 @@ t_mqtt_conn_bridge_egress(_) ->
|
|
|
end),
|
|
|
|
|
|
%% verify the metrics of the bridge
|
|
|
- {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
|
|
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ ?assertMatch(#{ <<"id">> := BridgeIDEgress
|
|
|
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
|
|
, <<"node_metrics">> :=
|
|
|
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
|
|
|
}, jsx:decode(BridgeStr)),
|
|
|
|
|
|
%% delete the bridge
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
|
|
%% delete the connector
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
ok.
|
|
|
|
|
|
@@ -346,10 +347,6 @@ t_mqtt_conn_bridge_egress(_) ->
|
|
|
%% - update a connector should also update all of the the bridges
|
|
|
%% - cannot delete a connector that is used by at least one bridge
|
|
|
t_mqtt_conn_update(_) ->
|
|
|
- %% assert we there's no connectors and no bridges at first
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
-
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
|
|
|
@@ -358,44 +355,41 @@ t_mqtt_conn_update(_) ->
|
|
|
}),
|
|
|
|
|
|
%ct:pal("---connector: ~p", [Connector]),
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
- , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
- }, jsx:decode(Connector)),
|
|
|
+ #{ <<"id">> := ConnctorID
|
|
|
+ , <<"server">> := <<"127.0.0.1:1883">>
|
|
|
+ } = jsx:decode(Connector),
|
|
|
|
|
|
%% ... and a MQTT bridge, using POST
|
|
|
%% we bind this bridge to the connector created just now
|
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
|
|
|
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
|
|
<<"type">> => ?CONNECTR_TYPE,
|
|
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
}),
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
|
|
- , <<"type">> := <<"mqtt">>
|
|
|
- , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
- , <<"status">> := <<"connected">>
|
|
|
- , <<"connector">> := ?CONNECTR_ID
|
|
|
- }, jsx:decode(Bridge)),
|
|
|
+ #{ <<"id">> := BridgeIDEgress
|
|
|
+ , <<"type">> := <<"mqtt">>
|
|
|
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
+ , <<"status">> := <<"connected">>
|
|
|
+ , <<"connector">> := ConnctorID
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
+ wait_for_resource_ready(BridgeIDEgress, 2),
|
|
|
|
|
|
%% then we try to update 'server' of the connector, to an unavailable IP address
|
|
|
%% the update should fail because of 'unreachable' or 'connrefused'
|
|
|
- {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
|
|
+ {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]),
|
|
|
?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
|
|
|
%% we fix the 'server' parameter to a normal one, it should work
|
|
|
- {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
|
|
+ {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
|
|
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
|
|
|
%% delete the bridge
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
|
|
%% delete the connector
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
|
|
|
|
|
t_mqtt_conn_update2(_) ->
|
|
|
- %% assert we there's no connectors and no bridges at first
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
|
|
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
-
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
%% but this connector is point to a unreachable server "2603"
|
|
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
@@ -404,38 +398,71 @@ t_mqtt_conn_update2(_) ->
|
|
|
, <<"name">> => ?CONNECTR_NAME
|
|
|
}),
|
|
|
|
|
|
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
|
|
- , <<"server">> := <<"127.0.0.1:2603">>
|
|
|
- }, jsx:decode(Connector)),
|
|
|
+ #{ <<"id">> := ConnctorID
|
|
|
+ , <<"server">> := <<"127.0.0.1:2603">>
|
|
|
+ } = jsx:decode(Connector),
|
|
|
|
|
|
%% ... and a MQTT bridge, using POST
|
|
|
%% we bind this bridge to the connector created just now
|
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
|
|
|
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
|
|
<<"type">> => ?CONNECTR_TYPE,
|
|
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
}),
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
|
|
- , <<"type">> := <<"mqtt">>
|
|
|
- , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
- , <<"status">> := <<"disconnected">>
|
|
|
- , <<"connector">> := ?CONNECTR_ID
|
|
|
- }, jsx:decode(Bridge)),
|
|
|
+ #{ <<"id">> := BridgeIDEgress
|
|
|
+ , <<"type">> := <<"mqtt">>
|
|
|
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
|
|
|
+ , <<"status">> := <<"disconnected">>
|
|
|
+ , <<"connector">> := ConnctorID
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
+ %% We try to fix the 'server' parameter, to another unavailable server..
|
|
|
+ %% The update should success: we don't check the connectivity of the new config
|
|
|
+ %% if the resource is now disconnected.
|
|
|
+ {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
|
|
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)),
|
|
|
%% we fix the 'server' parameter to a normal one, it should work
|
|
|
- {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
|
|
+ {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
|
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
|
|
|
- {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
|
|
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
|
|
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ ?assertMatch(#{ <<"id">> := BridgeIDEgress
|
|
|
, <<"status">> := <<"connected">>
|
|
|
}, jsx:decode(BridgeStr)),
|
|
|
%% delete the bridge
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
|
|
%% delete the connector
|
|
|
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
|
|
|
|
|
|
+t_mqtt_conn_update3(_) ->
|
|
|
+ %% we add a mqtt connector, using POST
|
|
|
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
|
|
|
+ #{ <<"type">> => ?CONNECTR_TYPE
|
|
|
+ , <<"name">> => ?CONNECTR_NAME
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
|
|
+
|
|
|
+ %% ... and a MQTT bridge, using POST
|
|
|
+ %% we bind this bridge to the connector created just now
|
|
|
+ {ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
|
|
+ <<"type">> => ?CONNECTR_TYPE,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := BridgeIDEgress
|
|
|
+ , <<"connector">> := ConnctorID
|
|
|
+ } = jsx:decode(Bridge),
|
|
|
+ wait_for_resource_ready(BridgeIDEgress, 2),
|
|
|
+
|
|
|
+ %% delete the connector should fail because it is in use by a bridge
|
|
|
+ {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
|
|
|
+ %% delete the bridge
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ %% the connector now can be deleted without problems
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
|
|
+
|
|
|
t_mqtt_conn_testing(_) ->
|
|
|
%% APIs for testing the connectivity
|
|
|
%% then we add a mqtt connector, using POST
|
|
|
@@ -450,6 +477,153 @@ t_mqtt_conn_testing(_) ->
|
|
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
}).
|
|
|
|
|
|
+t_ingress_mqtt_bridge_with_rules(_) ->
|
|
|
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
+ ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
|
|
+ , <<"name">> => ?CONNECTR_NAME
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
|
|
+
|
|
|
+ {ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
+ ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
|
|
+ <<"type">> => ?CONNECTR_TYPE,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_INGRESS
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
|
|
|
+
|
|
|
+ {ok, 201, Rule} = request(post, uri(["rules"]),
|
|
|
+ #{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
|
|
|
+ <<"enable">> => true,
|
|
|
+ <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
|
|
|
+ <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
|
|
|
+ }),
|
|
|
+ #{<<"id">> := RuleId} = jsx:decode(Rule),
|
|
|
+
|
|
|
+ %% we now test if the bridge works as expected
|
|
|
+
|
|
|
+ RemoteTopic = <<"remote_topic/1">>,
|
|
|
+ LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ emqx:subscribe(LocalTopic),
|
|
|
+ %% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
|
|
+ %% the remote broker is also the local one.
|
|
|
+ wait_for_resource_ready(BridgeIDIngress, 5),
|
|
|
+ emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
|
|
+ %% we should receive a message on the local broker, with specified topic
|
|
|
+ ?assert(
|
|
|
+ receive
|
|
|
+ {deliver, LocalTopic, #message{payload = Payload}} ->
|
|
|
+ ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
|
|
|
+ true;
|
|
|
+ Msg ->
|
|
|
+ ct:pal("Msg: ~p", [Msg]),
|
|
|
+ false
|
|
|
+ after 100 ->
|
|
|
+ false
|
|
|
+ end),
|
|
|
+ %% and also the rule should be matched, with matched + 1:
|
|
|
+ {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
|
|
+ #{ <<"id">> := RuleId
|
|
|
+ , <<"metrics">> := #{<<"matched">> := 1}
|
|
|
+ } = jsx:decode(Rule1),
|
|
|
+ %% we also check if the outputs of the rule is triggered
|
|
|
+ ?assertMatch(#{inspect := #{
|
|
|
+ event := '$bridges/mqtt',
|
|
|
+ id := MsgId,
|
|
|
+ payload := Payload,
|
|
|
+ topic := RemoteTopic,
|
|
|
+ qos := 0,
|
|
|
+ dup := false,
|
|
|
+ retain := false,
|
|
|
+ pub_props := #{},
|
|
|
+ timestamp := _
|
|
|
+ }} when is_binary(MsgId), persistent_term:get(?MODULE)),
|
|
|
+
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
|
|
+
|
|
|
+t_egress_mqtt_bridge_with_rules(_) ->
|
|
|
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
|
|
|
+ ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
|
|
+ , <<"name">> => ?CONNECTR_NAME
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
|
|
+
|
|
|
+ {ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
|
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
|
|
+ <<"type">> => ?CONNECTR_TYPE,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_EGRESS
|
|
|
+ }),
|
|
|
+ #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
|
|
|
+
|
|
|
+ {ok, 201, Rule} = request(post, uri(["rules"]),
|
|
|
+ #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
|
|
+ <<"enable">> => true,
|
|
|
+ <<"outputs">> => [BridgeIDEgress],
|
|
|
+ <<"sql">> => <<"SELECT * from \"t/1\"">>
|
|
|
+ }),
|
|
|
+ #{<<"id">> := RuleId} = jsx:decode(Rule),
|
|
|
+
|
|
|
+ %% we now test if the bridge works as expected
|
|
|
+ LocalTopic = <<"local_topic/1">>,
|
|
|
+ RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
|
|
+ Payload = <<"hello">>,
|
|
|
+ emqx:subscribe(RemoteTopic),
|
|
|
+ %% PUBLISH a message to the 'local' broker, as we have only one broker,
|
|
|
+ %% the remote broker is also the local one.
|
|
|
+ wait_for_resource_ready(BridgeIDEgress, 5),
|
|
|
+ emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
|
|
+ %% we should receive a message on the "remote" broker, with specified topic
|
|
|
+ ?assert(
|
|
|
+ receive
|
|
|
+ {deliver, RemoteTopic, #message{payload = Payload}} ->
|
|
|
+ ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
|
|
+ true;
|
|
|
+ Msg ->
|
|
|
+ ct:pal("Msg: ~p", [Msg]),
|
|
|
+ false
|
|
|
+ after 100 ->
|
|
|
+ false
|
|
|
+ end),
|
|
|
+ emqx:unsubscribe(RemoteTopic),
|
|
|
+
|
|
|
+ %% PUBLISH a message to the rule.
|
|
|
+ Payload2 = <<"hi">>,
|
|
|
+ RuleTopic = <<"t/1">>,
|
|
|
+ RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
|
|
+ emqx:subscribe(RemoteTopic2),
|
|
|
+ wait_for_resource_ready(BridgeIDEgress, 5),
|
|
|
+ emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
|
|
+ {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
|
|
+ #{ <<"id">> := RuleId
|
|
|
+ , <<"metrics">> := #{<<"matched">> := 1}
|
|
|
+ } = jsx:decode(Rule1),
|
|
|
+ %% we should receive a message on the "remote" broker, with specified topic
|
|
|
+ ?assert(
|
|
|
+ receive
|
|
|
+ {deliver, RemoteTopic2, #message{payload = Payload2}} ->
|
|
|
+ ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
|
|
|
+ true;
|
|
|
+ Msg ->
|
|
|
+ ct:pal("Msg: ~p", [Msg]),
|
|
|
+ false
|
|
|
+ after 100 ->
|
|
|
+ false
|
|
|
+ end),
|
|
|
+
|
|
|
+ %% verify the metrics of the bridge
|
|
|
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ ?assertMatch(#{ <<"id">> := BridgeIDEgress
|
|
|
+ , <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
|
|
+ , <<"node_metrics">> :=
|
|
|
+ [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
|
|
+ }, jsx:decode(BridgeStr)),
|
|
|
+
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% HTTP Request
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -483,3 +657,13 @@ auth_header_() ->
|
|
|
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
|
|
|
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
|
|
|
|
|
|
+wait_for_resource_ready(InstId, 0) ->
|
|
|
+ ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
|
|
|
+ ct:fail(wait_resource_timeout);
|
|
|
+wait_for_resource_ready(InstId, Retry) ->
|
|
|
+ case emqx_bridge:lookup(InstId) of
|
|
|
+ {ok, #{resource_data := #{status := started}}} -> ok;
|
|
|
+ _ ->
|
|
|
+ timer:sleep(100),
|
|
|
+ wait_for_resource_ready(InstId, Retry-1)
|
|
|
+ end.
|