|
|
@@ -572,33 +572,54 @@ check_client_connectivity(ClientId) ->
|
|
|
{error, {find_client, Reason}}
|
|
|
end.
|
|
|
|
|
|
%% Tell whether `Pid' refers to a live process.
%% Returns `true' only when the argument is a pid and
%% `erlang:is_process_alive/1' reports it as alive.
%% Any non-pid term yields `false' without calling `erlang:is_process_alive/1'.
is_alive(Pid) when is_pid(Pid) ->
    erlang:is_process_alive(Pid);
is_alive(_NotAPid) ->
    false.
|
|
|
+
|
|
|
%% Attach a compact summary of `Errors' to the diagnostic map `Map'.
%%
%%   * no errors      -> `Map' is returned unchanged;
%%   * exactly one    -> the error is stored under `error';
%%   * more than one  -> the first error is stored under `first_error' and
%%                       the number of errors under `total_errors'.
%%
%% The empty-list clause matters: check_if_healthy_leaders/4 passes the
%% "not alive" half of lists:partition/2, which is `[]' when the leader list
%% itself is empty. Without this clause that call would fail with
%% `function_clause' instead of producing the intended diagnostic map.
error_summary(Map, []) ->
    Map;
error_summary(Map, [Error]) ->
    Map#{error => Error};
error_summary(Map, [Error | _] = Errors) ->
    Map#{first_error => Error, total_errors => length(Errors)}.
|
|
|
+
|
|
|
%% Check the partition-leader connections of `KafkaTopic' held by the wolff
%% client process `ClientPid'.
%%
%% Returns `ok' when at least one partition leader connection is alive; if
%% only some are alive, a warning is logged first.
%%
%% Throws a map containing `cause', `kafka_client' and `kafka_topic' when
%%   * the leader connections cannot be fetched (`cause' is the reason
%%     returned by wolff_client), or
%%   * none of the leader connections is alive (the map is then extended by
%%     error_summary/2 with details of the dead connections).
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
    Context = #{kafka_client => ClientId, kafka_topic => KafkaTopic},
    case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
        {error, Reason} ->
            %% When fetching metadata fails, wolff_client itself logs a
            %% warning-level message with the reason for each seed host.
            throw(Context#{cause => Reason});
        {ok, Leaders} ->
            %% Kafka is considered healthy as long as at least one partition
            %% leader is reachable.
            IsConnected = fun({_Partition, LeaderPid}) -> is_alive(LeaderPid) end,
            case lists:partition(IsConnected, Leaders) of
                {[], Down} ->
                    NoLeader = Context#{cause => "no_connected_partition_leader"},
                    throw(error_summary(NoLeader, Down));
                {_Up, []} ->
                    ok;
                {_Up, Down} ->
                    %% Partially connected: still healthy, but worth a warning.
                    ?SLOG(
                        warning,
                        "not_all_kafka_partitions_connected",
                        error_summary(Context, Down)
                    ),
                    ok
            end
    end.
|
|
|
|
|
|
check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
|