소스 검색

Merge pull request #13901 from zmstone/0930-fix-prepared-statement-name-clash

0930 fix posgres prepared statement name clash
zmstone 1 년 전
부모
커밋
e4f07f92de

+ 50 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -338,3 +338,53 @@ t_disable_prepared_statements(Config0) ->
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
     ok.
+
+t_update_with_invalid_prepare(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    BridgeName = ?config(bridge_name, Config),
+    {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+    %% arrivedx is a bad column name
+    BadSQL = <<
+        "INSERT INTO mqtt_test(payload, arrivedx) "
+        "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
+    >>,
+    Override = #{<<"parameters">> => #{<<"sql">> => BadSQL}},
+    {ok, {{_, 200, "OK"}, _Headers1, Body1}} =
+        emqx_bridge_v2_testlib:update_bridge_api(Config, Override),
+    ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
+    Error1 = maps:get(<<"error">>, Body1),
+    case re:run(Error1, <<"undefined_column">>, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch ->
+            ct:fail(#{
+                expected_pattern => "undefined_column",
+                got => Error1
+            })
+    end,
+    %% assert that although there was an error returned, the invliad SQL is actually put
+    C1 = [{action_name, BridgeName}, {action_type, pgsql} | Config],
+    {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
+    #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
+    ?assertEqual(FetchedSQL, BadSQL),
+
+    %% update again with the original sql
+    {ok, {{_, 200, "OK"}, _Headers2, Body2}} =
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{}),
+    %% the error should be gone now, and status should be 'connected'
+    ?assertMatch(#{<<"error">> := <<>>, <<"status">> := <<"connected">>}, Body2),
+    %% finally check if ecpool worker should have exactly one of reconnect callback
+    ConnectorResId = <<"connector:pgsql:", ConnectorName/binary>>,
+    Workers = ecpool:workers(ConnectorResId),
+    [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
+    lists:foreach(
+        fun(Pid) ->
+            [{emqx_postgresql, prepare_sql_to_conn, Args}] =
+                ecpool_worker:get_reconnect_callbacks(Pid),
+            Sig = emqx_postgresql:get_reconnect_callback_signature(Args),
+            BridgeResId = <<"action:pgsql:", BridgeName/binary, $:, ConnectorResId/binary>>,
+            ?assertEqual(BridgeResId, Sig)
+        end,
+        WorkerPids
+    ),
+    ok.

+ 1 - 1
apps/emqx_postgresql/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.2"}}},
+    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.3"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}}
 ]}.

+ 1 - 1
apps/emqx_postgresql/src/emqx_postgresql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_postgresql, [
     {description, "EMQX PostgreSQL Database Connector"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 32 - 17
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -54,7 +54,7 @@
 -export([disable_prepared_statements/0]).
 
 %% for ecpool workers usage
--export([do_get_status/1, prepare_sql_to_conn/2]).
+-export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
 
 -define(PGSQL_HOST_OPTIONS, #{
     default_port => ?PGSQL_DEFAULT_PORT
@@ -279,9 +279,15 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
     %% again anyway.
     try ecpool_worker:client(WorkerPid) of
         {ok, Conn} ->
-            Statement = get_templated_statement(ChannelId, State),
-            _ = epgsql:close(Conn, Statement),
-            close_prepared_statement(Rest, ChannelId, State);
+            ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
+            case get_templated_statement(ChannelId, State) of
+                {ok, Statement} ->
+                    _ = epgsql:close(Conn, Statement),
+                    close_prepared_statement(Rest, ChannelId, State);
+                error ->
+                    %% channel was not added
+                    ok
+            end;
         _ ->
             close_prepared_statement(Rest, ChannelId, State)
     catch
@@ -375,7 +381,7 @@ on_batch_query(
             ?SLOG(error, Log),
             {error, {unrecoverable_error, batch_prepare_not_implemented}};
         {_Statement, RowTemplate} ->
-            StatementTemplate = get_templated_statement(BinKey, State),
+            {ok, StatementTemplate} = get_templated_statement(BinKey, State),
             Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
             emqx_trace:rendered_action_template(
                 Key,
@@ -443,20 +449,23 @@ get_template(Key, #{query_templates := Templates}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, Templates, undefined).
 
-get_templated_statement(Key, #{installed_channels := Channels} = _State) when
-    is_map_key(Key, Channels)
-->
+get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
     BinKey = to_bin(Key),
-    ChannelState = maps:get(BinKey, Channels),
-    case ChannelState of
-        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
-            ExprTemplate;
-        #{prepares := #{BinKey := ExprTemplate}} ->
-            ExprTemplate
+    case is_map_key(BinKey, Channels) of
+        true ->
+            ChannelState = maps:get(BinKey, Channels),
+            case ChannelState of
+                #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
+                    {ok, ExprTemplate};
+                #{prepares := #{BinKey := ExprTemplate}} ->
+                    {ok, ExprTemplate}
+            end;
+        false ->
+            error
     end;
 get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
-    maps:get(BinKey, PrepStatements).
+    {ok, maps:get(BinKey, PrepStatements)}.
 
 on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
     try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
@@ -644,13 +653,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
 conn_opts([_Opt | Opts], Acc) ->
     conn_opts(Opts, Acc).
 
-parse_sql_template(Config, SQLID) ->
+parse_sql_template(Config, ChannelId) ->
     Queries =
         case Config of
             #{prepare_statement := Qs} ->
                 Qs;
             #{sql := Query} ->
-                #{SQLID => Query};
+                #{ChannelId => Query};
             #{} ->
                 #{}
         end,
@@ -700,6 +709,12 @@ prepare_sql(Templates, PoolName) ->
             Error
     end.
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
+    ChannelId.
+
 do_prepare_sql(Templates, PoolName) ->
     do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
 

+ 5 - 0
changes/ee/fix-13901.en.md

@@ -0,0 +1,5 @@
+Fix prepared statements for Postgres integration.
+
+Prior to this fix, when updating a Postgres integration action,
+if an invalid prepared-statements is used, for example reference to an unknown table column name,
+it may cause the action to apply the oldest version prepared-statement from the past.

+ 2 - 2
mix.exs

@@ -205,7 +205,7 @@ defmodule EMQXUmbrella.MixProject do
 
   def common_dep(:cowboy), do: {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}
   def common_dep(:jsone), do: {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}
-  def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}
+  def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.10", override: true}
   def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true}
   def common_dep(:jsx), do: {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}
   # in conflict by emqtt and hocon
@@ -216,7 +216,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:ra), do: {:ra, github: "emqx/ra", tag: "v2.14.0-emqx-1", override: true}
 
   # in conflict by emqx_connector and system_monitor
-  def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}
+  def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.3", override: true}
   def common_dep(:sasl_auth), do: {:sasl_auth, "2.3.3", override: true}
   def common_dep(:gen_rpc), do: {:gen_rpc, github: "emqx/gen_rpc", tag: "3.4.0", override: true}
 

+ 1 - 1
rebar.config

@@ -87,7 +87,7 @@
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.0"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}},
-    {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}},
+    {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.10"}}},
     {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}},