|
|
@@ -27,8 +27,6 @@
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
--import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
-
|
|
|
%% output functions
|
|
|
-export([inspect/3]).
|
|
|
|
|
|
@@ -1027,40 +1025,38 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
|
|
ct:sleep(1000),
|
|
|
|
|
|
%% stop the listener 1883 to make the bridge disconnected
|
|
|
- ok = emqx_listeners:stop_listener('tcp:default'),
|
|
|
- ct:sleep(1500),
|
|
|
- ?assertMatch(
|
|
|
- #{<<"status">> := Status} when
|
|
|
- Status == <<"connecting">> orelse Status == <<"disconnected">>,
|
|
|
- request_bridge(BridgeIDEgress)
|
|
|
- ),
|
|
|
-
|
|
|
- %% start the listener 1883 to make the bridge reconnected
|
|
|
- ok = emqx_listeners:start_listener('tcp:default'),
|
|
|
- timer:sleep(1500),
|
|
|
- ?assertMatch(
|
|
|
- #{<<"status">> := <<"connected">>},
|
|
|
- request_bridge(BridgeIDEgress)
|
|
|
- ),
|
|
|
-
|
|
|
- N = stop_publisher(Publisher),
|
|
|
-
|
|
|
- %% all those messages should eventually be delivered
|
|
|
- [
|
|
|
- assert_mqtt_msg_received(RemoteTopic, Payload)
|
|
|
- || I <- lists:seq(1, N),
|
|
|
- Payload <- [integer_to_binary(I)]
|
|
|
- ],
|
|
|
-
|
|
|
- Trace = snabbkaffe:collect_trace(50),
|
|
|
- ?assert(
|
|
|
- lists:any(
|
|
|
- fun(K) ->
|
|
|
- maps:get(msg, K, not_found) =:=
|
|
|
- emqx_bridge_mqtt_connector_econnrefused_error
|
|
|
- end,
|
|
|
- Trace
|
|
|
- )
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ ok = emqx_listeners:stop_listener('tcp:default'),
|
|
|
+ ct:sleep(1500),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"status">> := Status} when
|
|
|
+ Status == <<"connecting">> orelse Status == <<"disconnected">>,
|
|
|
+ request_bridge(BridgeIDEgress)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% start the listener 1883 to make the bridge reconnected
|
|
|
+ ok = emqx_listeners:start_listener('tcp:default'),
|
|
|
+ timer:sleep(1500),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"status">> := <<"connected">>},
|
|
|
+ request_bridge(BridgeIDEgress)
|
|
|
+ ),
|
|
|
+
|
|
|
+ N = stop_publisher(Publisher),
|
|
|
+
|
|
|
+ %% all those messages should eventually be delivered
|
|
|
+ [
|
|
|
+ assert_mqtt_msg_received(RemoteTopic, Payload)
|
|
|
+ || I <- lists:seq(1, N),
|
|
|
+ Payload <- [integer_to_binary(I)]
|
|
|
+ ],
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ fun(Trace) ->
|
|
|
+ ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)),
|
|
|
+ ok
|
|
|
+ end
|
|
|
),
|
|
|
ok.
|
|
|
|