|
|
@@ -37,6 +37,7 @@ groups() ->
|
|
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
ConnClientTCs = [ t_connected_client_count_persistent
|
|
|
, t_connected_client_count_anonymous
|
|
|
+ , t_connected_client_count_transient_takeover
|
|
|
, t_connected_client_stats
|
|
|
],
|
|
|
OtherTCs = TCs -- ConnClientTCs,
|
|
|
@@ -461,6 +462,80 @@ t_connected_client_count_anonymous({'end', _Config}) ->
|
|
|
snabbkaffe:stop(),
|
|
|
ok.
|
|
|
|
|
|
+t_connected_client_count_transient_takeover({init, Config}) ->
|
|
|
+ ok = snabbkaffe:start_trace(),
|
|
|
+ process_flag(trap_exit, true),
|
|
|
+ Config;
|
|
|
+t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
|
|
|
+ ConnFun = ?config(conn_fun, Config),
|
|
|
+ ClientID = <<"clientid">>,
|
|
|
+ ?assertEqual(0, emqx_cm:get_connected_client_count()),
|
|
|
+ %% we spawn several clients simultaneously to cause the race
|
|
|
+ %% condition for the client id lock
|
|
|
+ NumClients = 20,
|
|
|
+ {ok, {ok, [_, _]}} =
|
|
|
+ wait_for_events(
|
|
|
+ fun() ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(_) ->
|
|
|
+ spawn(
|
|
|
+ fun() ->
|
|
|
+ {ok, ConnPid} =
|
|
|
+ emqtt:start_link([ {clean_start, true}
|
|
|
+ , {clientid, ClientID}
|
|
|
+ | Config]),
|
|
|
+ %% don't assert the result: most of them fail
|
|
|
+ %% during the race
|
|
|
+ emqtt:ConnFun(ConnPid),
|
|
|
+ ok
|
|
|
+ end),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ lists:seq(1, NumClients))
|
|
|
+ end,
|
|
|
+ %% there can be only one channel that wins the race for the
|
|
|
+ %% lock for this client id. we also expect a decrement
|
|
|
+ %% event because the client dies along with the ephemeral
|
|
|
+ %% process.
|
|
|
+ [ emqx_cm_connected_client_count_inc
|
|
|
+ , emqx_cm_connected_client_count_dec
|
|
|
+ ],
|
|
|
+ 1000),
|
|
|
+ %% Since more than one pair of inc/dec may be emitted, we need to
|
|
|
+ %% wait for full stabilization
|
|
|
+ timer:sleep(100),
|
|
|
+ %% It must be 0 again because we spawn-linked the clients in
|
|
|
+ %% ephemeral processes above, and all should be dead now.
|
|
|
+ ?assertEqual(0, emqx_cm:get_connected_client_count()),
|
|
|
+ %% connecting again
|
|
|
+ {ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
|
|
|
+ , {clientid, ClientID}
|
|
|
+ | Config
|
|
|
+ ]),
|
|
|
+ {{ok, _}, {ok, [_]}} =
|
|
|
+ wait_for_events(
|
|
|
+ fun() -> emqtt:ConnFun(ConnPid1) end,
|
|
|
+ [emqx_cm_connected_client_count_inc]
|
|
|
+ ),
|
|
|
+ ?assertEqual(1, emqx_cm:get_connected_client_count()),
|
|
|
+ %% abnormal exit of channel process
|
|
|
+ [ChanPid] = emqx_cm:all_channels(),
|
|
|
+ {ok, {ok, [_, _]}} =
|
|
|
+ wait_for_events(
|
|
|
+ fun() ->
|
|
|
+ exit(ChanPid, kill),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ [ emqx_cm_connected_client_count_dec
|
|
|
+ , emqx_cm_process_down
|
|
|
+ ]
|
|
|
+ ),
|
|
|
+ ?assertEqual(0, emqx_cm:get_connected_client_count()),
|
|
|
+ ok;
|
|
|
+t_connected_client_count_transient_takeover({'end', _Config}) ->
|
|
|
+ snabbkaffe:stop(),
|
|
|
+ ok.
|
|
|
+
|
|
|
t_connected_client_stats({init, Config}) ->
|
|
|
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
|
|
|
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|