Przeglądaj źródła

Merge remote-tracking branch 'origin/master' into 0621-merge-release-51-to-master

Zaiming (Stone) Shi 2 lat temu
rodzic
commit
c58a98954b

+ 1 - 1
apps/emqx/rebar.config

@@ -29,7 +29,7 @@
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.3"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

+ 1 - 1
apps/emqx/test/emqx_schema_tests.erl

@@ -108,7 +108,7 @@ ssl_opts_version_gap_test_() ->
 
 ssl_opts_cert_depth_test() ->
     Sc = emqx_schema:server_ssl_opts_schema(#{}, false),
-    Reason = #{expected_type => "non_neg_integer()"},
+    Reason = #{expected => "non_neg_integer()"},
     ?assertThrow(
         {_Sc, [#{kind := validation_error, reason := Reason}]},
         validate(Sc, #{<<"depth">> => -1})

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -1,6 +1,6 @@
 %% -*- mode: erlang; -*-
 {erl_opts, [debug_info]}.
-{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
+{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

+ 55 - 20
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -73,6 +73,12 @@ on_start(InstId, Config) ->
         sasl => emqx_bridge_kafka_impl:sasl(Auth),
         ssl => ssl(SSL)
     },
+    case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
+        unhealthy_target ->
+            throw(unhealthy_target);
+        _ ->
+            ok
+    end,
     case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
         {ok, _} ->
             ?SLOG(info, #{
@@ -108,7 +114,9 @@ on_start(InstId, Config) ->
                 kafka_topic => KafkaTopic,
                 producers => Producers,
                 resource_id => ResourceId,
-                sync_query_timeout => SyncQueryTimeout
+                sync_query_timeout => SyncQueryTimeout,
+                hosts => Hosts,
+                kafka_config => KafkaConfig
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -131,6 +139,7 @@ on_start(InstId, Config) ->
                     client_id => ClientId
                 }
             ),
+
             throw(
                 "Failed to start Kafka client. Please check the logs for errors and check"
                 " the connection parameters."
@@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
 %% Note: since wolff client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,
 %% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
-on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
+on_get_status(_InstId, #{client_id := ClientId} = State) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
-            do_get_status(Pid, KafkaTopic);
+            case do_get_status(Pid, State) of
+                ok -> connected;
+                unhealthy_target -> {disconnected, State, unhealthy_target};
+                error -> connecting
+            end;
         {error, _Reason} ->
             connecting
     end.
 
-do_get_status(Client, KafkaTopic) ->
-    %% TODO: add a wolff_producers:check_connectivity
+do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) ->
+    case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
+        unhealthy_target ->
+            unhealthy_target;
+        _ ->
+            case do_get_healthy_leaders(Client, KafkaTopic) of
+                [] -> error;
+                _ -> ok
+            end
+    end.
+
+do_get_healthy_leaders(Client, KafkaTopic) ->
     case wolff_client:get_leader_connections(Client, KafkaTopic) of
         {ok, Leaders} ->
-            %% Kafka is considered healthy as long as any of the partition leader is reachable
-            case
-                lists:any(
-                    fun({_Partition, Pid}) ->
-                        is_pid(Pid) andalso erlang:is_process_alive(Pid)
-                    end,
-                    Leaders
-                )
-            of
-                true ->
-                    connected;
-                false ->
-                    connecting
-            end;
+            %% Kafka is considered healthy as long as any of the partition leader is reachable.
+            lists:filtermap(
+                fun({_Partition, Pid}) ->
+                    case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
+                        true -> {true, Pid};
+                        _ -> false
+                    end
+                end,
+                Leaders
+            );
         {error, _} ->
-            connecting
+            []
+    end.
+
+do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
+    CheckTopicFun =
+        fun() ->
+            wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
+        end,
+    try
+        case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
+            ok -> ok;
+            {error, unknown_topic_or_partition} -> unhealthy_target;
+            _ -> error
+        end
+    catch
+        _:_ ->
+            error
     end.
 
 ssl(#{enable := true} = SSL) ->

+ 32 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) ->
     delete_all_bridges(),
     ok.
 
+t_table_removed(_Config) ->
+    HostsString = kafka_hosts_string_sasl(),
+    AuthSettings = valid_sasl_plain_settings(),
+    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = ?BRIDGE_TYPE,
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    KafkaTopic = "undefined-test-topic",
+    Conf = config(#{
+        "authentication" => AuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "producer" => #{
+            "kafka" => #{
+                "buffer" => #{
+                    "memory_overload_protection" => false
+                }
+            }
+        },
+        "ssl" => #{}
+    }),
+    {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), Conf
+    ),
+    ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
+    ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
+    ok = emqx_bridge_resource:remove(BridgeId),
+    delete_all_bridges(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helper functions
 %%------------------------------------------------------------------------------

+ 69 - 11
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -179,18 +179,39 @@ sql_drop_table() ->
 sql_check_table_exist() ->
     "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".
 
+new_jamdb_connection(Config) ->
+    JamdbOpts = [
+        {host, ?config(oracle_host, Config)},
+        {port, ?config(oracle_port, Config)},
+        {user, "system"},
+        {password, "oracle"},
+        {sid, ?SID}
+    ],
+    jamdb_oracle:start(JamdbOpts).
+
+close_jamdb_connection(Conn) ->
+    jamdb_oracle:stop(Conn).
+
 reset_table(Config) ->
-    ResourceId = resource_id(Config),
-    drop_table_if_exists(Config),
-    {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
-        ResourceId, {sql, sql_create_table()}
-    ),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn),
+        {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table())
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
+drop_table_if_exists(Conn) when is_pid(Conn) ->
+    {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()),
+    ok;
 drop_table_if_exists(Config) ->
-    ResourceId = resource_id(Config),
-    {ok, [{proc_result, 0, _}]} =
-        emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn)
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
 oracle_config(TestCase, _ConnectionType, Config) ->
@@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
             "  pool_size = 1\n"
             "  sql = \"~s\"\n"
             "  resource_opts = {\n"
-            "     health_check_interval = \"5s\"\n"
+            "     health_check_interval = \"15s\"\n"
             "     request_ttl = \"30s\"\n"
             "     query_mode = \"async\"\n"
             "     batch_size = 3\n"
@@ -349,13 +370,13 @@ t_sync_query(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -381,13 +402,13 @@ t_batch_sync_query(Config) ->
     BridgeId = bridge_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 30,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -464,6 +485,7 @@ t_start_stop(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge(Config)),
             %% Since the connection process is async, we give it some time to
             %% stabilize and avoid flakiness.
@@ -515,6 +537,7 @@ t_on_get_status(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
     ResourceId = resource_id(Config),
+    reset_table(Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     %% Since the connection process is async, we give it some time to
     %% stabilize and avoid flakiness.
@@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) ->
     ),
     ok.
 
+t_missing_table(Config) ->
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            drop_table_if_exists(Config),
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            MsgId = erlang:unique_integer(),
+            Params = #{
+                topic => ?config(mqtt_topic, Config),
+                id => MsgId,
+                payload => ?config(oracle_name, Config),
+                retain => true
+            },
+            Message = {send_message, Params},
+            ?assertMatch(
+                {error, {resource_error, #{reason := not_connected}}},
+                emqx_resource:simple_sync_query(ResourceId, Message)
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)),
+            ok
+        end
+    ).
+
 t_table_removed(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,

+ 67 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -257,6 +257,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_sync(Config, Request) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     query_resource_async(Config, Request, _Opts = #{}).
 
@@ -634,3 +640,64 @@ t_nasty_sql_string(Config) ->
             1_000
         ),
     ?assertEqual(Payload, connect_and_get_payload(Config)).
+
+t_missing_table(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
+            ok
+        end
+    ),
+    connect_and_create_table(Config),
+    ok.
+
+t_table_removed(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            ?assertMatch(
+                {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}},
+                query_resource_sync(Config, {send_message, SentData, []})
+            ),
+            ok
+        end,
+        []
+    ),
+    connect_and_create_table(Config),
+    ok.

+ 1 - 1
apps/emqx_conf/test/emqx_conf_schema_tests.erl

@@ -236,7 +236,7 @@ log_rotation_count_limit_test() ->
             mismatches := #{"handler_name" :=
             #{kind := validation_error,
                 path := "log.file.default.rotation_count",
-                reason := #{expected_type := "1..128"},
+                reason := #{expected := "1..128"},
                 value := Count}
             }}]},
             hocon_tconf:generate(emqx_conf_schema, ConfMap0))

+ 41 - 1
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -222,6 +222,8 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                 {ok, NState} ->
                     %% return new state with prepared statements
                     {connected, NState};
+                {error, {undefined_table, NState}} ->
+                    {disconnected, NState, unhealthy_target};
                 {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
                     connecting
@@ -233,7 +235,37 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
 
-do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) ->
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_statement := #{send_message := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case mysql:prepare(Conn, get_status, SQL) of
+                            {error, {1146, _, _}} ->
+                                {error, {undefined_table, State}};
+                            {ok, Statement} ->
+                                mysql:unprepare(Conn, Statement);
+                            _ ->
+                                ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) ->
     %% retry to prepare
@@ -241,6 +273,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error,
         ok ->
             %% remove the error
             {ok, State#{prepare_statement => Prepares}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_statement => {error, Prepares}}}};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -320,6 +355,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
         {ok, _Key} ->
             ?SLOG(info, LogMeta#{result => success}),
             prepare_sql_to_conn(Conn, PrepareList);
+        {error, {1146, _, _} = Reason} ->
+            %% Target table is not created
+            ?tp(mysql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{result => failed, reason => Reason}),
+            {error, undefined_table};
         {error, Reason} ->
             % FIXME: we should try to differ on transient failers and
             % syntax failures. Retrying syntax failures is not very productive.

+ 49 - 4
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -238,6 +238,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
             case Reason of
                 ecpool_empty ->
                     {error, {recoverable_error, Reason}};
+                {error, error, _, undefined_table, _, _} ->
+                    {error, {unrecoverable_error, Reason}};
                 _ ->
                     Result
             end;
@@ -269,7 +271,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                 {ok, NState} ->
                     %% return new state with prepared statements
                     {connected, NState};
-                false ->
+                {error, {undefined_table, NState}} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target};
+                {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
                     connecting
             end;
@@ -280,6 +285,34 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
 
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case epgsql:parse2(Conn, "get_status", SQL, []) of
+                            {error, {_, _, _, undefined_table, _, _}} ->
+                                {error, {undefined_table, State}};
+                            _ ->
+                                ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
 do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
@@ -288,8 +321,11 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
         {ok, Sts} ->
             %% remove the error
             {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
-        _Error ->
-            false
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
+        Error ->
+            {error, Error}
     end.
 
 %% ===================================================================
@@ -373,7 +409,7 @@ init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
                         msg => <<"PostgreSQL init prepare statement failed">>, error => Error
                     },
                     ?SLOG(error, LogMeta),
-                    %% mark the prepare_sqlas failed
+                    %% mark the prepare_sql as failed
                     State#{prepare_sql => {error, Prepares}}
             end
     end.
@@ -414,6 +450,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
     case epgsql:parse2(Conn, Key, SQL, []) of
         {ok, Statement} ->
             prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
+        {error, {error, error, _, undefined_table, _, _} = Error} ->
+            %% Target table is not created
+            ?tp(pgsql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
+            {error, undefined_table};
         {error, Error} = Other ->
             ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
             Other
@@ -424,6 +465,10 @@ to_bin(Bin) when is_binary(Bin) ->
 to_bin(Atom) when is_atom(Atom) ->
     erlang:atom_to_binary(Atom).
 
+handle_result({error, {recoverable_error, _Error}} = Res) ->
+    Res;
+handle_result({error, {unrecoverable_error, _Error}} = Res) ->
+    Res;
 handle_result({error, disconnected}) ->
     {error, {recoverable_error, disconnected}};
 handle_result({error, Error}) ->

+ 120 - 39
apps/emqx_oracle/src/emqx_oracle.erl

@@ -9,8 +9,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(ORACLE_DEFAULT_PORT, 1521).
-
 %%====================================================================
 %% Exports
 %%====================================================================
@@ -26,7 +24,7 @@
 ]).
 
 %% callbacks for ecpool
--export([connect/1, prepare_sql_to_conn/2]).
+-export([connect/1, prepare_sql_to_conn/3]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([
@@ -39,18 +37,15 @@
     oracle_host_options/0
 ]).
 
--define(ACTION_SEND_MESSAGE, send_message).
-
+-define(ORACLE_DEFAULT_PORT, 1521).
 -define(SYNC_QUERY_MODE, no_handover).
-
+-define(DEFAULT_POOL_SIZE, 8).
+-define(OPT_TIMEOUT, 30000).
+-define(MAX_CURSORS, 10).
 -define(ORACLE_HOST_OPTIONS, #{
     default_port => ?ORACLE_DEFAULT_PORT
 }).
 
--define(MAX_CURSORS, 10).
--define(DEFAULT_POOL_SIZE, 8).
--define(OPT_TIMEOUT, 30000).
-
 -type prepares() :: #{atom() => binary()}.
 -type params_tokens() :: #{atom() => list()}.
 
@@ -105,7 +100,7 @@ on_start(
     ],
     PoolName = InstId,
     Prepares = parse_prepare_sql(Config),
-    InitState = #{pool_name => PoolName, prepare_statement => #{}},
+    InitState = #{pool_name => PoolName},
     State = maps:merge(InitState, Prepares),
     case emqx_resource_pool:start(InstId, ?MODULE, Options) of
         ok ->
@@ -148,7 +143,7 @@ on_query(
 on_batch_query(
     InstId,
     BatchReq,
-    #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
+    #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State
 ) ->
     case BatchReq of
         [{Key, _} = Request | _] ->
@@ -241,7 +236,13 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
                     connected;
                 {ok, NState} ->
                     %% return new state with prepared statements
-                    {connected, NState}
+                    {connected, NState};
+                {error, {undefined_table, NState}} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target};
+                {error, _Reason} ->
+                    %% do not log error, it is logged in prepare_sql_to_conn
+                    connecting
             end;
         false ->
             disconnected
@@ -250,11 +251,46 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
 do_get_status(Conn) ->
     ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
 
-do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
-    ok;
-do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}.
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL},
+        params_tokens := #{<<"send_message">> := Tokens}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case check_if_table_exists(Conn, SQL, Tokens) of
+                            {error, undefined_table} -> {error, {undefined_table, State}};
+                            _ -> ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(
+    State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap}
+) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        %% remove the error
+        {ok, Sts} ->
+            {ok, State#{prepare_sql => Sts}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
+        {error, _Reason} = Error ->
+            Error
+    end.
 
 %% ===================================================================
 
@@ -312,36 +348,81 @@ parse_prepare_sql([], Prepares, Tokens) ->
         params_tokens => Tokens
     }.
 
-init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    State#{prepare_statement := Sts}.
+init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, Sts} ->
+            State#{prepare_sql := Sts};
+        Error ->
+            LogMeta = #{
+                msg => <<"Oracle Database init prepare statement failed">>, error => Error
+            },
+            ?SLOG(error, LogMeta),
+            %% mark the prepare_sql as failed
+            State#{prepare_sql => {error, Prepares}}
+    end.
 
-prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
-    prepare_sql(maps:to_list(Prepares), PoolName);
-prepare_sql(Prepares, PoolName) ->
-    Data = do_prepare_sql(Prepares, PoolName),
-    {ok, _Sts} = Data,
-    ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
-    Data.
+prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName, TokensMap);
+prepare_sql(Prepares, PoolName, TokensMap) ->
+    case do_prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, _Sts} = Ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            Ok;
+        Error ->
+            Error
+    end.
 
-do_prepare_sql(Prepares, PoolName) ->
-    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+do_prepare_sql(Prepares, PoolName, TokensMap) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}).
 
-do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) ->
     {ok, Conn} = ecpool_worker:client(Worker),
-    {ok, Sts} = prepare_sql_to_conn(Conn, Prepares),
-    do_prepare_sql(T, Prepares, PoolName, Sts);
-do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    case prepare_sql_to_conn(Conn, Prepares, TokensMap) of
+        {ok, Sts} ->
+            do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts);
+        Error ->
+            Error
+    end;
+do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) ->
     {ok, LastSts}.
 
-prepare_sql_to_conn(Conn, Prepares) ->
-    prepare_sql_to_conn(Conn, Prepares, #{}).
+prepare_sql_to_conn(Conn, Prepares, TokensMap) ->
+    prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}).
 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
-prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) ->
     LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL},
+    Tokens = maps:get(Key, TokensMap, []),
     ?SLOG(info, LogMeta),
-    prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}).
+    case check_if_table_exists(Conn, SQL, Tokens) of
+        ok ->
+            ?SLOG(info, LogMeta#{result => success}),
+            prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL});
+        {error, undefined_table} = Error ->
+            %% Target table is not created
+            ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}),
+            ?tp(oracle_undefined_table, #{}),
+            Error;
+        Error ->
+            Error
+    end.
+
+check_if_table_exists(Conn, SQL, Tokens) ->
+    {Event, _Headers} = emqx_rule_events:eventmsg_publish(
+        emqx_message:make(<<"t/opic">>, "test query")
+    ),
+    SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;",
+    Params = emqx_placeholder:proc_sql(Tokens, Event),
+    case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
+        {ok, [{proc_result, 0, _Description}]} ->
+            ok;
+        {ok, [{proc_result, 6550, _Description}]} ->
+            %% Target table is not created
+            {error, undefined_table};
+        Reason ->
+            {error, Reason}
+    end.
 
 to_bin(Bin) when is_binary(Bin) ->
     Bin;

+ 8 - 6
apps/emqx_resource/src/emqx_resource.erl

@@ -278,20 +278,22 @@ query(ResId, Request) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:lookup_cached(ResId) of
-        {ok, _Group, #{query_mode := QM}} ->
-            case QM of
-                simple_async ->
+        {ok, _Group, #{query_mode := QM, error := Error}} ->
+            case {QM, Error} of
+                {_, unhealthy_target} ->
+                    ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
+                {simple_async, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     Opts1 = Opts#{is_buffer_supported => true},
                     emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
-                simple_sync ->
+                {simple_sync, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
-                sync ->
+                {sync, _} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
-                async ->
+                {async, _} ->
                     emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
             end;
         {error, not_found} ->

+ 19 - 0
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -927,6 +927,7 @@ merge_counters(OldCounters, DeltaCounters) ->
 flush_metrics(Data = #{id := Id, counters := Counters}) ->
     bump_counters(Id, Counters),
     set_gauges(Data),
+    log_expired_message_count(Data),
     ensure_metrics_flush_timer(Data#{counters := #{}}).
 
 -spec ensure_metrics_flush_timer(data()) -> data().
@@ -966,6 +967,22 @@ do_bump_counters1(dropped_resource_not_found, Val, Id) ->
 do_bump_counters1(dropped_resource_stopped, Val, Id) ->
     emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val).
 
+-spec log_expired_message_count(data()) -> ok.
+log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counters}) ->
+    ExpiredCount = maps:get(dropped_expired, Counters, 0),
+    case ExpiredCount > 0 of
+        false ->
+            ok;
+        true ->
+            ?SLOG(info, #{
+                msg => "buffer_worker_dropped_expired_messages",
+                resource_id => Id,
+                worker_index => Index,
+                expired_count => ExpiredCount
+            }),
+            ok
+    end.
+
 -spec set_gauges(data()) -> ok.
 set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
@@ -985,6 +1002,8 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     case emqx_resource_manager:lookup_cached(Id) of
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, #{status := connecting, error := unhealthy_target}} ->
+            {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
             do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
         {error, not_found} ->

+ 1 - 1
apps/emqx_resource/test/emqx_resource_schema_tests.erl

@@ -97,7 +97,7 @@ worker_pool_size_test_() ->
             {_, [
                 #{
                     kind := validation_error,
-                    reason := #{expected_type := _},
+                    reason := #{expected := _},
                     value := WorkerPoolSize
                 }
             ]},

+ 1 - 0
changes/ce/feat-11115.en.md

@@ -0,0 +1 @@
+Added info logs to indicate when buffered messages are dropped due to time-to-live (TTL) expiration.

+ 1 - 0
changes/ce/fix-11118.en.md

@@ -0,0 +1 @@
+Ensure that validation errors in REST API responses are slightly less confusing. Now, if there are out-of-range errors, they will be presented as `{"value": 42, "reason": {"expected": "1..10"}, ...}`, replacing the previous usage of `expected_type` with `expected`.

+ 1 - 0
changes/ee/fix-10645.en.md

@@ -0,0 +1 @@
+Changes health check for Oracle Database, PostgreSql, MySql and Kafka Producer data bridges to ensure target table/topic exists.

+ 71 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -110,6 +110,7 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_Testcase, Config) ->
+    connect_and_create_table(Config),
     connect_and_clear_table(Config),
     delete_bridge(Config),
     snabbkaffe:start_trace(),
@@ -241,6 +242,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 500}).
 
+sync_query_resource(Config, Request) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
@@ -480,6 +487,7 @@ t_write_timeout(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
+    connect_and_create_table(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     Timeout = 1000,
@@ -641,6 +649,7 @@ t_workload_fits_prepared_statement_limit(Config) ->
     ).
 
 t_unprepared_statement_query(Config) ->
+    ok = connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -668,6 +677,7 @@ t_unprepared_statement_query(Config) ->
 %% Test doesn't work with batch enabled since batch doesn't use
 %% prepared statements as such; it has its own query generation process
 t_uninitialized_prepared_statement(Config) ->
+    connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -705,3 +715,64 @@ t_uninitialized_prepared_statement(Config) ->
         end
     ),
     ok.
+
+t_missing_table(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)),
+            ok
+        end
+    ).
+
+t_table_removed(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error,
+                    {unrecoverable_error,
+                        {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
+                sync_query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 1
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl

@@ -109,7 +109,7 @@ t_avro_invalid_json_schema(_Config) ->
     Params = schema_params(avro),
     WrongParams = Params#{source := <<"{">>},
     ?assertMatch(
-        {error, #{reason := #{expected_type := _}}},
+        {error, #{reason := #{expected := _}}},
         emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
     ),
     ok.

+ 5 - 3
mix.exs

@@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
-      {:hocon, github: "emqx/hocon", tag: "0.39.9", override: true},
+      {:hocon, github: "emqx/hocon", tag: "0.39.10", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@@ -95,7 +95,9 @@ defmodule EMQXUmbrella.MixProject do
        github: "emqx/ranch", ref: "de8ba2a00817c0a6eb1b8f20d6fb3e44e2c9a5aa", override: true},
       # in conflict by grpc and eetcd
       {:gpb, "4.19.7", override: true, runtime: false},
-      {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
+      {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true},
+      # set by hackney (dependency)
+      {:ssl_verify_fun, "1.1.6", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
@@ -194,7 +196,7 @@ defmodule EMQXUmbrella.MixProject do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

+ 1 - 1
rebar.config

@@ -75,7 +75,7 @@
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}