Просмотр исходного кода

Merge pull request #11890 from thalesmg/fix-kafka-unhealthy-r53-20231106

fix(resource): take error from action/connector before attempting query
Thales Macedo Garitezi 2 года назад
Родитель
Коммит
b255836cbd

+ 4 - 2
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -537,11 +537,13 @@ do_send_msg_with_enabled_config(
     BridgeType, BridgeName, Message, QueryOpts0, Config
 ) ->
     QueryMode = get_query_mode(BridgeType, Config),
+    ConnectorName = maps:get(connector, Config),
+    ConnectorResId = emqx_connector_resource:resource_id(BridgeType, ConnectorName),
     QueryOpts = maps:merge(
         emqx_bridge:query_opts(Config),
         QueryOpts0#{
-            query_mode => QueryMode,
-            query_mode_cache_override => false
+            connector_resource_id => ConnectorResId,
+            query_mode => QueryMode
         }
     ),
     BridgeV2Id = id(BridgeType, BridgeName),

+ 2 - 5
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -557,11 +557,8 @@ check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
         ok ->
             ok;
         {error, unknown_topic_or_partition} ->
-            throw(#{
-                error => unknown_kafka_topic,
-                kafka_client_id => ClientId,
-                kafka_topic => KafkaTopic
-            });
+            Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]),
+            throw({unhealthy_target, Msg});
         {error, Reason} ->
             throw(#{
                 error => failed_to_check_topic_status,

+ 8 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -574,8 +574,14 @@ t_nonexistent_topic(_Config) ->
         erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf
     ),
     % TODO: make sure the user facing APIs for Bridge V1 also get this error
-    #{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check(
-        ?BRIDGE_TYPE_V2, list_to_atom(Name)
+    ?assertMatch(
+        #{
+            status := disconnected,
+            error := {unhealthy_target, <<"Unknown topic or partition: undefined-test-topic">>}
+        },
+        emqx_bridge_v2:health_check(
+            ?BRIDGE_TYPE_V2, list_to_atom(Name)
+        )
     ),
     ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
     delete_all_bridges(),

+ 27 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -140,6 +140,33 @@ t_local_topic(_) ->
     ok = emqx_connector:remove(?TYPE, test_connector),
     ok.
 
+t_unknown_topic(_Config) ->  % Publishes one message through a bridge whose Kafka topic is overridden to a nonexistent one and expects it to be counted as dropped.
+    ConnectorName = <<"test_connector">>,
+    BridgeName = <<"test_bridge">>,
+    BridgeV2Config0 = bridge_v2_config(ConnectorName),
+    BridgeV2Config = emqx_utils_maps:deep_put(  % replace the kafka/topic entry of the base bridge config
+        [<<"kafka">>, <<"topic">>],
+        BridgeV2Config0,
+        <<"nonexistent">>
+    ),
+    ConnectorConfig = connector_config(),
+    {ok, _} = emqx_connector:create(?TYPE, ConnectorName, ConnectorConfig),  % both creations are asserted to succeed
+    {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config),
+    Payload = <<"will be dropped">>,
+    emqx:publish(emqx_message:make(<<"kafka_t/local">>, Payload)),  % presumably reaches the bridge through its local topic - confirm in bridge_v2_config/1
+    BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName),  % the metrics below are read under this id
+    ?retry(  % metrics are polled with retries rather than read once
+        _Sleep0 = 50,  % presumably milliseconds between attempts - confirm against the ?retry macro
+        _Attempts0 = 100,
+        begin
+            ?assertEqual(1, emqx_resource_metrics:matched_get(BridgeV2Id)),  % exactly one message matched ...
+            ?assertEqual(1, emqx_resource_metrics:dropped_get(BridgeV2Id)),  % ... and it was dropped ...
+            ?assertEqual(1, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)),  % ... under the resource_stopped drop counter
+            ok
+        end
+    ),
+    ok.  % NOTE(review): the connector and bridge created above are not removed here - confirm that per-testcase teardown cleans them up
+
 check_send_message_with_bridge(BridgeName) ->
     %% ######################################
     %% Create Kafka message

+ 2 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -22,7 +22,7 @@
 -type resource_spec() :: map().
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting | stopped.
--type channel_status() :: connected | connecting.
+-type channel_status() :: connected | connecting | disconnected.
 -type callback_mode() :: always_sync | async_if_possible.
 -type query_mode() ::
     simple_sync
@@ -47,7 +47,7 @@
     simple_query => boolean(),
     reply_to => reply_fun(),
     query_mode => query_mode(),
-    query_mode_cache_override => boolean()
+    connector_resource_id => resource_id()
 }.
 -type resource_data() :: #{
     id := resource_id(),

+ 10 - 28
apps/emqx_resource/src/emqx_resource.erl

@@ -379,32 +379,32 @@ query(ResId, Request) ->
 -spec query(resource_id(), Request :: term(), query_opts()) ->
     Result :: term().
 query(ResId, Request, Opts) ->
-    case get_query_mode_error(ResId, Opts) of
+    case emqx_resource_manager:get_query_mode_and_last_error(ResId, Opts) of
         {error, _} = ErrorTuple ->
             ErrorTuple;
-        {_, unhealthy_target} ->
+        {ok, {_, unhealthy_target}} ->
             emqx_resource_metrics:matched_inc(ResId),
             emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
             ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
-        {_, {unhealthy_target, _Message}} ->
+        {ok, {_, {unhealthy_target, Message}}} ->
             emqx_resource_metrics:matched_inc(ResId),
             emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
-            ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
-        {simple_async, _} ->
+            ?RESOURCE_ERROR(unhealthy_target, Message);
+        {ok, {simple_async, _}} ->
             %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
             %% so the buffer worker does not need to lookup the cache again
             emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
-        {simple_sync, _} ->
+        {ok, {simple_sync, _}} ->
             %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
             %% so the buffer worker does not need to lookup the cache again
             emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
-        {simple_async_internal_buffer, _} ->
+        {ok, {simple_async_internal_buffer, _}} ->
             %% This is for bridges/connectors that have internal buffering, such
             %% as Kafka and Pulsar producers.
             %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
             %% so the buffer worker does not need to lookup the cache again
             emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
-        {simple_sync_internal_buffer, _} ->
+        {ok, {simple_sync_internal_buffer, _}} ->
             %% This is for bridges/connectors that have internal buffering, such
             %% as Kafka and Pulsar producers.
             %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
@@ -412,30 +412,12 @@ query(ResId, Request, Opts) ->
             emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
                 ResId, Request, Opts
             );
-        {sync, _} ->
+        {ok, {sync, _}} ->
             emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
-        {async, _} ->
+        {ok, {async, _}} ->
             emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
     end.
 
-get_query_mode_error(ResId, Opts) ->
-    case maps:get(query_mode_cache_override, Opts, true) of
-        false ->
-            case Opts of
-                #{query_mode := QueryMode} ->
-                    {QueryMode, ok};
-                _ ->
-                    {async, unhealthy_target}
-            end;
-        true ->
-            case emqx_resource_manager:lookup_cached(ResId) of
-                {ok, _Group, #{query_mode := QM, error := Error}} ->
-                    {QM, Error};
-                {error, not_found} ->
-                    {error, not_found}
-            end
-    end.
-
 -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
 simple_sync_query(ResId, Request) ->
     emqx_resource_buffer_worker:simple_sync_query(ResId, Request).

+ 43 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -45,7 +45,8 @@
     lookup_cached/1,
     get_metrics/1,
     reset_metrics/1,
-    channel_status_is_channel_added/1
+    channel_status_is_channel_added/1,
+    get_query_mode_and_last_error/2
 ]).
 
 -export([
@@ -75,6 +76,7 @@
     extra
 }).
 -type data() :: #data{}.
+-type channel_status_map() :: #{status := channel_status(), error := term()}.
 
 -define(NAME(ResId), {n, l, {?MODULE, ResId}}).
 -define(REF(ResId), {via, gproc, ?NAME(ResId)}).
@@ -326,6 +328,46 @@ remove_channel(ResId, ChannelId) ->
 get_channels(ResId) ->
     safe_call(ResId, get_channels, ?T_OPERATION).
 
+-spec get_query_mode_and_last_error(resource_id(), query_opts()) ->  % Returns the query mode and the last recorded error for a request, or {error, not_found} when the resource is not cached.
+    {ok, {query_mode(), LastError}} | {error, not_found}
+when
+    LastError ::
+        unhealthy_target
+        | {unhealthy_target, binary()}
+        | channel_status_map()
+        | term().  % NOTE(review): term() already subsumes the alternatives above; presumably they are spelled out as documentation
+get_query_mode_and_last_error(RequestResId, Opts = #{connector_resource_id := ResId}) ->  % Opts names a connector resource: look that one up, but report the error recorded for RequestResId
+    do_get_query_mode_error(ResId, RequestResId, Opts);
+get_query_mode_and_last_error(RequestResId, Opts) ->  % no connector id in Opts: RequestResId is both the lookup key and the error key
+    do_get_query_mode_error(RequestResId, RequestResId, Opts).
+
+do_get_query_mode_error(ResId, RequestResId, Opts) ->  % ResId selects the cached resource; RequestResId selects which error to report from it
+    case emqx_resource_manager:lookup_cached(ResId) of
+        {ok, _Group, ResourceData} ->
+            QM = get_query_mode(ResourceData, Opts),  % a query_mode present in Opts takes precedence over the cached one
+            Error = get_error(RequestResId, ResourceData),  % channel-level error when RequestResId is an added channel, else resource-level
+            {ok, {QM, Error}};
+        {error, not_found} ->  % passed through unchanged to the caller
+            {error, not_found}
+    end.
+
+get_query_mode(_ResourceData, #{query_mode := QM}) ->  % a query_mode supplied in the options wins
+    QM;
+get_query_mode(#{query_mode := QM}, _Opts) ->  % otherwise use the query_mode stored in the resource data
+    QM.
+
+get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when  % ResId is a key of added_channels: prefer the error stored on that channel
+    is_map_key(ResId, Channels)
+->
+    case maps:get(ResId, Channels) of
+        #{error := Error} ->
+            Error;
+        _ ->  % channel entry has no error key: fall back to the resource-level error, or undefined if that is absent too
+            maps:get(error, ResourceData, undefined)
+    end;
+get_error(_ResId, #{error := Error}) ->  % first clause did not match: report the resource-level error (the error key is required here)
+    Error.
+
 %% Server start/stop callbacks
 
 %% @doc Function called from the supervisor to actually start the server