|
@@ -109,9 +109,13 @@ t_open_session(_) ->
|
|
|
emqx_cm:unregister_channel(<<"clientid">>),
|
|
emqx_cm:unregister_channel(<<"clientid">>),
|
|
|
ok = meck:unload(emqx_connection).
|
|
ok = meck:unload(emqx_connection).
|
|
|
|
|
|
|
|
|
|
+rand_client_id() ->
|
|
|
|
|
+ list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())).
|
|
|
|
|
+
|
|
|
t_open_session_race_condition(_) ->
|
|
t_open_session_race_condition(_) ->
|
|
|
|
|
+ ClientId = rand_client_id(),
|
|
|
ClientInfo = #{zone => external,
|
|
ClientInfo = #{zone => external,
|
|
|
- clientid => <<"clientid">>,
|
|
|
|
|
|
|
+ clientid => ClientId,
|
|
|
username => <<"username">>,
|
|
username => <<"username">>,
|
|
|
peerhost => {127,0,0,1}},
|
|
peerhost => {127,0,0,1}},
|
|
|
ConnInfo = #{socktype => tcp,
|
|
ConnInfo = #{socktype => tcp,
|
|
@@ -136,11 +140,12 @@ t_open_session_race_condition(_) ->
|
|
|
exit(Reason)
|
|
exit(Reason)
|
|
|
end
|
|
end
|
|
|
end,
|
|
end,
|
|
|
|
|
+ N = 1000,
|
|
|
[spawn(
|
|
[spawn(
|
|
|
fun() ->
|
|
fun() ->
|
|
|
spawn(OpenASession),
|
|
spawn(OpenASession),
|
|
|
spawn(OpenASession)
|
|
spawn(OpenASession)
|
|
|
- end) || _ <- lists:seq(1, 1000)],
|
|
|
|
|
|
|
+ end) || _ <- lists:seq(1, N)],
|
|
|
|
|
|
|
|
WaitingRecv = fun _Wr(N1, N2, 0) ->
|
|
WaitingRecv = fun _Wr(N1, N2, 0) ->
|
|
|
{N1, N2};
|
|
{N1, N2};
|
|
@@ -151,45 +156,49 @@ t_open_session_race_condition(_) ->
|
|
|
end
|
|
end
|
|
|
end,
|
|
end,
|
|
|
|
|
|
|
|
- ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),
|
|
|
|
|
|
|
+ {Succeeded, Failed} = WaitingRecv(0, 0, 2 * N),
|
|
|
|
|
+ ct:pal("Race condition status: succeeded=~p failed=~p~n", [Succeeded, Failed]),
|
|
|
|
|
|
|
|
- ?assertEqual(1, ets:info(emqx_channel, size)),
|
|
|
|
|
- ?assertEqual(1, ets:info(emqx_channel_conn, size)),
|
|
|
|
|
- ?assertEqual(1, ets:info(emqx_channel_registry, size)),
|
|
|
|
|
|
|
+ ?assertMatch([_], ets:lookup(emqx_channel, ClientId)),
|
|
|
|
|
+ [Pid] = emqx_cm:lookup_channels(ClientId),
|
|
|
|
|
+ ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Pid})),
|
|
|
|
|
+ ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)),
|
|
|
|
|
|
|
|
- [Pid] = emqx_cm:lookup_channels(<<"clientid">>),
|
|
|
|
|
- exit(Pid, kill), timer:sleep(100),
|
|
|
|
|
- ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
|
|
|
|
|
|
|
+ exit(Pid, kill),
|
|
|
|
|
+ timer:sleep(100), %% TODO deterministic
|
|
|
|
|
+ ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
|
|
|
|
|
|
|
t_discard_session(_) ->
|
|
t_discard_session(_) ->
|
|
|
|
|
+ ClientId = rand_client_id(),
|
|
|
#{conninfo := ConnInfo} = ?ChanInfo,
|
|
#{conninfo := ConnInfo} = ?ChanInfo,
|
|
|
- ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
|
|
|
|
|
|
|
+ ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
|
|
|
|
|
|
|
|
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
|
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
|
|
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
|
|
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
|
|
|
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
|
|
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
|
|
|
- ok = emqx_cm:discard_session(<<"clientid">>),
|
|
|
|
|
- ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
|
|
|
|
|
- ok = emqx_cm:discard_session(<<"clientid">>),
|
|
|
|
|
- ok = emqx_cm:unregister_channel(<<"clientid">>),
|
|
|
|
|
- ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
|
|
|
|
|
- ok = emqx_cm:discard_session(<<"clientid">>),
|
|
|
|
|
|
|
+ ok = emqx_cm:discard_session(ClientId),
|
|
|
|
|
+ ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
|
|
|
|
|
+ ok = emqx_cm:discard_session(ClientId),
|
|
|
|
|
+ ok = emqx_cm:unregister_channel(ClientId),
|
|
|
|
|
+ ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
|
|
|
|
|
+ ok = emqx_cm:discard_session(ClientId),
|
|
|
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
|
|
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
|
|
|
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
|
|
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
|
|
|
- ok = emqx_cm:discard_session(<<"clientid">>),
|
|
|
|
|
- ok = emqx_cm:unregister_channel(<<"clientid">>),
|
|
|
|
|
|
|
+ ok = emqx_cm:discard_session(ClientId),
|
|
|
|
|
+ ok = emqx_cm:unregister_channel(ClientId),
|
|
|
ok = meck:unload(emqx_connection).
|
|
ok = meck:unload(emqx_connection).
|
|
|
|
|
|
|
|
t_discard_session_race(_) ->
|
|
t_discard_session_race(_) ->
|
|
|
|
|
+ ClientId = rand_client_id(),
|
|
|
?check_trace(
|
|
?check_trace(
|
|
|
begin
|
|
begin
|
|
|
#{conninfo := ConnInfo0} = ?ChanInfo,
|
|
#{conninfo := ConnInfo0} = ?ChanInfo,
|
|
|
ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
|
|
ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
|
|
|
{Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
|
|
{Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
|
|
|
- ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
|
|
|
|
|
|
|
+ ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo),
|
|
|
Pid ! stop,
|
|
Pid ! stop,
|
|
|
receive {'DOWN', Ref, process, Pid, normal} -> ok end,
|
|
receive {'DOWN', Ref, process, Pid, normal} -> ok end,
|
|
|
- ok = emqx_cm:discard_session(<<"clientid">>),
|
|
|
|
|
|
|
+ ok = emqx_cm:discard_session(ClientId),
|
|
|
{ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
|
|
{ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
|
|
|
end,
|
|
end,
|
|
|
fun(_, _) ->
|
|
fun(_, _) ->
|
|
@@ -235,10 +244,10 @@ t_all_channels(_) ->
|
|
|
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
|
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
|
|
|
|
|
|
|
t_lock_clientid(_) ->
|
|
t_lock_clientid(_) ->
|
|
|
- {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
|
|
|
|
- {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
|
|
|
|
- {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>),
|
|
|
|
|
- {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>).
|
|
|
|
|
|
|
+ {true, Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
|
|
|
|
+ ?assertEqual({true, Nodes}, emqx_cm_locker:lock(<<"clientid">>)),
|
|
|
|
|
+ ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)),
|
|
|
|
|
+ ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)).
|
|
|
|
|
|
|
|
t_message(_) ->
|
|
t_message(_) ->
|
|
|
?CM ! testing,
|
|
?CM ! testing,
|