Procházet zdrojové kódy

Merge pull request #14048 from thalesmg/20241022-r58-kprodu-hc-connecting

feat(kafka producer): do not consider action disconnected once created
Thales Macedo Garitezi před 1 rokem
rodič
revize
c84f6857b3

+ 5 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -626,10 +626,13 @@ on_get_channel_status(
     } = maps:get(ActionResId, Channels),
     } = maps:get(ActionResId, Channels),
     try
     try
         ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
         ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
+        ?tp("kafka_producer_action_connected", #{}),
         ?status_connected
         ?status_connected
     catch
     catch
         throw:{unhealthy_target, Msg} ->
         throw:{unhealthy_target, Msg} ->
-            throw({unhealthy_target, Msg});
+            %% Once a producer is started, we don't want to delete it.  Apparently,
+            %% metadata queries may report false negatives about topic existence.
+            {?status_connecting, Msg};
         K:E ->
         K:E ->
             {?status_connecting, {K, E}}
             {?status_connecting, {K, E}}
     end.
     end.
@@ -753,6 +756,7 @@ check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
             ok;
             ok;
         {error, unknown_topic_or_partition} ->
         {error, unknown_topic_or_partition} ->
             Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]),
             Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]),
+            ?tp("kafka_producer_action_unknown_topic", #{}),
             throw({unhealthy_target, Msg});
             throw({unhealthy_target, Msg});
         {error, Reason} ->
         {error, Reason} ->
             throw(#{
             throw(#{

+ 91 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -56,11 +56,10 @@ init_per_suite(Config) ->
             emqx_bridge,
             emqx_bridge,
             emqx_rule_engine,
             emqx_rule_engine,
             emqx_management,
             emqx_management,
-            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+            emqx_mgmt_api_test_util:emqx_dashboard()
         ],
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     ),
-    {ok, _} = emqx_common_test_http:create_default_app(),
     emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
     emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
     [
     [
         {apps, Apps},
         {apps, Apps},
@@ -209,6 +208,15 @@ ensure_kafka_topic(KafkaTopic) ->
         {error, topic_already_exists} -> ok
         {error, topic_already_exists} -> ok
     end.
     end.
 
 
+delete_kafka_topic(KafkaTopic) ->
+    Timeout = 1_000,
+    ConnConfig = #{},
+    Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    case brod:delete_topics(Endpoints, [KafkaTopic], Timeout, ConnConfig) of
+        ok -> ok;
+        {error, unknown_topic_or_partition} -> ok
+    end.
+
 action_config(ConnectorName) ->
 action_config(ConnectorName) ->
     action_config(ConnectorName, _Overrides = #{}).
     action_config(ConnectorName, _Overrides = #{}).
 
 
@@ -368,6 +376,9 @@ tap_telemetry(HandlerId) ->
 
 
 -define(tapTelemetry(), tap_telemetry(?FUNCTION_NAME)).
 -define(tapTelemetry(), tap_telemetry(?FUNCTION_NAME)).
 
 
+simplify_result(Res) ->
+    emqx_bridge_v2_testlib:simplify_result(Res).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Testcases
 %% Testcases
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -1353,3 +1364,81 @@ t_migrate_old_replayq_dir(Config) ->
         end
         end
     ),
     ),
     ok.
     ok.
+
+%% Checks that we don't report a producer as `?status_disconnected' if it's already
+%% created.
+t_inexistent_topic_after_created(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    Topic = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => Topic
+                },
+                <<"resource_opts">> => #{
+                    <<"health_check_interval">> => <<"1s">>
+                }
+            }
+        )
+    ),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionParams = [
+        {action_config, ActionConfig},
+        {action_name, ActionName},
+        {action_type, Type}
+    ],
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ensure_kafka_topic(Topic),
+            {201, #{<<"status">> := <<"connected">>}} =
+                simplify_result(emqx_bridge_v2_testlib:create_connector_api(ConnectorParams)),
+
+            %% Initially connected
+            ?assertMatch(
+                {201, #{<<"status">> := <<"connected">>}},
+                simplify_result(emqx_bridge_v2_testlib:create_action_api(ActionParams))
+            ),
+
+            %% After deleting the topic and a health check, it becomes connecting.
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    delete_kafka_topic(Topic),
+                    #{?snk_kind := "kafka_producer_action_unknown_topic"}
+                ),
+            ?assertMatch(
+                {200, #{<<"status">> := <<"connecting">>}},
+                simplify_result(emqx_bridge_v2_testlib:get_action_api(ActionParams))
+            ),
+
+            %% Recovers after topic is back
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ensure_kafka_topic(Topic),
+                    #{?snk_kind := "kafka_producer_action_connected"}
+                ),
+            ?assertMatch(
+                {200, #{<<"status">> := <<"connected">>}},
+                simplify_result(emqx_bridge_v2_testlib:get_action_api(ActionParams))
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/feat-14048.en.md

@@ -0,0 +1 @@
+Kafka/Confluent/Azure Event Hub Producer actions, once created successfully and healthy, no longer considered unhealthy even if they detect an unknown topic.  They will continue to queue messages even in such condition.