|
|
@@ -27,6 +27,8 @@
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
+-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
+
|
|
|
%% output functions
|
|
|
-export([inspect/3]).
|
|
|
|
|
|
@@ -399,6 +401,56 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
|
|
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
|
|
ok.
|
|
|
|
|
|
+t_connect_with_more_clients_than_the_broker_accepts(_) ->
|
|
|
+ PoolSize = 100,
|
|
|
+ OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default),
|
|
|
+ on_exit(fun() ->
|
|
|
+ emqx_mgmt_listeners_conf:update(tcp, default, OrgConf)
|
|
|
+ end),
|
|
|
+ NewConf = OrgConf#{<<"max_connections">> => 3},
|
|
|
+ {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf),
|
|
|
+ BridgeName = atom_to_binary(?FUNCTION_NAME),
|
|
|
+ BridgeID = create_bridge(
|
|
|
+ ?SERVER_CONF#{
|
|
|
+ <<"name">> => BridgeName,
|
|
|
+ <<"ingress">> => #{
|
|
|
+ <<"pool_size">> => PoolSize,
|
|
|
+ <<"remote">> => #{
|
|
|
+ <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>,
|
|
|
+ <<"qos">> => 1
|
|
|
+ },
|
|
|
+ <<"local">> => #{
|
|
|
+ <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
|
|
|
+ <<"qos">> => <<"${qos}">>,
|
|
|
+ <<"payload">> => <<"${clientid}">>,
|
|
|
+ <<"retain">> => <<"${retain}">>
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ snabbkaffe:block_until(
|
|
|
+ fun
|
|
|
+ (#{msg := emqx_bridge_mqtt_connector_tcp_closed}) ->
|
|
|
+ true;
|
|
|
+ (_) ->
|
|
|
+ false
|
|
|
+ end,
|
|
|
+ 5000
|
|
|
+ ),
|
|
|
+ Trace = snabbkaffe:collect_trace(),
|
|
|
+ ?assert(
|
|
|
+ lists:any(
|
|
|
+ fun(K) ->
|
|
|
+ maps:get(msg, K, not_found) =:=
|
|
|
+ emqx_bridge_mqtt_connector_tcp_closed
|
|
|
+ end,
|
|
|
+ Trace
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
|
|
+ ok.
|
|
|
+
|
|
|
t_mqtt_egress_bridge_warns_clean_start(_) ->
|
|
|
BridgeName = atom_to_binary(?FUNCTION_NAME),
|
|
|
Action = fun() ->
|
|
|
@@ -1050,6 +1102,16 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
|
|
Payload <- [integer_to_binary(I)]
|
|
|
],
|
|
|
|
|
|
+ Trace = snabbkaffe:collect_trace(50),
|
|
|
+ ?assert(
|
|
|
+ lists:any(
|
|
|
+ fun(K) ->
|
|
|
+ maps:get(msg, K, not_found) =:=
|
|
|
+ emqx_bridge_mqtt_connector_econnrefused_error
|
|
|
+ end,
|
|
|
+ Trace
|
|
|
+ )
|
|
|
+ ),
|
|
|
ok.
|
|
|
|
|
|
start_publisher(Topic, Interval, CtrlPid) ->
|