Просмотр исходного кода

Merge pull request #12067 from thalesmg/fix-kafka-check-client-m-20231130

fix(kafka_producer): add `resource_opts` to connector schema, and check for client connectivity
Thales Macedo Garitezi 2 лет назад
Родитель
Commit
cb60880bab

+ 73 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -146,6 +146,35 @@ create_bridge(Config, Overrides) ->
     ct:pal("creating bridge with config: ~p", [BridgeConfig]),
     emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
 
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+request(Method, Path, Params) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
 list_bridges_api() ->
     Params = [],
     Path = emqx_mgmt_api_test_util:api_path(["actions"]),
@@ -209,6 +238,50 @@ create_bridge_api(Config, Overrides) ->
     ct:pal("bridge create result: ~p", [Res]),
     Res.
 
+create_connector_api(Config) ->
+    create_connector_api(Config, _Overrides = #{}).
+
+create_connector_api(Config, Overrides) ->
+    ConnectorConfig0 = ?config(connector_config, Config),
+    ConnectorName = ?config(connector_name, Config),
+    ConnectorType = ?config(connector_type, Config),
+    Method = post,
+    Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
+    ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
+    Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName},
+    ct:pal("creating connector (http):\n  ~p", [Params]),
+    Res = request(Method, Path, Params),
+    ct:pal("connector create (http) result:\n  ~p", [Res]),
+    Res.
+
+create_action_api(Config) ->
+    create_action_api(Config, _Overrides = #{}).
+
+create_action_api(Config, Overrides) ->
+    ActionName = ?config(action_name, Config),
+    ActionType = ?config(action_type, Config),
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides),
+    Params = ActionConfig#{<<"type">> => ActionType, <<"name">> => ActionName},
+    Method = post,
+    Path = emqx_mgmt_api_test_util:api_path(["actions"]),
+    ct:pal("creating action (http):\n  ~p", [Params]),
+    Res = request(Method, Path, Params),
+    ct:pal("action create (http) result:\n  ~p", [Res]),
+    Res.
+
+get_action_api(Config) ->
+    ActionName = ?config(action_name, Config),
+    ActionType = ?config(action_type, Config),
+    ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Params = [],
+    Method = get,
+    Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId]),
+    ct:pal("getting action (http)"),
+    Res = request(Method, Path, Params),
+    ct:pal("get action (http) result:\n  ~p", [Res]),
+    Res.
+
 update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
 

+ 12 - 7
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -269,7 +269,11 @@ fields(Field) when
     Field == "put_connector";
     Field == "post_connector"
 ->
-    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, kafka_connector_config_fields());
+    emqx_connector_schema:api_fields(
+        Field,
+        ?CONNECTOR_TYPE,
+        kafka_connector_config_fields()
+    );
 fields("post_" ++ Type) ->
     [type_field(Type), name_field() | fields("config_" ++ Type)];
 fields("put_" ++ Type) ->
@@ -508,8 +512,7 @@ fields(consumer_opts) ->
         {value_encoding_mode,
             mk(enum([none, base64]), #{
                 default => none, desc => ?DESC(consumer_value_encoding_mode)
-            })},
-        {resource_opts, mk(ref(resource_opts), #{default => #{}})}
+            })}
     ];
 fields(consumer_topic_mapping) ->
     [
@@ -623,7 +626,7 @@ kafka_connector_config_fields() ->
             })},
         {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
         {ssl, mk(ref(ssl_client_opts), #{})}
-    ].
+    ] ++ [resource_opts()].
 
 producer_opts(ActionOrBridgeV1) ->
     [
@@ -631,9 +634,11 @@ producer_opts(ActionOrBridgeV1) ->
         %% for egress bridges with this config, the published messages
         %% will be forwarded to such bridges.
         {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
-        parameters_field(ActionOrBridgeV1),
-        {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
-    ].
+        parameters_field(ActionOrBridgeV1)
+    ] ++ [resource_opts() || ActionOrBridgeV1 =:= action].
+
+resource_opts() ->
+    {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}.
 
 %% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
 %% However we need to keep it backward compatible for generated schema json (version 0.1.0)

+ 53 - 15
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -81,11 +81,24 @@ on_start(InstId, Config) ->
     ClientId = InstId,
     emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
     ok = ensure_client(ClientId, Hosts, ClientConfig),
-    %% Check if this is a dry run
-    {ok, #{
-        client_id => ClientId,
-        installed_bridge_v2s => #{}
-    }}.
+    %% Note: we must return `{error, _}' here if the client cannot connect so that the
+    %% connector will immediately enter the `?status_disconnected' state, and then avoid
+    %% giving the impression that channels/actions may be added immediately and start
+    %% buffering, which won't happen if it's `?status_connecting'.  That would lead to
+    %% data loss, since Kafka Producer uses wolff's internal buffering, which is started
+    %% only when its producers start.
+    case check_client_connectivity(ClientId) of
+        ok ->
+            {ok, #{
+                client_id => ClientId,
+                installed_bridge_v2s => #{}
+            }};
+        {error, {find_client, Reason}} ->
+            %% Race condition?  Crash?  We just checked it with `ensure_client'...
+            {error, Reason};
+        {error, {connectivity, Reason}} ->
+            {error, Reason}
+    end.
 
 on_add_channel(
     InstId,
@@ -478,14 +491,18 @@ on_get_status(
     _InstId,
     #{client_id := ClientId} = State
 ) ->
-    case wolff_client_sup:find_client(ClientId) of
-        {ok, Pid} ->
-            case wolff_client:check_connectivity(Pid) of
-                ok -> ?status_connected;
-                {error, Error} -> {?status_connecting, State, Error}
-            end;
-        {error, _Reason} ->
-            ?status_connecting
+    %% Note: we must avoid returning `?status_disconnected' here if the connector ever was
+    %% connected.  If the connector ever connected, wolff producers might have been
+    %% successfully started, and returning `?status_disconnected' will make resource
+    %% manager try to restart the producers / connector, thus potentially dropping data
+    %% held in wolff producer's replayq.
+    case check_client_connectivity(ClientId) of
+        ok ->
+            ?status_connected;
+        {error, {find_client, _Error}} ->
+            ?status_connecting;
+        {error, {connectivity, Error}} ->
+            {?status_connecting, State, Error}
     end.
 
 on_get_channel_status(
@@ -496,13 +513,19 @@ on_get_channel_status(
         installed_bridge_v2s := Channels
     } = _State
 ) ->
+    %% Note: we must avoid returning `?status_disconnected' here. Returning
+    %% `?status_disconnected' will make resource manager try to restart the producers /
+    %% connector, thus potentially dropping data held in wolff producer's replayq.  The
+    %% only exception is if the topic does not exist ("unhealthy target").
     #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
     try
         ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
         ?status_connected
     catch
-        throw:#{reason := restarting} ->
-            ?status_connecting
+        throw:{unhealthy_target, Msg} ->
+            throw({unhealthy_target, Msg});
+        K:E ->
+            {?status_connecting, {K, E}}
     end.
 
 check_topic_and_leader_connections(ClientId, KafkaTopic) ->
@@ -524,6 +547,21 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) ->
             })
     end.
 
+-spec check_client_connectivity(wolff:client_id()) ->
+    ok | {error, {connectivity | find_client, term()}}.
+check_client_connectivity(ClientId) ->
+    case wolff_client_sup:find_client(ClientId) of
+        {ok, Pid} ->
+            case wolff_client:check_connectivity(Pid) of
+                ok ->
+                    ok;
+                {error, Error} ->
+                    {error, {connectivity, Error}}
+            end;
+        {error, Reason} ->
+            {error, {find_client, Reason}}
+    end.
+
 check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
     Leaders =
         case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of

+ 231 - 36
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -35,6 +36,14 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"),
+    KafkaPort = list_to_integer(os:getenv("KAFKA_PLAIN_PORT", "9292")),
+    ProxyName = "kafka_plain",
+    DirectKafkaHost = os:getenv("KAFKA_DIRECT_PLAIN_HOST", "kafka-1.emqx.net"),
+    DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_PLAIN_PORT", "9092")),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     Apps = emqx_cth_suite:start(
         [
             emqx,
@@ -50,17 +59,34 @@ init_per_suite(Config) ->
     ),
     {ok, _} = emqx_common_test_http:create_default_app(),
     emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
-    [{apps, Apps} | Config].
+    [
+        {apps, Apps},
+        {proxy_host, ProxyHost},
+        {proxy_port, ProxyPort},
+        {proxy_name, ProxyName},
+        {kafka_host, KafkaHost},
+        {kafka_port, KafkaPort},
+        {direct_kafka_host, DirectKafkaHost},
+        {direct_kafka_port, DirectKafkaPort}
+        | Config
+    ].
 
 end_per_suite(Config) ->
     Apps = ?config(apps, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     emqx_cth_suite:stop(Apps),
     ok.
 
 init_per_testcase(_TestCase, Config) ->
     Config.
 
-end_per_testcase(_TestCase, _Config) ->
+end_per_testcase(_TestCase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_common_test_helpers:call_janitor(60_000),
     ok.
 
@@ -69,6 +95,13 @@ end_per_testcase(_TestCase, _Config) ->
 %%-------------------------------------------------------------------------------------
 
 check_send_message_with_bridge(BridgeName) ->
+    #{offset := Offset, payload := Payload} = send_message(BridgeName),
+    %% ######################################
+    %% Check if message is sent to Kafka
+    %% ######################################
+    check_kafka_message_payload(Offset, Payload).
+
+send_message(ActionName) ->
     %% ######################################
     %% Create Kafka message
     %% ######################################
@@ -84,11 +117,8 @@ check_send_message_with_bridge(BridgeName) ->
     %% ######################################
     %% Send message
     %% ######################################
-    emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
-    %% ######################################
-    %% Check if message is sent to Kafka
-    %% ######################################
-    check_kafka_message_payload(Offset, Payload).
+    emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}),
+    #{offset => Offset, payload => Payload}.
 
 resolve_kafka_offset() ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
@@ -106,6 +136,14 @@ check_kafka_message_payload(Offset, ExpectedPayload) ->
     {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
     ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
 
+action_config(ConnectorName) ->
+    action_config(ConnectorName, _Overrides = #{}).
+
+action_config(ConnectorName, Overrides) ->
+    Cfg0 = bridge_v2_config(ConnectorName),
+    Cfg1 = emqx_utils_maps:rename(<<"kafka">>, <<"parameters">>, Cfg0),
+    emqx_utils_maps:deep_merge(Cfg1, Overrides).
+
 bridge_v2_config(ConnectorName) ->
     #{
         <<"connector">> => ConnectorName,
@@ -131,7 +169,9 @@ bridge_v2_config(ConnectorName) ->
             <<"query_mode">> => <<"sync">>,
             <<"required_acks">> => <<"all_isr">>,
             <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
+            <<"topic">> => list_to_binary(
+                emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
+            )
         },
         <<"local_topic">> => <<"kafka_t/#">>,
         <<"resource_opts">> => #{
@@ -140,32 +180,37 @@ bridge_v2_config(ConnectorName) ->
     }.
 
 connector_config() ->
-    #{
-        <<"authentication">> => <<"none">>,
-        <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
-        <<"connect_timeout">> => <<"5s">>,
-        <<"enable">> => true,
-        <<"metadata_request_timeout">> => <<"5s">>,
-        <<"min_metadata_refresh_interval">> => <<"3s">>,
-        <<"socket_opts">> =>
-            #{
-                <<"recbuf">> => <<"1024KB">>,
-                <<"sndbuf">> => <<"1024KB">>,
-                <<"tcp_keepalive">> => <<"none">>
-            },
-        <<"ssl">> =>
-            #{
-                <<"ciphers">> => [],
-                <<"depth">> => 10,
-                <<"enable">> => false,
-                <<"hibernate_after">> => <<"5s">>,
-                <<"log_level">> => <<"notice">>,
-                <<"reuse_sessions">> => true,
-                <<"secure_renegotiate">> => true,
-                <<"verify">> => <<"verify_peer">>,
-                <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
-            }
-    }.
+    connector_config(_Overrides = #{}).
+
+connector_config(Overrides) ->
+    Defaults =
+        #{
+            <<"authentication">> => <<"none">>,
+            <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
+            <<"connect_timeout">> => <<"5s">>,
+            <<"enable">> => true,
+            <<"metadata_request_timeout">> => <<"5s">>,
+            <<"min_metadata_refresh_interval">> => <<"3s">>,
+            <<"socket_opts">> =>
+                #{
+                    <<"recbuf">> => <<"1024KB">>,
+                    <<"sndbuf">> => <<"1024KB">>,
+                    <<"tcp_keepalive">> => <<"none">>
+                },
+            <<"ssl">> =>
+                #{
+                    <<"ciphers">> => [],
+                    <<"depth">> => 10,
+                    <<"enable">> => false,
+                    <<"hibernate_after">> => <<"5s">>,
+                    <<"log_level">> => <<"notice">>,
+                    <<"reuse_sessions">> => true,
+                    <<"secure_renegotiate">> => true,
+                    <<"verify">> => <<"verify_peer">>,
+                    <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
+                }
+        },
+    emqx_utils_maps:deep_merge(Defaults, Overrides).
 
 kafka_hosts_string() ->
     KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
@@ -350,13 +395,13 @@ t_bad_url(_Config) ->
         {ok, #{
             resource_data :=
                 #{
-                    status := connecting,
+                    status := ?status_disconnected,
                     error := [#{reason := unresolvable_hostname}]
                 }
         }},
         emqx_connector:lookup(?TYPE, ConnectorName)
     ),
-    ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
+    ?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
     ok.
 
 t_parameters_key_api_spec(_Config) ->
@@ -383,3 +428,153 @@ t_http_api_get(_Config) ->
         emqx_bridge_testlib:list_bridges_api()
     ),
     ok.
+
+t_create_connector_while_connection_is_down(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    KafkaHost = ?config(kafka_host, Config),
+    KafkaPort = ?config(kafka_port, Config),
+    Host = iolist_to_binary([KafkaHost, ":", integer_to_binary(KafkaPort)]),
+    ?check_trace(
+        begin
+            Type = ?TYPE,
+            ConnectorConfig = connector_config(#{
+                <<"bootstrap_hosts">> => Host,
+                <<"resource_opts">> =>
+                    #{<<"health_check_interval">> => <<"500ms">>}
+            }),
+            ConnectorName = <<"c1">>,
+            ConnectorId = emqx_connector_resource:resource_id(Type, ConnectorName),
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionName = ConnectorName,
+            ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
+            ActionConfig = action_config(
+                ConnectorName
+            ),
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            Disconnected = atom_to_binary(?status_disconnected),
+            %% Initially, the connection cannot be established.  Messages are not buffered,
+            %% hence the status is `?status_disconnected'.
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} =
+                    emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+                {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} =
+                    emqx_bridge_v2_testlib:create_action_api(ActionParams),
+                #{offset := Offset1} = send_message(ActionName),
+                #{offset := Offset2} = send_message(ActionName),
+                #{offset := Offset3} = send_message(ActionName),
+                ?assertEqual([Offset1], lists:usort([Offset1, Offset2, Offset3])),
+                ?assertEqual(3, emqx_resource_metrics:matched_get(ActionId)),
+                ?assertEqual(3, emqx_resource_metrics:failed_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)),
+                ok
+            end),
+            %% Let the connector and action recover
+            Connected = atom_to_binary(?status_connected),
+            ?retry(
+                _Sleep0 = 1_100,
+                _Attempts0 = 10,
+                begin
+                    _ = emqx_resource:health_check(ConnectorId),
+                    _ = emqx_resource:health_check(ActionId),
+                    ?assertMatch(
+                        {ok, #{
+                            status := ?status_connected,
+                            resource_data :=
+                                #{
+                                    status := ?status_connected,
+                                    added_channels :=
+                                        #{
+                                            ActionId := #{
+                                                status := ?status_connected
+                                            }
+                                        }
+                                }
+                        }},
+                        emqx_bridge_v2:lookup(Type, ActionName),
+                        #{action_id => ActionId}
+                    ),
+                    ?assertMatch(
+                        {ok, {{_, 200, _}, _, #{<<"status">> := Connected}}},
+                        emqx_bridge_v2_testlib:get_action_api(ActionParams)
+                    )
+                end
+            ),
+            %% Now the connection drops again; this time, status should be
+            %% `?status_connecting' to avoid destroying wolff_producers and their replayq
+            %% buffers.
+            Connecting = atom_to_binary(?status_connecting),
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                ?retry(
+                    _Sleep0 = 1_100,
+                    _Attempts0 = 10,
+                    begin
+                        _ = emqx_resource:health_check(ConnectorId),
+                        _ = emqx_resource:health_check(ActionId),
+                        ?assertMatch(
+                            {ok, #{
+                                status := ?status_connecting,
+                                resource_data :=
+                                    #{
+                                        status := ?status_connecting,
+                                        added_channels :=
+                                            #{
+                                                ActionId := #{
+                                                    status := ?status_connecting
+                                                }
+                                            }
+                                    }
+                            }},
+                            emqx_bridge_v2:lookup(Type, ActionName),
+                            #{action_id => ActionId}
+                        ),
+                        ?assertMatch(
+                            {ok, {{_, 200, _}, _, #{<<"status">> := Connecting}}},
+                            emqx_bridge_v2_testlib:get_action_api(ActionParams)
+                        )
+                    end
+                ),
+                %% This should get enqueued by wolff producers.
+                spawn_link(fun() -> send_message(ActionName) end),
+                PreviousMatched = 3,
+                PreviousFailed = 3,
+                ?retry(
+                    _Sleep2 = 100,
+                    _Attempts2 = 10,
+                    ?assertEqual(PreviousMatched + 1, emqx_resource_metrics:matched_get(ActionId))
+                ),
+                ?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)),
+                ?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)),
+                ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+                ok
+            end),
+            ?retry(
+                _Sleep2 = 600,
+                _Attempts2 = 20,
+                begin
+                    _ = emqx_resource:health_check(ConnectorId),
+                    _ = emqx_resource:health_check(ActionId),
+                    ?assertEqual(1, emqx_resource_metrics:success_get(ActionId), #{
+                        metrics => emqx_bridge_v2:get_metrics(Type, ActionName)
+                    }),
+                    ok
+                end
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 20 - 13
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1111,7 +1111,7 @@ is_channel_id(Id) ->
 %% Check if channel is installed in the connector state.
 %% There is no need to query the connector if the channel is not
 %% installed as the query will fail anyway.
-pre_query_channel_check({Id, _} = _Request, Channels) when
+pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
     is_map_key(Id, Channels)
 ->
     ChannelStatus = maps:get(Id, Channels),
@@ -1119,18 +1119,25 @@ pre_query_channel_check({Id, _} = _Request, Channels) when
         true ->
             ok;
         false ->
-            maybe_throw_channel_not_installed(Id)
+            maybe_throw_channel_not_installed(Id, QueryOpts)
     end;
-pre_query_channel_check({Id, _} = _Request, _Channels) ->
-    maybe_throw_channel_not_installed(Id);
-pre_query_channel_check(_Request, _Channels) ->
+pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
+    maybe_throw_channel_not_installed(Id, QueryOpts);
+pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
     ok.
 
-maybe_throw_channel_not_installed(Id) ->
-    %% Fail with a recoverable error if the channel is not installed
-    %% so that the operation can be retried. It is emqx_resource_manager's
-    %% responsibility to ensure that the channel installation is retried.
+maybe_throw_channel_not_installed(Id, QueryOpts) ->
+    %% Fail with a recoverable error if the channel is not installed and there are buffer
+    %% workers involved so that the operation can be retried.  Otherwise, this is
+    %% unrecoverable.  It is emqx_resource_manager's responsibility to ensure that the
+    %% channel installation is retried.
+    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
     case is_channel_id(Id) of
+        true when IsSimpleQuery ->
+            error(
+                {unrecoverable_error,
+                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}
+            );
         true ->
             error(
                 {recoverable_error,
@@ -1191,7 +1198,7 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_query,
             begin
-                pre_query_channel_check(Request, Channels),
+                pre_query_channel_check(Request, Channels, QueryOpts),
                 Mod:on_query(extract_connector_id(Id), Request, ResSt)
             end,
             Request
@@ -1222,7 +1229,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            pre_query_channel_check(Request, Channels),
+            pre_query_channel_check(Request, Channels, QueryOpts),
             Result = Mod:on_query_async(
                 extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
             ),
@@ -1249,7 +1256,7 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_batch_query,
             begin
-                pre_query_channel_check(FirstRequest, Channels),
+                pre_query_channel_check(FirstRequest, Channels, QueryOpts),
                 Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt)
             end,
             Batch
@@ -1291,7 +1298,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            pre_query_channel_check(FirstRequest, Channels),
+            pre_query_channel_check(FirstRequest, Channels, QueryOpts),
             Result = Mod:on_batch_query_async(
                 extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
             ),

+ 5 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -1228,6 +1228,11 @@ channel_status({connecting, Error}) ->
         status => connecting,
         error => Error
     };
+channel_status(?status_disconnected) ->
+    #{
+        status => ?status_disconnected,
+        error => <<"Disconnected for unknown reason">>
+    };
 channel_status(connecting) ->
     #{
         status => connecting,