Преглед изворни кода

feat(connection): configurable TCP keepalive

Fixes https://emqx.atlassian.net/browse/EMQX-9852
Paulo Zulato пре 2 година
родитељ
комит
a4407764f3

+ 43 - 6
apps/emqx/src/emqx_connection.erl

@@ -49,7 +49,7 @@
 
 -export([
     async_set_keepalive/3,
-    async_set_keepalive/4,
+    async_set_keepalive/5,
     async_set_socket_options/2
 ]).
 
@@ -273,16 +273,30 @@ stats(#state{
 %% NOTE: This API sets TCP socket options, which has nothing to do with
 %%       the MQTT layer's keepalive (PINGREQ and PINGRESP).
 async_set_keepalive(Idle, Interval, Probes) ->
-    async_set_keepalive(self(), Idle, Interval, Probes).
+    async_set_keepalive(os:type(), self(), Idle, Interval, Probes).
 
-async_set_keepalive(Pid, Idle, Interval, Probes) ->
+async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) ->
     Options = [
         {keepalive, true},
         {raw, 6, 4, <<Idle:32/native>>},
         {raw, 6, 5, <<Interval:32/native>>},
         {raw, 6, 6, <<Probes:32/native>>}
     ],
-    async_set_socket_options(Pid, Options).
+    async_set_socket_options(Pid, Options);
+async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) ->
+    Options = [
+        {keepalive, true},
+        {raw, 6, 16#10, <<Idle:32/native>>},
+        {raw, 6, 16#101, <<Interval:32/native>>},
+        {raw, 6, 16#102, <<Probes:32/native>>}
+    ],
+    async_set_socket_options(Pid, Options);
+async_set_keepalive(OS, _Pid, _Idle, _Interval, _Probes) ->
+    ?SLOG(warning, #{
+        msg => "Unsupported operation: set TCP keepalive",
+        os => OS
+    }),
+    ok.
 
 %% @doc Set custom socket options.
 %% This API is made async because the call might be originated from
@@ -353,6 +367,9 @@ init_state(
             false -> disabled
         end,
     IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
+
+    set_tcp_keepalive(Listener),
+
     IdleTimer = start_timer(IdleTimeout, idle_timeout),
     #state{
         transport = Transport,
@@ -948,8 +965,15 @@ handle_cast(
     }
 ) ->
     case Transport:setopts(Socket, Opts) of
-        ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
-        Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
+        ok ->
+            ?tp(debug, "custom_socket_options_successfully", #{opts => Opts});
+        {error, einval} ->
+            %% socket is already closed, ignore this error
+            ?tp(debug, "socket already closed", #{reason => socket_already_closed}),
+            ok;
+        Err ->
+            %% other errors
+            ?tp(error, "failed_to_set_custom_socket_option", #{reason => Err})
     end,
     State;
 handle_cast(Req, State) ->
@@ -1199,6 +1223,19 @@ inc_counter(Key, Inc) ->
     _ = emqx_pd:inc_counter(Key, Inc),
     ok.
 
+%% @doc Apply the listener's configured TCP keepalive to the calling process's socket.
+%% QUIC listeners are skipped. For any other listener the `tcp_options.keepalive'
+%% setting is read (falling back to `<<"none">>'); unless it is "none" it is parsed
+%% into `{Idle, Interval, Probes}' and passed to async_set_keepalive/3.
+set_tcp_keepalive({quic, _Listener}) ->
+    ok;
+set_tcp_keepalive({Type, Id}) ->
+    KeepaliveConf = emqx_config:get_listener_conf(
+        Type, Id, [tcp_options, keepalive], <<"none">>
+    ),
+    case iolist_to_binary(KeepaliveConf) of
+        Spec when Spec =/= <<"none">> ->
+            %% the schema has already validated this value, so parse it as-is.
+            {Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(Spec),
+            async_set_keepalive(Idle, Interval, Probes);
+        _Disabled ->
+            ok
+    end.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx/src/emqx_listeners.erl

@@ -786,7 +786,7 @@ ssl_opts(Opts) ->
 tcp_opts(Opts) ->
     maps:to_list(
         maps:without(
-            [active_n],
+            [active_n, keepalive],
             maps:get(tcp_options, Opts, #{})
         )
     ).

+ 50 - 1
apps/emqx/src/emqx_schema.erl

@@ -95,7 +95,9 @@
     non_empty_string/1,
     validations/0,
     naive_env_interpolation/1,
-    validate_server_ssl_opts/1
+    validate_server_ssl_opts/1,
+    validate_tcp_keepalive/1,
+    parse_tcp_keepalive/1
 ]).
 
 -export([qos/0]).
@@ -1389,6 +1391,15 @@ fields("tcp_opts") ->
                     default => true,
                     desc => ?DESC(fields_tcp_opts_reuseaddr)
                 }
+            )},
+        {"keepalive",
+            sc(
+                string(),
+                #{
+                    default => <<"none">>,
+                    desc => ?DESC(fields_tcp_opts_keepalive),
+                    validator => fun validate_tcp_keepalive/1
+                }
             )}
     ];
 fields("listener_ssl_opts") ->
@@ -2842,6 +2853,44 @@ validate_alarm_actions(Actions) ->
         Error -> {error, Error}
     end.
 
+%% @doc Schema validator for the TCP keepalive option.
+%% Returns `ok' when the value is "none" or when parse_tcp_keepalive/1 accepts it;
+%% otherwise the exception raised by parse_tcp_keepalive/1 propagates to the caller.
+validate_tcp_keepalive(Value) ->
+    Disabled = iolist_to_binary(Value) =:= <<"none">>,
+    %% the parsed tuple is discarded: only a successful parse matters here.
+    _ = Disabled orelse parse_tcp_keepalive(Value),
+    ok.
+
+%% @doc Parse an 'Idle,Interval,Probes' string into `{Idle, Interval, Probes}' integers.
+%% Serves both as the schema value validator and as the run-time parser.
+%% Any `error'-class failure (wrong number of fields, non-integer field, non-iolist
+%% input) is converted into a thrown map with `reason' and `value' keys; the
+%% out-of-range throw from parse_ka_int/4 passes through untouched.
+parse_tcp_keepalive(Str) ->
+    try
+        Fields = binary:split(iolist_to_binary(Str), <<",">>, [global]),
+        [IdleBin, IntervalBin, ProbesBin] = Fields,
+        %% upper bounds are 10 times the Linux defaults (7200, 75 and 9)
+        Idle = parse_ka_int(IdleBin, "Idle", 1, 10 * 7200),
+        Interval = parse_ka_int(IntervalBin, "Interval", 1, 10 * 75),
+        Probes = parse_ka_int(ProbesBin, "Probes", 1, 10 * 9),
+        {Idle, Interval, Probes}
+    catch
+        error:_Reason ->
+            throw(#{
+                reason => "Not comma separated positive integers of 'Idle,Interval,Probes' format",
+                value => Str
+            })
+    end.
+
+%% @doc Parse one keepalive field and check that it lies within [Min, Max].
+%% `Bin' is the raw field text (surrounding whitespace is trimmed), `Name' is the
+%% field label used in the error message.
+%% Returns the integer when it is in range. Throws a map with `reason' (flat string)
+%% and `value' (the parsed integer) when it is out of range. A non-integer field
+%% raises `badarg' from binary_to_integer/1.
+parse_ka_int(Bin, Name, Min, Max) ->
+    case binary_to_integer(string:trim(Bin)) of
+        I when I >= Min, I =< Max ->
+            I;
+        I ->
+            %% flattened so that `reason' is a plain string rather than a nested iolist
+            Msg = io_lib:format("TCP-Keepalive '~s' value must be in the range of [~p, ~p].", [
+                Name, Min, Max
+            ]),
+            throw(#{reason => lists:flatten(Msg), value => I})
+    end.
+
 user_lookup_fun_tr(Lookup, #{make_serializable := true}) ->
     fmt_user_lookup_fun(Lookup);
 user_lookup_fun_tr(Lookup, _) ->

+ 14 - 12
apps/emqx/test/emqx_mqtt_SUITE.erl

@@ -219,13 +219,15 @@ t_async_set_keepalive('end', _Config) ->
 t_async_set_keepalive(_) ->
     case os:type() of
         {unix, darwin} ->
-            %% Mac OSX don't support the feature
-            ok;
+            do_async_set_keepalive(16#10, 16#101, 16#102);
+        {unix, linux} ->
+            do_async_set_keepalive(4, 5, 6);
         _ ->
-            do_async_set_keepalive()
+            %% don't support the feature on other OS
+            ok
     end.
 
-do_async_set_keepalive() ->
+do_async_set_keepalive(OptKeepIdle, OptKeepInterval, OptKeepCount) ->
     ClientID = <<"client-tcp-keepalive">>,
     {ok, Client} = emqtt:start_link([
         {host, "localhost"},
@@ -247,19 +249,19 @@ do_async_set_keepalive() ->
     Transport = maps:get(transport, State),
     Socket = maps:get(socket, State),
     ?assert(is_port(Socket)),
-    Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
+    Opts = [{raw, 6, OptKeepIdle, 4}, {raw, 6, OptKeepInterval, 4}, {raw, 6, OptKeepCount, 4}],
     {ok, [
-        {raw, 6, 4, <<Idle:32/native>>},
-        {raw, 6, 5, <<Interval:32/native>>},
-        {raw, 6, 6, <<Probes:32/native>>}
+        {raw, 6, OptKeepIdle, <<Idle:32/native>>},
+        {raw, 6, OptKeepInterval, <<Interval:32/native>>},
+        {raw, 6, OptKeepCount, <<Probes:32/native>>}
     ]} = Transport:getopts(Socket, Opts),
     ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
-    emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
+    emqx_connection:async_set_keepalive(os:type(), Pid, Idle + 1, Interval + 1, Probes + 1),
     {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
     {ok, [
-        {raw, 6, 4, <<NewIdle:32/native>>},
-        {raw, 6, 5, <<NewInterval:32/native>>},
-        {raw, 6, 6, <<NewProbes:32/native>>}
+        {raw, 6, OptKeepIdle, <<NewIdle:32/native>>},
+        {raw, 6, OptKeepInterval, <<NewInterval:32/native>>},
+        {raw, 6, OptKeepCount, <<NewProbes:32/native>>}
     ]} = Transport:getopts(Socket, Opts),
     ?assertEqual(NewIdle, Idle + 1),
     ?assertEqual(NewInterval, Interval + 1),

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -723,7 +723,8 @@ examples_listener() ->
                                 buffer => <<"10KB">>,
                                 high_watermark => <<"1MB">>,
                                 nodelay => false,
-                                reuseaddr => true
+                                reuseaddr => true,
+                                keepalive => "none"
                             }
                     }
             },

+ 1 - 0
changes/ce/feat-10933.en.md

@@ -0,0 +1 @@
+Add support for configuring TCP keep-alive in MQTT/TCP and MQTT/SSL listeners

+ 14 - 0
rel/i18n/emqx_schema.hocon

@@ -975,6 +975,20 @@ fields_tcp_opts_nodelay.desc:
 fields_tcp_opts_nodelay.label:
 """TCP_NODELAY"""
 
+fields_tcp_opts_keepalive.desc:
+"""
+Enable TCP keepalive for MQTT connections over TCP or SSL.
+The value is three comma-separated numbers in the format 'Idle,Interval,Probes':
+ - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
+ - Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
+ - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
+For example, "240,30,5" means: EMQX starts sending TCP keepalive probes after the connection has been idle for 240 seconds, and sends a probe every 30 seconds until a response is received from the MQTT client. If 5 consecutive probes go unanswered, EMQX closes the connection.
+Default: 'none'
+"""
+
+fields_tcp_opts_keepalive.label:
+"""TCP keepalive options"""
+
 sysmon_top_db_username.desc:
 """Username of the PostgreSQL database"""