Explorar o código

feat(quic): WIP multi-stream

William Yang %!s(int64=3) %!d(string=hai) anos
pai
achega
00b59b4939

+ 10 - 11
apps/emqx/src/emqx_connection.erl

@@ -525,11 +525,10 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
     inc_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
     when_bytes_in(Oct, Data, State);
-handle_msg({quic, Data, _Sock, _, _, _}, State) ->
-    Oct = iolist_size(Data),
-    inc_counter(incoming_bytes, Oct),
-    ok = emqx_metrics:inc('bytes.received', Oct),
-    when_bytes_in(Oct, Data, State);
+handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
+    inc_counter(incoming_bytes, Len),
+    ok = emqx_metrics:inc('bytes.received', Len),
+    when_bytes_in(Len, Data, State);
 handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
     case queue:peek(Cache) of
         empty ->
@@ -893,12 +892,12 @@ handle_info({sock_error, Reason}, State) ->
         false -> ok
     end,
     handle_info({sock_closed, Reason}, close_socket(State));
-handle_info({quic, peer_send_shutdown, _Stream}, State) ->
-    handle_info({sock_closed, force}, close_socket(State));
-handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
-    handle_info({sock_closed, ReasonFlag}, State);
-handle_info({quic, closed, _Stream}, State) ->
-    handle_info({sock_closed, force}, State);
+%% handle_info({quic, peer_send_shutdown, _Stream}, State) ->
+%%     handle_info({sock_closed, force}, close_socket(State));
+%% handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
+%%     handle_info({sock_closed, ReasonFlag}, State);
+%% handle_info({quic, closed, _Stream}, State) ->
+%%     handle_info({sock_closed, force}, State);
 handle_info(Info, State) ->
     with_channel(handle_info, [Info], State).
 

+ 2 - 1
apps/emqx/src/emqx_listeners.erl

@@ -375,7 +375,8 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
                 {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
                 {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
                 {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
-                {server_resumption_level, 2}
+                {server_resumption_level, 2},
+                {verify, none}
             ],
             ConnectionOpts = #{
                 conn_callback => emqx_quic_connection,

+ 90 - 14
apps/emqx/src/emqx_quic_connection.erl

@@ -22,24 +22,42 @@
 -define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0).
 -endif.
 
-%% Callbacks
+-behavior(quicer_connection).
+
 -export([
     init/1,
-    new_conn/2,
-    connected/2,
-    shutdown/2
+    new_conn/3,
+    connected/3,
+    transport_shutdown/3,
+    shutdown/3,
+    closed/3,
+    local_address_changed/3,
+    peer_address_changed/3,
+    streams_available/3,
+    peer_needs_streams/3,
+    resumed/3,
+    nst_received/3,
+    new_stream/3
 ]).
 
 -type cb_state() :: map() | proplists:proplist().
+-type cb_ret() :: ok.
 
--spec init(cb_state()) -> cb_state().
 init(ConnOpts) when is_list(ConnOpts) ->
     init(maps:from_list(ConnOpts));
+init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
+    init(S#{stream_opts := maps:from_list(SOpts)});
 init(ConnOpts) when is_map(ConnOpts) ->
-    ConnOpts.
+    {ok, ConnOpts}.
+
+closed(_Conn, #{is_peer_acked := true}, S) ->
+    {stop, normal, S};
+closed(_Conn, #{is_peer_acked := false}, S) ->
+    {stop, abnorml, S}.
 
--spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
-new_conn(Conn, #{zone := Zone} = S) ->
+-spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) ->
+    {ok, cb_state()} | {error, any()}.
+new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) ->
     process_flag(trap_exit, true),
     case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of
         false ->
@@ -47,7 +65,7 @@ new_conn(Conn, #{zone := Zone} = S) ->
             receive
                 {Pid, stream_acceptor_ready} ->
                     ok = quicer:async_handshake(Conn),
-                    {ok, S};
+                    {ok, S#{conn => Conn}};
                 {'EXIT', Pid, _Reason} ->
                     {error, stream_accept_error}
             end;
@@ -56,18 +74,76 @@ new_conn(Conn, #{zone := Zone} = S) ->
             {error, overloaded}
     end.
 
--spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
-connected(Conn, #{slow_start := false} = S) ->
+-spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) ->
+    {ok, cb_state()} | {error, any()}.
+connected(Conn, _Props, #{slow_start := false} = S) ->
     {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S),
     {ok, S};
-connected(_Conn, S) ->
+connected(_Conn, _Props, S) ->
+    {ok, S}.
+
+-spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret().
+resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
+    is_function(ResumeFun)
+->
+    ResumeFun(Conn, Data, S);
+resumed(_Conn, _Data, S) ->
     {ok, S}.
 
--spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
-shutdown(Conn, S) ->
+-spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret().
+nst_received(_Conn, _Data, S) ->
+    {stop, no_nst_for_server, S}.
+
+-spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret().
+new_stream(
+    Stream,
+    #{is_orphan := true} = Props,
+    #{
+        conn := Conn,
+        streams := Streams,
+        stream_opts := SOpts
+    } = CBState
+) ->
+    %% Spawn new stream
+    case quicer_stream:start_link(emqx_quic_stream, Stream, Conn, SOpts, Props) of
+        {ok, StreamOwner} ->
+            quicer_connection:handoff_stream(Stream, StreamOwner),
+            {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}};
+        Other ->
+            Other
+    end.
+-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
+shutdown(Conn, _ErrorCode, S) ->
     quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     {ok, S}.
 
+-spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) ->
+    cb_ret().
+transport_shutdown(_C, _DownInfo, S) ->
+    {ok, S}.
+
+-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
+peer_address_changed(_C, _NewAddr, S) ->
+    {ok, S}.
+
+-spec local_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state()) ->
+    cb_ret().
+local_address_changed(_C, _NewAddr, S) ->
+    {ok, S}.
+
+-spec streams_available(
+    quicer:connection_handle(),
+    {BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()},
+    cb_state()
+) -> cb_ret().
+streams_available(_C, {_BidirCnt, _UnidirCnt}, S) ->
+    {ok, S}.
+
+-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
+%% for https://github.com/microsoft/msquic/issues/3120
+peer_needs_streams(_C, undefined, S) ->
+    {ok, S}.
+
 -spec is_zone_olp_enabled(emqx_types:zone()) -> boolean().
 is_zone_olp_enabled(Zone) ->
     case emqx_config:get_zone_conf(Zone, [overload_protection]) of

+ 1 - 1
apps/emqx/src/emqx_quic_stream.erl

@@ -37,7 +37,7 @@ wait({ConnOwner, Conn}) ->
     ConnOwner ! {self(), stream_acceptor_ready},
     receive
         %% from msquic
-        {quic, new_stream, Stream} ->
+        {quic, new_stream, Stream, _Props} ->
             {ok, {quic, Conn, Stream}};
         {'EXIT', ConnOwner, _Reason} ->
             {error, enotconn}

+ 8 - 0
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -78,6 +78,14 @@ end_per_group(_Group, _Config) ->
 
 init_per_suite(Config) ->
     %% Start Apps
+    %% dbg:tracer(process, {fun dbg:dhandler/2,group_leader()}),
+    %% dbg:p(all,c),
+    %% dbg:tp(emqx_quic_connection,cx),
+    %% dbg:tp(emqx_quic_stream,cx),
+    %% dbg:tp(emqtt_quic,cx),
+    %% dbg:tp(emqtt,cx),
+    %% dbg:tp(emqtt_quic_stream,cx),
+    %% dbg:tp(emqtt_quic_connection,cx),
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([]),
     Config.