Просмотр исходного кода

Merge pull request #8866 from HJianBo/port-exhook-exproto-bugfixes

fix(exproto): avoid udp client process leaking
JianBo He 3 лет назад
Родитель
Сommit
9e07f074bf

+ 4 - 0
CHANGES-5.0.md

@@ -11,6 +11,8 @@
 * Fix delayed publish inaccurate caused by os time change. [#8926](https://github.com/emqx/emqx/pull/8926)
 * Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911)
 * Fix that redis authn will deny the unknown users [#8934](https://github.com/emqx/emqx/pull/8934)
+* Fix ExProto UDP client keepalive checking error.
+  This causes the clients to not expire as long as a new UDP packet arrives [#8866](https://github.com/emqx/emqx/pull/8866)
 
 ## Enhancements
 
@@ -19,6 +21,8 @@
 * Remove `node.etc_dir` from emqx.conf, because it is never used.
   Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892)
 * Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876)
+* Close ExProto client process immediately if it's keepalive timeouted. [#8866](https://github.com/emqx/emqx/pull/8866)
+* Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8866](https://github.com/emqx/emqx/pull/8866)
 
 # 5.0.7
 

+ 2 - 0
apps/emqx_exhook/include/emqx_exhook.hrl

@@ -43,6 +43,8 @@
     {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
 ]).
 
+-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000).
+
 -endif.
 
 -define(CMD_MOVE_FRONT, front).

+ 4 - 2
apps/emqx_exhook/src/emqx_exhook_mgr.erl

@@ -21,6 +21,7 @@
 
 -include("emqx_exhook.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% APIs
 -export([start_link/0]).
@@ -297,7 +298,8 @@ handle_info(refresh_tick, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
 
-terminate(_Reason, State = #{servers := Servers}) ->
+terminate(Reason, State = #{servers := Servers}) ->
+    _ = unload_exhooks(),
     _ = maps:fold(
         fun(Name, _, AccIn) ->
             do_unload_server(Name, AccIn)
@@ -305,7 +307,7 @@ terminate(_Reason, State = #{servers := Servers}) ->
         State,
         Servers
     ),
-    _ = unload_exhooks(),
+    ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->

+ 5 - 2
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -179,13 +179,16 @@ filter(Ls) ->
 
 -spec unload(server()) -> ok.
 unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
-    _ = do_deinit(Name, ReqOpts),
     _ = may_unload_hooks(HookSpecs),
+    _ = do_deinit(Name, ReqOpts),
     _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
     ok.
 
 do_deinit(Name, ReqOpts) ->
-    _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts),
+    %% Override the request timeout to deinit grpc server to
+    %% avoid emqx_exhook_mgr force killed by upper supervisor
+    NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
+    _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
     ok.
 
 do_init(ChannName, ReqOpts) ->

+ 13 - 3
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_exhook_sup).
 
+-include("emqx_exhook.hrl").
+
 -behaviour(supervisor).
 
 -export([
@@ -28,11 +30,13 @@
     stop_grpc_client_channel/1
 ]).
 
--define(CHILD(Mod, Type, Args), #{
+-define(DEFAULT_TIMEOUT, 5000).
+
+-define(CHILD(Mod, Type, Args, Timeout), #{
     id => Mod,
     start => {Mod, start_link, Args},
     type => Type,
-    shutdown => 15000
+    shutdown => Timeout
 }).
 
 %%--------------------------------------------------------------------
@@ -45,7 +49,7 @@ start_link() ->
 init([]) ->
     _ = emqx_exhook_metrics:init(),
     _ = emqx_exhook_mgr:init_ref_counter_table(),
-    Mngr = ?CHILD(emqx_exhook_mgr, worker, []),
+    Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()),
     {ok, {{one_for_one, 10, 100}, [Mngr]}}.
 
 %%--------------------------------------------------------------------
@@ -70,3 +74,9 @@ stop_grpc_client_channel(Name) ->
         _:_:_ ->
             ok
     end.
+
+%% Calculate the maximum timeout, which will help to shutdown the
+%% emqx_exhook_mgr process correctly.
+force_shutdown_timeout() ->
+    Factor = max(3, length(emqx:get_config([exhook, servers])) + 1),
+    Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT.

+ 35 - 0
apps/emqx_exhook/test/emqx_exhook_SUITE.erl

@@ -24,6 +24,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
 
@@ -313,6 +314,40 @@ t_cluster_name(_) ->
     ),
     emqx_exhook_mgr:disable(<<"default">>).
 
+t_stop_timeout(_) ->
+    snabbkaffe:start_trace(),
+    meck:new(emqx_exhook_demo_svr, [passthrough, no_history]),
+    meck:expect(
+        emqx_exhook_demo_svr,
+        on_provider_unloaded,
+        fun(Req, Md) ->
+            %% ensure sleep time greater than emqx_exhook_mgr shutdown timeout
+            timer:sleep(20000),
+            meck:passthrough([Req, Md])
+        end
+    ),
+
+    %% stop application
+    application:stop(emqx_exhook),
+    ?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000),
+
+    %% all exhook hooked point should be unloaded
+    Mods = lists:flatten(
+        lists:map(
+            fun({hook, _, Cbs}) ->
+                lists:map(fun({callback, {M, _, _}, _, _}) -> M end, Cbs)
+            end,
+            ets:tab2list(emqx_hooks)
+        )
+    ),
+    ?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)),
+
+    %% ensure started for other tests
+    emqx_common_test_helpers:start_apps([emqx_exhook]),
+
+    snabbkaffe:stop(),
+    meck:unload(emqx_exhook_demo_svr).
+
 %%--------------------------------------------------------------------
 %% Cases Helpers
 %%--------------------------------------------------------------------

+ 10 - 1
apps/emqx_exhook/test/emqx_exhook_demo_svr.erl

@@ -80,7 +80,16 @@ stop() ->
 
 stop(Name) ->
     grpc:stop_server(Name),
-    to_atom_name(Name) ! stop.
+    case whereis(to_atom_name(Name)) of
+        undefined ->
+            ok;
+        Pid ->
+            Ref = erlang:monitor(process, Pid),
+            Pid ! stop,
+            receive
+                {'DOWN', Ref, process, Pid, _Reason} -> ok
+            end
+    end.
 
 take() ->
     to_atom_name(?NAME) ! {take, self()},

+ 20 - 11
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -19,6 +19,7 @@
 
 -include_lib("emqx/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% API
 -export([
@@ -51,6 +52,9 @@
 %% Internal callback
 -export([wakeup_from_hib/2, recvloop/2]).
 
+%% for channel module
+-export([keepalive_stats/1]).
+
 -record(state, {
     %% TCP/SSL/UDP/DTLS Wrapped Socket
     socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
@@ -240,6 +244,11 @@ esockd_send(Data, #state{
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
     esockd_transport:async_send(Sock, Data).
 
+keepalive_stats(recv) ->
+    emqx_pd:get_counter(recv_pkt);
+keepalive_stats(send) ->
+    emqx_pd:get_counter(send_pkt).
+
 is_datadram_socket({esockd_transport, _}) -> false;
 is_datadram_socket({udp, _, _}) -> true.
 
@@ -568,9 +577,15 @@ terminate(
         channel = Channel
     }
 ) ->
-    ?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}),
     _ = ChannMod:terminate(Reason, Channel),
     _ = close_socket(State),
+    ClientId =
+        try ChannMod:info(clientid, Channel) of
+            Id -> Id
+        catch
+            _:_ -> undefined
+        end,
+    ?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
     exit(Reason).
 
 %%--------------------------------------------------------------------
@@ -635,28 +650,22 @@ handle_timeout(
     Keepalive,
     State = #state{
         chann_mod = ChannMod,
-        socket = Socket,
         channel = Channel
     }
 ) when
     Keepalive == keepalive;
     Keepalive == keepalive_send
 ->
-    Stat =
+    StatVal =
         case Keepalive of
-            keepalive -> recv_oct;
-            keepalive_send -> send_oct
+            keepalive -> keepalive_stats(recv);
+            keepalive_send -> keepalive_stats(send)
         end,
     case ChannMod:info(conn_state, Channel) of
         disconnected ->
             {ok, State};
         _ ->
-            case esockd_getstat(Socket, [Stat]) of
-                {ok, [{Stat, RecvOct}]} ->
-                    handle_timeout(TRef, {Keepalive, RecvOct}, State);
-                {error, Reason} ->
-                    handle_info({sock_error, Reason}, State)
-            end
+            handle_timeout(TRef, {Keepalive, StatVal}, State)
     end;
 handle_timeout(
     _TRef,

+ 45 - 17
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -78,7 +78,8 @@
 
 -define(TIMER_TABLE, #{
     alive_timer => keepalive,
-    force_timer => force_close
+    force_timer => force_close,
+    idle_timer => force_close_idle
 }).
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@@ -151,14 +152,17 @@ init(
     Ctx = maps:get(ctx, Options),
     GRpcChann = maps:get(handler, Options),
     PoolName = maps:get(pool_name, Options),
-    NConnInfo = default_conninfo(ConnInfo),
+    IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
+
+    NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
     ListenerId =
         case maps:get(listener, Options, undefined) of
             undefined -> undefined;
             {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
         end,
+
     EnableAuthn = maps:get(enable_authn, Options, true),
-    DefaultClientInfo = default_clientinfo(ConnInfo),
+    DefaultClientInfo = default_clientinfo(NConnInfo),
     ClientInfo = DefaultClientInfo#{
         listener => ListenerId,
         enable_authn => EnableAuthn
@@ -183,7 +187,9 @@ init(
                 }
             )
     },
-    try_dispatch(on_socket_created, wrap(Req), Channel).
+    start_idle_checking_timer(
+        try_dispatch(on_socket_created, wrap(Req), Channel)
+    ).
 
 %% @private
 peercert(NoSsl, ConnInfo) when
@@ -217,6 +223,12 @@ socktype(dtls) -> 'DTLS'.
 address({Host, Port}) ->
     #{host => inet:ntoa(Host), port => Port}.
 
+%% avoid udp connection process leak
+start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
+    ensure_timer(idle_timer, Channel);
+start_idle_checking_timer(Channel) ->
+    Channel.
+
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 %%--------------------------------------------------------------------
@@ -285,10 +297,15 @@ handle_timeout(
             {ok, reset_timer(alive_timer, NChannel)};
         {error, timeout} ->
             Req = #{type => 'KEEPALIVE'},
-            {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
+            NChannel = remove_timer_ref(alive_timer, Channel),
+            %% close connection if keepalive timeout
+            Replies = [{event, disconnected}, {close, keepalive_timeout}],
+            {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
     end;
 handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
     {shutdown, {error, {force_close, Reason}}, Channel};
+handle_timeout(_TRef, force_close_idle, Channel) ->
+    {shutdown, idle_timeout, Channel};
 handle_timeout(_TRef, Msg, Channel) ->
     ?SLOG(warning, #{
         msg => "unexpected_timeout_signal",
@@ -390,7 +407,7 @@ handle_call(
     NConnInfo = ConnInfo#{keepalive => Interval},
     NClientInfo = ClientInfo#{keepalive => Interval},
     NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
-    {reply, ok, ensure_keepalive(NChannel)};
+    {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
 handle_call(
     {subscribe_from_client, TopicFilter, Qos},
     _From,
@@ -405,21 +422,21 @@ handle_call(
             {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
         _ ->
             {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
-            {reply, ok, NChannel}
+            {reply, ok, [{event, updated}], NChannel}
     end;
 handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
     {ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
-    {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel};
+    {reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel};
 handle_call(
     {unsubscribe_from_client, TopicFilter},
     _From,
     Channel = #channel{conn_state = connected}
 ) ->
     {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
-    {reply, ok, NChannel};
+    {reply, ok, [{event, updated}], NChannel};
 handle_call({unsubscribe, Topic}, _From, Channel) ->
     {ok, NChannel} = do_unsubscribe([Topic], Channel),
-    {reply, ok, NChannel};
+    {reply, ok, [{event, update}], NChannel};
 handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
     {reply, {ok, maps:to_list(Subs)}, Channel};
 handle_call(
@@ -446,7 +463,7 @@ handle_call(
             {reply, ok, Channel}
     end;
 handle_call(kick, _From, Channel) ->
-    {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)};
+    {reply, ok, [{event, disconnected}, {close, kicked}], Channel};
 handle_call(discard, _From, Channel) ->
     {shutdown, discarded, ok, Channel};
 handle_call(Req, _From, Channel) ->
@@ -648,7 +665,8 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
 ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
     Channel;
 ensure_keepalive_timer(Interval, Channel) ->
-    Keepalive = emqx_keepalive:init(timer:seconds(Interval)),
+    StatVal = emqx_gateway_conn:keepalive_stats(recv),
+    Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
     ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
 
 ensure_timer(Name, Channel = #channel{timers = Timers}) ->
@@ -666,11 +684,17 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = Timers#{Name => TRef}}.
 
 reset_timer(Name, Channel) ->
-    ensure_timer(Name, clean_timer(Name, Channel)).
+    ensure_timer(Name, remove_timer_ref(Name, Channel)).
 
-clean_timer(Name, Channel = #channel{timers = Timers}) ->
+cancel_timer(Name, Channel = #channel{timers = Timers}) ->
+    emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
+    remove_timer_ref(Name, Channel).
+
+remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
+interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
+    IdleTimeout;
 interval(force_timer, _) ->
     15000;
 interval(alive_timer, #channel{keepalive = Keepalive}) ->
@@ -725,7 +749,7 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
 default_conninfo(ConnInfo) ->
     ConnInfo#{
         clean_start => true,
-        clientid => undefined,
+        clientid => anonymous_clientid(),
         username => undefined,
         conn_props => #{},
         connected => true,
@@ -739,14 +763,15 @@ default_conninfo(ConnInfo) ->
 
 default_clientinfo(#{
     peername := {PeerHost, _},
-    sockname := {_, SockPort}
+    sockname := {_, SockPort},
+    clientid := ClientId
 }) ->
     #{
         zone => default,
         protocol => exproto,
         peerhost => PeerHost,
         sockport => SockPort,
-        clientid => undefined,
+        clientid => ClientId,
         username => undefined,
         is_bridge => false,
         is_superuser => false,
@@ -764,3 +789,6 @@ proto_name_to_protocol(<<>>) ->
     exproto;
 proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
     binary_to_atom(ProtoName).
+
+anonymous_clientid() ->
+    iolist_to_binary(["exproto-", emqx_misc:gen_id()]).

+ 9 - 1
apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl

@@ -56,12 +56,19 @@ start_link(Pool, Id) ->
         []
     ).
 
+-spec async_call(atom(), map(), map()) -> ok.
 async_call(
     FunName,
     Req = #{conn := Conn},
     Options = #{pool_name := PoolName}
 ) ->
-    cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}).
+    case pick(PoolName, Conn) of
+        false ->
+            reply(self(), FunName, {error, no_available_grpc_client});
+        Pid when is_pid(Pid) ->
+            cast(Pid, {rpc, FunName, Req, Options, self()})
+    end,
+    ok.
 
 %%--------------------------------------------------------------------
 %% cast, pick
@@ -72,6 +79,7 @@ async_call(
 cast(Deliver, Msg) ->
     gen_server:cast(Deliver, Msg).
 
+-spec pick(term(), term()) -> pid() | false.
 pick(PoolName, Conn) ->
     gproc_pool:pick_worker(PoolName, Conn).
 

+ 93 - 12
apps/emqx_gateway/test/emqx_exproto_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("eunit/include/eunit.hrl").
 
 -import(
     emqx_exproto_echo_svr,
@@ -38,6 +39,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(TCPOPTS, [binary, {active, false}]).
 -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
@@ -62,6 +64,9 @@
 all() ->
     [{group, Name} || Name <- metrics()].
 
+suite() ->
+    [{timetrap, {seconds, 30}}].
+
 groups() ->
     Cases = emqx_common_test_helpers:all(?MODULE),
     [{Name, Cases} || Name <- metrics()].
@@ -87,6 +92,7 @@ set_special_cfg(emqx_gateway) ->
         [gateway, exproto],
         #{
             server => #{bind => 9100},
+            idle_timeout => 5000,
             handler => #{address => "http://127.0.0.1:9001"},
             listeners => listener_confs(LisType)
         }
@@ -223,14 +229,16 @@ t_acl_deny(Cfg) ->
     close(Sock).
 
 t_keepalive_timeout(Cfg) ->
+    ok = snabbkaffe:start_trace(),
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
 
+    ClientId1 = <<"keepalive_test_client1">>,
     Client = #{
         proto_name => <<"demo">>,
         proto_ver => <<"v0.1">>,
-        clientid => <<"test_client_1">>,
-        keepalive => 2
+        clientid => ClientId1,
+        keepalive => 5
     },
     Password = <<"123456">>,
 
@@ -238,16 +246,42 @@ t_keepalive_timeout(Cfg) ->
     ConnAckBin = frame_connack(0),
 
     send(Sock, ConnBin),
-    {ok, ConnAckBin} = recv(Sock, 5000),
-
-    DisconnectBin = frame_disconnect(),
-    {ok, DisconnectBin} = recv(Sock, 10000),
-
-    SockType =/= udp andalso
-        begin
-            {error, closed} = recv(Sock, 5000)
-        end,
-    ok.
+    {ok, ConnAckBin} = recv(Sock),
+
+    case SockType of
+        udp ->
+            %% another udp client should not affect the first
+            %% udp client keepalive check
+            timer:sleep(4000),
+            Sock2 = open(SockType),
+            ConnBin2 = frame_connect(
+                Client#{clientid => <<"keepalive_test_client2">>},
+                Password
+            ),
+            send(Sock2, ConnBin2),
+            %% first client will be keepalive timeouted in 6s
+            ?assertMatch(
+                {ok, #{
+                    clientid := ClientId1,
+                    reason := {shutdown, {sock_closed, keepalive_timeout}}
+                }},
+                ?block_until(#{?snk_kind := conn_process_terminated}, 8000)
+            );
+        _ ->
+            ?assertMatch(
+                {ok, #{
+                    clientid := ClientId1,
+                    reason := {shutdown, {sock_closed, keepalive_timeout}}
+                }},
+                ?block_until(#{?snk_kind := conn_process_terminated}, 12000)
+            ),
+            Trace = snabbkaffe:collect_trace(),
+            %% conn process should be terminated
+            ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
+            %% socket port should be closed
+            ?assertEqual({error, closed}, recv(Sock, 5000))
+    end,
+    snabbkaffe:stop().
 
 t_hook_connected_disconnected(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
@@ -337,6 +371,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) ->
         error(hook_is_not_running)
     end,
 
+    send(Sock, frame_disconnect()),
+
     close(Sock),
     emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}),
     emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}).
@@ -373,6 +409,48 @@ t_hook_message_delivered(Cfg) ->
     close(Sock),
     emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
 
+t_idle_timeout(Cfg) ->
+    ok = snabbkaffe:start_trace(),
+    SockType = proplists:get_value(listener_type, Cfg),
+    Sock = open(SockType),
+
+    %% need to create udp client by sending something
+    case SockType of
+        udp ->
+            %% nothing to do
+            ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]),
+            ok = meck:expect(
+                emqx_exproto_gcli,
+                async_call,
+                fun(FunName, _Req, _GClient) ->
+                    self() ! {hreply, FunName, ok},
+                    ok
+                end
+            ),
+            %% send request, but nobody can respond to it
+            ClientId = <<"idle_test_client1">>,
+            Client = #{
+                proto_name => <<"demo">>,
+                proto_ver => <<"v0.1">>,
+                clientid => ClientId,
+                keepalive => 5
+            },
+            Password = <<"123456">>,
+            ConnBin = frame_connect(Client, Password),
+            send(Sock, ConnBin),
+            ?assertMatch(
+                {ok, #{reason := {shutdown, idle_timeout}}},
+                ?block_until(#{?snk_kind := conn_process_terminated}, 10000)
+            ),
+            ok = meck:unload(emqx_exproto_gcli);
+        _ ->
+            ?assertMatch(
+                {ok, #{reason := {shutdown, idle_timeout}}},
+                ?block_until(#{?snk_kind := conn_process_terminated}, 10000)
+            )
+    end,
+    snabbkaffe:stop().
+
 %%--------------------------------------------------------------------
 %% Utils
 
@@ -422,6 +500,9 @@ send({ssl, Sock}, Bin) ->
 send({dtls, Sock}, Bin) ->
     ssl:send(Sock, Bin).
 
+recv(Sock) ->
+    recv(Sock, infinity).
+
 recv({tcp, Sock}, Ts) ->
     gen_tcp:recv(Sock, 0, Ts);
 recv({udp, Sock}, Ts) ->

+ 1 - 1
mix.exs

@@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
       {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.13.4", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
-      {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
+      {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
       {:replayq, "0.3.4", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.4"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
-    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
+    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.4"}