Browse Source

feat: hide keepalive_backoff, introduce keepalive_multiplier

Zhongwen Deng 2 years ago
parent
commit
e26ce5816e

+ 6 - 5
apps/emqx/src/emqx_channel.erl

@@ -1199,12 +1199,13 @@ handle_call(list_authz_cache, Channel) ->
 handle_call(
     {keepalive, Interval},
     Channel = #channel{
-        keepalive = KeepAlive,
+        keepalive = _KeepAlive,
         conninfo = ConnInfo
     }
 ) ->
     ClientId = info(clientid, Channel),
-    NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive),
+    RecvCnt = emqx_pd:get_counter(recv_pkt),
+    NKeepalive = emqx_keepalive:init(RecvCnt, Interval * 1000),
     NConnInfo = maps:put(keepalive, Interval, ConnInfo),
     NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
     SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@@ -2025,9 +2026,9 @@ ensure_keepalive_timer(0, Channel) ->
 ensure_keepalive_timer(disabled, Channel) ->
     Channel;
 ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
-    Backoff = get_mqtt_conf(Zone, keepalive_backoff),
-    RecvOct = emqx_pd:get_counter(incoming_bytes),
-    Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)),
+    Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
+    RecvCnt = emqx_pd:get_counter(recv_pkt),
+    Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
     ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
 
 clear_keepalive(Channel = #channel{timers = Timers}) ->

+ 26 - 48
apps/emqx/src/emqx_keepalive.erl

@@ -21,8 +21,7 @@
     init/2,
     info/1,
     info/2,
-    check/2,
-    set/3
+    check/2
 ]).
 
 -elvis([{elvis_style, no_if_expression, disable}]).
@@ -31,78 +30,57 @@
 
 -record(keepalive, {
     interval :: pos_integer(),
-    statval :: non_neg_integer(),
-    repeat :: non_neg_integer()
+    statval :: non_neg_integer()
 }).
 
 -opaque keepalive() :: #keepalive{}.
+-define(MAX_INTERVAL, 65535000).
 
 %% @doc Init keepalive.
 -spec init(Interval :: non_neg_integer()) -> keepalive().
 init(Interval) -> init(0, Interval).
 
+%% From the MQTT v3.1.1 specification:
+%% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
+%% This means that, in this case, the Server is not required
+%% to disconnect the Client on the grounds of inactivity.
+%% Note that a Server is permitted to disconnect a Client that it determines
+%% to be inactive or non-responsive at any time,
+%% regardless of the Keep Alive value provided by that Client.
+%% Non-normative comment:
+%% The actual value of the Keep Alive is application specific;
+%% typically this is a few minutes.
+%% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
 %% @doc Init keepalive.
 -spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive().
-init(StatVal, Interval) when Interval > 0 ->
-    #keepalive{
-        interval = Interval,
-        statval = StatVal,
-        repeat = 0
-    }.
+init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
+    #keepalive{interval = Interval, statval = StatVal};
+init(_, 0) ->
+    undefined;
+init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
 
 %% @doc Get Info of the keepalive.
 -spec info(keepalive()) -> emqx_types:infos().
 info(#keepalive{
     interval = Interval,
-    statval = StatVal,
-    repeat = Repeat
+    statval = StatVal
 }) ->
     #{
         interval => Interval,
-        statval => StatVal,
-        repeat => Repeat
+        statval => StatVal
     }.
 
--spec info(interval | statval | repeat, keepalive()) ->
+-spec info(interval | statval, keepalive()) ->
     non_neg_integer().
 info(interval, #keepalive{interval = Interval}) ->
     Interval;
 info(statval, #keepalive{statval = StatVal}) ->
     StatVal;
-info(repeat, #keepalive{repeat = Repeat}) ->
-    Repeat.
+info(interval, undefined) ->
+    0.
 
 %% @doc Check keepalive.
 -spec check(non_neg_integer(), keepalive()) ->
     {ok, keepalive()} | {error, timeout}.
-check(
-    NewVal,
-    KeepAlive = #keepalive{
-        statval = OldVal,
-        repeat = Repeat
-    }
-) ->
-    if
-        NewVal =/= OldVal ->
-            {ok, KeepAlive#keepalive{statval = NewVal, repeat = 0}};
-        Repeat < 1 ->
-            {ok, KeepAlive#keepalive{repeat = Repeat + 1}};
-        true ->
-            {error, timeout}
-    end.
-
-%% from mqtt-v3.1.1 specific
-%% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
-%% This means that, in this case, the Server is not required
-%% to disconnect the Client on the grounds of inactivity.
-%% Note that a Server is permitted to disconnect a Client that it determines
-%% to be inactive or non-responsive at any time,
-%% regardless of the Keep Alive value provided by that Client.
-%%  Non normative comment
-%%The actual value of the Keep Alive is application specific;
-%% typically this is a few minutes.
-%% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
-%% @doc Update keepalive's interval
--spec set(interval, non_neg_integer(), keepalive()) -> keepalive().
-set(interval, Interval, KeepAlive) when Interval >= 0 andalso Interval =< 65535000 ->
-    KeepAlive#keepalive{interval = Interval}.
+check(Val, #keepalive{statval = Val}) -> {error, timeout};
+check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.

+ 43 - 3
apps/emqx/src/emqx_schema.erl

@@ -77,6 +77,7 @@
     validate_heap_size/1,
     user_lookup_fun_tr/2,
     validate_alarm_actions/1,
+    validate_keepalive_multiplier/1,
     non_empty_string/1,
     validations/0,
     naive_env_interpolation/1
@@ -109,7 +110,8 @@
     servers_validator/2,
     servers_sc/2,
     convert_servers/1,
-    convert_servers/2
+    convert_servers/2,
+    mqtt_converter/2
 ]).
 
 %% tombstone types
@@ -150,6 +152,8 @@
 
 -define(BIT(Bits), (1 bsl (Bits))).
 -define(MAX_UINT(Bits), (?BIT(Bits) - 1)).
+-define(DEFAULT_MULTIPLIER, 1.5).
+-define(DEFAULT_BACKOFF, 0.75).
 
 namespace() -> broker.
 
@@ -172,6 +176,7 @@ roots(high) ->
                 ref("mqtt"),
                 #{
                     desc => ?DESC(mqtt),
+                    converter => fun ?MODULE:mqtt_converter/2,
                     importance => ?IMPORTANCE_MEDIUM
                 }
             )},
@@ -522,8 +527,19 @@ fields("mqtt") ->
             sc(
                 number(),
                 #{
-                    default => 0.75,
-                    desc => ?DESC(mqtt_keepalive_backoff)
+                    default => ?DEFAULT_BACKOFF,
+                    %% Must add required => false, zone schema has no default.
+                    required => false,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {"keepalive_multiplier",
+            sc(
+                number(),
+                #{
+                    default => ?DEFAULT_MULTIPLIER,
+                    validator => fun ?MODULE:validate_keepalive_multiplier/1,
+                    desc => ?DESC(mqtt_keepalive_multiplier)
                 }
             )},
         {"max_subscriptions",
@@ -2744,6 +2760,13 @@ validate_heap_size(Siz) when is_integer(Siz) ->
 validate_heap_size(_SizStr) ->
     {error, invalid_heap_size}.
 
+validate_keepalive_multiplier(Multiplier) when
+    is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0
+->
+    ok;
+validate_keepalive_multiplier(_Multiplier) ->
+    {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
+
 validate_alarm_actions(Actions) ->
     UnSupported = lists:filter(
         fun(Action) -> Action =/= log andalso Action =/= publish end, Actions
@@ -3385,3 +3408,20 @@ ensure_default_listener(Map, ListenerType) ->
 
 cert_file(_File, client) -> undefined;
 cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
+
+mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
+    case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of
+        false ->
+            %% Multiplier is provided, and it's not default value
+            Mqtt;
+        true ->
+            %% Multiplier is default value, fallback to use Backoff value
+            %% Backoff default value was half of Multiplier default value
+            %% so there is no need to compare Backoff with its default.
+            Backoff = maps:get(<<"keepalive_backoff">>, Mqtt, ?DEFAULT_BACKOFF),
+            Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}
+    end;
+mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
+    Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
+mqtt_converter(Mqtt, _Opts) ->
+    Mqtt.

+ 2 - 8
apps/emqx/test/emqx_keepalive_SUITE.erl

@@ -27,20 +27,14 @@ t_check(_) ->
     Keepalive = emqx_keepalive:init(60),
     ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
     ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
-    ?assertEqual(0, emqx_keepalive:info(repeat, Keepalive)),
     Info = emqx_keepalive:info(Keepalive),
     ?assertEqual(
         #{
             interval => 60,
-            statval => 0,
-            repeat => 0
+            statval => 0
         },
         Info
     ),
     {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
     ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
-    ?assertEqual(0, emqx_keepalive:info(repeat, Keepalive1)),
-    {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
-    ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
-    ?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)),
-    ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)).
+    ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).

+ 37 - 0
apps/emqx/test/emqx_schema_tests.erl

@@ -655,6 +655,43 @@ password_converter_test() ->
     ?assertThrow("must_quote", emqx_schema:password_converter(foobar, #{})),
     ok.
 
+-define(MQTT(B, M), #{<<"keepalive_backoff">> => B, <<"keepalive_multiplier">> => M}).
+
+keepalive_convert_test() ->
+    ?assertEqual(undefined, emqx_schema:mqtt_converter(undefined, #{})),
+    DefaultBackoff = 0.75,
+    DefaultMultiplier = 1.5,
+    Default = ?MQTT(DefaultBackoff, DefaultMultiplier),
+    ?assertEqual(Default, emqx_schema:mqtt_converter(Default, #{})),
+    ?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
+    ?assertEqual(
+        ?MQTT(DefaultBackoff, 3), emqx_schema:mqtt_converter(?MQTT(DefaultBackoff, 3), #{})
+    ),
+    ?assertEqual(?MQTT(1, 2), emqx_schema:mqtt_converter(?MQTT(1, DefaultMultiplier), #{})),
+    ?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
+
+    ?assertEqual(#{}, emqx_schema:mqtt_converter(#{}, #{})),
+    ?assertEqual(
+        #{<<"keepalive_backoff">> => 1.5, <<"keepalive_multiplier">> => 3.0},
+        emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => 1.5}, #{})
+    ),
+    ?assertEqual(
+        #{<<"keepalive_multiplier">> => 5.0},
+        emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => 5.0}, #{})
+    ),
+    ?assertEqual(
+        #{
+            <<"keepalive_backoff">> => DefaultBackoff,
+            <<"keepalive_multiplier">> => DefaultMultiplier
+        },
+        emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => DefaultBackoff}, #{})
+    ),
+    ?assertEqual(
+        #{<<"keepalive_multiplier">> => DefaultMultiplier},
+        emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => DefaultMultiplier}, #{})
+    ),
+    ok.
+
 url_type_test_() ->
     [
         ?_assertEqual(

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -546,7 +546,8 @@ fields(authz_cache) ->
     ];
 fields(keepalive) ->
     [
-        {interval, hoconsc:mk(integer(), #{desc => <<"Keepalive time, with the unit of second">>})}
+        {interval,
+            hoconsc:mk(range(0, 65535), #{desc => <<"Keepalive time, with the unit of second">>})}
     ];
 fields(subscribe) ->
     [

+ 20 - 2
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -244,13 +244,31 @@ t_keepalive(_Config) ->
     Body = #{interval => 11},
     {error, {"HTTP/1.1", 404, "Not Found"}} =
         emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
-    {ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}),
+    %% 65535 is the max value of keepalive
+    MaxKeepalive = 65535,
+    InitKeepalive = round(MaxKeepalive / 1.5 + 1),
+    {ok, C1} = emqtt:start_link(#{
+        username => Username, clientid => ClientId, keepalive => InitKeepalive
+    }),
     {ok, _} = emqtt:connect(C1),
+    [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
+    %% will reset to max keepalive if keepalive > max keepalive
+    #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
+    ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))),
+
     {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
     #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),
-    [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
     #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid),
     ?assertEqual(11, Keepalive),
+    %% Disable keepalive
+    Body1 = #{interval => 0},
+    {ok, NewClient1} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body1),
+    #{<<"keepalive">> := 0} = emqx_utils_json:decode(NewClient1, [return_maps]),
+    ?assertMatch(#{conninfo := #{keepalive := 0}}, emqx_connection:info(Pid)),
+    %% Maximal keepalive
+    Body2 = #{interval => 65536},
+    {error, {"HTTP/1.1", 400, _}} =
+        emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body2),
     emqtt:disconnect(C1),
     ok.
 

+ 4 - 0
changes/ce/feat-10702.en.md

@@ -0,0 +1,4 @@
+Introduce a more straightforward configuration option `keepalive_multiplier` and
+deprecate the old `keepalive_backoff` configuration.
+After this enhancement, EMQX determines the client's keepalive timeout
+by multiplying the "Client Requested Keepalive Interval" by `keepalive_multiplier`.

+ 6 - 5
rel/i18n/emqx_schema.hocon

@@ -799,7 +799,7 @@ fields_tcp_opts_high_watermark.desc:
 by the VM socket implementation reaches this limit."""
 
 fields_tcp_opts_high_watermark.label:
-"""TCP 高水位线"""
+"""TCP high watermark"""
 
 fields_mqtt_quic_listener_stateless_operation_expiration_ms.desc:
 """The time limit between operations for the same endpoint, in milliseconds. Default: 100"""
@@ -885,11 +885,12 @@ and an MQTT message is published to the system topic <code>$SYS/sysmon/long_sche
 sysmon_vm_long_schedule.label:
 """Enable Long Schedule monitoring."""
 
-mqtt_keepalive_backoff.desc:
-"""The coefficient EMQX uses to confirm whether the keep alive duration of the client expires. Formula: Keep Alive * Backoff * 2"""
+mqtt_keepalive_multiplier.desc:
+"""Keep-Alive Timeout = Keep-Alive interval × Keep-Alive Multiplier.
+The default value of 1.5 follows the MQTT 5.0 specification. This multiplier is adjustable, giving system administrators the flexibility to tailor it to their specific needs. For instance, if a client's 10-second Keep-Alive interval PINGREQ gets delayed by an extra 10 seconds, changing the multiplier to 2 lets EMQX tolerate this delay."""
 
-mqtt_keepalive_backoff.label:
-"""Keep Alive Backoff"""
+mqtt_keepalive_multiplier.label:
+"""Keep Alive Multiplier"""
 
 force_gc_bytes.desc:
 """GC the process after specified number of bytes have passed through."""