فهرست منبع

Merge pull request #12274 from qzhuyan/dev/william/quic-listn-conf-reload

feat: quic listener conf reload
William Yang 2 سال پیش
والد
کامیت
8cd99cb0c8

+ 1 - 1
apps/emqx/rebar.config.script

@@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
 end,
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
-Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.308"}}}.
+Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.311"}}}.
 
 Dialyzer = fun(Config) ->
     {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

+ 80 - 53
apps/emqx/src/emqx_listeners.erl

@@ -303,6 +303,8 @@ update_listener(Type, Name, OldConf, NewConf) ->
         ok ->
             ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
             ok;
+        {skip, Error} when Type =:= quic ->
+            {error, {rollbacked, Error}};
         {error, _Reason} ->
             restart_listener(Type, Name, OldConf, NewConf)
     end.
@@ -411,49 +413,10 @@ do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
     end;
 %% Start MQTT/QUIC listener
 do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
-    ListenOn =
-        case Bind of
-            {Addr, Port} when tuple_size(Addr) == 4 ->
-                %% IPv4
-                lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]));
-            {Addr, Port} when tuple_size(Addr) == 8 ->
-                %% IPv6
-                lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]));
-            Port ->
-                Port
-        end,
-
+    ListenOn = quic_listen_on(Bind),
     case [A || {quicer, _, _} = A <- application:which_applications()] of
         [_] ->
-            DefAcceptors = erlang:system_info(schedulers_online) * 8,
-            SSLOpts = maps:get(ssl_options, Opts, #{}),
-            ListenOpts =
-                [
-                    {certfile, emqx_schema:naive_env_interpolation(maps:get(certfile, SSLOpts))},
-                    {keyfile, emqx_schema:naive_env_interpolation(maps:get(keyfile, SSLOpts))},
-                    {alpn, ["mqtt"]},
-                    {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
-                    {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
-                    {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
-                    {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
-                    {server_resumption_level, maps:get(server_resumption_level, Opts, 2)},
-                    {verify, maps:get(verify, SSLOpts, verify_none)}
-                ] ++
-                    case maps:get(cacertfile, SSLOpts, undefined) of
-                        undefined ->
-                            [];
-                        <<>> ->
-                            [];
-                        "" ->
-                            [];
-                        CaCertFile ->
-                            [{cacertfile, emqx_schema:naive_env_interpolation(CaCertFile)}]
-                    end ++
-                    case maps:get(password, SSLOpts, undefined) of
-                        undefined -> [];
-                        Password -> [{password, str(Password)}]
-                    end ++
-                    optional_quic_listener_opts(Opts),
+            ListenOpts = to_quicer_listener_opts(Opts),
             Limiter = limiter(Opts),
             ConnectionOpts = #{
                 conn_callback => emqx_quic_connection,
@@ -470,7 +433,7 @@ do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
             quicer:spawn_listener(
                 Id,
                 ListenOn,
-                {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
+                {ListenOpts, ConnectionOpts, StreamOpts}
             );
         [] ->
             {ok, {skipped, quic_app_missing}}
@@ -506,6 +469,31 @@ do_update_listener(Type, Name, OldConf, NewConf) when
     ok = ranch:set_protocol_options(Id, WsOpts),
     %% No-op if the listener was not suspended.
     ranch:resume_listener(Id);
+%% Update a QUIC listener in place instead of restarting it.
+%% The running listener is looked up with quicer:listener/1 and asked to reload
+%% with the options built from NewConf. Possible results:
+%%   ok                                   - the reload succeeded;
+%%   {skip, Error}                        - the reload failed, but the following
+%%                                          quicer_listener:unlock/2 call returned ok;
+%%   {error, {rollback_fail, RestoreErr}} - the reload failed and unlock/2 returned
+%%                                          something other than ok;
+%%   any non-{ok, Pid} result of quicer:listener/1 is returned unchanged.
+do_update_listener(quic = Type, Name, _OldConf, NewConf) ->
+    case quicer:listener(listener_id(Type, Name)) of
+        {ok, ListenerPid} ->
+            case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(NewConf)) of
+                ok ->
+                    ok;
+                {error, _} = Error ->
+                    %% @TODO: prefer: case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(OldConf)) of
+                    %% NOTE(review): unlock/2 presumably puts the listener back in service
+                    %% with its previous settings, and 3000 is presumably a timeout in
+                    %% milliseconds - confirm against the quicer_listener documentation.
+                    case quicer_listener:unlock(ListenerPid, 3000) of
+                        ok ->
+                            ?ELOG("Failed to reload QUIC listener ~p, but Rollback success\n", [
+                                Error
+                            ]),
+                            {skip, Error};
+                        RestoreErr ->
+                            %% Only the reload error is logged here; the unlock failure
+                            %% is carried in the returned tuple.
+                            ?ELOG(
+                                "Failed to reload QUIC listener ~p, and Rollback failed as well\n",
+                                [Error]
+                            ),
+                            {error, {rollback_fail, RestoreErr}}
+                    end
+            end;
+        E ->
+            E
+    end;
 do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
     {error, not_supported}.
 
@@ -897,18 +885,16 @@ get_ssl_options(_) ->
 
 %% @doc Get QUIC optional settings for low level tunings.
 %% @see quicer:quic_settings()
--spec optional_quic_listener_opts(map()) -> proplists:proplist().
+-spec optional_quic_listener_opts(map()) -> map().
 optional_quic_listener_opts(Conf) when is_map(Conf) ->
-    maps:to_list(
-        maps:filter(
-            fun(Name, _V) ->
-                lists:member(
-                    Name,
-                    quic_listener_optional_settings()
-                )
-            end,
-            Conf
-        )
+    maps:filter(
+        fun(Name, _V) ->
+            lists:member(
+                Name,
+                quic_listener_optional_settings()
+            )
+        end,
+        Conf
     ).
 
 -spec quic_listener_optional_settings() -> [atom()].
@@ -991,3 +977,44 @@ default_max_conn() ->
 ensure_max_conns(<<"infinity">>) -> <<"infinity">>;
 ensure_max_conns(MaxConn) when is_binary(MaxConn) -> binary_to_integer(MaxConn);
 ensure_max_conns(MaxConn) -> MaxConn.
+
+%% @doc Convert a listener `bind' value into the address form given to quicer.
+%% A `{Addr, Port}' pair with a 4-tuple address is rendered as "A.B.C.D:Port",
+%% one with an 8-tuple address as "[IPv6]:Port"; any other value (for example
+%% a bare port number) is returned unchanged.
+-spec quic_listen_on(X :: any()) -> quicer:listen_on().
+quic_listen_on({IPv4, PortNo}) when tuple_size(IPv4) =:= 4 ->
+    lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(IPv4), PortNo]));
+quic_listen_on({IPv6, PortNo}) when tuple_size(IPv6) =:= 8 ->
+    %% IPv6 literals are wrapped in brackets so the port separator stays unambiguous.
+    lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(IPv6), PortNo]));
+quic_listen_on(Other) ->
+    Other.
+
+%% @doc Translate an EMQX QUIC listener config map into the option map passed to quicer.
+%%
+%% Precedence, lowest to highest: the listener config itself together with the
+%% derived entries set below, then the entries produced by ssl_opts/1, and
+%% finally the low-level tuning keys selected by optional_quic_listener_opts/1.
+-spec to_quicer_listener_opts(map()) -> quicer:listener_opts().
+to_quicer_listener_opts(Opts) ->
+    %% Lower bound for the number of connection acceptors.
+    DefAcceptors = erlang:system_info(schedulers_online) * 8,
+    SSLOpts = maps:from_list(ssl_opts(Opts)),
+    %% Drop unset cacertfile/password entries instead of forwarding `undefined'.
+    %% The predicate must return a boolean (the previous `fasle' atom was a typo
+    %% that would make maps:filter/2 fail as soon as one of these clauses matched).
+    Opts1 = maps:filter(
+        fun
+            (cacertfile, undefined) -> false;
+            (password, undefined) -> false;
+            (_, _) -> true
+        end,
+        Opts
+    ),
+    %% Build on the filtered map so that the dropped entries stay dropped.
+    Opts2 = maps:merge(
+        Opts1#{
+            alpn => ["mqtt"],
+            conn_acceptors => max(DefAcceptors, maps:get(acceptors, Opts1, 0)),
+            %% @NOTE: Backward compatibility START
+            server_resumption_level => maps:get(server_resumption_level, Opts1, 2),
+            idle_timeout_ms => maps:get(idle_timeout, Opts1, 0),
+            keep_alive_interval_ms => maps:get(keep_alive_interval, Opts1, 0),
+            handshake_idle_timeout_ms => maps:get(handshake_idle_timeout, Opts1, 10000)
+            %% @NOTE: Backward compatibility END
+        },
+        SSLOpts
+    ),
+    %% @NOTE: Optional options take precedence over required options
+    maps:merge(Opts2, optional_quic_listener_opts(Opts)).

+ 2 - 2
apps/emqx/src/emqx_quic_stream.erl

@@ -139,8 +139,8 @@ fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) ->
     _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
     ok;
 fast_close({quic, _Conn, Stream, _Info}) ->
-    %% Force flush
-    _ = quicer:async_shutdown_stream(Stream),
+    %% Force flush, cutoff time 3s
+    _ = quicer:shutdown_stream(Stream, 3000),
     %% @FIXME Since we shutdown the control stream, we shutdown the connection as well
     %% *BUT* Msquic does not flush the send buffer if we shutdown the connection after
     %% gracefully shutdown the stream.

+ 197 - 0
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -363,6 +363,188 @@ t_wss_update_opts(Config) ->
         ok = emqtt:stop(C3)
     end).
 
+%% Checks that the TLS options of a running QUIC listener can be changed through
+%% emqx:update_config/2: first the server cert/key/CA are swapped, then peer
+%% certificate verification is turned on. After each update a client using the
+%% previous settings must fail to connect, a client using the new settings must
+%% connect, and clients connected before the updates must still answer pings.
+t_quic_update_opts(Config) ->
+    ListenerType = quic,
+    ConnectFun = connect_fun(ListenerType),
+    PrivDir = ?config(priv_dir, Config),
+    Host = "127.0.0.1",
+    Port = emqx_common_test_helpers:select_free_port(ListenerType),
+    Conf = #{
+        <<"enable">> => true,
+        <<"bind">> => format_bind({Host, Port}),
+        <<"ssl_options">> => #{
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"password">> => ?SERVER_KEY_PASSWORD,
+            <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server-password.key"),
+            <<"verify">> => verify_none
+        }
+    },
+    ClientSSLOpts = [
+        {verify, verify_peer},
+        {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
+    ],
+    with_listener(ListenerType, updated, Conf, fun() ->
+        %% Client connects successfully.
+        C1 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration: another set of cert/key files.
+        {ok, _} = emqx:update_config(
+            [listeners, ListenerType, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
+                    <<"certfile">> => filename:join(PrivDir, "server.pem"),
+                    <<"keyfile">> => filename:join(PrivDir, "server.key")
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, server's cert is signed by another CA.
+        ?assertError(
+            {transport_down, #{error := _, status := Status}} when
+                (Status =:= bad_certificate orelse
+                    Status =:= cert_untrusted_root orelse
+                    Status =:= handshake_failure),
+            ConnectFun(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+            ])
+        ),
+
+        C2 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration: require peer certificate.
+        {ok, _} = emqx:update_config(
+            [listeners, ListenerType, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"verify">> => verify_peer,
+                    <<"fail_if_no_peer_cert">> => true
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, certificate is now required.
+        ?assertExceptionOneOf(
+            {exit, _},
+            {error, _},
+            ConnectFun(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
+            ])
+        ),
+
+        C3 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")},
+            {certfile, filename:join(PrivDir, "client.pem")},
+            {keyfile, filename:join(PrivDir, "client.key")}
+            | ClientSSLOpts
+        ]),
+
+        %% Both pre- and post-update clients should be alive.
+        ?assertEqual(pong, emqtt:ping(C1)),
+        ?assertEqual(pong, emqtt:ping(C2)),
+        ?assertEqual(pong, emqtt:ping(C3)),
+
+        ok = emqtt:stop(C1),
+        ok = emqtt:stop(C2),
+        ok = emqtt:stop(C3)
+    end).
+
+%% Checks the failure path of a QUIC listener TLS update: an update whose
+%% certfile and keyfile do not belong together must be reported as
+%% `{rollbacked, {error, tls_error}}' while the listener keeps accepting clients
+%% with the original settings; a later, valid update must then succeed.
+t_quic_update_opts_fail(Config) ->
+    ListenerType = quic,
+    ConnectFun = connect_fun(ListenerType),
+    PrivDir = ?config(priv_dir, Config),
+    Host = "127.0.0.1",
+    Port = emqx_common_test_helpers:select_free_port(ListenerType),
+    Conf = #{
+        <<"enable">> => true,
+        <<"bind">> => format_bind({Host, Port}),
+        <<"ssl_options">> => #{
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"password">> => ?SERVER_KEY_PASSWORD,
+            <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server-password.key"),
+            <<"verify">> => verify_none
+        }
+    },
+    ClientSSLOpts = [
+        {verify, verify_peer},
+        {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
+    ],
+    with_listener(ListenerType, updated, Conf, fun() ->
+        %% GIVEN: a working listener that a client can connect to.
+        C1 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+        ]),
+
+        %% WHEN: the listener is reloaded with invalid SSL options (certfile and keyfile mismatch).
+        UpdateResult1 = emqx:update_config(
+            [listeners, ListenerType, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
+                    <<"certfile">> => filename:join(PrivDir, "server.pem"),
+                    <<"keyfile">> => filename:join(PrivDir, "server-password.key")
+                }
+            }}
+        ),
+
+        %% THEN: the reload fails, but the old listener is rolled back.
+        ?assertMatch(
+            {error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}},
+            UpdateResult1
+        ),
+
+        %% THEN: a client with the old TLS options can still connect.
+        C2 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+        ]),
+
+        %% WHEN: the listener SSL configuration is changed again, this time consistently.
+        UpdateResult2 = emqx:update_config(
+            [listeners, ListenerType, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
+                    <<"certfile">> => filename:join(PrivDir, "server.pem"),
+                    <<"keyfile">> => filename:join(PrivDir, "server.key")
+                }
+            }}
+        ),
+        %% THEN: the update should succeed.
+        ?assertMatch({ok, _}, UpdateResult2),
+
+        %% THEN: a client with the old TLS options can no longer connect,
+        %% because the server's cert is now signed by another CA.
+        ?assertError(
+            {transport_down, #{error := _, status := Status}} when
+                (Status =:= bad_certificate orelse
+                    Status =:= cert_untrusted_root orelse
+                    Status =:= handshake_failure),
+            ConnectFun(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+            ])
+        ),
+
+        %% THEN: a client with the new TLS options can connect.
+        C3 = ConnectFun(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
+        ]),
+
+        %% Both pre- and post-update clients should be alive.
+        ?assertEqual(pong, emqtt:ping(C1)),
+        ?assertEqual(pong, emqtt:ping(C2)),
+        ?assertEqual(pong, emqtt:ping(C3)),
+
+        ok = emqtt:stop(C1),
+        ok = emqtt:stop(C2),
+        ok = emqtt:stop(C3)
+    end).
+
 with_listener(Type, Name, Config, Then) ->
     {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}),
     try
@@ -379,6 +561,14 @@ emqtt_connect_ssl(Host, Port, SSLOpts) ->
         ssl_opts => SSLOpts
     }).
 
+%% Connect a test client to Host:Port over QUIC, passing SSLOpts as the
+%% client's `ssl_opts'. The result is whatever emqtt_connect/2 returns.
+emqtt_connect_quic(Host, Port, SSLOpts) ->
+    ClientOpts = #{
+        ssl_opts => SSLOpts,
+        ssl => true,
+        connect_timeout => 1,
+        hosts => [{Host, Port}]
+    },
+    emqtt_connect(fun emqtt:quic_connect/1, ClientOpts).
+
 emqtt_connect_wss(Host, Port, SSLOpts) ->
     emqtt_connect(fun emqtt:ws_connect/1, #{
         hosts => [{Host, Port}],
@@ -440,3 +630,10 @@ generate_tls_certs(Config) ->
 
 format_bind(Bind) ->
     iolist_to_binary(emqx_listeners:format_bind(Bind)).
+
+%% Map a listener type to the 3-arity (Host, Port, SSLOpts) helper that
+%% connects a test client over that transport. Only ssl, quic and wss are
+%% handled; any other type fails with function_clause.
+connect_fun(ssl) ->
+    fun emqtt_connect_ssl/3;
+connect_fun(quic) ->
+    fun emqtt_connect_quic/3;
+connect_fun(wss) ->
+    fun emqtt_connect_wss/3.

+ 2 - 0
changes/feat-12274.en.md

@@ -0,0 +1,2 @@
+Enable dynamic TLS configuration updates for QUIC MQTT listeners without disrupting existing connections.
+Implement a fail-safe mechanism that reverts to the previous TLS configuration in case of update failures. 

+ 1 - 1
mix.exs

@@ -794,7 +794,7 @@ defmodule EMQXUmbrella.MixProject do
   defp quicer_dep() do
     if enable_quicer?(),
       # in conflict with emqx and emqtt
-      do: [{:quicer, github: "emqx/quic", tag: "0.0.308", override: true}],
+      do: [{:quicer, github: "emqx/quic", tag: "0.0.311", override: true}],
       else: []
   end
 

+ 1 - 1
rebar.config.erl

@@ -36,7 +36,7 @@ assert_otp() ->
     end.
 
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.308"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.311"}}}.
 
 jq() ->
     {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}.