|
|
@@ -141,11 +141,8 @@ t_open_session_race_condition(_) ->
|
|
|
end
|
|
|
end,
|
|
|
N = 1000,
|
|
|
- [spawn(
|
|
|
- fun() ->
|
|
|
- spawn(OpenASession),
|
|
|
- spawn(OpenASession)
|
|
|
- end) || _ <- lists:seq(1, N)],
|
|
|
+ Pids = lists:flatten([[spawn_monitor(OpenASession), spawn_monitor(OpenASession)] ||
|
|
|
+ _ <- lists:seq(1, N)]),
|
|
|
|
|
|
WaitingRecv = fun _Wr(N1, N2, 0) ->
|
|
|
{N1, N2};
|
|
|
@@ -158,14 +155,29 @@ t_open_session_race_condition(_) ->
|
|
|
|
|
|
{Succeeded, Failed} = WaitingRecv(0, 0, 2 * N),
|
|
|
ct:pal("Race condition status: succeeded=~p failed=~p~n", [Succeeded, Failed]),
|
|
|
+ ?assertEqual(2 * N, length(Pids)),
|
|
|
+ WaitForDowns =
|
|
|
+ fun _Wd([{Pid, _Ref}]) -> Pid;
|
|
|
+ _Wd(Pids0) ->
|
|
|
+ receive
|
|
|
+ {'DOWN', DownRef, process, DownPid, _} ->
|
|
|
+ ?assert(lists:member({DownPid, DownRef}, Pids0)),
|
|
|
+ _Wd(lists:delete({DownPid, DownRef}, Pids0))
|
|
|
+ after
|
|
|
+ 10000 ->
|
|
|
+ exit(timeout)
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ Winner = WaitForDowns(Pids),
|
|
|
|
|
|
?assertMatch([_], ets:lookup(emqx_channel, ClientId)),
|
|
|
- [Pid] = emqx_cm:lookup_channels(ClientId),
|
|
|
- ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Pid})),
|
|
|
+ ?assertEqual([Winner], emqx_cm:lookup_channels(ClientId)),
|
|
|
+ ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Winner})),
|
|
|
?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)),
|
|
|
|
|
|
- exit(Pid, kill),
|
|
|
- timer:sleep(100), %% TODO deterministic
|
|
|
+ exit(Winner, kill),
|
|
|
+ receive {'DOWN', _, process, Winner, _} -> ok end,
|
|
|
+ ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync
|
|
|
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
|
|
|
|
|
t_discard_session(_) ->
|