|
|
@@ -95,6 +95,11 @@
|
|
|
commit_fun => brod_group_subscriber_v2:commit_fun()
|
|
|
}.
|
|
|
|
|
|
+-define(CLIENT_DOWN_MESSAGE,
|
|
|
+ "Failed to start Kafka client. Please check the logs for errors and check"
|
|
|
+ " the connection parameters."
|
|
|
+).
|
|
|
+
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
%% `emqx_resource' API
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
@@ -152,7 +157,7 @@ on_start(InstanceId, Config) ->
|
|
|
kafka_hosts => BootstrapHosts,
|
|
|
reason => emqx_misc:redact(Reason)
|
|
|
}),
|
|
|
- throw(failed_to_start_kafka_client)
|
|
|
+ throw(?CLIENT_DOWN_MESSAGE)
|
|
|
end,
|
|
|
start_consumer(Config, InstanceId, ClientID).
|
|
|
|
|
|
@@ -173,7 +178,7 @@ on_get_status(_InstanceID, State) ->
|
|
|
kafka_client_id := ClientID,
|
|
|
kafka_topics := KafkaTopics
|
|
|
} = State,
|
|
|
- do_get_status(ClientID, KafkaTopics, SubscriberId).
|
|
|
+ do_get_status(State, ClientID, KafkaTopics, SubscriberId).
|
|
|
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
%% `brod_group_subscriber' API
|
|
|
@@ -370,22 +375,41 @@ stop_client(ClientID) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
|
|
|
+do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
|
|
|
case brod:get_partitions_count(ClientID, KafkaTopic) of
|
|
|
{ok, NPartitions} ->
|
|
|
- case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
|
|
|
- connected -> do_get_status(ClientID, RestTopics, SubscriberId);
|
|
|
+ case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of
|
|
|
+ connected -> do_get_status(State, ClientID, RestTopics, SubscriberId);
|
|
|
disconnected -> disconnected
|
|
|
end;
|
|
|
+ {error, {client_down, Context}} ->
|
|
|
+ case infer_client_error(Context) of
|
|
|
+ auth_error ->
|
|
|
+ Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
|
|
|
+ {disconnected, State, Message};
|
|
|
+ {auth_error, Message0} ->
|
|
|
+ Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
|
|
|
+ {disconnected, State, Message};
|
|
|
+ connection_refused ->
|
|
|
+ Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
|
|
|
+ {disconnected, State, Message};
|
|
|
+ _ ->
|
|
|
+ {disconnected, State, ?CLIENT_DOWN_MESSAGE}
|
|
|
+ end;
|
|
|
+ {error, leader_not_available} ->
|
|
|
+ Message =
|
|
|
+ "Leader connection not available. Please check the Kafka topic used,"
|
|
|
+ " the connection parameters and Kafka cluster health",
|
|
|
+ {disconnected, State, Message};
|
|
|
_ ->
|
|
|
disconnected
|
|
|
end;
|
|
|
-do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
|
|
|
+do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) ->
|
|
|
connected.
|
|
|
|
|
|
--spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
|
|
|
+-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
|
|
|
connected | disconnected.
|
|
|
-do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
|
|
|
+do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
|
|
|
Results =
|
|
|
lists:map(
|
|
|
fun(N) ->
|
|
|
@@ -504,3 +528,15 @@ encode(Value, base64) ->
|
|
|
|
|
|
to_bin(B) when is_binary(B) -> B;
|
|
|
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
|
|
+
|
|
|
+infer_client_error(Error) ->
|
|
|
+ case Error of
|
|
|
+ [{_BrokerEndpoint, {econnrefused, _}} | _] ->
|
|
|
+ connection_refused;
|
|
|
+ [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) ->
|
|
|
+ {auth_error, Message};
|
|
|
+ [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] ->
|
|
|
+ auth_error;
|
|
|
+ _ ->
|
|
|
+ undefined
|
|
|
+ end.
|