|
@@ -564,7 +564,7 @@ on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) ->
|
|
|
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
|
|
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
|
|
|
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
|
|
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
|
|
|
on_get_status(
|
|
on_get_status(
|
|
|
- _InstId,
|
|
|
|
|
|
|
+ ConnResId,
|
|
|
#{client_id := ClientId} = State
|
|
#{client_id := ClientId} = State
|
|
|
) ->
|
|
) ->
|
|
|
%% Note: we must avoid returning `?status_disconnected' here if the connector ever was
|
|
%% Note: we must avoid returning `?status_disconnected' here if the connector ever was
|
|
@@ -574,7 +574,7 @@ on_get_status(
|
|
|
%% held in wolff producer's replayq.
|
|
%% held in wolff producer's replayq.
|
|
|
case check_client_connectivity(ClientId) of
|
|
case check_client_connectivity(ClientId) of
|
|
|
ok ->
|
|
ok ->
|
|
|
- maybe_check_health_check_topic(State);
|
|
|
|
|
|
|
+ maybe_check_health_check_topic(ConnResId, State);
|
|
|
{error, {find_client, _Error}} ->
|
|
{error, {find_client, _Error}} ->
|
|
|
?status_connecting;
|
|
?status_connecting;
|
|
|
{error, {connectivity, Error}} ->
|
|
{error, {connectivity, Error}} ->
|
|
@@ -648,21 +648,23 @@ check_client_connectivity(ClientId) ->
|
|
|
{error, {find_client, Reason}}
|
|
{error, {find_client, Reason}}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when
|
|
|
|
|
|
|
+maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = ConnectorState) when
|
|
|
is_binary(Topic)
|
|
is_binary(Topic)
|
|
|
->
|
|
->
|
|
|
#{client_id := ClientId} = ConnectorState,
|
|
#{client_id := ClientId} = ConnectorState,
|
|
|
MaxPartitions = all_partitions,
|
|
MaxPartitions = all_partitions,
|
|
|
- try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of
|
|
|
|
|
|
|
+ try check_topic_and_leader_connections(ConnResId, ClientId, Topic, MaxPartitions) of
|
|
|
ok ->
|
|
ok ->
|
|
|
?status_connected
|
|
?status_connected
|
|
|
catch
|
|
catch
|
|
|
|
|
+ throw:{unhealthy_target, Msg} ->
|
|
|
|
|
+ {?status_disconnected, ConnectorState, Msg};
|
|
|
throw:#{reason := {connection_down, _} = Reason} ->
|
|
throw:#{reason := {connection_down, _} = Reason} ->
|
|
|
{?status_disconnected, ConnectorState, Reason};
|
|
{?status_disconnected, ConnectorState, Reason};
|
|
|
throw:#{reason := Reason} ->
|
|
throw:#{reason := Reason} ->
|
|
|
{?status_connecting, ConnectorState, Reason}
|
|
{?status_connecting, ConnectorState, Reason}
|
|
|
end;
|
|
end;
|
|
|
-maybe_check_health_check_topic(_) ->
|
|
|
|
|
|
|
+maybe_check_health_check_topic(_ConnResId, _ConnState) ->
|
|
|
%% Cannot infer further information. Maybe upgraded from older version.
|
|
%% Cannot infer further information. Maybe upgraded from older version.
|
|
|
?status_connected.
|
|
?status_connected.
|
|
|
|
|
|