Просмотр исходного кода

fix(postgres): correctly handle batch queries with disabled prepared statements

Fixes https://emqx.atlassian.net/browse/EMQX-12549
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
fd5e844cc1

+ 70 - 7
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -102,6 +102,18 @@ init_per_group(Group, Config) when
         {connector_type, group_to_type(Group)}
         | Config
     ];
+init_per_group(batch_enabled, Config) ->
+    [
+        {batch_size, 10},
+        {batch_time, <<"10ms">>}
+        | Config
+    ];
+init_per_group(batch_disabled, Config) ->
+    [
+        {batch_size, 1},
+        {batch_time, <<"0ms">>}
+        | Config
+    ];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -262,17 +274,68 @@ t_start_action_or_source_with_disabled_connector(Config) ->
     ok.
 
 t_disable_prepared_statements(matrix) ->
-    [[postgres], [timescale], [matrix]];
+    [
+        [postgres, batch_disabled],
+        [postgres, batch_enabled],
+        [timescale, batch_disabled],
+        [timescale, batch_enabled],
+        [matrix, batch_disabled],
+        [matrix, batch_enabled]
+    ];
 t_disable_prepared_statements(Config0) ->
+    BatchSize = ?config(batch_size, Config0),
+    BatchTime = ?config(batch_time, Config0),
     ConnectorConfig0 = ?config(connector_config, Config0),
     ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
-    Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
-    ok = emqx_bridge_v2_testlib:t_sync_query(
-        Config,
-        fun make_message/0,
-        fun(Res) -> ?assertMatch({ok, _}, Res) end,
-        postgres_bridge_connector_on_query_return
+    BridgeConfig0 = ?config(bridge_config, Config0),
+    BridgeConfig = emqx_utils_maps:deep_merge(
+        BridgeConfig0,
+        #{
+            <<"resource_opts">> => #{
+                <<"batch_size">> => BatchSize,
+                <<"batch_time">> => BatchTime,
+                <<"query_mode">> => <<"async">>
+            }
+        }
     ),
+    Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
+    Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}),
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
+            RuleTopic = <<"t/postgres">>,
+            Type = ?config(bridge_type, Config),
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
+            ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+            {ok, C} = emqtt:start_link(),
+            {ok, _} = emqtt:connect(C),
+            lists:foreach(
+                fun(N) ->
+                    emqtt:publish(C, RuleTopic, integer_to_binary(N))
+                end,
+                lists:seq(1, BatchSize)
+            ),
+            case BatchSize > 1 of
+                true ->
+                    ?block_until(#{
+                        ?snk_kind := "postgres_success_batch_result",
+                        row_count := BatchSize
+                    }),
+                    ok;
+                false ->
+                    ok
+            end,
+            ok
+        end,
+        []
+    ),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok = emqx_bridge_v2_testlib:t_create_via_http(Config),

+ 7 - 2
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -420,8 +420,12 @@ get_templated_statement(Key, #{installed_channels := Channels} = _State) when
 ->
     BinKey = to_bin(Key),
     ChannelState = maps:get(BinKey, Channels),
-    ChannelPreparedStatements = maps:get(prepares, ChannelState),
-    maps:get(BinKey, ChannelPreparedStatements);
+    case ChannelState of
+        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
+            ExprTemplate;
+        #{prepares := #{BinKey := ExprTemplate}} ->
+            ExprTemplate
+    end;
 get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, PrepStatements).
@@ -785,6 +789,7 @@ handle_batch_result([{error, Error} | _Rest], _Acc) ->
     TranslatedError = translate_to_log_context(Error),
     {error, {unrecoverable_error, export_error(TranslatedError)}};
 handle_batch_result([], Acc) ->
+    ?tp("postgres_success_batch_result", #{row_count => Acc}),
     {ok, Acc}.
 
 translate_to_log_context({error, Reason}) ->