Bladeren bron

feat(quic): conn scope keepalive

William Yang 1 jaar geleden
bovenliggende
commit
472f2b7201

+ 36 - 3
apps/emqx/src/emqx_channel.erl

@@ -276,7 +276,8 @@ init(
         },
         Zone
     ),
-    {NClientInfo, NConnInfo} = take_conn_info_fields([ws_cookie, peersni], ClientInfo, ConnInfo),
+    {NClientInfo, NConnInfo0} = take_conn_info_fields([ws_cookie, peersni], ClientInfo, ConnInfo),
+    NConnInfo = maybe_quic_shared_state(NConnInfo0, Opts),
     #channel{
         conninfo = NConnInfo,
         clientinfo = NClientInfo,
@@ -295,6 +296,11 @@ init(
         pendings = []
     }.
 
+maybe_quic_shared_state(ConnInfo, #{conn_shared_state := QSS}) ->
+    ConnInfo#{conn_shared_state => QSS};
+maybe_quic_shared_state(ConnInfo, _) ->
+    ConnInfo.
+
 set_peercert_infos(NoSSL, ClientInfo, _) when
     NoSSL =:= nossl;
     NoSSL =:= undefined
@@ -2374,17 +2380,44 @@ init_alias_maximum(_ConnPkt, _ClientInfo) ->
 
 %% MQTT 5
 ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel = #channel{conninfo = ConnInfo}) ->
+    ensure_quic_conn_idle_timeout(Interval, Channel),
     ensure_keepalive_timer(Interval, Channel#channel{conninfo = ConnInfo#{keepalive => Interval}});
 %% MQTT 3,4
 ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
+    ensure_quic_conn_idle_timeout(maps:get(keepalive, ConnInfo), Channel),
     ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
 
+ensure_quic_conn_idle_timeout(Timeout, #channel{
+    clientinfo = #{zone := Zone},
+    conninfo = #{socktype := quic, sock := Sock}
+}) ->
+    Conn = element(2, Sock),
+    #{keepalive_multiplier := Mul} =
+        emqx_config:get_zone_conf(Zone, [mqtt]),
+    %%% The original idle_timeout is from the listener, now we update it per connection
+    %%% Conn could be closed so we don't check the ret val
+    _ = quicer:setopt(Conn, settings, #{idle_timeout_ms => timer:seconds(Timeout * Mul)}, false),
+    ok;
+ensure_quic_conn_idle_timeout(_, _) ->
+    ok.
+
 ensure_keepalive_timer(0, Channel) ->
     Channel;
 ensure_keepalive_timer(disabled, Channel) ->
     Channel;
-ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
-    Keepalive = emqx_keepalive:init(Zone, Interval),
+ensure_keepalive_timer(
+    Interval, Channel = #channel{clientinfo = #{zone := Zone}, conninfo = ConnInfo}
+) ->
+    Val =
+        case maps:get(conn_shared_state, ConnInfo, undefined) of
+            #{cnts_ref := CntRef} ->
+                fun() ->
+                    emqx_quic_connection:read_cnt(CntRef, control_packet)
+                end;
+            undefined ->
+                emqx_pd:get_counter(recv_pkt)
+        end,
+    Keepalive = emqx_keepalive:init(Zone, Val, Interval),
     ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
 
 clear_keepalive(Channel = #channel{timers = Timers}) ->

+ 17 - 8
apps/emqx/src/emqx_connection.erl

@@ -120,8 +120,8 @@
     %% limiter timers
     limiter_timer :: undefined | reference(),
 
-    %% QUIC conn owner pid if in use.
-    quic_conn_pid :: option(pid())
+    %% QUIC conn shared state
+    quic_conn_ss :: option(map())
 }).
 
 -record(retry, {
@@ -317,7 +317,8 @@ init_state(
         sockname => Sockname,
         peercert => Peercert,
         peersni => PeerSNI,
-        conn_mod => ?MODULE
+        conn_mod => ?MODULE,
+        sock => Socket
     },
 
     LimiterTypes = [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
@@ -365,7 +366,7 @@ init_state(
         limiter_buffer = queue:new(),
         limiter_timer = undefined,
         %% for quic streams to inherit
-        quic_conn_pid = maps:get(conn_pid, Opts, undefined)
+        quic_conn_ss = maps:get(conn_shared_state, Opts, undefined)
     }.
 
 run_loop(
@@ -595,11 +596,13 @@ handle_msg(
         channel = Channel,
         serialize = Serialize,
         parse_state = PS,
-        quic_conn_pid = QuicConnPid
+        quic_conn_ss = QSS
     }
 ) ->
-    QuicConnPid =/= undefined andalso
-        emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}),
+    QSS =/= undefined andalso
+        emqx_quic_connection:activate_data_streams(
+            maps:get(conn_pid, QSS), {PS, Serialize, Channel}
+        ),
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
 handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
@@ -799,7 +802,13 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 
-handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
+handle_incoming(Packet, #state{quic_conn_ss = QSS} = State) when is_record(Packet, mqtt_packet) ->
+    QSS =/= undefined andalso
+        emqx_quic_connection:step_cnt(
+            maps:get(cnts_ref, QSS),
+            control_packet,
+            1
+        ),
     ok = inc_incoming_stats(Packet),
     with_channel(handle_in, [Packet], State);
 handle_incoming(FrameError, State) ->

+ 18 - 3
apps/emqx/src/emqx_keepalive.erl

@@ -35,6 +35,8 @@
     check_interval :: pos_integer(),
     %% the received packets since last keepalive check
     statval :: non_neg_integer(),
+    %% stat reader func
+    stat_reader :: fun() | undefined,
     %% The number of idle intervals allowed before disconnecting the client.
     idle_milliseconds = 0 :: non_neg_integer(),
     max_idle_millisecond :: pos_integer()
@@ -65,18 +67,26 @@ init(Zone, Interval) ->
 %% @doc Init keepalive.
 -spec init(
     Zone :: atom(),
-    StatVal :: non_neg_integer(),
+    StatVal :: non_neg_integer() | Reader :: fun(),
     Second :: non_neg_integer()
 ) -> keepalive() | undefined.
-init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL ->
+init(Zone, Stat, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL ->
     #{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} =
         emqx_config:get_zone_conf(Zone, [mqtt]),
     MilliSeconds = timer:seconds(Second),
     Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)),
     MaxIdleMs = ceil(MilliSeconds * Mul),
+    {StatVal, ReaderFun} =
+        case is_function(Stat) of
+            true ->
+                {Stat(), Stat};
+            false ->
+                {Stat, undefined}
+        end,
     #keepalive{
         check_interval = Interval,
         statval = StatVal,
+        stat_reader = ReaderFun,
         idle_milliseconds = 0,
         max_idle_millisecond = MaxIdleMs
     };
@@ -110,9 +120,14 @@ info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) ->
 info(check_interval, undefined) ->
     0.
 
-check(Keepalive = #keepalive{}) ->
+check(Keepalive = #keepalive{stat_reader = undefined}) ->
     RecvCnt = emqx_pd:get_counter(recv_pkt),
     check(RecvCnt, Keepalive);
+check(Keepalive = #keepalive{stat_reader = ReaderFun}) when
+    is_function(ReaderFun)
+->
+    RecvCnt = ReaderFun(),
+    check(RecvCnt, Keepalive);
 check(Keepalive) ->
     {ok, Keepalive}.
 

+ 31 - 6
apps/emqx/src/emqx_quic_connection.erl

@@ -47,6 +47,12 @@
     handle_info/2
 ]).
 
+%% Connection scope shared counter
+-export([step_cnt/3]).
+-export([read_cnt/2]).
+
+-define(MAX_CNTS, 8).
+
 -export_type([cb_state/0, cb_ret/0]).
 
 -type cb_state() :: #{
@@ -62,6 +68,8 @@
     streams := [{pid(), quicer:stream_handle()}],
     %% New stream opts
     stream_opts := map(),
+    %% Connection Scope Counters, shared by streams for MQTT layer
+    cnts_ref := counters:counters_ref(),
     %% If connection is resumed from session ticket
     is_resumed => boolean(),
     %% mqtt message serializer config
@@ -102,7 +110,7 @@ new_conn(
     #{zone := Zone, conn := undefined, ctrl_pid := undefined} = S
 ) ->
     process_flag(trap_exit, true),
-    ?SLOG(debug, ConnInfo),
+    ?SLOG(debug, ConnInfo#{conn => Conn}),
     case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of
         false ->
             %% Start control stream process
@@ -160,7 +168,8 @@ new_stream(
         limiter := Limiter,
         parse_state := PS,
         channel := Channel,
-        serialize := Serialize
+        serialize := Serialize,
+        conn_shared_state := SS
     } = S
 ) ->
     %% Cherry pick options for data streams
@@ -172,7 +181,8 @@ new_stream(
         parse_state => PS,
         channel => Channel,
         serialize => Serialize,
-        quic_event_mask => ?QUICER_STREAM_EVENT_MASK_START_COMPLETE
+        quic_event_mask => ?QUICER_STREAM_EVENT_MASK_START_COMPLETE,
+        conn_shared_state => SS
     },
     {ok, NewStreamOwner} = quicer_stream:start_link(
         emqx_quic_data_stream,
@@ -235,7 +245,7 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
     cb_ret().
 peer_needs_streams(_C, _StreamType, S) ->
     ?SLOG(info, #{
-        msg => "ignore_peer_needs_more_streams", info => maps:with([conn_pid, ctrl_pid], S)
+        msg => "ignore_peer_needs_more_streams", info => maps:with([conn_shared_state, ctrl_pid], S)
     }),
     {ok, S}.
 
@@ -288,6 +298,17 @@ handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) ->
             {stop, unknown_pid_down, S}
     end.
 
+-spec step_cnt(counters:counters_ref(), control_packet, integer()) -> ok.
+step_cnt(CounterRef, Name, Incr) when is_atom(Name) ->
+    counters:add(CounterRef, cnt_id(Name), Incr).
+
+-spec read_cnt(counters:counters_ref(), control_packet) -> integer().
+read_cnt(CounterRef, Name) ->
+    counters:get(CounterRef, cnt_id(Name)).
+
+cnt_id(control_packet) ->
+    1.
+
 %%%
 %%%  Internals
 %%%
@@ -302,15 +323,19 @@ is_zone_olp_enabled(Zone) ->
 
 -spec init_cb_state(map()) -> cb_state().
 init_cb_state(#{zone := _Zone} = Map) ->
+    SS = #{
+        cnts_ref => counters:new(?MAX_CNTS, [write_concurrency]),
+        conn_pid => self()
+    },
     Map#{
-        conn_pid => self(),
         ctrl_pid => undefined,
         conn => undefined,
         streams => [],
         parse_state => undefined,
         channel => undefined,
         serialize => undefined,
-        is_resumed => false
+        is_resumed => false,
+        conn_shared_state => SS
     }.
 
 %% BUILD_WITHOUT_QUIC

+ 18 - 8
apps/emqx/src/emqx_quic_data_stream.erl

@@ -82,11 +82,11 @@ activate_data(StreamPid, {PS, Serialize, Channel}) ->
     {ok, cb_state()}.
 init_handoff(
     Stream,
-    _StreamOpts,
+    #{conn_shared_state := ConnSharedState} = _StreamOpts,
     Connection,
     #{is_orphan := true, flags := Flags}
 ) ->
-    {ok, init_state(Stream, Connection, Flags)}.
+    {ok, init_state(Stream, Connection, Flags, ConnSharedState)}.
 
 %%
 %% @doc Post handoff data stream
@@ -215,10 +215,17 @@ do_handle_appl_msg(
         {error, E} ->
             {stop, E, S}
     end;
-do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when
+do_handle_appl_msg(
+    {incoming, #mqtt_packet{} = Packet},
+    #{
+        channel := Channel,
+        conn_shared_state := #{cnts_ref := SharedCntsRef}
+    } = S
+) when
     Channel =/= undefined
 ->
     ok = inc_incoming_stats(Packet),
+    _ = emqx_quic_connection:step_cnt(SharedCntsRef, control_packet, 1),
     with_channel(handle_in, [Packet], S);
 do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) when
     Channel =/= undefined
@@ -321,14 +328,15 @@ serialize_packet(Packet, Serialize) ->
 -spec init_state(
     quicer:stream_handle(),
     quicer:connection_handle(),
-    non_neg_integer()
+    non_neg_integer(),
+    map()
 ) ->
     % @TODO
     map().
-init_state(Stream, Connection, OpenFlags) ->
-    init_state(Stream, Connection, OpenFlags, undefined).
+init_state(Stream, Connection, OpenFlags, ConnSharedState) ->
+    init_state(Stream, Connection, OpenFlags, ConnSharedState, undefined).
 
-init_state(Stream, Connection, OpenFlags, PS) ->
+init_state(Stream, Connection, OpenFlags, ConnSharedState, PS) ->
     %% quic stream handle
     #{
         stream => Stream,
@@ -350,7 +358,9 @@ init_state(Stream, Connection, OpenFlags, PS) ->
         %% serialize opts for connection
         serialize => undefined,
         %% Current working queue
-        task_queue => queue:new()
+        task_queue => queue:new(),
+        %% Connection Shared State
+        conn_shared_state => ConnSharedState
     }.
 
 -spec do_handle_call(term(), cb_state()) -> cb_ret().

+ 179 - 0
apps/emqx/test/emqx_quic_multistreams_SUITE.erl

@@ -1980,6 +1980,185 @@ t_listener_with_lowlevel_settings(_Config) ->
     ]),
     ok = emqtt:disconnect(C).
 
+t_keep_alive(Config) ->
+    process_flag(trap_exit, true),
+
+    Topic = atom_to_binary(?FUNCTION_NAME),
+    PubQos = ?config(pub_qos, Config),
+    SubQos = ?config(sub_qos, Config),
+    RecQos = calc_qos(PubQos, SubQos),
+    PktId1 = calc_pkt_id(RecQos, 1),
+    Topic2 = <<Topic/binary, "_two">>,
+    %% GIVEN: keepalive is 2s
+    {ok, C} = emqtt:start_link([{proto_ver, v5}, {force_ping, false}, {keepalive, 2} | Config]),
+    {ok, _} = emqtt:quic_connect(C),
+
+    %% WHEN: we have active data on data stream only
+    %% but keep client ctrl stream quiet with meck
+    meck:new(emqtt, [no_link, passthrough, no_history]),
+    meck:expect(emqtt, connected, fun
+        (info, {timeout, _TRef, keepalive}, State) ->
+            {keep_state, State};
+        (Arg1, Arg2, Arg3) ->
+            meck:passthrough([Arg1, Arg2, Arg3])
+    end),
+    {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
+        {Topic, [{qos, SubQos}]}
+    ]),
+    {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
+        {Topic2, [{qos, SubQos}]}
+    ]),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic,
+        <<"stream data 1">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic2,
+        <<"stream data 2">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+    PubRecvs = recv_pub(2),
+
+    ?assertMatch(
+        [
+            {publish, #{
+                client_pid := C,
+                packet_id := PktId1,
+                payload := <<"stream data", _/binary>>,
+                qos := RecQos
+            }},
+            {publish, #{
+                client_pid := C,
+                packet_id := PktId1,
+                payload := <<"stream data", _/binary>>,
+                qos := RecQos
+            }}
+        ],
+        PubRecvs
+    ),
+    Payloads = [P || {publish, #{payload := P}} <- PubRecvs],
+    ?assert(
+        [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse
+            [<<"stream data 2">>, <<"stream data 1">>] == Payloads
+    ),
+
+    %% THEN: after 4s, idle timeout , client should get disconnected.
+    receive
+        {disconnected, ?RC_KEEP_ALIVE_TIMEOUT, _} ->
+            meck:unload(emqtt),
+            ok
+    after 4000 ->
+        meck:unload(emqtt),
+        ct:fail("Didnt shutdown ~p", [process_info(self(), messages)])
+    end.
+
+t_keep_alive_idle_ctrl_stream(Config) ->
+    process_flag(trap_exit, true),
+
+    Topic = atom_to_binary(?FUNCTION_NAME),
+    PubQos = ?config(pub_qos, Config),
+    SubQos = ?config(sub_qos, Config),
+    RecQos = calc_qos(PubQos, SubQos),
+    PktId1 = calc_pkt_id(RecQos, 1),
+    Topic2 = <<Topic/binary, "_two">>,
+    %% GIVEN: keepalive is 2s
+    {ok, C} = emqtt:start_link([{proto_ver, v5}, {force_ping, false}, {keepalive, 2} | Config]),
+    {ok, _} = emqtt:quic_connect(C),
+
+    %% WHEN: we have active data on data stream only
+    %% but keep ctrl stream quiet with meck
+    meck:new(emqtt, [no_link, passthrough, no_history]),
+    meck:expect(emqtt, connected, fun
+        (info, {timeout, _TRef, keepalive}, State) ->
+            {keep_state, State};
+        (Arg1, Arg2, Arg3) ->
+            meck:passthrough([Arg1, Arg2, Arg3])
+    end),
+    {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
+        {Topic, [{qos, SubQos}]}
+    ]),
+    {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
+        {Topic2, [{qos, SubQos}]}
+    ]),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic,
+        <<"stream data 1">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic2,
+        <<"stream data 2">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+    PubRecvs = recv_pub(2),
+
+    ?assertMatch(
+        [
+            {publish, #{
+                client_pid := C,
+                packet_id := PktId1,
+                payload := <<"stream data", _/binary>>,
+                qos := RecQos
+            }},
+            {publish, #{
+                client_pid := C,
+                packet_id := PktId1,
+                payload := <<"stream data", _/binary>>,
+                qos := RecQos
+            }}
+        ],
+        PubRecvs
+    ),
+    Payloads = [P || {publish, #{payload := P}} <- PubRecvs],
+    ?assert(
+        [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse
+            [<<"stream data 2">>, <<"stream data 1">>] == Payloads
+    ),
+
+    %% WHEN: keep data stream still active
+    timer:sleep(1000),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic,
+        <<"stream data 1">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+    ok = emqtt:publish_async(
+        C,
+        {new_data_stream, []},
+        Topic2,
+        <<"stream data 2">>,
+        [{qos, PubQos}],
+        undefined
+    ),
+
+    %% THEN: after 4s, client should NOT get disconnected,
+    %%       because data stream is active.
+    receive
+        {disconnected, ?RC_KEEP_ALIVE_TIMEOUT, _} ->
+            meck:unload(emqtt),
+            ct:fail("Should not disconnect")
+        %% 4s - 1s
+    after 3000 ->
+        meck:unload(emqtt),
+        ok
+    end.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 3 - 2
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -1768,12 +1768,13 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
     Node = maps:get(node, ClientInfo0, WhichNode),
     ClientInfo1 = emqx_utils_maps:deep_remove([conninfo, clientid], ClientInfo0),
     ClientInfo2 = emqx_utils_maps:deep_remove([conninfo, username], ClientInfo1),
+    ClientInfo3 = emqx_utils_maps:deep_remove([conninfo, sock], ClientInfo2),
     StatsMap = maps:without(
         [memory, next_pkt_id, total_heap_size],
         maps:from_list(ClientStats)
     ),
-    ClientInfo3 = maps:remove(will_msg, ClientInfo2),
-    ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo3),
+    ClientInfo4 = maps:remove(will_msg, ClientInfo3),
+    ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo4),
     {IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)),
     Connected = maps:get(conn_state, ClientInfoMap0) =:= connected,
     ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -1102,7 +1102,7 @@ t_keepalive(Config) ->
     [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
     %% will reset to max keepalive if keepalive > max keepalive
     #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
-    ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))),
+    ?assertMatch({keepalive, _, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))),
 
     ?assertMatch(
         {ok, {?HTTP200, _, #{<<"keepalive">> := 11}}},

+ 6 - 0
changes/ce/feat-13814.en.md

@@ -0,0 +1,6 @@
+Connection Scope Keepalive for MQTT over QUIC Multi-Stream:
+
+Introduced a new feature to keep MQTT connections alive when data streams are active but contrl stream is quiet.
+Previously, clients were required to send MQTT.PINGREQ on idle control streams to keep the connection alive.
+A shared state is now maintained for each connection, tracking activity from all streams.
+This shared state is used to determine if the connection is still alive, reducing the risk of keepalive timeouts due to Head-of-Line (HOL) blocking.