Просмотр исходного кода

fix(kafka): handle message_too_large

bump 'failed' counter
zmstone 1 год назад
Родитель
Сommit
3ac4ddcbe3

+ 9 - 4
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -487,12 +487,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
 %% Wolff producer never gives up retrying
 %% so there can only be 'ok' results.
 on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
-    %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2
+    %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2
     apply(ReplyFn, Args ++ [ok]);
 on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
-    %% wolff should bump the dropped_queue_full counter
-    %% do not apply the callback (which is basically to bump success or fail counter)
-    ok.
+    %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4
+    %% so there is no need to apply the callback here
+    ok;
+on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) ->
+    %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4.
+    %% however 'dropped' is not mapped to EMQX metrics name
+    %% so we reply error here
+    apply(ReplyFn, Args ++ [{error, message_too_large}]).
 
 %% Note: since wolff client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,

+ 27 - 3
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) ->
     emqx_utils_maps:deep_merge(Cfg1, Overrides).
 
 bridge_v2_config(ConnectorName) ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    bridge_v2_config(ConnectorName, KafkaTopic).
+
+bridge_v2_config(ConnectorName, KafkaTopic) ->
     #{
         <<"connector">> => ConnectorName,
         <<"enable">> => true,
@@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) ->
             <<"query_mode">> => <<"sync">>,
             <<"required_acks">> => <<"all_isr">>,
             <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => list_to_binary(
-                emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
-            )
+            <<"topic">> => list_to_binary(KafkaTopic)
         },
         <<"local_topic">> => <<"kafka_t/#">>,
         <<"resource_opts">> => #{
@@ -378,6 +380,28 @@ t_local_topic(_) ->
     ok = emqx_connector:remove(?TYPE, test_connector),
     ok.
 
+t_message_too_large(_) ->
+    BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"),
+    ConnectorConfig = connector_config(),
+    {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig),
+    BridgeName = test_bridge4,
+    {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config),
+    BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName),
+    TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)),
+    ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)),
+    emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)),
+    ?retry(
+        _Sleep0 = 50,
+        _Attempts0 = 100,
+        begin
+            ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)),
+            ok
+        end
+    ),
+    ok = emqx_bridge_v2:remove(?TYPE, BridgeName),
+    ok = emqx_connector:remove(?TYPE, test_connector4),
+    ok.
+
 t_unknown_topic(_Config) ->
     ConnectorName = <<"test_connector">>,
     BridgeName = <<"test_bridge">>,