Просмотр исходного кода

Merge pull request #13173 from kjellwinblad/kjell/pgsql_timestamp_encode_etc/EEC-1036

fix(pgsql connector): handle prepared statement already exists
Kjell Winblad 1 год назад
Родитель
Сommit
9d22089104

+ 82 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -135,6 +135,7 @@ end_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     ok = snabbkaffe:stop(),
     delete_bridge(Config),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -715,6 +716,87 @@ t_missing_table(Config) ->
     connect_and_create_table(Config),
     ok.
 
+%% We test that we can handle when the prepared statement with the channel
+%% name already exists in the connection instance when we try to make a new
+%% prepared statement. It is unknown in which scenario this can happen but it
+%% has been observed in a production log file.
+%% See:
+%% https://emqx.atlassian.net/browse/EEC-1036
+t_prepared_statement_exists(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    emqx_common_test_helpers:on_exit(fun() ->
+        meck:unload()
+    end),
+    MeckOpts = [passthrough, no_link, no_history, non_strict],
+    meck:new(emqx_postgresql, MeckOpts),
+    InsertPrepStatementDupAndThenRemoveMeck =
+        fun(Conn, Key, SQL, List) ->
+            meck:passthrough([Conn, Key, SQL, List]),
+            meck:delete(
+                epgsql,
+                parse2,
+                4
+            ),
+            meck:passthrough([Conn, Key, SQL, List])
+        end,
+    meck:expect(
+        epgsql,
+        parse2,
+        InsertPrepStatementDupAndThenRemoveMeck
+    ),
+    %% We should recover if the prepared statement name already exists in the
+    %% driver
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    #{status := Status} when Status == connected,
+                    emqx_bridge_v2:health_check(BridgeType, Name)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
+            ok
+        end
+    ),
+    InsertPrepStatementDup =
+        fun(Conn, Key, SQL, List) ->
+            meck:passthrough([Conn, Key, SQL, List]),
+            meck:passthrough([Conn, Key, SQL, List])
+        end,
+    meck:expect(
+        epgsql,
+        parse2,
+        InsertPrepStatementDup
+    ),
+    %% We should get status disconnected if removing already existing statment don't help
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    #{status := Status} when Status == disconnected,
+                    emqx_bridge_v2:health_check(BridgeType, Name)
+                )
+            ),
+            snabbkaffe_nemesis:cleanup(),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_ | _], ?of_kind(pgsql_prepared_statement_exists, Trace)),
+            ok
+        end
+    ),
+    ok.
+
 t_table_removed(Config) ->
     Name = ?config(pgsql_name, Config),
     BridgeType = ?config(pgsql_bridge_type, Config),

+ 43 - 6
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -268,7 +268,9 @@ close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
 
 close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
     %% We ignore errors since any error probably means that the
-    %% prepared statement doesn't exist.
+    %% prepared statement doesn't exist. If it exists when we try
+    %% to insert one with the same name, we will try to remove it
+    %% again anyway.
     try ecpool_worker:client(WorkerPid) of
         {ok, Conn} ->
             Statement = get_templated_statement(ChannelId, State),
@@ -689,17 +691,21 @@ do_prepare_sql([], _Prepares, LastSts) ->
     {ok, LastSts}.
 
 prepare_sql_to_conn(Conn, Prepares) ->
-    prepare_sql_to_conn(Conn, Prepares, #{}).
+    prepare_sql_to_conn(Conn, Prepares, #{}, 0).
 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) ->
+prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
     {ok, Statements};
-prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when is_pid(Conn) ->
+prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
+    failed_to_remove_prev_prepared_statement_error();
+prepare_sql_to_conn(
+    Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
+) when is_pid(Conn) ->
     LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
     ?SLOG(info, LogMeta),
     case epgsql:parse2(Conn, Key, SQL, []) of
         {ok, Statement} ->
-            prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement});
-        {error, {error, error, _, undefined_table, _, _} = Error} ->
+            prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
+        {error, #error{severity = error, codename = undefined_table} = Error} ->
             %% Target table is not created
             ?tp(pgsql_undefined_table, #{}),
             LogMsg =
@@ -709,6 +715,30 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when
                 ),
             ?SLOG(error, LogMsg),
             {error, undefined_table};
+        {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
+            ?tp(pgsql_prepared_statement_exists, #{}),
+            LogMsg =
+                maps:merge(
+                    LogMeta#{
+                        msg => "postgresql_prepared_statment_with_same_name_already_exists",
+                        explain => <<
+                            "A prepared statement with the same name already "
+                            "exists in the driver. Will attempt to remove the "
+                            "previous prepared statement with the name and then "
+                            "try again."
+                        >>
+                    },
+                    translate_to_log_context(Error)
+                ),
+            ?SLOG(warning, LogMsg),
+            case epgsql:close(Conn, statement, Key) of
+                ok ->
+                    ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
+                    prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
+                {error, CloseError} ->
+                    ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
+                    failed_to_remove_prev_prepared_statement_error()
+            end;
         {error, Error} ->
             TranslatedError = translate_to_log_context(Error),
             LogMsg =
@@ -720,6 +750,13 @@ prepare_sql_to_conn(Conn, [{Key, {SQL, _RowTemplate}} | Rest], Statements) when
             {error, export_error(TranslatedError)}
     end.
 
+failed_to_remove_prev_prepared_statement_error() ->
+    Msg =
+        ("A previous prepared statement for the action already exists "
+        "but cannot be closed. Please, try to disable and then enable "
+        "the connector to resolve this issue."),
+    {error, unicode:characters_to_binary(Msg)}.
+
 to_bin(Bin) when is_binary(Bin) ->
     Bin;
 to_bin(Atom) when is_atom(Atom) ->