|
|
@@ -735,6 +735,89 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
|
|
assert_mqtt_msg_received(RemoteTopic, Payload2),
|
|
|
ok.
|
|
|
|
|
|
+t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
|
|
+ User1 = <<"user1">>,
|
|
|
+ BridgeIDEgress = create_bridge(
|
|
|
+ ?SERVER_CONF(User1)#{
|
|
|
+ <<"type">> => ?TYPE_MQTT,
|
|
|
+ <<"name">> => ?BRIDGE_NAME_EGRESS,
|
|
|
+ <<"egress">> => ?EGRESS_CONF,
|
|
|
+ <<"resource_opts">> => #{
|
|
|
+ <<"worker_pool_size">> => 2,
|
|
|
+ <<"query_mode">> => <<"async">>,
|
|
|
+ %% using a long time so we can test recovery
|
|
|
+ <<"request_timeout">> => <<"15s">>,
|
|
|
+ %% to make it check the healthy quickly
|
|
|
+ <<"health_check_interval">> => <<"0.5s">>,
|
|
|
+ %% to make it reconnect quickly
|
|
|
+ <<"auto_restart_interval">> => <<"1s">>
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ on_exit(fun() ->
|
|
|
+ %% delete the bridge
|
|
|
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
|
|
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
+ ok
|
|
|
+ end),
|
|
|
+
|
|
|
+ Self = self(),
|
|
|
+ LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
|
|
+ RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
|
|
+ emqx:subscribe(RemoteTopic),
|
|
|
+
|
|
|
+ Publisher = start_publisher(LocalTopic, 200, Self),
|
|
|
+ ct:sleep(1000),
|
|
|
+
|
|
|
+ %% stop the listener 1883 to make the bridge disconnected
|
|
|
+ ok = emqx_listeners:stop_listener('tcp:default'),
|
|
|
+ ct:sleep(1500),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>,
|
|
|
+ request_bridge(BridgeIDEgress)
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% start the listener 1883 to make the bridge reconnected
|
|
|
+ ok = emqx_listeners:start_listener('tcp:default'),
|
|
|
+ timer:sleep(1500),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"status">> := <<"connected">>},
|
|
|
+ request_bridge(BridgeIDEgress)
|
|
|
+ ),
|
|
|
+
|
|
|
+ N = stop_publisher(Publisher),
|
|
|
+
|
|
|
+ %% all those messages should eventually be delivered
|
|
|
+ [
|
|
|
+ assert_mqtt_msg_received(RemoteTopic, Payload)
|
|
|
+ || I <- lists:seq(1, N),
|
|
|
+ Payload <- [integer_to_binary(I)]
|
|
|
+ ],
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
+start_publisher(Topic, Interval, CtrlPid) ->
|
|
|
+ spawn_link(fun() -> publisher(Topic, 1, Interval, CtrlPid) end).
|
|
|
+
|
|
|
+stop_publisher(Pid) ->
|
|
|
+ _ = Pid ! {self(), stop},
|
|
|
+ receive
|
|
|
+ {Pid, N} -> N
|
|
|
+ after 1_000 -> ct:fail("publisher ~p did not stop", [Pid])
|
|
|
+ end.
|
|
|
+
|
|
|
+publisher(Topic, N, Delay, CtrlPid) ->
|
|
|
+ _ = emqx:publish(emqx_message:make(Topic, integer_to_binary(N))),
|
|
|
+ receive
|
|
|
+ {CtrlPid, stop} ->
|
|
|
+ CtrlPid ! {self(), N}
|
|
|
+ after Delay ->
|
|
|
+ publisher(Topic, N + 1, Delay, CtrlPid)
|
|
|
+ end.
|
|
|
+
|
|
|
+%%
|
|
|
+
|
|
|
assert_mqtt_msg_received(Topic) ->
|
|
|
assert_mqtt_msg_received(Topic, '_', 200).
|
|
|
|