Browse Source

Merge pull request #5791 from tigercl/fix/authn2

fix(authn): add timeout option for mysql connector
tigercl 4 years ago
parent
commit
6670bc49fe

+ 39 - 18
apps/emqx/src/emqx_authentication.erl

@@ -289,22 +289,16 @@ check_config(Config) ->
 %%------------------------------------------------------------------------------
 
 authenticate(#{listener := Listener, protocol := Protocol} = Credential, _AuthResult) ->
-    case ets:lookup(?CHAINS_TAB, Listener) of
-        [#chain{authenticators = Authenticators}] when Authenticators =/= [] ->
-            do_authenticate(Authenticators, Credential);
-        _ ->
-            case ets:lookup(?CHAINS_TAB, global_chain(Protocol)) of
-                [#chain{authenticators = Authenticators}] when Authenticators =/= [] ->
-                    do_authenticate(Authenticators, Credential);
-                _ ->
-                    ignore
-            end
+    Authenticators = get_authenticators(Listener, global_chain(Protocol)),
+    case get_enabled(Authenticators) of
+        [] -> ignore;
+        NAuthenticators -> do_authenticate(NAuthenticators, Credential)
     end.
 
 do_authenticate([], _) ->
     {stop, {error, not_authorized}};
-do_authenticate([#authenticator{provider = Provider, state = State} | More], Credential) ->
-    case Provider:authenticate(Credential, State) of
+do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
+    try Provider:authenticate(Credential, State) of
         ignore ->
             do_authenticate(More, Credential);
         Result ->
@@ -314,8 +308,32 @@ do_authenticate([#authenticator{provider = Provider, state = State} | More], Cre
             %% {continue, AuthData, AuthCache}
             %% {error, Reason}
             {stop, Result}
+    catch
+        Class:Reason:Stacktrace ->
+            ?SLOG(warning, #{msg => "unexpected_error_in_authentication",
+                             class => Class,
+                             reason => Reason,
+                             stacktrace => Stacktrace,
+                             authenticator => ID}),
+            do_authenticate(More, Credential)
     end.
 
+get_authenticators(Listener, Global) ->
+    case ets:lookup(?CHAINS_TAB, Listener) of
+        [#chain{authenticators = Authenticators}] ->
+            Authenticators;
+        _ ->
+            case ets:lookup(?CHAINS_TAB, Global) of
+                [#chain{authenticators = Authenticators}] ->
+                    Authenticators;
+                _ ->
+                    []
+            end
+    end.
+
+get_enabled(Authenticators) ->
+    [Authenticator || Authenticator <- Authenticators, Authenticator#authenticator.enable =:= true].
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -331,7 +349,9 @@ initialize_authentication(ChainName, AuthenticatorsConfig) ->
             {ok, _} ->
                 ok;
             {error, Reason} ->
-                ?LOG(error, "Failed to create authenticator '~s': ~p", [generate_id(AuthenticatorConfig), Reason])
+                ?SLOG(error, #{msg => "failed to create authenticator",
+                               reason => Reason,
+                               authenticator => generate_id(AuthenticatorConfig)})
         end
     end, CheckedConfig).
 
@@ -536,7 +556,7 @@ handle_call({create_authenticator, ChainName, Config}, _From, #{providers := Pro
                 false ->
                     case do_create_authenticator(ChainName, AuthenticatorID, Config, Providers) of
                         {ok, Authenticator} ->
-                            NAuthenticators = Authenticators ++ [Authenticator],
+                            NAuthenticators = Authenticators ++ [Authenticator#authenticator{enable = maps:get(enable, Config)}],
                             true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NAuthenticators}),
                             {ok, serialize_authenticator(Authenticator)};
                         {error, Reason} ->
@@ -575,7 +595,8 @@ handle_call({update_authenticator, ChainName, AuthenticatorID, Config}, _From, S
                             Unique = unique(ChainName, AuthenticatorID, Version),
                             case Provider:update(Config#{'_unique' => Unique}, ST) of
                                 {ok, NewST} ->
-                                    NewAuthenticator = Authenticator#authenticator{state = switch_version(NewST)},
+                                    NewAuthenticator = Authenticator#authenticator{state = switch_version(NewST),
+                                                                                   enable = maps:get(enable, Config)},
                                     NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators),
                                     true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NewAuthenticators}),
                                     {ok, serialize_authenticator(NewAuthenticator)};
@@ -629,15 +650,15 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) ->
     reply(Reply, State);
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Req, State) ->
-    ?LOG(error, "Unexpected case: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected cast", req => Req}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 25 - 1
apps/emqx_authn/src/emqx_authn_utils.erl

@@ -18,6 +18,8 @@
 
 -export([ replace_placeholders/2
         , replace_placeholder/2
+        , check_password/3
+        , is_superuser/1
         , hash/4
         , gen_salt/0
         , bin/1
@@ -55,6 +57,28 @@ replace_placeholder(<<"${cert-common-name}">>, Credential) ->
 replace_placeholder(Constant, _) ->
     Constant.
 
+check_password(undefined, _Selected, _State) ->
+    {error, bad_username_or_password};
+check_password(Password,
+               #{<<"password_hash">> := Hash},
+               #{password_hash_algorithm := bcrypt}) ->
+    case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
+        true -> ok;
+        false -> {error, bad_username_or_password}
+    end;
+check_password(Password,
+               #{<<"password_hash">> := Hash} = Selected,
+               #{password_hash_algorithm := Algorithm,
+                 salt_position := SaltPosition}) ->
+    Salt = maps:get(<<"salt">>, Selected, <<>>),
+    case Hash =:= hash(Algorithm, Password, Salt, SaltPosition) of
+        true -> ok;
+        false -> {error, bad_username_or_password}
+    end.
+
+is_superuser(Selected) ->
+    #{is_superuser => maps:get(<<"is_superuser">>, Selected, false)}.
+
 hash(Algorithm, Password, Salt, prefix) ->
     emqx_passwd:hash(Algorithm, <<Salt/binary, Password/binary>>);
 hash(Algorithm, Password, Salt, suffix) ->
@@ -75,4 +99,4 @@ bin(X) -> X.
 convert_to_sql_param(undefined) ->
     null;
 convert_to_sql_param(V) ->
-    bin(V).
+    bin(V).

+ 20 - 23
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -156,26 +156,23 @@ authenticate(#{auth_method := _}, _) ->
 authenticate(Credential, #{'_unique' := Unique,
                            method := Method,
                            request_timeout := RequestTimeout} = State) ->
-    try
-        Request = generate_request(Credential, State),
-        case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of
-            {ok, 204, _Headers} -> {ok, #{is_superuser => false}};
-            {ok, 200, Headers, Body} ->
-                ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
-                case safely_parse_body(ContentType, Body) of
-                    {ok, NBody} ->
-                        %% TODO: Return by user property
-                        {ok, #{is_superuser => maps:get(<<"is_superuser">>, NBody, false),
-                               user_property => NBody}};
-                    {error, _Reason} ->
-                        {ok, #{is_superuser => false}}
-                end;
-            {error, _Reason} ->
-                ignore
-        end
-    catch
-        error:Reason ->
-            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]),
+    Request = generate_request(Credential, State),
+    case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of
+        {ok, 204, _Headers} -> {ok, #{is_superuser => false}};
+        {ok, 200, Headers, Body} ->
+            ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
+            case safely_parse_body(ContentType, Body) of
+                {ok, NBody} ->
+                    %% TODO: Return by user property
+                    {ok, #{is_superuser => maps:get(<<"is_superuser">>, NBody, false),
+                           user_property => NBody}};
+                {error, _Reason} ->
+                    {ok, #{is_superuser => false}}
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "http_server_query_failed",
+                           resource => Unique,
+                           reason => Reason}),
             ignore
     end.
 
@@ -194,9 +191,9 @@ check_url(URL) ->
     end.
 
 check_body(Body) ->
-    lists:any(fun({_, V}) ->
-                  not is_binary(V)
-              end, maps:to_list(Body)).
+    maps:fold(fun(_K, _V, false) -> false;
+                 (_K, V, true) -> is_binary(V)
+              end, true, Body).
 
 default_headers() ->
     maps:put(<<"content-type">>,

+ 13 - 4
apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl

@@ -94,7 +94,9 @@ handle_info({http, {RequestID, Result}},
     State1 = State0#{request_id := undefined},
     case Result of
         {error, Reason} ->
-            ?LOG(error, "Failed to request jwks endpoint(~s): ~p", [Endpoint, Reason]),
+            ?SLOG(warning, #{msg => "failed_to_request_jwks_endpoint",
+                             endpoint => Endpoint,
+                             reason => Reason}),
             State1;
         {_StatusLine, _Headers, Body} ->
             try
@@ -102,7 +104,9 @@ handle_info({http, {RequestID, Result}},
                 {_, JWKs} = JWKS#jose_jwk.keys,
                 State1#{jwks := JWKs}
             catch _:_ ->
-                ?LOG(error, "Invalid jwks returned from jwks endpoint(~s): ~p~n", [Endpoint, Body]),
+                ?SLOG(warning, #{msg => "invalid_jwks_returned",
+                                 endpoint => Endpoint,
+                                 body => Body}),
                 State1
             end
     end;
@@ -136,11 +140,16 @@ handle_options(#{endpoint := Endpoint,
 
 refresh_jwks(#{endpoint := Endpoint,
                ssl_opts := SSLOpts} = State) ->
-    HTTPOpts = [{timeout, 5000}, {connect_timeout, 5000}, {ssl, SSLOpts}],
+    HTTPOpts = [ {timeout, 5000}
+               , {connect_timeout, 5000}
+               , {ssl, SSLOpts}
+               ],
     NState = case httpc:request(get, {Endpoint, [{"Accept", "application/json"}]}, HTTPOpts,
                                 [{body_format, binary}, {sync, false}, {receiver, self()}]) of
                  {error, Reason} ->
-                     ?LOG(error, "Failed to request jwks endpoint(~s): ~p", [Endpoint, Reason]),
+                     ?SLOG(warning, #{msg => "failed_to_request_jwks_endpoint",
+                                      endpoint => Endpoint,
+                                      reason => Reason}),
                      State;
                  {ok, RequestID} ->
                      State#{request_id := RequestID}

+ 21 - 23
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -141,29 +141,27 @@ authenticate(#{password := Password} = Credential,
               , selector := Selector0
               , '_unique' := Unique
               } = State) ->
-    try
-        Selector1 = replace_placeholders(Selector0, Credential),
-        Selector2 = normalize_selector(Selector1),
-        case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of
-            undefined -> ignore;
-            {error, Reason} ->
-                ?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]),
-                ignore;
-            Doc ->
-                case check_password(Password, Doc, State) of
-                    ok ->
-                        {ok, #{is_superuser => is_superuser(Doc, State)}};
-                    {error, {cannot_find_password_hash_field, PasswordHashField}} ->
-                        ?LOG(error, "['~s'] Can't find password hash field: ~s", [Unique, PasswordHashField]),
-                        {error, bad_username_or_password};
-                    {error, Reason} ->
-                        {error, Reason}
-                end
-        end
-    catch
-        error:Error ->
-            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
-            ignore
+    Selector1 = replace_placeholders(Selector0, Credential),
+    Selector2 = normalize_selector(Selector1),
+    case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of
+        undefined -> ignore;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "mongodb_query_failed",
+                           resource => Unique,
+                           reason => Reason}),
+            ignore;
+        Doc ->
+            case check_password(Password, Doc, State) of
+                ok ->
+                    {ok, #{is_superuser => is_superuser(Doc, State)}};
+                {error, {cannot_find_password_hash_field, PasswordHashField}} ->
+                    ?SLOG(error, #{msg => "cannot_find_password_hash_field",
+                                   resource => Unique,
+                                   password_hash_field => PasswordHashField}),
+                    ignore;
+                {error, Reason} ->
+                    {error, Reason}
+            end
     end.
 
 destroy(#{'_unique' := Unique}) ->

+ 15 - 37
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -114,24 +114,21 @@ authenticate(#{password := Password} = Credential,
                query := Query,
                query_timeout := Timeout,
                '_unique' := Unique} = State) ->
-    try
-        Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
-        case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of
-            {ok, _Columns, []} -> ignore;
-            {ok, Columns, Rows} ->
-                Selected = maps:from_list(lists:zip(Columns, Rows)),
-                case check_password(Password, Selected, State) of
-                    ok ->
-                        {ok, #{is_superuser => maps:get(<<"is_superuser">>, Selected, false)}};
-                    {error, Reason} ->
-                        {error, Reason}
-                end;
-            {error, _Reason} ->
-                ignore
-        end
-    catch
-        error:Error ->
-            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
+    Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
+    case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of
+        {ok, _Columns, []} -> ignore;
+        {ok, Columns, Rows} ->
+            Selected = maps:from_list(lists:zip(Columns, Rows)),
+            case emqx_authn_utils:check_password(Password, Selected, State) of
+                ok ->
+                    {ok, emqx_authn_utils:is_superuser(Selected)};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "mysql_query_failed",
+                           resource => Unique,
+                           reason => Reason}),
             ignore
     end.
 
@@ -143,25 +140,6 @@ destroy(#{'_unique' := Unique}) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-check_password(undefined, _Selected, _State) ->
-    {error, bad_username_or_password};
-check_password(Password,
-               #{<<"password_hash">> := Hash},
-               #{password_hash_algorithm := bcrypt}) ->
-    case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end;
-check_password(Password,
-               #{<<"password_hash">> := Hash} = Selected,
-               #{password_hash_algorithm := Algorithm,
-                 salt_position := SaltPosition}) ->
-    Salt = maps:get(<<"salt">>, Selected, <<>>),
-    case Hash =:= emqx_authn_utils:hash(Algorithm, Password, Salt, SaltPosition) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end.
-
 %% TODO: Support prepare
 parse_query(Query) ->
     case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of

+ 17 - 39
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -103,25 +103,22 @@ authenticate(#{password := Password} = Credential,
              #{query := Query,
                placeholders := PlaceHolders,
                '_unique' := Unique} = State) ->
-    try
-        Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
-        case emqx_resource:query(Unique, {sql, Query, Params}) of
-            {ok, _Columns, []} -> ignore;
-            {ok, Columns, Rows} ->
-                NColumns = [Name || #column{name = Name} <- Columns],
-                Selected = maps:from_list(lists:zip(NColumns, Rows)),
-                case check_password(Password, Selected, State) of
-                    ok ->
-                        {ok, #{is_superuser => maps:get(<<"is_superuser">>, Selected, false)}};
-                    {error, Reason} ->
-                        {error, Reason}
-                end;
-            {error, _Reason} ->
-                ignore
-        end
-    catch
-        error:Error ->
-            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
+    Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
+    case emqx_resource:query(Unique, {sql, Query, Params}) of
+        {ok, _Columns, []} -> ignore;
+        {ok, Columns, Rows} ->
+            NColumns = [Name || #column{name = Name} <- Columns],
+            Selected = maps:from_list(lists:zip(NColumns, Rows)),
+            case emqx_authn_utils:check_password(Password, Selected, State) of
+                ok ->
+                    {ok, emqx_authn_utils:is_superuser(Selected)};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "postgresql_query_failed",
+                           resource => Unique,
+                           reason => Reason}),
             ignore
     end.
 
@@ -133,30 +130,11 @@ destroy(#{'_unique' := Unique}) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-check_password(undefined, _Selected, _State) ->
-    {error, bad_username_or_password};
-check_password(Password,
-               #{<<"password_hash">> := Hash},
-               #{password_hash_algorithm := bcrypt}) ->
-    case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end;
-check_password(Password,
-               #{<<"password_hash">> := Hash} = Selected,
-               #{password_hash_algorithm := Algorithm,
-                 salt_position := SaltPosition}) ->
-    Salt = maps:get(<<"salt">>, Selected, <<>>),
-    case Hash =:= emqx_authn_utils:hash(Algorithm, Password, Salt, SaltPosition) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end.
-
 %% TODO: Support prepare
 parse_query(Query) ->
     case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
         {match, Captured} ->
-            PlaceHolders = [PlaceHolder || PlaceHolder <- Captured],
+            PlaceHolders = [PlaceHolder || [PlaceHolder] <- Captured],
             Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))],
             NQuery = lists:foldl(fun({PlaceHolder, Replacement}, Query0) ->
                                      re:replace(Query0, <<"'\\", PlaceHolder/binary, "'">>, Replacement, [{return, binary}])

+ 22 - 42
apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl

@@ -127,24 +127,26 @@ authenticate(#{password := Password} = Credential,
              #{ query := {Command, Key, Fields}
               , '_unique' := Unique
               } = State) ->
-    try
-        NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
-        case emqx_resource:query(Unique, {cmd, [Command, NKey | Fields]}) of
-            {ok, Values} ->
-                Selected = merge(Fields, Values),
-                case check_password(Password, Selected, State) of
-                   ok ->
-                       {ok, #{is_superuser => maps:get("is_superuser", Selected, false)}};
-                   {error, Reason} ->
-                       {error, Reason}
-                end;
-            {error, Reason} ->
-                ?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]),
-                ignore
-        end
-    catch
-        error:{cannot_get_variable, Placeholder} ->
-            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, {cannot_get_variable, Placeholder}]),
+    NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
+    case emqx_resource:query(Unique, {cmd, [Command, NKey | Fields]}) of
+        {ok, Values} ->
+            case merge(Fields, Values) of
+                #{<<"password_hash">> := _} = Selected ->
+                    case emqx_authn_utils:check_password(Password, Selected, State) of
+                        ok ->
+                            {ok, emqx_authn_utils:is_superuser(Selected)};
+                        {error, Reason} ->
+                            {error, Reason}
+                    end;
+                _ ->
+                    ?SLOG(error, #{msg => "cannot_find_password_hash_field",
+                                   resource => Unique}),
+                    ignore
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "redis_query_failed",
+                           resource => Unique,
+                           reason => Reason}),
             ignore
     end.
 
@@ -209,27 +211,5 @@ merge(Fields, Value) when not is_list(Value) ->
     merge(Fields, [Value]);
 merge(Fields, Values) ->
     maps:from_list(
-        lists:filter(fun({_, V}) ->
-                         V =/= undefined
-                     end, lists:zip(Fields, Values))).
-
-check_password(undefined, _Selected, _State) ->
-    {error, bad_username_or_password};
-check_password(Password,
-               #{"password_hash" := PasswordHash},
-               #{password_hash_algorithm := bcrypt}) ->
-    case {ok, PasswordHash} =:= bcrypt:hashpw(Password, PasswordHash) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end;
-check_password(Password,
-               #{"password_hash" := PasswordHash} = Selected,
-               #{password_hash_algorithm := Algorithm,
-                 salt_position := SaltPosition}) ->
-    Salt = maps:get("salt", Selected, <<>>),
-    case PasswordHash =:= emqx_authn_utils:hash(Algorithm, Password, Salt, SaltPosition) of
-        true -> ok;
-        false -> {error, bad_username_or_password}
-    end;
-check_password(_Password, _Selected, _State) ->
-    ignore.
+        [{list_to_binary(K), V}
+            || {K, V} <- lists:zip(Fields, Values), V =/= undefined]).

+ 6 - 4
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -79,12 +79,14 @@ on_stop(InstId, #{poolname := PoolName}) ->
                   connector => InstId}),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
-    on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
-on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
+on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
+on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
+on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
     ?SLOG(debug, #{msg => "mysql connector received sql query",
         connector => InstId, sql => SQL, state => State}),
-    case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of
+    case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params, Timeout]}, no_handover) of
         {error, Reason} ->
             ?SLOG(error, #{msg => "mysql connector do sql query failed",
                 connector => InstId, sql => SQL, reason => Reason}),

+ 2 - 2
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -79,8 +79,8 @@ on_stop(InstId, #{poolname := PoolName}) ->
                   connector => InstId}),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
-    on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
+on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {sql, SQL, []}, AfterQuery, State);
 on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
     ?SLOG(debug, #{msg => "postgresql connector received sql query",
         connector => InstId, sql => SQL, state => State}),