ソースを参照

Merge pull request #8092 from qzhuyan/dev/william/bump-quicer

 feat: new quicer 0.0.11
William Yang 3 年 前
コミット
3c7dd5d6c4

+ 1 - 1
apps/emqx/etc/emqx.conf

@@ -31,7 +31,7 @@ listeners.wss.default {
 }
 
 # listeners.quic.default {
-#  enabled = false
+#  enabled = true
 #  bind = "0.0.0.0:14567"
 #  max_connections = 1024000
 #  keyfile = "{{ platform_etc_dir }}/certs/key.pem"

+ 1 - 1
apps/emqx/rebar.config

@@ -43,7 +43,7 @@
             {meck, "0.9.2"},
             {proper, "1.4.0"},
             {bbmustache, "1.10.0"},
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
         ]},
         {extra_src_dirs, [{"test", [recursive]}]}
     ]}

+ 32 - 23
apps/emqx/rebar.config.script

@@ -1,37 +1,46 @@
 %% -*- mode: erlang -*-
 IsCentos6 = fun() ->
-                case file:read_file("/etc/centos-release") of
-                    {ok, <<"CentOS release 6", _/binary >>} ->
-                        true;
-                    _ ->
-                        false
-                end
-            end,
+    case file:read_file("/etc/centos-release") of
+        {ok, <<"CentOS release 6", _/binary>>} ->
+            true;
+        _ ->
+            false
+    end
+end,
 
 IsWin32 = fun() ->
-                win32 =:= element(1, os:type())
-          end,
+    win32 =:= element(1, os:type())
+end,
 
 IsMacOS = fun() ->
-               {unix, darwin} =:= os:type()
-          end,
+    {unix, darwin} =:= os:type()
+end,
 
 IsQuicSupp = fun() ->
-                not (IsCentos6() orelse IsWin32()
-                     orelse IsMacOS() orelse
-                     false =/= os:getenv("BUILD_WITHOUT_QUIC")
-                    )
-                orelse "1" == os:getenv("BUILD_WITH_QUIC")
-             end,
+    not (IsCentos6() orelse IsWin32() orelse
+        IsMacOS() orelse
+        false =/= os:getenv("BUILD_WITHOUT_QUIC")) orelse
+        "1" == os:getenv("BUILD_WITH_QUIC")
+end,
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
-Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}},
+Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}.
 
 ExtraDeps = fun(C) ->
-                {deps, Deps0} = lists:keyfind(deps, 1, C),
-                Deps = Deps0 ++ [Bcrypt || not IsWin32()] ++
-                [ Quicer || IsQuicSupp()],
-                lists:keystore(deps, 1, C, {deps, Deps})
-            end,
+    {deps, Deps0} = lists:keyfind(deps, 1, C),
+    {erl_opts, ErlOpts0} = lists:keyfind(erl_opts, 1, C),
+    IsQuic = IsQuicSupp(),
+    New = [
+        {deps, Deps0 ++ [Bcrypt || not IsWin32()] ++ [Quicer || IsQuic]},
+        {erl_opts, ErlOpts0 ++ [{d, 'BUILD_WITHOUT_QUIC'} || not IsQuic]}
+    ],
+    lists:foldl(
+        fun({Key, _Val} = KV, Acc) ->
+            lists:keystore(Key, 1, Acc, KV)
+        end,
+        C,
+        New
+    )
+end,
 
 ExtraDeps(CONFIG).

+ 1 - 1
apps/emqx/src/emqx_app.erl

@@ -113,7 +113,7 @@ is_quicer_app_present() ->
     end.
 
 is_quic_listener_configured() ->
-    emqx_listeners:has_enabled_listener_conf_by_type(quic).
+    maps:is_key(quic, emqx:get_config([listeners])).
 
 get_description() -> emqx_release:description().
 

+ 11 - 4
apps/emqx/src/emqx_listeners.erl

@@ -143,9 +143,13 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss ->
         _:_ ->
             false
     end;
-is_running(quic, _ListenerId, _Conf) ->
-    %% TODO: quic support
-    false.
+is_running(quic, ListenerId, _Conf) ->
+    case quicer:listener(ListenerId) of
+        {ok, Pid} when is_pid(Pid) ->
+            true;
+        _ ->
+            false
+    end.
 
 current_conns(ID, ListenOn) ->
     {ok, #{type := Type, name := Name}} = parse_listener_id(ID),
@@ -325,15 +329,18 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
     case [A || {quicer, _, _} = A <- application:which_applications()] of
         [_] ->
             DefAcceptors = erlang:system_info(schedulers_online) * 8,
+            IdleTimeout = timer:seconds(maps:get(idle_timeout, Opts)),
             ListenOpts = [
                 {cert, maps:get(certfile, Opts)},
                 {key, maps:get(keyfile, Opts)},
                 {alpn, ["mqtt"]},
                 {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
+                {keep_alive_interval_ms, ceil(IdleTimeout / 3)},
+                {server_resumption_level, 2},
                 {idle_timeout_ms,
                     lists:max([
                         emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3,
-                        timer:seconds(maps:get(idle_timeout, Opts))
+                        IdleTimeout
                     ])}
             ],
             ConnectionOpts = #{

+ 7 - 1
apps/emqx/src/emqx_quic_connection.erl

@@ -16,6 +16,12 @@
 
 -module(emqx_quic_connection).
 
+-ifndef(BUILD_WITHOUT_QUIC).
+-include_lib("quicer/include/quicer.hrl").
+-else.
+-define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0).
+-endif.
+
 %% Callbacks
 -export([
     init/1,
@@ -59,5 +65,5 @@ connected(_Conn, S) ->
 
 -spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
 shutdown(Conn, S) ->
-    quicer:async_close_connection(Conn),
+    quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     {ok, S}.

+ 11 - 13
apps/emqx/src/emqx_quic_stream.erl

@@ -38,7 +38,7 @@ wait({ConnOwner, Conn}) ->
     receive
         %% from msquic
         {quic, new_stream, Stream} ->
-            {ok, Stream};
+            {ok, {quic, Conn, Stream}};
         {'EXIT', ConnOwner, _Reason} ->
             {error, enotconn}
     end.
@@ -46,18 +46,18 @@ wait({ConnOwner, Conn}) ->
 type(_) ->
     quic.
 
-peername(S) ->
-    quicer:peername(S).
+peername({quic, Conn, _Stream}) ->
+    quicer:peername(Conn).
 
-sockname(S) ->
-    quicer:sockname(S).
+sockname({quic, Conn, _Stream}) ->
+    quicer:sockname(Conn).
 
 peercert(_S) ->
     %% @todo but unsupported by msquic
     nossl.
 
-getstat(Socket, Stats) ->
-    case quicer:getstat(Socket, Stats) of
+getstat({quic, Conn, _Stream}, Stats) ->
+    case quicer:getstat(Conn, Stats) of
         {error, _} -> {error, closed};
         Res -> Res
     end.
@@ -84,9 +84,9 @@ getopts(_Socket, _Opts) ->
         {buffer, 80000}
     ]}.
 
-fast_close(Stream) ->
-    %% Stream might be closed already.
-    _ = quicer:async_close_stream(Stream),
+fast_close({quic, _Conn, Stream}) ->
+    %% Flush send buffer, gracefully shutdown
+    quicer:async_shutdown_stream(Stream),
     ok.
 
 -spec ensure_ok_or_exit(atom(), list(term())) -> term().
@@ -102,9 +102,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
             Result
     end.
 
-async_send(Stream, Data, Options) when is_list(Data) ->
-    async_send(Stream, iolist_to_binary(Data), Options);
-async_send(Stream, Data, _Options) when is_binary(Data) ->
+async_send({quic, _Conn, Stream}, Data, _Options) ->
     case quicer:send(Stream, Data) of
         {ok, _Len} -> ok;
         Other -> Other

+ 1 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -472,7 +472,6 @@ ensure_dashboard_listeners_started(_App) ->
 -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number()) -> ok.
 ensure_quic_listener(Name, UdpPort) ->
     application:ensure_all_started(quicer),
-    emqx_config:put([listeners, quic, Name, mountpoint], <<>>),
     Conf = #{
         acceptors => 16,
         bind => {{0, 0, 0, 0}, UdpPort},
@@ -491,6 +490,7 @@ ensure_quic_listener(Name, UdpPort) ->
         mountpoint => <<>>,
         zone => default
     },
+    emqx_config:put([listeners, quic, Name], Conf),
     case emqx_listeners:start_listener(quic, Name, Conf) of
         ok -> ok;
         {error, {already_started, _Pid}} -> ok

+ 2 - 1
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -252,7 +252,8 @@ t_connect_will_message(Config) ->
     {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2),
     ok = emqtt:disconnect(Client3),
     %% [MQTT-3.1.2-10]
-    ?assertEqual(0, length(receive_messages(1))),
+    MsgRecv = receive_messages(1),
+    ?assertEqual([], MsgRecv),
     ok = emqtt:disconnect(Client4).
 
 t_batch_subscribe(init, Config) ->

+ 1 - 1
apps/emqx_connector/rebar.config

@@ -21,7 +21,7 @@
     %% eredis_cluster's dependency getting resolved earlier.
     %% Here we pin 1.5.2 to avoid surprises in the future.
     {poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}},
-    {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.1"}}}
+    {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
 ]}.
 
 {shell, [

+ 2 - 2
mix.exs

@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
       {:replayq, "0.3.4", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
-      {:emqtt, github: "emqx/emqtt", tag: "1.5.1", override: true},
+      {:emqtt, github: "emqx/emqtt", tag: "1.6.0", override: true},
       {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
       {:observer_cli, "1.7.1"},
       {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
@@ -591,7 +591,7 @@ defmodule EMQXUmbrella.MixProject do
   defp quicer_dep() do
     if enable_quicer?(),
       # in conflict with emqx and emqtt
-      do: [{:quicer, github: "emqx/quic", tag: "0.0.9", override: true}],
+      do: [{:quicer, github: "emqx/quic", tag: "0.0.11", override: true}],
       else: []
   end
 

+ 1 - 1
rebar.config

@@ -60,7 +60,7 @@
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.4"}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
-    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.1"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
     , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}

+ 3 - 2
rebar.config.erl

@@ -38,7 +38,7 @@ bcrypt() ->
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}.
 
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}.
 
 jq() ->
     {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.4"}}}.
@@ -138,7 +138,8 @@ common_compile_opts(Edition, Vsn) ->
         {compile_info, [{emqx_vsn, Vsn}]},
         {d, 'EMQX_RELEASE_EDITION', Edition}
     ] ++
-        [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"].
+        [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
+        [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
 
 prod_compile_opts(Edition, Vsn) ->
     [