Просмотр исходного кода

feat(postgresql): check whether target table exists

Fixes https://emqx.atlassian.net/browse/EMQX-9026
Paulo Zulato 2 года назад
Родитель
Коммит
9454af9a8b

+ 67 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -257,6 +257,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_sync(Config, Request) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     query_resource_async(Config, Request, _Opts = #{}).
 
@@ -634,3 +640,64 @@ t_nasty_sql_string(Config) ->
             1_000
         ),
     ?assertEqual(Payload, connect_and_get_payload(Config)).
+
+t_missing_table(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
+            ok
+        end
+    ),
+    connect_and_create_table(Config),
+    ok.
+
+t_table_removed(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            ?assertMatch(
+                {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}},
+                query_resource_sync(Config, {send_message, SentData, []})
+            ),
+            ok
+        end,
+        []
+    ),
+    connect_and_create_table(Config),
+    ok.

+ 43 - 2
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -238,6 +238,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
             case Reason of
                 ecpool_empty ->
                     {error, {recoverable_error, Reason}};
+                {error, error, _, undefined_table, _, _} ->
+                    {error, {unrecoverable_error, Reason}};
                 _ ->
                     Result
             end;
@@ -271,7 +273,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                     {connected, NState};
                 false ->
                     %% do not log error, it is logged in prepare_sql_to_conn
-                    connecting
+                    connecting;
+                {undefined_table, NState} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target}
             end;
         false ->
             connecting
@@ -280,6 +285,30 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
 
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                {ok, Conn} = ecpool_worker:client(WorkerPid),
+                case epgsql:parse2(Conn, "get_status", SQL, []) of
+                    {error, {_, _, _, undefined_table, _, _}} ->
+                        {undefined_table, State};
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
 do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
@@ -288,6 +317,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
         {ok, Sts} ->
             %% remove the error
             {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {undefined_table, State#{prepare_sql => {error, Prepares}}};
         _Error ->
             false
     end.
@@ -373,7 +405,7 @@ init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
                         msg => <<"PostgreSQL init prepare statement failed">>, error => Error
                     },
                     ?SLOG(error, LogMeta),
-                    %% mark the prepare_sqlas failed
+                    %% mark the prepare_sql as failed
                     State#{prepare_sql => {error, Prepares}}
             end
     end.
@@ -414,6 +446,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
     case epgsql:parse2(Conn, Key, SQL, []) of
         {ok, Statement} ->
             prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
+        {error, {error, error, _, undefined_table, _, _} = Error} ->
+            %% Target table is not created
+            ?tp(pgsql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
+            {error, undefined_table};
         {error, Error} = Other ->
             ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
             Other
@@ -424,6 +461,10 @@ to_bin(Bin) when is_binary(Bin) ->
 to_bin(Atom) when is_atom(Atom) ->
     erlang:atom_to_binary(Atom).
 
+handle_result({error, {recoverable_error, _Error}} = Res) ->
+    Res;
+handle_result({error, {unrecoverable_error, _Error}} = Res) ->
+    Res;
 handle_result({error, disconnected}) ->
     {error, {recoverable_error, disconnected}};
 handle_result({error, Error}) ->

+ 8 - 6
apps/emqx_resource/src/emqx_resource.erl

@@ -278,20 +278,22 @@ query(ResId, Request) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:lookup_cached(ResId) of
-        {ok, _Group, #{query_mode := QM}} ->
-            case QM of
-                simple_async ->
+        {ok, _Group, #{query_mode := QM, error := Error}} ->
+            case {QM, Error} of
+                {_, unhealthy_target} ->
+                    ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
+                {simple_async, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     Opts1 = Opts#{is_buffer_supported => true},
                     emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
-                simple_sync ->
+                {simple_sync, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
-                sync ->
+                {sync, _} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
-                async ->
+                {async, _} ->
                     emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
             end;
         {error, not_found} ->

+ 2 - 0
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -985,6 +985,8 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     case emqx_resource_manager:lookup_cached(Id) of
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, #{status := connecting, error := unhealthy_target}} ->
+            {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
             do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
         {error, not_found} ->