|
|
@@ -185,7 +185,7 @@ t_takeover(Config) ->
|
|
|
t_takeover_willmsg(Config) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
- WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
|
|
+ WillTopic = <<ClientId/binary, <<"_willtopic">>/binary>>,
|
|
|
Middle = ?CNT div 2,
|
|
|
Client1Msgs = messages(ClientId, 0, Middle),
|
|
|
Client2Msgs = messages(ClientId, Middle, ?CNT div 2),
|
|
|
@@ -899,16 +899,11 @@ t_kick_session(Config) ->
|
|
|
{fun start_client/5, [
|
|
|
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
|
|
]},
|
|
|
- %% kick may fail (not found) without this delay
|
|
|
- {
|
|
|
- fun(CTX) ->
|
|
|
- timer:sleep(300),
|
|
|
- CTX
|
|
|
- end,
|
|
|
- []
|
|
|
- },
|
|
|
+ {fun wait_for_chan_reg/2, [ClientId]},
|
|
|
%% WHEN: client is kicked with kick_session
|
|
|
- {fun kick_client/2, [ClientId]}
|
|
|
+ {fun kick_client/2, [ClientId]},
|
|
|
+ {fun wait_for_chan_dereg/2, [ClientId]},
|
|
|
+ {fun wait_for_pub_client_down/1, []}
|
|
|
]),
|
|
|
FCtx = lists:foldl(
|
|
|
fun({Fun, Args}, Ctx) ->
|
|
|
@@ -929,6 +924,30 @@ t_kick_session(Config) ->
|
|
|
?assert(not is_process_alive(CPid1)),
|
|
|
ok.
|
|
|
|
|
|
+wait_for_chan_reg(CTX, ClientId) ->
|
|
|
+ ?retry(
|
|
|
+ 3_000,
|
|
|
+ 100,
|
|
|
+ true = is_map(emqx_cm:get_chan_info(ClientId))
|
|
|
+ ),
|
|
|
+ CTX.
|
|
|
+
|
|
|
+wait_for_chan_dereg(CTX, ClientId) ->
|
|
|
+ ?retry(
|
|
|
+ 3_000,
|
|
|
+ 100,
|
|
|
+ undefined = emqx_cm:get_chan_info(ClientId)
|
|
|
+ ),
|
|
|
+ CTX.
|
|
|
+
|
|
|
+wait_for_pub_client_down(#{client := [_SubClient, PubClient]} = CTX) ->
|
|
|
+ ?retry(
|
|
|
+ 3_000,
|
|
|
+ 100,
|
|
|
+ false = is_process_alive(PubClient)
|
|
|
+ ),
|
|
|
+ CTX.
|
|
|
+
|
|
|
%% t_takover_in_cluster(_) ->
|
|
|
%% todo.
|
|
|
|