|
|
@@ -9,7 +9,6 @@
|
|
|
-include_lib("emqx/include/types.hrl").
|
|
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
|
|
|
|
|
--include_lib("stdlib/include/qlc.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
-export([
|
|
|
@@ -122,7 +121,8 @@ enable_status() ->
|
|
|
evict_connections(N) ->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, ServerReference, _Options} ->
|
|
|
- ok = do_evict_connections(N, ServerReference);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, connection_pid_stream()),
|
|
|
+ ok = do_evict_connections(Stream, ServerReference);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
@@ -141,16 +141,18 @@ evict_sessions(N, Nodes, ConnState) when
|
|
|
->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, _ServerReference, _Options} ->
|
|
|
- ok = do_evict_sessions(N, Nodes, ConnState);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, channel_stream(ConnState)),
|
|
|
+ ok = do_evict_sessions(Nodes, Stream);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
|
|
|
-spec purge_sessions(non_neg_integer()) -> ok_or_error(disabled).
|
|
|
-purge_sessions(N) ->
|
|
|
+purge_sessions(N) when N > 0 ->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, _ServerReference, _Options} ->
|
|
|
- ok = do_purge_sessions(N);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, channel_stream(any)),
|
|
|
+ ok = do_purge_sessions(Stream);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
@@ -267,25 +269,31 @@ stats() ->
|
|
|
sessions => session_count()
|
|
|
}.
|
|
|
|
|
|
-connection_table() ->
|
|
|
- emqx_cm:live_connection_table(?CONN_MODULES).
|
|
|
+connection_stream() ->
|
|
|
+ emqx_cm:live_connection_stream(?CONN_MODULES).
|
|
|
|
|
|
connection_count() ->
|
|
|
- table_count(connection_table()).
|
|
|
-
|
|
|
-channel_table(any) ->
|
|
|
- qlc:q([
|
|
|
- {ClientId, ConnInfo, ClientInfo}
|
|
|
- || {ClientId, _, ConnInfo, ClientInfo} <-
|
|
|
- emqx_cm:all_channels_table(?CONN_MODULES)
|
|
|
- ]);
|
|
|
-channel_table(RequiredConnState) ->
|
|
|
- qlc:q([
|
|
|
- {ClientId, ConnInfo, ClientInfo}
|
|
|
- || {ClientId, ConnState, ConnInfo, ClientInfo} <-
|
|
|
- emqx_cm:all_channels_table(?CONN_MODULES),
|
|
|
- RequiredConnState =:= ConnState
|
|
|
- ]).
|
|
|
+ stream_count(connection_stream()).
|
|
|
+
|
|
|
+channel_stream(any) ->
|
|
|
+ emqx_utils_stream:map(
|
|
|
+ fun({ClientId, _, ConnInfo, ClientInfo}) ->
|
|
|
+ {ClientId, ConnInfo, ClientInfo}
|
|
|
+ end,
|
|
|
+ emqx_cm:all_channels_stream(?CONN_MODULES)
|
|
|
+ );
|
|
|
+channel_stream(RequiredConnState) ->
|
|
|
+ WithRequiredConnStateStream =
|
|
|
+ emqx_utils_stream:filter(
|
|
|
+ fun({_, ConnState, _, _}) -> RequiredConnState =:= ConnState end,
|
|
|
+ emqx_cm:all_channels_stream(?CONN_MODULES)
|
|
|
+ ),
|
|
|
+ emqx_utils_stream:map(
|
|
|
+ fun({ClientId, _, ConnInfo, ClientInfo}) ->
|
|
|
+ {ClientId, ConnInfo, ClientInfo}
|
|
|
+ end,
|
|
|
+ WithRequiredConnStateStream
|
|
|
+ ).
|
|
|
|
|
|
-spec all_channels_count() -> non_neg_integer().
|
|
|
all_channels_count() ->
|
|
|
@@ -312,7 +320,7 @@ all_channels_count() ->
|
|
|
|
|
|
-spec all_local_channels_count() -> non_neg_integer().
|
|
|
all_local_channels_count() ->
|
|
|
- table_count(channel_table(any)).
|
|
|
+ stream_count(channel_stream(any)).
|
|
|
|
|
|
session_count() ->
|
|
|
session_count(any) + durable_session_count().
|
|
|
@@ -321,51 +329,59 @@ durable_session_count() ->
|
|
|
emqx_persistent_session_bookkeeper:get_disconnected_session_count().
|
|
|
|
|
|
session_count(ConnState) ->
|
|
|
- table_count(channel_table(ConnState)).
|
|
|
-
|
|
|
-table_count(QH) ->
|
|
|
- qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH).
|
|
|
-
|
|
|
-take_connections(N) ->
|
|
|
- ChanQH = qlc:q([ChanPid || {_ClientId, ChanPid} <- connection_table()]),
|
|
|
- ChanPidCursor = qlc:cursor(ChanQH),
|
|
|
- ChanPids = qlc:next_answers(ChanPidCursor, N),
|
|
|
- ok = qlc:delete_cursor(ChanPidCursor),
|
|
|
- ChanPids.
|
|
|
-
|
|
|
-take_channels(N) ->
|
|
|
- QH = qlc:q([
|
|
|
- {ClientId, ConnInfo, ClientInfo}
|
|
|
- || {ClientId, _, ConnInfo, ClientInfo} <-
|
|
|
- emqx_cm:all_channels_table(?CONN_MODULES)
|
|
|
- ]),
|
|
|
- ChanPidCursor = qlc:cursor(QH),
|
|
|
- Channels = qlc:next_answers(ChanPidCursor, N),
|
|
|
- ok = qlc:delete_cursor(ChanPidCursor),
|
|
|
- Channels.
|
|
|
-
|
|
|
-take_channels(N, ConnState) ->
|
|
|
- ChanPidCursor = qlc:cursor(channel_table(ConnState)),
|
|
|
- Channels = qlc:next_answers(ChanPidCursor, N),
|
|
|
- ok = qlc:delete_cursor(ChanPidCursor),
|
|
|
- Channels.
|
|
|
-
|
|
|
-do_evict_connections(N, ServerReference) when N > 0 ->
|
|
|
- ChanPids = take_connections(N),
|
|
|
- ok = lists:foreach(
|
|
|
+ stream_count(channel_stream(ConnState)).
|
|
|
+
|
|
|
+stream_count(Stream) ->
|
|
|
+ emqx_utils_stream:fold(fun(_, Acc) -> Acc + 1 end, 0, Stream).
|
|
|
+
|
|
|
+connection_pid_stream() ->
|
|
|
+ emqx_utils_stream:map(
|
|
|
+ fun({_ClientId, ChanPid}) -> ChanPid end,
|
|
|
+ connection_stream()
|
|
|
+ ).
|
|
|
+
|
|
|
+do_evict_connections(ChanPidStream, ServerReference) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun(ChanPid) ->
|
|
|
disconnect_channel(ChanPid, ServerReference)
|
|
|
end,
|
|
|
- ChanPids
|
|
|
+ ChanPidStream
|
|
|
).
|
|
|
|
|
|
-do_evict_sessions(N, Nodes, ConnState) when N > 0 ->
|
|
|
- Channels = take_channels(N, ConnState),
|
|
|
- ok = lists:foreach(
|
|
|
+do_evict_sessions(Nodes, ChannelStream) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun({ClientId, ConnInfo, ClientInfo}) ->
|
|
|
- evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo)
|
|
|
+ case is_session_evictable(ClientId) of
|
|
|
+ true ->
|
|
|
+ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo);
|
|
|
+ false ->
|
|
|
+ %% This should not happen normally.
|
|
|
+ %% But session may slip from the `emqx_cm_registry` due to cluster errors.
|
|
|
+ %% Such sessions cannot be evicted to another node.
|
|
|
+ %% If we do nothing here, we may enter a dead loop of evicting the same session.
|
|
|
+ %%
|
|
|
+ %% But
|
|
|
+ %% * In case of node evacuation, the session is doomed anyway.
|
|
|
+ %% * In case of node rebalance, we evict disconnected sessions only,
|
|
|
+ %% and a disconnected session slipped from `emqx_cm_registry` cannot be
|
|
|
+ %% taken over by a reconnecting client, so it is already lost.
|
|
|
+ %%
|
|
|
+ %% Therefore, it is safe to just discard the session here.
|
|
|
+ discard_session_channel(ClientId)
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ ChannelStream
|
|
|
+ ).
|
|
|
+
|
|
|
+is_session_evictable(ClientId) ->
|
|
|
+ emqx_cm_registry:lookup_channels(ClientId) =/= [].
|
|
|
+
|
|
|
+discard_session_channel(ClientId) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(ChanPid) ->
|
|
|
+ emqx_cm:discard_session(ClientId, ChanPid)
|
|
|
end,
|
|
|
- Channels
|
|
|
+ emqx_cm:lookup_channels(local, ClientId)
|
|
|
).
|
|
|
|
|
|
evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
|
|
|
@@ -463,13 +479,12 @@ disconnect_channel(ChanPid, ServerReference) ->
|
|
|
'Server-Reference' => ServerReference
|
|
|
}}.
|
|
|
|
|
|
-do_purge_sessions(N) when N > 0 ->
|
|
|
- Channels = take_channels(N),
|
|
|
- ok = lists:foreach(
|
|
|
+do_purge_sessions(Stream) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun({ClientId, _ConnInfo, _ClientInfo}) ->
|
|
|
emqx_cm:discard_session(ClientId)
|
|
|
end,
|
|
|
- Channels
|
|
|
+ Stream
|
|
|
).
|
|
|
|
|
|
do_purge_durable_sessions(N) when N > 0 ->
|