|
|
@@ -121,7 +121,8 @@ enable_status() ->
|
|
|
evict_connections(N) ->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, ServerReference, _Options} ->
|
|
|
- ok = do_evict_connections(N, ServerReference);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, connection_pid_stream()),
|
|
|
+ ok = do_evict_connections(Stream, ServerReference);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
@@ -140,16 +141,18 @@ evict_sessions(N, Nodes, ConnState) when
|
|
|
->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, _ServerReference, _Options} ->
|
|
|
- ok = do_evict_sessions(N, Nodes, ConnState);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, channel_stream(ConnState)),
|
|
|
+ ok = do_evict_sessions(Nodes, Stream);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
|
|
|
-spec purge_sessions(non_neg_integer()) -> ok_or_error(disabled).
|
|
|
-purge_sessions(N) ->
|
|
|
+purge_sessions(N) when N > 0 ->
|
|
|
case enable_status() of
|
|
|
{enabled, _Kind, _ServerReference, _Options} ->
|
|
|
- ok = do_purge_sessions(N);
|
|
|
+ Stream = emqx_utils_stream:limit_length(N, channel_stream(any)),
|
|
|
+ ok = do_purge_sessions(Stream);
|
|
|
disabled ->
|
|
|
{error, disabled}
|
|
|
end.
|
|
|
@@ -331,39 +334,54 @@ session_count(ConnState) ->
|
|
|
stream_count(Stream) ->
|
|
|
emqx_utils_stream:fold(fun(_, Acc) -> Acc + 1 end, 0, Stream).
|
|
|
|
|
|
-take_connections(N) ->
|
|
|
- PidStream = emqx_utils_stream:map(
|
|
|
+connection_pid_stream() ->
|
|
|
+ emqx_utils_stream:map(
|
|
|
fun({_ClientId, ChanPid}) -> ChanPid end,
|
|
|
connection_stream()
|
|
|
- ),
|
|
|
- consume(N, PidStream).
|
|
|
-
|
|
|
-take_channels(N) ->
|
|
|
- Stream = emqx_utils_stream:map(
|
|
|
- fun({ClientId, _, ConnInfo, ClientInfo}) -> {ClientId, ConnInfo, ClientInfo} end,
|
|
|
- emqx_cm:all_channels_stream(?CONN_MODULES)
|
|
|
- ),
|
|
|
- consume(N, Stream).
|
|
|
-
|
|
|
-take_channels(N, ConnState) ->
|
|
|
- consume(N, channel_stream(ConnState)).
|
|
|
+ ).
|
|
|
|
|
|
-do_evict_connections(N, ServerReference) when N > 0 ->
|
|
|
- ChanPids = take_connections(N),
|
|
|
- ok = lists:foreach(
|
|
|
+do_evict_connections(ChanPidStream, ServerReference) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun(ChanPid) ->
|
|
|
disconnect_channel(ChanPid, ServerReference)
|
|
|
end,
|
|
|
- ChanPids
|
|
|
+ ChanPidStream
|
|
|
).
|
|
|
|
|
|
-do_evict_sessions(N, Nodes, ConnState) when N > 0 ->
|
|
|
- Channels = take_channels(N, ConnState),
|
|
|
- ok = lists:foreach(
|
|
|
+do_evict_sessions(Nodes, ChannelStream) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun({ClientId, ConnInfo, ClientInfo}) ->
|
|
|
- evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo)
|
|
|
+ case is_session_evictable(ClientId) of
|
|
|
+ true ->
|
|
|
+ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo);
|
|
|
+ false ->
|
|
|
+ %% This should not happen normally.
|
|
|
+ %% But session may slip from the `emqx_cm_registry` due to cluster errors.
|
|
|
+ %% Such sessions cannot be evicted to another node.
|
|
|
+ %% If we do nothing here, we may enter a dead loop of evicting the same session.
|
|
|
+ %%
|
|
|
+ %% But
|
|
|
+ %% * In case of node evacuation, the session is doomed anyway.
|
|
|
+ %% * In case of node rebalance, we evict disconnected sessions only,
|
|
|
+ %% and a disconnected session slipped from `emqx_cm_registry` cannot be
|
|
|
+ %% taken over by a reconnecting client, so it is already lost.
|
|
|
+ %%
|
|
|
+ %% Therefore, it is safe to just discard the session here.
|
|
|
+ discard_session_channel(ClientId)
|
|
|
+ end
|
|
|
end,
|
|
|
- Channels
|
|
|
+ ChannelStream
|
|
|
+ ).
|
|
|
+
|
|
|
+is_session_evictable(ClientId) ->
|
|
|
+ emqx_cm_registry:lookup_channels(ClientId) =/= [].
|
|
|
+
|
|
|
+discard_session_channel(ClientId) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(ChanPid) ->
|
|
|
+ emqx_cm:discard_session(ClientId, ChanPid)
|
|
|
+ end,
|
|
|
+ emqx_cm:lookup_channels(local, ClientId)
|
|
|
).
|
|
|
|
|
|
evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
|
|
|
@@ -461,13 +479,12 @@ disconnect_channel(ChanPid, ServerReference) ->
|
|
|
'Server-Reference' => ServerReference
|
|
|
}}.
|
|
|
|
|
|
-do_purge_sessions(N) when N > 0 ->
|
|
|
- Channels = take_channels(N),
|
|
|
- ok = lists:foreach(
|
|
|
+do_purge_sessions(Stream) ->
|
|
|
+ ok = emqx_utils_stream:foreach(
|
|
|
fun({ClientId, _ConnInfo, _ClientInfo}) ->
|
|
|
emqx_cm:discard_session(ClientId)
|
|
|
end,
|
|
|
- Channels
|
|
|
+ Stream
|
|
|
).
|
|
|
|
|
|
do_purge_durable_sessions(N) when N > 0 ->
|
|
|
@@ -488,11 +505,3 @@ do_purge_durable_sessions(N) when N > 0 ->
|
|
|
|
|
|
select_random(List) when length(List) > 0 ->
|
|
|
lists:nth(rand:uniform(length(List)), List).
|
|
|
-
|
|
|
-consume(N, Stream) ->
|
|
|
- case emqx_utils_stream:consume(N, Stream) of
|
|
|
- {Items, _Stream} ->
|
|
|
- Items;
|
|
|
- Items when is_list(Items) ->
|
|
|
- Items
|
|
|
- end.
|