|
|
@@ -19,6 +19,9 @@
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
|
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
|
|
|
|
|
|
+-type server() :: string().
|
|
|
+-reflect_type([server/0]).
|
|
|
+
|
|
|
%% callbacks of behaviour emqx_resource
|
|
|
-export([ on_start/2
|
|
|
, on_stop/2
|
|
|
@@ -36,16 +39,28 @@
|
|
|
structs() -> [""].
|
|
|
|
|
|
fields("") ->
|
|
|
- [ {mongo_type, fun mongo_type/1}
|
|
|
+ [ {config, #{type => hoconsc:union(
|
|
|
+ [ hoconsc:ref(?MODULE, single)
|
|
|
+ , hoconsc:ref(?MODULE, rs)
|
|
|
+ , hoconsc:ref(?MODULE, sharded)
|
|
|
+ ])}}
|
|
|
+ ];
|
|
|
+fields(single) ->
|
|
|
+ [ {mongo_type, #{type => single,
|
|
|
+ default => single}}
|
|
|
+ , {server, fun server/1}
|
|
|
+ ] ++ mongo_fields();
|
|
|
+fields(rs) ->
|
|
|
+ [ {mongo_type, #{type => rs,
|
|
|
+ default => rs}}
|
|
|
, {servers, fun servers/1}
|
|
|
- , {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
- , {login, fun emqx_connector_schema_lib:username/1}
|
|
|
- , {password, fun emqx_connector_schema_lib:password/1}
|
|
|
- , {auth_source, fun auth_source/1}
|
|
|
- , {database, fun emqx_connector_schema_lib:database/1}
|
|
|
- ] ++
|
|
|
- % mongodb_rs_set_name_fields() ++
|
|
|
- emqx_connector_schema_lib:ssl_fields();
|
|
|
+ , {replicaset_name, fun emqx_connector_schema_lib:database/1}
|
|
|
+ ] ++ mongo_fields();
|
|
|
+fields(sharded) ->
|
|
|
+ [ {mongo_type, #{type => sharded,
|
|
|
+ default => sharded}}
|
|
|
+ , {servers, fun servers/1}
|
|
|
+ ] ++ mongo_fields();
|
|
|
fields(topology) ->
|
|
|
[ {max_overflow, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
, {overflow_ttl, fun duration/1}
|
|
|
@@ -59,40 +74,42 @@ fields(topology) ->
|
|
|
, {min_heartbeat_frequency_ms, fun duration/1}
|
|
|
].
|
|
|
|
|
|
+mongo_fields() ->
|
|
|
+ [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
|
|
|
+ , {login, fun emqx_connector_schema_lib:username/1}
|
|
|
+ , {password, fun emqx_connector_schema_lib:password/1}
|
|
|
+ , {auth_source, fun auth_source/1}
|
|
|
+ , {database, fun emqx_connector_schema_lib:database/1}
|
|
|
+ ] ++
|
|
|
+ emqx_connector_schema_lib:ssl_fields().
|
|
|
+
|
|
|
on_jsonify(Config) ->
|
|
|
Config.
|
|
|
|
|
|
%% ===================================================================
|
|
|
-on_start(InstId, #{servers := Servers,
|
|
|
- mongo_type := Type,
|
|
|
- database := Database,
|
|
|
- pool_size := PoolSize,
|
|
|
- ssl := SSL} = Config) ->
|
|
|
+on_start(InstId, #{config := #{server := Server,
|
|
|
+ mongo_type := single} = Config}) ->
|
|
|
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
|
|
- SslOpts = case maps:get(enable, SSL) of
|
|
|
- true ->
|
|
|
- [{ssl, true},
|
|
|
- {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
|
|
|
- ];
|
|
|
- false -> [{ssl, false}]
|
|
|
- end,
|
|
|
- Hosts = [string:trim(H) || H <- string:tokens(binary_to_list(Servers), ",")],
|
|
|
- Opts = [{type, init_type(Type, Config)},
|
|
|
- {hosts, Hosts},
|
|
|
- {pool_size, PoolSize},
|
|
|
- {options, init_topology_options(maps:to_list(Config), [])},
|
|
|
- {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}],
|
|
|
+ Opts = [{type, single},
|
|
|
+ {hosts, [Server]}
|
|
|
+ ],
|
|
|
+ do_start(InstId, Opts, Config);
|
|
|
|
|
|
- %% test the connection
|
|
|
- TestOpts = [{database, Database}] ++ host_port(hd(Hosts)),
|
|
|
- {ok, TestConn} = mc_worker_api:connect(TestOpts),
|
|
|
+on_start(InstId, #{config := #{servers := Servers,
|
|
|
+ mongo_type := rs,
|
|
|
+ replicaset_name := RsName} = Config}) ->
|
|
|
+ logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
|
|
+ Opts = [{type, {rs, RsName}},
|
|
|
+ {hosts, Servers}],
|
|
|
+ do_start(InstId, Opts, Config);
|
|
|
|
|
|
- PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
|
- _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts),
|
|
|
- {ok, #{poolname => PoolName,
|
|
|
- type => Type,
|
|
|
- test_conn => TestConn,
|
|
|
- test_opts => TestOpts}}.
|
|
|
+on_start(InstId, #{config := #{servers := Servers,
|
|
|
+ mongo_type := sharded} = Config}) ->
|
|
|
+ logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
|
|
|
+ Opts = [{type, sharded},
|
|
|
+ {hosts, Servers}
|
|
|
+ ],
|
|
|
+ do_start(InstId, Opts, Config).
|
|
|
|
|
|
on_stop(InstId, #{poolname := PoolName}) ->
|
|
|
logger:info("stopping mongodb connector: ~p", [InstId]),
|
|
|
@@ -138,10 +155,38 @@ mongo_query(Conn, find, Collection, Selector, Docs) ->
|
|
|
mongo_query(_Conn, _Action, _Collection, _Selector, _Docs) ->
|
|
|
ok.
|
|
|
|
|
|
-init_type(rs, #{rs_set_name := Name}) ->
|
|
|
- {rs, Name};
|
|
|
-init_type(Type, _Opts) ->
|
|
|
- Type.
|
|
|
+do_start(InstId, Opts0, Config = #{mongo_type := Type,
|
|
|
+ database := Database,
|
|
|
+ pool_size := PoolSize,
|
|
|
+ ssl := SSL}) ->
|
|
|
+ SslOpts = case maps:get(enable, SSL) of
|
|
|
+ true ->
|
|
|
+ [{ssl, true},
|
|
|
+ {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
|
|
|
+ ];
|
|
|
+ false -> [{ssl, false}]
|
|
|
+ end,
|
|
|
+ Opts = Opts0 ++
|
|
|
+ [{pool_size, PoolSize},
|
|
|
+ {options, init_topology_options(maps:to_list(Config), [])},
|
|
|
+ {worker_options, init_worker_options(maps:to_list(Config), SslOpts)}],
|
|
|
+ %% test the connection
|
|
|
+ TestOpts = case maps:is_key(server, Config) of
|
|
|
+ true ->
|
|
|
+ Server = maps:get(server, Config),
|
|
|
+ host_port(Server);
|
|
|
+ false ->
|
|
|
+ Servers = maps:get(servers, Config),
|
|
|
+ host_port(erlang:hd(Servers))
|
|
|
+ end ++ [{database, Database}],
|
|
|
+ {ok, TestConn} = mc_worker_api:connect(TestOpts),
|
|
|
+
|
|
|
+ PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
|
+ _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts),
|
|
|
+ {ok, #{poolname => PoolName,
|
|
|
+ type => Type,
|
|
|
+ test_conn => TestConn,
|
|
|
+ test_opts => TestOpts}}.
|
|
|
|
|
|
init_topology_options([{pool_size, Val}| R], Acc) ->
|
|
|
init_topology_options(R, [{pool_size, Val}| Acc]);
|
|
|
@@ -196,21 +241,17 @@ host_port(HostPort) ->
|
|
|
[{host, Host1}]
|
|
|
end.
|
|
|
|
|
|
-% mongodb_rs_set_name_fields() ->
|
|
|
-% [ {rs_set_name, fun emqx_connector_schema_lib:database/1}
|
|
|
-% ].
|
|
|
+server(type) -> server();
|
|
|
+server(validator) -> [?REQUIRED("the field 'server' is required")];
|
|
|
+server(_) -> undefined.
|
|
|
|
|
|
-auth_source(type) -> binary();
|
|
|
-auth_source(nullable) -> true;
|
|
|
-auth_source(_) -> undefined.
|
|
|
-
|
|
|
-servers(type) -> binary();
|
|
|
+servers(type) -> hoconsc:array(server());
|
|
|
servers(validator) -> [?REQUIRED("the field 'servers' is required")];
|
|
|
servers(_) -> undefined.
|
|
|
|
|
|
-mongo_type(type) -> hoconsc:enum([single, unknown, shared, rs]);
|
|
|
-mongo_type(default) -> single;
|
|
|
-mongo_type(_) -> undefined.
|
|
|
+auth_source(type) -> binary();
|
|
|
+auth_source(nullable) -> true;
|
|
|
+auth_source(_) -> undefined.
|
|
|
|
|
|
duration(type) -> emqx_schema:duration_ms();
|
|
|
duration(nullable) -> true;
|