Просмотр исходного кода

feat(schema): add support for schemes in server parser/validator

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
dc48032309
2 измененных файлов с 240 добавлено и 9 удалено
  1. 74 8
      apps/emqx/src/emqx_schema.erl
  2. 166 1
      apps/emqx/test/emqx_schema_tests.erl

+ 74 - 8
apps/emqx/src/emqx_schema.erl

@@ -42,7 +42,12 @@
 -type ip_port() :: tuple() | integer().
 -type cipher() :: map().
 -type port_number() :: 1..65536.
--type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}.
+-type server_parse_option() :: #{
+    default_port => port_number(),
+    no_port => boolean(),
+    supported_schemes => [string()],
+    default_scheme => string()
+}.
 -type url() :: binary().
 -type json_binary() :: binary().
 
@@ -2896,7 +2901,10 @@ servers_validator(Opts, Required) ->
 %% `no_port': by default it's `false', when set to `true',
 %%            a `throw' exception is raised if the port is found.
 -spec parse_server(undefined | string() | binary(), server_parse_option()) ->
-    {string(), port_number()}.
+    string()
+    | {string(), port_number()}
+    | {string(), string()}
+    | {string(), string(), port_number()}.
 parse_server(Str, Opts) ->
     case parse_servers(Str, Opts) of
         undefined ->
@@ -2910,7 +2918,12 @@ parse_server(Str, Opts) ->
 %% @doc Parse comma separated `host[:port][,host[:port]]' endpoints
 %% into a list of `{Host, Port}' tuples or just `Host' string.
 -spec parse_servers(undefined | string() | binary(), server_parse_option()) ->
-    [{string(), port_number()}].
+    [
+        string()
+        | {string(), port_number()}
+        | {string(), string()}
+        | {string(), string(), port_number()}
+    ].
 parse_servers(undefined, _Opts) ->
     %% should not parse 'undefined' as string,
     %% not to throw exception either,
@@ -2956,6 +2969,9 @@ split_host_port(Str) ->
 do_parse_server(Str, Opts) ->
     DefaultPort = maps:get(default_port, Opts, undefined),
     NotExpectingPort = maps:get(no_port, Opts, false),
+    DefaultScheme = maps:get(default_scheme, Opts, undefined),
+    SupportedSchemes = maps:get(supported_schemes, Opts, []),
+    NotExpectingScheme = (not is_list(DefaultScheme)) andalso length(SupportedSchemes) =:= 0,
     case is_integer(DefaultPort) andalso NotExpectingPort of
         true ->
             %% either provide a default port from schema,
@@ -2964,24 +2980,74 @@ do_parse_server(Str, Opts) ->
         false ->
             ok
     end,
+    case is_list(DefaultScheme) andalso (not lists:member(DefaultScheme, SupportedSchemes)) of
+        true ->
+            %% inconsistent schema
+            error("bad_schema");
+        false ->
+            ok
+    end,
     %% do not split with space, there should be no space allowed between host and port
     case string:tokens(Str, ":") of
-        [Hostname, Port] ->
+        [Scheme, "//" ++ Hostname, Port] ->
             NotExpectingPort andalso throw("not_expecting_port_number"),
-            {check_hostname(Hostname), parse_port(Port)};
-        [Hostname] ->
+            NotExpectingScheme andalso throw("not_expecting_scheme"),
+            {check_scheme(Scheme, Opts), check_hostname(Hostname), parse_port(Port)};
+        [Scheme, "//" ++ Hostname] ->
+            NotExpectingScheme andalso throw("not_expecting_scheme"),
             case is_integer(DefaultPort) of
                 true ->
-                    {check_hostname(Hostname), DefaultPort};
+                    {check_scheme(Scheme, Opts), check_hostname(Hostname), DefaultPort};
                 false when NotExpectingPort ->
-                    check_hostname(Hostname);
+                    {check_scheme(Scheme, Opts), check_hostname(Hostname)};
                 false ->
                     throw("missing_port_number")
             end;
+        [Hostname, Port] ->
+            NotExpectingPort andalso throw("not_expecting_port_number"),
+            case is_list(DefaultScheme) of
+                false ->
+                    {check_hostname(Hostname), parse_port(Port)};
+                true ->
+                    {DefaultScheme, check_hostname(Hostname), parse_port(Port)}
+            end;
+        [Hostname] ->
+            case is_integer(DefaultPort) orelse NotExpectingPort of
+                true ->
+                    ok;
+                false ->
+                    throw("missing_port_number")
+            end,
+            case is_list(DefaultScheme) orelse NotExpectingScheme of
+                true ->
+                    ok;
+                false ->
+                    throw("missing_scheme")
+            end,
+            case {is_integer(DefaultPort), is_list(DefaultScheme)} of
+                {true, true} ->
+                    {DefaultScheme, check_hostname(Hostname), DefaultPort};
+                {true, false} ->
+                    {check_hostname(Hostname), DefaultPort};
+                {false, true} ->
+                    {DefaultScheme, check_hostname(Hostname)};
+                {false, false} ->
+                    check_hostname(Hostname)
+            end;
         _ ->
             throw("bad_host_port")
     end.
 
+check_scheme(Str, Opts) ->
+    SupportedSchemes = maps:get(supported_schemes, Opts, []),
+    IsSupported = lists:member(Str, SupportedSchemes),
+    case IsSupported of
+        true ->
+            Str;
+        false ->
+            throw("unsupported_scheme")
+    end.
+
 check_hostname(Str) ->
     %% not intended to use inet_parse:domain here
     %% only checking space because it interferes the parsing

+ 166 - 1
apps/emqx/test/emqx_schema_tests.erl

@@ -350,7 +350,7 @@ parse_server_test_() ->
             )
         ),
         ?T(
-            "multiple servers wihtout port, mixed list(binary|string)",
+            "multiple servers without port, mixed list(binary|string)",
             ?assertEqual(
                 ["host1", "host2"],
                 Parse2([<<"host1">>, "host2"], #{no_port => true})
@@ -447,6 +447,171 @@ parse_server_test_() ->
                 "bad_schema",
                 emqx_schema:parse_server("whatever", #{default_port => 10, no_port => true})
             )
+        ),
+        ?T(
+            "scheme, hostname and port",
+            ?assertEqual(
+                {"pulsar+ssl", "host", 6651},
+                emqx_schema:parse_server(
+                    "pulsar+ssl://host:6651",
+                    #{
+                        default_port => 6650,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "scheme and hostname, default port",
+            ?assertEqual(
+                {"pulsar", "host", 6650},
+                emqx_schema:parse_server(
+                    "pulsar://host",
+                    #{
+                        default_port => 6650,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "scheme and hostname, no port",
+            ?assertEqual(
+                {"pulsar", "host"},
+                emqx_schema:parse_server(
+                    "pulsar://host",
+                    #{
+                        no_port => true,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "scheme and hostname, missing port",
+            ?assertThrow(
+                "missing_port_number",
+                emqx_schema:parse_server(
+                    "pulsar://host",
+                    #{
+                        no_port => false,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "hostname, default scheme, no default port",
+            ?assertEqual(
+                {"pulsar", "host"},
+                emqx_schema:parse_server(
+                    "host",
+                    #{
+                        default_scheme => "pulsar",
+                        no_port => true,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "hostname, default scheme, default port",
+            ?assertEqual(
+                {"pulsar", "host", 6650},
+                emqx_schema:parse_server(
+                    "host",
+                    #{
+                        default_port => 6650,
+                        default_scheme => "pulsar",
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "just hostname, expecting missing scheme",
+            ?assertThrow(
+                "missing_scheme",
+                emqx_schema:parse_server(
+                    "host",
+                    #{
+                        no_port => true,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "hostname, default scheme, defined port",
+            ?assertEqual(
+                {"pulsar", "host", 6651},
+                emqx_schema:parse_server(
+                    "host:6651",
+                    #{
+                        default_port => 6650,
+                        default_scheme => "pulsar",
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "inconsistent scheme opts",
+            ?assertError(
+                "bad_schema",
+                emqx_schema:parse_server(
+                    "pulsar+ssl://host:6651",
+                    #{
+                        default_port => 6650,
+                        default_scheme => "something",
+                        supported_schemes => ["not", "supported"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "hostname, default scheme, defined port",
+            ?assertEqual(
+                {"pulsar", "host", 6651},
+                emqx_schema:parse_server(
+                    "host:6651",
+                    #{
+                        default_port => 6650,
+                        default_scheme => "pulsar",
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "unsupported scheme",
+            ?assertThrow(
+                "unsupported_scheme",
+                emqx_schema:parse_server(
+                    "pulsar+quic://host:6651",
+                    #{
+                        default_port => 6650,
+                        supported_schemes => ["pulsar"]
+                    }
+                )
+            )
+        ),
+        ?T(
+            "multiple hostnames with schemes (1)",
+            ?assertEqual(
+                [
+                    {"pulsar", "host", 6649},
+                    {"pulsar+ssl", "other.host", 6651},
+                    {"pulsar", "yet.another", 6650}
+                ],
+                emqx_schema:parse_servers(
+                    "pulsar://host:6649, pulsar+ssl://other.host:6651,pulsar://yet.another",
+                    #{
+                        default_port => 6650,
+                        supported_schemes => ["pulsar", "pulsar+ssl"]
+                    }
+                )
+            )
         )
     ].