|
|
@@ -62,7 +62,8 @@ fields(sharded) ->
|
|
|
, {servers, fun servers/1}
|
|
|
] ++ mongo_fields();
|
|
|
fields(topology) ->
|
|
|
- [ {max_overflow, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
+ [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
+ , {max_overflow, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
, {overflow_ttl, fun duration/1}
|
|
|
, {overflow_check_period, fun duration/1}
|
|
|
, {local_threshold_ms, fun duration/1}
|
|
|
@@ -81,6 +82,8 @@ mongo_fields() ->
|
|
|
, {auth_source, #{type => binary(),
|
|
|
nullable => true}}
|
|
|
, {database, fun emqx_connector_schema_lib:database/1}
|
|
|
+ , {topology, #{type => hoconsc:ref(?MODULE, topology),
|
|
|
+ nullable => true}}
|
|
|
] ++
|
|
|
emqx_connector_schema_lib:ssl_fields().
|
|
|
|
|
|
@@ -170,9 +173,10 @@ do_start(InstId, Opts0, Config = #{mongo_type := Type,
|
|
|
];
|
|
|
false -> [{ssl, false}]
|
|
|
end,
|
|
|
+ Topology= maps:get(topology, Config, #{}),
|
|
|
Opts = Opts0 ++
|
|
|
[{pool_size, PoolSize},
|
|
|
- {options, init_topology_options(maps:to_list(Config), [])},
|
|
|
+ {options, init_topology_options(maps:to_list(Topology), [])},
|
|
|
{worker_options, init_worker_options(maps:to_list(Config), SslOpts)}],
|
|
|
%% test the connection
|
|
|
TestOpts = case maps:is_key(server, Config) of
|