|
|
@@ -302,8 +302,9 @@ renew_streams(#{topic := Topic} = Data0) ->
|
|
|
{Data1, VanishedStreams} = update_progresses(Data0, NewStreamsWRanks, TopicFilter, StartTime),
|
|
|
Data2 = store_put_rank_progress(Data1, RankProgress),
|
|
|
Data3 = removed_vanished_streams(Data2, VanishedStreams),
|
|
|
- Data4 = revoke_streams(Data3),
|
|
|
- Data5 = assign_streams(Data4),
|
|
|
+ DesiredCounts = desired_stream_count_for_agents(Data3),
|
|
|
+ Data4 = revoke_streams(Data3, DesiredCounts),
|
|
|
+ Data5 = assign_streams(Data4, DesiredCounts),
|
|
|
?SLOG(info, #{
|
|
|
msg => leader_renew_streams,
|
|
|
topic_filter => TopicFilter,
|
|
|
@@ -370,12 +371,12 @@ removed_vanished_streams(Data0, VanishedStreams) ->
|
|
|
%% We revoke only from replaying agents.
|
|
|
%% After revoking, no unassigned streams appear. Streams will become unassigned
|
|
|
%% only after agents report them as acked and unsubscribed.
|
|
|
-revoke_streams(Data0) ->
|
|
|
- DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0),
|
|
|
+revoke_streams(Data0, DesiredCounts) ->
|
|
|
Agents = replaying_agents(Data0),
|
|
|
lists:foldl(
|
|
|
fun(Agent, DataAcc) ->
|
|
|
- revoke_excess_streams_from_agent(DataAcc, Agent, DesiredStreamsPerAgent)
|
|
|
+ DesiredCount = maps:get(Agent, DesiredCounts),
|
|
|
+ revoke_excess_streams_from_agent(DataAcc, Agent, DesiredCount)
|
|
|
end,
|
|
|
Data0,
|
|
|
Agents
|
|
|
@@ -424,12 +425,12 @@ select_streams_for_revoke(
|
|
|
|
|
|
%% We assign streams to agents that have too few streams (< desired_stream_count_per_agent).
|
|
|
%% We assign only to replaying agents.
|
|
|
-assign_streams(Data0) ->
|
|
|
- DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0),
|
|
|
+assign_streams(Data0, DesiredCounts) ->
|
|
|
Agents = replaying_agents(Data0),
|
|
|
lists:foldl(
|
|
|
fun(Agent, DataAcc) ->
|
|
|
- assign_lacking_streams(DataAcc, Agent, DesiredStreamsPerAgent)
|
|
|
+ DesiredCount = maps:get(Agent, DesiredCounts),
|
|
|
+ assign_lacking_streams(DataAcc, Agent, DesiredCount)
|
|
|
end,
|
|
|
Data0,
|
|
|
Agents
|
|
|
@@ -549,7 +550,8 @@ connect_agent(
|
|
|
}),
|
|
|
reconnect_agent(Data, Agent, AgentMetadata, AgentState);
|
|
|
_ ->
|
|
|
- DesiredCount = desired_stream_count_for_new_agent(Data),
|
|
|
+ DesiredCounts = desired_stream_count_for_agents(Data, [Agent | maps:keys(Agents)]),
|
|
|
+ DesiredCount = maps:get(Agent, DesiredCounts),
|
|
|
assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount)
|
|
|
end.
|
|
|
|
|
|
@@ -947,26 +949,40 @@ replaying_agents(#{agents := AgentStates}) ->
|
|
|
maps:to_list(AgentStates)
|
|
|
).
|
|
|
|
|
|
-desired_stream_count_per_agent(#{agents := AgentStates} = Data) ->
|
|
|
- desired_stream_count_per_agent(Data, maps:size(AgentStates)).
|
|
|
+desired_stream_count_for_agents(#{agents := AgentStates} = Data) ->
|
|
|
+ desired_stream_count_for_agents(Data, maps:keys(AgentStates)).
|
|
|
|
|
|
-desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) ->
|
|
|
- desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1).
|
|
|
-
|
|
|
-desired_stream_count_per_agent(Data, AgentCount) ->
|
|
|
- case AgentCount of
|
|
|
- 0 ->
|
|
|
+desired_stream_count_for_agents(Data, Agents) ->
|
|
|
+ case Agents of
|
|
|
+ [] ->
|
|
|
0;
|
|
|
_ ->
|
|
|
StreamCount = store_num_streams(Data),
|
|
|
- case StreamCount rem AgentCount of
|
|
|
- 0 ->
|
|
|
- StreamCount div AgentCount;
|
|
|
- _ ->
|
|
|
- 1 + StreamCount div AgentCount
|
|
|
- end
|
|
|
+ AgentCount = length(Agents),
|
|
|
+ maps:from_list(
|
|
|
+ lists:map(
|
|
|
+ fun({I, Agent}) ->
|
|
|
+ {Agent, desired_stream_count_for_agent(StreamCount, AgentCount, I)}
|
|
|
+ end,
|
|
|
+ enumerate(lists:sort(Agents))
|
|
|
+ )
|
|
|
+ )
|
|
|
end.
|
|
|
|
|
|
+enumerate(List) ->
|
|
|
+ enumerate(0, List).
|
|
|
+
|
|
|
+enumerate(_, []) ->
|
|
|
+ [];
|
|
|
+enumerate(I, [H | T]) ->
|
|
|
+ [{I, H} | enumerate(I + 1, T)].
|
|
|
+
|
|
|
+desired_stream_count_for_agent(StreamCount, AgentCount, I) ->
|
|
|
+ (StreamCount div AgentCount) + extra_stream_count_for_agent(StreamCount, AgentCount, I).
|
|
|
+
|
|
|
+extra_stream_count_for_agent(StreamCount, AgentCount, I) when I < (StreamCount rem AgentCount) -> 1;
|
|
|
+extra_stream_count_for_agent(_StreamCount, _AgentCount, _I) -> 0.
|
|
|
+
|
|
|
stream_progresses(Data, Streams) ->
|
|
|
lists:map(
|
|
|
fun(Stream) ->
|