Przeglądaj źródła

Merge pull request #7668 from DDDHuang/mysql_prepare

fix: mysql support prepare sql
DDDHuang 3 lat temu
rodzic
commit
b395b4becb

+ 10 - 12
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -23,6 +23,8 @@
 -behaviour(hocon_schema).
 -behaviour(hocon_schema).
 -behaviour(emqx_authentication).
 -behaviour(emqx_authentication).
 
 
+-define(PREPARE_KEY, ?MODULE).
+
 -export([
 -export([
     namespace/0,
     namespace/0,
     roots/0,
     roots/0,
@@ -54,7 +56,7 @@ fields(?CONF_NS) ->
         {query, fun query/1},
         {query, fun query/1},
         {query_timeout, fun query_timeout/1}
         {query_timeout, fun query_timeout/1}
     ] ++ emqx_authn_schema:common_fields() ++
     ] ++ emqx_authn_schema:common_fields() ++
-        emqx_connector_mysql:fields(config).
+    proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)).
 
 
 desc(?CONF_NS) ->
 desc(?CONF_NS) ->
     "Configuration for authentication using MySQL database.";
     "Configuration for authentication using MySQL database.";
@@ -89,12 +91,11 @@ create(
     } = Config
     } = Config
 ) ->
 ) ->
     ok = emqx_authn_password_hashing:init(Algorithm),
     ok = emqx_authn_password_hashing:init(Algorithm),
-    {Query, PlaceHolders} = emqx_authn_utils:parse_sql(Query0, '?'),
+    {PrepareSql, TmplToken} = emqx_authn_utils:parse_sql(Query0, '?'),
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
     State = #{
     State = #{
         password_hash_algorithm => Algorithm,
         password_hash_algorithm => Algorithm,
-        query => Query,
-        placeholders => PlaceHolders,
+        tmpl_token => TmplToken,
         query_timeout => QueryTimeout,
         query_timeout => QueryTimeout,
         resource_id => ResourceId
         resource_id => ResourceId
     },
     },
@@ -103,12 +104,10 @@ create(
             ResourceId,
             ResourceId,
             ?RESOURCE_GROUP,
             ?RESOURCE_GROUP,
             emqx_connector_mysql,
             emqx_connector_mysql,
-            Config,
+            Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
             #{}
             #{}
         )
         )
     of
     of
-        {ok, already_created} ->
-            {ok, State};
         {ok, _} ->
         {ok, _} ->
             {ok, State};
             {ok, State};
         {error, Reason} ->
         {error, Reason} ->
@@ -129,15 +128,14 @@ authenticate(#{auth_method := _}, _) ->
 authenticate(
 authenticate(
     #{password := Password} = Credential,
     #{password := Password} = Credential,
     #{
     #{
-        placeholders := PlaceHolders,
-        query := Query,
+        tmpl_token := TmplToken,
         query_timeout := Timeout,
         query_timeout := Timeout,
         resource_id := ResourceId,
         resource_id := ResourceId,
         password_hash_algorithm := Algorithm
         password_hash_algorithm := Algorithm
     }
     }
 ) ->
 ) ->
-    Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential),
-    case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of
+    Params = emqx_authn_utils:render_sql_params(TmplToken, Credential),
+    case emqx_resource:query(ResourceId, {prepared_query, ?PREPARE_KEY, Params, Timeout}) of
         {ok, _Columns, []} ->
         {ok, _Columns, []} ->
             ignore;
             ignore;
         {ok, Columns, [Row | _]} ->
         {ok, Columns, [Row | _]} ->
@@ -156,7 +154,7 @@ authenticate(
             ?SLOG(error, #{
             ?SLOG(error, #{
                 msg => "mysql_query_failed",
                 msg => "mysql_query_failed",
                 resource => ResourceId,
                 resource => ResourceId,
-                query => Query,
+                tmpl_token => TmplToken,
                 params => Params,
                 params => Params,
                 timeout => Timeout,
                 timeout => Timeout,
                 reason => Reason
                 reason => Reason

+ 2 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -60,7 +60,7 @@ fields(?CONF_NS) ->
         {query, fun query/1}
         {query, fun query/1}
     ] ++
     ] ++
         emqx_authn_schema:common_fields() ++
         emqx_authn_schema:common_fields() ++
-        proplists:delete(named_queries, emqx_connector_pgsql:fields(config)).
+        proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config)).
 
 
 desc(?CONF_NS) ->
 desc(?CONF_NS) ->
     "Configuration for PostgreSQL authentication backend.";
     "Configuration for PostgreSQL authentication backend.";
@@ -101,7 +101,7 @@ create(
             ResourceId,
             ResourceId,
             ?RESOURCE_GROUP,
             ?RESOURCE_GROUP,
             emqx_connector_pgsql,
             emqx_connector_pgsql,
-            Config#{named_queries => #{ResourceId => Query}},
+            Config#{prepare_statement => #{ResourceId => Query}},
             #{}
             #{}
         )
         )
     of
     of

+ 4 - 4
apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl

@@ -198,10 +198,10 @@ t_update(_Config) ->
                 >>
                 >>
         },
         },
 
 
-    {ok, _} = emqx:update_config(
-        ?PATH,
-        {create_authenticator, ?GLOBAL, IncorrectConfig}
-    ),
+        {ok, _} = emqx:update_config(
+            ?PATH,
+            {create_authenticator, ?GLOBAL, IncorrectConfig}
+        ),
 
 
     {error, not_authorized} = emqx_access_control:authenticate(
     {error, not_authorized} = emqx_access_control:authenticate(
         #{
         #{

+ 2 - 2
apps/emqx_authz/src/emqx_authz_api_schema.erl

@@ -56,11 +56,11 @@ fields(mongo_sharded) ->
 fields(mysql) ->
 fields(mysql) ->
     authz_common_fields(mysql) ++
     authz_common_fields(mysql) ++
         [{query, mk(binary(), #{required => true})}] ++
         [{query, mk(binary(), #{required => true})}] ++
-        emqx_connector_mysql:fields(config);
+        proplists:delete(prepare_statement, emqx_connector_mysql:fields(config));
 fields(postgresql) ->
 fields(postgresql) ->
     authz_common_fields(postgresql) ++
     authz_common_fields(postgresql) ++
         [{query, mk(binary(), #{required => true})}] ++
         [{query, mk(binary(), #{required => true})}] ++
-        proplists:delete(named_queries, emqx_connector_pgsql:fields(config));
+        proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config));
 fields(redis_single) ->
 fields(redis_single) ->
     authz_redis_common_fields() ++
     authz_redis_common_fields() ++
         emqx_connector_redis:fields(single);
         emqx_connector_redis:fields(single);

+ 10 - 16
apps/emqx_authz/src/emqx_authz_mysql.erl

@@ -23,6 +23,8 @@
 
 
 -behaviour(emqx_authz).
 -behaviour(emqx_authz).
 
 
+-define(PREPARE_KEY, ?MODULE).
+
 %% AuthZ Callbacks
 %% AuthZ Callbacks
 -export([
 -export([
     description/0,
     description/0,
@@ -47,22 +49,14 @@
 description() ->
 description() ->
     "AuthZ with Mysql".
     "AuthZ with Mysql".
 
 
-init(#{query := SQL} = Source) ->
+init(#{query := SQL} = Source0) ->
+    {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
+    Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
     case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of
     case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of
         {error, Reason} ->
         {error, Reason} ->
             error({load_config_error, Reason});
             error({load_config_error, Reason});
         {ok, Id} ->
         {ok, Id} ->
-            Source#{
-                annotations =>
-                    #{
-                        id => Id,
-                        query => emqx_authz_utils:parse_sql(
-                            SQL,
-                            '?',
-                            ?PLACEHOLDERS
-                        )
-                    }
-            }
+            Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}
     end.
     end.
 
 
 destroy(#{annotations := #{id := Id}}) ->
 destroy(#{annotations := #{id := Id}}) ->
@@ -75,12 +69,12 @@ authorize(
     #{
     #{
         annotations := #{
         annotations := #{
             id := ResourceID,
             id := ResourceID,
-            query := {Query, Params}
+            tmpl_oken := TmplToken
         }
         }
     }
     }
 ) ->
 ) ->
-    RenderParams = emqx_authz_utils:render_sql_params(Params, Client),
-    case emqx_resource:query(ResourceID, {sql, Query, RenderParams}) of
+    RenderParams = emqx_authz_utils:render_sql_params(TmplToken, Client),
+    case emqx_resource:query(ResourceID, {prepared_query, ?PREPARE_KEY, RenderParams}) of
         {ok, _Columns, []} ->
         {ok, _Columns, []} ->
             nomatch;
             nomatch;
         {ok, Columns, Rows} ->
         {ok, Columns, Rows} ->
@@ -89,7 +83,7 @@ authorize(
             ?SLOG(error, #{
             ?SLOG(error, #{
                 msg => "query_mysql_error",
                 msg => "query_mysql_error",
                 reason => Reason,
                 reason => Reason,
-                query => Query,
+                tmpl_oken => TmplToken,
                 params => RenderParams,
                 params => RenderParams,
                 resource_id => ResourceID
                 resource_id => ResourceID
             }),
             }),

+ 1 - 1
apps/emqx_authz/src/emqx_authz_postgresql.erl

@@ -59,7 +59,7 @@ init(#{query := SQL0} = Source) ->
             ResourceID,
             ResourceID,
             ?RESOURCE_GROUP,
             ?RESOURCE_GROUP,
             emqx_connector_pgsql,
             emqx_connector_pgsql,
-            Source#{named_queries => #{ResourceID => SQL}},
+            Source#{prepare_statement => #{ResourceID => SQL}},
             #{}
             #{}
         )
         )
     of
     of

+ 0 - 11
apps/emqx_connector/i18n/emqx_connector_pgsql.conf

@@ -19,15 +19,4 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified.
             }
             }
     }
     }
 
 
-    name_queries_desc {
-        desc {
-          en: "Key-value list of SQL prepared statements."
-          zh: "SQL 预处理语句列表。"
-        }
-        label: {
-              en: "SQL Prepared Statements List"
-              zh: "SQL 预处理语句列表"
-            }
-    }
-
 }
 }

+ 11 - 0
apps/emqx_connector/i18n/emqx_connector_schema_lib.conf

@@ -11,6 +11,17 @@ emqx_connector_schema_lib {
             }
             }
     }
     }
 
 
+    prepare_statement {
+        desc {
+          en: "Key-value list of SQL prepared statements."
+          zh: "SQL 预处理语句列表。"
+        }
+        label: {
+              en: "SQL Prepared Statements List"
+              zh: "SQL 预处理语句列表"
+            }
+    }
+
     database_desc {
     database_desc {
         desc {
         desc {
           en: "Database name."
           en: "Database name."

+ 142 - 20
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -29,7 +29,10 @@
         , on_health_check/2
         , on_health_check/2
         ]).
         ]).
 
 
--export([connect/1]).
+%% ecpool connect & reconnect
+-export([connect/1, prepare_sql_to_conn/2]).
+
+-export([prepare_sql/2]).
 
 
 -export([roots/0, fields/1]).
 -export([roots/0, fields/1]).
 
 
@@ -48,7 +51,8 @@ fields(config) ->
     [ {server, fun server/1}
     [ {server, fun server/1}
     ] ++
     ] ++
     emqx_connector_schema_lib:relational_db_fields() ++
     emqx_connector_schema_lib:relational_db_fields() ++
-    emqx_connector_schema_lib:ssl_fields().
+    emqx_connector_schema_lib:ssl_fields() ++
+    emqx_connector_schema_lib:prepare_statement_fields().
 
 
 server(type) -> emqx_schema:ip_port();
 server(type) -> emqx_schema:ip_port();
 server(required) -> true;
 server(required) -> true;
@@ -81,8 +85,10 @@ on_start(InstId, #{server := {Host, Port},
                {auto_reconnect, reconn_interval(AutoReconn)},
                {auto_reconnect, reconn_interval(AutoReconn)},
                {pool_size, PoolSize}],
                {pool_size, PoolSize}],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    Prepares = maps:get(prepare_statement, Config, #{}),
+    State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
-        ok              -> {ok, #{poolname => PoolName}};
+        ok              -> {ok, State};
         {error, Reason} -> {error, Reason}
         {error, Reason} -> {error, Reason}
     end.
     end.
 
 
@@ -91,31 +97,82 @@ on_stop(InstId, #{poolname := PoolName}) ->
                   connector => InstId}),
                   connector => InstId}),
     emqx_plugin_libs_pool:stop_pool(PoolName).
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
 
-on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
-on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
-on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
-    ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
-    case Result = ecpool:pick_and_do(
-                    PoolName,
-                    {mysql, query, [SQL, Params, Timeout]},
-                    no_handover) of
+on_query(InstId, {Type, SQLOrKey}, AfterQuery, State) ->
+    on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State);
+on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, State) ->
+    on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State);
+on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery,
+  #{poolname := PoolName, prepare_statement := Prepares} = State) ->
+    LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
+    ?TRACE("QUERY", "mysql_connector_received", LogMeta),
+    Worker = ecpool:get_client(PoolName),
+    {ok, Conn} = ecpool_worker:client(Worker),
+    MySqlFunction = mysql_function(Type),
+    Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey, Params, Timeout]),
+    case Result of
+        {error, disconnected} ->
+            ?SLOG(error,
+                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}),
+            %% kill the poll worker to trigger reconnection
+            _ = exit(Conn, restart),
+            emqx_resource:query_failed(AfterQuery),
+            Result;
+        {error, not_prepared} ->
+            ?SLOG(warning,
+                LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}),
+            case prepare_sql(Prepares, PoolName) of
+                ok ->
+                    %% not return result, next loop will try again
+                    on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State);
+                {error, Reason} ->
+                    ?SLOG(error,
+                        LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}),
+                    emqx_resource:query_failed(AfterQuery),
+                    {error, Reason}
+            end;
         {error, Reason} ->
         {error, Reason} ->
-            ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
-                connector => InstId, sql => SQL, reason => Reason}),
-            emqx_resource:query_failed(AfterQuery);
+            ?SLOG(error,
+            LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
+            emqx_resource:query_failed(AfterQuery),
+            Result;
         _ ->
         _ ->
-            emqx_resource:query_success(AfterQuery)
-    end,
-    Result.
+            emqx_resource:query_success(AfterQuery),
+            Result
+    end.
+
+mysql_function(sql) -> query;
+mysql_function(prepared_query) -> execute.
 
 
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
-    emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
+    case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of
+        {ok, State} ->
+            case do_health_check_prepares(State) of
+                ok->
+                    {ok, State};
+                {ok, NState} ->
+                    {ok, NState};
+                {error, _Reason} ->
+                    {error, health_check_failed, State}
+            end;
+        {error, health_check_failed, State} ->
+            {error, health_check_failed, State}
+    end.
 
 
 do_health_check(Conn) ->
 do_health_check(Conn) ->
     ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
     ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
 
 
+do_health_check_prepares(#{prepare_statement := Prepares})when is_map(Prepares) ->
+    ok;
+do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) ->
+    %% retry to prepare
+    case prepare_sql(Prepares, PoolName) of
+        ok ->
+            %% remove the error
+            {ok, State#{prepare_statement => Prepares}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 %% ===================================================================
 %% ===================================================================
 reconn_interval(true) -> 15;
 reconn_interval(true) -> 15;
 reconn_interval(false) -> false.
 reconn_interval(false) -> false.
@@ -127,3 +184,68 @@ connect(Options) ->
       -> {inet:ip_address() | inet:hostname(), pos_integer()}.
       -> {inet:ip_address() | inet:hostname(), pos_integer()}.
 to_server(Str) ->
 to_server(Str) ->
     emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
     emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
+
+init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
+    case maps:size(Prepares) of
+        0 ->
+            State;
+        _ ->
+            case prepare_sql(Prepares, PoolName) of
+                ok ->
+                    State;
+                {error, Reason} ->
+                    LogMeta = #{msg => <<"MySQL init prepare statement failed">>, reason => Reason},
+                    ?SLOG(error, LogMeta),
+                    %% mark the prepare_statement as failed
+                    State#{prepare_statement => {error, Prepares}}
+            end
+    end.
+
+prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName);
+prepare_sql(Prepares, PoolName) ->
+    case do_prepare_sql(Prepares, PoolName) of
+        ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            ok;
+        {error, R} ->
+            {error, R}
+    end.
+
+do_prepare_sql(Prepares, PoolName) ->
+    Conns =
+        [begin
+            {ok, Conn} = ecpool_worker:client(Worker),
+            Conn
+         end || {_Name, Worker} <- ecpool:workers(PoolName)],
+    prepare_sql_to_conn_list(Conns, Prepares).
+
+prepare_sql_to_conn_list([], _PrepareList) -> ok;
+prepare_sql_to_conn_list([Conn | ConnList], PrepareList) ->
+    case prepare_sql_to_conn(Conn, PrepareList) of
+        ok ->
+            prepare_sql_to_conn_list(ConnList, PrepareList);
+        {error, R} ->
+            %% rollback
+            Fun = fun({Key, _}) -> _ = unprepare_sql_to_conn(Conn, Key), ok end,
+            lists:foreach(Fun, PrepareList),
+            {error, R}
+    end.
+
+prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok;
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
+    LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL},
+    ?SLOG(info, LogMeta),
+    _ = unprepare_sql_to_conn(Conn, Key),
+    case mysql:prepare(Conn, Key, SQL) of
+        {ok, _Key} ->
+            ?SLOG(info, LogMeta#{result => success}),
+            prepare_sql_to_conn(Conn, PrepareList);
+        {error, Reason} ->
+            ?SLOG(error, LogMeta#{result => failed, reason => Reason}),
+            {error, Reason}
+    end.
+
+unprepare_sql_to_conn(Conn, PrepareSqlKey) ->
+    mysql:unprepare(Conn, PrepareSqlKey).

+ 6 - 11
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -51,15 +51,10 @@ roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 
 fields(config) ->
 fields(config) ->
-    [ {named_queries, fun named_queries/1}
-    , {server, fun server/1}] ++
+    [{server, fun server/1}] ++
     emqx_connector_schema_lib:relational_db_fields() ++
     emqx_connector_schema_lib:relational_db_fields() ++
-    emqx_connector_schema_lib:ssl_fields().
-
-named_queries(type) -> map();
-named_queries(desc) -> ?DESC("name_queries_desc");
-named_queries(required) -> false;
-named_queries(_) -> undefined.
+    emqx_connector_schema_lib:ssl_fields() ++
+    emqx_connector_schema_lib:prepare_statement_fields().
 
 
 server(type) -> emqx_schema:ip_port();
 server(type) -> emqx_schema:ip_port();
 server(required) -> true;
 server(required) -> true;
@@ -92,7 +87,7 @@ on_start(InstId, #{server := {Host, Port},
                {database, DB},
                {database, DB},
                {auto_reconnect, reconn_interval(AutoReconn)},
                {auto_reconnect, reconn_interval(AutoReconn)},
                {pool_size, PoolSize},
                {pool_size, PoolSize},
-               {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}],
+               {prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))}],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
         ok              -> {ok, #{poolname => PoolName}};
         ok              -> {ok, #{poolname => PoolName}};
@@ -135,10 +130,10 @@ connect(Opts) ->
     Host     = proplists:get_value(host, Opts),
     Host     = proplists:get_value(host, Opts),
     Username = proplists:get_value(username, Opts),
     Username = proplists:get_value(username, Opts),
     Password = proplists:get_value(password, Opts),
     Password = proplists:get_value(password, Opts),
-    NamedQueries = proplists:get_value(named_queries, Opts),
+    PrepareStatement = proplists:get_value(prepare_statement, Opts),
     case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
     case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
         {ok, Conn} ->
         {ok, Conn} ->
-            case parse(Conn, NamedQueries) of
+            case parse(Conn, PrepareStatement) of
                 ok -> {ok, Conn};
                 ok -> {ok, Conn};
                 {error, Reason} -> {error, Reason}
                 {error, Reason} -> {error, Reason}
             end;
             end;

+ 10 - 0
apps/emqx_connector/src/emqx_connector_schema_lib.erl

@@ -21,6 +21,7 @@
 
 
 -export([ relational_db_fields/0
 -export([ relational_db_fields/0
         , ssl_fields/0
         , ssl_fields/0
+        , prepare_statement_fields/0
         ]).
         ]).
 
 
 -export([ ip_port_to_string/1
 -export([ ip_port_to_string/1
@@ -67,6 +68,15 @@ relational_db_fields() ->
     , {auto_reconnect, fun auto_reconnect/1}
     , {auto_reconnect, fun auto_reconnect/1}
     ].
     ].
 
 
+prepare_statement_fields() ->
+    [ {prepare_statement, fun prepare_statement/1}
+    ].
+
+prepare_statement(type) -> map();
+prepare_statement(desc) -> ?DESC("prepare_statement");
+prepare_statement(required) -> false;
+prepare_statement(_) -> undefined.
+
 database(type) -> binary();
 database(type) -> binary();
 database(desc) -> ?DESC("database_desc");
 database(desc) -> ?DESC("database_desc");
 database(required) -> true;
 database(required) -> true;