Преглед изворни кода

feat(quic): support QUIC streams process hibernation

William Yang пре 1 година
родитељ
комит
036c6d9aa6

+ 12 - 5
apps/emqx/src/emqx_connection.erl

@@ -106,6 +106,8 @@
     idle_timeout :: integer() | infinity,
     idle_timeout :: integer() | infinity,
     %% Idle Timer
     %% Idle Timer
     idle_timer :: option(reference()),
     idle_timer :: option(reference()),
+    %% Idle Timeout
+    hibernate_after :: integer() | infinity,
     %% Zone name
     %% Zone name
     zone :: atom(),
     zone :: atom(),
     %% Listener Type and Name
     %% Listener Type and Name
@@ -361,6 +363,7 @@ init_state(
         stats_timer = StatsTimer,
         stats_timer = StatsTimer,
         idle_timeout = IdleTimeout,
         idle_timeout = IdleTimeout,
         idle_timer = IdleTimer,
         idle_timer = IdleTimer,
+        hibernate_after = maps:get(hibernate_after, Opts, IdleTimeout),
         zone = Zone,
         zone = Zone,
         listener = Listener,
         listener = Listener,
         limiter_buffer = queue:new(),
         limiter_buffer = queue:new(),
@@ -410,23 +413,27 @@ exit_on_sock_error(Reason) ->
 recvloop(
 recvloop(
     Parent,
     Parent,
     State = #state{
     State = #state{
-        idle_timeout = IdleTimeout0,
+        hibernate_after = HibernateAfterMs,
+        channel = Channel,
         zone = Zone
         zone = Zone
     }
     }
 ) ->
 ) ->
-    IdleTimeout =
-        case IdleTimeout0 of
+    HibernateTimeout =
+        case HibernateAfterMs of
             infinity -> infinity;
             infinity -> infinity;
-            _ -> IdleTimeout0 + 100
+            _ -> HibernateAfterMs
         end,
         end,
     receive
     receive
         Msg ->
         Msg ->
             handle_recv(Msg, Parent, State)
             handle_recv(Msg, Parent, State)
-    after IdleTimeout ->
+    after HibernateTimeout ->
         case emqx_olp:backoff_hibernation(Zone) of
         case emqx_olp:backoff_hibernation(Zone) of
             true ->
             true ->
                 recvloop(Parent, State);
                 recvloop(Parent, State);
             false ->
             false ->
+                ClientId = emqx_channel:info(clientid, Channel),
+                undefined =/= ClientId andalso
+                    emqx_cm:set_chan_stats(ClientId, stats(State)),
                 hibernate(Parent, cancel_stats_timer(State))
                 hibernate(Parent, cancel_stats_timer(State))
         end
         end
     end.
     end.

+ 4 - 2
apps/emqx/src/emqx_listeners.erl

@@ -457,11 +457,13 @@ do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
                 peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
                 peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
                 zone => zone(Opts),
                 zone => zone(Opts),
                 listener => {quic, Name},
                 listener => {quic, Name},
-                limiter => Limiter
+                limiter => Limiter,
+                hibernate_after => maps:get(hibernate_after, ListenOpts)
             },
             },
             StreamOpts = #{
             StreamOpts = #{
                 stream_callback => emqx_quic_stream,
                 stream_callback => emqx_quic_stream,
-                active => 1
+                active => 1,
+                hibernate_after => maps:get(hibernate_after, ListenOpts)
             },
             },
             quicer:spawn_listener(
             quicer:spawn_listener(
                 Id,
                 Id,

+ 2 - 1
apps/emqx/src/emqx_quic_connection.erl

@@ -169,6 +169,7 @@ new_stream(
         parse_state := PS,
         parse_state := PS,
         channel := Channel,
         channel := Channel,
         serialize := Serialize,
         serialize := Serialize,
+        hibernate_after := HibernateAfterMs,
         conn_shared_state := SS
         conn_shared_state := SS
     } = S
     } = S
 ) ->
 ) ->
@@ -190,7 +191,7 @@ new_stream(
         Conn,
         Conn,
         SOpts1,
         SOpts1,
         Props,
         Props,
-        [{hibernate_after, 1000}]
+        [{hibernate_after, HibernateAfterMs}]
     ),
     ),
     case quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}) of
     case quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}) of
         ok ->
         ok ->

+ 2 - 2
apps/emqx/src/emqx_schema.erl

@@ -3330,9 +3330,9 @@ is_quic_ssl_opts(Name) ->
         "certfile",
         "certfile",
         "keyfile",
         "keyfile",
         "verify",
         "verify",
-        "password"
+        "password",
+        "hibernate_after"
         %% Followings are planned
         %% Followings are planned
-        %% , "hibernate_after"
         %% , "fail_if_no_peer_cert"
         %% , "fail_if_no_peer_cert"
         %% , "handshake_timeout"
         %% , "handshake_timeout"
         %% , "gc_after_handshake"
         %% , "gc_after_handshake"

+ 2 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -664,7 +664,8 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
         idle_timeout => 15000,
         idle_timeout => 15000,
         ssl_options => #{
         ssl_options => #{
             certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
             certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
-            keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem")
+            keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem"),
+            hibernate_after => 30000
         },
         },
         limiter => #{},
         limiter => #{},
         max_connections => 1024000,
         max_connections => 1024000,

+ 3 - 0
changes/ce/feat-14112.en.md

@@ -0,0 +1,3 @@
+Add support `ssl_options.hibernate_after` in QUIC listener to reduce memory footprint of QUIC transport.
+
+