Преглед изворни кода

feat(mongo srv): support srv record for mongo connector

zhouzb пре 4 година
родитељ
комит
f8a625a67f
2 измењених фајлова са 129 додато и 74 уклоњено
  1. 1 1
      apps/emqx_connector/rebar.config
  2. 128 73
      apps/emqx_connector/src/emqx_connector_mongo.erl

+ 1 - 1
apps/emqx_connector/rebar.config

@@ -8,7 +8,7 @@
   {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
   {epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}},
   %% NOTE: mind poolboy version when updating mongodb-erlang version
-  {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.9"}}},
+  {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}},
   %% NOTE: mind poolboy version when updating eredis_cluster version
   {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}},
   %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git

+ 128 - 73
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -20,10 +20,6 @@
 -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--type server() :: emqx_schema:ip_port().
--reflect_type([server/0]).
--typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
-
 %% callbacks of behaviour emqx_resource
 -export([ on_start/2
         , on_stop/2
@@ -37,6 +33,7 @@
 -export([roots/0, fields/1]).
 
 -export([mongo_query/5]).
+
 %%=====================================================================
 roots() ->
     [ {config, #{type => hoconsc:union(
@@ -55,7 +52,7 @@ fields(rs) ->
     [ {mongo_type, #{type => rs,
                      default => rs}}
     , {servers, fun servers/1}
-    , {replica_set_name, fun emqx_connector_schema_lib:database/1}
+    , {replica_set_name, fun replica_set_name/1}
     ] ++ mongo_fields();
 fields(sharded) ->
     [ {mongo_type, #{type => sharded,
@@ -77,7 +74,8 @@ fields(topology) ->
     ].
 
 mongo_fields() ->
-    [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
+    [ {srv_record, fun srv_record/1}
+    , {pool_size, fun emqx_connector_schema_lib:pool_size/1}
     , {username, fun emqx_connector_schema_lib:username/1}
     , {password, fun emqx_connector_schema_lib:password/1}
     , {auth_source, #{type => binary(),
@@ -92,35 +90,32 @@ on_jsonify(Config) ->
     Config.
 
 %% ===================================================================
-on_start(InstId, Config = #{server := Server,
-                            mongo_type := single}) ->
-    ?SLOG(info, #{msg => "starting mongodb single connector",
-                  connector => InstId, config => Config}),
-    Opts = [{type, single},
-            {hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]}
-            ],
-    do_start(InstId, Opts, Config);
-
-on_start(InstId, Config = #{servers := Servers,
-                            mongo_type := rs,
-                            replica_set_name := RsName}) ->
-    ?SLOG(info, #{msg => "starting mongodb rs connector",
-                  connector => InstId, config => Config}),
-    Opts = [{type,  {rs, RsName}},
-            {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
-                     || S <- Servers]}
-           ],
-    do_start(InstId, Opts, Config);
-
-on_start(InstId, Config = #{servers := Servers,
-                            mongo_type := sharded}) ->
-    ?SLOG(info, #{msg => "starting mongodb sharded connector",
-                  connector => InstId, config => Config}),
-    Opts = [{type, sharded},
-            {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
-                     || S <- Servers]}
-            ],
-    do_start(InstId, Opts, Config).
+
+on_start(InstId, Config = #{mongo_type := Type,
+                            pool_size := PoolSize,
+                            ssl := SSL}) ->
+    Msg = case Type of
+              single -> "starting_mongodb_single_connector";
+              rs -> "starting_mongodb_replica_set_connector";
+              sharded -> "starting_mongodb_sharded_connector"
+          end,
+    ?SLOG(info, #{msg => Msg, connector => InstId, config => Config}),
+    NConfig = may_parse_srv_and_txt_records(Config),
+    SslOpts = case maps:get(enable, SSL) of
+                  true ->
+                      [{ssl, true},
+                       {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
+                      ];
+                  false -> [{ssl, false}]
+              end,
+    Topology = maps:get(topology, NConfig, #{}),
+    Opts = [{type, init_type(NConfig)},
+            {pool_size, PoolSize},
+            {options, init_topology_options(maps:to_list(Topology), [])},
+            {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}],
+    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts),
+    {ok, #{poolname => PoolName, type => Type}}.
 
 on_stop(InstId, #{poolname := PoolName}) ->
     ?SLOG(info, #{msg => "stopping mongodb connector",
@@ -184,39 +179,10 @@ mongo_query(Conn, find_one, Collection, Selector, Projector) ->
 mongo_query(_Conn, _Action, _Collection, _Selector, _Projector) ->
     ok.
 
-do_start(InstId, Opts0, Config = #{mongo_type := Type,
-                                   database := Database,
-                                   pool_size := PoolSize,
-                                   ssl := SSL}) ->
-    SslOpts = case maps:get(enable, SSL) of
-                  true ->
-                      [{ssl, true},
-                       {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
-                      ];
-                  false -> [{ssl, false}]
-              end,
-    Topology= maps:get(topology, Config, #{}),
-    Opts = Opts0 ++
-           [{pool_size, PoolSize},
-            {options, init_topology_options(maps:to_list(Topology), [])},
-            {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}],
-    %% test the connection
-    TestOpts = case maps:is_key(server, Config) of
-                  true ->
-                    Server = maps:get(server, Config),
-                    host_port(Server);
-                  false ->
-                    Servers = maps:get(servers, Config),
-                    host_port(erlang:hd(Servers))
-              end ++ [{database, Database}],
-    {ok, TestConn} = mc_worker_api:connect(TestOpts),
-
-    PoolName = emqx_plugin_libs_pool:pool_name(InstId),
-    _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts),
-    {ok, #{poolname => PoolName,
-           type => Type,
-           test_conn => TestConn,
-           test_opts => TestOpts}}.
+init_type(#{type := rs, replica_set_name := ReplicaSetName}) ->
+    {rs, ReplicaSetName};
+init_type(#{type := Type}) ->
+    Type.
 
 init_topology_options([{pool_size, Val}| R], Acc) ->
     init_topology_options(R, [{pool_size, Val}| Acc]);
@@ -261,17 +227,106 @@ init_worker_options([_ | R], Acc) ->
     init_worker_options(R, Acc);
 init_worker_options([], Acc) -> Acc.
 
-host_port({Host, Port}) ->
-    [{host, Host}, {port, Port}].
-
-server(type) -> server();
+server(type) -> binary();
 server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
 server(_) -> undefined.
 
-servers(type) -> hoconsc:array(server());
+servers(type) -> binary();
 servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
 servers(_) -> undefined.
 
 duration(type) -> emqx_schema:duration_ms();
 duration(nullable) -> true;
 duration(_) -> undefined.
+
+replica_set_name(type) -> binary();
+replica_set_name(nullable) -> true;
+replica_set_name(_) -> undefined.
+
+srv_record(type) -> boolean();
+srv_record(default) -> false;
+srv_record(_) -> undefined.
+
+parse_servers(Type, Servers) when is_binary(Servers) ->
+    parse_servers(Type, binary_to_list(Servers));
+parse_servers(Type, Servers) when is_list(Servers) ->
+    case string:split(Servers, ",", trailing) of
+        [Host | _] when Type =:= single -> 
+            [Host];
+        Hosts ->
+            Hosts
+    end.
+
+may_parse_srv_and_txt_records(#{server := Server} = Config) ->
+    NConfig = maps:remove(server, Config),
+    may_parse_srv_and_txt_records_(NConfig#{servers => Server});
+may_parse_srv_and_txt_records(Config) ->
+    may_parse_srv_and_txt_records_(Config).
+
+may_parse_srv_and_txt_records_(#{mongo_type := Type,
+                                 srv_record := false,
+                                 servers := Servers} = Config) ->
+    case Type =:= rs andalso maps:is_key(replica_set_name, Config) =:= false of
+        true ->
+            error({missing_parameter, replica_set_name});
+        false ->
+            Config#{hosts => parse_servers(Type, Servers)}
+    end;
+may_parse_srv_and_txt_records_(#{mongo_type := Type,
+                                 srv_record := true,
+                                 servers := Servers} = Config) ->
+    NServers = binary_to_list(Servers),
+    Hosts = parse_srv_records(Type, NServers),
+    ExtraOpts = parse_txt_records(Type, NServers),
+    maps:merge(Config#{hosts => Hosts}, ExtraOpts).
+
+parse_srv_records(Type, Server) ->
+    case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
+        [] ->
+            error(service_not_found);
+        Services ->
+            case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of
+                [H | _] when Type =:= single -> 
+                    [H];
+                Hosts ->
+                    Hosts
+            end
+    end.
+
+parse_txt_records(Type, Server) ->
+    case inet_res:lookup(Server, in, txt) of
+        [] ->
+            #{};
+        [[QueryString]] ->
+            case uri_string:dissect_query(QueryString) of
+                {error, _, _} ->
+                    error({invalid_txt_record, invalid_query_string});
+                Options ->
+                    Fields = case Type of
+                                 rs -> ["authSource", "replicaSet"];
+                                 _ -> ["authSource"]
+                             end,
+                    take_and_convert(Fields, Options)
+            end;
+        _ ->
+            error({invalid_txt_record, multiple_records})
+    end.
+
+take_and_convert(Fields, Options) ->
+    take_and_convert(Fields, Options, #{}).
+
+take_and_convert([], [_ | _], _Acc) ->
+    error({invalid_txt_record, invalid_option});
+take_and_convert([], [], Acc) ->
+    Acc;
+take_and_convert([Field | More], Options, Acc) ->
+    case lists:keytake(Field, 1, Options) of
+        {value, {"authSource", V}, NOptions} ->
+            take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)});
+        {value, {"replicaSet", V}, NOptions} ->
+            take_and_convert(More, NOptions, Acc#{replica_set_name => list_to_binary(V)});
+        {value, _, _} ->
+            error({invalid_txt_record, invalid_option});
+        false ->
+            take_and_convert(More, Options, Acc)
+    end.