ソースを参照

refactor(connector): parse servers for `rs` and `sharded` mongo_type

JimMoen 4 年 前
コミット
28735dc6d7
1 ファイル変更71 行追加44 行削除
  1. 71 44
      apps/emqx_connector/src/emqx_connector_mongo.erl

+ 71 - 44
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -298,6 +298,8 @@ server(_) -> undefined.
 servers(type) -> binary();
 servers(nullable) -> false;
 servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be empty")];
+servers(converter) -> fun to_servers_raw/1;
+servers(desc) -> ?SERVERS_DESC ++ server(desc);
 servers(_) -> undefined.
 
 w_mode(type) -> hoconsc:enum([unsafe, safe]);
@@ -323,19 +325,9 @@ srv_record(_) -> undefined.
 %% ===================================================================
 %% Internal funcs
 
-parse_servers(Type, Servers) when is_binary(Servers) ->
-    parse_servers(Type, binary_to_list(Servers));
-parse_servers(Type, Servers) when is_list(Servers) ->
-    case string:split(Servers, ",", all) of
-        [Host | _] when Type =:= single ->
-            [Host];
-        Hosts ->
-            Hosts
-    end.
-
 may_parse_srv_and_txt_records(#{server := Server} = Config) ->
     NConfig = maps:remove(server, Config),
-    may_parse_srv_and_txt_records_(NConfig#{servers => Server});
+    may_parse_srv_and_txt_records_(NConfig#{servers => [Server]});
 may_parse_srv_and_txt_records(Config) ->
     may_parse_srv_and_txt_records_(Config).
 
@@ -346,47 +338,52 @@ may_parse_srv_and_txt_records_(#{mongo_type := Type,
         true ->
             error({missing_parameter, replica_set_name});
         false ->
-            Config#{hosts => parse_servers(Type, Servers)}
+            Config#{hosts => servers_to_bin(Servers)}
     end;
 may_parse_srv_and_txt_records_(#{mongo_type := Type,
                                  srv_record := true,
                                  servers := Servers} = Config) ->
-    NServers = binary_to_list(Servers),
-    Hosts = parse_srv_records(Type, NServers),
-    ExtraOpts = parse_txt_records(Type, NServers),
+    Hosts = parse_srv_records(Type, Servers),
+    ExtraOpts = parse_txt_records(Type, Servers),
     maps:merge(Config#{hosts => Hosts}, ExtraOpts).
 
-parse_srv_records(Type, Server) ->
-    case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
-        [] ->
-            error(service_not_found);
-        Services ->
-            case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of
-                [H | _] when Type =:= single ->
-                    [H];
-                Hosts ->
-                    Hosts
-            end
+parse_srv_records(Type, Servers) ->
+    Fun = fun(AccIn, {IpOrHost, _Port}) ->
+                  case inet_res:lookup("_mongodb._tcp." ++ ip_or_host_to_string(IpOrHost), in, srv) of
+                      [] ->
+                          error(service_not_found);
+                      Services ->
+                          [ [server_to_bin({Host, Port}) || {_, _, Port, Host} <- Services]
+                          | AccIn]
+                  end
+          end,
+    Res = lists:foldl(Fun, [], Servers),
+    case Type of
+        single -> lists:nth(1, Res);
+        _      -> Res
     end.
 
-parse_txt_records(Type, Server) ->
-    case inet_res:lookup(Server, in, txt) of
-        [] ->
-            #{};
-        [[QueryString]] ->
-            case uri_string:dissect_query(QueryString) of
-                {error, _, _} ->
-                    error({invalid_txt_record, invalid_query_string});
-                Options ->
-                    Fields = case Type of
-                                 rs -> ["authSource", "replicaSet"];
-                                 _ -> ["authSource"]
-                             end,
-                    take_and_convert(Fields, Options)
-            end;
-        _ ->
-            error({invalid_txt_record, multiple_records})
-    end.
+parse_txt_records(Type, Servers) ->
+    Fun = fun(AccIn, {IpOrHost, _Port}) ->
+                  case inet_res:lookup(IpOrHost, in, txt) of
+                      [] ->
+                          #{};
+                      [[QueryString]] ->
+                          case uri_string:dissect_query(QueryString) of
+                              {error, _, _} ->
+                                  error({invalid_txt_record, invalid_query_string});
+                              Options ->
+                                  Fields = case Type of
+                                               rs -> ["authSource", "replicaSet"];
+                                               _ -> ["authSource"]
+                                           end,
+                                  maps:merge(AccIn, take_and_convert(Fields, Options))
+                          end;
+                      _ ->
+                          error({invalid_txt_record, multiple_records})
+                  end
+          end,
+    lists:foldl(Fun, #{}, Servers).
 
 take_and_convert(Fields, Options) ->
     take_and_convert(Fields, Options, #{}).
@@ -407,6 +404,21 @@ take_and_convert([Field | More], Options, Acc) ->
             take_and_convert(More, Options, Acc)
     end.
 
+-spec ip_or_host_to_string(binary() | string() | tuple())
+      -> string().
+ip_or_host_to_string(Ip) when is_tuple(Ip) ->
+    inet:ntoa(Ip);
+ip_or_host_to_string(Host) ->
+    str(Host).
+
+servers_to_bin([Server | Rest]) ->
+    [server_to_bin(Server) | servers_to_bin(Rest)];
+servers_to_bin([]) ->
+    [].
+
+server_to_bin({IpOrHost, Port}) ->
+    iolist_to_binary(ip_or_host_to_string(IpOrHost) ++ ":" ++ integer_to_list(Port)).
+
 %% ===================================================================
 %% typereflt funcs
 
@@ -414,3 +426,18 @@ take_and_convert([Field | More], Options, Acc) ->
       -> {string(), pos_integer()}.
 to_server_raw(Server) ->
     emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS).
+
+-spec to_servers_raw(string())
+      -> [{string(), pos_integer()}].
+to_servers_raw(Servers) ->
+    lists:map( fun(Server) ->
+                   emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
+               end
+             , string:tokens(str(Servers), ", ")).
+
+str(A) when is_atom(A) ->
+    atom_to_list(A);
+str(B) when is_binary(B) ->
+    binary_to_list(B);
+str(S) when is_list(S) ->
+    S.