|
|
@@ -39,18 +39,16 @@
|
|
|
|
|
|
-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
|
|
|
|
|
|
+%% for testing
|
|
|
+-export([maybe_resolve_srv_and_txt_records/1]).
|
|
|
+
|
|
|
-define(HEALTH_CHECK_TIMEOUT, 30000).
|
|
|
|
|
|
%% mongo servers don't need parse
|
|
|
-define(MONGO_HOST_OPTIONS, #{
|
|
|
- host_type => hostname,
|
|
|
default_port => ?MONGO_DEFAULT_PORT
|
|
|
}).
|
|
|
|
|
|
--ifdef(TEST).
|
|
|
--export([to_servers_raw/1]).
|
|
|
--endif.
|
|
|
-
|
|
|
%%=====================================================================
|
|
|
roots() ->
|
|
|
[
|
|
|
@@ -73,7 +71,7 @@ fields(single) ->
|
|
|
required => true,
|
|
|
desc => ?DESC("single_mongo_type")
|
|
|
}},
|
|
|
- {server, fun server/1},
|
|
|
+ {server, server()},
|
|
|
{w_mode, fun w_mode/1}
|
|
|
] ++ mongo_fields();
|
|
|
fields(rs) ->
|
|
|
@@ -84,7 +82,7 @@ fields(rs) ->
|
|
|
required => true,
|
|
|
desc => ?DESC("rs_mongo_type")
|
|
|
}},
|
|
|
- {servers, fun servers/1},
|
|
|
+ {servers, servers()},
|
|
|
{w_mode, fun w_mode/1},
|
|
|
{r_mode, fun r_mode/1},
|
|
|
{replica_set_name, fun replica_set_name/1}
|
|
|
@@ -97,7 +95,7 @@ fields(sharded) ->
|
|
|
required => true,
|
|
|
desc => ?DESC("sharded_mongo_type")
|
|
|
}},
|
|
|
- {servers, fun servers/1},
|
|
|
+ {servers, servers()},
|
|
|
{w_mode, fun w_mode/1}
|
|
|
] ++ mongo_fields();
|
|
|
fields(topology) ->
|
|
|
@@ -161,7 +159,7 @@ on_start(
|
|
|
sharded -> "starting_mongodb_sharded_connector"
|
|
|
end,
|
|
|
?SLOG(info, #{msg => Msg, connector => InstId, config => Config}),
|
|
|
- NConfig = #{hosts := Hosts} = may_parse_srv_and_txt_records(Config),
|
|
|
+ NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config),
|
|
|
SslOpts =
|
|
|
case maps:get(enable, SSL) of
|
|
|
true ->
|
|
|
@@ -387,19 +385,13 @@ init_worker_options([], Acc) ->
|
|
|
%% ===================================================================
|
|
|
%% Schema funcs
|
|
|
|
|
|
-server(type) -> emqx_schema:host_port();
|
|
|
-server(required) -> true;
|
|
|
-server(validator) -> [?NOT_EMPTY("the value of the field 'server' cannot be empty")];
|
|
|
-server(converter) -> fun to_server_raw/1;
|
|
|
-server(desc) -> ?DESC("server");
|
|
|
-server(_) -> undefined.
|
|
|
+server() ->
|
|
|
+ Meta = #{desc => ?DESC("server")},
|
|
|
+ emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS).
|
|
|
|
|
|
-servers(type) -> list();
|
|
|
-servers(required) -> true;
|
|
|
-servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
|
|
|
-servers(converter) -> fun to_servers_raw/1;
|
|
|
-servers(desc) -> ?DESC("servers");
|
|
|
-servers(_) -> undefined.
|
|
|
+servers() ->
|
|
|
+ Meta = #{desc => ?DESC("servers")},
|
|
|
+ emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS).
|
|
|
|
|
|
w_mode(type) -> hoconsc:enum([unsafe, safe]);
|
|
|
w_mode(desc) -> ?DESC("w_mode");
|
|
|
@@ -434,163 +426,109 @@ srv_record(_) -> undefined.
|
|
|
%% ===================================================================
|
|
|
%% Internal funcs
|
|
|
|
|
|
-may_parse_srv_and_txt_records(#{server := Server} = Config) ->
|
|
|
+maybe_resolve_srv_and_txt_records(#{server := Server} = Config) ->
|
|
|
NConfig = maps:remove(server, Config),
|
|
|
- may_parse_srv_and_txt_records_(NConfig#{servers => [Server]});
|
|
|
-may_parse_srv_and_txt_records(Config) ->
|
|
|
- may_parse_srv_and_txt_records_(Config).
|
|
|
+ maybe_resolve_srv_and_txt_records1(Server, NConfig);
|
|
|
+maybe_resolve_srv_and_txt_records(#{servers := Servers} = Config) ->
|
|
|
+ NConfig = maps:remove(servers, Config),
|
|
|
+ maybe_resolve_srv_and_txt_records1(Servers, NConfig).
|
|
|
|
|
|
-may_parse_srv_and_txt_records_(
|
|
|
+maybe_resolve_srv_and_txt_records1(
|
|
|
+ Servers0,
|
|
|
#{
|
|
|
mongo_type := Type,
|
|
|
- srv_record := false,
|
|
|
- servers := Servers
|
|
|
+ srv_record := false
|
|
|
} = Config
|
|
|
) ->
|
|
|
case Type =:= rs andalso maps:is_key(replica_set_name, Config) =:= false of
|
|
|
true ->
|
|
|
- error({missing_parameter, replica_set_name});
|
|
|
+ throw(#{
|
|
|
+ reason => "missing_parameter",
|
|
|
+ param => replica_set_name
|
|
|
+ });
|
|
|
false ->
|
|
|
- Config#{hosts => servers_to_bin(lists:flatten(Servers))}
|
|
|
+ Servers = parse_servers(Servers0),
|
|
|
+ Config#{hosts => format_hosts(Servers)}
|
|
|
end;
|
|
|
-may_parse_srv_and_txt_records_(
|
|
|
+maybe_resolve_srv_and_txt_records1(
|
|
|
+ Servers,
|
|
|
#{
|
|
|
mongo_type := Type,
|
|
|
- srv_record := true,
|
|
|
- servers := Servers
|
|
|
+ srv_record := true
|
|
|
} = Config
|
|
|
) ->
|
|
|
- Hosts = parse_srv_records(Type, Servers),
|
|
|
- ExtraOpts = parse_txt_records(Type, Servers),
|
|
|
+ %% when srv is in use, it's typically only one DNS resolution needed,
|
|
|
+ %% however, by the schema definition, it's allowed to configure more than one.
|
|
|
+ %% here we keep only the fist
|
|
|
+ [{DNS, _IgnorePort} | _] = parse_servers(Servers),
|
|
|
+ DnsRecords = resolve_srv_records(DNS),
|
|
|
+ Hosts = format_hosts(DnsRecords),
|
|
|
+ ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}),
|
|
|
+ ExtraOpts = resolve_txt_records(Type, DNS),
|
|
|
+ ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => ExtraOpts}),
|
|
|
maps:merge(Config#{hosts => Hosts}, ExtraOpts).
|
|
|
|
|
|
-parse_srv_records(Type, Servers) ->
|
|
|
- Fun = fun(AccIn, {IpOrHost, _Port}) ->
|
|
|
- case
|
|
|
- inet_res:lookup(
|
|
|
- "_mongodb._tcp." ++
|
|
|
- ip_or_host_to_string(IpOrHost),
|
|
|
- in,
|
|
|
- srv
|
|
|
- )
|
|
|
- of
|
|
|
- [] ->
|
|
|
- error(service_not_found);
|
|
|
- Services ->
|
|
|
- [
|
|
|
- [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services]
|
|
|
- | AccIn
|
|
|
- ]
|
|
|
- end
|
|
|
- end,
|
|
|
- Res = lists:foldl(Fun, [], Servers),
|
|
|
- case Type of
|
|
|
- single -> lists:nth(1, Res);
|
|
|
- _ -> Res
|
|
|
+resolve_srv_records(DNS0) ->
|
|
|
+ DNS = "_mongodb._tcp." ++ DNS0,
|
|
|
+ DnsData = emqx_connector_lib:resolve_dns(DNS, srv),
|
|
|
+ case [{Host, Port} || {_, _, Port, Host} <- DnsData] of
|
|
|
+ [] ->
|
|
|
+ throw(#{
|
|
|
+ reason => "failed_to_resolve_srv_record",
|
|
|
+ dns => DNS
|
|
|
+ });
|
|
|
+ L ->
|
|
|
+ L
|
|
|
end.
|
|
|
|
|
|
-parse_txt_records(Type, Servers) ->
|
|
|
- Fields =
|
|
|
- case Type of
|
|
|
- rs -> ["authSource", "replicaSet"];
|
|
|
- _ -> ["authSource"]
|
|
|
- end,
|
|
|
- Fun = fun(AccIn, {IpOrHost, _Port}) ->
|
|
|
- case inet_res:lookup(IpOrHost, in, txt) of
|
|
|
- [] ->
|
|
|
- #{};
|
|
|
- [[QueryString]] ->
|
|
|
- case uri_string:dissect_query(QueryString) of
|
|
|
- {error, _, _} ->
|
|
|
- error({invalid_txt_record, invalid_query_string});
|
|
|
- Options ->
|
|
|
- maps:merge(AccIn, take_and_convert(Fields, Options))
|
|
|
- end;
|
|
|
- _ ->
|
|
|
- error({invalid_txt_record, multiple_records})
|
|
|
- end
|
|
|
- end,
|
|
|
- lists:foldl(Fun, #{}, Servers).
|
|
|
-
|
|
|
-take_and_convert(Fields, Options) ->
|
|
|
- take_and_convert(Fields, Options, #{}).
|
|
|
-
|
|
|
-take_and_convert([], [_ | _], _Acc) ->
|
|
|
- error({invalid_txt_record, invalid_option});
|
|
|
-take_and_convert([], [], Acc) ->
|
|
|
- Acc;
|
|
|
-take_and_convert([Field | More], Options, Acc) ->
|
|
|
- case lists:keytake(Field, 1, Options) of
|
|
|
- {value, {"authSource", V}, NOptions} ->
|
|
|
- take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)});
|
|
|
- {value, {"replicaSet", V}, NOptions} ->
|
|
|
- take_and_convert(More, NOptions, Acc#{replica_set_name => list_to_binary(V)});
|
|
|
- {value, _, _} ->
|
|
|
- error({invalid_txt_record, invalid_option});
|
|
|
- false ->
|
|
|
- take_and_convert(More, Options, Acc)
|
|
|
+resolve_txt_records(Type, DNS) ->
|
|
|
+ case emqx_connector_lib:resolve_dns(DNS, txt) of
|
|
|
+ [] ->
|
|
|
+ #{};
|
|
|
+ [[QueryString]] = L ->
|
|
|
+ %% e.g. "authSource=admin&replicaSet=atlas-wrnled-shard-0"
|
|
|
+ case uri_string:dissect_query(QueryString) of
|
|
|
+ {error, _, _} ->
|
|
|
+ throw(#{
|
|
|
+ reason => "bad_txt_record_resolution",
|
|
|
+ resolved => L
|
|
|
+ });
|
|
|
+ Options ->
|
|
|
+ convert_options(Type, normalize_options(Options))
|
|
|
+ end;
|
|
|
+ L ->
|
|
|
+ throw(#{
|
|
|
+ reason => "multiple_txt_records",
|
|
|
+ resolved => L
|
|
|
+ })
|
|
|
end.
|
|
|
|
|
|
--spec ip_or_host_to_string(binary() | string() | tuple()) ->
|
|
|
- string().
|
|
|
-ip_or_host_to_string(Ip) when is_tuple(Ip) ->
|
|
|
- inet:ntoa(Ip);
|
|
|
-ip_or_host_to_string(Host) ->
|
|
|
- str(Host).
|
|
|
-
|
|
|
-servers_to_bin([Server | Rest]) ->
|
|
|
- [server_to_bin(Server) | servers_to_bin(Rest)];
|
|
|
-servers_to_bin([]) ->
|
|
|
- [].
|
|
|
+normalize_options([]) ->
|
|
|
+ [];
|
|
|
+normalize_options([{Name, Value} | Options]) ->
|
|
|
+ [{string:lowercase(Name), Value} | normalize_options(Options)].
|
|
|
+
|
|
|
+convert_options(rs, Options) ->
|
|
|
+ M1 = maybe_add_option(auth_source, "authSource", Options),
|
|
|
+ M2 = maybe_add_option(replica_set_name, "replicaSet", Options),
|
|
|
+ maps:merge(M1, M2);
|
|
|
+convert_options(_, Options) ->
|
|
|
+ maybe_add_option(auth_source, "authSource", Options).
|
|
|
+
|
|
|
+maybe_add_option(ConfigKey, OptName0, Options) ->
|
|
|
+ OptName = string:lowercase(OptName0),
|
|
|
+ case lists:keyfind(OptName, 1, Options) of
|
|
|
+ {_, OptValue} ->
|
|
|
+ #{ConfigKey => iolist_to_binary(OptValue)};
|
|
|
+ false ->
|
|
|
+ #{}
|
|
|
+ end.
|
|
|
|
|
|
-server_to_bin({IpOrHost, Port}) ->
|
|
|
- iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)).
|
|
|
+format_host({Host, Port}) ->
|
|
|
+ iolist_to_binary([Host, ":", integer_to_list(Port)]).
|
|
|
|
|
|
-%% ===================================================================
|
|
|
-%% typereflt funcs
|
|
|
-
|
|
|
--spec to_server_raw(string()) ->
|
|
|
- {string(), pos_integer()}.
|
|
|
-to_server_raw(Server) ->
|
|
|
- emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS).
|
|
|
-
|
|
|
--spec to_servers_raw(string()) ->
|
|
|
- [{string(), pos_integer()}].
|
|
|
-to_servers_raw(Servers) ->
|
|
|
- lists:map(
|
|
|
- fun(Server) ->
|
|
|
- emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
|
|
|
- end,
|
|
|
- split_servers(Servers)
|
|
|
- ).
|
|
|
+format_hosts(Hosts) ->
|
|
|
+ lists:map(fun format_host/1, Hosts).
|
|
|
|
|
|
-split_servers(L) when is_list(L) ->
|
|
|
- PossibleTypes = [
|
|
|
- list(binary()),
|
|
|
- list(string()),
|
|
|
- string()
|
|
|
- ],
|
|
|
- TypeChecks = lists:map(fun(T) -> typerefl:typecheck(T, L) end, PossibleTypes),
|
|
|
- case TypeChecks of
|
|
|
- [ok, _, _] ->
|
|
|
- %% list(binary())
|
|
|
- lists:map(fun binary_to_list/1, L);
|
|
|
- [_, ok, _] ->
|
|
|
- %% list(string())
|
|
|
- L;
|
|
|
- [_, _, ok] ->
|
|
|
- %% string()
|
|
|
- string:tokens(L, ", ");
|
|
|
- [_, _, _] ->
|
|
|
- %% invalid input
|
|
|
- throw("List of servers must contain only strings")
|
|
|
- end;
|
|
|
-split_servers(B) when is_binary(B) ->
|
|
|
- string:tokens(str(B), ", ").
|
|
|
-
|
|
|
-str(A) when is_atom(A) ->
|
|
|
- atom_to_list(A);
|
|
|
-str(B) when is_binary(B) ->
|
|
|
- binary_to_list(B);
|
|
|
-str(S) when is_list(S) ->
|
|
|
- S.
|
|
|
+parse_servers(HoconValue) ->
|
|
|
+ emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS).
|