|
|
@@ -16,6 +16,7 @@
|
|
|
|
|
|
%% MQTT/QUIC Stream
|
|
|
-module(emqx_quic_stream).
|
|
|
+-include_lib("quicer/include/quicer.hrl").
|
|
|
|
|
|
%% emqx transport Callbacks
|
|
|
-export([
|
|
|
@@ -38,7 +39,7 @@ wait({ConnOwner, Conn}) ->
|
|
|
receive
|
|
|
%% from msquic
|
|
|
{quic, new_stream, Stream} ->
|
|
|
- {ok, Stream};
|
|
|
+ {ok, {quic, Conn, Stream}};
|
|
|
{'EXIT', ConnOwner, _Reason} ->
|
|
|
{error, enotconn}
|
|
|
end.
|
|
|
@@ -46,18 +47,18 @@ wait({ConnOwner, Conn}) ->
|
|
|
type(_) ->
|
|
|
quic.
|
|
|
|
|
|
-peername(S) ->
|
|
|
- quicer:peername(S).
|
|
|
+peername({quic, Conn, _Stream}) ->
|
|
|
+ quicer:peername(Conn).
|
|
|
|
|
|
-sockname(S) ->
|
|
|
- quicer:sockname(S).
|
|
|
+sockname({quic, Conn, _Stream}) ->
|
|
|
+ quicer:sockname(Conn).
|
|
|
|
|
|
peercert(_S) ->
|
|
|
%% @todo but unsupported by msquic
|
|
|
nossl.
|
|
|
|
|
|
-getstat(Socket, Stats) ->
|
|
|
- case quicer:getstat(Socket, Stats) of
|
|
|
+getstat({quic, Conn, _Stream}, Stats) ->
|
|
|
+ case quicer:getstat(Conn, Stats) of
|
|
|
{error, _} -> {error, closed};
|
|
|
Res -> Res
|
|
|
end.
|
|
|
@@ -84,9 +85,9 @@ getopts(_Socket, _Opts) ->
|
|
|
{buffer, 80000}
|
|
|
]}.
|
|
|
|
|
|
-fast_close(Stream) ->
|
|
|
- %% Stream might be closed already.
|
|
|
- _ = quicer:async_close_stream(Stream),
|
|
|
+fast_close({quic, _Conn, Stream}) ->
|
|
|
+ %% Flush send buffer, gracefully shutdown
|
|
|
+ quicer:async_shutdown_stream(Stream),
|
|
|
ok.
|
|
|
|
|
|
-spec ensure_ok_or_exit(atom(), list(term())) -> term().
|
|
|
@@ -102,9 +103,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
|
|
|
Result
|
|
|
end.
|
|
|
|
|
|
-async_send(Stream, Data, Options) when is_list(Data) ->
|
|
|
- async_send(Stream, iolist_to_binary(Data), Options);
|
|
|
-async_send(Stream, Data, _Options) when is_binary(Data) ->
|
|
|
+async_send({quic, _Conn, Stream}, Data, _Options) ->
|
|
|
case quicer:send(Stream, Data) of
|
|
|
{ok, _Len} -> ok;
|
|
|
Other -> Other
|