|
|
@@ -238,6 +238,24 @@ wait_connection_process_unregistered(ClientId) ->
|
|
|
?assertEqual([], emqx_cm:lookup_channels(ClientId))
|
|
|
).
|
|
|
|
|
|
+wait_channel_disconnected(ClientId) ->
|
|
|
+ ?retry(
|
|
|
+ _Timeout = 100,
|
|
|
+ _Retries = 20,
|
|
|
+ case emqx_cm:lookup_channels(ClientId) of
|
|
|
+ [] ->
|
|
|
+ false;
|
|
|
+ [ChanPid] ->
|
|
|
+ false = emqx_cm:is_channel_connected(ChanPid)
|
|
|
+ end
|
|
|
+ ).
|
|
|
+
|
|
|
+disconnect_client(ClientPid) ->
|
|
|
+ ClientId = proplists:get_value(clientid, emqtt:info(ClientPid)),
|
|
|
+ ok = emqtt:disconnect(ClientPid),
|
|
|
+ false = wait_channel_disconnected(ClientId),
|
|
|
+ ok.
|
|
|
+
|
|
|
messages(Topic, Payloads) ->
|
|
|
messages(Topic, Payloads, ?QOS_2).
|
|
|
|
|
|
@@ -661,7 +679,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
|
|
|
%% Ensure that PUBACKs are propagated to the channel.
|
|
|
pong = emqtt:ping(Client1),
|
|
|
|
|
|
- ok = emqtt:disconnect(Client1),
|
|
|
+ ok = disconnect_client(Client1),
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
|
|
Pubs2 = [
|
|
|
@@ -708,7 +726,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
|
|
|
[maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
|
|
|
),
|
|
|
|
|
|
- ok = emqtt:disconnect(Client2).
|
|
|
+ ok = disconnect_client(Client2).
|
|
|
|
|
|
t_publish_while_client_is_gone(Config) ->
|
|
|
%% A persistent session should receive messages in its
|
|
|
@@ -823,7 +841,7 @@ t_publish_many_while_client_is_gone(Config) ->
|
|
|
PubRels1
|
|
|
),
|
|
|
|
|
|
- ok = emqtt:disconnect(Client1),
|
|
|
+ ok = disconnect_client(Client1),
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
|
|
Pubs2 = [
|
|
|
@@ -887,7 +905,7 @@ t_publish_many_while_client_is_gone(Config) ->
|
|
|
%% Ensure that PUBCOMPs are propagated to the channel.
|
|
|
pong = emqtt:ping(Client2),
|
|
|
|
|
|
- ok = emqtt:disconnect(Client2),
|
|
|
+ ok = disconnect_client(Client2),
|
|
|
maybe_kill_connection_process(ClientId, Config),
|
|
|
|
|
|
{ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
|
|
|
@@ -901,7 +919,7 @@ t_publish_many_while_client_is_gone(Config) ->
|
|
|
Msgs3
|
|
|
),
|
|
|
|
|
|
- ok = emqtt:disconnect(Client3).
|
|
|
+ ok = disconnect_client(Client3).
|
|
|
|
|
|
t_clean_start_drops_subscriptions(Config) ->
|
|
|
%% 1. A persistent session is started and disconnected.
|