Browse Source

feat(kafka producer): do not consider action disconnected once created

Fixes https://emqx.atlassian.net/browse/EMQX-13338
Thales Macedo Garitezi 1 year ago
parent
commit
bd515883ea

+ 5 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -626,10 +626,13 @@ on_get_channel_status(
     } = maps:get(ActionResId, Channels),
     try
         ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
+        ?tp("kafka_producer_action_connected", #{}),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
-            throw({unhealthy_target, Msg});
+            %% Once a producer is started, we don't want to delete it.  Apparently,
+            %% metadata queries may report false negatives about topic existence.
+            {?status_connecting, Msg};
         K:E ->
             {?status_connecting, {K, E}}
     end.
@@ -753,6 +756,7 @@ check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
             ok;
         {error, unknown_topic_or_partition} ->
             Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]),
+            ?tp("kafka_producer_action_unknown_topic", #{}),
             throw({unhealthy_target, Msg});
         {error, Reason} ->
             throw(#{

+ 91 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -56,11 +56,10 @@ init_per_suite(Config) ->
             emqx_bridge,
             emqx_rule_engine,
             emqx_management,
-            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+            emqx_mgmt_api_test_util:emqx_dashboard()
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
-    {ok, _} = emqx_common_test_http:create_default_app(),
     emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
     [
         {apps, Apps},
@@ -209,6 +208,15 @@ ensure_kafka_topic(KafkaTopic) ->
         {error, topic_already_exists} -> ok
     end.
 
+delete_kafka_topic(KafkaTopic) ->
+    Timeout = 1_000,
+    ConnConfig = #{},
+    Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    case brod:delete_topics(Endpoints, [KafkaTopic], Timeout, ConnConfig) of
+        ok -> ok;
+        {error, unknown_topic_or_partition} -> ok
+    end.
+
 action_config(ConnectorName) ->
     action_config(ConnectorName, _Overrides = #{}).
 
@@ -368,6 +376,9 @@ tap_telemetry(HandlerId) ->
 
 -define(tapTelemetry(), tap_telemetry(?FUNCTION_NAME)).
 
+simplify_result(Res) ->
+    emqx_bridge_v2_testlib:simplify_result(Res).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1353,3 +1364,81 @@ t_migrate_old_replayq_dir(Config) ->
         end
     ),
     ok.
+
+%% Checks that we don't report a producer as `?status_disconnected' if it's already
+%% created.
+t_inexistent_topic_after_created(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    Topic = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => Topic
+                },
+                <<"resource_opts">> => #{
+                    <<"health_check_interval">> => <<"1s">>
+                }
+            }
+        )
+    ),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionParams = [
+        {action_config, ActionConfig},
+        {action_name, ActionName},
+        {action_type, Type}
+    ],
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ensure_kafka_topic(Topic),
+            {201, #{<<"status">> := <<"connected">>}} =
+                simplify_result(emqx_bridge_v2_testlib:create_connector_api(ConnectorParams)),
+
+            %% Initially connected
+            ?assertMatch(
+                {201, #{<<"status">> := <<"connected">>}},
+                simplify_result(emqx_bridge_v2_testlib:create_action_api(ActionParams))
+            ),
+
+            %% After deleting the topic and a health check, it becomes connecting.
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    delete_kafka_topic(Topic),
+                    #{?snk_kind := "kafka_producer_action_unknown_topic"}
+                ),
+            ?assertMatch(
+                {200, #{<<"status">> := <<"connecting">>}},
+                simplify_result(emqx_bridge_v2_testlib:get_action_api(ActionParams))
+            ),
+
+            %% Recovers after topic is back
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ensure_kafka_topic(Topic),
+                    #{?snk_kind := "kafka_producer_action_connected"}
+                ),
+            ?assertMatch(
+                {200, #{<<"status">> := <<"connected">>}},
+                simplify_result(emqx_bridge_v2_testlib:get_action_api(ActionParams))
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/feat-14048.en.md

@@ -0,0 +1 @@
+Kafka/Confluent/Azure Event Hub Producer actions, once successfully created and healthy, are no longer considered unhealthy even if they detect an unknown topic.  They will continue to queue messages even in such a condition.