|
@@ -343,31 +343,9 @@ test_stepdown_session(Action, Reason) ->
|
|
|
end,
|
|
end,
|
|
|
% sync
|
|
% sync
|
|
|
ignored = gen_server:call(?CM, ignore, infinity),
|
|
ignored = gen_server:call(?CM, ignore, infinity),
|
|
|
- ok = flush_emqx_cm_pool(),
|
|
|
|
|
|
|
+ ok = emqx_pool:flush_async_tasks(?CM_POOL),
|
|
|
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
|
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
|
|
|
|
|
|
|
-%% Channel deregistration is delegated to emqx_pool as a sync tasks.
|
|
|
|
|
-%% The emqx_pool is pool of workers, and there is no way to know
|
|
|
|
|
-%% which worker was picked for the last deregistration task.
|
|
|
|
|
-%% This help function creates a large enough number of async tasks
|
|
|
|
|
-%% to sync with the pool workers.
|
|
|
|
|
-%% The number of tasks should be large enough to ensure all workers have
|
|
|
|
|
-%% the chance to work on at least one of the tasks.
|
|
|
|
|
-flush_emqx_cm_pool() ->
|
|
|
|
|
- Self = self(),
|
|
|
|
|
- L = lists:seq(1, 1000),
|
|
|
|
|
- lists:foreach(
|
|
|
|
|
- fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L
|
|
|
|
|
- ),
|
|
|
|
|
- lists:foreach(
|
|
|
|
|
- fun(I) ->
|
|
|
|
|
- receive
|
|
|
|
|
- {done, I} -> ok
|
|
|
|
|
- end
|
|
|
|
|
- end,
|
|
|
|
|
- L
|
|
|
|
|
- ).
|
|
|
|
|
-
|
|
|
|
|
t_discard_session_race(_) ->
|
|
t_discard_session_race(_) ->
|
|
|
ClientId = rand_client_id(),
|
|
ClientId = rand_client_id(),
|
|
|
?check_trace(
|
|
?check_trace(
|