Просмотр исходного кода

feat: async API to support tcp keepalive inet options

Zaiming Shi 4 лет назад
Родитель
Сommit
6701d716dd
3 измененных файлов с 111 добавлено и 1 удалено
  1. 1 0
      src/emqx_cm.erl
  2. 58 1
      src/emqx_connection.erl
  3. 52 0
      test/emqx_mqtt_SUITE.erl

+ 1 - 0
src/emqx_cm.erl

@@ -111,6 +111,7 @@ start_link() ->
 insert_channel_info(ClientId, Info, Stats) ->
     Chan = {ClientId, self()},
     true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
+    ?tp(debug, insert_channel_info, #{client_id => ClientId}),
     ok.
 
 %% @private

+ 58 - 1
src/emqx_connection.erl

@@ -41,8 +41,13 @@
         , stats/1
         ]).
 
+-export([ async_set_keepalive/4
+        , async_set_socket_options/2
+        ]).
+
 -export([ call/2
         , call/3
+        , cast/2
         ]).
 
 %% Callback
@@ -56,7 +61,7 @@
         ]).
 
 %% Internal callback
--export([wakeup_from_hib/2, recvloop/2]).
+-export([wakeup_from_hib/2, recvloop/2, get_state/1]).
 
 %% Export for CT
 -export([set_field/3]).
@@ -184,6 +189,35 @@ stats(#state{transport = Transport,
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
+%% @doc Set TCP keepalive socket options to override system defaults.
+%% Idle: The number of seconds a connection needs to be idle before
+%%       TCP begins sending out keep-alive probes (Linux default 7200).
+%% Interval: The number of seconds between TCP keep-alive probes
+%%           (Linux default 75).
+%% Probes: The maximum number of TCP keep-alive probes to send before
+%%         giving up and killing the connection if no response is
+%%         obtained from the other end (Linux default 9).
+%%
+%% NOTE: This API sets TCP socket options, which has nothing to do with
+%%       the MQTT layer's keepalive (PINGREQ and PINGRESP).
+async_set_keepalive(Pid, Idle, Interval, Probes) ->
+    Options = [ {keepalive, true}
+              , {raw, 6, 4, <<Idle:32/native>>}
+              , {raw, 6, 5, <<Interval:32/native>>}
+              , {raw, 6, 6, <<Probes:32/native>>}
+              ],
+    async_set_socket_options(Pid, Options).
+
+%% @doc Set custom socket options.
+%% This API is made async because the call might be originated from
+%% a hookpoint callback (otherwise deadlock).
+%% If failed to set, the error message is logged.
+async_set_socket_options(Pid, Options) ->
+    cast(Pid, {async_set_socket_options, Options}).
+
+cast(Pid, Req) ->
+    gen_server:cast(Pid, Req).
+
 call(Pid, Req) ->
     call(Pid, Req, infinity).
 call(Pid, Req, Timeout) ->
@@ -366,6 +400,9 @@ handle_msg({'$gen_call', From, Req}, State) ->
             gen_server:reply(From, Reply),
             stop(Reason, NState)
     end;
+handle_msg({'$gen_cast', Req}, State) ->
+    NewState = handle_cast(Req, State),
+    {ok, NewState};
 
 handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
     ?LOG(debug, "RECV ~0p", [Data]),
@@ -692,6 +729,22 @@ handle_info({sock_error, Reason}, State) ->
 handle_info(Info, State) ->
     with_channel(handle_info, [Info], State).
 
+%%--------------------------------------------------------------------
+%% Handle Info
+
+handle_cast({async_set_socket_options, Opts},
+            State = #state{transport = Transport,
+                           socket    = Socket
+                          }) ->
+    case Transport:setopts(Socket, Opts) of
+        ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
+        Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
+    end,
+    State;
+handle_cast(Req, State) ->
+    ?tp(error, "received_unknown_cast", #{cast => Req}),
+    State.
+
 %%--------------------------------------------------------------------
 %% Ensure rate limit
 
@@ -820,3 +873,7 @@ set_field(Name, Value, State) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, state)),
     setelement(Pos+1, State, Value).
 
+get_state(Pid) ->
+    State = sys:get_state(Pid),
+    maps:from_list(lists:zip(record_info(fields, state),
+                             tl(tuple_to_list(State)))).

+ 52 - 0
test/emqx_mqtt_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
                      recv_oct, recv_cnt, send_oct, send_cnt,
@@ -38,6 +39,19 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+init_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase(init, Config);
+        false -> Config
+    end.
+
+end_per_testcase(TestCase, Config) ->
+    case erlang:function_exported(?MODULE, TestCase, 2) of
+        true -> ?MODULE:TestCase('end', Config);
+        false -> ok
+    end,
+    Config.
+
 t_conn_stats(_) ->
     with_client(fun(CPid) ->
                             Stats = emqx_connection:stats(CPid),
@@ -134,3 +148,41 @@ with_client(TestFun, _Options) ->
             emqtt:stop(C)
     end.
 
+t_async_set_keepalive(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config;
+t_async_set_keepalive('end', _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
+t_async_set_keepalive(_) ->
+    ClientID = <<"client-tcp-keepalive">>,
+    {ok, Client} = emqtt:start_link([{host, "localhost"},
+                                     {proto_ver,v5},
+                                     {clientid, ClientID},
+                                     {clean_start, false}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _} = ?block_until(#{?snk_kind := insert_channel_info,
+                             client_id := ClientID}, 2000, 100),
+    [Pid] = emqx_cm:lookup_channels(ClientID),
+    State = emqx_connection:get_state(Pid),
+    Transport = maps:get(transport, State),
+    Socket = maps:get(socket, State),
+    ?assert(is_port(Socket)),
+    Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
+    {ok, [ {raw, 6, 4, <<Idle:32/native>>}
+         , {raw, 6, 5, <<Interval:32/native>>}
+         , {raw, 6, 6, <<Probes:32/native>>}
+         ]} = Transport:getopts(Socket, Opts),
+    ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
+    emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
+    {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
+    {ok, [ {raw, 6, 4, <<NewIdle:32/native>>}
+         , {raw, 6, 5, <<NewInterval:32/native>>}
+         , {raw, 6, 6, <<NewProbes:32/native>>}
+         ]} = Transport:getopts(Socket, Opts),
+    ?assertEqual(NewIdle, Idle + 1),
+    ?assertEqual(NewInterval, Interval + 1),
+    ?assertEqual(NewProbes, Probes + 1),
+    emqtt:stop(Client),
+    ok.