Forráskód Böngészése

fix(postgres_bridge): fix table existence check and handle sync_required

Fixes https://emqx.atlassian.net/browse/EMQX-10629

During health checking, we check whether tables in the SQL statement
exist.  This check was done by asking the backend to parse the
statement using a named prepared statement.  Concurrent health checks
could then result in the error:

```erlang
{error,{error,error,<<"42P05">>,duplicate_prepared_statement,<<"prepared statement \"get_status\" already exists">>,[{file,<<"prepare.c">>},{line,<<"451">>},{routine,<<"StorePreparedStatement">>},{severity,<<"ERROR">>}]}}
```

This could lead to an inconsistent state in the driver process, which
would crash later when a message from the backend (`READY_FOR_QUERY`, "idle") arrived:

```
  2023-07-24T13:05:58.892043+00:00 [error] Generic server <0.2134.0> terminating. Reason: {'module could not be loaded',[{undefined,handle_message,[90,<<"I">>,...
```

Added calls to `epgsql:sync/1` for functions that could return
`{error, sync_required}`.

Also, redundant calls to `parse2` were removed to reduce the number of requests.
Thales Macedo Garitezi 2 éve
szülő
commit
7a16ff4f04

+ 28 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -700,3 +700,31 @@ t_table_removed(Config) ->
     ),
     connect_and_create_table(Config),
     ok.
+
+t_concurrent_health_checks(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            emqx_utils:pmap(
+                fun(_) ->
+                    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+                end,
+                lists:seq(1, 20)
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
+            ok
+        end
+    ),
+    ok.

+ 59 - 25
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -62,6 +62,11 @@
         prepare_statement := epgsql:statement()
     }.
 
+%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
+%% We want to be able to call sync if any message from the backend leaves the driver in an
+%% inconsistent state needing sync.
+-dialyzer({nowarn_function, [execute_batch/3]}).
+
 %%=====================================================================
 
 roots() ->
@@ -252,6 +257,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
                 reason => Reason
             }),
             case Reason of
+                sync_required ->
+                    {error, {recoverable_error, Reason}};
                 ecpool_empty ->
                     {error, {recoverable_error, Reason}};
                 {error, error, _, undefined_table, _, _} ->
@@ -307,28 +314,13 @@ do_check_prepares(
         prepare_sql := #{<<"send_message">> := SQL}
     } = State
 ) ->
-    % it's already connected. Verify if target table still exists
-    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    lists:foldl(
-        fun
-            (WorkerPid, ok) ->
-                case ecpool_worker:client(WorkerPid) of
-                    {ok, Conn} ->
-                        case epgsql:parse2(Conn, "get_status", SQL, []) of
-                            {error, {_, _, _, undefined_table, _, _}} ->
-                                {error, {undefined_table, State}};
-                            _ ->
-                                ok
-                        end;
-                    _ ->
-                        ok
-                end;
-            (_, Acc) ->
-                Acc
-        end,
-        ok,
-        Workers
-    );
+    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    case validate_table_existence(WorkerPids, SQL) of
+        ok ->
+            ok;
+        {error, undefined_table} ->
+            {error, {undefined_table, State}}
+    end;
 do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
@@ -344,6 +336,30 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
             {error, Error}
     end.
 
+-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
+validate_table_existence([WorkerPid | Rest], SQL) ->
+    try ecpool_worker:client(WorkerPid) of
+        {ok, Conn} ->
+            case epgsql:parse2(Conn, "", SQL, []) of
+                {error, {_, _, _, undefined_table, _, _}} ->
+                    {error, undefined_table};
+                Res when is_tuple(Res) andalso ok == element(1, Res) ->
+                    ok;
+                Res ->
+                    ?tp(postgres_connector_bad_parse2, #{result => Res}),
+                    validate_table_existence(Rest, SQL)
+            end;
+        _ ->
+            validate_table_existence(Rest, SQL)
+    catch
+        exit:{noproc, _} ->
+            validate_table_existence(Rest, SQL)
+    end;
+validate_table_existence([], _SQL) ->
+    %% All workers were either unavailable or replied with an unexpected
+    %% error; we will retry on the next health check.
+    ok.
+
 %% ===================================================================
 
 connect(Opts) ->
@@ -358,13 +374,31 @@ connect(Opts) ->
     end.
 
 query(Conn, SQL, Params) ->
-    epgsql:equery(Conn, SQL, Params).
+    case epgsql:equery(Conn, SQL, Params) of
+        {error, sync_required} = Res ->
+            ok = epgsql:sync(Conn),
+            Res;
+        Res ->
+            Res
+    end.
 
 prepared_query(Conn, Name, Params) ->
-    epgsql:prepared_query2(Conn, Name, Params).
+    case epgsql:prepared_query2(Conn, Name, Params) of
+        {error, sync_required} = Res ->
+            ok = epgsql:sync(Conn),
+            Res;
+        Res ->
+            Res
+    end.
 
 execute_batch(Conn, Statement, Params) ->
-    epgsql:execute_batch(Conn, Statement, Params).
+    case epgsql:execute_batch(Conn, Statement, Params) of
+        {error, sync_required} = Res ->
+            ok = epgsql:sync(Conn),
+            Res;
+        Res ->
+            Res
+    end.
 
 conn_opts(Opts) ->
     conn_opts(Opts, []).

+ 1 - 0
changes/ee/fix-11338.en.md

@@ -0,0 +1 @@
+Fixed an issue where the PostgreSQL bridge connection could crash under high message rates.