Просмотр исходного кода

fix(exproto): avoid udp client process leaking

porting from v4.x:
- https://github.com/emqx/emqx/pull/8575
- https://github.com/emqx/emqx/pull/8628
- https://github.com/emqx/emqx/pull/8725
JianBo He 3 лет назад
Родитель
Сommit
dfc6e34680

+ 1 - 1
apps/emqx_exhook/src/emqx_exhook_mgr.erl

@@ -298,6 +298,7 @@ handle_info(_Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, State = #{servers := Servers}) ->
+    _ = unload_exhooks(),
     _ = maps:fold(
         fun(Name, _, AccIn) ->
             do_unload_server(Name, AccIn)
@@ -305,7 +306,6 @@ terminate(_Reason, State = #{servers := Servers}) ->
         State,
         Servers
     ),
-    _ = unload_exhooks(),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->

+ 4 - 2
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -179,13 +179,15 @@ filter(Ls) ->
 
 -spec unload(server()) -> ok.
 unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
-    _ = do_deinit(Name, ReqOpts),
     _ = may_unload_hooks(HookSpecs),
+    _ = do_deinit(Name, ReqOpts),
     _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
     ok.
 
 do_deinit(Name, ReqOpts) ->
-    _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts),
+    %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mgr
+    %% force killed by upper supervisor
+    _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}),
     ok.
 
 do_init(ChannName, ReqOpts) ->

+ 8 - 3
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -240,6 +240,11 @@ esockd_send(Data, #state{
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
     esockd_transport:async_send(Sock, Data).
 
+keepalive_stats(recv_oct) ->
+    emqx_pd:get_counter(incoming_bytes);
+keepalive_stats(send_oct) ->
+    emqx_pd:get_counter(outgoing_bytes).
+
 is_datadram_socket({esockd_transport, _}) -> false;
 is_datadram_socket({udp, _, _}) -> true.
 
@@ -651,9 +656,9 @@ handle_timeout(
         disconnected ->
             {ok, State};
         _ ->
-            case esockd_getstat(Socket, [Stat]) of
-                {ok, [{Stat, RecvOct}]} ->
-                    handle_timeout(TRef, {Keepalive, RecvOct}, State);
+            case keepalive_stats(Stat) of
+                {ok, Oct} ->
+                    handle_timeout(TRef, {Keepalive, Oct}, State);
                 {error, Reason} ->
                     handle_info({sock_error, Reason}, State)
             end

+ 53 - 15
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -78,11 +78,14 @@
 
 -define(TIMER_TABLE, #{
     alive_timer => keepalive,
-    force_timer => force_close
+    force_timer => force_close,
+    idle_timer => force_close_idle
 }).
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
 
+-define(DEFAULT_IDLE_TIMEOUT, 30000).
+
 %%--------------------------------------------------------------------
 %% Info, Attrs and Caps
 %%--------------------------------------------------------------------
@@ -151,14 +154,17 @@ init(
     Ctx = maps:get(ctx, Options),
     GRpcChann = maps:get(handler, Options),
     PoolName = maps:get(pool_name, Options),
-    NConnInfo = default_conninfo(ConnInfo),
+    IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
+
+    NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
     ListenerId =
         case maps:get(listener, Options, undefined) of
             undefined -> undefined;
             {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
         end,
+
     EnableAuthn = maps:get(enable_authn, Options, true),
-    DefaultClientInfo = default_clientinfo(ConnInfo),
+    DefaultClientInfo = default_clientinfo(NConnInfo),
     ClientInfo = DefaultClientInfo#{
         listener => ListenerId,
         enable_authn => EnableAuthn
@@ -183,7 +189,9 @@ init(
                 }
             )
     },
-    try_dispatch(on_socket_created, wrap(Req), Channel).
+    start_idle_checking_timer(
+        try_dispatch(on_socket_created, wrap(Req), Channel)
+    ).
 
 %% @private
 peercert(NoSsl, ConnInfo) when
@@ -217,6 +225,12 @@ socktype(dtls) -> 'DTLS'.
 address({Host, Port}) ->
     #{host => inet:ntoa(Host), port => Port}.
 
+%% avoid udp connection process leak
+start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
+    ensure_timer(idle_timer, Channel);
+start_idle_checking_timer(Channel) ->
+    Channel.
+
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 %%--------------------------------------------------------------------
@@ -285,10 +299,15 @@ handle_timeout(
             {ok, reset_timer(alive_timer, NChannel)};
         {error, timeout} ->
             Req = #{type => 'KEEPALIVE'},
-            {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
+            NChannel = clean_timer(alive_timer, Channel),
+            %% close connection if keepalive timeout
+            Replies = [{event, disconnected}, {close, normal}],
+            {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
     end;
 handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
     {shutdown, {error, {force_close, Reason}}, Channel};
+handle_timeout(_TRef, force_close_idle, Channel) ->
+    {shutdown, idle_timeout, Channel};
 handle_timeout(_TRef, Msg, Channel) ->
     ?SLOG(warning, #{
         msg => "unexpected_timeout_signal",
@@ -390,7 +409,7 @@ handle_call(
     NConnInfo = ConnInfo#{keepalive => Interval},
     NClientInfo = ClientInfo#{keepalive => Interval},
     NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
-    {reply, ok, ensure_keepalive(NChannel)};
+    {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
 handle_call(
     {subscribe_from_client, TopicFilter, Qos},
     _From,
@@ -405,21 +424,21 @@ handle_call(
             {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
         _ ->
             {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
-            {reply, ok, NChannel}
+            {reply, ok, [{event, updated}], NChannel}
     end;
 handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
     {ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
-    {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel};
+    {reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel};
 handle_call(
     {unsubscribe_from_client, TopicFilter},
     _From,
     Channel = #channel{conn_state = connected}
 ) ->
     {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
-    {reply, ok, NChannel};
+    {reply, ok, [{event, updated}], NChannel};
 handle_call({unsubscribe, Topic}, _From, Channel) ->
     {ok, NChannel} = do_unsubscribe([Topic], Channel),
-    {reply, ok, NChannel};
+    {reply, ok, [{event, update}], NChannel};
 handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
     {reply, {ok, maps:to_list(Subs)}, Channel};
 handle_call(
@@ -446,7 +465,7 @@ handle_call(
             {reply, ok, Channel}
     end;
 handle_call(kick, _From, Channel) ->
-    {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)};
+    {reply, ok, [{event, disconnected}, {close, kicked}], Channel};
 handle_call(discard, _From, Channel) ->
     {shutdown, discarded, ok, Channel};
 handle_call(Req, _From, Channel) ->
@@ -671,6 +690,12 @@ reset_timer(Name, Channel) ->
 clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
+cancel_timer(Name, Channel = #channel{timers = Timers}) ->
+    emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
+    clean_timer(Name, Channel).
+
+interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
+    IdleTimeout;
 interval(force_timer, _) ->
     15000;
 interval(alive_timer, #channel{keepalive = Keepalive}) ->
@@ -722,10 +747,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
     NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
     NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}.
 
-default_conninfo(ConnInfo) ->
+default_conninfo(ConnInfo = #{peername := {PeerHost, PeerPort}}) ->
     ConnInfo#{
         clean_start => true,
-        clientid => undefined,
+        clientid => anonymous_clientid(PeerHost, PeerPort),
         username => undefined,
         conn_props => #{},
         connected => true,
@@ -739,14 +764,15 @@ default_conninfo(ConnInfo) ->
 
 default_clientinfo(#{
     peername := {PeerHost, _},
-    sockname := {_, SockPort}
+    sockname := {_, SockPort},
+    clientid := ClientId
 }) ->
     #{
         zone => default,
         protocol => exproto,
         peerhost => PeerHost,
         sockport => SockPort,
-        clientid => undefined,
+        clientid => ClientId,
         username => undefined,
         is_bridge => false,
         is_superuser => false,
@@ -764,3 +790,15 @@ proto_name_to_protocol(<<>>) ->
     exproto;
 proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
     binary_to_atom(ProtoName).
+
+anonymous_clientid(PeerHost, PeerPort) ->
+    iolist_to_binary(
+        [
+            "exproto-anonymous-",
+            inet:ntoa(PeerHost),
+            "-",
+            integer_to_list(PeerPort),
+            "-",
+            emqx_misc:gen_id()
+        ]
+    ).

+ 14 - 1
apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl

@@ -61,7 +61,19 @@ async_call(
     Req = #{conn := Conn},
     Options = #{pool_name := PoolName}
 ) ->
-    cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}).
+    case pick(PoolName, Conn) of
+        false ->
+            ?SLOG(
+                error,
+                #{
+                    msg => "no_available_grpc_client",
+                    function => FunName,
+                    request => Req
+                }
+            );
+        Pid when is_pid(Pid) ->
+            cast(Pid, {rpc, FunName, Req, Options, self()})
+    end.
 
 %%--------------------------------------------------------------------
 %% cast, pick
@@ -72,6 +84,7 @@ async_call(
 cast(Deliver, Msg) ->
     gen_server:cast(Deliver, Msg).
 
+-spec pick(term(), term()) -> pid() | false.
 pick(PoolName, Conn) ->
     gproc_pool:pick_worker(PoolName, Conn).
 

+ 4 - 2
apps/emqx_gateway/test/emqx_exproto_SUITE.erl

@@ -240,8 +240,10 @@ t_keepalive_timeout(Cfg) ->
     send(Sock, ConnBin),
     {ok, ConnAckBin} = recv(Sock, 5000),
 
-    DisconnectBin = frame_disconnect(),
-    {ok, DisconnectBin} = recv(Sock, 10000),
+    %% Timed out connections are closed immediately,
+    %% so there may not be a disconnect message here
+    %%DisconnectBin = frame_disconnect(),
+    %%{ok, DisconnectBin} = recv(Sock, 10000),
 
     SockType =/= udp andalso
         begin

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.4"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
-    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
+    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.4"}