Przeglądaj źródła

feat(quic): multi streams

William Yang 3 lat temu
rodzic
commit
9f696928b6

+ 1 - 0
apps/emqx/src/emqx_channel.erl

@@ -1136,6 +1136,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
     {Packets, NChannel} =
         lists:foldl(
             fun(Publish, {Acc, Chann}) ->
+                %% @FIXME perf:  list append with copy left list
                 {Packets, NChann} = do_deliver(Publish, Chann),
                 {Packets ++ Acc, NChann}
             end,

+ 32 - 5
apps/emqx/src/emqx_connection.erl

@@ -14,7 +14,12 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT/TCP|TLS Connection|QUIC Stream
+%% This module interacts with the transport layer of MQTT
+%% Transport:
+%%   - TCP connection
+%%   - TCP/TLS connection
+%%   - WebSocket
+%%   - QUIC Stream
 -module(emqx_connection).
 
 -include("emqx.hrl").
@@ -111,7 +116,13 @@
     limiter_buffer :: queue:queue(pending_req()),
 
     %% limiter timers
-    limiter_timer :: undefined | reference()
+    limiter_timer :: undefined | reference(),
+
+    %% QUIC conn pid if is a pid
+    quic_conn_pid :: maybe(pid()),
+
+    %% QUIC control stream callback state
+    quic_ctrl_state :: map()
 }).
 
 -record(retry, {
@@ -194,7 +205,7 @@
         {ok, pid()};
     (
         emqx_quic_stream,
-        {ConnOwner :: pid(), quicer:connection_handler(), quicer:new_conn_props()},
+        {ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()},
         emqx_quic_connection:cb_state()
     ) ->
         {ok, pid()}.
@@ -334,6 +345,7 @@ init_state(
     },
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_opts(),
+    %% Init Channel
     Channel = emqx_channel:init(ConnInfo, Opts),
     GcState =
         case emqx_config:get_zone_conf(Zone, [force_gc]) of
@@ -364,7 +376,10 @@ init_state(
         zone = Zone,
         listener = Listener,
         limiter_buffer = queue:new(),
-        limiter_timer = undefined
+        limiter_timer = undefined,
+        %% for quic streams to inherit
+        quic_conn_pid = maps:get(conn_pid, Opts, undefined),
+        quic_ctrl_state = #{}
     }.
 
 run_loop(
@@ -600,9 +615,20 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
 handle_msg({connack, ConnAck}, State) ->
     handle_outgoing(ConnAck, State);
 handle_msg({close, Reason}, State) ->
+    %% @FIXME here it could be close due to appl error.
     ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
     handle_info({sock_closed, Reason}, close_socket(State));
-handle_msg({event, connected}, State = #state{channel = Channel}) ->
+handle_msg(
+    {event, connected},
+    State = #state{
+        channel = Channel,
+        serialize = Serialize,
+        parse_state = PS,
+        quic_conn_pid = QuicConnPid
+    }
+) ->
+    QuicConnPid =/= undefined andalso
+        emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}),
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
 handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
@@ -876,6 +902,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
             ok;
         Error = {error, _Reason} ->
             %% Send an inet_reply to postpone handling the error
+            %% @FIXME: why not just return error?
             self() ! {inet_reply, Socket, Error},
             ok
     end.

+ 119 - 20
apps/emqx/src/emqx_quic_connection.erl

@@ -14,6 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
+%% @doc impl. the quic connection owner process.
 -module(emqx_quic_connection).
 
 -include("logger.hrl").
@@ -41,15 +42,46 @@
     new_stream/3
 ]).
 
+-export([activate_data_streams/2]).
+
+-export([
+    handle_call/3,
+    handle_info/2
+]).
+
 -type cb_state() :: #{
+    %% connecion owner pid
+    conn_pid := pid(),
+    %% Pid of ctrl stream
     ctrl_pid := undefined | pid(),
+    %% quic connecion handle
     conn := undefined | quicer:conneciton_hanlder(),
+    %% streams that handoff from this process, excluding control stream
+    %% these streams could die/closed without effecting the connecion/session.
+
+    %@TODO type?
+    streams := [{pid(), quicer:stream_handle()}],
+    %% New stream opts
     stream_opts := map(),
+    %% If conneciton is resumed from session ticket
     is_resumed => boolean(),
+    %% mqtt message serializer config
+    serialize => undefined,
     _ => _
 }.
 -type cb_ret() :: quicer_lib:cb_ret().
 
+%% @doc  Data streams initializions are started in parallel with control streams, data streams are blocked
+%%       for the activation from control stream after it is accepted as a legit conneciton.
+%%       For security, the initial number of allowed data streams from client should be limited by
+%%       'peer_bidi_stream_count` & 'peer_unidi_stream_count`
+-spec activate_data_streams(pid(), {
+    emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
+}) -> ok.
+activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
+    gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
+
+%% @doc conneciton owner init callback
 -spec init(map() | list()) -> {ok, cb_state()}.
 init(ConnOpts) when is_list(ConnOpts) ->
     init(maps:from_list(ConnOpts));
@@ -64,6 +96,7 @@ closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
     ?SLOG(debug, Prop),
     {stop, normal, S}.
 
+%% @doc handle the new incoming connecion as the connecion acceptor.
 -spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) ->
     {ok, cb_state()} | {error, any()}.
 new_conn(
@@ -75,15 +108,17 @@ new_conn(
     ?SLOG(debug, ConnInfo),
     case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of
         false ->
-            {ok, Pid} = emqx_connection:start_link(
+            %% Start control stream process
+            StartOption = S,
+            {ok, CtrlPid} = emqx_connection:start_link(
                 emqx_quic_stream,
                 {self(), Conn, maps:without([crypto_buffer], ConnInfo)},
-                S
+                StartOption
             ),
             receive
-                {Pid, stream_acceptor_ready} ->
+                {CtrlPid, stream_acceptor_ready} ->
                     ok = quicer:async_handshake(Conn),
-                    {ok, S#{conn := Conn, ctrl_pid := Pid}};
+                    {ok, S#{conn := Conn, ctrl_pid := CtrlPid}};
                 {'EXIT', _Pid, _Reason} ->
                     {error, stream_accept_error}
             end;
@@ -92,6 +127,7 @@ new_conn(
             {error, overloaded}
     end.
 
+%% @doc callback when connection is connected.
 -spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) ->
     {ok, cb_state()} | {error, any()}.
 connected(Conn, Props, #{slow_start := false} = S) ->
@@ -102,6 +138,7 @@ connected(_Conn, Props, S) ->
     ?SLOG(debug, Props),
     {ok, S}.
 
+%% @doc callback when connection is resumed from 0-RTT
 -spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret().
 resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
     is_function(ResumeFun)
@@ -110,51 +147,77 @@ resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
 resumed(_Conn, _Data, S) ->
     {ok, S#{is_resumed := true}}.
 
+%% @doc callback for receiving nst, should never happen on server.
 -spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret().
 nst_received(_Conn, _Data, S) ->
     %% As server we should not recv NST!
     {stop, no_nst_for_server, S}.
 
+%% @doc callback for handling orphan data streams
+%%      depends on the connecion state and control stream state.
 -spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret().
 new_stream(
     Stream,
-    #{is_orphan := true} = Props,
+    #{is_orphan := true, flags := _Flags} = Props,
     #{
         conn := Conn,
         streams := Streams,
-        stream_opts := SOpts
-    } = CBState
+        stream_opts := SOpts,
+        zone := Zone,
+        limiter := Limiter,
+        parse_state := PS,
+        channel := Channel,
+        serialize := Serialize
+    } = S
 ) ->
-    %% Spawn new stream
-    case quicer_stream:start_link(emqx_quic_stream, Stream, Conn, SOpts, Props) of
-        {ok, StreamOwner} ->
-            quicer_connection:handoff_stream(Stream, StreamOwner),
-            {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}};
-        Other ->
-            Other
-    end.
+    %% Cherry pick options for data streams
+    SOpts1 = SOpts#{
+        is_local => false,
+        zone => Zone,
+        % unused
+        limiter => Limiter,
+        parse_state => PS,
+        channel => Channel,
+        serialize => Serialize
+    },
+    {ok, NewStreamOwner} = quicer_stream:start_link(
+        emqx_quic_data_stream,
+        Stream,
+        Conn,
+        SOpts1,
+        Props
+    ),
+    quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}),
+    %% @TODO keep them in ``inactive_streams'
+    {ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}.
 
+%% @doc callback for handling for remote connecion shutdown.
 -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
 shutdown(Conn, _ErrorCode, S) ->
-    %% @TODO check spec what to do with the ErrorCode?
+    %% @TODO check spec what to set for the ErrorCode?
     quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     {ok, S}.
 
+%% @doc callback for handling for transport error, such as idle timeout
 -spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) ->
     cb_ret().
 transport_shutdown(_C, _DownInfo, S) ->
     %% @TODO some counter
     {ok, S}.
 
+%% @doc callback for handling for peer addr changed.
 -spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
 peer_address_changed(_C, _NewAddr, S) ->
+    %% @TODO update session info?
     {ok, S}.
 
+%% @doc callback for handling local addr change, currently unused
 -spec local_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state()) ->
     cb_ret().
 local_address_changed(_C, _NewAddr, S) ->
     {ok, S}.
 
+%% @doc callback for handling remote stream limit updates
 -spec streams_available(
     quicer:connection_handle(),
     {BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()},
@@ -166,12 +229,43 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
         peer_unidi_stream_count => UnidirCnt
     }}.
 
--spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
-%% @TODO this is not going to get triggered.
+%% @doc callback for handling request when remote wants for more streams
+%%      should cope with rate limiting
+%% @TODO this is not going to get triggered in current version
 %% for https://github.com/microsoft/msquic/issues/3120
+-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
 peer_needs_streams(_C, undefined, S) ->
     {ok, S}.
 
+%% @doc handle API calls
+handle_call(
+    {activate_data_streams, {PS, Serialize, Channel} = ActivateData},
+    _From,
+    #{streams := Streams} = S
+) ->
+    [emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) || {OwnerPid, _Stream} <- Streams],
+    {reply, ok, S#{
+        %streams := [], %% @FIXME what ??????
+        channel := Channel,
+        serialize := Serialize,
+        parse_state := PS
+    }};
+handle_call(_Req, _From, S) ->
+    {reply, {error, unimpl}, S}.
+
+%% @doc handle DOWN messages from streams.
+%% @TODO handle DOWN from supervisor?
+handle_info({'DOWN', _Ref, process, Pid, Reason}, #{streams := Streams} = S) when
+    Reason =:= normal orelse
+        Reason =:= {shutdown, protocol_error}
+->
+    case proplists:is_defined(Pid, Streams) of
+        true ->
+            {ok, S};
+        false ->
+            {stop, unknown_pid_down, S}
+    end.
+
 %%%
 %%%  Internals
 %%%
@@ -185,8 +279,13 @@ is_zone_olp_enabled(Zone) ->
     end.
 
 -spec init_cb_state(map()) -> cb_state().
-init_cb_state(Map) ->
+init_cb_state(#{zone := _Zone} = Map) ->
     Map#{
+        conn_pid => self(),
         ctrl_pid => undefined,
-        conn => undefined
+        conn => undefined,
+        streams => [],
+        parse_state => undefined,
+        channel => undefined,
+        serialize => undefined
     }.

+ 466 - 0
apps/emqx/src/emqx_quic_data_stream.erl

@@ -0,0 +1,466 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%%
+%% @doc QUIC data stream
+%% Following the behaviour of emqx_connection:
+%%  The MQTT packets and their side effects are handled *atomically*.
+%%
+
+-module(emqx_quic_data_stream).
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("quicer/include/quicer.hrl").
+-include("emqx_mqtt.hrl").
+-include("logger.hrl").
+-behaviour(quicer_stream).
+
+%% Connection Callbacks
+-export([
+    init_handoff/4,
+    post_handoff/3,
+    new_stream/3,
+    start_completed/3,
+    send_complete/3,
+    peer_send_shutdown/3,
+    peer_send_aborted/3,
+    peer_receive_aborted/3,
+    send_shutdown_complete/3,
+    stream_closed/3,
+    peer_accepted/3,
+    passive/3
+]).
+
+-export([handle_stream_data/4]).
+
+-export([activate_data/2]).
+
+-export([
+    handle_call/3,
+    handle_info/2,
+    handle_continue/2
+]).
+
+%%
+%% @doc Activate the data handling.
+%%      Data handling is disabled before control stream allows the data processing.
+-spec activate_data(pid(), {
+    emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
+}) -> ok.
+activate_data(StreamPid, {PS, Serialize, Channel}) ->
+    gen_server:call(StreamPid, {activate, {PS, Serialize, Channel}}, infinity).
+
+%%
+%% @doc Handoff from previous owner, mostly from the connection owner.
+%% @TODO parse_state doesn't look necessary since we have it in post_handoff
+%% @TODO -spec
+init_handoff(
+    Stream,
+    #{parse_state := PS} = _StreamOpts,
+    Connection,
+    #{is_orphan := true, flags := Flags}
+) ->
+    {ok, init_state(Stream, Connection, Flags, PS)}.
+
+%%
+%% @doc Post handoff data stream
+%%
+%% @TODO -spec
+%%
+post_handoff(Stream, {PS, Serialize, Channel}, S) ->
+    ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}),
+    quicer:setopt(Stream, active, true),
+    {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
+
+%%
+%% @doc when this proc is assigned to the owner of new stream
+%%
+new_stream(Stream, #{flags := Flags}, Connection) ->
+    {ok, init_state(Stream, Connection, Flags)}.
+
+%%
+%% @doc for local initiated stream
+%%
+peer_accepted(_Stream, _Flags, S) ->
+    %% we just ignore it
+    {ok, S}.
+
+peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
+    %% we abort send with same reason
+    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
+    {ok, S};
+peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) ->
+    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
+    {ok, S}.
+
+peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
+    %% we abort receive with same reason
+    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
+    {ok, S};
+peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) ->
+    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
+    {ok, S}.
+
+peer_send_shutdown(Stream, _Flags, S) ->
+    ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
+    {ok, S}.
+
+send_complete(_Stream, false, S) ->
+    {ok, S};
+send_complete(_Stream, true = _IsCanceled, S) ->
+    {ok, S}.
+
+send_shutdown_complete(_Stream, _Flags, S) ->
+    {ok, S}.
+
+start_completed(_Stream, #{status := success, stream_id := StreamId}, S) ->
+    {ok, S#{stream_id => StreamId}};
+start_completed(_Stream, #{status := Other}, S) ->
+    %% or we could retry
+    {stop, {start_fail, Other}, S}.
+
+handle_stream_data(
+    Stream,
+    Bin,
+    _Flags,
+    #{
+        is_unidir := false,
+        channel := undefined,
+        data_queue := Queue,
+        stream := Stream
+    } = State
+) when is_binary(Bin) ->
+    {ok, State#{data_queue := [Bin | Queue]}};
+handle_stream_data(
+    _Stream,
+    Bin,
+    _Flags,
+    #{
+        is_unidir := false,
+        channel := Channel,
+        parse_state := PS,
+        data_queue := QueuedData,
+        task_queue := TQ
+    } = State
+) when
+    Channel =/= undefined
+->
+    {MQTTPackets, NewPS} = parse_incoming(list_to_binary(lists:reverse([Bin | QueuedData])), PS),
+    NewTQ = lists:foldl(
+        fun(Item, Acc) ->
+            queue:in(Item, Acc)
+        end,
+        TQ,
+        [{incoming, P} || P <- lists:reverse(MQTTPackets)]
+    ),
+    {{continue, handle_appl_msg}, State#{parse_state := NewPS, task_queue := NewTQ}}.
+
+%% Reserved for unidi streams
+%% handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := PeerStream, conn := Conn} = State) ->
+%%     case PeerStream of
+%%         undefined ->
+%%             {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn,
+%%                                                         [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}
+%%                                                         , {is_local, true}
+%%                                                         ]),
+%%             {ok, _} = quicer_stream:send(StreamProc, Bin),
+%%             {ok, State#{peer_stream := StreamProc}};
+%%         StreamProc when is_pid(StreamProc) ->
+%%             {ok, _} = quicer_stream:send(StreamProc, Bin),
+%%             {ok, State}
+%%     end.
+
+passive(_Stream, undefined, S) ->
+    {ok, S}.
+
+stream_closed(
+    _Stream,
+    #{
+        is_conn_shutdown := IsConnShutdown,
+        is_app_closing := IsAppClosing,
+        is_shutdown_by_app := IsAppShutdown,
+        is_closed_remotely := IsRemote,
+        status := Status,
+        error := Code
+    },
+    S
+) when
+    is_boolean(IsConnShutdown) andalso
+        is_boolean(IsAppClosing) andalso
+        is_boolean(IsAppShutdown) andalso
+        is_boolean(IsRemote) andalso
+        is_atom(Status) andalso
+        is_integer(Code)
+->
+    {stop, normal, S}.
+
+handle_call(Call, _From, S) ->
+    do_handle_call(Call, S).
+
+handle_continue(handle_appl_msg, #{task_queue := Q} = S) ->
+    case queue:out(Q) of
+        {{value, Item}, Q2} ->
+            do_handle_appl_msg(Item, S#{task_queue := Q2});
+        {empty, Q} ->
+            {ok, S}
+    end.
+
+do_handle_appl_msg(
+    {outgoing, Packets},
+    #{
+        channel := Channel,
+        stream := _Stream,
+        serialize := _Serialize
+    } = S
+) when
+    Channel =/= undefined
+->
+    case handle_outgoing(Packets, S) of
+        {ok, Size} ->
+            ok = emqx_metrics:inc('bytes.sent', Size),
+            {{continue, handle_appl_msg}, S};
+        {error, E1, E2} ->
+            {stop, {E1, E2}, S};
+        {error, E} ->
+            {stop, E, S}
+    end;
+do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when
+    Channel =/= undefined
+->
+    with_channel(handle_in, [Packet], S);
+do_handle_appl_msg({close, Reason}, S) ->
+    %% @TODO shall we abort shutdown or graceful shutdown?
+    with_channel(handle_info, [{sock_closed, Reason}], S);
+do_handle_appl_msg({event, updated}, S) ->
+    %% Data stream don't care about connection state changes.
+    {{continue, handle_appl_msg}, S}.
+
+handle_info(Deliver = {deliver, _, _}, S) ->
+    Delivers = [Deliver],
+    with_channel(handle_deliver, [Delivers], S).
+
+with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when
+    Channel =/= undefined
+->
+    case apply(emqx_channel, Fun, Args ++ [Channel]) of
+        ok ->
+            {{continue, handle_appl_msg}, S};
+        {ok, Msgs, NewChannel} when is_list(Msgs) ->
+            {{continue, handle_appl_msg}, S#{
+                task_queue := queue:join(Q, queue:from_list(Msgs)),
+                channel := NewChannel
+            }};
+        {ok, Msg, NewChannel} when is_record(Msg, mqtt_packet) ->
+            {{continue, handle_appl_msg}, S#{
+                task_queue := queue:in({outgoing, Msg}, Q), channel := NewChannel
+            }};
+        %% @FIXME WTH?
+        {ok, {outgoing, _} = Msg, NewChannel} ->
+            {{continue, handle_appl_msg}, S#{task_queue := queue:in(Msg, Q), channel := NewChannel}};
+        {ok, NewChannel} ->
+            {{continue, handle_appl_msg}, S#{channel := NewChannel}};
+        %% @TODO optimisation for shutdown wrap
+        {shutdown, Reason, NewChannel} ->
+            {stop, {shutdown, Reason}, S#{channel := NewChannel}};
+        {shutdown, Reason, Msgs, NewChannel} when is_list(Msgs) ->
+            %% @TODO handle outgoing?
+            {stop, {shutdown, Reason}, S#{
+                channel := NewChannel,
+                task_queue := queue:join(Q, queue:from_list(Msgs))
+            }};
+        {shutdown, Reason, Msg, NewChannel} ->
+            {stop, {shutdown, Reason}, S#{
+                channel := NewChannel,
+                task_queue := queue:in(Msg, Q)
+            }}
+    end.
+
+%%% Internals
+handle_outgoing(#mqtt_packet{} = P, S) ->
+    handle_outgoing([P], S);
+handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when
+    is_list(Packets)
+->
+    OutBin = [serialize_packet(P, Serialize) || P <- filter_disallowed_out(Packets)],
+    %% @TODO in which case shall we use sync send?
+    Res = quicer:async_send(Stream, OutBin),
+    ?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}),
+    [ok = inc_outgoing_stats(P) || P <- Packets],
+    Res.
+
+serialize_packet(Packet, Serialize) ->
+    try emqx_frame:serialize_pkt(Packet, Serialize) of
+        <<>> ->
+            ?SLOG(warning, #{
+                msg => "packet_is_discarded",
+                reason => "frame_is_too_large",
+                packet => emqx_packet:format(Packet, hidden)
+            }),
+            ok = emqx_metrics:inc('delivery.dropped.too_large'),
+            ok = emqx_metrics:inc('delivery.dropped'),
+            ok = inc_outgoing_stats({error, message_too_large}),
+            <<>>;
+        Data ->
+            Data
+    catch
+        %% Maybe Never happen.
+        throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
+            ?SLOG(info, #{
+                reason => Reason,
+                input_packet => Packet
+            }),
+            erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
+        error:Reason:Stacktrace ->
+            ?SLOG(error, #{
+                input_packet => Packet,
+                exception => Reason,
+                stacktrace => Stacktrace
+            }),
+            erlang:error(?FRAME_SERIALIZE_ERROR)
+    end.
+
+-spec init_state(
+    quicer:stream_handle(),
+    quicer:connection_handle(),
+    quicer:new_stream_props()
+) ->
+    % @TODO
+    map().
+init_state(Stream, Connection, OpenFlags) ->
+    init_state(Stream, Connection, OpenFlags, undefined).
+
+init_state(Stream, Connection, OpenFlags, PS) ->
+    %% quic stream handle
+    #{
+        stream => Stream,
+        %% quic connection handle
+        conn => Connection,
+        %% if it is QUIC unidi stream
+        is_unidir => quicer:is_unidirectional(OpenFlags),
+        %% Frame Parse State
+        parse_state => PS,
+        %% Peer Stream handle in a pair for type unidir only
+        peer_stream => undefined,
+        %% if the stream is locally initiated.
+        is_local => false,
+        %% queue binary data when is NOT connected, in reversed order.
+        data_queue => [],
+        %% Channel from connection
+        %% `undefined' means the connection is not connected.
+        channel => undefined,
+        %% serialize opts for connection
+        serialize => undefined,
+        %% Current working queue
+        task_queue => queue:new()
+    }.
+
+-spec do_handle_call(term(), quicer_stream:cb_state()) -> quicer_stream:cb_ret().
+do_handle_call(
+    {activate, {PS, Serialize, Channel}},
+    #{
+        channel := undefined,
+        stream := Stream,
+        serialize := undefined
+    } = S
+) ->
+    NewS = S#{channel := Channel, serialize := Serialize, parse_state := PS},
+    %% We use quic protocol for flow control, and we don't check return val
+    case quicer:setopt(Stream, active, true) of
+        ok ->
+            {ok, NewS};
+        {error, E} ->
+            ?SLOG(error, #{msg => "set stream active failed", error => E}),
+            {stop, E, NewS}
+    end;
+do_handle_call(_Call, S) ->
+    {reply, {error, unimpl}, S}.
+
+%% @doc return reserved order of Packets
+parse_incoming(Data, PS) ->
+    try
+        do_parse_incoming(Data, [], PS)
+    catch
+        throw:{?FRAME_PARSE_ERROR, Reason} ->
+            ?SLOG(info, #{
+                reason => Reason,
+                input_bytes => Data
+            }),
+            {[{frame_error, Reason}], PS};
+        error:Reason:Stacktrace ->
+            ?SLOG(error, #{
+                input_bytes => Data,
+                reason => Reason,
+                stacktrace => Stacktrace
+            }),
+            {[{frame_error, Reason}], PS}
+    end.
+
+do_parse_incoming(<<>>, Packets, ParseState) ->
+    {Packets, ParseState};
+do_parse_incoming(Data, Packets, ParseState) ->
+    case emqx_frame:parse(Data, ParseState) of
+        {more, NParseState} ->
+            {Packets, NParseState};
+        {ok, Packet, Rest, NParseState} ->
+            do_parse_incoming(Rest, [Packet | Packets], NParseState)
+    end.
+
+%% followings are copied from emqx_connection
+-compile({inline, [inc_outgoing_stats/1]}).
+inc_outgoing_stats({error, message_too_large}) ->
+    inc_counter('send_msg.dropped', 1),
+    inc_counter('send_msg.dropped.too_large', 1);
+inc_outgoing_stats(Packet = ?PACKET(Type)) ->
+    inc_counter(send_pkt, 1),
+    case Type of
+        ?PUBLISH ->
+            inc_counter(send_msg, 1),
+            inc_counter(outgoing_pubs, 1),
+            inc_qos_stats(send_msg, Packet);
+        _ ->
+            ok
+    end,
+    emqx_metrics:inc_sent(Packet).
+
+inc_counter(Key, Inc) ->
+    _ = emqx_pd:inc_counter(Key, Inc),
+    ok.
+
+inc_qos_stats(Type, Packet) ->
+    case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
+        undefined ->
+            ignore;
+        Key ->
+            inc_counter(Key, 1)
+    end.
+
+inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
+inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
+inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
+inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
+inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
+inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
+%% for bad qos
+inc_qos_stats_key(_, _) -> undefined.
+
+filter_disallowed_out(Packets) ->
+    lists:filter(fun is_datastream_out_pkt/1, Packets).
+
+is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) when
+    Type > 2 andalso Type < 12
+->
+    true;
+is_datastream_out_pkt(_) ->
+    false.

+ 26 - 17
apps/emqx/src/emqx_quic_stream.erl

@@ -78,17 +78,24 @@
 -type socket_info() :: #{
     is_orphan => boolean(),
     ctrl_stream_start_flags => quicer:stream_open_flags(),
-    %% quicer:new_conn_props
+    %% and quicer:new_conn_props()
     _ => _
 }.
 
--spec wait({pid(), quicer:connection_handle(), socket_info()}) ->
-    {ok, socket()} | {error, enotconn}.
+%% for accepting
+-spec wait
+    ({pid(), connection_handle(), socket_info()}) ->
+        {ok, socket()} | {error, enotconn};
+    %% For handover
+    ({pid(), connection_handle(), stream_handle(), socket_info()}) ->
+        {ok, socket()} | {error, any()}.
+
+%%% For Accepting New Remote Stream
 wait({ConnOwner, Conn, ConnInfo}) ->
     {ok, Conn} = quicer:async_accept_stream(Conn, []),
     ConnOwner ! {self(), stream_acceptor_ready},
     receive
-        %% New incoming stream, this is a *ctrl* stream
+        %% New incoming stream, this is a *control* stream
         {quic, new_stream, Stream, #{is_orphan := IsOrphan, flags := StartFlags}} ->
             SocketInfo = ConnInfo#{
                 is_orphan => IsOrphan,
@@ -101,6 +108,14 @@ wait({ConnOwner, Conn, ConnInfo}) ->
         %% Connection owner process down
         {'EXIT', ConnOwner, _Reason} ->
             {error, enotconn}
+    end;
+%% For ownership handover
+wait({PrevOwner, Conn, Stream, SocketInfo}) ->
+    case quicer:wait_for_handoff(PrevOwner, Stream) of
+        ok ->
+            {ok, socket(Conn, Stream, SocketInfo)};
+        owner_down ->
+            {error, owner_down}
     end.
 
 type(_) ->
@@ -144,9 +159,10 @@ getopts(_Socket, _Opts) ->
         {buffer, 80000}
     ]}.
 
-fast_close({quic, _Conn, Stream, _Info}) ->
-    %% Flush send buffer, gracefully shutdown
-    quicer:async_shutdown_stream(Stream),
+fast_close({quic, Conn, _Stream, _Info}) ->
+    %% Since we shutdown the control stream, we shutdown the connection as well
+    %% @TODO supply some App Error Code
+    quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     ok.
 
 -spec ensure_ok_or_exit(atom(), list(term())) -> term().
@@ -187,21 +203,14 @@ peer_accepted(_Stream, undefined, S) ->
     {ok, S}.
 
 -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
-peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
-    %% we abort send with same reason
-    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
-    {ok, S};
-peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) ->
+peer_receive_aborted(Stream, ErrorCode, S) ->
     quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
     {ok, S}.
 
 -spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
-peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
+peer_send_aborted(Stream, ErrorCode, S) ->
     %% we abort receive with same reason
-    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
-    {ok, S};
-peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) ->
-    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
+    quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
     {ok, S}.
 
 -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().

+ 14 - 8
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -65,6 +65,7 @@ init_per_group(quic, Config) ->
     UdpPort = 1884,
     emqx_common_test_helpers:start_apps([]),
     emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort),
+    emqx_logger:set_log_level(debug),
     [{port, UdpPort}, {conn_fun, quic_connect} | Config];
 init_per_group(_, Config) ->
     emqx_common_test_helpers:stop_apps([]),
@@ -78,14 +79,19 @@ end_per_group(_Group, _Config) ->
 
 init_per_suite(Config) ->
     %% Start Apps
-    %% dbg:tracer(process, {fun dbg:dhandler/2,group_leader()}),
-    %% dbg:p(all,c),
-    %% dbg:tp(emqx_quic_connection,cx),
-    %% dbg:tp(emqx_quic_stream,cx),
-    %% dbg:tp(emqtt_quic,cx),
-    %% dbg:tp(emqtt,cx),
-    %% dbg:tp(emqtt_quic_stream,cx),
-    %% dbg:tp(emqtt_quic_connection,cx),
+    dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}),
+    dbg:p(all, c),
+    dbg:tp(emqx_quic_connection, cx),
+    dbg:tp(quicer_connection, cx),
+    %% dbg:tp(emqx_quic_stream, cx),
+    %% dbg:tp(emqtt_quic, cx),
+    %% dbg:tp(emqtt, cx),
+    %% dbg:tp(emqtt_quic_stream, cx),
+    %% dbg:tp(emqtt_quic_connection, cx),
+    %% dbg:tp(emqx_cm, open_session, cx),
+    %% dbg:tpl(emqx_cm, lookup_channels, cx),
+    %% dbg:tpl(emqx_cm, register_channel, cx),
+    %% dbg:tpl(emqx_cm, unregister_channel, cx),
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([]),
     Config.

+ 190 - 0
apps/emqx/test/emqx_quic_multistreams_SUITE.erl

@@ -0,0 +1,190 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_quic_multistreams_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TOPICS, [
+    <<"TopicA">>,
+    <<"TopicA/B">>,
+    <<"Topic/C">>,
+    <<"TopicA/C">>,
+    <<"/TopicA">>
+]).
+
+%%--------------------------------------------------------------------
+%% @spec suite() -> Info
+%% Info = [tuple()]
+%% @end
+%%--------------------------------------------------------------------
+suite() ->
+    [{timetrap, {seconds, 30}}].
+
+%%--------------------------------------------------------------------
+%% @spec init_per_suite(Config0) ->
+%%     Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% @end
+%%--------------------------------------------------------------------
+init_per_suite(Config) ->
+    UdpPort = 1884,
+    emqx_common_test_helpers:boot_modules(all),
+    emqx_common_test_helpers:start_apps([]),
+    emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort),
+    %% @TODO remove
+    emqx_logger:set_log_level(debug),
+
+    dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}),
+    dbg:p(all, c),
+
+    %dbg:tp(emqx_quic_stream, cx),
+    %% dbg:tp(quicer_stream, cx),
+    %% dbg:tp(emqx_quic_data_stream, cx),
+    %% dbg:tp(emqx_channel, cx),
+    %% dbg:tp(emqx_packet,check,cx),
+    %% dbg:tp(emqx_frame,parse,cx),
+    %dbg:tp(emqx_quic_connection, cx),
+    [{port, UdpPort}, {conn_fun, quic_connect} | Config].
+
+%%--------------------------------------------------------------------
+%% @spec end_per_suite(Config0) -> term() | {save_config,Config1}
+%% Config0 = Config1 = [tuple()]
+%% @end
+%%--------------------------------------------------------------------
+end_per_suite(_Config) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @spec init_per_group(GroupName, Config0) ->
+%%               Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% @end
+%%--------------------------------------------------------------------
+init_per_group(_GroupName, Config) ->
+    Config.
+
+%%--------------------------------------------------------------------
+%% @spec end_per_group(GroupName, Config0) ->
+%%               term() | {save_config,Config1}
+%% GroupName = atom()
+%% Config0 = Config1 = [tuple()]
+%% @end
+%%--------------------------------------------------------------------
+end_per_group(_GroupName, _Config) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @spec init_per_testcase(TestCase, Config0) ->
+%%               Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
+%% TestCase = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% @end
+%%--------------------------------------------------------------------
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+%%--------------------------------------------------------------------
+%% @spec end_per_testcase(TestCase, Config0) ->
+%%               term() | {save_config,Config1} | {fail,Reason}
+%% TestCase = atom()
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% @end
+%%--------------------------------------------------------------------
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @spec groups() -> [Group]
+%% Group = {GroupName,Properties,GroupsAndTestCases}
+%% GroupName = atom()
+%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}]
+%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase]
+%% TestCase = atom()
+%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}}
+%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail |
+%%              repeat_until_any_ok | repeat_until_any_fail
+%% N = integer() | forever
+%% @end
+%%--------------------------------------------------------------------
+groups() ->
+    [].
+
+%%--------------------------------------------------------------------
+%% @spec all() -> GroupsAndTestCases | {skip,Reason}
+%% GroupsAndTestCases = [{group,GroupName} | TestCase]
+%% GroupName = atom()
+%% TestCase = atom()
+%% Reason = term()
+%% @end
+%%--------------------------------------------------------------------
+all() ->
+    [
+        tc_data_stream_sub
+    ].
+
+%%--------------------------------------------------------------------
+%% @spec TestCase(Config0) ->
+%%               ok | exit() | {skip,Reason} | {comment,Comment} |
+%%               {save_config,Config1} | {skip_and_save,Reason,Config1}
+%% Config0 = Config1 = [tuple()]
+%% Reason = term()
+%% Comment = term()
+%% @end
+%%--------------------------------------------------------------------
+
+%% @doc Test MQTT Subscribe via data_stream
+tc_data_stream_sub(Config) ->
+    Topic = lists:nth(1, ?TOPICS),
+    {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
+    {ok, _} = emqtt:quic_connect(C),
+    {ok, _, [1]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [{Topic, [{qos, qos1}]}]),
+    {ok, _, [2]} = emqtt:subscribe_via(
+        C,
+        {new_data_stream, []},
+        #{},
+        [{lists:nth(2, ?TOPICS), [{qos, qos2}]}]
+    ),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2 1">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2 2">>, 2),
+    {ok, _} = emqtt:publish(C, Topic, <<"qos 2 3">>, 2),
+    Msgs = receive_messages(3),
+    ct:pal("recv msg: ~p", [Msgs]),
+    ?assertEqual(3, length(Msgs)),
+    ok = emqtt:disconnect(C).
+
+receive_messages(Count) ->
+    receive_messages(Count, []).
+
+receive_messages(0, Msgs) ->
+    Msgs;
+receive_messages(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            receive_messages(Count - 1, [Msg | Msgs]);
+        _Other ->
+            receive_messages(Count, Msgs)
+    after 1000 ->
+        Msgs
+    end.