瀏覽代碼

fix(kafka_producer): do not return disconnected when checking status (r5.1)

Fixes https://emqx.atlassian.net/browse/EMQX-10279

Related: https://github.com/emqx/emqx/pull/11038

Since wolff client has its own replayq that lives outside the management of the buffer
workers, we must not return disconnected status for such bridge: otherwise, the resource
manager will eventually kill the producers and data may be lost.
Thales Macedo Garitezi 2 年之前
父節點
當前提交
25b0e31035
共有 2 個文件被更改,包括 7 次插入3 次删除
  1. 6 3
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
  2. 1 0
      changes/ee/fix-11040.en.md

+ 6 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -291,12 +291,15 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
     %% do not apply the callback (which is basically to bump success or fail counter)
     ok.
 
+%% Note: since wolff client has its own replayq that is not managed by
+%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,
+%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
 on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             do_get_status(Pid, KafkaTopic);
         {error, _Reason} ->
-            disconnected
+            connecting
     end.
 
 do_get_status(Client, KafkaTopic) ->
@@ -315,10 +318,10 @@ do_get_status(Client, KafkaTopic) ->
                 true ->
                     connected;
                 false ->
-                    disconnected
+                    connecting
             end;
         {error, _} ->
-            disconnected
+            connecting
     end.
 
 ssl(#{enable := true} = SSL) ->

+ 1 - 0
changes/ee/fix-11040.en.md

@@ -0,0 +1 @@
+Fixed a health check issue for Kafka Producer that could lead to loss of messages when the connection to Kafka's brokers were down.