Просмотр исходного кода

feat: improve keepalive_multiplier and keepalive_check_interval

zhongwencool 1 год назад
Родитель
Сommit
4942f6f75a

+ 20 - 13
apps/emqx/src/emqx_channel.erl

@@ -544,8 +544,10 @@ handle_in(
         {error, ReasonCode} ->
             handle_out(disconnect, ReasonCode, Channel)
     end;
-handle_in(?PACKET(?PINGREQ), Channel) ->
-    {ok, ?PACKET(?PINGRESP), Channel};
+handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
+    {ok, NKeepalive} = emqx_keepalive:check(Keepalive),
+    NChannel = Channel#channel{keepalive = NKeepalive},
+    {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
 handle_in(
     ?DISCONNECT_PACKET(ReasonCode, Properties),
     Channel = #channel{conninfo = ConnInfo}
@@ -1229,11 +1231,12 @@ handle_call(
     {keepalive, Interval},
     Channel = #channel{
         keepalive = KeepAlive,
-        conninfo = ConnInfo
+        conninfo = ConnInfo,
+        clientinfo = #{zone := Zone}
     }
 ) ->
     ClientId = info(clientid, Channel),
-    NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
+    NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive),
     NConnInfo = maps:put(keepalive, Interval, ConnInfo),
     NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
     SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@@ -1333,22 +1336,22 @@ die_if_test_compiled() ->
     | {shutdown, Reason :: term(), channel()}.
 handle_timeout(
     _TRef,
-    {keepalive, _StatVal},
+    keepalive,
     Channel = #channel{keepalive = undefined}
 ) ->
     {ok, Channel};
 handle_timeout(
     _TRef,
-    {keepalive, _StatVal},
+    keepalive,
     Channel = #channel{conn_state = disconnected}
 ) ->
     {ok, Channel};
 handle_timeout(
     _TRef,
-    {keepalive, StatVal},
+    keepalive,
     Channel = #channel{keepalive = Keepalive}
 ) ->
-    case emqx_keepalive:check(StatVal, Keepalive) of
+    case emqx_keepalive:check(Keepalive) of
         {ok, NKeepalive} ->
             NChannel = Channel#channel{keepalive = NKeepalive},
             {ok, reset_timer(keepalive, NChannel)};
@@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) ->
     ensure_timer(Name, Time, clean_timer(Name, Channel)).
 
 clean_timer(Name, Channel = #channel{timers = Timers}) ->
-    Channel#channel{timers = maps:remove(Name, Timers)}.
+    case maps:take(Name, Timers) of
+        error ->
+            Channel;
+        {TRef, NTimers} ->
+            ok = emqx_utils:cancel_timer(TRef),
+            Channel#channel{timers = NTimers}
+    end.
 
 interval(keepalive, #channel{keepalive = KeepAlive}) ->
-    emqx_keepalive:info(interval, KeepAlive);
+    emqx_keepalive:info(check_interval, KeepAlive);
 interval(retry_delivery, #channel{session = Session}) ->
     emqx_session:info(retry_interval, Session);
 interval(expire_awaiting_rel, #channel{session = Session}) ->
@@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) ->
 ensure_keepalive_timer(disabled, Channel) ->
     Channel;
 ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
-    Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
-    RecvCnt = emqx_pd:get_counter(recv_pkt),
-    Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
+    Keepalive = emqx_keepalive:init(Zone, Interval),
     ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
 
 clear_keepalive(Channel = #channel{timers = Timers}) ->

+ 1 - 3
apps/emqx/src/emqx_connection.erl

@@ -729,9 +729,7 @@ handle_timeout(
         disconnected ->
             {ok, State};
         _ ->
-            %% recv_pkt: valid MQTT message
-            RecvCnt = emqx_pd:get_counter(recv_pkt),
-            handle_timeout(TRef, {keepalive, RecvCnt}, State)
+            with_channel(handle_timeout, [TRef, keepalive], State)
     end;
 handle_timeout(TRef, Msg, State) ->
     with_channel(handle_timeout, [TRef, Msg], State).

+ 77 - 21
apps/emqx/src/emqx_keepalive.erl

@@ -19,10 +19,12 @@
 -export([
     init/1,
     init/2,
+    init/3,
     info/1,
     info/2,
+    check/1,
     check/2,
-    update/2
+    update/3
 ]).
 
 -elvis([{elvis_style, no_if_expression, disable}]).
@@ -30,8 +32,12 @@
 -export_type([keepalive/0]).
 
 -record(keepalive, {
-    interval :: pos_integer(),
-    statval :: non_neg_integer()
+    check_interval :: pos_integer(),
+    %% the received packets since last keepalive check
+    statval :: non_neg_integer(),
+    %% The number of idle intervals allowed before disconnecting the client.
+    idle_milliseconds = 0 :: non_neg_integer(),
+    max_idle_millisecond :: pos_integer()
 }).
 
 -opaque keepalive() :: #keepalive{}.
@@ -39,7 +45,11 @@
 
 %% @doc Init keepalive.
 -spec init(Interval :: non_neg_integer()) -> keepalive().
-init(Interval) -> init(0, Interval).
+init(Interval) -> init(default, 0, Interval).
+
+init(Zone, Interval) ->
+    RecvCnt = emqx_pd:get_counter(recv_pkt),
+    init(Zone, RecvCnt, Interval).
 
 %% from mqtt-v3.1.1 specific
 %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
@@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval).
 %% typically this is a few minutes.
 %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
 %% @doc Init keepalive.
--spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined.
-init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
-    #keepalive{interval = Interval, statval = StatVal};
-init(_, 0) ->
+-spec init(
+    Zone :: atom(),
+    StatVal :: non_neg_integer(),
+    Second :: non_neg_integer()
+) -> keepalive() | undefined.
+init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL ->
+    #{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} =
+        emqx_config:get_zone_conf(Zone, [mqtt]),
+    MilliSeconds = timer:seconds(Second),
+    Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)),
+    MaxIdleMs = ceil(MilliSeconds * Mul),
+    #keepalive{
+        check_interval = Interval,
+        statval = StatVal,
+        idle_milliseconds = 0,
+        max_idle_millisecond = MaxIdleMs
+    };
+init(_Zone, _, 0) ->
     undefined;
-init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
+init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL).
 
 %% @doc Get Info of the keepalive.
 -spec info(keepalive()) -> emqx_types:infos().
 info(#keepalive{
-    interval = Interval,
-    statval = StatVal
+    check_interval = Interval,
+    statval = StatVal,
+    idle_milliseconds = IdleIntervals,
+    max_idle_millisecond = MaxMs
 }) ->
     #{
-        interval => Interval,
-        statval => StatVal
+        check_interval => Interval,
+        statval => StatVal,
+        idle_milliseconds => IdleIntervals,
+        max_idle_millisecond => MaxMs
     }.
 
--spec info(interval | statval, keepalive()) ->
+-spec info(check_interval | statval | idle_milliseconds, keepalive()) ->
     non_neg_integer().
-info(interval, #keepalive{interval = Interval}) ->
+info(check_interval, #keepalive{check_interval = Interval}) ->
     Interval;
 info(statval, #keepalive{statval = StatVal}) ->
     StatVal;
-info(interval, undefined) ->
+info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) ->
+    Val;
+info(check_interval, undefined) ->
     0.
 
+check(Keepalive = #keepalive{}) ->
+    RecvCnt = emqx_pd:get_counter(recv_pkt),
+    check(RecvCnt, Keepalive);
+check(Keepalive) ->
+    {ok, Keepalive}.
+
 %% @doc Check keepalive.
 -spec check(non_neg_integer(), keepalive()) ->
     {ok, keepalive()} | {error, timeout}.
-check(Val, #keepalive{statval = Val}) -> {error, timeout};
-check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.
+
+check(
+    NewVal,
+    #keepalive{
+        statval = NewVal,
+        idle_milliseconds = IdleAcc,
+        check_interval = Interval,
+        max_idle_millisecond = Max
+    }
+) when IdleAcc + Interval >= Max ->
+    {error, timeout};
+check(
+    NewVal,
+    #keepalive{
+        statval = NewVal,
+        idle_milliseconds = IdleAcc,
+        check_interval = Interval
+    } = KeepAlive
+) ->
+    {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}};
+check(NewVal, #keepalive{} = KeepAlive) ->
+    {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}.
 
 %% @doc Update keepalive.
 %% The statval of the previous keepalive will be used,
 %% and normal checks will begin from the next cycle.
--spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
-update(Interval, undefined) -> init(0, Interval);
-update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval).
+-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
+update(Zone, Interval, undefined) -> init(Zone, 0, Interval);
+update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval).

+ 9 - 1
apps/emqx/src/emqx_schema.erl

@@ -3613,9 +3613,17 @@ mqtt_general() ->
                     desc => ?DESC(mqtt_keepalive_multiplier)
                 }
             )},
+        {"keepalive_check_interval",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"30s">>,
+                    desc => ?DESC(mqtt_keepalive_check_interval)
+                }
+            )},
         {"retry_interval",
             sc(
-                duration(),
+                timeout_duration(),
                 #{
                     default => <<"30s">>,
                     desc => ?DESC(mqtt_retry_interval)

+ 1 - 2
apps/emqx/src/emqx_ws_connection.erl

@@ -555,8 +555,7 @@ handle_info(Info, State) ->
 handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
     shutdown(idle_timeout, State);
 handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
-    RecvOct = emqx_pd:get_counter(recv_oct),
-    handle_timeout(TRef, {keepalive, RecvOct}, State);
+    with_channel(handle_timeout, [TRef, keepalive], State);
 handle_timeout(
     TRef,
     emit_stats,

+ 1 - 0
apps/emqx/test/emqx_config_SUITE.erl

@@ -428,6 +428,7 @@ zone_global_defaults() ->
                 ignore_loop_deliver => false,
                 keepalive_backoff => 0.75,
                 keepalive_multiplier => 1.5,
+                keepalive_check_interval => 30000,
                 max_awaiting_rel => 100,
                 max_clientid_len => 65535,
                 max_inflight => 32,

+ 162 - 4
apps/emqx/test/emqx_keepalive_SUITE.erl

@@ -19,22 +19,180 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx,
+                "listeners {"
+                "tcp.default.bind = 1883,"
+                "ssl.default = marked_for_deletion,"
+                "quic.default = marked_for_deletion,"
+                "ws.default = marked_for_deletion,"
+                "wss.default = marked_for_deletion"
+                "}"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
+
+t_check_keepalive_default_timeout(_) ->
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
+    erlang:process_flag(trap_exit, true),
+    ClientID = <<"default">>,
+    KeepaliveSec = 10,
+    {ok, C} = emqtt:start_link([
+        {keepalive, KeepaliveSec},
+        {clientid, binary_to_list(ClientID)}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    emqtt:pause(C),
+    [ChannelPid] = emqx_cm:lookup_channels(ClientID),
+    erlang:link(ChannelPid),
+    CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+    ?assertMatch(5000, CheckInterval),
+    %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
+    %% connect T0(packet = 1, idle_milliseconds = 0)
+    %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
+    %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
+    %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
+    Timeout = CheckInterval * 3,
+    %% connector but not send a packet.
+    ?assertMatch(
+        no_keepalive_timeout_received,
+        receive_msg_in_time(ChannelPid, C, Timeout - 200),
+        Timeout - 200
+    ),
+    ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
+
+t_check_keepalive_other_timeout(_) ->
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000),
+    erlang:process_flag(trap_exit, true),
+    ClientID = <<"other">>,
+    KeepaliveSec = 10,
+    {ok, C} = emqtt:start_link([
+        {keepalive, KeepaliveSec},
+        {clientid, binary_to_list(ClientID)}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    emqtt:pause(C),
+    {ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []),
+    [ChannelPid] = emqx_cm:lookup_channels(ClientID),
+    erlang:link(ChannelPid),
+    %%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000),
+    CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+    ?assertMatch(2000, CheckInterval),
+    %% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5
+    %% connect T0(packet = 1, idle_milliseconds = 0)
+    %% subscribe T1(packet = 2, idle_milliseconds = 0)
+    %% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000)
+    %% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000)
+    %% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000)
+    %% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000)
+    %% check4 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000)
+    %% check4 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000)
+    %% check4 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000)
+    %% check4 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout
+    Timeout = CheckInterval * 9,
+    ?assertMatch(
+        no_keepalive_timeout_received,
+        receive_msg_in_time(ChannelPid, C, Timeout - 200),
+        Timeout - 200
+    ),
+    ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout).
+
+t_check_keepalive_ping_reset_timer(_) ->
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000),
+    erlang:process_flag(trap_exit, true),
+    ClientID = <<"ping_reset">>,
+    KeepaliveSec = 10,
+    {ok, C} = emqtt:start_link([
+        {keepalive, KeepaliveSec},
+        {clientid, binary_to_list(ClientID)}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    emqtt:pause(C),
+    ct:sleep(1000),
+    emqtt:resume(C),
+    pong = emqtt:ping(C),
+    emqtt:pause(C),
+    [ChannelPid] = emqx_cm:lookup_channels(ClientID),
+    erlang:link(ChannelPid),
+    CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000),
+    ?assertMatch(5000, CheckInterval),
+    %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5
+    %% connect T0(packet = 1, idle_milliseconds = 0)
+    %% sleep 1000ms
+    %% ping (packet = 2, idle_milliseconds = 0) restart timer
+    %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000)
+    %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000)
+    %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout
+    Timeout = CheckInterval * 3,
+    ?assertMatch(
+        no_keepalive_timeout_received,
+        receive_msg_in_time(ChannelPid, C, Timeout - 200),
+        Timeout - 200
+    ),
+    ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)).
+
 t_check(_) ->
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5),
+    emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000),
     Keepalive = emqx_keepalive:init(60),
-    ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
+    ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)),
     ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
     Info = emqx_keepalive:info(Keepalive),
     ?assertEqual(
         #{
-            interval => 60,
-            statval => 0
+            check_interval => 30000,
+            statval => 0,
+            idle_milliseconds => 0,
+            %% 60 * 1.5 * 1000
+            max_idle_millisecond => 90000
         },
         Info
     ),
     {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
     ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
-    ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).
+    {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
+    ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
+    {ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2),
+    ?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)),
+    ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)),
+
+    Keepalive4 = emqx_keepalive:init(90),
+    ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)),
+
+    Keepalive5 = emqx_keepalive:init(1),
+    ?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)),
+    ok.
+
+keepalive_multiplier() ->
+    emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]).
+
+keepalive_check_interval() ->
+    emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]).
+
+receive_msg_in_time(ChannelPid, C, Timeout) ->
+    receive
+        {'EXIT', ChannelPid, {shutdown, keepalive_timeout}} ->
+            receive
+                {'EXIT', C, {shutdown, tcp_closed}} ->
+                    ok
+            after 500 ->
+                throw(no_tcp_closed_from_mqtt_client)
+            end
+    after Timeout ->
+        no_keepalive_timeout_received
+    end.

+ 4 - 4
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -85,7 +85,7 @@
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 
--define(DEF_IDLE_TIME, timer:seconds(30)).
+-define(DEF_IDLE_SECONDS, 30).
 
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 
@@ -149,7 +149,7 @@ init(
             mountpoint => Mountpoint
         }
     ),
-    Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
+    Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS),
     #channel{
         ctx = Ctx,
         conninfo = ConnInfo,
@@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) ->
     ensure_keepalive_timer(fun ensure_timer/4, Channel).
 
 ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) ->
-    Heartbeat = emqx_keepalive:info(interval, KeepAlive),
+    Heartbeat = emqx_keepalive:info(check_interval, KeepAlive),
     Fun(keepalive, Heartbeat, keepalive, Channel).
 
 check_auth_state(Msg, #channel{connection_required = false} = Channel) ->
@@ -495,7 +495,7 @@ enrich_conninfo(
 ) ->
     case Queries of
         #{<<"clientid">> := ClientId} ->
-            Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
+            Interval = emqx_keepalive:info(check_interval, KeepAlive),
             NConnInfo = ConnInfo#{
                 clientid => ClientId,
                 proto_name => <<"CoAP">>,

+ 1 - 7
apps/emqx_gateway_coap/src/emqx_coap_schema.erl

@@ -19,12 +19,6 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("typerefl/include/types.hrl").
 
--type duration() :: non_neg_integer().
-
--typerefl_from_string({duration/0, emqx_schema, to_duration}).
-
--reflect_type([duration/0]).
-
 %% config schema provides
 -export([namespace/0, fields/1, desc/1]).
 
@@ -34,7 +28,7 @@ fields(coap) ->
     [
         {heartbeat,
             sc(
-                duration(),
+                emqx_schema:duration_s(),
                 #{
                     default => <<"30s">>,
                     desc => ?DESC(coap_heartbeat)

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 4 - 3
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) ->
     OldConf = emqx:get_raw_config([gateway, coap]),
     {ok, _} = emqx_gateway_conf:update_gateway(
         coap,
-        OldConf#{<<"heartbeat">> => <<"800ms">>}
+        OldConf#{<<"heartbeat">> => <<"1s">>}
     ),
     [
         {old_conf, OldConf},
@@ -216,8 +216,9 @@ t_heartbeat(Config) ->
             [],
             emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
         ),
-
-        timer:sleep(Heartbeat * 2),
+        %% The minimum timeout time is 1 second.
+        %% 1.5 * Heartbeat + 0.5 * Heartbeat(< 1s) = 1.5 * 1 + 1 = 2.5
+        timer:sleep(Heartbeat * 2 + 1000),
         ?assertEqual(
             [],
             emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)

+ 2 - 2
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
     Channel;
 ensure_keepalive_timer(Interval, Channel) ->
     StatVal = emqx_gateway_conn:keepalive_stats(recv),
-    Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
+    Keepalive = emqx_keepalive:init(default, StatVal, Interval),
     ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
 
 ensure_timer(Name, Channel = #channel{timers = Timers}) ->
@@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}})
 interval(force_close, _) ->
     15000;
 interval(keepalive, #channel{keepalive = Keepalive}) ->
-    emqx_keepalive:info(interval, Keepalive).
+    emqx_keepalive:info(check_interval, Keepalive).
 
 %%--------------------------------------------------------------------
 %% Dispatch

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.10"},
+    {vsn, "0.1.11"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
 interval(alive_timer, #channel{keepalive = KeepAlive}) ->
-    emqx_keepalive:info(interval, KeepAlive);
+    emqx_keepalive:info(check_interval, KeepAlive);
 interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
     RetxIntv.
 

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_jt808, [
     {description, "JT/T 808 Gateway"},
-    {vsn, "0.0.3"},
+    {vsn, "0.1.0"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
 interval(alive_timer, #channel{keepalive = KeepAlive}) ->
-    emqx_keepalive:info(interval, KeepAlive);
+    emqx_keepalive:info(check_interval, KeepAlive);
 interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
     RetxIntv.
 

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_mqttsn, [
     {description, "MQTT-SN Gateway"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 2 - 2
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) ->
 ensure_keepalive_timer(0, Channel) ->
     Channel;
 ensure_keepalive_timer(Interval, Channel) ->
-    Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))),
+    Keepalive = emqx_keepalive:init(Interval),
     ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
 
 %%--------------------------------------------------------------------
@@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
 interval(keepalive, #channel{keepalive = KeepAlive}) ->
-    emqx_keepalive:info(interval, KeepAlive);
+    emqx_keepalive:info(check_interval, KeepAlive);
 interval(retry_delivery, #channel{session = Session}) ->
     emqx_mqttsn_session:info(retry_interval, Session);
 interval(expire_awaiting_rel, #channel{session = Session}) ->

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -1109,7 +1109,7 @@ t_keepalive(_Config) ->
     [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
     %% will reset to max keepalive if keepalive > max keepalive
     #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
-    ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))),
+    ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))),
 
     {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
     #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),

+ 9 - 0
rel/i18n/emqx_schema.hocon

@@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i
 mqtt_keepalive_multiplier.label:
 """Keep Alive Multiplier"""
 
+mqtt_keepalive_check_interval.desc:
+"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets.
+If a certain amount of time passes without any packets being sent from the client,this time will be added up.
+Once the accumulated time exceeds the keepalive interval * the keepalive multiplier, the connection will be terminated.
+The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of Interval/2."""
+
+mqtt_keepalive_check_interval.label:
+"""Keep Alive Check Interval"""
+
 force_gc_bytes.desc:
 """GC the process after specified number of bytes have passed through."""