|
@@ -291,12 +291,15 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
|
|
|
%% do not apply the callback (which is basically to bump success or fail counter)
|
|
%% do not apply the callback (which is basically to bump success or fail counter)
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
|
|
+%% Note: since wolff client has its own replayq that is not managed by
|
|
|
|
|
+%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
|
|
|
|
|
+%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
|
|
|
on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
|
|
on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
|
|
|
case wolff_client_sup:find_client(ClientId) of
|
|
case wolff_client_sup:find_client(ClientId) of
|
|
|
{ok, Pid} ->
|
|
{ok, Pid} ->
|
|
|
do_get_status(Pid, KafkaTopic);
|
|
do_get_status(Pid, KafkaTopic);
|
|
|
{error, _Reason} ->
|
|
{error, _Reason} ->
|
|
|
- disconnected
|
|
|
|
|
|
|
+ connecting
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
do_get_status(Client, KafkaTopic) ->
|
|
do_get_status(Client, KafkaTopic) ->
|
|
@@ -315,10 +318,10 @@ do_get_status(Client, KafkaTopic) ->
|
|
|
true ->
|
|
true ->
|
|
|
connected;
|
|
connected;
|
|
|
false ->
|
|
false ->
|
|
|
- disconnected
|
|
|
|
|
|
|
+ connecting
|
|
|
end;
|
|
end;
|
|
|
{error, _} ->
|
|
{error, _} ->
|
|
|
- disconnected
|
|
|
|
|
|
|
+ connecting
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
ssl(#{enable := true} = SSL) ->
|
|
ssl(#{enable := true} = SSL) ->
|