Przeglądaj źródła

fix(mysql): be explicit that batch queries are parameterless

So that mysql client won't attempt to prepare them automatically, thus
trashing the server's prepared statements table and making interaction
overall heavier.
Andrew Mayorov 2 lat temu
rodzic
commit
fc37d9b3cd

+ 3 - 0
.ci/docker-compose-file/docker-compose-mysql-tcp.yaml

@@ -18,4 +18,7 @@ services:
       - --collation-server=utf8mb4_general_ci
       - --collation-server=utf8mb4_general_ci
       - --lower-case-table-names=1
       - --lower-case-table-names=1
       - --max-allowed-packet=128M
       - --max-allowed-packet=128M
+      # Severely limit maximum number of prepared statements the server must permit
+      # so that we hit potential resource exhaustion earlier in tests.
+      - --max-prepared-stmt-count=64
       - --skip-symbolic-links
       - --skip-symbolic-links

+ 3 - 0
.ci/docker-compose-file/docker-compose-mysql-tls.yaml

@@ -25,6 +25,9 @@ services:
       - --collation-server=utf8mb4_general_ci
       - --collation-server=utf8mb4_general_ci
       - --lower-case-table-names=1
       - --lower-case-table-names=1
       - --max-allowed-packet=128M
       - --max-allowed-packet=128M
+      # Severely limit maximum number of prepared statements the server must permit
+      # so that we hit potential resource exhaustion earlier in tests.
+      - --max-prepared-stmt-count=64
       - --ssl-ca=/etc/certs/ca-cert.pem
       - --ssl-ca=/etc/certs/ca-cert.pem
       - --ssl-cert=/etc/certs/server-cert.pem
       - --ssl-cert=/etc/certs/server-cert.pem
       - --ssl-key=/etc/certs/server-key.pem
       - --ssl-key=/etc/certs/server-key.pem

+ 8 - 8
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -392,13 +392,13 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
 
 
 on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
 on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
     SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens),
     SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens),
-    on_sql_query(InstId, query, SQL, [], default_timeout, State).
+    on_sql_query(InstId, query, SQL, no_params, default_timeout, State).
 
 
 on_sql_query(
 on_sql_query(
     InstId,
     InstId,
     SQLFunc,
     SQLFunc,
     SQLOrKey,
     SQLOrKey,
-    Data,
+    Params,
     Timeout,
     Timeout,
     #{poolname := PoolName} = State
     #{poolname := PoolName} = State
 ) ->
 ) ->
@@ -409,9 +409,9 @@ on_sql_query(
         {ok, Conn} ->
         {ok, Conn} ->
             ?tp(
             ?tp(
                 mysql_connector_send_query,
                 mysql_connector_send_query,
-                #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Data}
+                #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Params}
             ),
             ),
-            do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta);
+            do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta);
         {error, disconnected} ->
         {error, disconnected} ->
             ?SLOG(
             ?SLOG(
                 error,
                 error,
@@ -423,8 +423,8 @@ on_sql_query(
             {error, {recoverable_error, disconnected}}
             {error, {recoverable_error, disconnected}}
     end.
     end.
 
 
-do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
-    try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
+do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta) ->
+    try mysql:SQLFunc(Conn, SQLOrKey, Params, no_filtermap_fun, Timeout) of
         {error, disconnected} ->
         {error, disconnected} ->
             ?SLOG(
             ?SLOG(
                 error,
                 error,
@@ -466,7 +466,7 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
         error:badarg ->
         error:badarg ->
             ?SLOG(
             ?SLOG(
                 error,
                 error,
-                LogMeta#{msg => "mysql_connector_invalid_params", params => Data}
+                LogMeta#{msg => "mysql_connector_invalid_params", params => Params}
             ),
             ),
-            {error, {unrecoverable_error, {invalid_params, Data}}}
+            {error, {unrecoverable_error, {invalid_params, Params}}}
     end.
     end.

+ 52 - 13
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -28,6 +28,9 @@
 -define(MYSQL_DATABASE, "mqtt").
 -define(MYSQL_DATABASE, "mqtt").
 -define(MYSQL_USERNAME, "root").
 -define(MYSQL_USERNAME, "root").
 -define(MYSQL_PASSWORD, "public").
 -define(MYSQL_PASSWORD, "public").
+-define(MYSQL_POOL_SIZE, 4).
+
+-define(WORKER_POOL_SIZE, 4).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %% CT boilerplate
@@ -168,11 +171,13 @@ mysql_config(BridgeType, Config) ->
             "  database = ~p\n"
             "  database = ~p\n"
             "  username = ~p\n"
             "  username = ~p\n"
             "  password = ~p\n"
             "  password = ~p\n"
+            "  pool_size = ~b\n"
             "  sql = ~p\n"
             "  sql = ~p\n"
             "  resource_opts = {\n"
             "  resource_opts = {\n"
             "    request_timeout = 500ms\n"
             "    request_timeout = 500ms\n"
             "    batch_size = ~b\n"
             "    batch_size = ~b\n"
             "    query_mode = ~s\n"
             "    query_mode = ~s\n"
+            "    worker_pool_size = ~b\n"
             "  }\n"
             "  }\n"
             "  ssl = {\n"
             "  ssl = {\n"
             "    enable = ~w\n"
             "    enable = ~w\n"
@@ -185,9 +190,11 @@ mysql_config(BridgeType, Config) ->
                 ?MYSQL_DATABASE,
                 ?MYSQL_DATABASE,
                 ?MYSQL_USERNAME,
                 ?MYSQL_USERNAME,
                 ?MYSQL_PASSWORD,
                 ?MYSQL_PASSWORD,
+                ?MYSQL_POOL_SIZE,
                 ?SQL_BRIDGE,
                 ?SQL_BRIDGE,
                 BatchSize,
                 BatchSize,
                 QueryMode,
                 QueryMode,
+                ?WORKER_POOL_SIZE,
                 TlsEnabled
                 TlsEnabled
             ]
             ]
         ),
         ),
@@ -265,27 +272,26 @@ connect_direct_mysql(Config) ->
     {ok, Pid} = mysql:start_link(Opts ++ SslOpts),
     {ok, Pid} = mysql:start_link(Opts ++ SslOpts),
     Pid.
     Pid.
 
 
+query_direct_mysql(Config, Query) ->
+    Pid = connect_direct_mysql(Config),
+    try
+        mysql:query(Pid, Query)
+    after
+        mysql:stop(Pid)
+    end.
+
 % These funs connect and then stop the mysql connection
 % These funs connect and then stop the mysql connection
 connect_and_create_table(Config) ->
 connect_and_create_table(Config) ->
-    DirectPid = connect_direct_mysql(Config),
-    ok = mysql:query(DirectPid, ?SQL_CREATE_TABLE),
-    mysql:stop(DirectPid).
+    query_direct_mysql(Config, ?SQL_CREATE_TABLE).
 
 
 connect_and_drop_table(Config) ->
 connect_and_drop_table(Config) ->
-    DirectPid = connect_direct_mysql(Config),
-    ok = mysql:query(DirectPid, ?SQL_DROP_TABLE),
-    mysql:stop(DirectPid).
+    query_direct_mysql(Config, ?SQL_DROP_TABLE).
 
 
 connect_and_clear_table(Config) ->
 connect_and_clear_table(Config) ->
-    DirectPid = connect_direct_mysql(Config),
-    ok = mysql:query(DirectPid, ?SQL_DELETE),
-    mysql:stop(DirectPid).
+    query_direct_mysql(Config, ?SQL_DELETE).
 
 
 connect_and_get_payload(Config) ->
 connect_and_get_payload(Config) ->
-    DirectPid = connect_direct_mysql(Config),
-    Result = mysql:query(DirectPid, ?SQL_SELECT),
-    mysql:stop(DirectPid),
-    Result.
+    query_direct_mysql(Config, ?SQL_SELECT).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Testcases
 %% Testcases
@@ -505,6 +511,39 @@ t_bad_sql_parameter(Config) ->
     end,
     end,
     ok.
     ok.
 
 
+t_workload_fits_prepared_statement_limit(Config) ->
+    N = 50,
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Results = lists:append(
+        emqx_misc:pmap(
+            fun(_) ->
+                [
+                    begin
+                        Payload = integer_to_binary(erlang:unique_integer()),
+                        Timestamp = erlang:system_time(millisecond),
+                        send_message(Config, #{payload => Payload, timestamp => Timestamp})
+                    end
+                 || _ <- lists:seq(1, N)
+                ]
+            end,
+            lists:seq(1, ?WORKER_POOL_SIZE * ?MYSQL_POOL_SIZE),
+            _Timeout = 10_000
+        )
+    ),
+    ?assertEqual(
+        [],
+        [R || R <- Results, R /= ok]
+    ),
+    {ok, _, [[_Var, Count]]} =
+        query_direct_mysql(Config, "SHOW GLOBAL STATUS LIKE 'Prepared_stmt_count'"),
+    ?assertEqual(
+        ?MYSQL_POOL_SIZE,
+        binary_to_integer(Count)
+    ).
+
 t_unprepared_statement_query(Config) ->
 t_unprepared_statement_query(Config) ->
     ?assertMatch(
     ?assertMatch(
         {ok, _},
         {ok, _},