Просмотр исходного кода

Merge pull request #8673 from qzhuyan/dev/william/quic-listener-config-timeouts

feat(quic): more listener options
Xinyu Liu 3 лет назад
Родитель
Сommit
4318159b85
3 измененных файлов с 69 добавлено и 19 удалено
  1. 32 3
      apps/emqx/i18n/emqx_schema_i18n.conf
  2. 18 13
      apps/emqx/src/emqx_listeners.erl
  3. 19 3
      apps/emqx/src/emqx_schema.erl

+ 32 - 3
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -1967,11 +1967,10 @@ Path to the secret key file.
 fields_mqtt_quic_listener_idle_timeout {
 fields_mqtt_quic_listener_idle_timeout {
     desc {
     desc {
         en: """
         en: """
-Close transport-layer connections from the clients that have not sent MQTT CONNECT
-message within this interval.
+How long a connection can go idle before it is gracefully shut down. 0 to disable
 """
 """
         zh: """
         zh: """
-关闭在此间隔内未发送 MQTT CONNECT 消息的客户端的传输层连接。
+一个连接在被关闭之前可以空闲多长时间。0表示禁用
 """
 """
     }
     }
     label: {
     label: {
@@ -1980,6 +1979,36 @@ message within this interval.
     }
     }
 }
 }
 
 
+fields_mqtt_quic_listener_handshake_idle_timeout {
+    desc {
+        en: """
+How long a handshake can idle before it is discarded.
+"""
+        zh: """
+一个握手在被丢弃之前可以空闲多长时间。
+"""
+    }
+    label: {
+        en: "Handshake Idle Timeout"
+        zh: "握手发呆超时时间"
+    }
+}
+
+fields_mqtt_quic_listener_keep_alive_interval {
+    desc {
+        en: """
+How often to send PING frames to keep a connection alive. 0 means disabled.
+"""
+        zh: """
+发送 PING 帧的频率,以保活连接. 设为0,禁用
+"""
+    }
+    label: {
+        en: "Keep Alive Interval"
+        zh: "PING 保活频率"
+    }
+}
+
 base_listener_bind {
 base_listener_bind {
     desc {
     desc {
         en: """
         en: """

+ 18 - 13
apps/emqx/src/emqx_listeners.erl

@@ -335,23 +335,31 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
     end;
     end;
 %% Start MQTT/QUIC listener
 %% Start MQTT/QUIC listener
-do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
+do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
+    ListenOn =
+        case Bind of
+            {Addr, Port} when tuple_size(Addr) == 4 ->
+                %% IPv4
+                lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]));
+            {Addr, Port} when tuple_size(Addr) == 8 ->
+                %% IPv6
+                lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]));
+            Port ->
+                Port
+        end,
+
     case [A || {quicer, _, _} = A <- application:which_applications()] of
     case [A || {quicer, _, _} = A <- application:which_applications()] of
         [_] ->
         [_] ->
             DefAcceptors = erlang:system_info(schedulers_online) * 8,
             DefAcceptors = erlang:system_info(schedulers_online) * 8,
-            IdleTimeout = timer:seconds(maps:get(idle_timeout, Opts)),
             ListenOpts = [
             ListenOpts = [
                 {cert, maps:get(certfile, Opts)},
                 {cert, maps:get(certfile, Opts)},
                 {key, maps:get(keyfile, Opts)},
                 {key, maps:get(keyfile, Opts)},
                 {alpn, ["mqtt"]},
                 {alpn, ["mqtt"]},
                 {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
                 {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
-                {keep_alive_interval_ms, ceil(IdleTimeout / 3)},
-                {server_resumption_level, 2},
-                {idle_timeout_ms,
-                    lists:max([
-                        emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3,
-                        IdleTimeout
-                    ])}
+                {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
+                {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
+                {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
+                {server_resumption_level, 2}
             ],
             ],
             ConnectionOpts = #{
             ConnectionOpts = #{
                 conn_callback => emqx_quic_connection,
                 conn_callback => emqx_quic_connection,
@@ -366,7 +374,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
             add_limiter_bucket(Id, Opts),
             add_limiter_bucket(Id, Opts),
             quicer:start_listener(
             quicer:start_listener(
                 Id,
                 Id,
-                port(ListenOn),
+                ListenOn,
                 {ListenOpts, ConnectionOpts, StreamOpts}
                 {ListenOpts, ConnectionOpts, StreamOpts}
             );
             );
         [] ->
         [] ->
@@ -482,9 +490,6 @@ ip_port(Port) when is_integer(Port) ->
 ip_port({Addr, Port}) ->
 ip_port({Addr, Port}) ->
     [{ip, Addr}, {port, Port}].
     [{ip, Addr}, {port, Port}].
 
 
-port(Port) when is_integer(Port) -> Port;
-port({_Addr, Port}) when is_integer(Port) -> Port.
-
 esockd_access_rules(StrRules) ->
 esockd_access_rules(StrRules) ->
     Access = fun(S) ->
     Access = fun(S) ->
         [A, CIDR] = string:tokens(S, " "),
         [A, CIDR] = string:tokens(S, " "),

+ 19 - 3
apps/emqx/src/emqx_schema.erl

@@ -867,11 +867,27 @@ fields("mqtt_quic_listener") ->
         {"ciphers", ciphers_schema(quic)},
         {"ciphers", ciphers_schema(quic)},
         {"idle_timeout",
         {"idle_timeout",
             sc(
             sc(
-                duration(),
+                duration_ms(),
                 #{
                 #{
-                    default => "15s",
+                    default => "0",
                     desc => ?DESC(fields_mqtt_quic_listener_idle_timeout)
                     desc => ?DESC(fields_mqtt_quic_listener_idle_timeout)
                 }
                 }
+            )},
+        {"handshake_idle_timeout",
+            sc(
+                duration_ms(),
+                #{
+                    default => "10s",
+                    desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout)
+                }
+            )},
+        {"keep_alive_interval",
+            sc(
+                duration_ms(),
+                #{
+                    default => 0,
+                    desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval)
+                }
             )}
             )}
     ] ++ base_listener(14567);
     ] ++ base_listener(14567);
 fields("ws_opts") ->
 fields("ws_opts") ->
@@ -905,7 +921,7 @@ fields("ws_opts") ->
                 duration(),
                 duration(),
                 #{
                 #{
                     default => "7200s",
                     default => "7200s",
-                    desc => ?DESC(fields_mqtt_quic_listener_idle_timeout)
+                    desc => ?DESC(fields_ws_opts_idle_timeout)
                 }
                 }
             )},
             )},
         {"max_frame_size",
         {"max_frame_size",