Преглед изворни кода

refactor(connector): db connector provide default port

JimMoen пре 4 година
родитељ
комит
ab7c2b72e3

+ 1 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -50,8 +50,7 @@ fields(?CONF_NS) ->
     , {query, fun query/1}
     , {query_timeout, fun query_timeout/1}
     ] ++ emqx_authn_schema:common_fields()
-    ++ emqx_connector_schema_lib:relational_db_fields()
-    ++ emqx_connector_schema_lib:ssl_fields().
+    ++ emqx_connector_mysql:fields(config).
 
 query(type) -> string();
 query(_) -> undefined.

+ 3 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -54,9 +54,9 @@ fields(?CONF_NS) ->
     , {backend, emqx_authn_schema:backend(postgresql)}
     , {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1}
     , {query, fun query/1}
-    ] ++ emqx_authn_schema:common_fields()
-    ++ emqx_connector_schema_lib:relational_db_fields()
-    ++ emqx_connector_schema_lib:ssl_fields().
+    ] ++
+    emqx_authn_schema:common_fields() ++
+    proplists:delete(named_queries, emqx_connector_pgsql:fields(config)).
 
 query(type) -> string();
 query(_) -> undefined.

+ 2 - 4
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -144,12 +144,10 @@ fields(redis_cluster) ->
     [ {cmd, query()} ].
 
 http_common_fields() ->
-    [ {type, #{type => http}}
-    , {enable, #{type => boolean(), default => true}}
-    , {url, fun url/1}
+    [ {url, fun url/1}
     , {request_timeout, mk_duration("request timeout", #{default => "30s"})}
     , {body, #{type => map(), nullable => true}}
-    ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
+    ] ++ proplists:delete(base_url, connector_fields(http)).
 
 mongo_common_fields() ->
     [ {collection, #{type => atom()}}

+ 31 - 0
apps/emqx_connector/include/emqx_connector.hrl

@@ -1,4 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
 -define(VALID, emqx_resource_validator).
 -define(NOT_EMPTY(MSG), ?VALID:not_empty(MSG)).
 -define(MAX(MAXV), ?VALID:max(number, MAXV)).
 -define(MIN(MINV), ?VALID:min(number, MINV)).
+
+-define(MYSQL_DEFAULT_PORT, 3306).
+-define(MONGO_DEFAULT_PORT, 27017).
+-define(REDIS_DEFAULT_PORT, 6379).
+-define(PGSQL_DEFAULT_PORT, 5432).
+
+-define(SERVERS_DESC, "A Node list for Cluster to connect to. The nodes should be splited with ',', such as: 'Node[,Node]'<br>\nFor each Node should be:<br>").
+
+-define(SERVER_DESC(TYPE, DEFAULT_PORT), """
+The IPv4 or IPv6 address or host name to connect to.<br>
+A host entry has the following form: 'Host[:Port]'<br>
+The """ ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if '[:Port]' isn't present"
+).
+
+-define(THROW_ERROR(Str), erlang:throw({error, Str})).

+ 23 - 1
apps/emqx_connector/src/emqx_connector_ldap.erl

@@ -135,7 +135,7 @@ connect(Opts) ->
     {ok, LDAP}.
 
 ldap_fields() ->
-    [ {servers, fun emqx_connector_schema_lib:servers/1}
+    [ {servers, fun servers/1}
     , {port, fun port/1}
     , {pool_size, fun emqx_connector_schema_lib:pool_size/1}
     , {bind_dn, fun bind_dn/1}
@@ -144,6 +144,11 @@ ldap_fields() ->
     , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
     ].
 
+servers(type) -> list();
+servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(_) -> undefined.
+
 bind_dn(type) -> binary();
 bind_dn(default) -> 0;
 bind_dn(_) -> undefined.
@@ -154,3 +159,20 @@ port(_) -> undefined.
 
 duration(type) -> emqx_schema:duration_ms();
 duration(_) -> undefined.
+
+to_servers_raw(Servers) ->
+    {ok, lists:map( fun(Server) ->
+                        case string:tokens(Server, ": ") of
+                            [Ip] ->
+                                [{host, Ip}];
+                            [Ip, Port] ->
+                                [{host, Ip}, {port, list_to_integer(Port)}]
+                        end
+                    end, string:tokens(str(Servers), ", "))}.
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 28 - 9
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -20,10 +20,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--type server() :: emqx_schema:ip_port().
--reflect_type([server/0]).
--typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
-
 -behaviour(emqx_resource).
 
 %% callbacks of behaviour emqx_resource
@@ -42,13 +38,18 @@
 
 -define(HEALTH_CHECK_TIMEOUT, 10000).
 
+%% mongo servers don't need parse
+-define( MONGO_HOST_OPTIONS
+       , #{ host_type => hostname
+          , default_port => ?MONGO_DEFAULT_PORT}).
+
 %%=====================================================================
 roots() ->
     [ {config, #{type => hoconsc:union(
-                          [ hoconsc:ref(?MODULE, single)
-                          , hoconsc:ref(?MODULE, rs)
-                          , hoconsc:ref(?MODULE, sharded)
-                          ])}}
+                           [ hoconsc:ref(?MODULE, single)
+                           , hoconsc:ref(?MODULE, rs)
+                           , hoconsc:ref(?MODULE, sharded)
+                           ])}}
     ].
 
 fields(single) ->
@@ -284,11 +285,18 @@ init_worker_options([_ | R], Acc) ->
     init_worker_options(R, Acc);
 init_worker_options([], Acc) -> Acc.
 
-server(type) -> binary();
+%% ===================================================================
+%% Schema funcs
+
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
 server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server_raw/1;
+server(desc) -> ?SERVER_DESC("MongoDB", integer_to_list(?MONGO_DEFAULT_PORT));
 server(_) -> undefined.
 
 servers(type) -> binary();
+servers(nullable) -> false;
 servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
 servers(_) -> undefined.
 
@@ -312,6 +320,9 @@ srv_record(type) -> boolean();
 srv_record(default) -> false;
 srv_record(_) -> undefined.
 
+%% ===================================================================
+%% Internal funcs
+
 parse_servers(Type, Servers) when is_binary(Servers) ->
     parse_servers(Type, binary_to_list(Servers));
 parse_servers(Type, Servers) when is_list(Servers) ->
@@ -395,3 +406,11 @@ take_and_convert([Field | More], Options, Acc) ->
         false ->
             take_and_convert(More, Options, Acc)
     end.
+
+%% ===================================================================
+%% typereflt funcs
+
+-spec to_server_raw(string())
+      -> {string(), pos_integer()}.
+to_server_raw(Server) ->
+    emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS).

+ 19 - 0
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_connector_mysql).
 
+-include("emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
 
@@ -33,15 +34,28 @@
 
 -export([do_health_check/1]).
 
+-define( MYSQL_HOST_OPTIONS
+       , #{ host_type => inet_addr
+          , default_port => ?MYSQL_DEFAULT_PORT}).
+
 %%=====================================================================
 %% Hocon schema
 roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
+    [ {server, fun server/1}
+    ] ++
     emqx_connector_schema_lib:relational_db_fields() ++
     emqx_connector_schema_lib:ssl_fields().
 
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server/1;
+server(desc) -> ?SERVER_DESC("MySQL", integer_to_list(?MYSQL_DEFAULT_PORT));
+server(_) -> undefined.
+
 %% ===================================================================
 on_start(InstId, #{server := {Host, Port},
                    database := DB,
@@ -106,3 +120,8 @@ reconn_interval(false) -> false.
 
 connect(Options) ->
     mysql:start_link(Options).
+
+-spec to_server(string())
+      -> {inet:ip_address() | inet:hostname(), pos_integer()}.
+to_server(Str) ->
+    emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).

+ 23 - 1
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_connector_pgsql).
 
+-include("emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("epgsql/include/epgsql.hrl").
@@ -38,13 +39,19 @@
 
 -export([do_health_check/1]).
 
+-define( PGSQL_HOST_OPTIONS
+       , #{ host_type => inet_addr
+          , default_port => ?PGSQL_DEFAULT_PORT}).
+
+
 %%=====================================================================
 
 roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
-    [{named_queries, fun named_queries/1}] ++
+    [ {named_queries, fun named_queries/1}
+    , {server, fun server/1}] ++
     emqx_connector_schema_lib:relational_db_fields() ++
     emqx_connector_schema_lib:ssl_fields().
 
@@ -52,6 +59,13 @@ named_queries(type) -> map();
 named_queries(nullable) -> true;
 named_queries(_) -> undefined.
 
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server/1;
+server(desc) -> ?SERVER_DESC("PostgreSQL", integer_to_list(?PGSQL_DEFAULT_PORT));
+server(_) -> undefined.
+
 %% ===================================================================
 on_start(InstId, #{server := {Host, Port},
                    database := DB,
@@ -163,3 +177,11 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
     conn_opts(Opts, [Opt | Acc]);
 conn_opts([_Opt | Opts], Acc) ->
     conn_opts(Opts, Acc).
+
+%% ===================================================================
+%% typereflt funcs
+
+-spec to_server(string())
+      -> {inet:ip_address() | inet:hostname(), pos_integer()}.
+to_server(Str) ->
+    emqx_connector_schema_lib:parse_server(Str, ?PGSQL_HOST_OPTIONS).

+ 42 - 39
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -19,21 +19,6 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--type server() :: tuple().
-%% {"127.0.0.1", 7000}
-%% For eredis:start_link/1~7
--reflect_type([server/0]).
--typerefl_from_string({server/0, ?MODULE, to_server}).
-
--type servers() :: list().
-%% [{"127.0.0.1", 7000}, {"127.0.0.2", 7000}]
-%% For eredis_cluster
--reflect_type([servers/0]).
--typerefl_from_string({servers/0, ?MODULE, to_servers}).
-
--export([ to_server/1
-        , to_servers/1]).
-
 -export([roots/0, fields/1]).
 
 -behaviour(emqx_resource).
@@ -51,6 +36,12 @@
 
 -export([cmd/3]).
 
+%% redis host don't need parse
+-define( REDIS_HOST_OPTIONS
+       , #{ host_type => hostname
+          , default_port => ?REDIS_DEFAULT_PORT}).
+
+
 %%=====================================================================
 roots() ->
     [ {config, #{type => hoconsc:union(
@@ -62,21 +53,21 @@ roots() ->
     ].
 
 fields(single) ->
-    [ {server, #{type => server()}}
+    [ {server, fun server/1}
     , {redis_type, #{type => hoconsc:enum([single]),
                      default => single}}
     ] ++
     redis_fields() ++
     emqx_connector_schema_lib:ssl_fields();
 fields(cluster) ->
-    [ {servers, #{type => servers()}}
+    [ {servers, fun servers/1}
     , {redis_type, #{type => hoconsc:enum([cluster]),
                      default => cluster}}
     ] ++
     redis_fields() ++
     emqx_connector_schema_lib:ssl_fields();
 fields(sentinel) ->
-    [ {servers, #{type => servers()}}
+    [ {servers, fun servers/1}
     , {redis_type, #{type => hoconsc:enum([sentinel]),
                      default => sentinel}}
     , {sentinel, #{type => string()}}
@@ -84,6 +75,20 @@ fields(sentinel) ->
     redis_fields() ++
     emqx_connector_schema_lib:ssl_fields().
 
+server(type) -> emqx_schema:ip_port();
+server(nullable) -> false;
+server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
+server(converter) -> fun to_server_raw/1;
+server(desc) -> ?SERVER_DESC("Redis", integer_to_list(?REDIS_DEFAULT_PORT));
+server(_) -> undefined.
+
+servers(type) -> list();
+servers(nullable) -> false;
+servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(desc) -> ?SERVERS_DESC ++ server(desc);
+servers(_) -> undefined.
+
 %% ===================================================================
 on_start(InstId, #{redis_type := Type,
                    database := Database,
@@ -185,24 +190,22 @@ redis_fields() ->
     , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
     ].
 
-to_server(Server) ->
-    try {ok, parse_server(Server)}
-    catch
-        throw : Error  ->
-            Error
-    end.
-
-to_servers(Servers) ->
-    try {ok, lists:map(fun parse_server/1, string:tokens(Servers, ", "))}
-    catch
-        throw : _Reason ->
-            {error, Servers}
-    end.
-
-parse_server(Server) ->
-    case string:tokens(Server, ": ") of
-        [Host, Port] ->
-            {Host, list_to_integer(Port)};
-        _ ->
-            throw({error, Server})
-    end.
+-spec to_server_raw(string())
+      -> {string(), pos_integer()}.
+to_server_raw(Server) ->
+    emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS).
+
+-spec to_servers_raw(string())
+      -> [{string(), pos_integer()}].
+to_servers_raw(Servers) ->
+    lists:map( fun(Server) ->
+                   emqx_connector_schema_lib:parse_server(Server, ?REDIS_HOST_OPTIONS)
+               end
+             , string:tokens(str(Servers), ", ")).
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 15 - 0
apps/emqx_connector/src/emqx_connector_schema.erl

@@ -1,3 +1,18 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
 -module(emqx_connector_schema).
 
 -behaviour(hocon_schema).

+ 55 - 40
apps/emqx_connector/src/emqx_connector_schema_lib.erl

@@ -22,34 +22,27 @@
         , ssl_fields/0
         ]).
 
--export([ to_ip_port/1
-        , ip_port_to_string/1
-        , to_servers/1
+-export([ ip_port_to_string/1
+        , parse_server/2
         ]).
 
 -export([ pool_size/1
         , database/1
         , username/1
         , password/1
-        , servers/1
         , auto_reconnect/1
         ]).
 
--typerefl_from_string({ip_port/0, emqx_connector_schema_lib, to_ip_port}).
--typerefl_from_string({servers/0, emqx_connector_schema_lib, to_servers}).
-
 -type database() :: binary().
 -type pool_size() :: integer().
 -type username() :: binary().
 -type password() :: binary().
--type servers() :: list().
 
 -reflect_type([ database/0
               , pool_size/0
               , username/0
               , password/0
-              , servers/0
-             ]).
+              ]).
 
 -export([roots/0, fields/1]).
 
@@ -65,19 +58,13 @@ ssl_fields() ->
     ].
 
 relational_db_fields() ->
-    [ {server, fun server/1}
-    , {database, fun database/1}
+    [ {database, fun database/1}
     , {pool_size, fun pool_size/1}
     , {username, fun username/1}
     , {password, fun password/1}
     , {auto_reconnect, fun auto_reconnect/1}
     ].
 
-server(type) -> emqx_schema:ip_port();
-server(nullable) -> false;
-server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
-server(_) -> undefined.
-
 database(type) -> binary();
 database(nullable) -> false;
 database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")];
@@ -100,31 +87,59 @@ auto_reconnect(type) -> boolean();
 auto_reconnect(default) -> true;
 auto_reconnect(_) -> undefined.
 
-servers(type) -> servers();
-servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
-servers(_) -> undefined.
-
-to_ip_port(Str) ->
-     case string:tokens(Str, ": ") of
-         [Ip, Port] ->
-             case inet:parse_address(Ip) of
-                 {ok, R} -> {ok, {R, list_to_integer(Port)}};
-                 _ -> {error, Str}
-             end;
-         _ -> {error, Str}
-     end.
-
 ip_port_to_string({Ip, Port}) when is_list(Ip) ->
     iolist_to_binary([Ip, ":", integer_to_list(Port)]);
 ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
     iolist_to_binary([inet:ntoa(Ip), ":", integer_to_list(Port)]).
 
-to_servers(Str) ->
-    {ok, lists:map(fun(Server) ->
-             case string:tokens(Server, ": ") of
-                 [Ip] ->
-                     [{host, Ip}];
-                 [Ip, Port] ->
-                     [{host, Ip}, {port, list_to_integer(Port)}]
-             end
-         end, string:tokens(Str, " , "))}.
+parse_server(Str, #{host_type := inet_addr, default_port := DefaultPort}) ->
+    try string:tokens(str(Str), ": ") of
+        [Ip, Port] ->
+            case parse_ip(Ip) of
+                {ok, R}    -> {R, list_to_integer(Port)}
+            end;
+        [Ip] ->
+            case parse_ip(Ip) of
+                {ok, R}    -> {R, DefaultPort}
+            end;
+        _ ->
+            ?THROW_ERROR("Bad server schema.")
+    catch
+        error : Reason ->
+            ?THROW_ERROR(Reason)
+    end;
+parse_server(Str, #{host_type := hostname, default_port := DefaultPort}) ->
+    try string:tokens(str(Str), ": ") of
+        [Ip, Port] ->
+            {Ip, list_to_integer(Port)};
+        [Ip] ->
+            {Ip, DefaultPort};
+        _ ->
+            ?THROW_ERROR("Bad server schema.")
+    catch
+        error : Reason ->
+            ?THROW_ERROR(Reason)
+    end;
+parse_server(_, _) ->
+    ?THROW_ERROR("Invalid Host").
+
+parse_ip(Str) ->
+    case inet:parse_address(Str) of
+        {ok, R} ->
+            {ok, R};
+        _ ->
+            %% check is a rfc1035's hostname
+            case inet_parse:domain(Str) of
+                true ->
+                    {ok, Str};
+                _ ->
+                    ?THROW_ERROR("Bad IP or Host")
+            end
+    end.
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.

+ 0 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -445,8 +445,6 @@ typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/fil
 typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
 typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
 typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
-typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod);
-typename_to_spec("servers()", Mod) -> typename_to_spec("ip_ports()", Mod);
 typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
 typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
     #{type => integer, example => 100}], example => infinity};