Просмотр исходного кода

Merge pull request #5346 from tigercl/feat/mongo-auhtn

feat(authn): support mongodb authn
tigercl 4 лет назад
Родитель
Сommit
59f645dc59

+ 15 - 0
apps/emqx_authn/etc/emqx_authn.conf

@@ -6,6 +6,21 @@ emqx_authn: {
         #     mechanism: password-based
         #     server_type: built-in-database
         #     user_id_type: clientid
+        # },
+        # {
+        #     name: "authenticator2"
+        #     mechanism: password-based
+        #     server_type: mongodb
+        #     server: "127.0.0.1:27017"
+        #     database: mqtt
+        #     collection: users
+        #     selector: {
+        #         username: "${mqtt-username}"
+        #     }
+        #     password_hash_field: password_hash
+        #     salt_field: salt
+        #     password_hash_algorithm: sha256
+        #     salt_position: prefix
         # }
     ]
 }

+ 2 - 22
apps/emqx_authn/include/emqx_authn.hrl

@@ -20,6 +20,8 @@
 -define(VER_1, <<"1">>).
 -define(VER_2, <<"2">>).
 
+-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}").
+
 -record(authenticator,
         { id :: binary()
         , name :: binary()
@@ -35,25 +37,3 @@
         }).
 
 -define(AUTH_SHARD, emqx_authn_shard).
-
--define(CLUSTER_CALL(Module, Func, Args), ?CLUSTER_CALL(Module, Func, Args, ok)).
-
--define(CLUSTER_CALL(Module, Func, Args, ResParttern),
-    fun() ->
-        case LocalResult = erlang:apply(Module, Func, Args) of
-            ResParttern ->
-                Nodes = nodes(),
-                {ResL, BadNodes} = rpc:multicall(Nodes, Module, Func, Args, 5000),
-                NResL = lists:zip(Nodes - BadNodes, ResL),
-                Errors = lists:filter(fun({_, ResParttern}) -> false;
-                                         (_) -> true
-                                      end, NResL),
-                OtherErrors = [{BadNode, node_does_not_exist} || BadNode <- BadNodes],
-                case Errors ++ OtherErrors of
-                    [] -> LocalResult;
-                    NErrors -> {error, NErrors}
-                end;
-            ErrorResult ->
-                {error, ErrorResult}
-        end
-    end()).

+ 2 - 0
apps/emqx_authn/src/emqx_authn.erl

@@ -314,6 +314,8 @@ authenticator_provider(#{mechanism := 'password-based', server_type := 'mysql'})
     emqx_authn_mysql;
 authenticator_provider(#{mechanism := 'password-based', server_type := 'pgsql'}) ->
     emqx_authn_pgsql;
+authenticator_provider(#{mechanism := 'password-based', server_type := 'mongodb'}) ->
+    emqx_authn_mongodb;
 authenticator_provider(#{mechanism := 'password-based', server_type := 'http-server'}) ->
     emqx_authn_http;
 authenticator_provider(#{mechanism := jwt}) ->

+ 10 - 2
apps/emqx_authn/src/emqx_authn_api.erl

@@ -775,7 +775,11 @@ definitions() ->
                 default => true
             },
             ssl => minirest:ref(<<"ssl">>),
-            password_hash_algorithm => minirest:ref(<<"password_hash_algorithm">>),
+            password_hash_algorithm => #{
+                type => string,
+                enum => [<<"plain">>, <<"md5">>, <<"sha">>, <<"sha256">>, <<"sha512">>, <<"bcrypt">>],
+                default => <<"sha256">>
+            },
             salt_position => #{
                 type => string,
                 enum => [<<"prefix">>, <<"suffix">>],
@@ -822,7 +826,11 @@ definitions() ->
                 type => boolean,
                 default => true
             },
-            password_hash_algorithm => minirest:ref(<<"password_hash_algorithm">>),
+            password_hash_algorithm => #{
+                type => string,
+                enum => [<<"plain">>, <<"md5">>, <<"sha">>, <<"sha256">>, <<"sha512">>, <<"bcrypt">>],
+                default => <<"sha256">>
+            },
             salt_position => #{
                 type => string,
                 enum => [<<"prefix">>, <<"suffix">>],

+ 3 - 0
apps/emqx_authn/src/emqx_authn_schema.erl

@@ -47,6 +47,9 @@ authenticators(type) ->
     hoconsc:array({union, [ hoconsc:ref(emqx_authn_mnesia, config)
                           , hoconsc:ref(emqx_authn_mysql, config)
                           , hoconsc:ref(emqx_authn_pgsql, config)
+                          , hoconsc:ref(emqx_authn_mongodb, standalone)
+                          , hoconsc:ref(emqx_authn_mongodb, 'replica-set')
+                          , hoconsc:ref(emqx_authn_mongodb, sharded)
                           , hoconsc:ref(emqx_authn_http, get)
                           , hoconsc:ref(emqx_authn_http, post)
                           , hoconsc:ref(emqx_authn_jwt, 'hmac-based')

+ 34 - 21
apps/emqx_authn/src/emqx_authn_utils.erl

@@ -16,36 +16,53 @@
 
 -module(emqx_authn_utils).
 
--export([ replace_placeholder/2
+-export([ replace_placeholders/2
+        , replace_placeholder/2
         , gen_salt/0
+        , bin/1
         ]).
 
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
 
-replace_placeholder(PlaceHolders, Data) ->
-    replace_placeholder(PlaceHolders, Data, []).
+replace_placeholders(PlaceHolders, Data) ->
+    replace_placeholders(PlaceHolders, Data, []).
 
-replace_placeholder([], _Data, Acc) ->
+replace_placeholders([], _Credential, Acc) ->
     lists:reverse(Acc);
-replace_placeholder([<<"${mqtt-username}">> | More], #{username := Username} = Data, Acc) ->
-    replace_placeholder(More, Data, [convert_to_sql_param(Username) | Acc]);
-replace_placeholder([<<"${mqtt-clientid}">> | More], #{clientid := ClientID} = Data, Acc) ->
-    replace_placeholder(More, Data, [convert_to_sql_param(ClientID) | Acc]);
-replace_placeholder([<<"${ip-address}">> | More], #{peerhost := IPAddress} = Data, Acc) ->
-    replace_placeholder(More, Data, [convert_to_sql_param(IPAddress) | Acc]);
-replace_placeholder([<<"${cert-subject}">> | More], #{dn := Subject} = Data, Acc) ->
-    replace_placeholder(More, Data, [convert_to_sql_param(Subject) | Acc]);
-replace_placeholder([<<"${cert-common-name}">> | More], #{cn := CommonName} = Data, Acc) ->
-    replace_placeholder(More, Data, [convert_to_sql_param(CommonName) | Acc]);
-replace_placeholder([_ | More], Data, Acc) ->
-    replace_placeholder(More, Data, [null | Acc]).
+replace_placeholders([Placeholder | More], Credential, Acc) ->
+    case replace_placeholder(Placeholder, Credential) of
+        undefined ->
+            error({cannot_get_variable, Placeholder});
+        V ->
+            replace_placeholders(More, Credential, [convert_to_sql_param(V) | Acc])
+    end.
+
+replace_placeholder(<<"${mqtt-username}">>, Credential) ->
+    maps:get(username, Credential, undefined);
+replace_placeholder(<<"${mqtt-clientid}">>, Credential) ->
+    maps:get(clientid, Credential, undefined);
+replace_placeholder(<<"${mqtt-password}">>, Credential) ->
+    maps:get(password, Credential, undefined);
+replace_placeholder(<<"${ip-address}">>, Credential) ->
+    maps:get(peerhost, Credential, undefined);
+replace_placeholder(<<"${cert-subject}">>, Credential) ->
+    maps:get(dn, Credential, undefined);
+replace_placeholder(<<"${cert-common-name}">>, Credential) ->
+    maps:get(cn, Credential, undefined);
+replace_placeholder(Constant, _) ->
+    Constant.
+
 
 gen_salt() ->
     <<X:128/big-unsigned-integer>> = crypto:strong_rand_bytes(16),
     iolist_to_binary(io_lib:format("~32.16.0b", [X])).
 
+bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+bin(L) when is_list(L) -> list_to_binary(L);
+bin(X) -> X.
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
@@ -53,8 +70,4 @@ gen_salt() ->
 convert_to_sql_param(undefined) ->
     null;
 convert_to_sql_param(V) ->
-    bin(V).
-
-bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
-bin(L) when is_list(L) -> list_to_binary(L);
-bin(X) -> X.
+    bin(V).

+ 0 - 1
apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl

@@ -132,7 +132,6 @@ destroy(#{user_group := UserGroup}) ->
                                end, mnesia:select(?TAB, MatchSpec, write))
         end).
 
-%% TODO: binary to atom
 add_user(#{user_id := UserID,
            password := Password}, #{user_group := UserGroup} = State) ->
     trans(

+ 39 - 48
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -17,6 +17,7 @@
 -module(emqx_authn_http).
 
 -include("emqx_authn.hrl").
+-include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
 
 -behaviour(hocon_schema).
@@ -122,15 +123,16 @@ create(#{ method := Method
              , headers         => normalize_headers(Headers)
              , form_data       => maps:to_list(FormData)
              , request_timeout => RequestTimeout
+             , '_unique'       => Unique
              },
     case emqx_resource:create_local(Unique,
                                     emqx_connector_http,
                                     Config#{base_url => maps:remove(query, URIMap),
                                             pool_type => random}) of
         {ok, _} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, already_created} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -146,27 +148,33 @@ update(Config, State) ->
 
 authenticate(#{auth_method := _}, _) ->
     ignore;
-authenticate(Credential, #{resource_id := ResourceID,
+authenticate(Credential, #{'_unique' := Unique,
                            method := Method,
                            request_timeout := RequestTimeout} = State) ->
-    Request = generate_request(Credential, State),
-    case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
-        {ok, 204, _Headers} -> ok;
-        {ok, 200, Headers, Body} ->
-            ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
-            case safely_parse_body(ContentType, Body) of
-                {ok, _NBody} ->
-                    %% TODO: Return by user property
-                    ok;
-                {error, _Reason} ->
-                    ok
-            end;
-        {error, _Reason} ->
+    try
+        Request = generate_request(Credential, State),
+        case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of
+            {ok, 204, _Headers} -> ok;
+            {ok, 200, Headers, Body} ->
+                ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
+                case safely_parse_body(ContentType, Body) of
+                    {ok, _NBody} ->
+                        %% TODO: Return by user property
+                        ok;
+                    {error, _Reason} ->
+                        ok
+                end;
+            {error, _Reason} ->
+                ignore
+        end
+    catch
+        error:Reason ->
+            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]),
             ignore
     end.
 
-destroy(#{resource_id := ResourceID}) ->
-    _ = emqx_resource:remove_local(ResourceID),
+destroy(#{'_unique' := Unique}) ->
+    _ = emqx_resource:remove_local(Unique),
     ok.
 
 %%--------------------------------------------------------------------
@@ -242,31 +250,18 @@ generate_request(Credential, #{method := Method,
             {NPath, Headers, Body}
     end.
 
-replace_placeholders(FormData0, Credential) ->
-    FormData = lists:map(fun({K, V0}) ->
-                             case replace_placeholder(V0, Credential) of
-                                 undefined -> {K, undefined};
-                                 V -> {K, bin(V)}
-                             end
-                         end, FormData0),
-    lists:filter(fun({_, V}) ->
-                    V =/= undefined
-                 end, FormData).
-
-replace_placeholder(<<"${mqtt-username}">>, Credential) ->
-    maps:get(username, Credential, undefined);
-replace_placeholder(<<"${mqtt-clientid}">>, Credential) ->
-    maps:get(clientid, Credential, undefined);
-replace_placeholder(<<"${mqtt-password}">>, Credential) ->
-    maps:get(password, Credential, undefined);
-replace_placeholder(<<"${ip-address}">>, Credential) ->
-    maps:get(peerhost, Credential, undefined);
-replace_placeholder(<<"${cert-subject}">>, Credential) ->
-    maps:get(dn, Credential, undefined);
-replace_placeholder(<<"${cert-common-name}">>, Credential) ->
-    maps:get(cn, Credential, undefined);
-replace_placeholder(Constant, _) ->
-    Constant.
+replace_placeholders(KVs, Credential) ->
+    replace_placeholders(KVs, Credential, []).
+
+replace_placeholders([], _Credential, Acc) ->
+    lists:reverse(Acc);
+replace_placeholders([{K, V0} | More], Credential, Acc) ->
+    case emqx_authn_utils:replace_placeholder(V0, Credential) of
+        undefined ->
+            error({cannot_get_variable, V0});
+        V ->
+            replace_placeholders(More, Credential, [{K, emqx_authn_utils:bin(V)} | Acc])
+    end.
 
 append_query(Path, []) ->
     Path;
@@ -300,8 +295,4 @@ parse_body(<<"application/json">>, Body) ->
 parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
     {ok, cow_qs:parse_qs(Body)};
 parse_body(ContentType, _) ->
-    {error, {unsupported_content_type, ContentType}}.
-
-bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
-bin(L) when is_list(L) -> list_to_binary(L);
-bin(X) -> X.
+    {error, {unsupported_content_type, ContentType}}.

+ 227 - 0
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -0,0 +1,227 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_mongodb).
+
+-include("emqx_authn.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("typerefl/include/types.hrl").
+
+-behaviour(hocon_schema).
+
+-export([ structs/0
+        , fields/1
+        ]).
+
+-export([ create/1
+        , update/2
+        , authenticate/2
+        , destroy/1
+        ]).
+
+%%------------------------------------------------------------------------------
+%% Hocon Schema
+%%------------------------------------------------------------------------------
+
+structs() -> [""].
+
+fields("") ->
+    [ {config, {union, [ hoconsc:t(standalone)
+                       , hoconsc:t('replica-set')
+                       , hoconsc:t(sharded)
+                       ]}}
+    ];
+
+fields(standalone) ->
+    common_fields() ++ emqx_connector_mongo:fields(single);
+
+fields('replica-set') ->
+    common_fields() ++ emqx_connector_mongo:fields(rs);
+
+fields(sharded) ->
+    common_fields() ++ emqx_connector_mongo:fields(sharded).
+
+common_fields() ->
+    [ {name,                    fun emqx_authn_schema:authenticator_name/1}
+    , {mechanism,               {enum, ['password-based']}}
+    , {server_type,             {enum, [mongodb]}}
+    , {collection,              fun collection/1}
+    , {selector,                fun selector/1}
+    , {password_hash_field,     fun password_hash_field/1}
+    , {salt_field,              fun salt_field/1}
+    , {password_hash_algorithm, fun password_hash_algorithm/1}
+    , {salt_position,           fun salt_position/1}
+    ].
+
+collection(type) -> binary();
+collection(nullable) -> false;
+collection(_) -> undefined.
+
+selector(type) -> map();
+selector(nullable) -> false;
+selector(_) -> undefined.
+
+password_hash_field(type) -> binary();
+password_hash_field(nullable) -> false;
+password_hash_field(_) -> undefined.
+
+salt_field(type) -> binary();
+salt_field(nullable) -> true;
+salt_field(_) -> undefined.
+
+password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]};
+password_hash_algorithm(default) -> sha256;
+password_hash_algorithm(_) -> undefined.
+
+salt_position(type) -> {enum, [prefix, suffix]};
+salt_position(default) -> prefix;
+salt_position(_) -> undefined.
+
+%%------------------------------------------------------------------------------
+%% APIs
+%%------------------------------------------------------------------------------
+
+create(#{ selector := Selector
+        , '_unique' := Unique
+        } = Config) ->
+    NSelector = parse_selector(Selector),
+    State = maps:with([ collection
+                      , password_hash_field
+                      , salt_field
+                      , password_hash_algorithm
+                      , salt_position
+                      , '_unique'], Config),
+    NState = State#{selector => NSelector},
+    case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of
+        {ok, _} ->
+            {ok, NState};
+        {error, already_created} ->
+            {ok, NState};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+update(Config, State) ->
+    case create(Config) of
+        {ok, NewState} ->
+            ok = destroy(State),
+            {ok, NewState};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+authenticate(#{auth_method := _}, _) ->
+    ignore;
+authenticate(#{password := Password} = Credential,
+             #{ collection := Collection
+              , selector := Selector0
+              , '_unique' := Unique
+              } = State) ->
+    try
+        Selector1 = replace_placeholders(Selector0, Credential),
+        Selector2 = normalize_selector(Selector1),
+        case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of
+            undefined -> ignore;
+            {error, Reason} ->
+                ?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]),
+                ignore;
+            Doc ->
+                case check_password(Password, Doc, State) of
+                    ok -> ok;
+                    {error, {cannot_find_password_hash_field, PasswordHashField}} ->
+                        ?LOG(error, "['~s'] Can't find password hash field: ~s", [Unique, PasswordHashField]),
+                        {error, bad_username_or_password};
+                    {error, Reason} ->
+                        {error, Reason}
+                end
+        end
+    catch
+        error:Error ->
+            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
+            ignore
+    end.
+
+destroy(#{'_unique' := Unique}) ->
+    _ = emqx_resource:remove_local(Unique),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+parse_selector(Selector) ->
+    NSelector = emqx_json:encode(Selector),
+    Tokens = re:split(NSelector, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),
+    parse_selector(Tokens, []).
+
+parse_selector([], Acc) ->
+    lists:reverse(Acc);
+parse_selector([[Constant, Placeholder] | Tokens], Acc) ->
+    parse_selector(Tokens, [{placeholder, Placeholder}, {constant, Constant} | Acc]);
+parse_selector([[Constant] | Tokens], Acc) ->
+    parse_selector(Tokens, [{constant, Constant} | Acc]).
+
+replace_placeholders(Selector, Credential) ->
+    lists:map(fun({constant, Constant}) ->
+                  Constant;
+                 ({placeholder, Placeholder}) ->
+                  case emqx_authn_utils:replace_placeholder(Placeholder, Credential) of
+                      undefined -> error({cannot_get_variable, Placeholder});
+                      Value -> Value
+                  end
+              end, Selector).
+
+normalize_selector(Selector) ->
+    emqx_json:decode(iolist_to_binary(Selector), [return_maps]).
+
+check_password(undefined, _Selected, _State) ->
+    {error, bad_username_or_password};
+check_password(Password,
+               Doc,
+               #{password_hash_algorithm := bcrypt,
+                 password_hash_field := PasswordHashField}) ->
+    case maps:get(PasswordHashField, Doc, undefined) of
+        undefined ->
+            {error, {cannot_find_password_hash_field, PasswordHashField}};
+        Hash ->
+            case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
+                true -> ok;
+                false -> {error, bad_username_or_password}
+            end
+    end;
+check_password(Password,
+               Doc,
+               #{password_hash_algorithm := Algorithm,
+                 password_hash_field := PasswordHashField,
+                 salt_position := SaltPosition} = State) ->
+    case maps:get(PasswordHashField, Doc, undefined) of
+        undefined ->
+            {error, {cannot_find_password_hash_field, PasswordHashField}};
+        Hash ->
+            Salt = case maps:get(salt_field, State, undefined) of
+                       undefined -> <<>>;
+                       SaltField -> maps:get(SaltField, Doc, <<>>)
+                   end,
+            case Hash =:= hash(Algorithm, Password, Salt, SaltPosition) of
+                true -> ok;
+                false -> {error, bad_username_or_password}
+            end
+    end.
+
+hash(Algorithm, Password, Salt, prefix) ->
+    emqx_passwd:hash(Algorithm, <<Salt/binary, Password/binary>>);
+hash(Algorithm, Password, Salt, suffix) ->
+    emqx_passwd:hash(Algorithm, <<Password/binary, Salt/binary>>).

+ 30 - 36
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -17,6 +17,7 @@
 -module(emqx_authn_mysql).
 
 -include("emqx_authn.hrl").
+-include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
 
 -behaviour(hocon_schema).
@@ -46,25 +47,12 @@ fields(config) ->
     , {query,                   fun query/1}
     , {query_timeout,           fun query_timeout/1}
     ] ++ emqx_connector_schema_lib:relational_db_fields()
-    ++ emqx_connector_schema_lib:ssl_fields();
+    ++ emqx_connector_schema_lib:ssl_fields().
 
-fields(bcrypt) ->
-    [ {name, {enum, [bcrypt]}}
-    , {salt_rounds, fun salt_rounds/1}
-    ];
-
-fields(other_algorithms) ->
-    [ {name, {enum, [plain, md5, sha, sha256, sha512]}}
-    ].
-
-password_hash_algorithm(type) -> {union, [hoconsc:ref(bcrypt), hoconsc:ref(other_algorithms)]};
-password_hash_algorithm(default) -> #{<<"name">> => sha256};
+password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]};
+password_hash_algorithm(default) -> sha256;
 password_hash_algorithm(_) -> undefined.
 
-salt_rounds(type) -> integer();
-salt_rounds(default) -> 10;
-salt_rounds(_) -> undefined.
-
 salt_position(type) -> {enum, [prefix, suffix]};
 salt_position(default) -> prefix;
 salt_position(_) -> undefined.
@@ -92,12 +80,13 @@ create(#{ password_hash_algorithm := Algorithm
               salt_position => SaltPosition,
               query => Query,
               placeholders => PlaceHolders,
-              query_timeout => QueryTimeout},
+              query_timeout => QueryTimeout,
+              '_unique' => Unique},
     case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of
         {ok, _} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, already_created} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -114,36 +103,41 @@ update(Config, State) ->
 authenticate(#{auth_method := _}, _) ->
     ignore;
 authenticate(#{password := Password} = Credential,
-             #{resource_id := ResourceID,
-               placeholders := PlaceHolders,
+             #{placeholders := PlaceHolders,
                query := Query,
-               query_timeout := Timeout} = State) ->
-    Params = emqx_authn_utils:replace_placeholder(PlaceHolders, Credential),
-    case emqx_resource:query(ResourceID, {sql, Query, Params, Timeout}) of
-        {ok, _Columns, []} -> ignore;
-        {ok, Columns, Rows} ->
-            %% TODO: Support superuser
-            Selected = maps:from_list(lists:zip(Columns, Rows)),
-            check_password(Password, Selected, State);
-        {error, _Reason} ->
+               query_timeout := Timeout,
+               '_unique' := Unique} = State) ->
+    try
+        Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
+        case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of
+            {ok, _Columns, []} -> ignore;
+            {ok, Columns, Rows} ->
+                %% TODO: Support superuser
+                Selected = maps:from_list(lists:zip(Columns, Rows)),
+                check_password(Password, Selected, State);
+            {error, _Reason} ->
+                ignore
+        end
+    catch
+        error:Reason ->
+            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]),
             ignore
     end.
 
-destroy(#{resource_id := ResourceID}) ->
-    _ = emqx_resource:remove_local(ResourceID),
+destroy(#{'_unique' := Unique}) ->
+    _ = emqx_resource:remove_local(Unique),
     ok.
     
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-check_password(undefined, _Algorithm, _Selected) ->
+check_password(undefined, _Selected, _State) ->
     {error, bad_username_or_password};
 check_password(Password,
                #{password_hash := Hash},
                #{password_hash_algorithm := bcrypt}) ->
-    {ok, Hash0} = bcrypt:hashpw(Password, Hash),
-    case list_to_binary(Hash0) =:= Hash of
+    case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
         true -> ok;
         false -> {error, bad_username_or_password}
     end;
@@ -163,7 +157,7 @@ check_password(Password,
 
 %% TODO: Support prepare
 parse_query(Query) ->
-    case re:run(Query, "\\$\\{[a-z0-9\\_]+\\}", [global, {capture, all, binary}]) of
+    case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
         {match, Captured} ->
             PlaceHolders = [PlaceHolder || PlaceHolder <- Captured],
             NQuery = re:replace(Query, "'\\$\\{[a-z0-9\\_]+\\}'", "?", [global, {return, binary}]),

+ 29 - 21
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -17,6 +17,7 @@
 -module(emqx_authn_pgsql).
 
 -include("emqx_authn.hrl").
+-include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
 
 -behaviour(hocon_schema).
@@ -45,7 +46,8 @@ fields(config) ->
     ] ++ emqx_connector_schema_lib:relational_db_fields()
     ++ emqx_connector_schema_lib:ssl_fields().
 
-password_hash_algorithm(type) -> string();
+password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]};
+password_hash_algorithm(default) -> sha256;
 password_hash_algorithm(_) -> undefined.
 
 query(type) -> string();
@@ -65,12 +67,13 @@ create(#{ query := Query0
     State = #{query => Query,
               placeholders => PlaceHolders,
               password_hash_algorithm => Algorithm,
-              salt_position => SaltPosition},
+              salt_position => SaltPosition,
+              '_unique' => Unique},
     case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of
         {ok, _} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, already_created} ->
-            {ok, State#{resource_id => Unique}};
+            {ok, State};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -87,35 +90,40 @@ update(Config, State) ->
 authenticate(#{auth_method := _}, _) ->
     ignore;
 authenticate(#{password := Password} = Credential,
-             #{resource_id := ResourceID,
-               query := Query,
-               placeholders := PlaceHolders} = State) ->
-    Params = emqx_authn_utils:replace_placeholder(PlaceHolders, Credential),
-    case emqx_resource:query(ResourceID, {sql, Query, Params}) of
-        {ok, _Columns, []} -> ignore;
-        {ok, Columns, Rows} ->
-            %% TODO: Support superuser
-            Selected = maps:from_list(lists:zip(Columns, Rows)),
-            check_password(Password, Selected, State);
-        {error, _Reason} ->
+             #{query := Query,
+               placeholders := PlaceHolders,
+               '_unique' := Unique} = State) ->
+    try
+        Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
+        case emqx_resource:query(Unique, {sql, Query, Params}) of
+            {ok, _Columns, []} -> ignore;
+            {ok, Columns, Rows} ->
+                %% TODO: Support superuser
+                Selected = maps:from_list(lists:zip(Columns, Rows)),
+                check_password(Password, Selected, State);
+            {error, _Reason} ->
+                ignore
+        end
+    catch
+        error:Reason ->
+            ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]),
             ignore
     end.
 
-destroy(#{resource_id := ResourceID}) ->
-    _ = emqx_resource:remove_local(ResourceID),
+destroy(#{'_unique' := Unique}) ->
+    _ = emqx_resource:remove_local(Unique),
     ok.
     
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-check_password(undefined, _Algorithm, _Selected) ->
+check_password(undefined, _Selected, _State) ->
     {error, bad_username_or_password};
 check_password(Password,
                #{password_hash := Hash},
                #{password_hash_algorithm := bcrypt}) ->
-    {ok, Hash0} = bcrypt:hashpw(Password, Hash),
-    case list_to_binary(Hash0) =:= Hash of
+    case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of
         true -> ok;
         false -> {error, bad_username_or_password}
     end;
@@ -135,7 +143,7 @@ check_password(Password,
 
 %% TODO: Support prepare
 parse_query(Query) ->
-    case re:run(Query, "\\$\\{[a-z0-9\\_]+\\}", [global, {capture, all, binary}]) of
+    case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
         {match, Captured} ->
             PlaceHolders = [PlaceHolder || PlaceHolder <- Captured],
             Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))],

+ 6 - 3
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -149,11 +149,14 @@ connect(Opts) ->
     WorkerOptions = proplists:get_value(worker_options, Opts, []),
     mongo_api:connect(Type, Hosts, Options, WorkerOptions).
 
-mongo_query(Conn, find, Collection, Selector, Docs) ->
-    mongo_api:find(Conn, Collection, Selector, Docs);
+mongo_query(Conn, find, Collection, Selector, Projector) ->
+    mongo_api:find(Conn, Collection, Selector, Projector);
+
+mongo_query(Conn, find_one, Collection, Selector, Projector) ->
+    mongo_api:find_one(Conn, Collection, Selector, Projector);
 
 %% Todo xxx
-mongo_query(_Conn, _Action, _Collection, _Selector, _Docs) ->
+mongo_query(_Conn, _Action, _Collection, _Selector, _Projector) ->
     ok.
 
 do_start(InstId, Opts0, Config = #{mongo_type := Type,

+ 1 - 1
apps/emqx_connector/src/emqx_connector_schema_lib.erl

@@ -71,7 +71,7 @@ ssl_fields() ->
                        [ hoconsc:ref(?MODULE, ssl_on)
                        , hoconsc:ref(?MODULE, ssl_off)
                        ]),
-              default => hoconsc:ref(?MODULE, ssl_off)
+              default => #{<<"enable">> => false}
              }
       }
     ].