Просмотр исходного кода

Merge pull request #5584 from qzhuyan/dev/william/5.0-quic-support-2

quicer 0.0.8
William Yang 4 лет назад
Родитель
Сommit
341df69166

+ 1 - 1
apps/emqx/rebar.config

@@ -29,7 +29,7 @@
            [ meck
            [ meck
            , {bbmustache,"1.10.0"}
            , {bbmustache,"1.10.0"}
            , {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}}
            , {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}}
-           , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}}
+           , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}
            ]},
            ]},
          {extra_src_dirs, [{"test",[recursive]}]}
          {extra_src_dirs, [{"test",[recursive]}]}
        ]}
        ]}

+ 1 - 1
apps/emqx/rebar.config.script

@@ -18,7 +18,7 @@ IsQuicSupp = fun() ->
              end,
              end,
 
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}},
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}},
-Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.7"}}},
+Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.8"}}},
 
 
 ExtraDeps = fun(C) ->
 ExtraDeps = fun(C) ->
                 {deps, Deps0} = lists:keyfind(deps, 1, C),
                 {deps, Deps0} = lists:keyfind(deps, 1, C),

+ 3 - 1
apps/emqx/src/emqx_connection.erl

@@ -135,7 +135,9 @@
                             , system_code_change/4
                             , system_code_change/4
                             ]}).
                             ]}).
 
 
--spec(start_link(esockd:transport(), esockd:socket(), emqx_channel:opts())
+-spec(start_link(esockd:transport(),
+                 esockd:socket() | {pid(), quicer:connection_handler()},
+                 emqx_channel:opts())
       -> {ok, pid()}).
       -> {ok, pid()}).
 start_link(Transport, Socket, Options) ->
 start_link(Transport, Socket, Options) ->
     Args = [self(), Transport, Socket, Options],
     Args = [self(), Transport, Socket, Options],

+ 7 - 8
apps/emqx/src/emqx_listeners.erl

@@ -178,24 +178,23 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts)
 do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
 do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
     case [ A || {quicer, _, _} = A<-application:which_applications() ] of
     case [ A || {quicer, _, _} = A<-application:which_applications() ] of
         [_] ->
         [_] ->
-            %% @fixme unsure why we need reopen lib and reopen config.
-            quicer_nif:open_lib(),
-            quicer_nif:reg_open(),
             DefAcceptors = erlang:system_info(schedulers_online) * 8,
             DefAcceptors = erlang:system_info(schedulers_online) * 8,
             ListenOpts = [ {cert, maps:get(certfile, Opts)}
             ListenOpts = [ {cert, maps:get(certfile, Opts)}
                          , {key, maps:get(keyfile, Opts)}
                          , {key, maps:get(keyfile, Opts)}
                          , {alpn, ["mqtt"]}
                          , {alpn, ["mqtt"]}
-                         , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
-                         , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts),
-                                                [mqtt, idle_timeout])}
+                         , {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}
+                         , {idle_timeout_ms, lists:max([
+                                                emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3
+                                              , timer:seconds(maps:get(idle_timeout, Opts))]
+                                              )}
                          ],
                          ],
-            ConnectionOpts = #{conn_callback => emqx_quic_connection
+            ConnectionOpts = #{ conn_callback => emqx_quic_connection
                               , peer_unidi_stream_count => 1
                               , peer_unidi_stream_count => 1
                               , peer_bidi_stream_count => 10
                               , peer_bidi_stream_count => 10
                               , zone => zone(Opts)
                               , zone => zone(Opts)
                               , listener => {quic, ListenerName}
                               , listener => {quic, ListenerName}
                               },
                               },
-            StreamOpts = [],
+            StreamOpts = [{stream_callback, emqx_quic_stream}],
             quicer:start_listener(listener_id(quic, ListenerName),
             quicer:start_listener(listener_id(quic, ListenerName),
                                   port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts});
                                   port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts});
         [] ->
         [] ->

+ 36 - 3
apps/emqx/src/emqx_quic_connection.erl

@@ -17,8 +17,41 @@
 -module(emqx_quic_connection).
 -module(emqx_quic_connection).
 
 
 %% Callbacks
 %% Callbacks
--export([ new_conn/2
+-export([ init/1
+        , new_conn/2
+        , connected/2
+        , shutdown/2
         ]).
         ]).
 
 
-new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) ->
-    emqx_connection:start_link(emqx_quic_stream, Conn, COpts).
+-type cb_state() :: map() | proplists:proplist().
+
+
+-spec init(cb_state()) -> cb_state().
+init(ConnOpts) when is_list(ConnOpts) ->
+    init(maps:from_list(ConnOpts));
+init(ConnOpts) when is_map(ConnOpts) ->
+    ConnOpts.
+
+-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
+new_conn(Conn, S) ->
+    process_flag(trap_exit, true),
+    {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S),
+    receive
+        {Pid, stream_acceptor_ready} ->
+            ok = quicer:async_handshake(Conn),
+            {ok, S};
+        {'EXIT', Pid, _Reason} ->
+            {error, stream_accept_error}
+    end.
+
+-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
+connected(Conn, #{slow_start := false} = S) ->
+    {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S),
+    {ok, S};
+connected(_Conn, S) ->
+    {ok, S}.
+
+-spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
+shutdown(Conn, S) ->
+    quicer:async_close_connection(Conn),
+    {ok, S}.

+ 16 - 4
apps/emqx/src/emqx_quic_stream.erl

@@ -31,8 +31,16 @@
         , peercert/1
         , peercert/1
         ]).
         ]).
 
 
-wait(Conn) ->
-    quicer:accept_stream(Conn, []).
+wait({ConnOwner, Conn}) ->
+    {ok, Conn} = quicer:async_accept_stream(Conn, []),
+    ConnOwner ! {self(), stream_acceptor_ready},
+    receive
+        %% from msquic
+        {quic, new_stream, Stream} ->
+            {ok, Stream};
+        {'EXIT', ConnOwner, _Reason} ->
+            {error, enotconn}
+    end.
 
 
 type(_) ->
 type(_) ->
     quic.
     quic.
@@ -44,6 +52,7 @@ sockname(S) ->
     quicer:sockname(S).
     quicer:sockname(S).
 
 
 peercert(_S) ->
 peercert(_S) ->
+    %% @todo but unsupported by msquic
     nossl.
     nossl.
 
 
 getstat(Socket, Stats) ->
 getstat(Socket, Stats) ->
@@ -88,5 +97,8 @@ ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) ->
 async_send(Stream, Data, Options) when is_list(Data) ->
 async_send(Stream, Data, Options) when is_list(Data) ->
     async_send(Stream, iolist_to_binary(Data), Options);
     async_send(Stream, iolist_to_binary(Data), Options);
 async_send(Stream, Data, _Options) when is_binary(Data) ->
 async_send(Stream, Data, _Options) when is_binary(Data) ->
-    {ok, _Len} = quicer:send(Stream, Data),
-    ok.
+    case quicer:send(Stream, Data) of
+        {ok, _Len} -> ok;
+        Other ->
+            Other
+    end.

+ 1 - 1
rebar.config

@@ -55,7 +55,7 @@
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
     , {replayq, "0.3.3"}
     , {replayq, "0.3.3"}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {getopt, "1.0.2"}
     , {getopt, "1.0.2"}

+ 1 - 1
rebar.config.erl

@@ -16,7 +16,7 @@ bcrypt() ->
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}.
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}.
 
 
 quicer() ->
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.7"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.8"}}}.
 
 
 deps(Config) ->
 deps(Config) ->
     {deps, OldDeps} = lists:keyfind(deps, 1, Config),
     {deps, OldDeps} = lists:keyfind(deps, 1, Config),